# _tasks.py
  1. from __future__ import annotations
  2. import sys
  3. from abc import ABCMeta, abstractmethod
  4. from collections.abc import Awaitable, Callable
  5. from types import TracebackType
  6. from typing import TYPE_CHECKING, Any, Protocol, TypeVar, overload
  7. if sys.version_info >= (3, 11):
  8. from typing import TypeVarTuple, Unpack
  9. else:
  10. from typing_extensions import TypeVarTuple, Unpack
  11. if TYPE_CHECKING:
  12. from .._core._tasks import CancelScope
  13. T_Retval = TypeVar("T_Retval")
  14. T_contra = TypeVar("T_contra", contravariant=True)
  15. PosArgsT = TypeVarTuple("PosArgsT")
  16. class TaskStatus(Protocol[T_contra]):
  17. @overload
  18. def started(self: TaskStatus[None]) -> None: ...
  19. @overload
  20. def started(self, value: T_contra) -> None: ...
  21. def started(self, value: T_contra | None = None) -> None:
  22. """
  23. Signal that the task has started.
  24. :param value: object passed back to the starter of the task
  25. """
  26. class TaskGroup(metaclass=ABCMeta):
  27. """
  28. Groups several asynchronous tasks together.
  29. :ivar cancel_scope: the cancel scope inherited by all child tasks
  30. :vartype cancel_scope: CancelScope
  31. """
  32. cancel_scope: CancelScope
  33. @abstractmethod
  34. def start_soon(
  35. self,
  36. func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
  37. *args: Unpack[PosArgsT],
  38. name: object = None,
  39. ) -> None:
  40. """
  41. Start a new task in this task group.
  42. :param func: a coroutine function
  43. :param args: positional arguments to call the function with
  44. :param name: name of the task, for the purposes of introspection and debugging
  45. .. versionadded:: 3.0
  46. """
  47. @abstractmethod
  48. async def start(
  49. self,
  50. func: Callable[..., Awaitable[Any]],
  51. *args: object,
  52. name: object = None,
  53. ) -> Any:
  54. """
  55. Start a new task and wait until it signals for readiness.
  56. :param func: a coroutine function
  57. :param args: positional arguments to call the function with
  58. :param name: name of the task, for the purposes of introspection and debugging
  59. :return: the value passed to ``task_status.started()``
  60. :raises RuntimeError: if the task finishes without calling
  61. ``task_status.started()``
  62. .. versionadded:: 3.0
  63. """
  64. @abstractmethod
  65. async def __aenter__(self) -> TaskGroup:
  66. """Enter the task group context and allow starting new tasks."""
  67. @abstractmethod
  68. async def __aexit__(
  69. self,
  70. exc_type: type[BaseException] | None,
  71. exc_val: BaseException | None,
  72. exc_tb: TracebackType | None,
  73. ) -> bool | None:
  74. """Exit the task group context waiting for all tasks to finish."""