Coverage for /home/jenkins/.local/lib/python3.10/site-packages/hyper_parallel/core/distributed_checkpoint/storage.py: 100%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-05-11 07:26 +0800

1# Copyright 2026 Huawei Technologies Co., Ltd 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# ============================================================================ 

15"""Storage interfaces for checkpoint save and load.""" 

16import abc 

17from dataclasses import dataclass 

18from pathlib import Path 

19from typing import Optional, Union 

20 

21from hyper_parallel.core.distributed_checkpoint.metadata import Metadata, MetadataIndex 

22from hyper_parallel.core.distributed_checkpoint.planner import LoadPlan, LoadPlanner, SavePlan, SavePlanner 

23 

# Name of the checkpoint metadata file.
# NOTE(review): not referenced inside this module; presumably used by concrete
# StorageWriter/StorageReader backends to locate the metadata — confirm.
METADATA_FILE_NAME: str = ".metadata"

25 

26 

@dataclass()
class StorageInfo:
    """Location on storage of one logical checkpoint item.

    Torch-aligned: mirrors the field layout of
    torch.distributed.checkpoint.filesystem._StorageInfo.

    Attributes:
        relative_path: Path of the containing file, relative to the
            checkpoint root.
        offset: Byte position inside that file where the item starts.
        length: Number of bytes the item occupies. This is best-effort only;
            it may be -1 for tensors.
    """

    # File path, relative to the checkpoint root.
    relative_path: str
    # Starting byte offset within the file.
    offset: int
    # Byte count of the item (best-effort; may be -1 for tensors).
    length: int

42 

43 

@dataclass()
class WriteResult:
    """Outcome of persisting a single logical checkpoint item.

    Torch-aligned: pairs the item's metadata index with the storage
    information describing where it was written.

    Attributes:
        index: Metadata index identifying the item that was written.
        storage_data: Storage information for the written item.
    """

    # Identifies which logical item this result describes.
    index: MetadataIndex
    # Where the item's bytes were placed on storage.
    storage_data: StorageInfo

53 

54 

class StorageWriter(abc.ABC):
    """Interface that checkpoint storage backends implement for saving.

    The abstract hooks cover writer initialization, configuration, local and
    global plan optimization, the data write itself, and checkpoint
    finalization. Every hook is abstract, so a subclass must override all of
    them before it can be instantiated.
    """

    @abc.abstractmethod
    def initialize_writer(self, checkpoint_id: Optional[Union[Path, str]] = None) -> None:
        """Prepare the writer, optionally targeting a new checkpoint directory.

        Args:
            checkpoint_id (Optional[Union[Path, str]]): Identifier or path of
                the checkpoint directory. When None (the default), the
                directory configured earlier is reused.
        """

    @abc.abstractmethod
    def configure_writer(self, is_coordinator: bool, **kwargs) -> None:
        """Provide coordinator and rank information to the writer.

        Args:
            is_coordinator (bool): True when the current rank acts as the
                coordinator.
            **kwargs: Extra backend-specific options (for example, rank or
                use_collectives).
        """

    @abc.abstractmethod
    def optimize_local_plan(self, plan: SavePlan) -> SavePlan:
        """Apply storage-specific optimizations to this rank's save plan.

        Args:
            plan (SavePlan): The locally produced save plan.

        Returns:
            SavePlan: The optimized local save plan.
        """

    @abc.abstractmethod
    def optimize_global_plan(self, plans: list[SavePlan]) -> list[SavePlan]:
        """Optimize the global plan built from every rank's local plan.

        Args:
            plans (list[SavePlan]): Local save plans collected from all ranks.

        Returns:
            list[SavePlan]: The optimized global save plans.
        """

    @abc.abstractmethod
    def execute_write(self, plan: SavePlan, planner: SavePlanner) -> list[WriteResult]:
        """Write the data described by ``plan`` and report where it landed.

        Args:
            plan (SavePlan): The save plan to carry out.
            planner (SavePlanner): Planner used to access the tensor data
                being written.

        Returns:
            list[WriteResult]: One result per written item, each carrying its
                storage information.
        """

    @abc.abstractmethod
    def finalize_checkpoint(self, metadata: Metadata, results: list[list[WriteResult]]) -> None:
        """Complete the checkpoint write, including its metadata.

        Args:
            metadata (Metadata): Checkpoint metadata to be finalized.
            results (list[list[WriteResult]]): Write results gathered from all
                ranks; each inner list holds the results of a single rank.
        """

129 

130 

class StorageReader(abc.ABC):
    """Interface that checkpoint storage backends implement for loading.

    The abstract hooks cover reader initialization, metadata loading,
    configuration, local and global plan optimization, and the data read
    itself. Every hook is abstract, so a subclass must override all of them
    before it can be instantiated.
    """

    @abc.abstractmethod
    def initialize_reader(self, checkpoint_id: Optional[Union[Path, str]] = None) -> None:
        """Prepare the reader, optionally targeting a new checkpoint directory.

        Args:
            checkpoint_id (Optional[Union[Path, str]]): Identifier or path of
                the checkpoint directory. When None (the default), the
                directory configured earlier is reused.
        """

    @abc.abstractmethod
    def load_metadata(self, **kwargs) -> Metadata:
        """Fetch the checkpoint metadata from storage.

        Args:
            **kwargs: Extra backend-specific options (for example, a rank
                when metadata is stored per rank).

        Returns:
            Metadata: The checkpoint metadata that was read.
        """

    @abc.abstractmethod
    def configure_reader(self, metadata: Metadata, is_coordinator: bool, **kwargs) -> None:
        """Provide metadata and coordinator information to the reader.

        Args:
            metadata (Metadata): The checkpoint metadata.
            is_coordinator (bool): True when the current rank acts as the
                coordinator.
            **kwargs: Extra backend-specific options (for example, rank or
                use_collectives).
        """

    @abc.abstractmethod
    def optimize_local_plan(self, plan: LoadPlan) -> LoadPlan:
        """Apply storage-specific optimizations to this rank's load plan.

        Args:
            plan (LoadPlan): The locally produced load plan.

        Returns:
            LoadPlan: The optimized local load plan.
        """

    @abc.abstractmethod
    def optimize_global_plan(self, plans: list[LoadPlan]) -> list[LoadPlan]:
        """Optimize the global plan built from every rank's local plan.

        Args:
            plans (list[LoadPlan]): Local load plans collected from all ranks.

        Returns:
            list[LoadPlan]: The optimized global load plans.
        """

    @abc.abstractmethod
    def execute_read(self, plan: LoadPlan, planner: LoadPlanner) -> None:
        """Read data from storage as described by ``plan``.

        Args:
            plan (LoadPlan): The load plan to carry out.
            planner (LoadPlanner): Planner used to apply the data once it has
                been read.
        """