# mypy: ignore-errors

# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import abc
import json
import os
import signal
import socket
import time
import traceback
import warnings
from collections import defaultdict
from contextlib import contextmanager
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import torch.distributed.elastic.rendezvous as rdzv
import torch.distributed.elastic.utils.store as store_util
from torch.distributed.elastic.events import Event, EventSource, record
from torch.distributed.elastic.metrics import prof, put_metric
from torch.distributed.elastic.multiprocessing import (
    ProcessFailure,
    SignalException,
)
from torch.distributed.elastic.rendezvous import RendezvousGracefulExitError
from torch.distributed.elastic.utils.logging import get_logger

__all__ = [
    "WorkerSpec",
    "Worker",
    "WorkerState",
    "WorkerGroup",
    "RunResult",
    "ElasticAgent",
    "SimpleElasticAgent",
]

_TERMINAL_STATE_SYNC_ID = "torchelastic/agent/terminal_state"

DEFAULT_ROLE = "default"

logger = get_logger(__name__)


@dataclass
class WorkerSpec:
    """Blueprint information about a particular type of worker.

    For a given role, there must only exist a single worker spec.
    Worker spec is expected to be homogeneous across all nodes (machines),
    i.e. each node runs the same number of workers for a particular spec.

    Args:
        role: user-defined role for the workers with this spec
        local_world_size: number of local workers to run
        fn: (deprecated, use entrypoint instead)
        entrypoint: worker function or command
        args: arguments to pass to ``entrypoint``
        rdzv_handler: handles rdzv for this set of workers
        max_restarts: max number of retries for the workers
        monitor_interval: monitor status of workers every ``n`` seconds
        master_port: fixed port to run the c10d store on rank 0;
                     if not specified a random free port will be chosen
        master_addr: fixed master_addr to run the c10d store on rank 0;
                     if not specified the hostname of the agent on rank 0 will be used
        redirects: redirect std streams to a file,
                   selectively redirect for a particular
                   local rank by passing a map
        tee: tees the specified std stream(s) to console + file,
             selectively tee for a particular local rank by passing a map,
             takes precedence over ``redirects`` settings.
    """

    role: str
    local_world_size: int
    rdzv_handler: rdzv.RendezvousHandler
    fn: Optional[Callable] = None
    # TODO @kiuk - make entrypoint a required field
    entrypoint: Union[Callable, str, None] = None
    args: Tuple = ()
    max_restarts: int = 3
    monitor_interval: float = 0.1
    master_port: Optional[int] = None
    master_addr: Optional[str] = None
    local_addr: Optional[str] = None

    def __post_init__(self):
        assert self.local_world_size > 0
        assert self.monitor_interval > 0

        if self.fn:
            warnings.warn(
                "WorkerSpec.fn will be deprecated,"
                " please use WorkerSpec.entrypoint instead",
                category=DeprecationWarning,
            )
            self.entrypoint = self.fn
        assert self.entrypoint

    def get_entrypoint_name(self):
        """Get the entry point name.

        If the entrypoint is a function (e.g. ``Callable``) returns its ``__qualname__``;
        else if the entrypoint is a binary (e.g. ``str``), returns the binary name.
        """
        if isinstance(self.entrypoint, str):
            return os.path.basename(self.entrypoint)
        else:
            assert self.entrypoint is not None
            return self.entrypoint.__qualname__
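
# Illustrative sketch (not part of the original module): one plausible way to
# construct a ``WorkerSpec`` for a binary entrypoint. The rendezvous handler is
# obtained via the public rendezvous registry; the backend, endpoint, and module
# name below are assumptions made for the example only.
#
#   from torch.distributed.elastic.rendezvous import RendezvousParameters
#   from torch.distributed.elastic.rendezvous.registry import get_rendezvous_handler
#
#   rdzv_params = RendezvousParameters(
#       backend="c10d",
#       endpoint="localhost:29400",
#       run_id="example_run",
#       min_nodes=1,
#       max_nodes=1,
#   )
#   spec = WorkerSpec(
#       role="trainer",
#       local_world_size=2,                 # two workers on this node
#       entrypoint="/usr/bin/python",       # binary entrypoint
#       args=("-m", "my_training_module"),  # hypothetical module
#       rdzv_handler=get_rendezvous_handler(rdzv_params),
#       max_restarts=3,
#       monitor_interval=0.1,
#   )
#   assert spec.get_entrypoint_name() == "python"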


class Worker:
    """A worker instance.

    Contrast this with ``WorkerSpec`` that represents the specifications of a
    worker. A ``Worker`` is created from a ``WorkerSpec``. A ``Worker`` is to
    a ``WorkerSpec`` as an object is to a class.

    The ``id`` of the worker is interpreted
    by the specific implementation of ``ElasticAgent``. For a local
    agent, it could be the ``pid (int)`` of the worker, for a remote
    agent it could be encoded as ``host:port (string)``.

    Args:
        id (Any): uniquely identifies a worker (interpreted by the agent)
        local_rank (int): local rank of the worker
        global_rank (int): global rank of the worker
        role_rank (int): rank of the worker across all workers that have the same role
        world_size (int): number of workers (globally)
        role_world_size (int): number of workers that have the same role
    """

    __slots__ = [
        "id",
        "local_rank",
        "global_rank",
        "role_rank",
        "world_size",
        "role_world_size",
    ]

    def __init__(
        self,
        local_rank: int,
        global_rank: int = -1,
        role_rank: int = -1,
        world_size: int = -1,
        role_world_size: int = -1,
    ):
        # unique identifier for this worker
        self.id: Any = None

        # rank of the worker among workers with the same role being monitored
        # by the same ``agent`` instance.
        self.local_rank: int = local_rank

        # rank of the worker among all the workers across all roles
        # across all ``agent`` instances.
        # Global rank is not stable between re-rendezvous.
        self.global_rank: int = global_rank

        # rank of the worker among all the workers with the same role
        # across all ``agent`` instances.
        # Role rank is not stable between re-rendezvous.
        self.role_rank: int = role_rank

        # total number of workers (globally). Due to elasticity
        # the world size may change between re-rendezvous.
        self.world_size: int = world_size

        # total number of workers that share the same role. Due to elasticity
        # the role world size may change between re-rendezvous.
        self.role_world_size: int = role_world_size

    def __str__(self):
        return (
            f"local_rank={self.local_rank},global_rank={self.global_rank}"
            f",role_rank={self.role_rank},world_size={self.world_size}"
            f",role_world_size={self.role_world_size}"
        )

    def __repr__(self):
        return str(self)


class WorkerState(str, Enum):
    """A state of the ``WorkerGroup``.

    Workers in a worker group change state as a unit. If a single worker
    in a worker group fails the entire set is considered failed::

      UNKNOWN - agent lost track of worker group state, unrecoverable
      INIT - worker group object created not yet started
      HEALTHY - workers running and healthy
      UNHEALTHY - workers running and unhealthy
      STOPPED - workers stopped (interrupted) by the agent
      SUCCEEDED - workers finished running (exit 0)
      FAILED - workers failed to successfully finish (exit !0)

    A worker group starts from an initial ``INIT`` state,
    then progresses to ``HEALTHY`` or ``UNHEALTHY`` states,
    and finally reaches a terminal ``SUCCEEDED`` or ``FAILED`` state.

    Worker groups can be interrupted and temporarily put into ``STOPPED`` state
    by the agent. Workers in ``STOPPED`` state are scheduled to be restarted
    in the near future by the agent. Some examples of workers being put into
    ``STOPPED`` state are:

    1. Worker group failure|unhealthy observed
    2. Membership change detected

    When an action (start, stop, rdzv, retry, etc) on the worker group fails
    and results in the action being partially applied to the worker group,
    the state will be ``UNKNOWN``. Typically this happens on uncaught/unhandled
    exceptions during state change events on the agent. The agent is not
    expected to recover worker groups in ``UNKNOWN`` state and is better off
    self terminating and allowing the job manager to retry the node.
    """

    UNKNOWN = "UNKNOWN"
    INIT = "INIT"
    HEALTHY = "HEALTHY"
    UNHEALTHY = "UNHEALTHY"
    STOPPED = "STOPPED"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"

    @staticmethod
    def is_running(state: "WorkerState") -> bool:
        """Return whether the given worker state represents running workers.

        Returns:
            True if the worker state represents workers still running
            (e.g. that the process exists but is not necessarily healthy).
        """
        return state in {WorkerState.HEALTHY, WorkerState.UNHEALTHY}
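
# Illustrative sketch (not part of the original module): ``WorkerState`` is a
# str-enum, so members compare equal to their string values, and only
# HEALTHY/UNHEALTHY count as "still running".
#
#   assert WorkerState.is_running(WorkerState.HEALTHY)
#   assert WorkerState.is_running(WorkerState.UNHEALTHY)
#   assert not WorkerState.is_running(WorkerState.SUCCEEDED)
#   assert WorkerState.FAILED == "FAILED"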


class WorkerGroup:
    """A set of ``Worker`` instances.

    The class defines a set of ``Worker`` instances for the given ``WorkerSpec``
    managed by ``ElasticAgent``. Whether the worker group contains cross-instance
    workers or not depends on the implementation of the agent.
    """

    __slots__ = [
        "spec",
        "workers",
        "store",
        "group_rank",
        "group_world_size",
        "state",
        "master_addr",
        "master_port",
    ]

    def __init__(self, spec: WorkerSpec):
        self.spec = spec
        self.workers = [
            Worker(local_rank=i) for i in range(self.spec.local_world_size)
        ]

        # assigned after rdzv
        self.store = None
        self.group_rank = None
        self.group_world_size = None
        self.master_addr = None
        self.master_port = None

        self.state = WorkerState.INIT


class _RoleInstanceInfo:
    """The class is used by the agent to exchange information with other agents.

    The information is used to determine the rank of the workers that the agent
    manages in heterogeneous environments, where different agents can have
    different numbers of workers.
    """

    __slots__ = ["role", "rank", "local_world_size"]

    def __init__(self, role: str, rank: int, local_world_size: int):
        r"""Initialize the agent class instance.

        Args:
            role (str): user-defined role for the workers with this spec
            rank (int): the rank of the agent
            local_world_size (int): number of local workers to run
        """
        self.role = role
        self.rank = rank
        self.local_world_size = local_world_size

    def serialize(self) -> bytes:
        dict_data = {
            "role": self.role,
            "rank": self.rank,
            "local_world_size": self.local_world_size,
        }
        return json.dumps(dict_data).encode(encoding="UTF-8")

    @staticmethod
    def deserialize(data: bytes):
        dict_data = json.loads(data.decode(encoding="UTF-8"))
        return _RoleInstanceInfo(
            dict_data["role"], dict_data["rank"], dict_data["local_world_size"]
        )

    @staticmethod
    def compare(obj1, obj2) -> int:
        if obj1.role == obj2.role:
            return obj1.rank - obj2.rank
        elif obj1.role > obj2.role:
            return 1
        else:
            return -1

    @staticmethod
    def find_role_boundaries(roles_infos: List, role: str) -> Tuple[int, int]:
        start_idx, end_idx = -1, -1
        for idx, role_info in enumerate(roles_infos):
            if role_info.role == role:
                if start_idx == -1:
                    start_idx = idx
                end_idx = idx
        return (start_idx, end_idx)
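
# Illustrative sketch (not part of the original module): ``_RoleInstanceInfo``
# round-trips through the store as JSON bytes, and ``find_role_boundaries``
# returns the (start, end) indices of a role within a role-sorted list.
#
#   info = _RoleInstanceInfo(role="trainer", rank=1, local_world_size=4)
#   assert _RoleInstanceInfo.deserialize(info.serialize()).local_world_size == 4
#
#   infos = [
#       _RoleInstanceInfo("ps", 0, 1),
#       _RoleInstanceInfo("trainer", 1, 4),
#       _RoleInstanceInfo("trainer", 2, 4),
#   ]
#   assert _RoleInstanceInfo.find_role_boundaries(infos, "trainer") == (1, 2)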


@dataclass
class RunResult:
    """Return results of the worker executions.

    Run results follow an "all-or-nothing" policy where the run is successful if and
    only if ALL local workers managed by this agent complete successfully.

    If the result is successful (e.g. ``is_failed() = False``) then the ``return_values``
    field contains the outputs (return values) of the workers managed by THIS agent mapped
    by their GLOBAL ranks. That is, ``result.return_values[0]`` is the return value of
    global rank 0.

    .. note:: ``return_values`` are only meaningful when the worker entrypoint
              is a function. Workers specified as a binary entrypoint do not canonically
              have a return value and the ``return_values`` field is meaningless and
              may be empty.

    If ``is_failed()`` returns ``True`` then the ``failures`` field contains the
    failure information, again, mapped by the GLOBAL rank of the worker that failed.

    The keys in ``return_values`` and ``failures`` are mutually exclusive, that is,
    a worker's final state can only be one of: succeeded, failed. Workers intentionally
    terminated by the agent according to the agent's restart policy are not represented
    in either ``return_values`` or ``failures``.
    """

    state: WorkerState
    return_values: Dict[int, Any] = field(default_factory=dict)
    failures: Dict[int, ProcessFailure] = field(default_factory=dict)

    def is_failed(self) -> bool:
        return self.state == WorkerState.FAILED
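
# Illustrative sketch (not part of the original module): interpreting a
# ``RunResult`` returned by an agent. ``agent`` here stands for any concrete
# ``ElasticAgent``; keys are GLOBAL ranks, and a worker appears in either
# ``return_values`` or ``failures`` but never both.
#
#   result = agent.run()
#   if result.is_failed():
#       for global_rank, failure in result.failures.items():
#           print(f"rank {global_rank} failed with exit code {failure.exit_code}")
#   else:
#       outputs = result.return_values  # {global_rank: return value}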


def _get_fq_hostname() -> str:
    return socket.getfqdn(socket.gethostname())


class ElasticAgent(abc.ABC):
    """An agent process responsible for managing one or more worker processes.

    The worker processes are assumed to be regular distributed PyTorch scripts.
    When the worker process is created by the agent, the agent provides the
    necessary information for the worker processes to properly initialize
    a torch process group.

    The exact deployment topology and ratio of agent-to-worker is dependent
    on the specific implementation of the agent and the user's job placement
    preferences. For instance, to run a distributed training job on GPU with
    8 trainers (one per GPU) one can:

    1. Use 8 x single GPU instances, place an agent per instance, managing
       1 worker per agent.
    2. Use 4 x double GPU instances, place an agent per instance, managing
       2 workers per agent.
    3. Use 2 x quad GPU instances, place an agent per instance, managing
       4 workers per agent.
    4. Use 1 x 8 GPU instance, place an agent per instance, managing
       8 workers per agent.

    Usage
    ::

     group_result = agent.run()
     if group_result.is_failed():
         # workers failed
         failure = group_result.failures[0]
         logger.exception("worker 0 failed with exit code : %s", failure.exit_code)
     else:
         return group_result.return_values[0]  # return rank 0's results
    """

    @abc.abstractmethod
    def run(self, role: str = DEFAULT_ROLE) -> RunResult:
        """Run the agent.

        Supports retrying the worker group on failures up to ``max_restarts``.

        Returns:
            The result of the execution, containing the return values or
            failure details for each worker mapped by the worker's global rank.

        Raises:
            Exception - any other failures NOT related to worker process
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_worker_group(self, role: str = DEFAULT_ROLE) -> WorkerGroup:
        """Return the ``WorkerGroup`` for the given ``role``.

        Note that the worker group is a mutable object and hence in a
        multi-threaded/process environment it may change state.
        Implementors are encouraged (but not required) to return
        a defensive read-only copy.
        """
        raise NotImplementedError


class SimpleElasticAgent(ElasticAgent):
    """An ``ElasticAgent`` that manages one particular type of worker role.

    An ``ElasticAgent`` that manages workers (``WorkerGroup``) for a single ``WorkerSpec``
    such as one particular type of worker role.
    """

    def __init__(self, spec: WorkerSpec, exit_barrier_timeout: float = 300):
        self._worker_group = WorkerGroup(spec)
        self._remaining_restarts = self._worker_group.spec.max_restarts
        self._store = None
        self._exit_barrier_timeout = exit_barrier_timeout
        self._total_execution_time = 0

    def get_worker_group(self, role: str = DEFAULT_ROLE) -> WorkerGroup:
        return self._worker_group

    @abc.abstractmethod
    def _start_workers(self, worker_group: WorkerGroup) -> Dict[int, Any]:
        r"""Start ``worker_group.spec.local_world_size`` number of workers.

        This is according to the worker spec for the worker group.
        Returns a map of ``local_rank`` to worker ``id``.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _stop_workers(
        self, worker_group: WorkerGroup, is_restart: bool = False
    ) -> None:
        r"""Stop all workers in the given worker group.

        Implementors must deal with workers in all states defined by
        ``WorkerState``. That is, it must gracefully handle stopping
        non-existent workers, unhealthy (stuck) workers, etc.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _monitor_workers(self, worker_group: WorkerGroup) -> RunResult:
        r"""Check on the workers for the ``worker_group``.

        This function also returns the new state of the worker group.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _shutdown(
        self, death_sig: signal.Signals = signal.SIGTERM, is_restart: bool = False
    ) -> None:
        """Clean up any resources that were allocated during the agent's work.

        Args:
            death_sig: Signal to send to the child process; SIGTERM is default
        """
        raise NotImplementedError
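
    # Illustrative sketch (not part of the original module): the surface a
    # concrete ``SimpleElasticAgent`` subclass must implement. The bodies are
    # placeholders, not a real process-management strategy.
    #
    #   class MyAgent(SimpleElasticAgent):
    #       def _start_workers(self, worker_group: WorkerGroup) -> Dict[int, Any]:
    #           # launch local_world_size processes; return {local_rank: worker_id}
    #           ...
    #
    #       def _stop_workers(self, worker_group: WorkerGroup, is_restart: bool = False) -> None:
    #           # terminate whatever _start_workers launched; tolerate dead/stuck workers
    #           ...
    #
    #       def _monitor_workers(self, worker_group: WorkerGroup) -> RunResult:
    #           # poll the workers and translate their status into a RunResult
    #           ...
    #
    #       def _shutdown(self, death_sig: signal.Signals = signal.SIGTERM, is_restart: bool = False) -> None:
    #           # release any resources held by the agent itself
    #           ...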

    @prof
    def _rendezvous(self, worker_group: WorkerGroup) -> None:
        r"""Run rendezvous for the workers specified by the worker spec.

        Assigns workers a new global rank and world size.
        Updates the rendezvous store for the worker group.
        """
        spec = worker_group.spec

        with self.record_duration("RENDEZVOUS"):
            rdzv_info = spec.rdzv_handler.next_rendezvous()
        store = rdzv_info.store
        group_rank = rdzv_info.rank
        group_world_size = rdzv_info.world_size

        # master_addr/master_port could be explicitly overriden
        # TODO: BC - specific to static rdzv and can be simplifed further
        master_addr = spec.master_addr or rdzv_info.bootstrap_store_info.master_addr
        master_port = spec.master_port or rdzv_info.bootstrap_store_info.master_port

        self._store = store

        with self.record_duration("ASSIGN_WORKER_RANKS"):
            workers = self._assign_worker_ranks(
                store, group_rank, group_world_size, spec
            )
        worker_group.workers = workers
        worker_group.store = store
        worker_group.group_rank = group_rank
        worker_group.group_world_size = group_world_size
        worker_group.master_addr = master_addr
        worker_group.master_port = master_port

        restart_count = spec.max_restarts - self._remaining_restarts

        logger.info(
            "[%(role)s] Rendezvous complete for workers. Result:\n"
            "  restart_count=%(restart_count)s\n"
            "  master_addr=%(master_addr)s\n"
            "  master_port=%(master_port)s\n"
            "  group_rank=%(group_rank)s\n"
            "  group_world_size=%(group_world_size)s\n"
            "  local_ranks=%(local_ranks)s\n"
            "  role_ranks=%(role_ranks)s\n"
            "  global_ranks=%(global_ranks)s\n"
            "  role_world_sizes=%(role_world_sizes)s\n"
            "  global_world_sizes=%(global_world_sizes)s\n",
            {
                "role": spec.role,
                "restart_count": restart_count,
                "master_addr": master_addr,
                "master_port": master_port,
                "group_rank": group_rank,
                "group_world_size": group_world_size,
                "local_ranks": [worker.local_rank for worker in workers],
                "role_ranks": [worker.role_rank for worker in workers],
                "global_ranks": [worker.global_rank for worker in workers],
                "role_world_sizes": [worker.role_world_size for worker in workers],
                "global_world_sizes": [worker.world_size for worker in workers],
            },
        )

    # pyre-fixme[56]: Pyre was not able to infer the type of the decorator
    #  `torch.distributed.elastic.metrics.prof`.
    @prof
    def _assign_worker_ranks(
        self, store, group_rank: int, group_world_size: int, spec: WorkerSpec
    ) -> List[Worker]:
        """Determine proper ranks for worker processes.

        The rank assignment is done according to the following algorithm:

        1. Each agent writes its configuration (group_rank, group_world_size,
           num_workers) to the common store.
        2. The rank 0 agent reads all the role_info from the store and
           determines each agent's worker ranks.
        3. Determine the global rank: the global rank of a worker is computed
           as the cumulative sum of the local_world_size of all workers in front of it.
           For efficiency reasons each agent is assigned a base global rank
           such that its workers are in the range [base_global_rank,
           base_global_rank + local_world_size).
        4. Determine the role rank: the role rank is determined using the algorithm
           in point 3 with the exception that the ranks are calculated with
           respect to the role name.
        5. The rank 0 agent writes the assigned ranks to the store.
        6. Each agent reads the assigned ranks from the store.

        Time complexity: each worker O(1), rank0 O(n), overall O(n)
        """
        ROLE_INFO_PREFIX = "torchelastic/role_info/"
        ASSIGNED_RANKS_PREFIX = "torchelastic/assigned_ranks/"

        agent_role_info = _RoleInstanceInfo(
            spec.role, group_rank, spec.local_world_size
        )
        store.set(f"{ROLE_INFO_PREFIX}{group_rank}", agent_role_info.serialize())

        # tcp store is collocated with rank 0 so we can use it to do extra compute
        # to reduce overall # of operations.
        if group_rank == 0:
            role_infos_bytes = store.multi_get(
                [f"torchelastic/role_info/{i}" for i in range(group_world_size)]
            )
            role_infos = [
                _RoleInstanceInfo.deserialize(info_bytes)
                for info_bytes in role_infos_bytes
            ]

            role_sizes = defaultdict(lambda: 0)
            global_size = 0
            for role_info in role_infos:
                role_sizes[role_info.role] += role_info.local_world_size
                global_size += role_info.local_world_size

            base_global_rank = 0
            role_ranks = defaultdict(lambda: 0)

            keys = []
            values = []
            for i, role_info in enumerate(role_infos):
                keys.append(f"{ASSIGNED_RANKS_PREFIX}{i}")
                values.append(
                    json.dumps(
                        [
                            base_global_rank,
                            global_size,
                            role_ranks[role_info.role],
                            role_sizes[role_info.role],
                        ]
                    )
                )

                base_global_rank += role_info.local_world_size
                role_ranks[role_info.role] += role_info.local_world_size

            store.multi_set(keys, values)

        # get will block until the data is available in the store.
        (
            base_global_rank,
            global_world_size,
            base_role_rank,
            role_world_size,
        ) = json.loads(store.get(f"{ASSIGNED_RANKS_PREFIX}{group_rank}"))

        workers = []
        for local_rank in range(spec.local_world_size):
            worker = Worker(
                local_rank=local_rank,
                global_rank=base_global_rank + local_rank,
                role_rank=base_role_rank + local_rank,
                world_size=global_world_size,
                role_world_size=role_world_size,
            )
            workers.append(worker)
        return workers
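
    # Illustrative sketch (not part of the original module): the cumulative-sum
    # rank assignment described above for three agents, the middle one with a
    # different role. All numbers are hypothetical.
    #
    #   role_infos (ordered by group_rank 0..2):
    #       _RoleInstanceInfo("trainer", 0, 2)
    #       _RoleInstanceInfo("reader",  1, 1)
    #       _RoleInstanceInfo("trainer", 2, 2)
    #
    #   base global ranks (cumulative local_world_size): 0, 2, 3; global size = 5
    #   base role ranks: trainer -> 0 and 2, reader -> 0
    #   so the agent with group_rank=2 hosts workers with global ranks 3 and 4,
    #   role ranks 2 and 3, world_size=5, and role_world_size=4 (trainer).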

    # pyre-fixme[56]: Pyre was not able to infer the type of the decorator
    #  `torch.distributed.elastic.metrics.prof`.
    @prof
    def _initialize_workers(self, worker_group: WorkerGroup) -> None:
        r"""Start a fresh set of workers for the worker_group.

        Essentially, a rendezvous followed by a ``start_workers``.
        The caller should first call ``_stop_workers()`` to stop running workers
        prior to calling this method.

        Optimistically sets the state of the worker group that
        just started as ``HEALTHY`` and delegates the actual monitoring
        of state to the ``_monitor_workers()`` method.
        """
        role = worker_group.spec.role
        logger.info("[%s] Rendezvous'ing worker group", role)

        # TODO after stopping workers, wait at least monitor_interval*2 for
        # workers on different nodes to fail on a collective op before waiting
        # on the rdzv barrier, this way we ensure that nodes enter rdzv
        # at around the same time and reduce false positive rdzv timeout errors
        self._rendezvous(worker_group)

        logger.info("[%s] Starting worker group", role)
        worker_ids = self._start_workers(worker_group)
        for local_rank, w_id in worker_ids.items():
            worker = worker_group.workers[local_rank]
            worker.id = w_id

        worker_group.state = WorkerState.HEALTHY

    # pyre-fixme[56]: Pyre was not able to infer the type of the decorator
    #  `torch.distributed.elastic.metrics.prof`.
    @prof
    def _restart_workers(self, worker_group: WorkerGroup) -> None:
        """Restart (stops, rendezvous, starts) all local workers in the group."""
        role = worker_group.spec.role
        logger.info("[%s] Stopping worker group", role)
        self._stop_workers(worker_group, is_restart=True)
        worker_group.state = WorkerState.STOPPED
        self._initialize_workers(worker_group)

    # pyre-fixme[56]: Pyre was not able to infer the type of the decorator
    #  `torch.distributed.elastic.metrics.prof`.
    @prof
    def run(self, role: str = DEFAULT_ROLE) -> RunResult:
        start_time = time.monotonic()
        shutdown_called: bool = False
        try:
            result = self._invoke_run(role)
            self._total_execution_time = int(time.monotonic() - start_time)
            self._record_metrics(result)
            self._record_worker_events(result)
            return result
        except RendezvousGracefulExitError as e:
            logger.info("Rendezvous gracefully exited: %s", e)
        except SignalException as e:
            logger.warning("Received %s death signal, shutting down workers", e.sigval)
            self._shutdown(e.sigval)
            shutdown_called = True
            raise
        finally:
            if not shutdown_called:
                self._shutdown()
            # record the execution time in case there were any exceptions during run.
            self._total_execution_time = int(time.monotonic() - start_time)

    def get_event_failed(self) -> Event:
        return self._construct_event(
            state="FAILED",
            source=EventSource.AGENT,
            raw_error=traceback.format_exc(),
        )

    def get_event_succeeded(self) -> Event:
        return self._construct_event(
            state="SUCCEEDED",
            source=EventSource.AGENT,
        )

    def _record_worker_events(self, result: RunResult) -> None:
        for worker in self._worker_group.workers:
            failure = result.failures.get(worker.global_rank)
            state: str = self._get_worker_state(worker, result)
            raw_error = json.dumps(failure.error_file_data) if failure else None
            record(self._construct_event(state, EventSource.WORKER, worker, raw_error))

    def _get_worker_state(self, worker: Worker, result: RunResult) -> str:
        failure = result.failures.get(worker.global_rank)
        if result.state in {WorkerState.UNHEALTHY, WorkerState.FAILED} and not failure:
            # The worker got terminated by the torchelastic agent via SIGTERM signal
            return "TERMINATED"
        elif failure or worker.global_rank in result.return_values:
            return result.state.value
        else:
            raise ValueError(f"Unknown worker: {worker.global_rank}")

    @contextmanager
    def record_duration(self, state: str):
        start_time = time.perf_counter()
        try:
            yield
        finally:
            end_time = time.perf_counter()
            duration_ms = (end_time - start_time) * 1000
            record(
                self._construct_event(
                    state=state, source=EventSource.AGENT, duration_ms=duration_ms
                )
            )

    def _construct_event(
        self,
        state: str,
        source: EventSource,
        worker: Optional[Worker] = None,
        raw_error: Optional[str] = None,
        duration_ms: Optional[float] = None,
    ) -> Event:
        wg = self._worker_group
        spec = wg.spec

        md = {
            "group_world_size": wg.group_world_size,
            "entry_point": spec.get_entrypoint_name(),
        }
        if worker:
            md["local_rank"] = (worker.local_rank,)
            md["role_rank"] = (worker.role_rank,)
            md["role_world_size"] = (worker.role_world_size,)
            global_rank = worker.global_rank
            worker_id = str(worker.id)
        else:
            global_rank = None
            worker_id = None
        md_str = json.dumps(md)
        metadata = {
            "run_id": spec.rdzv_handler.get_run_id(),
            "global_rank": global_rank,
            "group_rank": wg.group_rank,
            "worker_id": worker_id,
            "role": spec.role,
            "hostname": _get_fq_hostname(),
            "state": state,
            "total_run_time": self._total_execution_time,
            "rdzv_backend": spec.rdzv_handler.get_backend(),
            "raw_error": raw_error,
            "metadata": md_str,
            "agent_restarts": spec.max_restarts - self._remaining_restarts,
            "duration_ms": duration_ms,
        }
        return Event(
            f"torchelastic.worker.status.{state}", source=source, metadata=metadata
        )

    def _record_metrics(self, group_results: RunResult):
        is_failed = group_results.is_failed()
        self._record_flakiness_metric(is_failed)
        spec = self._worker_group.spec
        restarts_happened = self._remaining_restarts != spec.max_restarts
        put_metric(f"workers.{spec.role}.run_total", 1)
        self._record_metric_with_condition(
            "run_success_with_retries", not is_failed and restarts_happened
        )
        self._record_metric_with_condition(
            "run_success_no_retries", not is_failed and not restarts_happened
        )
        self._record_metric_with_condition(
            "run_failed_with_retries", is_failed and restarts_happened
        )
        self._record_metric_with_condition(
            "run_failed_no_retries", is_failed and not restarts_happened
        )

    def _record_metric_with_condition(self, metric_name, condition):
        spec = self._worker_group.spec
        if condition:
            put_metric(f"workers.{spec.role}.{metric_name}", 1)
        else:
            put_metric(f"workers.{spec.role}.{metric_name}", 0)

    def _record_flakiness_metric(self, is_failed: bool = False):
        if is_failed:
            flakiness = 100.0
        else:
            spec = self._worker_group.spec
            flakiness = 100.0 - 100.0 * (self._remaining_restarts + 1) / (
                spec.max_restarts + 1
            )
        spec = self._worker_group.spec

        put_metric(f"workers.{spec.role}.flakiness", int(flakiness))
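
    # Illustrative sketch (not part of the original module): the flakiness metric
    # expresses how much of the restart budget was consumed. With max_restarts=3:
    #
    #   remaining_restarts=3 (no retries)  -> 100 - 100 * 4 / 4 = 0.0
    #   remaining_restarts=1 (two retries) -> 100 - 100 * 2 / 4 = 50.0
    #   any failed run                     -> 100.0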

    def _invoke_run(self, role: str = DEFAULT_ROLE) -> RunResult:
        # NOTE: currently only works for a single role

        spec = self._worker_group.spec
        role = spec.role

        logger.info(
            "[%s] starting workers for entrypoint: %s", role, spec.get_entrypoint_name()
        )

        self._initialize_workers(self._worker_group)
        monitor_interval = spec.monitor_interval
        rdzv_handler = spec.rdzv_handler

        while True:
            assert self._worker_group.state != WorkerState.INIT
            time.sleep(monitor_interval)
            run_result = self._monitor_workers(self._worker_group)
            state = run_result.state
            self._worker_group.state = state

            put_metric(f"workers.{role}.remaining_restarts", self._remaining_restarts)
            put_metric(f"workers.{role}.{state.name.lower()}", 1)

            if state == WorkerState.SUCCEEDED:
                logger.info(
                    "[%s] worker group successfully finished."
                    " Waiting %s seconds for other agents to finish.",
                    role,
                    self._exit_barrier_timeout,
                )
                self._exit_barrier()
                return run_result
            elif state in {WorkerState.UNHEALTHY, WorkerState.FAILED}:
                if self._remaining_restarts > 0:
                    logger.info(
                        "[%s] Worker group %s. "
                        "%s/%s attempts left;"
                        " will restart worker group",
                        role,
                        state.name,
                        self._remaining_restarts,
                        spec.max_restarts,
                    )
                    self._remaining_restarts -= 1
                    self._restart_workers(self._worker_group)
                else:
                    self._stop_workers(self._worker_group)
                    self._worker_group.state = WorkerState.FAILED
                    return run_result
            elif state == WorkerState.HEALTHY:
                # membership changes do not count as retries
                num_nodes_waiting = rdzv_handler.num_nodes_waiting()
                group_rank = self._worker_group.group_rank
                if num_nodes_waiting > 0:
                    logger.info(
                        "[%s] Detected %s "
                        "new nodes from group_rank=%s; "
                        "will restart worker group",
                        role,
                        num_nodes_waiting,
                        group_rank,
                    )
                    self._restart_workers(self._worker_group)
            else:
                raise Exception(f"[{role}] Worker group in {state.name} state")  # noqa: TRY002

    def _exit_barrier(self):
        """
        Define a barrier that keeps the agent process alive until all workers finish.

        Wait for ``exit_barrier_timeout`` seconds for all agents to finish
        executing their local workers (either successfully or not). This
        acts as a safety guard against user scripts that terminate at different
        times.
        """
        logger.info(
            "Local worker group finished (%s). "
            "Waiting %s seconds for other agents to finish",
            self._worker_group.state,
            self._exit_barrier_timeout,
        )
        start = time.time()
        try:
            store_util.barrier(
                store=self._store,
                world_size=self._worker_group.group_world_size,
                key_prefix=_TERMINAL_STATE_SYNC_ID,
                barrier_timeout=self._exit_barrier_timeout,
            )
            logger.info(
                "Done waiting for other agents. Elapsed: %s seconds",
                time.time() - start,
            )
        except SignalException as e:
            logger.warning("Got termination signal: %s", e.sigval)
            raise
        except Exception:
            logger.exception(
                "Error waiting on exit barrier. Elapsed: %s seconds",
                time.time() - start,
            )