- # mypy: allow-untyped-defs
- import contextlib
- import functools
- import gc
- import warnings
- from dataclasses import asdict, dataclass, field
- from itertools import chain
- from typing import (
- Any,
- Callable,
- cast,
- Dict,
- Generator,
- Iterable,
- List,
- no_type_check,
- Optional,
- Set,
- Tuple,
- Union,
- )
- import torch
- import torch.distributed as dist
- import torch.nn as nn
- from torch.distributed._shard.sharded_tensor import ShardedTensor
- from torch.distributed._state_dict_utils import (
- _broadcast_state_dict,
- _flatten_state_dict,
- _gather_state_dict,
- _offload_state_dict_to_cpu,
- _unflatten_state_dict,
- )
- from torch.distributed._tensor import DTensor
- from torch.distributed.algorithms._checkpoint.checkpoint_wrapper import (
- _CHECKPOINT_PREFIX,
- )
- from torch.distributed.fsdp import (
- FullOptimStateDictConfig,
- FullStateDictConfig,
- FullyShardedDataParallel as FSDP,
- OptimStateDictConfig,
- ShardedOptimStateDictConfig,
- ShardedStateDictConfig,
- StateDictConfig,
- StateDictType,
- )
- from torch.distributed.fsdp._common_utils import (
- _get_module_fsdp_state_if_fully_sharded_module,
- FSDP_WRAPPED_MODULE,
- )
- from torch.nn.modules.module import _IncompatibleKeys
- from torch.nn.parallel import DistributedDataParallel as DDP
- from torch.utils._pytree import tree_map_only
- __all__ = [
- "FQNS_T",
- "PrimitiveType",
- "ValueType",
- "DictValueType",
- "ListDictValueType",
- "OptimizerStateType",
- "StateDictOptions",
- "get_model_state_dict",
- "get_optimizer_state_dict",
- "get_state_dict",
- "set_model_state_dict",
- "set_optimizer_state_dict",
- "set_state_dict",
- ]
- _FLAT_PARAM = "_flat_param"
- _PG = "param_groups"
- _PARAMS = "params"
- _STATE = "state"
- FQNS_T = Set[str]
- PrimitiveType = Union[DTensor, ShardedTensor, torch.Tensor, int, float, str]
- ValueType = Union[
- PrimitiveType, List[PrimitiveType], Tuple[PrimitiveType], Dict[str, "ValueType"]
- ]
- DictValueType = Dict[str, ValueType]
- ListDictValueType = List[DictValueType]
- OptimizerStateType = Dict[str, Union[DictValueType, ListDictValueType]]
- _patched_state_dict: Set[Callable] = set()
- @contextlib.contextmanager
- def _gc_context():
- is_enabled = gc.isenabled()
- gc.disable()
- try:
- yield
- finally:
- if is_enabled:
- gc.enable()
- @dataclass
- class StateDictOptions:
- """
- This dataclass specifies how get_state_dict/set_state_dict will work.
- - ``full_state_dict``: if this is set to True, all the tensors in the
- returned state_dict will be gathered. No ShardedTensor or DTensor
- will be in the returned state_dict.
- - ``cpu_offload``: offload all the tensors to cpu. To prevent CPU OOM, if
- ``full_state_dict`` is also true, then only rank0 will get the
- state_dict and all other ranks will get empty state_dicts.
- - ``ignore_frozen_params``: if the value is True, the returned state_dict
- won't contain any frozen parameters -- i.e., parameters whose ``requires_grad`` is False.
- The default value is False.
- - ``keep_submodule_prefixes`` (deprecated): when ``submodules`` is not None, this option
- indicates whether to keep the submodule prefixes in the state_dict keys.
- For example, if the submodule is ``module.pretrain`` and the full FQN of
- the parameter is ``pretrain.layer1.weight``, then when this option is
- True, the parameter's key in the returned state_dict will be
- ``pretrain.layer1.weight``; if the option is False, the key will be
- ``layer1.weight``.
- Note that if ``keep_submodule_prefixes`` is False, there may be conflicting
- FQNs, hence there should be only one submodule in ``submodules``.
- - ``strict``: the ``strict`` option when ``set_state_dict`` calls
- model.load_state_dict().
- - ``broadcast_from_rank0``: when the option is True, rank0 should receive a
- full state_dict and will broadcast the tensors in the
- state_dict/optim_state_dict one by one to the other ranks. The other ranks
- will receive the tensors and shard them according to the local shards in the
- model and optimizer. ``full_state_dict`` must be set to True when using this
- option. This option currently only supports DTensor, not the legacy
- ShardedTensor.
- - ``flatten_optimizer_state_dict``: flatten the optimizer state_dict into the
- ``"state.<fqn>.<state_name>"`` / ``"param_groups.<fqn>.<key>"`` layout
- described in ``_flatten_optim_state_dict``. The default value is False.
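- Example (a minimal sketch; ``fsdp_model`` and ``optim`` are assumed to be an
- already-wrapped FSDP model and its optimizer):
- >>> # xdoctest: +SKIP
- >>> from torch.distributed.checkpoint.state_dict import StateDictOptions
- >>> from torch.distributed.checkpoint.state_dict import get_state_dict
- >>> options = StateDictOptions(full_state_dict=True, cpu_offload=True)
- >>> model_sd, optim_sd = get_state_dict(fsdp_model, optim, options=options)
- >>> # with these options, only rank0 receives non-empty, CPU state_dicts.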
- """
- full_state_dict: bool = False
- cpu_offload: bool = False
- ignore_frozen_params: bool = False
- keep_submodule_prefixes: bool = True
- strict: bool = True
- broadcast_from_rank0: bool = False
- flatten_optimizer_state_dict: bool = False
- @dataclass
- class _StateDictInfo(StateDictOptions):
- fqn_param_mapping: Dict[
- Union[str, torch.Tensor], Union[FQNS_T, torch.Tensor]
- ] = field(default_factory=dict)
- shared_params_mapping: Dict[
- Union[str, torch.Tensor], Union[FQNS_T, torch.Tensor]
- ] = field(default_factory=dict)
- submodule_prefixes: Set[str] = field(default_factory=set)
- handle_model: bool = True
- handle_optim: bool = True
- fsdp_context: Callable = contextlib.nullcontext
- fsdp_modules: List[nn.Module] = field(default_factory=list)
- @functools.lru_cache(maxsize=None)
- def _get_fqns(
- model: nn.Module,
- name: str,
- skip_ddp_prefix: bool = True,
- skip_compiler_prefix: bool = True,
- ) -> FQNS_T:
- """
- This API is used to convert the name of a parameter to the FQNs. For FSDP
- without `use_orig_params`, the name of FlatParameter can be mapped to
- multiple original parameters. As a result, the return type of this function
- is `Set[str]`.
- Args:
- model (nn.Module): the root model.
- name (str): the name of the parameter or buffer.
- skip_ddp_prefix (bool): whether to skip DDP's `module` prefix.
- skip_compiler_prefix (bool): whether to skip the compiler's `_orig_mod` prefix.
- Returns:
- The canonical FQNs based on the model traversal.
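- Example (an illustrative sketch; ``ddp_model`` is assumed to be a DDP-wrapped
- module that owns a ``layer1.weight`` parameter):
- >>> # xdoctest: +SKIP
- >>> _get_fqns(ddp_model, "module.layer1.weight")
- {'layer1.weight'}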
- """
- # Remove the checkpoint prefix, if it exists.
- name = name.replace(_CHECKPOINT_PREFIX, "")
- if "." not in name:
- return {name}
- obj_names = name.split(".")
- fqn_obj_names = []
- curr_obj = model
- for i, curr_obj_name in enumerate(obj_names):
- if isinstance(curr_obj, DDP):
- assert curr_obj_name == "module"
- curr_obj = curr_obj.module
- if not skip_ddp_prefix:
- fqn_obj_names.append(curr_obj_name)
- elif isinstance(curr_obj, FSDP):
- if i < len(obj_names) - 1 and obj_names[i + 1] == _FLAT_PARAM:
- prefix = ".".join(fqn_obj_names)
- flat_param = getattr(curr_obj, _FLAT_PARAM)
- if prefix:
- prefix = f"{prefix}."
- return {f"{prefix}{fqn}" for fqn in flat_param._fqns}
- curr_obj = getattr(curr_obj, FSDP_WRAPPED_MODULE)
- if curr_obj_name != FSDP_WRAPPED_MODULE:
- fqn_obj_names.append(curr_obj_name)
- curr_obj = getattr(curr_obj, curr_obj_name)
- elif isinstance(curr_obj, torch._dynamo.eval_frame.OptimizedModule):
- assert curr_obj_name == "_orig_mod"
- curr_obj = curr_obj._orig_mod
- if not skip_compiler_prefix:
- fqn_obj_names.append(curr_obj_name)
- else:
- fqn_obj_names.append(curr_obj_name)
- if curr_obj_name == nn.modules.module._EXTRA_STATE_KEY_SUFFIX:
- if i != len(obj_names) - 1:
- raise RuntimeError("Expect `_extra_state` to be the last obj name")
- else:
- curr_obj = getattr(curr_obj, curr_obj_name)
- return {".".join(fqn_obj_names).replace(_CHECKPOINT_PREFIX, "")}
- class _EXTRA_STATE:
- pass
- def _iterate_valid_model_state(model):
- visited_modules: Set[nn.Module] = set()
- def recurse(module: nn.Module, curr_fqn: str) -> Generator:
- visited_modules.add(module)
- curr_fqn = f"{curr_fqn}." if curr_fqn else ""
- for name, submodule in module.named_children():
- if submodule in visited_modules:
- continue
- new_fqn = f"{curr_fqn}{name}"
- yield from recurse(submodule, new_fqn)
- for name, obj in chain(
- module.named_buffers(recurse=False), module.named_parameters(recurse=False)
- ):
- if name in module._non_persistent_buffers_set:
- continue
- new_fqn = f"{curr_fqn}{name}"
- yield new_fqn, obj
- if (
- getattr(module.__class__, "get_extra_state", nn.Module.get_extra_state)
- != nn.Module.get_extra_state
- ):
- new_fqn = f"{curr_fqn}{nn.modules.module._EXTRA_STATE_KEY_SUFFIX}"
- yield new_fqn, _EXTRA_STATE()
- yield from recurse(model, "")
- def _verify_options(
- model: nn.Module,
- optims: Tuple[torch.optim.Optimizer, ...],
- optim_only: bool,
- *,
- submodules: Optional[Set[nn.Module]] = None,
- options: Optional[StateDictOptions] = None,
- ) -> _StateDictInfo:
- """
- Verify the model and options passed by the user and generate the corresponding _StateDictInfo.
- """
- if submodules:
- warnings.warn(
- "Getting submodules only model/optim state_dict is deprecated and "
- "will be removed in 2.5. This feature can be achieved by manually "
- "filtering out the state_dict returned from get_state_dict.",
- FutureWarning,
- )
- if optim_only and not optims:
- raise RuntimeError(
- "Optimizers are not passed in but optim_only is set to True."
- )
- options = options or StateDictOptions()
- fqn_param_mapping: Dict[
- Union[str, torch.Tensor], Union[Set[str], torch.Tensor]
- ] = {}
- shared_params_mapping: Dict[
- Union[str, torch.Tensor], Union[Set[str], torch.Tensor]
- ] = {}
- for name, param in _iterate_valid_model_state(model):
- if isinstance(param, _EXTRA_STATE):
- continue
- fqns = _get_fqns(model, name)
- fqn = fqn_param_mapping.get(param, None)
- if fqn is not None:
- cast(Set[str], fqn_param_mapping[param]).update(fqns)
- shared_params_mapping[param] = fqn_param_mapping[param]
- else:
- # We need to copy the set since _get_fqns is lru_cached
- fqn_param_mapping[param] = fqns.copy()
- for fqn in fqns:
- if not isinstance(param, _EXTRA_STATE):
- fqn_param_mapping[fqn] = param
- for param_, fqns_ in list(shared_params_mapping.items()):
- for fqn in fqns_:
- shared_params_mapping[fqn] = cast(torch.Tensor, param_)
- submodule_prefixes: Set[str] = set()
- if submodules:
- submodules = set(submodules)
- for name, module in model.named_modules():
- if module not in submodules:
- continue
- fqns = _get_fqns(model, name)
- assert len(fqns) == 1, "Submodule FQN should only have 1 instance"
- submodule_prefixes.update(f"{fqn}." for fqn in fqns)
- if options.broadcast_from_rank0 and not options.full_state_dict:
- raise ValueError(
- "full_state_dict must be True when broadcast_from_rank0 is True."
- )
- fsdp_modules = FSDP.fsdp_modules(model)
- state_dict_config: StateDictConfig
- optim_state_dict_config: OptimStateDictConfig
- fsdp_context: Callable
- if fsdp_modules:
- # FSDP APIs only work if at least one FSDP instance exists.
- if options.full_state_dict:
- state_dict_config = FullStateDictConfig(
- offload_to_cpu=options.cpu_offload, rank0_only=options.cpu_offload
- )
- optim_state_dict_config = FullOptimStateDictConfig(
- offload_to_cpu=options.cpu_offload,
- rank0_only=(options.cpu_offload or options.broadcast_from_rank0),
- )
- state_dict_type = StateDictType.FULL_STATE_DICT
- else:
- state_dict_config = ShardedStateDictConfig(
- offload_to_cpu=options.cpu_offload,
- )
- optim_state_dict_config = ShardedOptimStateDictConfig(
- offload_to_cpu=options.cpu_offload,
- )
- state_dict_type = StateDictType.SHARDED_STATE_DICT
- @contextlib.contextmanager
- def fsdp_state_dict_type_without_warning(
- module,
- state_dict_type,
- state_dict_config,
- optim_state_dict_config,
- ):
- with warnings.catch_warnings():
- with FSDP.state_dict_type(
- module=module,
- state_dict_type=state_dict_type,
- state_dict_config=state_dict_config,
- optim_state_dict_config=optim_state_dict_config,
- ):
- yield
- fsdp_context = functools.partial(
- fsdp_state_dict_type_without_warning,
- module=model,
- state_dict_type=state_dict_type,
- state_dict_config=state_dict_config,
- optim_state_dict_config=optim_state_dict_config,
- )
- else:
- fsdp_context = contextlib.nullcontext
- return _StateDictInfo(
- **asdict(options),
- fqn_param_mapping=fqn_param_mapping,
- shared_params_mapping=shared_params_mapping,
- submodule_prefixes=submodule_prefixes,
- fsdp_context=fsdp_context,
- fsdp_modules=cast(List[nn.Module], fsdp_modules),
- handle_model=not optim_only,
- handle_optim=(len(optims) > 0),
- )
- def _verify_state_dict(
- model_state_dict: Dict[str, ValueType],
- optim_state_dict: OptimizerStateType,
- info: _StateDictInfo,
- ) -> None:
- for module in info.fsdp_modules:
- fsdp_state = _get_module_fsdp_state_if_fully_sharded_module(module)
- assert fsdp_state is not None, "Expected an fsdp_state for an FSDP module."
- # Verify if the model_state_dict and optim_state_dict are valid. This API
- # should give the users an explicit error message to debug or report.
- if (
- info.handle_model
- and not model_state_dict
- and not info.submodule_prefixes
- and not info.ignore_frozen_params
- and not (info.cpu_offload and info.full_state_dict)
- and info.strict
- and not info.broadcast_from_rank0
- ):
- raise RuntimeError(
- "The option indicates that model state_dict is required to save "
- "or load, but model state_dict is empty. "
- f"rank = {dist.get_rank()}."
- )
- if info.handle_optim:
- if (
- not optim_state_dict
- and not (info.cpu_offload and info.full_state_dict)
- and (not info.broadcast_from_rank0)
- ):
- raise RuntimeError(
- "The option indicates that optim state_dict is required to save "
- f"or load, but optim state_dict is empty. {optim_state_dict}"
- )
- for key in model_state_dict.keys():
- if _FLAT_PARAM in key:
- raise RuntimeError(
- f"{key} contains {_FLAT_PARAM}. This can happen if the model "
- "is not the root module."
- )
- def _state_dict_fn(obj: Union[nn.Module, torch.optim.Optimizer], api: str) -> Callable:
- call = getattr(obj, api)
- if call in _patched_state_dict:
- call = functools.partial(getattr(obj.__class__, api), self=obj)
- return call
- def _maybe_full_or_cpu_state_dict(
- state_dict: Dict[str, Any], info: _StateDictInfo
- ) -> Dict[str, Any]:
- if info.full_state_dict:
- ranks_only = (
- tuple()
- if (not info.cpu_offload or not torch.distributed.is_initialized())
- else (0,)
- )
- return _gather_state_dict(
- state_dict, cpu_offload=info.cpu_offload, ranks_only=ranks_only
- )
- elif info.cpu_offload:
- return _offload_state_dict_to_cpu(state_dict)
- else:
- return state_dict
- def _get_model_state_dict(
- model: nn.Module, info: _StateDictInfo
- ) -> Dict[str, ValueType]:
- if not info.handle_model:
- return {}
- with info.fsdp_context():
- state_dict = _state_dict_fn(model, "state_dict")()
- for key in list(state_dict.keys()):
- fqns = _get_fqns(model, key)
- assert len(fqns) == 1, (key, fqns)
- fqn = next(iter(fqns))
- if fqn != key:
- # As we only support FSDP, DDP, and TP, the only cases where the key
- # differs from the FQN are wrapper-based DDP and the compiler
- # (`_orig_mod`). Verify that this assumption holds.
- def verify(key, fqn) -> bool:
- if len(fqn) >= len(key):
- return False
- fqn_split = fqn.split(".")
- key_split = key.split(".")
- fqn_idx = 0
- for key_idx, key_name in enumerate(key_split):
- if key_name == fqn_split[fqn_idx]:
- fqn_idx += 1
- if fqn_idx == len(fqn_split):
- return key_idx == len(key_split) - 1
- elif key_name in ("module", "_orig_mod"):
- continue
- else:
- return False
- return True
- if not verify(key, fqn):
- raise RuntimeError(f"An unexpected key, {key}, exists. FQN is {fqn}")
- state_dict[fqn] = state_dict.pop(key)
- if info.submodule_prefixes:
- new_state_dict: Dict[str, ValueType] = {}
- # TODO: make this faster.
- for fqn in state_dict.keys():
- for prefix in info.submodule_prefixes:
- if not fqn.startswith(prefix):
- continue
- if info.keep_submodule_prefixes:
- new_state_dict[fqn] = state_dict[fqn]
- else:
- new_fqn = fqn[len(prefix) :]
- new_state_dict[new_fqn] = state_dict[fqn]
- state_dict = new_state_dict
- if info.ignore_frozen_params:
- for key, param in model.named_parameters():
- if param.requires_grad:
- continue
- fqns = _get_fqns(model, key)
- for fqn in fqns:
- state_dict.pop(fqn)
- for key, p in list(state_dict.items()):
- if torch.is_tensor(p) and p.is_meta:
- state_dict.pop(key)
- return _maybe_full_or_cpu_state_dict(state_dict, info)
- def _load_model_state_dict(
- model: nn.Module,
- state_dict: Dict[str, ValueType],
- info: _StateDictInfo,
- ) -> _IncompatibleKeys:
- if not info.handle_model or (not state_dict and not info.broadcast_from_rank0):
- return _IncompatibleKeys({}, {})
- local_state_dict = {}
- for key, value in _iterate_valid_model_state(model):
- fqns = _get_fqns(model, key)
- fqns_with_prefix = _get_fqns(
- model, key, skip_ddp_prefix=False, skip_compiler_prefix=False
- )
- for fqn, fqn_with_prefix in zip(fqns, fqns_with_prefix):
- if (
- not info.broadcast_from_rank0 or dist.get_rank() == 0
- ) and fqn != fqn_with_prefix:
- state_dict[fqn_with_prefix] = state_dict.pop(fqn)
- local_state_dict[fqn_with_prefix] = value
- if info.broadcast_from_rank0:
- device = None
- for key, value in local_state_dict.items():
- if torch.is_tensor(value) and value.dim() > 0:
- if device is None:
- device = value.device
- else:
- assert device == value.device
- assert device is not None
- _broadcast_state_dict(
- state_dict, local_state_dict, device=device, strict=info.strict
- )
- for fqn, local_state in local_state_dict.items():
- state_dict[fqn] = local_state
- with info.fsdp_context():
- return cast(
- _IncompatibleKeys,
- _state_dict_fn(model, "load_state_dict")(
- state_dict=state_dict, strict=info.strict
- ),
- )
- def _init_optim_state(optim: torch.optim.Optimizer) -> None:
- """
- Initialize the optimizer states by calling step() with zero gradients.
- """
- if optim.state:
- # The optimizer state is initialized.
- return
- for param_group in optim.param_groups:
- for param in param_group[_PARAMS]:
- if param.grad is not None:
- raise RuntimeError(
- "state_dict can only be used if the optimizer "
- "states are initialized (usually after one step() with "
- "gradients) or gradients are None. For the later case, "
- "state_dict will fake the gradients as zero "
- "to initialize the optimizer states. However, the "
- "gradients are not None."
- )
- if param.requires_grad:
- param.grad = torch.zeros_like(param)
- # Some optimizers will update parameters regardless of grads due to lr, so
- # set lr to zero when calling `step()`.
- lrs = []
- for param_group in optim.param_groups:
- if "lr" in param_group:
- lrs.append(param_group["lr"])
- param_group["lr"] = 0.0
- optim.step(closure=None)
- # Whether to recover the "lr" should not matter too much as we will
- # restore it from the checkpoint later.
- for param_group in optim.param_groups:
- if "lr" in param_group:
- param_group["lr"] = lrs.pop(0)
- optim.zero_grad(set_to_none=True)
- def _flatten_optim_state_dict(state_dict: OptimizerStateType) -> Dict[str, ValueType]:
- """
- This API flattens the optimizer state_dict to support optimizer resharding for
- MPMD, e.g., pipeline parallelism.
- Without the API, the original optimizer state_dict looks like:
- {
- "state": {
- "layer1.weight": {
- "step": 10, "exp_avg": SomeTensor, "exp_avg_sq": SomeTensor
- },
- "layer2.weight": {
- "step": 10, "exp_avg": SomeTensor, "exp_avg_sq": SomeTensor
- },
- },
- "param_group": [
- {
- "lr": 0.0,
- "betas": (0.9, 0.95), ...,
- "params": ["layer1.weight", "layer2.weight"]
- }
- ]
- }
- With this API, the optimizer state_dict looks like:
- {
- "state.layer1.weight.step": 10,
- "state.layer2.weight.step": 10,
- "state.layer1.weight.exp_avg": SomeTensor,
- "state.layer2.weight.exp_avg": SomeTensor,
- "state.layer1.weight.exp_avg_sq": SomeTensor,
- "state.layer2.weight.exp_avg_sq": SomeTensor,
- "param_group.layer1.weight.lr" : 0.1,
- "param_group.layer2.weight.lr" : 0.1,
- "param_group.layer1.weight.betas" : (0.9, 0.95),
- "param_group.layer2.weight.betas" : (0.9, 0.95),
- }
- Note that if any of the values is a container, like the betas in the example,
- this API won't flatten it.
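- The flattened layout is what ``get_optimizer_state_dict`` returns when
- ``flatten_optimizer_state_dict=True`` is set (a hedged sketch; ``model`` and
- ``optim`` are assumed to match the example above):
- >>> # xdoctest: +SKIP
- >>> options = StateDictOptions(flatten_optimizer_state_dict=True)
- >>> osd = get_optimizer_state_dict(model, optim, options=options)
- >>> "state.layer1.weight.exp_avg" in osd
- True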
- """
- def _raise_if_type_not_supported(v):
- if not isinstance(v, (torch.Tensor, int, float)):
- raise NotImplementedError(
- "Flattening optimizer state_dict only supports "
- "tensor, int, float states now. "
- f"Type is {type(v)}."
- )
- ret: Dict[str, ValueType] = {}
- for fqn, state in cast(DictValueType, state_dict[_STATE]).items():
- for k, v in cast(DictValueType, state).items():
- _raise_if_type_not_supported(v)
- ret[f"{_STATE}.{fqn}.{k}"] = v
- for param_group in cast(ListDictValueType, state_dict[_PG]):
- fqns = param_group.pop(_PARAMS)
- for fqn in cast(List[str], fqns):
- for k, v in param_group.items():
- ret[f"{_PG}.{fqn}.{k}"] = v
- return ret
- def _unflatten_optim_state_dict(
- optim: torch.optim.Optimizer,
- state_dict: Dict[str, ValueType],
- info: _StateDictInfo,
- ) -> OptimizerStateType:
- """
- This API unflattens the state_dict generated by _flatten_optim_state_dict().
- See the docstring of _flatten_optim_state_dict() for more detail.
- """
- state: DictValueType = {}
- pg_state: ListDictValueType = []
- return_osd: OptimizerStateType = {_STATE: state, _PG: pg_state}
- for param_group in optim.param_groups:
- pg_state.append({_PARAMS: []})
- for param in param_group[_PARAMS]:
- for fqn in info.fqn_param_mapping[param]:
- params = pg_state[-1][_PARAMS]
- assert isinstance(params, list) # typing
- params.append(fqn)
- if not param.requires_grad:
- continue
- state[fqn] = {}
- for state_name in optim.state[param].keys():
- cast(DictValueType, state[fqn])[state_name] = state_dict[
- f"{_STATE}.{fqn}.{state_name}"
- ]
- first_param_fqn = cast(List[str], pg_state[-1][_PARAMS])[0]
- for k in param_group.keys():
- if k == _PARAMS:
- continue
- value = state_dict[f"{_PG}.{first_param_fqn}.{k}"]
- if k not in pg_state[-1]:
- pg_state[-1][k] = value
- elif pg_state[-1][k] != value:
- raise RuntimeError(
- "All the parameters in the same parameter group should have "
- f"the same saved param_group value. But {first_param_fqn}.{k} "
- f"is {value} while other(s) is {pg_state[-1][k]}."
- )
- return return_osd
- def _get_optim_state_dict(
- model: nn.Module,
- optimizers: Tuple[torch.optim.Optimizer, ...],
- info: _StateDictInfo,
- ) -> OptimizerStateType:
- if not info.handle_optim:
- return {}
- optim_state_dict: OptimizerStateType = {_STATE: {}, _PG: []}
- for optim in optimizers:
- _init_optim_state(optim)
- osd = _state_dict_fn(optim, "state_dict")()
- if info.fsdp_modules:
- with info.fsdp_context():
- osd = FSDP.optim_state_dict(model, optim, osd)
- # We need to specially handle FlatParameter FSDP as
- # FlatParameter FSDP converts the FQNs.
- # There is no easy way to do this conversion systematically.
- # We can only use a string replacement without a correctness check.
- if not osd:
- continue
- for k in list(osd[_STATE].keys()):
- if "_orig_mod" in k:
- osd[_STATE][k.replace("_orig_mod.", "")] = osd[_STATE].pop(k)
- for g in osd[_PG]:
- params = [k.replace("_orig_mod.", "") for k in g[_PARAMS]]
- g[_PARAMS] = params
- else:
- params = list(chain.from_iterable(g[_PARAMS] for g in optim.param_groups))
- param_pid_mapping = dict(zip(params, range(len(params))))
- fqn_pid_mapping = {}
- for key, param in model.named_parameters():
- fqns = _get_fqns(model, key)
- assert len(fqns) == 1
- fqn = next(iter(fqns))
- if param not in param_pid_mapping:
- continue
- pid = param_pid_mapping[param]
- fqn_pid_mapping[fqn] = pid
- fqn_pid_mapping[pid] = fqn
- for key in list(osd[_STATE].keys()):
- fqn = fqn_pid_mapping[key]
- osd[_STATE][fqn] = osd[_STATE].pop(key)
- for group in osd[_PG]:
- group[_PARAMS] = [fqn_pid_mapping[pid] for pid in group[_PARAMS]]
- if not osd:
- continue
- cast(DictValueType, optim_state_dict[_STATE]).update(osd[_STATE])
- cast(ListDictValueType, optim_state_dict[_PG]).extend(osd[_PG])
- if info.flatten_optimizer_state_dict:
- optim_state_dict = cast(
- OptimizerStateType, _flatten_optim_state_dict(optim_state_dict)
- )
- return _maybe_full_or_cpu_state_dict(optim_state_dict, info)
- def _split_optim_state_dict(
- model: nn.Module,
- optim: torch.optim.Optimizer,
- optim_state_dict: OptimizerStateType,
- info: _StateDictInfo,
- ) -> OptimizerStateType:
- """
- Extract the corresponding optim state_dict from ``optim_state_dict`` for
- ``optim`` and return the resulting optim state_dict.
- Args:
- model (nn.Module): the root model.
- optim (torch.optim.Optimizer): the optimizer.
- optim_state_dict (OptimizerStateType): the superset optim state_dict that
- contains the optim state_dict of ``optim``.
- info (_StateDictInfo): state dict information.
- Returns:
- The optim state_dict of ``optim``.
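- Example (a hedged sketch of the public flow that relies on this helper;
- ``optim1`` and ``optim2`` are assumed to optimize disjoint parts of ``model``):
- >>> # xdoctest: +SKIP
- >>> combined_osd = get_optimizer_state_dict(model, (optim1, optim2))
- >>> # on load, each optimizer extracts only its own slice of the combined
- >>> # state_dict through this helper.
- >>> set_optimizer_state_dict(model, (optim1, optim2), combined_osd)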
- """
- state: DictValueType = {}
- pg_state: ListDictValueType = []
- return_osd: OptimizerStateType = {_STATE: state, _PG: pg_state}
- pg_mapping: Dict[int, int] = {}
- if all(
- isinstance(k, int) for k in cast(DictValueType, optim_state_dict[_STATE]).keys()
- ):
- return optim_state_dict
- for param_group in optim.param_groups:
- pg_state.append({_PARAMS: []})
- for param in param_group[_PARAMS]:
- for fqn in info.fqn_param_mapping[param]:
- if fqn in info.shared_params_mapping:
- in_params = False
- for loaded_param_group in cast(
- ListDictValueType, optim_state_dict[_PG]
- ):
- if fqn in cast(List[str], loaded_param_group[_PARAMS]):
- in_params = True
- break
- else:
- in_params = True
- if not in_params:
- continue
- params = pg_state[-1][_PARAMS]
- assert isinstance(params, list)
- params.append(fqn)
- if param.requires_grad:
- state[fqn] = cast(DictValueType, optim_state_dict[_STATE])[fqn]
- for loaded_param_group in cast(
- ListDictValueType, optim_state_dict[_PG]
- ):
- if fqn in cast(List[str], loaded_param_group[_PARAMS]):
- pg_mapping[id(loaded_param_group)] = len(return_osd[_PG]) - 1
- for param_group in cast(ListDictValueType, optim_state_dict[_PG]):
- idx = pg_mapping.get(id(param_group), -1)
- if idx == -1:
- continue
- for key, value in param_group.items():
- if key == _PARAMS:
- continue
- # TODO: check if value is the same if exists.
- pg_state[idx][key] = value
- return return_osd
- def _load_optim_state_dict(
- model: nn.Module,
- optimizers: Tuple[torch.optim.Optimizer, ...],
- state_dict: OptimizerStateType,
- info: _StateDictInfo,
- ) -> None:
- if not info.handle_optim:
- return
- for optim in optimizers:
- _init_optim_state(optim)
- if state_dict:
- if _STATE in state_dict:
- optim_state_dict = _split_optim_state_dict(
- model, optim, state_dict, info
- )
- else:
- optim_state_dict = _unflatten_optim_state_dict(
- optim, cast(Dict[str, ValueType], state_dict), info
- )
- else:
- optim_state_dict = {}
- if info.fsdp_modules:
- # We need to specially handle FlatParameter FSDP as
- # FlatParameter FSDP converts the FQNs.
- for original_fqn, _ in model.named_parameters():
- fqns = _get_fqns(model, original_fqn)
- fqns_with_compiler = _get_fqns(
- model, original_fqn, skip_compiler_prefix=False
- )
- if fqns == fqns_with_compiler:
- continue
- assert len(fqns) == 1
- fqn = fqns.pop()
- fqn_with_compiler = fqns_with_compiler.pop()
- for g in optim_state_dict[_PG]:
- val = cast(Dict[str, Any], g)
- params = [
- key.replace(fqn, fqn_with_compiler) for key in val[_PARAMS]
- ]
- val[_PARAMS] = params
- osd_state = cast(DictValueType, optim_state_dict[_STATE])
- for k in list(osd_state.keys()):
- if fqn in k:
- osd_state[k.replace(fqn, fqn_with_compiler)] = osd_state.pop(k)
- with info.fsdp_context():
- optim_state_dict = FSDP.optim_state_dict_to_load(
- model, optim, optim_state_dict
- )
- elif info.broadcast_from_rank0:
- info.full_state_dict = False
- local_state_dict = _get_optim_state_dict(model, (optim,), info)
- info.full_state_dict = True
- device = None
- def _device(t):
- if t.dim() > 0:
- nonlocal device
- if device is None:
- device = t.device
- elif device != t.device:
- raise ValueError("Device mismatch")
- return t
- _ = tree_map_only(torch.Tensor, _device, local_state_dict)
- assert device is not None
- flatten_osd, osd_mapping = _flatten_state_dict(optim_state_dict)
- flatten_local_osd, local_osd_mapping = _flatten_state_dict(local_state_dict)
- _broadcast_state_dict(flatten_osd, flatten_local_osd, device=device)
- # The following handles the case where `optim` and `optim_state_dict` have
- # different sets of parameters: entries that exist in the broadcast
- # state_dict but not in the local one are copied over, so `optim` may end
- # up with additional parameters after loading.
- for optim_key in flatten_osd.keys():
- if optim_key not in flatten_local_osd:
- assert optim_key in osd_mapping
- flatten_local_osd[optim_key] = flatten_osd[optim_key]
- local_osd_mapping[optim_key] = osd_mapping[optim_key]
- optim_state_dict = _unflatten_state_dict(
- flatten_local_osd, local_osd_mapping
- )
- # Note that we do not have to convert the FQN back to param id here if
- # order in optim.param_groups[idx][_PARAMS] is the same as the one in
- # optim_state_dict[_PG][idx][_PARAMS].
- _state_dict_fn(optim, "load_state_dict")(state_dict=optim_state_dict)
- def get_model_state_dict(
- model: nn.Module,
- *,
- submodules: Optional[Set[nn.Module]] = None,
- options: Optional[StateDictOptions] = None,
- ) -> Dict[str, ValueType]:
- """
- Return the model state_dict of ``model``.
- See ``get_state_dict`` for the detail usage.
- Args:
- model (nn.Module): the root nn.Module of the model.
- submodules (deprecated): Optional[Set[nn.Module]]: only return the model parameters
- that belong to the submodules.
- options (StateDictOptions): the options to control how
- model state_dict and optimizer state_dict should be returned. See
- `StateDictOptions` for the details.
- Returns:
- The state_dict for ``model``.
- :rtype: typing.Dict[str, ValueType]
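- Example (a minimal sketch; ``fsdp_model`` is assumed to be an FSDP-wrapped module):
- >>> # xdoctest: +SKIP
- >>> from torch.distributed.checkpoint.state_dict import get_model_state_dict
- >>> model_state_dict = get_model_state_dict(fsdp_model)
- >>> # keys are canonical FQNs (e.g. "layer1.weight"), independent of wrappers.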
- """
- with _gc_context():
- info = _verify_options(
- model,
- tuple(),
- optim_only=False,
- submodules=submodules,
- options=options,
- )
- model_state_dict = _get_model_state_dict(model, info)
- _verify_state_dict(model_state_dict, {}, info)
- return model_state_dict
- def get_optimizer_state_dict(
- model: nn.Module,
- optimizers: Union[torch.optim.Optimizer, Iterable[torch.optim.Optimizer]],
- *,
- submodules: Optional[Set[nn.Module]] = None,
- options: Optional[StateDictOptions] = None,
- ) -> OptimizerStateType:
- """
- Return the combined state_dict for optimizers.
- See ``get_state_dict`` for the detail usage.
- Args:
- model (nn.Module): the root nn.Module of the model.
- optimizers (Union[Optimizer, Iterable[Optimizer]]):
- The optimizers that are used to optimize ``model``.
- submodules (deprecated): Optional[Set[nn.Module]]: only return the model parameters
- that belong to the submodules.
- options (StateDictOptions): the options to control how
- model state_dict and optimizer state_dict should be returned. See
- `StateDictOptions` for the details.
- Returns:
- The state_dict for ``optimizers``.
- :rtype: OptimizerStateType
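- Example (a minimal sketch; ``fsdp_model`` and ``optim`` are assumed to exist):
- >>> # xdoctest: +SKIP
- >>> from torch.distributed.checkpoint.state_dict import get_optimizer_state_dict
- >>> optim_state_dict = get_optimizer_state_dict(fsdp_model, optim)
- >>> # optimizer states are keyed by canonical FQNs rather than parameter IDs.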
- """
- with _gc_context():
- optimizers = (
- (optimizers,)
- if isinstance(optimizers, torch.optim.Optimizer)
- else tuple(optimizers)
- )
- info = _verify_options(
- model,
- optimizers,
- optim_only=True,
- submodules=submodules,
- options=options,
- )
- optim_state_dict = _get_optim_state_dict(model, optimizers, info)
- _verify_state_dict({}, optim_state_dict, info)
- return optim_state_dict
- def get_state_dict(
- model: nn.Module,
- optimizers: Union[torch.optim.Optimizer, Iterable[torch.optim.Optimizer]],
- *,
- submodules: Optional[Set[nn.Module]] = None,
- options: Optional[StateDictOptions] = None,
- ) -> Tuple[Dict[str, ValueType], OptimizerStateType]:
- """
- Return the model state_dict and optimizers state_dict.
- ``get_state_dict`` can process any module that is parallelized by PyTorch
- FSDP/fully_shard, DDP/replicate, tensor_parallel/parallelize_module, and any
- combination of these parallelisms. The main functions of ``get_state_dict``
- are: 1.) returning a model and optimizer state_dict that can be resharded
- with a different number of trainers and/or different parallelisms.
- 2.) hiding the parallelism-specific state_dict APIs. Users don't have to call
- these APIs.
- 3.) sanity checking the result state_dict.
- The keys of the result state dictionary are the canonical FQNs (Fully
- Qualified Names). A canonical FQN refers to the FQN based on a parameter's
- position in an nn.Module hierarchy. More specifically, a canonical FQN to a
- parameter is the FQN returned by ``module.named_parameters()`` or
- ``module.named_buffers()`` when the module is not distributed by any
- parallelisms. Since the optimizer internally uses parameter IDs to represent
- a parameter, there will be a conversion from the parameter IDs to the
- canonical FQNs when calling this API.
- ``get_state_dict`` can also process a module that is not parallelized. In
- such a case, ``get_state_dict`` only performs one function -- converting the
- optimizer parameter IDs to the canonical FQNs.
- Example:
- >>> # xdoctest: +SKIP
- >>> import copy
- >>> import torch
- >>> from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
- >>> from torch.nn.parallel import DistributedDataParallel as DDP
- >>> from torch.distributed.checkpoint.state_dict import get_state_dict
- >>> fsdp_model = FSDP(copy.deepcopy(model))
- >>> fsdp_optim = torch.optim.Adam(fsdp_model.parameters(), lr=1e-3)
- >>> ddp_model = DDP(copy.deepcopy(model))
- >>> ddp_optim = torch.optim.Adam(ddp_model.parameters(), lr=1e-3)
- >>> ddp_state_dict, ddp_optim_state_dict = get_state_dict(ddp_model, ddp_optim)
- >>> fsdp_state_dict, fsdp_optim_state_dict = get_state_dict(fsdp_model, fsdp_optim)
- >>> # if we simply call ddp_model.state_dict() and fsdp_model.state_dict(),
- >>> # the asserts will fail.
- >>> assert ddp_state_dict == fsdp_state_dict
- >>> assert ddp_optim_state_dict == fsdp_optim_state_dict
- Args:
- model (nn.Module): the root nn.Module of the model.
- optimizers (Union[Optimizer, Iterable[Optimizer]]):
- The optimizers that are used to optimize ``model``.
- submodules (deprecated): Optional[Set[nn.Module]]: only return the model parameters
- that belong to the submodules.
- options (StateDictOptions): the options to control how
- model state_dict and optimizer state_dict should be returned. See
- `StateDictOptions` for the details.
- Returns:
- ``Tuple`` that contains the model state_dict and the optimizer state_dict.
- :rtype: typing.Tuple[typing.Dict[str, ValueType], OptimizerStateType]
- """
- with _gc_context():
- optimizers = (
- (optimizers,)
- if isinstance(optimizers, torch.optim.Optimizer)
- else tuple(optimizers)
- )
- info = _verify_options(
- model,
- optimizers,
- optim_only=False,
- submodules=submodules,
- options=options,
- )
- model_state_dict = _get_model_state_dict(model, info)
- optim_state_dict = _get_optim_state_dict(model, optimizers, info)
- _verify_state_dict(model_state_dict, optim_state_dict, info)
- return model_state_dict, optim_state_dict
- def _unflatten_model_state_dict(
- model: nn.Module,
- state_dict: Union[Dict[nn.Module, Dict[str, ValueType]], Dict[str, ValueType]],
- ) -> Dict[str, ValueType]:
- if not state_dict:
- return {}
- if isinstance(next(iter(state_dict.keys())), nn.Module):
- warnings.warn(
- "Passing model_state_dict as a ``Dict[nn.Module, Dict[str, Any]]``"
- "is deprecated and will be removed in 2.5. If you need this "
- "feature, please preprocessing the model_state_dict to achieve the "
- "same functionality.",
- FutureWarning,
- )
- cast_state_dict = cast(Dict[nn.Module, Dict[str, ValueType]], state_dict)
- new_state_dict: Dict[str, ValueType] = {}
- for submodule, sub_state_dict in cast_state_dict.items():
- for name, m in model.named_modules():
- if m != submodule:
- continue
- fqns = _get_fqns(model, name)
- assert len(fqns) == 1, "FQNs for a submodule should only have 1 element"
- prefix = f"{next(iter(fqns))}."
- new_state_dict.update(
- {prefix + subfqn: value for subfqn, value in sub_state_dict.items()}
- )
- return new_state_dict
- else:
- return cast(Dict[str, ValueType], state_dict)
- def set_model_state_dict(
- model: nn.Module,
- model_state_dict: Dict[str, ValueType],
- *,
- options: Optional[StateDictOptions] = None,
- ) -> _IncompatibleKeys:
- """Load the model state_dict.
- The counterpart of ``get_model_state_dict`` to set the state_dict to the
- model. See ``set_state_dict`` for the detail usage.
- Args:
- model (nn.Module): the root nn.Module of the model.
- model_state_dict: (Dict[str, ValueType]):
- the model state_dict to load. If the key of the ``model_state_dict``
- is an nn.Module, the key is a submodule of ``model`` and the value should
- be the state_dict of the submodule. When loading the state_dict,
- the prefix of the submodule will be appended to the state_dict keys.
- options (StateDictOptions): the options to control how
- model state_dict and optimizer state_dict should be loaded. See
- `StateDictOptions` for the details.
- Returns:
- ``NamedTuple`` with ``missing_keys`` and ``unexpected_keys`` fields:
- * **missing_keys** is a list of str containing the missing keys
- * **unexpected_keys** is a list of str containing the unexpected keys
- :type model_state_dict: typing.Dict[str, ValueType]
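- Example (a hedged round-trip sketch; ``model`` may be wrapped by FSDP/DDP):
- >>> # xdoctest: +SKIP
- >>> state_dict = get_model_state_dict(model)
- >>> # ... save and later reload ``state_dict`` ...
- >>> incompatible_keys = set_model_state_dict(model, state_dict)
- >>> assert not incompatible_keys.missing_keys
- >>> assert not incompatible_keys.unexpected_keys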
- """
- model_state_dict: Dict[str, ValueType] = _unflatten_model_state_dict(
- model, model_state_dict
- )
- with _gc_context():
- info = _verify_options(model, tuple(), optim_only=False, options=options)
- _verify_state_dict(model_state_dict, {}, info)
- return _load_model_state_dict(model, model_state_dict, info)
- def set_optimizer_state_dict(
- model: nn.Module,
- optimizers: Union[torch.optim.Optimizer, Iterable[torch.optim.Optimizer]],
- optim_state_dict: OptimizerStateType,
- *,
- options: Optional[StateDictOptions] = None,
- ) -> None:
- """Load the optimizers state_dict.
- The counterpart of ``get_optimizer_state_dict`` to set the state_dict to the
- optimizers. See ``set_state_dict`` for the detail usage.
- Args:
- model (nn.Module): the root nn.Module of the model.
- optimizers (Union[Optimizer, Iterable[Optimizer]]):
- The optimizers that are used to optimize ``model``.
- optim_state_dict: OptimizerStateType:
- the optimizer state_dict to load.
- options (StateDictOptions): the options to control how
- model state_dict and optimizer state_dict should be loaded. See
- `StateDictOptions` for the details.
- Returns:
- None
- :type optim_state_dict: typing.OptimizerStateType
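- Example (a hedged round-trip sketch; ``model`` and ``optim`` are assumed to exist):
- >>> # xdoctest: +SKIP
- >>> optim_state_dict = get_optimizer_state_dict(model, optim)
- >>> # ... save and later reload ``optim_state_dict`` ...
- >>> set_optimizer_state_dict(model, optim, optim_state_dict)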
- """
- with _gc_context():
- optimizers = (
- (optimizers,)
- if isinstance(optimizers, torch.optim.Optimizer)
- else tuple(optimizers)
- )
- info = _verify_options(model, optimizers, optim_only=True, options=options)
- _verify_state_dict({}, optim_state_dict, info)
- _load_optim_state_dict(model, optimizers, optim_state_dict, info)
- def set_state_dict(
- model: nn.Module,
- optimizers: Union[torch.optim.Optimizer, Iterable[torch.optim.Optimizer]],
- *,
- model_state_dict: Dict[str, ValueType],
- optim_state_dict: OptimizerStateType,
- options: Optional[StateDictOptions] = None,
- ) -> _IncompatibleKeys:
- """Load the model state_dict and optimizers state_dict.
- The counterpart of ``get_state_dict`` to set the state_dict to the model and
- optimizers. The given ``model_state_dict`` and ``optim_state_dict`` do not
- have to be returned by ``get_state_dict`` but must meet the following
- requirements: 1) all FQNs are canonical FQNs as defined in ``get_state_dict``,
- 2) if a tensor is sharded, it must be either a ShardedTensor or DTensor,
- 3) optimizer state_dict cannot contain the parameter IDs; the keys should be
- the canonical FQNs.
- Args:
- model (nn.Module): the root nn.Module of the model.
- optimizers (Union[Optimizer, Iterable[Optimizer]]):
- The optimizers that are used to optimize ``model``.
- model_state_dict: (Union[Dict[nn.Module, Dict[str, ValueType]], Dict[str, ValueType]]):
- the model state_dict to load. If the key of the ``model_state_dict``
- is an nn.Module, the key is a submodule of ``model`` and the value should
- be the state_dict of the submodule. When loading the state_dict,
- the prefix of the submodule will be appended to the state_dict keys.
- optim_state_dict: OptimizerStateType:
- the optimizer state_dict to load.
- options (StateDictOptions): the options to control how
- model state_dict and optimizer state_dict should be loaded. See
- `StateDictOptions` for the details.
- Returns:
- ``NamedTuple`` with ``missing_keys`` and ``unexpected_keys`` fields:
- * **missing_keys** is a list of str containing the missing keys of the model state_dict.
- * **unexpected_keys** is a list of str containing the unexpected keys of the model state_dict.
- :type model_state_dict: typing.Dict[str, ValueType]
- :type optim_state_dict: typing.OptimizerStateType
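- Example (a hedged round-trip sketch; ``model`` and ``optim`` are assumed to exist):
- >>> # xdoctest: +SKIP
- >>> model_sd, optim_sd = get_state_dict(model, optim)
- >>> # ... save and later reload both dicts ...
- >>> set_state_dict(model, optim, model_state_dict=model_sd, optim_state_dict=optim_sd)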
- """
- model_state_dict: Dict[str, ValueType] = _unflatten_model_state_dict(
- model, model_state_dict
- )
- with _gc_context():
- optimizers = (
- (optimizers,)
- if isinstance(optimizers, torch.optim.Optimizer)
- else tuple(optimizers)
- )
- info = _verify_options(
- model, optimizers, optim_only=not model_state_dict, options=options
- )
- _verify_state_dict(model_state_dict, optim_state_dict, info)
- _load_optim_state_dict(model, optimizers, optim_state_dict, info)
- return _load_model_state_dict(model, model_state_dict, info)
- # TODO: correct the state_dict function signature.
- # TODO: this API is not yet fully tested. Make it private
- @no_type_check
- def _patch_model_state_dict(
- model: nn.Module,
- *,
- options: Optional[StateDictOptions] = None,
- ) -> None:
- """Patch the ``state_dict`` and ``load_state_dict`` attributes of ``model``.
- Patch the ``state_dict`` and ``load_state_dict`` attributes of ``model`` to
- be partial functions that call ``get_model_state_dict`` and ``set_model_state_dict``.
- Example:
- from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
- from torch.distributed.checkpoint.state_dict import _patch_model_state_dict
- model = FSDP(model)
- _patch_model_state_dict(model)
- Args:
- model (nn.Module): the root nn.Module of the model.
- options (StateDictOptions): the options to control how
- model state_dict and optimizer state_dict should be loaded. See
- `StateDictOptions` for the details.
- Returns:
- None
- """
- _state_dict_call = functools.partial(
- get_model_state_dict,
- model=model,
- options=options,
- )
- def state_dict_call():
- return _state_dict_call()
- model.state_dict = state_dict_call
- _load_state_dict_call = functools.partial(
- set_model_state_dict,
- model=model,
- options=options,
- )
- def load_state_dict_call(state_dict: Dict[str, Any]):
- _load_state_dict_call(model_state_dict=state_dict)
- model.load_state_dict = load_state_dict_call
- _patched_state_dict.add(state_dict_call)
- _patched_state_dict.add(load_state_dict_call)
- # TODO: correct the load_state_dict function signature.
- # TODO: this API is not yet fully tested. Make it private
- @no_type_check
- def _patch_optimizer_state_dict(
- model: nn.Module,
- *,
- optimizers: Tuple[torch.optim.Optimizer, ...],
- options: Optional[StateDictOptions] = None,
- ) -> None:
- """Patch the ``state_dict`` and ``load_state_dict`` attributes of ``optimizers``.
- Patch the ``state_dict`` and ``load_state_dict`` attributes of ``optimizers`` to
- be partial functions that call ``get_optimizer_state_dict`` and ``set_optimizer_state_dict``.
- Note that if there are multiple optimizers, all of the optimizers will be patched,
- so users only need to call ``state_dict()`` on one of them to get the full result.
- Example:
- from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
- from torch.distributed.checkpoint.state_dict import _patch_optimizer_state_dict
- model = FSDP(model)
- optim = torch.optim.Adam(model.parameters(), lr=1e-3)
- _patch_optimizer_state_dict(model, optimizers=(optim,))
- Args:
- model (nn.Module): the root nn.Module of the model.
- optimizers (Tuple[Optimizer, ...]): the optimizers whose ``state_dict`` and
- ``load_state_dict`` attributes will be patched.
- options (StateDictOptions): the options to control how
- model state_dict and optimizer state_dict should be loaded. See
- `StateDictOptions` for the details.
- Returns:
- None
- """
- _state_dict_call = functools.partial(
- get_optimizer_state_dict,
- model=model,
- optimizers=optimizers,
- options=options,
- )
- def state_dict_call():
- return _state_dict_call()
- _load_state_dict_call = functools.partial(
- set_optimizer_state_dict,
- model=model,
- optimizers=optimizers,
- options=options,
- )
- def load_state_dict_call(state_dict: Dict[str, Any]):
- _load_state_dict_call(optim_state_dict=state_dict)
- _patched_state_dict.add(state_dict_call)
- _patched_state_dict.add(load_state_dict_call)
- optimizers = (
- (optimizers,)
- if isinstance(optimizers, torch.optim.Optimizer)
- else tuple(optimizers)
- )
- for optim in optimizers:
- optim.state_dict = state_dict_call
- optim.load_state_dict = load_state_dict_call
|