Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update http11.py #981

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 86 additions & 96 deletions httpcore/_async/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
WriteError,
map_exceptions,
)
from .._models import Origin, Request, Response
from .._synchronization import AsyncLock, AsyncShieldCancellation
from .._models import Origin, Request
from .._synchronization import AsyncSemaphore
from .._trace import Trace
from .interfaces import AsyncConnectionInterface
from .interfaces import AsyncConnectionInterface, StartResponse

logger = logging.getLogger("httpcore.http11")

Expand Down Expand Up @@ -55,85 +55,103 @@ def __init__(
self._keepalive_expiry: float | None = keepalive_expiry
self._expire_at: float | None = None
self._state = HTTPConnectionState.NEW
self._state_lock = AsyncLock()
self._request_lock = AsyncSemaphore(bound=1)
self._request_count = 0
self._h11_state = h11.Connection(
our_role=h11.CLIENT,
max_incomplete_event_size=self.MAX_INCOMPLETE_EVENT_SIZE,
)

async def handle_async_request(self, request: Request) -> Response:
iterator = self.iterate_response(request)
resp = await anext(iterator)
return Response(
status=resp.status,
headers=resp.headers,
content=iterator,
extensions=resp.extensions,
)

async def iterate_response(
self, request: Request
) -> typing.AsyncIterator[StartResponse | bytes]:
if not self.can_handle_request(request.url.origin):
raise RuntimeError(
f"Attempted to send request to {request.url.origin} on connection "
f"to {self._origin}"
)

async with self._state_lock:
async with self._request_lock:
if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
self._request_count += 1
self._state = HTTPConnectionState.ACTIVE
self._expire_at = None
else:
raise ConnectionNotAvailable()

try:
kwargs = {"request": request}
try:
kwargs = {"request": request}
try:
async with Trace(
"send_request_headers", logger, request, kwargs
) as trace:
await self._send_request_headers(**kwargs)
async with Trace(
"send_request_body", logger, request, kwargs
) as trace:
await self._send_request_body(**kwargs)
except WriteError:
# If we get a write error while we're writing the request,
# then we supress this error and move on to attempting to
# read the response. Servers can sometimes close the request
# pre-emptively and then respond with a well formed HTTP
# error response.
pass

async with Trace(
"send_request_headers", logger, request, kwargs
"receive_response_headers", logger, request, kwargs
) as trace:
await self._send_request_headers(**kwargs)
async with Trace("send_request_body", logger, request, kwargs) as trace:
await self._send_request_body(**kwargs)
except WriteError:
# If we get a write error while we're writing the request,
# then we supress this error and move on to attempting to
# read the response. Servers can sometimes close the request
# pre-emptively and then respond with a well formed HTTP
# error response.
pass

async with Trace(
"receive_response_headers", logger, request, kwargs
) as trace:
(
http_version,
status,
reason_phrase,
headers,
trailing_data,
) = await self._receive_response_headers(**kwargs)
trace.return_value = (
http_version,
status,
reason_phrase,
headers,
(
http_version,
status,
reason_phrase,
headers,
trailing_data,
) = await self._receive_response_headers(**kwargs)
trace.return_value = (
http_version,
status,
reason_phrase,
headers,
)

network_stream = self._network_stream

# CONNECT or Upgrade request
if (status == 101) or (
(request.method == b"CONNECT") and (200 <= status < 300)
):
network_stream = AsyncHTTP11UpgradeStream(
network_stream, trailing_data
)

yield Response(
status=status,
headers=headers,
extensions={
"http_version": http_version,
"reason_phrase": reason_phrase,
"network_stream": network_stream,
},
)

network_stream = self._network_stream

# CONNECT or Upgrade request
if (status == 101) or (
(request.method == b"CONNECT") and (200 <= status < 300)
):
network_stream = AsyncHTTP11UpgradeStream(network_stream, trailing_data)

return Response(
status=status,
headers=headers,
content=HTTP11ConnectionByteStream(self, request),
extensions={
"http_version": http_version,
"reason_phrase": reason_phrase,
"network_stream": network_stream,
},
)
except BaseException as exc:
with AsyncShieldCancellation():
async with Trace("receive_response_body", logger, request, kwargs):
async for chunk in self._receive_response_body(**kwargs):
yield chunk
finally:
await self._response_closed()
async with Trace("response_closed", logger, request) as trace:
await self._response_closed()
raise exc
if self.is_closed():
await self.aclose()

# Sending the request...

Expand Down Expand Up @@ -236,18 +254,17 @@ async def _receive_event(
return event # type: ignore[return-value]

async def _response_closed(self) -> None:
async with self._state_lock:
if (
self._h11_state.our_state is h11.DONE
and self._h11_state.their_state is h11.DONE
):
self._state = HTTPConnectionState.IDLE
self._h11_state.start_next_cycle()
if self._keepalive_expiry is not None:
now = time.monotonic()
self._expire_at = now + self._keepalive_expiry
else:
await self.aclose()
if (
self._h11_state.our_state is h11.DONE
and self._h11_state.their_state is h11.DONE
):
self._state = HTTPConnectionState.IDLE
self._h11_state.start_next_cycle()
if self._keepalive_expiry is not None:
now = time.monotonic()
self._expire_at = now + self._keepalive_expiry
else:
self._state = HTTPConnectionState.CLOSED

# Once the connection is no longer required...

Expand Down Expand Up @@ -321,33 +338,6 @@ async def __aexit__(
await self.aclose()


class HTTP11ConnectionByteStream:
def __init__(self, connection: AsyncHTTP11Connection, request: Request) -> None:
self._connection = connection
self._request = request
self._closed = False

async def __aiter__(self) -> typing.AsyncIterator[bytes]:
kwargs = {"request": self._request}
try:
async with Trace("receive_response_body", logger, self._request, kwargs):
async for chunk in self._connection._receive_response_body(**kwargs):
yield chunk
except BaseException as exc:
# If we get an exception while streaming the response,
# we want to close the response (and possibly the connection)
# before raising that exception.
with AsyncShieldCancellation():
await self.aclose()
raise exc

async def aclose(self) -> None:
if not self._closed:
self._closed = True
async with Trace("response_closed", logger, self._request):
await self._connection._response_closed()


class AsyncHTTP11UpgradeStream(AsyncNetworkStream):
def __init__(self, stream: AsyncNetworkStream, leading_data: bytes) -> None:
self._stream = stream
Expand Down
Loading