http11.py 13 KB


  1. from __future__ import annotations
  2. import enum
  3. import logging
  4. import ssl
  5. import time
  6. import types
  7. import typing
  8. import h11
  9. from .._backends.base import NetworkStream
  10. from .._exceptions import (
  11. ConnectionNotAvailable,
  12. LocalProtocolError,
  13. RemoteProtocolError,
  14. WriteError,
  15. map_exceptions,
  16. )
  17. from .._models import Origin, Request, Response
  18. from .._synchronization import Lock, ShieldCancellation
  19. from .._trace import Trace
  20. from .interfaces import ConnectionInterface
  21. logger = logging.getLogger("httpcore.http11")
  22. # A subset of `h11.Event` types supported by `_send_event`
  23. H11SendEvent = typing.Union[
  24. h11.Request,
  25. h11.Data,
  26. h11.EndOfMessage,
  27. ]
  28. class HTTPConnectionState(enum.IntEnum):
  29. NEW = 0
  30. ACTIVE = 1
  31. IDLE = 2
  32. CLOSED = 3
  33. class HTTP11Connection(ConnectionInterface):
  34. READ_NUM_BYTES = 64 * 1024
  35. MAX_INCOMPLETE_EVENT_SIZE = 100 * 1024
  36. def __init__(
  37. self,
  38. origin: Origin,
  39. stream: NetworkStream,
  40. keepalive_expiry: float | None = None,
  41. ) -> None:
  42. self._origin = origin
  43. self._network_stream = stream
  44. self._keepalive_expiry: float | None = keepalive_expiry
  45. self._expire_at: float | None = None
  46. self._state = HTTPConnectionState.NEW
  47. self._state_lock = Lock()
  48. self._request_count = 0
  49. self._h11_state = h11.Connection(
  50. our_role=h11.CLIENT,
  51. max_incomplete_event_size=self.MAX_INCOMPLETE_EVENT_SIZE,
  52. )
  53. def handle_request(self, request: Request) -> Response:
  54. if not self.can_handle_request(request.url.origin):
  55. raise RuntimeError(
  56. f"Attempted to send request to {request.url.origin} on connection "
  57. f"to {self._origin}"
  58. )
  59. with self._state_lock:
  60. if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
  61. self._request_count += 1
  62. self._state = HTTPConnectionState.ACTIVE
  63. self._expire_at = None
  64. else:
  65. raise ConnectionNotAvailable()
  66. try:
  67. kwargs = {"request": request}
  68. try:
  69. with Trace(
  70. "send_request_headers", logger, request, kwargs
  71. ) as trace:
  72. self._send_request_headers(**kwargs)
  73. with Trace("send_request_body", logger, request, kwargs) as trace:
  74. self._send_request_body(**kwargs)
  75. except WriteError:
  76. # If we get a write error while we're writing the request,
  77. # then we supress this error and move on to attempting to
  78. # read the response. Servers can sometimes close the request
  79. # pre-emptively and then respond with a well formed HTTP
  80. # error response.
  81. pass
  82. with Trace(
  83. "receive_response_headers", logger, request, kwargs
  84. ) as trace:
  85. (
  86. http_version,
  87. status,
  88. reason_phrase,
  89. headers,
  90. trailing_data,
  91. ) = self._receive_response_headers(**kwargs)
  92. trace.return_value = (
  93. http_version,
  94. status,
  95. reason_phrase,
  96. headers,
  97. )
  98. network_stream = self._network_stream
  99. # CONNECT or Upgrade request
  100. if (status == 101) or (
  101. (request.method == b"CONNECT") and (200 <= status < 300)
  102. ):
  103. network_stream = HTTP11UpgradeStream(network_stream, trailing_data)
  104. return Response(
  105. status=status,
  106. headers=headers,
  107. content=HTTP11ConnectionByteStream(self, request),
  108. extensions={
  109. "http_version": http_version,
  110. "reason_phrase": reason_phrase,
  111. "network_stream": network_stream,
  112. },
  113. )
  114. except BaseException as exc:
  115. with ShieldCancellation():
  116. with Trace("response_closed", logger, request) as trace:
  117. self._response_closed()
  118. raise exc
  119. # Sending the request...
  120. def _send_request_headers(self, request: Request) -> None:
  121. timeouts = request.extensions.get("timeout", {})
  122. timeout = timeouts.get("write", None)
  123. with map_exceptions({h11.LocalProtocolError: LocalProtocolError}):
  124. event = h11.Request(
  125. method=request.method,
  126. target=request.url.target,
  127. headers=request.headers,
  128. )
  129. self._send_event(event, timeout=timeout)
  130. def _send_request_body(self, request: Request) -> None:
  131. timeouts = request.extensions.get("timeout", {})
  132. timeout = timeouts.get("write", None)
  133. assert isinstance(request.stream, typing.Iterable)
  134. for chunk in request.stream:
  135. event = h11.Data(data=chunk)
  136. self._send_event(event, timeout=timeout)
  137. self._send_event(h11.EndOfMessage(), timeout=timeout)
  138. def _send_event(self, event: h11.Event, timeout: float | None = None) -> None:
  139. bytes_to_send = self._h11_state.send(event)
  140. if bytes_to_send is not None:
  141. self._network_stream.write(bytes_to_send, timeout=timeout)
  142. # Receiving the response...
  143. def _receive_response_headers(
  144. self, request: Request
  145. ) -> tuple[bytes, int, bytes, list[tuple[bytes, bytes]], bytes]:
  146. timeouts = request.extensions.get("timeout", {})
  147. timeout = timeouts.get("read", None)
  148. while True:
  149. event = self._receive_event(timeout=timeout)
  150. if isinstance(event, h11.Response):
  151. break
  152. if (
  153. isinstance(event, h11.InformationalResponse)
  154. and event.status_code == 101
  155. ):
  156. break
  157. http_version = b"HTTP/" + event.http_version
  158. # h11 version 0.11+ supports a `raw_items` interface to get the
  159. # raw header casing, rather than the enforced lowercase headers.
  160. headers = event.headers.raw_items()
  161. trailing_data, _ = self._h11_state.trailing_data
  162. return http_version, event.status_code, event.reason, headers, trailing_data
  163. def _receive_response_body(
  164. self, request: Request
  165. ) -> typing.Iterator[bytes]:
  166. timeouts = request.extensions.get("timeout", {})
  167. timeout = timeouts.get("read", None)
  168. while True:
  169. event = self._receive_event(timeout=timeout)
  170. if isinstance(event, h11.Data):
  171. yield bytes(event.data)
  172. elif isinstance(event, (h11.EndOfMessage, h11.PAUSED)):
  173. break
  174. def _receive_event(
  175. self, timeout: float | None = None
  176. ) -> h11.Event | type[h11.PAUSED]:
  177. while True:
  178. with map_exceptions({h11.RemoteProtocolError: RemoteProtocolError}):
  179. event = self._h11_state.next_event()
  180. if event is h11.NEED_DATA:
  181. data = self._network_stream.read(
  182. self.READ_NUM_BYTES, timeout=timeout
  183. )
  184. # If we feed this case through h11 we'll raise an exception like:
  185. #
  186. # httpcore.RemoteProtocolError: can't handle event type
  187. # ConnectionClosed when role=SERVER and state=SEND_RESPONSE
  188. #
  189. # Which is accurate, but not very informative from an end-user
  190. # perspective. Instead we handle this case distinctly and treat
  191. # it as a ConnectError.
  192. if data == b"" and self._h11_state.their_state == h11.SEND_RESPONSE:
  193. msg = "Server disconnected without sending a response."
  194. raise RemoteProtocolError(msg)
  195. self._h11_state.receive_data(data)
  196. else:
  197. # mypy fails to narrow the type in the above if statement above
  198. return event # type: ignore[return-value]
  199. def _response_closed(self) -> None:
  200. with self._state_lock:
  201. if (
  202. self._h11_state.our_state is h11.DONE
  203. and self._h11_state.their_state is h11.DONE
  204. ):
  205. self._state = HTTPConnectionState.IDLE
  206. self._h11_state.start_next_cycle()
  207. if self._keepalive_expiry is not None:
  208. now = time.monotonic()
  209. self._expire_at = now + self._keepalive_expiry
  210. else:
  211. self.close()
  212. # Once the connection is no longer required...
  213. def close(self) -> None:
  214. # Note that this method unilaterally closes the connection, and does
  215. # not have any kind of locking in place around it.
  216. self._state = HTTPConnectionState.CLOSED
  217. self._network_stream.close()
  218. # The ConnectionInterface methods provide information about the state of
  219. # the connection, allowing for a connection pooling implementation to
  220. # determine when to reuse and when to close the connection...
  221. def can_handle_request(self, origin: Origin) -> bool:
  222. return origin == self._origin
  223. def is_available(self) -> bool:
  224. # Note that HTTP/1.1 connections in the "NEW" state are not treated as
  225. # being "available". The control flow which created the connection will
  226. # be able to send an outgoing request, but the connection will not be
  227. # acquired from the connection pool for any other request.
  228. return self._state == HTTPConnectionState.IDLE
  229. def has_expired(self) -> bool:
  230. now = time.monotonic()
  231. keepalive_expired = self._expire_at is not None and now > self._expire_at
  232. # If the HTTP connection is idle but the socket is readable, then the
  233. # only valid state is that the socket is about to return b"", indicating
  234. # a server-initiated disconnect.
  235. server_disconnected = (
  236. self._state == HTTPConnectionState.IDLE
  237. and self._network_stream.get_extra_info("is_readable")
  238. )
  239. return keepalive_expired or server_disconnected
  240. def is_idle(self) -> bool:
  241. return self._state == HTTPConnectionState.IDLE
  242. def is_closed(self) -> bool:
  243. return self._state == HTTPConnectionState.CLOSED
  244. def info(self) -> str:
  245. origin = str(self._origin)
  246. return (
  247. f"{origin!r}, HTTP/1.1, {self._state.name}, "
  248. f"Request Count: {self._request_count}"
  249. )
  250. def __repr__(self) -> str:
  251. class_name = self.__class__.__name__
  252. origin = str(self._origin)
  253. return (
  254. f"<{class_name} [{origin!r}, {self._state.name}, "
  255. f"Request Count: {self._request_count}]>"
  256. )
  257. # These context managers are not used in the standard flow, but are
  258. # useful for testing or working with connection instances directly.
  259. def __enter__(self) -> HTTP11Connection:
  260. return self
  261. def __exit__(
  262. self,
  263. exc_type: type[BaseException] | None = None,
  264. exc_value: BaseException | None = None,
  265. traceback: types.TracebackType | None = None,
  266. ) -> None:
  267. self.close()
  268. class HTTP11ConnectionByteStream:
  269. def __init__(self, connection: HTTP11Connection, request: Request) -> None:
  270. self._connection = connection
  271. self._request = request
  272. self._closed = False
  273. def __iter__(self) -> typing.Iterator[bytes]:
  274. kwargs = {"request": self._request}
  275. try:
  276. with Trace("receive_response_body", logger, self._request, kwargs):
  277. for chunk in self._connection._receive_response_body(**kwargs):
  278. yield chunk
  279. except BaseException as exc:
  280. # If we get an exception while streaming the response,
  281. # we want to close the response (and possibly the connection)
  282. # before raising that exception.
  283. with ShieldCancellation():
  284. self.close()
  285. raise exc
  286. def close(self) -> None:
  287. if not self._closed:
  288. self._closed = True
  289. with Trace("response_closed", logger, self._request):
  290. self._connection._response_closed()
  291. class HTTP11UpgradeStream(NetworkStream):
  292. def __init__(self, stream: NetworkStream, leading_data: bytes) -> None:
  293. self._stream = stream
  294. self._leading_data = leading_data
  295. def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
  296. if self._leading_data:
  297. buffer = self._leading_data[:max_bytes]
  298. self._leading_data = self._leading_data[max_bytes:]
  299. return buffer
  300. else:
  301. return self._stream.read(max_bytes, timeout)
  302. def write(self, buffer: bytes, timeout: float | None = None) -> None:
  303. self._stream.write(buffer, timeout)
  304. def close(self) -> None:
  305. self._stream.close()
  306. def start_tls(
  307. self,
  308. ssl_context: ssl.SSLContext,
  309. server_hostname: str | None = None,
  310. timeout: float | None = None,
  311. ) -> NetworkStream:
  312. return self._stream.start_tls(ssl_context, server_hostname, timeout)
  313. def get_extra_info(self, info: str) -> typing.Any:
  314. return self._stream.get_extra_info(info)