utils.py 8.2 KB


  1. # mypy: allow-untyped-defs
  2. # Copyright (c) Facebook, Inc. and its affiliates.
  3. # All rights reserved.
  4. #
  5. # This source code is licensed under the BSD-style license found in the
  6. # LICENSE file in the root directory of this source tree.
  7. import ipaddress
  8. import random
  9. import re
  10. import socket
  11. import time
  12. import weakref
  13. from datetime import timedelta
  14. from threading import Event, Thread
  15. from typing import Any, Callable, Dict, Optional, Tuple, Union
  16. __all__ = ['parse_rendezvous_endpoint']
  17. def _parse_rendezvous_config(config_str: str) -> Dict[str, str]:
  18. """Extract key-value pairs from a rendezvous configuration string.
  19. Args:
  20. config_str:
  21. A string in format <key1>=<value1>,...,<keyN>=<valueN>.
  22. """
  23. config: Dict[str, str] = {}
  24. config_str = config_str.strip()
  25. if not config_str:
  26. return config
  27. key_values = config_str.split(",")
  28. for kv in key_values:
  29. key, *values = kv.split("=", 1)
  30. key = key.strip()
  31. if not key:
  32. raise ValueError(
  33. "The rendezvous configuration string must be in format "
  34. "<key1>=<value1>,...,<keyN>=<valueN>."
  35. )
  36. value: Optional[str]
  37. if values:
  38. value = values[0].strip()
  39. else:
  40. value = None
  41. if not value:
  42. raise ValueError(
  43. f"The rendezvous configuration option '{key}' must have a value specified."
  44. )
  45. config[key] = value
  46. return config
  47. def _try_parse_port(port_str: str) -> Optional[int]:
  48. """Try to extract the port number from ``port_str``."""
  49. if port_str and re.match(r"^[0-9]{1,5}$", port_str):
  50. return int(port_str)
  51. return None
  52. def parse_rendezvous_endpoint(endpoint: Optional[str], default_port: int) -> Tuple[str, int]:
  53. """Extract the hostname and the port number from a rendezvous endpoint.
  54. Args:
  55. endpoint:
  56. A string in format <hostname>[:<port>].
  57. default_port:
  58. The port number to use if the endpoint does not include one.
  59. Returns:
  60. A tuple of hostname and port number.
  61. """
  62. if endpoint is not None:
  63. endpoint = endpoint.strip()
  64. if not endpoint:
  65. return ("localhost", default_port)
  66. # An endpoint that starts and ends with brackets represents an IPv6 address.
  67. if endpoint[0] == "[" and endpoint[-1] == "]":
  68. host, *rest = endpoint, *[]
  69. else:
  70. host, *rest = endpoint.rsplit(":", 1)
  71. # Sanitize the IPv6 address.
  72. if len(host) > 1 and host[0] == "[" and host[-1] == "]":
  73. host = host[1:-1]
  74. if len(rest) == 1:
  75. port = _try_parse_port(rest[0])
  76. if port is None or port >= 2 ** 16:
  77. raise ValueError(
  78. f"The port number of the rendezvous endpoint '{endpoint}' must be an integer "
  79. "between 0 and 65536."
  80. )
  81. else:
  82. port = default_port
  83. if not re.match(r"^[\w\.:-]+$", host):
  84. raise ValueError(
  85. f"The hostname of the rendezvous endpoint '{endpoint}' must be a dot-separated list of "
  86. "labels, an IPv4 address, or an IPv6 address."
  87. )
  88. return host, port
  89. def _matches_machine_hostname(host: str) -> bool:
  90. """Indicate whether ``host`` matches the hostname of this machine.
  91. This function compares ``host`` to the hostname as well as to the IP
  92. addresses of this machine. Note that it may return a false negative if this
  93. machine has CNAME records beyond its FQDN or IP addresses assigned to
  94. secondary NICs.
  95. """
  96. if host == "localhost":
  97. return True
  98. try:
  99. addr = ipaddress.ip_address(host)
  100. except ValueError:
  101. addr = None
  102. if addr and addr.is_loopback:
  103. return True
  104. try:
  105. host_addr_list = socket.getaddrinfo(
  106. host, None, proto=socket.IPPROTO_TCP, flags=socket.AI_CANONNAME
  107. )
  108. except (ValueError, socket.gaierror) as _:
  109. host_addr_list = []
  110. host_ip_list = [
  111. host_addr_info[4][0]
  112. for host_addr_info in host_addr_list
  113. ]
  114. this_host = socket.gethostname()
  115. if host == this_host:
  116. return True
  117. addr_list = socket.getaddrinfo(
  118. this_host, None, proto=socket.IPPROTO_TCP, flags=socket.AI_CANONNAME
  119. )
  120. for addr_info in addr_list:
  121. # If we have an FQDN in the addr_info, compare it to `host`.
  122. if addr_info[3] and addr_info[3] == host:
  123. return True
  124. # Otherwise if `host` represents an IP address, compare it to our IP
  125. # address.
  126. if addr and addr_info[4][0] == str(addr):
  127. return True
  128. # If the IP address matches one of the provided host's IP addresses
  129. if addr_info[4][0] in host_ip_list:
  130. return True
  131. return False
  132. def _delay(seconds: Union[float, Tuple[float, float]]) -> None:
  133. """Suspend the current thread for ``seconds``.
  134. Args:
  135. seconds:
  136. Either the delay, in seconds, or a tuple of a lower and an upper
  137. bound within which a random delay will be picked.
  138. """
  139. if isinstance(seconds, tuple):
  140. seconds = random.uniform(*seconds)
  141. # Ignore delay requests that are less than 10 milliseconds.
  142. if seconds >= 0.01:
  143. time.sleep(seconds)
  144. class _PeriodicTimer:
  145. """Represent a timer that periodically runs a specified function.
  146. Args:
  147. interval:
  148. The interval, in seconds, between each run.
  149. function:
  150. The function to run.
  151. """
  152. # The state of the timer is hold in a separate context object to avoid a
  153. # reference cycle between the timer and the background thread.
  154. class _Context:
  155. interval: float
  156. function: Callable[..., None]
  157. args: Tuple[Any, ...]
  158. kwargs: Dict[str, Any]
  159. stop_event: Event
  160. _name: Optional[str]
  161. _thread: Optional[Thread]
  162. _finalizer: Optional[weakref.finalize]
  163. # The context that is shared between the timer and the background thread.
  164. _ctx: _Context
  165. def __init__(
  166. self,
  167. interval: timedelta,
  168. function: Callable[..., None],
  169. *args: Any,
  170. **kwargs: Any,
  171. ) -> None:
  172. self._name = None
  173. self._ctx = self._Context()
  174. self._ctx.interval = interval.total_seconds()
  175. self._ctx.function = function # type: ignore[assignment]
  176. self._ctx.args = args or ()
  177. self._ctx.kwargs = kwargs or {}
  178. self._ctx.stop_event = Event()
  179. self._thread = None
  180. self._finalizer = None
  181. @property
  182. def name(self) -> Optional[str]:
  183. """Get the name of the timer."""
  184. return self._name
  185. def set_name(self, name: str) -> None:
  186. """Set the name of the timer.
  187. The specified name will be assigned to the background thread and serves
  188. for debugging and troubleshooting purposes.
  189. """
  190. if self._thread:
  191. raise RuntimeError("The timer has already started.")
  192. self._name = name
  193. def start(self) -> None:
  194. """Start the timer."""
  195. if self._thread:
  196. raise RuntimeError("The timer has already started.")
  197. self._thread = Thread(
  198. target=self._run, name=self._name or "PeriodicTimer", args=(self._ctx,), daemon=True
  199. )
  200. # We avoid using a regular finalizer (a.k.a. __del__) for stopping the
  201. # timer as joining a daemon thread during the interpreter shutdown can
  202. # cause deadlocks. The weakref.finalize is a superior alternative that
  203. # provides a consistent behavior regardless of the GC implementation.
  204. self._finalizer = weakref.finalize(
  205. self, self._stop_thread, self._thread, self._ctx.stop_event
  206. )
  207. # We do not attempt to stop our background thread during the interpreter
  208. # shutdown. At that point we do not even know whether it still exists.
  209. self._finalizer.atexit = False
  210. self._thread.start()
  211. def cancel(self) -> None:
  212. """Stop the timer at the next opportunity."""
  213. if self._finalizer:
  214. self._finalizer()
  215. @staticmethod
  216. def _run(ctx) -> None:
  217. while not ctx.stop_event.wait(ctx.interval):
  218. ctx.function(*ctx.args, **ctx.kwargs)
  219. @staticmethod
  220. def _stop_thread(thread, stop_event):
  221. stop_event.set()
  222. thread.join()