_eventloop.py 9.4 KB


  1. from __future__ import annotations
  2. import math
  3. import sys
  4. from abc import ABCMeta, abstractmethod
  5. from collections.abc import AsyncIterator, Awaitable
  6. from os import PathLike
  7. from signal import Signals
  8. from socket import AddressFamily, SocketKind, socket
  9. from typing import (
  10. IO,
  11. TYPE_CHECKING,
  12. Any,
  13. Callable,
  14. ContextManager,
  15. Sequence,
  16. TypeVar,
  17. Union,
  18. overload,
  19. )
  20. if sys.version_info >= (3, 11):
  21. from typing import TypeVarTuple, Unpack
  22. else:
  23. from typing_extensions import TypeVarTuple, Unpack
  24. if sys.version_info >= (3, 10):
  25. from typing import TypeAlias
  26. else:
  27. from typing_extensions import TypeAlias
  28. if TYPE_CHECKING:
  29. from .._core._synchronization import CapacityLimiter, Event, Lock, Semaphore
  30. from .._core._tasks import CancelScope
  31. from .._core._testing import TaskInfo
  32. from ..from_thread import BlockingPortal
  33. from ._sockets import (
  34. ConnectedUDPSocket,
  35. ConnectedUNIXDatagramSocket,
  36. IPSockAddrType,
  37. SocketListener,
  38. SocketStream,
  39. UDPSocket,
  40. UNIXDatagramSocket,
  41. UNIXSocketStream,
  42. )
  43. from ._subprocesses import Process
  44. from ._tasks import TaskGroup
  45. from ._testing import TestRunner
  46. T_Retval = TypeVar("T_Retval")
  47. PosArgsT = TypeVarTuple("PosArgsT")
  48. StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"]
  49. class AsyncBackend(metaclass=ABCMeta):
  50. @classmethod
  51. @abstractmethod
  52. def run(
  53. cls,
  54. func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
  55. args: tuple[Unpack[PosArgsT]],
  56. kwargs: dict[str, Any],
  57. options: dict[str, Any],
  58. ) -> T_Retval:
  59. """
  60. Run the given coroutine function in an asynchronous event loop.
  61. The current thread must not be already running an event loop.
  62. :param func: a coroutine function
  63. :param args: positional arguments to ``func``
  64. :param kwargs: positional arguments to ``func``
  65. :param options: keyword arguments to call the backend ``run()`` implementation
  66. with
  67. :return: the return value of the coroutine function
  68. """
  69. @classmethod
  70. @abstractmethod
  71. def current_token(cls) -> object:
  72. """
  73. :return:
  74. """
  75. @classmethod
  76. @abstractmethod
  77. def current_time(cls) -> float:
  78. """
  79. Return the current value of the event loop's internal clock.
  80. :return: the clock value (seconds)
  81. """
  82. @classmethod
  83. @abstractmethod
  84. def cancelled_exception_class(cls) -> type[BaseException]:
  85. """Return the exception class that is raised in a task if it's cancelled."""
  86. @classmethod
  87. @abstractmethod
  88. async def checkpoint(cls) -> None:
  89. """
  90. Check if the task has been cancelled, and allow rescheduling of other tasks.
  91. This is effectively the same as running :meth:`checkpoint_if_cancelled` and then
  92. :meth:`cancel_shielded_checkpoint`.
  93. """
  94. @classmethod
  95. async def checkpoint_if_cancelled(cls) -> None:
  96. """
  97. Check if the current task group has been cancelled.
  98. This will check if the task has been cancelled, but will not allow other tasks
  99. to be scheduled if not.
  100. """
  101. if cls.current_effective_deadline() == -math.inf:
  102. await cls.checkpoint()
  103. @classmethod
  104. async def cancel_shielded_checkpoint(cls) -> None:
  105. """
  106. Allow the rescheduling of other tasks.
  107. This will give other tasks the opportunity to run, but without checking if the
  108. current task group has been cancelled, unlike with :meth:`checkpoint`.
  109. """
  110. with cls.create_cancel_scope(shield=True):
  111. await cls.sleep(0)
  112. @classmethod
  113. @abstractmethod
  114. async def sleep(cls, delay: float) -> None:
  115. """
  116. Pause the current task for the specified duration.
  117. :param delay: the duration, in seconds
  118. """
  119. @classmethod
  120. @abstractmethod
  121. def create_cancel_scope(
  122. cls, *, deadline: float = math.inf, shield: bool = False
  123. ) -> CancelScope:
  124. pass
  125. @classmethod
  126. @abstractmethod
  127. def current_effective_deadline(cls) -> float:
  128. """
  129. Return the nearest deadline among all the cancel scopes effective for the
  130. current task.
  131. :return:
  132. - a clock value from the event loop's internal clock
  133. - ``inf`` if there is no deadline in effect
  134. - ``-inf`` if the current scope has been cancelled
  135. :rtype: float
  136. """
  137. @classmethod
  138. @abstractmethod
  139. def create_task_group(cls) -> TaskGroup:
  140. pass
  141. @classmethod
  142. @abstractmethod
  143. def create_event(cls) -> Event:
  144. pass
  145. @classmethod
  146. @abstractmethod
  147. def create_lock(cls, *, fast_acquire: bool) -> Lock:
  148. pass
  149. @classmethod
  150. @abstractmethod
  151. def create_semaphore(
  152. cls,
  153. initial_value: int,
  154. *,
  155. max_value: int | None = None,
  156. fast_acquire: bool = False,
  157. ) -> Semaphore:
  158. pass
  159. @classmethod
  160. @abstractmethod
  161. def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
  162. pass
  163. @classmethod
  164. @abstractmethod
  165. async def run_sync_in_worker_thread(
  166. cls,
  167. func: Callable[[Unpack[PosArgsT]], T_Retval],
  168. args: tuple[Unpack[PosArgsT]],
  169. abandon_on_cancel: bool = False,
  170. limiter: CapacityLimiter | None = None,
  171. ) -> T_Retval:
  172. pass
  173. @classmethod
  174. @abstractmethod
  175. def check_cancelled(cls) -> None:
  176. pass
  177. @classmethod
  178. @abstractmethod
  179. def run_async_from_thread(
  180. cls,
  181. func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
  182. args: tuple[Unpack[PosArgsT]],
  183. token: object,
  184. ) -> T_Retval:
  185. pass
  186. @classmethod
  187. @abstractmethod
  188. def run_sync_from_thread(
  189. cls,
  190. func: Callable[[Unpack[PosArgsT]], T_Retval],
  191. args: tuple[Unpack[PosArgsT]],
  192. token: object,
  193. ) -> T_Retval:
  194. pass
  195. @classmethod
  196. @abstractmethod
  197. def create_blocking_portal(cls) -> BlockingPortal:
  198. pass
  199. @classmethod
  200. @abstractmethod
  201. async def open_process(
  202. cls,
  203. command: StrOrBytesPath | Sequence[StrOrBytesPath],
  204. *,
  205. stdin: int | IO[Any] | None,
  206. stdout: int | IO[Any] | None,
  207. stderr: int | IO[Any] | None,
  208. **kwargs: Any,
  209. ) -> Process:
  210. pass
  211. @classmethod
  212. @abstractmethod
  213. def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None:
  214. pass
  215. @classmethod
  216. @abstractmethod
  217. async def connect_tcp(
  218. cls, host: str, port: int, local_address: IPSockAddrType | None = None
  219. ) -> SocketStream:
  220. pass
  221. @classmethod
  222. @abstractmethod
  223. async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream:
  224. pass
  225. @classmethod
  226. @abstractmethod
  227. def create_tcp_listener(cls, sock: socket) -> SocketListener:
  228. pass
  229. @classmethod
  230. @abstractmethod
  231. def create_unix_listener(cls, sock: socket) -> SocketListener:
  232. pass
  233. @classmethod
  234. @abstractmethod
  235. async def create_udp_socket(
  236. cls,
  237. family: AddressFamily,
  238. local_address: IPSockAddrType | None,
  239. remote_address: IPSockAddrType | None,
  240. reuse_port: bool,
  241. ) -> UDPSocket | ConnectedUDPSocket:
  242. pass
  243. @classmethod
  244. @overload
  245. async def create_unix_datagram_socket(
  246. cls, raw_socket: socket, remote_path: None
  247. ) -> UNIXDatagramSocket: ...
  248. @classmethod
  249. @overload
  250. async def create_unix_datagram_socket(
  251. cls, raw_socket: socket, remote_path: str | bytes
  252. ) -> ConnectedUNIXDatagramSocket: ...
  253. @classmethod
  254. @abstractmethod
  255. async def create_unix_datagram_socket(
  256. cls, raw_socket: socket, remote_path: str | bytes | None
  257. ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket:
  258. pass
  259. @classmethod
  260. @abstractmethod
  261. async def getaddrinfo(
  262. cls,
  263. host: bytes | str | None,
  264. port: str | int | None,
  265. *,
  266. family: int | AddressFamily = 0,
  267. type: int | SocketKind = 0,
  268. proto: int = 0,
  269. flags: int = 0,
  270. ) -> list[
  271. tuple[
  272. AddressFamily,
  273. SocketKind,
  274. int,
  275. str,
  276. tuple[str, int] | tuple[str, int, int, int],
  277. ]
  278. ]:
  279. pass
  280. @classmethod
  281. @abstractmethod
  282. async def getnameinfo(
  283. cls, sockaddr: IPSockAddrType, flags: int = 0
  284. ) -> tuple[str, str]:
  285. pass
  286. @classmethod
  287. @abstractmethod
  288. async def wait_socket_readable(cls, sock: socket) -> None:
  289. pass
  290. @classmethod
  291. @abstractmethod
  292. async def wait_socket_writable(cls, sock: socket) -> None:
  293. pass
  294. @classmethod
  295. @abstractmethod
  296. def current_default_thread_limiter(cls) -> CapacityLimiter:
  297. pass
  298. @classmethod
  299. @abstractmethod
  300. def open_signal_receiver(
  301. cls, *signals: Signals
  302. ) -> ContextManager[AsyncIterator[Signals]]:
  303. pass
  304. @classmethod
  305. @abstractmethod
  306. def get_current_task(cls) -> TaskInfo:
  307. pass
  308. @classmethod
  309. @abstractmethod
  310. def get_running_tasks(cls) -> Sequence[TaskInfo]:
  311. pass
  312. @classmethod
  313. @abstractmethod
  314. async def wait_all_tasks_blocked(cls) -> None:
  315. pass
  316. @classmethod
  317. @abstractmethod
  318. def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
  319. pass