training_args_sm.py

# Copyright 2021 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib.util
import json
import os
import warnings
from dataclasses import dataclass, field

import torch

from ..training_args import TrainingArguments
from ..utils import cached_property, is_sagemaker_dp_enabled, logging


logger = logging.get_logger(__name__)


# TODO: should be moved to `utils` after refactoring of SageMakerTrainer
def is_sagemaker_model_parallel_available():
    # Get the SageMaker-specific model-parallel parameters from the SM_HP_MP_PARAMETERS environment variable.
    smp_options = os.getenv("SM_HP_MP_PARAMETERS", "{}")
    try:
        # Parse it and check that the field "partitions" is included; it is required for model parallelism.
        smp_options = json.loads(smp_options)
        if "partitions" not in smp_options:
            return False
    except json.JSONDecodeError:
        return False

    # Get the SageMaker-specific framework parameters from the SM_FRAMEWORK_PARAMS environment variable.
    mpi_options = os.getenv("SM_FRAMEWORK_PARAMS", "{}")
    try:
        # Parse it and check the field "sagemaker_mpi_enabled".
        mpi_options = json.loads(mpi_options)
        if not mpi_options.get("sagemaker_mpi_enabled", False):
            return False
    except json.JSONDecodeError:
        return False

    # Lastly, check that the `smdistributed` module is present.
    return importlib.util.find_spec("smdistributed") is not None

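
# Illustrative example (hypothetical values, not part of the original module): on a SageMaker
# model-parallel training job the launcher sets environment variables along the lines of
#     SM_HP_MP_PARAMETERS='{"partitions": 2, "microbatches": 4}'
#     SM_FRAMEWORK_PARAMS='{"sagemaker_mpi_enabled": true}'
# so, provided the `smdistributed` package is installed, the check above returns True. Outside
# of SageMaker both variables are typically unset and the function returns False.
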
if is_sagemaker_model_parallel_available():
    import smdistributed.modelparallel.torch as smp

    smp.init()


@dataclass
class SageMakerTrainingArguments(TrainingArguments):
    mp_parameters: str = field(
        default="",
        metadata={"help": "Used by the SageMaker launcher to send mp-specific args. Ignored in SageMakerTrainer"},
    )

    def __post_init__(self):
        super().__post_init__()
        warnings.warn(
            "`SageMakerTrainingArguments` is deprecated and will be removed in v5 of Transformers. You can use "
            "`TrainingArguments` instead.",
            FutureWarning,
        )

    @cached_property
    def _setup_devices(self) -> "torch.device":
        logger.info("PyTorch: setting up devices")
        if torch.distributed.is_available() and torch.distributed.is_initialized() and self.local_rank == -1:
            logger.warning(
                "torch.distributed process group is initialized, but local_rank == -1. "
                "In order to use Torch DDP, launch your script with `python -m torch.distributed.launch`."
            )
        if self.no_cuda:
            device = torch.device("cpu")
            self._n_gpu = 0
        elif is_sagemaker_model_parallel_available():
            local_rank = smp.local_rank()
            device = torch.device("cuda", local_rank)
            self._n_gpu = 1
        elif is_sagemaker_dp_enabled():
            import smdistributed.dataparallel.torch.torch_smddp  # noqa: F401

            torch.distributed.init_process_group(backend="smddp", timeout=self.ddp_timeout_delta)
            self.local_rank = int(os.getenv("SMDATAPARALLEL_LOCAL_RANK"))
            device = torch.device("cuda", self.local_rank)
            self._n_gpu = 1
        elif self.local_rank == -1:
            # If n_gpu is > 1, we'll use nn.DataParallel.
            # If you only want to use a specific subset of GPUs, use `CUDA_VISIBLE_DEVICES=0`.
            # Explicitly set CUDA to the first (index 0) CUDA device, otherwise `set_device` will
            # trigger an error that a device index is missing. Index 0 takes into account the
            # GPUs available in the environment, so `CUDA_VISIBLE_DEVICES=1,2` with `cuda:0`
            # will use the first GPU in that env, i.e. GPU#1.
            device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
            # Sometimes the line in the postinit has not been run before we end up here, so just checking we're not at
            # the default value.
            self._n_gpu = torch.cuda.device_count()
        else:
            # Here, we'll use torch.distributed.
            # Initializes the distributed backend, which will take care of synchronizing nodes/GPUs.
            if not torch.distributed.is_initialized():
                torch.distributed.init_process_group(backend="nccl", timeout=self.ddp_timeout_delta)
            device = torch.device("cuda", self.local_rank)
            self._n_gpu = 1

        if device.type == "cuda":
            torch.cuda.set_device(device)

        return device

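    # Device selection above, in order of precedence: `no_cuda` forces CPU; SageMaker model
    # parallelism pins each process to `smp.local_rank()`; SageMaker data parallelism initializes
    # the "smddp" process group and reads SMDATAPARALLEL_LOCAL_RANK; otherwise `local_rank == -1`
    # means single GPU / nn.DataParallel on cuda:0, and any other `local_rank` means plain
    # torch.distributed with the NCCL backend.
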
    @property
    def world_size(self):
        if is_sagemaker_model_parallel_available():
            return smp.dp_size()

        return super().world_size

    @property
    def place_model_on_device(self):
        return not is_sagemaker_model_parallel_available()

    @property
    def _no_sync_in_gradient_accumulation(self):
        return False
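
# Usage sketch (illustrative; `model` and `train_dataset` below are placeholders, not part of this
# module). On SageMaker the launcher builds these arguments from the job's hyperparameters;
# elsewhere the class behaves like a plain `TrainingArguments` and warns that it is deprecated:
#
#     from transformers import Trainer
#     from transformers.sagemaker import SageMakerTrainingArguments
#
#     args = SageMakerTrainingArguments(output_dir="/opt/ml/model", per_device_train_batch_size=8)
#     trainer = Trainer(model=model, args=args, train_dataset=train_dataset)
#     trainer.train()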