dynamic_rendezvous.py 45 KB


  1. # mypy: allow-untyped-defs
  2. # Copyright (c) Facebook, Inc. and its affiliates.
  3. # All rights reserved.
  4. #
  5. # This source code is licensed under the BSD-style license found in the
  6. # LICENSE file in the root directory of this source tree.
  7. import inspect
  8. import logging
  9. import os
  10. import pickle
  11. import socket
  12. import threading
  13. import time
  14. import weakref
  15. from abc import ABC, abstractmethod
  16. from dataclasses import dataclass
  17. from datetime import datetime, timedelta
  18. from enum import Enum
  19. from typing import Any, Callable, cast, Dict, List, Optional, Set, Tuple
  20. from torch.distributed import PrefixStore, Store
  21. from torch.distributed.elastic.events import construct_and_record_rdzv_event, NodeState
  22. from .api import (
  23. RendezvousClosedError,
  24. RendezvousError,
  25. RendezvousGracefulExitError,
  26. RendezvousHandler,
  27. RendezvousInfo,
  28. RendezvousParameters,
  29. RendezvousStateError,
  30. RendezvousStoreInfo,
  31. RendezvousTimeoutError,
  32. )
  33. from .utils import _delay, _PeriodicTimer
  34. __all__ = [
  35. "RendezvousBackend",
  36. "RendezvousTimeout",
  37. "RendezvousSettings",
  38. "DynamicRendezvousHandler",
  39. "create_handler",
  40. ]
  41. logger = logging.getLogger(__name__)
  42. def get_method_name(depth=2):
  43. if len(inspect.stack()) > depth:
  44. return inspect.stack()[depth].function
  45. return "no_method_name"
  46. Token = Any
  47. """Represent an opaque fencing token used by the rendezvous backend."""
  48. class RendezvousBackend(ABC):
  49. """Represent a backend that holds the rendezvous state."""
  50. @property
  51. @abstractmethod
  52. def name(self) -> str:
  53. """Get the name of the backend."""
  54. @abstractmethod
  55. def get_state(self) -> Optional[Tuple[bytes, Token]]:
  56. """Get the rendezvous state.
  57. Returns:
  58. A tuple of the encoded rendezvous state and its fencing token or
  59. ``None`` if no state is found in the backend.
  60. Raises:
  61. RendezvousConnectionError:
  62. The connection to the backend has failed.
  63. RendezvousStateError:
  64. The rendezvous state is corrupt.
  65. """
  66. @abstractmethod
  67. def set_state(
  68. self, state: bytes, token: Optional[Token] = None
  69. ) -> Optional[Tuple[bytes, Token, bool]]:
  70. """Set the rendezvous state.
  71. The new rendezvous state is set conditionally:
  72. - If the specified ``token`` matches the fencing token stored in the
  73. backend, the state will be updated. The new state will be returned
  74. to the caller along with its fencing token.
  75. - If the specified ``token`` does not match the fencing token stored
  76. in the backend, the state won't be updated; instead the existing
  77. state along with its fencing token will be returned to the caller.
  78. - If the specified ``token`` is ``None``, the new state will be set
  79. only if there is no existing state in the backend. Either the new
  80. state or the existing state along with its fencing token will be
  81. returned to the caller.
  82. Args:
  83. state:
  84. The encoded rendezvous state.
  85. token:
  86. An optional fencing token that was retrieved by a previous call
  87. to :py:meth:`get_state` or ``set_state()``.
  88. Returns:
  89. A tuple of the serialized rendezvous state, its fencing token, and
  90. a boolean value indicating whether our set attempt succeeded.
  91. Raises:
  92. RendezvousConnectionError:
  93. The connection to the backend has failed.
  94. RendezvousStateError:
  95. The rendezvous state is corrupt.
  96. """
  97. class RendezvousTimeout:
  98. """Hold the timeout configuration of a rendezvous.
  99. Args:
  100. join:
  101. The time within which the rendezvous is expected to complete.
  102. last_call:
  103. An additional wait amount before completing the rendezvous once the
  104. rendezvous has the minimum number of required participants.
  105. close:
  106. The time within which the rendezvous is expected to close after a
  107. call to :py:meth:`RendezvousHandler.set_closed` or
  108. :py:meth:`RendezvousHandler.shutdown`.
  109. keep_alive:
  110. The time within which a keep-alive heartbeat is expected to
  111. complete.
  112. """
  113. _ZERO = timedelta(0)
  114. _DEFAULT_TIMEOUTS = {
  115. "join": timedelta(seconds=600),
  116. "last_call": timedelta(seconds=30),
  117. "close": timedelta(seconds=30),
  118. "heartbeat": timedelta(seconds=5),
  119. }
  120. _join: timedelta
  121. _last_call: timedelta
  122. _close: timedelta
  123. _heartbeat: timedelta
  124. def __init__(
  125. self,
  126. join: Optional[timedelta] = None,
  127. last_call: Optional[timedelta] = None,
  128. close: Optional[timedelta] = None,
  129. heartbeat: Optional[timedelta] = None,
  130. ) -> None:
  131. self._set_timeouts(join=join, last_call=last_call, close=close, heartbeat=heartbeat)
  132. @property
  133. def join(self) -> timedelta:
  134. """Get the join timeout."""
  135. return self._join
  136. @property
  137. def last_call(self) -> timedelta:
  138. """Get the last call timeout."""
  139. return self._last_call
  140. @property
  141. def close(self) -> timedelta:
  142. """Get the close timeout."""
  143. return self._close
  144. @property
  145. def heartbeat(self) -> timedelta:
  146. """Get the keep-alive heartbeat timeout."""
  147. return self._heartbeat
  148. def _set_timeouts(self, **timeouts: Optional[timedelta]):
  149. for name, timeout in timeouts.items():
  150. if timeout is None:
  151. timeout = self._DEFAULT_TIMEOUTS[name]
  152. if timeout <= self._ZERO:
  153. raise ValueError(f"The {name} timeout ({timeout}) must be positive.")
  154. setattr(self, "_" + name, timeout)
  155. @dataclass(repr=False, eq=False, frozen=True)
  156. class RendezvousSettings:
  157. """Hold the settings of the rendezvous.
  158. Attributes:
  159. run_id:
  160. The run id of the rendezvous.
  161. min_nodes:
  162. The minimum number of nodes to admit to the rendezvous.
  163. max_nodes:
  164. The maximum number of nodes to admit to the rendezvous.
  165. timeout:
  166. The timeout configuration of the rendezvous.
  167. keep_alive_interval:
  168. The amount of time a node waits before sending a heartbeat to keep
  169. it alive in the rendezvous.
  170. keep_alive_max_attempt:
  171. The maximum number of failed heartbeat attempts after which a node
  172. is considered dead.
  173. """
  174. run_id: str
  175. min_nodes: int
  176. max_nodes: int
  177. timeout: RendezvousTimeout
  178. keep_alive_interval: timedelta
  179. keep_alive_max_attempt: int
  180. @dataclass(eq=True, order=True, frozen=True)
  181. class _NodeDesc:
  182. """Describe a node in the rendezvous.
  183. Attributes:
  184. addr:
  185. The FQDN of the node or user specified local node address.
  186. pid:
  187. The id of the process in which the rendezvous handler runs.
  188. local_id:
  189. A process-wide unique id.
  190. """
  191. addr: str
  192. pid: int
  193. local_id: int
  194. def __repr__(self) -> str:
  195. return f"{self.addr}_{self.pid}_{self.local_id}"
  196. class _NodeDescGenerator:
  197. """Generate node descriptors.
  198. A node descriptor is a combination of an FQDN, a process id, and an auto-
  199. incremented integer that uniquely identifies a node in the rendezvous.
  200. """
  201. _lock: threading.Lock
  202. _local_id: int
  203. def __init__(self) -> None:
  204. self._lock = threading.Lock()
  205. # An integer that is incremented with each call to generate().
  206. self._local_id = 0
  207. def generate(self, local_addr: Optional[str] = None) -> _NodeDesc:
  208. # This method can be called by multiple threads concurrently; therefore,
  209. # we must increment the integer atomically.
  210. with self._lock:
  211. local_id = self._local_id
  212. self._local_id += 1
  213. return _NodeDesc(local_addr or socket.getfqdn(), os.getpid(), local_id)
  214. class _RendezvousState:
  215. """Hold the state of a rendezvous.
  216. Attributes:
  217. round:
  218. The current round of the rendezvous.
  219. complete:
  220. A boolean value indicating whether the current round of the
  221. rendezvous is complete.
  222. deadline:
  223. The time at which the current round of the rendezvous will be
  224. considered complete if it is still waiting for nodes to join.
  225. closed:
  226. A boolean value indicating whether the rendezvous is closed.
  227. participants:
  228. A dictionary of the participants and their corresponding ranks.
  229. wait_list:
  230. A set of nodes that are waiting to participate in the next round of
  231. the rendezvous.
  232. redundancy_list:
  233. A set of nodes that are redundant in the current round and can join
  234. the next rendezvous without triggering re-rendezvous.
  235. last_heartbeats:
  236. A dictionary containing each node's last heartbeat time.
  237. """
  238. round: int
  239. complete: bool
  240. deadline: Optional[datetime]
  241. closed: bool
  242. participants: Dict[_NodeDesc, int]
  243. wait_list: Set[_NodeDesc]
  244. redundancy_list: Set[_NodeDesc]
  245. last_heartbeats: Dict[_NodeDesc, datetime]
  246. def __init__(self) -> None:
  247. self.round = 0
  248. self.complete = False
  249. self.deadline = None
  250. self.closed = False
  251. self.participants = {}
  252. self.wait_list = set()
  253. self.redundancy_list = set()
  254. self.last_heartbeats = {}
  255. def _remove_participant_epilogue(state: _RendezvousState, settings: RendezvousSettings) -> None:
  256. if state.complete:
  257. # If we do not have any participants left, move to the next round.
  258. if not state.participants:
  259. msg = "No participants left in the rendezvous, marking rendezvous as incomplete"
  260. logger.debug(msg)
  261. state.complete = False
  262. state.round += 1
  263. else:
  264. if len(state.participants) < settings.min_nodes:
  265. msg = (
  266. f"Number of participants {len(state.participants)}) less than"
  267. f"min_nodes {settings.min_nodes}, clearning deadline in state"
  268. )
  269. logger.debug(msg)
  270. state.deadline = None
  271. class _RendezvousStateHolder(ABC):
  272. """Hold the shared rendezvous state synced with other nodes."""
  273. @property
  274. @abstractmethod
  275. def state(self) -> _RendezvousState:
  276. """Get the local state."""
  277. @abstractmethod
  278. def sync(self) -> Optional[bool]:
  279. """Read or writes the latest state.
  280. Returns:
  281. A boolean value indicating whether the local state, in case marked
  282. as dirty, was successfully synced with other nodes.
  283. """
  284. @abstractmethod
  285. def mark_dirty(self) -> None:
  286. """Mark the local state as dirty."""
  287. class _BackendRendezvousStateHolder(_RendezvousStateHolder):
  288. """Hold the rendezvous state synced with other nodes via a backend.
  289. Args:
  290. backend:
  291. The rendezvous backend to use.
  292. settings:
  293. The rendezvous settings.
  294. cache_duration:
  295. The amount of time, in seconds, to cache the last rendezvous state
  296. before requesting it from the backend again.
  297. """
  298. _backend: RendezvousBackend
  299. _state: _RendezvousState
  300. _settings: RendezvousSettings
  301. _cache_duration: int
  302. _token: Token
  303. _dirty: bool
  304. _last_sync_time: float
  305. _dead_nodes: List[_NodeDesc]
  306. def __init__(
  307. self,
  308. backend: RendezvousBackend,
  309. settings: RendezvousSettings,
  310. cache_duration: int = 1,
  311. ) -> None:
  312. self._backend = backend
  313. self._state = _RendezvousState()
  314. self._settings = settings
  315. self._cache_duration = cache_duration
  316. self._token = None
  317. self._dirty = False
  318. self._last_sync_time = -1
  319. self._dead_nodes = []
  320. def _record(self, message: str, node_state: NodeState = NodeState.RUNNING):
  321. construct_and_record_rdzv_event(
  322. name=f"{self.__class__.__name__}.{get_method_name()}",
  323. run_id=self._settings.run_id,
  324. message=message,
  325. node_state=node_state,
  326. )
  327. @property
  328. def state(self) -> _RendezvousState:
  329. """See base class."""
  330. return self._state
  331. def sync(self) -> Optional[bool]:
  332. """See base class."""
  333. state_bits: Optional[bytes] = None
  334. token = None
  335. has_set: Optional[bool]
  336. if self._dirty:
  337. has_set = False
  338. state_bits = pickle.dumps(self._state)
  339. set_response = self._backend.set_state(state_bits, self._token)
  340. if set_response is not None:
  341. state_bits, token, has_set = set_response
  342. else:
  343. has_set = None
  344. if self._cache_duration > 0:
  345. # Avoid overloading the backend if we are asked to retrieve the
  346. # state repeatedly. Try to serve the cached state.
  347. if self._last_sync_time >= max(time.monotonic() - self._cache_duration, 0):
  348. return None
  349. get_response = self._backend.get_state()
  350. if get_response is not None:
  351. state_bits, token = get_response
  352. if state_bits is not None:
  353. try:
  354. self._state = pickle.loads(state_bits)
  355. except pickle.PickleError as exc:
  356. raise RendezvousStateError(
  357. "The rendezvous state is corrupt. See inner exception for details."
  358. ) from exc
  359. else:
  360. self._state = _RendezvousState()
  361. if has_set and self._dead_nodes and logger.isEnabledFor(logging.DEBUG):
  362. node_list = ", ".join(f"'{dead_node}'" for dead_node in self._dead_nodes)
  363. msg = (
  364. f"As part of the sync operation the node(s) {node_list} have been removed from the "
  365. f"rendezvous '{self._settings.run_id}' since they had no heartbeat."
  366. )
  367. self._record(message=msg)
  368. logger.debug(msg)
  369. self._token = token
  370. self._dirty = False
  371. self._last_sync_time = time.monotonic()
  372. self._sanitize()
  373. return has_set
  374. def _sanitize(self) -> None:
  375. state = self._state
  376. expire_time = datetime.utcnow() - (
  377. self._settings.keep_alive_interval * self._settings.keep_alive_max_attempt
  378. )
  379. # Filter out the dead nodes.
  380. self._dead_nodes = [
  381. node
  382. for node, last_heartbeat in state.last_heartbeats.items()
  383. if last_heartbeat < expire_time
  384. ]
  385. participant_removed = False
  386. for dead_node in self._dead_nodes:
  387. msg = f"Detected dead node '{dead_node}', removing it from the rendezvous"
  388. logger.debug(msg)
  389. del state.last_heartbeats[dead_node]
  390. try:
  391. del state.participants[dead_node]
  392. participant_removed = True
  393. except KeyError:
  394. pass
  395. try:
  396. state.wait_list.remove(dead_node)
  397. except KeyError:
  398. pass
  399. try:
  400. state.redundancy_list.remove(dead_node)
  401. except KeyError:
  402. pass
  403. if participant_removed:
  404. # Common epilogue shared with the _remove_from_participants()
  405. # function of _DistributedRendezvousOpExecutor.
  406. _remove_participant_epilogue(state, self._settings)
  407. def mark_dirty(self) -> None:
  408. """See base class.
  409. If the local rendezvous state is dirty, the next sync call will try to
  410. write the changes back to the backend. However this attempt might fail
  411. if another node, which had the same state, also made changes and wrote
  412. them before us.
  413. """
  414. self._dirty = True
  415. class _Action(Enum):
  416. """Specifies the possible actions based on the state of the rendezvous."""
  417. KEEP_ALIVE = 1
  418. ADD_TO_PARTICIPANTS = 2
  419. ADD_TO_WAIT_LIST = 3
  420. ADD_TO_REDUNDANCY_LIST = 4
  421. REMOVE_FROM_PARTICIPANTS = 5
  422. REMOVE_FROM_WAIT_LIST = 6
  423. REMOVE_FROM_REDUNDANCY_LIST = 7
  424. MARK_RENDEZVOUS_COMPLETE = 8
  425. MARK_RENDEZVOUS_CLOSED = 9
  426. SYNC = 10
  427. ERROR_CLOSED = 11
  428. ERROR_TIMEOUT = 12
  429. FINISH = 13
  430. class _RendezvousContext:
  431. """Holds the context of the rendezvous.
  432. Attributes:
  433. node:
  434. The node descriptor associated with the current rendezvous handler
  435. instance.
  436. state:
  437. The current state of the rendezvous.
  438. settings:
  439. The rendezvous settings.
  440. """
  441. node: _NodeDesc
  442. state: _RendezvousState
  443. settings: RendezvousSettings
  444. def __init__(
  445. self, node: _NodeDesc, state: _RendezvousState, settings: RendezvousSettings
  446. ) -> None:
  447. self.node = node
  448. self.state = state
  449. self.settings = settings
  450. class _RendezvousOpExecutor(ABC):
  451. """Execute rendezvous operations."""
  452. @abstractmethod
  453. def run(
  454. self,
  455. state_handler: Callable[[_RendezvousContext, float], _Action],
  456. deadline: float,
  457. update_deadline: Optional[Callable[[timedelta], float]] = None,
  458. ) -> None:
  459. """Execute a rendezvous operation.
  460. An operation is run inside a state machine and is expected to transition
  461. the rendezvous from one state to another.
  462. Args:
  463. state_handler:
  464. A callable that is expected to return the next state transition
  465. action based on the current state of the rendezvous.
  466. deadline:
  467. The time, in seconds, at which the operation will be considered
  468. timed-out.
  469. update_deadline:
  470. Function to generate a new operation deadline if the current
  471. node may participate in the next rendezvous.
  472. """
  473. class _DistributedRendezvousOpExecutor(_RendezvousOpExecutor):
  474. """Execute rendezvous operations using a shared state.
  475. Args:
  476. node:
  477. The node descriptor associated with the current rendezvous handler
  478. instance.
  479. state_holder:
  480. The ``RendezvousStateHolder`` to use to sync the rendezvous state
  481. with other nodes.
  482. settings:
  483. The rendezvous settings.
  484. """
  485. _node: _NodeDesc
  486. _state: _RendezvousState
  487. _state_holder: _RendezvousStateHolder
  488. _settings: RendezvousSettings
  489. def __init__(
  490. self,
  491. node: _NodeDesc,
  492. state_holder: _RendezvousStateHolder,
  493. settings: RendezvousSettings,
  494. ) -> None:
  495. self._node = node
  496. self._state_holder = state_holder
  497. self._settings = settings
  498. def _record(self, message: str, node_state: NodeState = NodeState.RUNNING) -> None:
  499. construct_and_record_rdzv_event(
  500. name=f"{self.__class__.__name__}.{get_method_name()}",
  501. run_id=self._settings.run_id,
  502. message=message,
  503. node_state=node_state,
  504. hostname=self._node.addr,
  505. pid=self._node.pid,
  506. local_id=self._node.local_id,
  507. )
  508. def run(
  509. self,
  510. state_handler: Callable[[_RendezvousContext, float], _Action],
  511. deadline: float,
  512. update_deadline: Optional[Callable[[timedelta], float]] = None,
  513. ) -> None:
  514. """See base class."""
  515. action = None
  516. while action != _Action.FINISH:
  517. # Reads or writes the latest rendezvous state shared by all nodes in
  518. # the rendezvous. Note that our local changes might get overridden
  519. # by another node if that node synced its changes before us.
  520. has_set = self._state_holder.sync()
  521. if has_set is not None:
  522. if has_set:
  523. msg = (
  524. f"The node '{self._node}' has successfully synced its local changes with "
  525. f"other nodes in the rendezvous '{self._settings.run_id}'."
  526. )
  527. else:
  528. msg = (
  529. f"The node '{self._node}' has a stale state and failed to sync its local "
  530. f"changes with other nodes in the rendezvous '{self._settings.run_id}'."
  531. )
  532. self._record(message=msg)
  533. logger.debug(msg)
  534. self._state = self._state_holder.state
  535. ctx = _RendezvousContext(self._node, self._state, self._settings)
  536. # Determine the next action to take based on the current state of
  537. # the rendezvous.
  538. action = state_handler(ctx, deadline)
  539. if action == _Action.FINISH:
  540. continue
  541. if action == _Action.ERROR_CLOSED:
  542. raise RendezvousClosedError
  543. if action == _Action.ERROR_TIMEOUT:
  544. raise RendezvousTimeoutError
  545. if action == _Action.SYNC:
  546. # Delay the execution by one second to avoid overloading the
  547. # backend if we are asked to poll for state changes.
  548. _delay(seconds=1)
  549. else:
  550. if action == _Action.KEEP_ALIVE:
  551. self._keep_alive()
  552. elif action == _Action.ADD_TO_PARTICIPANTS:
  553. self._add_to_participants()
  554. elif action == _Action.ADD_TO_WAIT_LIST:
  555. self._add_to_wait_list()
  556. elif action == _Action.ADD_TO_REDUNDANCY_LIST:
  557. self._add_to_redundancy_list()
  558. elif action == _Action.REMOVE_FROM_PARTICIPANTS:
  559. self._remove_from_participants()
  560. elif action == _Action.REMOVE_FROM_WAIT_LIST:
  561. self._remove_from_wait_list()
  562. elif action == _Action.REMOVE_FROM_REDUNDANCY_LIST:
  563. self._remove_from_redundancy_list()
  564. # update deadline since the node may participate in rendezvous process
  565. if update_deadline:
  566. deadline = update_deadline(self._settings.timeout.join)
  567. elif action == _Action.MARK_RENDEZVOUS_COMPLETE:
  568. self._mark_rendezvous_complete()
  569. elif action == _Action.MARK_RENDEZVOUS_CLOSED:
  570. self._mark_rendezvous_closed()
  571. # Attempt to sync our changes back to other nodes.
  572. self._state_holder.mark_dirty()
  573. def _keep_alive(self) -> None:
  574. msg = (
  575. f"The node '{self._node}' updated its keep-alive heartbeat time for the rendezvous "
  576. f"'{self._settings.run_id}'. Pending sync."
  577. )
  578. self._record(message=msg)
  579. logger.debug(msg)
  580. self._state.last_heartbeats[self._node] = datetime.utcnow()
  581. def _add_to_participants(self) -> None:
  582. msg = (
  583. f"The node '{self._node}' added itself to the participants of round "
  584. f"{self._state.round} of the rendezvous '{self._settings.run_id}'. Pending sync."
  585. )
  586. self._record(message=msg)
  587. logger.debug(msg)
  588. state = self._state
  589. try:
  590. state.wait_list.remove(self._node)
  591. except KeyError:
  592. pass
  593. # The ranks of the participants will be set once the rendezvous is
  594. # complete.
  595. state.participants[self._node] = 0
  596. self._keep_alive()
  597. if len(state.participants) == self._settings.min_nodes:
  598. state.deadline = datetime.utcnow() + self._settings.timeout.last_call
  599. if len(state.participants) == self._settings.max_nodes:
  600. self._mark_rendezvous_complete()
  601. def _add_to_wait_list(self) -> None:
  602. msg = (
  603. f"The node '{self._node}' added itself to the wait list of round "
  604. f"{self._state.round + 1} of the rendezvous '{self._settings.run_id}'. Pending sync."
  605. )
  606. self._record(message=msg)
  607. logger.debug(msg)
  608. if self._node in self._state.redundancy_list:
  609. self._state.redundancy_list.remove(self._node)
  610. self._state.wait_list.add(self._node)
  611. self._keep_alive()
  612. def _add_to_redundancy_list(self) -> None:
  613. msg = (
  614. f"The node '{self._node}' added itself to the redundancy list of round "
  615. f"{self._state.round + 1} of the rendezvous '{self._settings.run_id}'. Pending sync."
  616. )
  617. self._record(message=msg)
  618. logger.debug(msg)
  619. self._state.redundancy_list.add(self._node)
  620. self._keep_alive()
  621. def _remove_from_participants(self) -> None:
  622. msg = (
  623. f"The node '{self._node}' removed itself from the participants of round "
  624. f"{self._state.round} of the rendezvous '{self._settings.run_id}'. Pending sync."
  625. )
  626. self._record(message=msg)
  627. logger.debug(msg)
  628. state = self._state
  629. del state.participants[self._node]
  630. del state.last_heartbeats[self._node]
  631. # Common epilogue shared with the sanitizer() function of
  632. # _BackendRendezvousStateHolder.
  633. _remove_participant_epilogue(state, self._settings)
  634. def _remove_from_wait_list(self) -> None:
  635. msg = (
  636. f"The node '{self._node}' removed itself from the wait list of round "
  637. f"{self._state.round + 1} of the rendezvous '{self._settings.run_id}'. Pending sync."
  638. )
  639. self._record(message=msg)
  640. logger.debug(msg)
  641. self._state.wait_list.remove(self._node)
  642. del self._state.last_heartbeats[self._node]
  643. def _remove_from_redundancy_list(self) -> None:
  644. msg = (
  645. f"The node '{self._node}' removed itself from the redunant list of round "
  646. f"{self._state.round + 1} of the rendezvous '{self._settings.run_id}'. Pending sync."
  647. )
  648. self._record(message=msg)
  649. logger.debug(msg)
  650. self._state.redundancy_list.remove(self._node)
  651. del self._state.last_heartbeats[self._node]
  652. def _mark_rendezvous_complete(self) -> None:
  653. msg = (
  654. f"The node '{self._node}' marked round {self._state.round} of the rendezvous "
  655. f"'{self._settings.run_id}' as complete. Pending sync."
  656. )
  657. self._record(message=msg, node_state=NodeState.SUCCEEDED)
  658. logger.debug(msg)
  659. state = self._state
  660. state.complete = True
  661. state.deadline = None
  662. # Assign the ranks.
  663. for rank, node in enumerate(sorted(state.participants)):
  664. state.participants[node] = rank
  665. def _mark_rendezvous_closed(self) -> None:
  666. msg = (
  667. f"The node '{self._node}' marked the rendezvous '{self._settings.run_id}' as closed. "
  668. "Pending sync."
  669. )
  670. self._record(message=msg, node_state=NodeState.SUCCEEDED)
  671. logger.debug(msg)
  672. self._state.closed = True
  673. def _should_keep_alive(ctx: _RendezvousContext) -> bool:
  674. """Determine whether a keep-alive heartbeat should be sent."""
  675. try:
  676. last_heartbeat = ctx.state.last_heartbeats[ctx.node]
  677. except KeyError:
  678. return False
  679. return last_heartbeat <= datetime.utcnow() - ctx.settings.keep_alive_interval
  680. class _RendezvousExitOp:
  681. """Represent a rendezvous exit operation."""
  682. def __call__(self, ctx: _RendezvousContext, deadline: float) -> _Action:
  683. if ctx.node in ctx.state.participants:
  684. if time.monotonic() > deadline:
  685. return _Action.ERROR_TIMEOUT
  686. return _Action.REMOVE_FROM_PARTICIPANTS
  687. return _Action.FINISH
  688. class _RendezvousJoinOp:
  689. """Represent a rendezvous join operation."""
  690. def __call__(self, ctx: _RendezvousContext, deadline: float) -> _Action:
  691. state = ctx.state
  692. # A closed rendezvous means that it no longer accepts new nodes.
  693. if state.closed:
  694. if ctx.node in state.redundancy_list:
  695. msg = f"The rendezvous '{ctx.settings.run_id}' is closed, terminating pending rendezvous."
  696. raise RendezvousGracefulExitError(msg)
  697. return _Action.ERROR_CLOSED
  698. if ctx.node in state.redundancy_list:
  699. msg = f"The node {ctx.node} is in redunancy list"
  700. logger.debug(msg)
  701. # don't apply the timeout logic here, since we want to allow the node to rejoin
  702. if len(state.participants) == ctx.settings.max_nodes:
  703. if _should_keep_alive(ctx):
  704. return _Action.KEEP_ALIVE
  705. else:
  706. return _Action.SYNC
  707. else:
  708. # transition to waiting state that will respect timeouts.
  709. msg = f"The node {ctx.node} is removed from redunancy list"
  710. logger.debug(msg)
  711. return _Action.REMOVE_FROM_REDUNDANCY_LIST
  712. is_participant = ctx.node in state.participants
  713. # If we are part of the rendezvous and it is already complete there is
  714. # no further action to take.
  715. if state.complete and is_participant:
  716. return _Action.FINISH
  717. now = time.monotonic()
  718. if now > deadline:
  719. rollback_period = 5 # 5 seconds
  720. # If we still have time to rollback (a short period on top of the
  721. # operation deadline), try to remove ourself from the rendezvous.
  722. # It is okay if we can't though as our keep-alive will eventually
  723. # expire.
  724. if now <= deadline + rollback_period:
  725. # If we are part of the rendezvous, it means we couldn't find
  726. # enough participants to complete it on time.
  727. if is_participant:
  728. return _Action.REMOVE_FROM_PARTICIPANTS
  729. # If we are in the wait list, it means we couldn't wait till the
  730. # next round of the rendezvous.
  731. if ctx.node in state.wait_list:
  732. return _Action.REMOVE_FROM_WAIT_LIST
  733. return _Action.ERROR_TIMEOUT
  734. if state.complete:
  735. # If we are here, it means we are not part of the rendezvous. In
  736. # case the rendezvous has capacity for additional participants add
  737. # ourself to the wait list for the next round.
  738. if len(state.participants) < ctx.settings.max_nodes:
  739. if ctx.node not in state.wait_list:
  740. return _Action.ADD_TO_WAIT_LIST
  741. elif len(state.participants) >= ctx.settings.max_nodes:
  742. if ctx.node not in state.redundancy_list and ctx.node not in state.wait_list:
  743. return _Action.ADD_TO_REDUNDANCY_LIST
  744. elif is_participant:
  745. # If the rendezvous has enough number of participants including us,
  746. # check whether we have passed the rendezvous deadline. If yes,
  747. # complete it.
  748. if len(state.participants) >= ctx.settings.min_nodes and \
  749. len(state.participants) <= ctx.settings.max_nodes:
  750. if cast(datetime, state.deadline) < datetime.utcnow():
  751. msg = (
  752. f"The node '{ctx.node}' marking the rendezvous complete, "
  753. f"quorum established within deadline"
  754. )
  755. logger.debug(msg)
  756. return _Action.MARK_RENDEZVOUS_COMPLETE
  757. else:
  758. msg = f"The node '{ctx.node}' can't complete rendezvous: deadline reached"
  759. logger.debug(msg)
  760. else:
  761. msg = f"The node '{ctx.node}' can't complete rendezvous: not enough participants"
  762. logger.debug(msg)
  763. else:
  764. # The rendezvous is not complete yet and we are not part of it. Try
  765. # to join.
  766. return _Action.ADD_TO_PARTICIPANTS
  767. if _should_keep_alive(ctx):
  768. return _Action.KEEP_ALIVE
  769. # At this point either the rendezvous is not complete, but we are part
  770. # of it, which means we have to wait for other participants to join; or
  771. # the rendezvous is complete, but we are not part of it, which means we
  772. # have to wait for the next round.
  773. return _Action.SYNC
  774. class _RendezvousCloseOp:
  775. """Represent a rendezvous close operation."""
  776. def __call__(self, ctx: _RendezvousContext, deadline: float) -> _Action:
  777. if ctx.state.closed:
  778. return _Action.FINISH
  779. if time.monotonic() > deadline:
  780. return _Action.ERROR_TIMEOUT
  781. return _Action.MARK_RENDEZVOUS_CLOSED
  782. class _RendezvousKeepAliveOp:
  783. """Represent a rendezvous keep-alive update operation."""
  784. def __call__(self, ctx: _RendezvousContext, deadline: float) -> _Action:
  785. if _should_keep_alive(ctx):
  786. if time.monotonic() > deadline:
  787. return _Action.ERROR_TIMEOUT
  788. return _Action.KEEP_ALIVE
  789. return _Action.FINISH
  790. class DynamicRendezvousHandler(RendezvousHandler):
  791. """Represent a handler that sets up a rendezvous among a set of nodes."""
  792. # Static
  793. _node_desc_generator = _NodeDescGenerator()
  794. _this_node: _NodeDesc
  795. _settings: RendezvousSettings
  796. _backend_name: str
  797. _store: Store
  798. _state_holder: _RendezvousStateHolder
  799. _op_executor: _RendezvousOpExecutor
  800. _heartbeat_lock: threading.Lock
  801. _keep_alive_timer: Optional[_PeriodicTimer]
  802. @classmethod
  803. def from_backend(
  804. cls,
  805. run_id: str,
  806. store: Store,
  807. backend: RendezvousBackend,
  808. min_nodes: int,
  809. max_nodes: int,
  810. local_addr: Optional[str] = None,
  811. timeout: Optional[RendezvousTimeout] = None,
  812. ):
  813. """Create a new :py:class:`DynamicRendezvousHandler`.
  814. Args:
  815. run_id:
  816. The run id of the rendezvous.
  817. store:
  818. The C10d store to return as part of the rendezvous.
  819. backend:
  820. The backend to use to hold the rendezvous state.
  821. min_nodes:
  822. The minimum number of nodes to admit to the rendezvous.
  823. max_nodes:
  824. The maximum number of nodes to admit to the rendezvous.
  825. local_addr:
  826. The local node address.
  827. timeout:
  828. The timeout configuration of the rendezvous.
  829. """
  830. # We associate each handler instance with a unique node descriptor.
  831. node = cls._node_desc_generator.generate(local_addr)
  832. settings = RendezvousSettings(
  833. run_id,
  834. min_nodes,
  835. max_nodes,
  836. timeout or RendezvousTimeout(),
  837. keep_alive_interval=timedelta(seconds=5),
  838. keep_alive_max_attempt=3,
  839. )
  840. state_holder = _BackendRendezvousStateHolder(backend, settings)
  841. return cls(node, settings, backend.name, store, state_holder)
  842. def __init__(
  843. self,
  844. node: _NodeDesc,
  845. settings: RendezvousSettings,
  846. backend_name: str,
  847. store: Store,
  848. state_holder: _RendezvousStateHolder,
  849. ) -> None:
  850. if not settings.run_id:
  851. raise ValueError("The run id must be a non-empty string.")
  852. if settings.min_nodes < 1:
  853. raise ValueError(
  854. f"The minimum number of nodes ({settings.min_nodes}) must be greater than zero."
  855. )
  856. if settings.max_nodes < settings.min_nodes:
  857. raise ValueError(
  858. f"The maximum number of nodes ({settings.max_nodes}) must be greater than or equal "
  859. f"to the minimum number of nodes ({settings.min_nodes})."
  860. )
  861. self._this_node = node
  862. self._settings = settings
  863. self._backend_name = backend_name
  864. self._store = store
  865. self._state_holder = state_holder
  866. self._op_executor = _DistributedRendezvousOpExecutor(
  867. self._this_node, self._state_holder, self._settings
  868. )
  869. self._heartbeat_lock = threading.Lock()
  870. self._keep_alive_timer = None
  871. def _record(
  872. self,
  873. message: str,
  874. node_state: NodeState = NodeState.RUNNING,
  875. rank: Optional[int] = None,
  876. ) -> None:
  877. construct_and_record_rdzv_event(
  878. name=f"{self.__class__.__name__}.{get_method_name()}",
  879. run_id=self._settings.run_id,
  880. message=message,
  881. node_state=node_state,
  882. hostname=self._this_node.addr,
  883. pid=self._this_node.pid,
  884. local_id=self._this_node.local_id,
  885. rank=rank,
  886. )
  887. @property
  888. def settings(self) -> RendezvousSettings:
  889. """Get the settings of the rendezvous."""
  890. return self._settings
  891. def get_backend(self) -> str:
  892. """See base class."""
  893. return self._backend_name
  894. def next_rendezvous(self) -> RendezvousInfo:
  895. """See base class."""
  896. msg = (
  897. f"The node '{self._this_node}' attempts to join the next round of the rendezvous "
  898. f"'{self._settings.run_id}'."
  899. )
  900. self._record(message=msg)
  901. logger.info(msg)
  902. try:
  903. self._stop_heartbeats()
  904. # Delay the execution for a small random amount of time if this is our
  905. # first run. This will slightly skew the rendezvous attempts across the
  906. # nodes and reduce the load on the backend.
  907. if self._state_holder.state.round == 0:
  908. _delay(seconds=(0, 0.3))
  909. exit_op = _RendezvousExitOp()
  910. join_op = _RendezvousJoinOp()
  911. deadline = self._get_deadline(self._settings.timeout.join)
  912. self._op_executor.run(exit_op, deadline)
  913. self._op_executor.run(
  914. join_op,
  915. deadline,
  916. self._get_deadline)
  917. self._start_heartbeats()
  918. rank, world_size = self._get_world()
  919. store = self._get_store()
  920. except Exception as e:
  921. self._record(
  922. message=f"{type(e).__name__}: {str(e)}",
  923. node_state=NodeState.FAILED,
  924. )
  925. raise
  926. msg = (
  927. f"The node '{self._this_node}' has joined round {self._state_holder.state.round} of "
  928. f"the rendezvous '{self._settings.run_id}' as rank {rank} in a world of size "
  929. f"{world_size}."
  930. )
  931. self._record(message=msg, rank=rank)
  932. logger.info(msg)
  933. bootstrap_store_info = RendezvousStoreInfo.build(rank, store)
  934. return RendezvousInfo(
  935. store,
  936. rank,
  937. world_size,
  938. bootstrap_store_info,
  939. )
  940. def is_closed(self) -> bool:
  941. """See base class."""
  942. try:
  943. with self._heartbeat_lock:
  944. self._state_holder.sync()
  945. return self._state_holder.state.closed
  946. except Exception as e:
  947. self._record(
  948. message=f"{type(e).__name__}: {str(e)}",
  949. node_state=NodeState.FAILED,
  950. )
  951. raise
  952. def set_closed(self) -> None:
  953. """See base class."""
  954. try:
  955. with self._heartbeat_lock:
  956. self._close()
  957. except Exception as e:
  958. self._record(
  959. message=f"{type(e).__name__}: {str(e)}",
  960. node_state=NodeState.FAILED,
  961. )
  962. raise
  963. def num_nodes_waiting(self) -> int:
  964. """See base class."""
  965. try:
  966. with self._heartbeat_lock:
  967. self._state_holder.sync()
  968. return len(self._state_holder.state.wait_list)
  969. except Exception as e:
  970. self._record(
  971. message=f"{type(e).__name__}: {str(e)}",
  972. node_state=NodeState.FAILED,
  973. )
  974. raise
  975. def get_run_id(self) -> str:
  976. """See base class."""
  977. return self._settings.run_id
  978. def shutdown(self) -> bool:
  979. """See base class."""
  980. self._stop_heartbeats()
  981. try:
  982. self._close()
  983. return True
  984. except RendezvousError as ex:
  985. msg = (
  986. f"The node '{self._this_node}' has failed to shutdown the rendezvous "
  987. f"'{self._settings.run_id}' due to an error of type {type(ex).__name__}."
  988. )
  989. self._record(message=msg, node_state=NodeState.FAILED)
  990. logger.warning(msg)
  991. return False
  992. except Exception as e:
  993. self._record(
  994. message=f"{type(e).__name__}: {str(e)}",
  995. node_state=NodeState.FAILED,
  996. )
  997. raise
  998. def _close(self) -> None:
  999. op = _RendezvousCloseOp()
  1000. deadline = self._get_deadline(self._settings.timeout.close)
  1001. self._op_executor.run(op, deadline)
  1002. msg = f"The node '{self._this_node}' has closed the rendezvous '{self._settings.run_id}'."
  1003. self._record(message=msg, node_state=NodeState.SUCCEEDED)
  1004. logger.info(msg)
  1005. @staticmethod
  1006. def _keep_alive_weak(weak_self) -> None:
  1007. self = weak_self()
  1008. if self is not None:
  1009. self._keep_alive()
  1010. def _keep_alive(self) -> None:
  1011. self._heartbeat_lock.acquire()
  1012. op = _RendezvousKeepAliveOp()
  1013. deadline = self._get_deadline(self._settings.timeout.heartbeat)
  1014. try:
  1015. self._op_executor.run(op, deadline)
  1016. msg = (
  1017. f"The node '{self._this_node}' has sent a keep-alive heartbeat to the rendezvous "
  1018. f"'{self._settings.run_id}'."
  1019. )
  1020. self._record(message=msg)
  1021. logger.debug(msg)
  1022. except RendezvousError as ex:
  1023. msg = (
  1024. f"The node '{self._this_node}' has failed to send a keep-alive heartbeat to the "
  1025. f"rendezvous '{self._settings.run_id}' due to an error of type {type(ex).__name__}."
  1026. )
  1027. self._record(message=msg, node_state=NodeState.FAILED)
  1028. logger.warning(msg)
  1029. finally:
  1030. self._heartbeat_lock.release()
  1031. def _start_heartbeats(self) -> None:
  1032. self._keep_alive_timer = _PeriodicTimer(
  1033. self._settings.keep_alive_interval, self._keep_alive_weak, weakref.ref(self)
  1034. )
  1035. self._keep_alive_timer.set_name(f"RendezvousKeepAliveTimer_{self._this_node.local_id}")
  1036. self._keep_alive_timer.start()
  1037. def _stop_heartbeats(self) -> None:
  1038. if self._keep_alive_timer is None:
  1039. return
  1040. self._keep_alive_timer.cancel()
  1041. def _get_world(self) -> Tuple[int, int]:
  1042. state = self._state_holder.state
  1043. return state.participants[self._this_node], len(state.participants)
  1044. def _get_store(self) -> Store:
  1045. key_prefix = f"torch.rendezvous.{self._settings.run_id}.{self._state_holder.state.round}"
  1046. return PrefixStore(key_prefix, self._store)
  1047. def _get_deadline(self, timeout: timedelta) -> float:
  1048. return time.monotonic() + timeout.total_seconds()
  1049. def _get_timeout(params: RendezvousParameters, key: str) -> Optional[timedelta]:
  1050. timeout = params.get_as_int(key + "_timeout")
  1051. if timeout is None:
  1052. return None
  1053. return timedelta(seconds=timeout)
  1054. def create_handler(
  1055. store: Store, backend: RendezvousBackend, params: RendezvousParameters
  1056. ) -> DynamicRendezvousHandler:
  1057. """Create a new :py:class:`DynamicRendezvousHandler` from the specified parameters.
  1058. Args:
  1059. store:
  1060. The C10d store to return as part of the rendezvous.
  1061. backend:
  1062. The backend to use to hold the rendezvous state.
  1063. +-------------------+------------------------------------------------------+
  1064. | Parameter | Description |
  1065. +===================+======================================================+
  1066. | join_timeout | The total time, in seconds, within which the |
  1067. | | rendezvous is expected to complete. Defaults to 600 |
  1068. | | seconds. |
  1069. +-------------------+------------------------------------------------------+
  1070. | last_call_timeout | An additional wait amount, in seconds, before |
  1071. | | completing the rendezvous once the minimum number of |
  1072. | | nodes has been reached. Defaults to 30 seconds. |
  1073. +-------------------+------------------------------------------------------+
  1074. | close_timeout | The time, in seconds, within which the rendezvous is |
  1075. | | expected to close after a call to |
  1076. | | :py:meth:`RendezvousHandler.set_closed` or |
  1077. | | :py:meth:`RendezvousHandler.shutdown`. Defaults to |
  1078. | | 30 seconds. |
  1079. +-------------------+------------------------------------------------------+
  1080. """
  1081. try:
  1082. timeout = RendezvousTimeout(
  1083. _get_timeout(params, "join"),
  1084. _get_timeout(params, "last_call"),
  1085. _get_timeout(params, "close"),
  1086. )
  1087. return DynamicRendezvousHandler.from_backend(
  1088. params.run_id,
  1089. store,
  1090. backend,
  1091. params.min_nodes,
  1092. params.max_nodes,
  1093. params.local_addr,
  1094. timeout,
  1095. )
  1096. except Exception as e:
  1097. construct_and_record_rdzv_event(
  1098. message=f"{type(e).__name__}: {str(e)}",
  1099. run_id=params.run_id,
  1100. node_state=NodeState.FAILED,
  1101. )
  1102. raise