Coverage for /home/jenkins/.local/lib/python3.10/site-packages/hyper_parallel/core/distributed_checkpoint/storage.py: 100%
42 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-05-11 07:26 +0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-05-11 07:26 +0800
1# Copyright 2026 Huawei Technologies Co., Ltd
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ============================================================================
15"""Storage interfaces for checkpoint save and load."""
16import abc
17from dataclasses import dataclass
18from pathlib import Path
19from typing import Optional, Union
21from hyper_parallel.core.distributed_checkpoint.metadata import Metadata, MetadataIndex
22from hyper_parallel.core.distributed_checkpoint.planner import LoadPlan, LoadPlanner, SavePlan, SavePlanner
# File name for the checkpoint metadata file.
# NOTE(review): presumably resolved relative to the checkpoint root by the
# concrete storage implementations; its consumers are not visible in this
# module -- confirm against the callers.
METADATA_FILE_NAME = ".metadata"
@dataclass
class StorageInfo:
    """
    Storage information for a single logical item.

    Torch-aligned: matches torch.distributed.checkpoint.filesystem._StorageInfo.

    Attributes:
        relative_path: Path relative to checkpoint root.
        offset: Byte offset within the file.
        length: Byte length of the data (best-effort, may be -1 for tensors).
    """
    relative_path: str
    offset: int
    # Not validated here: -1 is an accepted "unknown length" marker per the
    # class docstring, so consumers must not assume a non-negative value.
    length: int
@dataclass
class WriteResult:
    """
    Result of writing a single logical item.

    Torch-aligned: contains the metadata index and storage information.

    Attributes:
        index: Metadata index identifying the logical item that was written.
        storage_data: Storage information (relative path, offset, length)
            describing where the item was written.
    """
    index: MetadataIndex
    storage_data: StorageInfo
class StorageWriter(abc.ABC):
    """
    Abstract base class for storage writers.

    Defines the interface for writing checkpoint data to storage backends.
    Every method is abstract, so a concrete subclass must implement all of
    them before it can be instantiated.
    """

    @abc.abstractmethod
    def initialize_writer(self, checkpoint_id: Optional[Union[Path, str]] = None) -> None:
        """
        Initialize storage writer with optional new checkpoint directory.

        Args:
            checkpoint_id (Optional[Union[Path, str]]): The ID/path of the checkpoint directory.
                If None, uses the previously configured checkpoint directory. Default None.
        """

    @abc.abstractmethod
    def configure_writer(self, is_coordinator: bool, **kwargs) -> None:
        """
        Configure storage writer with coordinator and rank information.

        Args:
            is_coordinator (bool): Whether this rank is the coordinator rank.
            **kwargs: Additional keyword arguments (e.g., rank, use_collectives).
        """

    @abc.abstractmethod
    def optimize_local_plan(self, plan: SavePlan) -> SavePlan:
        """
        Optimize local plan for storage-specific optimizations.

        Args:
            plan (SavePlan): The local save plan to optimize.

        Returns:
            SavePlan: The optimized local save plan.
        """

    @abc.abstractmethod
    def optimize_global_plan(self, plans: list[SavePlan]) -> list[SavePlan]:
        """
        Optimize global plan from all local plans.

        Args:
            plans (list[SavePlan]): List of local save plans from all ranks.

        Returns:
            list[SavePlan]: List of optimized global save plans.
        """

    @abc.abstractmethod
    def execute_write(self, plan: SavePlan, planner: SavePlanner) -> list[WriteResult]:
        """
        Execute write operation to storage and return write results.

        Args:
            plan (SavePlan): The save plan to execute.
            planner (SavePlanner): The save planner instance for accessing tensor data.

        Returns:
            list[WriteResult]: List of write results containing storage information for each written item.
        """

    @abc.abstractmethod
    def finalize_checkpoint(self, metadata: Metadata, results: list[list[WriteResult]]) -> None:
        """
        Finalize checkpoint writing and complete metadata.

        Args:
            metadata (Metadata): The checkpoint metadata to finalize.
            results (list[list[WriteResult]]): List of write results from all ranks,
                where each inner list contains WriteResults from one rank.
        """
class StorageReader(abc.ABC):
    """
    Abstract base class for storage readers.

    Defines the interface for reading checkpoint data from storage backends.
    Every method is abstract, so a concrete subclass must implement all of
    them before it can be instantiated.
    """

    @abc.abstractmethod
    def initialize_reader(self, checkpoint_id: Optional[Union[Path, str]] = None) -> None:
        """
        Initialize storage reader with optional new checkpoint directory.

        Args:
            checkpoint_id (Optional[Union[Path, str]]): The ID/path of the checkpoint directory.
                If None, uses the previously configured checkpoint directory. Default None.
        """

    @abc.abstractmethod
    def load_metadata(self, **kwargs) -> Metadata:
        """
        Load checkpoint metadata from storage.

        Args:
            **kwargs: Additional keyword arguments (e.g., rank for rank-local metadata).

        Returns:
            Metadata: The loaded checkpoint metadata.
        """

    @abc.abstractmethod
    def configure_reader(self, metadata: Metadata, is_coordinator: bool, **kwargs) -> None:
        """
        Configure storage reader with metadata and coordinator information.

        Args:
            metadata (Metadata): The checkpoint metadata.
            is_coordinator (bool): Whether this rank is the coordinator rank.
            **kwargs: Additional keyword arguments (e.g., rank, use_collectives).
        """

    @abc.abstractmethod
    def optimize_local_plan(self, plan: LoadPlan) -> LoadPlan:
        """
        Optimize local plan for storage-specific optimizations.

        Args:
            plan (LoadPlan): The local load plan to optimize.

        Returns:
            LoadPlan: The optimized local load plan.
        """

    @abc.abstractmethod
    def optimize_global_plan(self, plans: list[LoadPlan]) -> list[LoadPlan]:
        """
        Optimize global plan from all local plans.

        Args:
            plans (list[LoadPlan]): List of local load plans from all ranks.

        Returns:
            list[LoadPlan]: List of optimized global load plans.
        """

    @abc.abstractmethod
    def execute_read(self, plan: LoadPlan, planner: LoadPlanner) -> None:
        """
        Execute read operation from storage according to the load plan.

        Args:
            plan (LoadPlan): The load plan to execute.
            planner (LoadPlanner): The load planner instance for applying loaded data.
        """