Coverage for hyper_parallel / core / checkpoint / storage.py: 100%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-03-01 07:33 +0800

1# Copyright 2026 Huawei Technologies Co., Ltd 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# ============================================================================ 

15"""Storage interfaces for checkpoint save and load.""" 

16import abc 

17from dataclasses import dataclass 

18from pathlib import Path 

19from typing import Optional, Union 

20 

21from hyper_parallel.core.checkpoint.metadata import Metadata, MetadataIndex 

22from hyper_parallel.core.checkpoint.planner import LoadPlan, LoadPlanner, SavePlan, SavePlanner 

23 

# File name of the checkpoint metadata file within the checkpoint directory.
_metadata_file_name = ".metadata"

25 

@dataclass
class StorageInfo:
    """
    Describes where the bytes of one logical checkpoint item live.

    Torch-aligned: matches torch.distributed.checkpoint.filesystem._StorageInfo.

    Attributes:
        relative_path: Path relative to checkpoint root.
        offset: Byte offset within the file.
        length: Byte length of the data (best-effort, may be -1 for tensors).
    """

    # File holding the item, relative to the checkpoint root directory.
    relative_path: str
    # Byte offset of the item's data inside that file.
    offset: int
    # Byte length of the data; best-effort, may be -1 for tensors.
    length: int

41 

42 

@dataclass
class WriteResult:
    """
    Outcome of writing one logical item to storage.

    Torch-aligned: pairs the item's metadata index with its storage information.
    """

    # Index identifying the logical item within the checkpoint metadata.
    index: MetadataIndex
    # Physical location of the written bytes (file, offset, length).
    storage_data: StorageInfo

52 

53 

class StorageWriter(abc.ABC):
    """
    Interface a storage backend implements to persist checkpoint data.

    Concrete subclasses provide the hooks below, covering writer setup,
    plan optimization, the actual write, and checkpoint finalization.
    """

    @abc.abstractmethod
    def initialize_writer(self, checkpoint_id: Optional[Union[Path, str]] = None) -> None:
        """
        Prepare the writer, optionally pointing it at a new checkpoint directory.

        Args:
            checkpoint_id (Optional[Union[Path, str]]): The ID/path of the
                checkpoint directory. When None, the previously configured
                checkpoint directory is reused. Default None.
        """

    @abc.abstractmethod
    def configure_writer(self, is_coordinator: bool, **kwargs) -> None:
        """
        Supply coordinator and rank information to the writer.

        Args:
            is_coordinator (bool): True when this rank acts as the coordinator.
            **kwargs: Additional keyword arguments (e.g., rank, use_collectives).
        """

    @abc.abstractmethod
    def optimize_local_plan(self, plan: SavePlan) -> SavePlan:
        """
        Apply backend-specific optimizations to this rank's save plan.

        Args:
            plan (SavePlan): The local save plan to optimize.

        Returns:
            SavePlan: The optimized local save plan.
        """

    @abc.abstractmethod
    def optimize_global_plan(self, plans: list[SavePlan]) -> list[SavePlan]:
        """
        Apply backend-specific optimizations across all ranks' save plans.

        Args:
            plans (list[SavePlan]): List of local save plans from all ranks.

        Returns:
            list[SavePlan]: List of optimized global save plans.
        """

    @abc.abstractmethod
    def execute_write(self, plan: SavePlan, planner: SavePlanner) -> list[WriteResult]:
        """
        Write the planned items to storage and report where they landed.

        Args:
            plan (SavePlan): The save plan to execute.
            planner (SavePlanner): The save planner instance used to access
                tensor data.

        Returns:
            list[WriteResult]: One write result per written item, carrying its
            storage information.
        """

    @abc.abstractmethod
    def finalize_checkpoint(self, metadata: Metadata, results: list[list[WriteResult]]) -> None:
        """
        Complete the checkpoint by committing its metadata.

        Args:
            metadata (Metadata): The checkpoint metadata to finalize.
            results (list[list[WriteResult]]): Write results from all ranks;
                each inner list holds the WriteResults produced by one rank.
        """

128 

129 

class StorageReader(abc.ABC):
    """
    Interface a storage backend implements to load checkpoint data.

    Concrete subclasses provide the hooks below, covering reader setup,
    metadata loading, plan optimization, and the actual read.
    """

    @abc.abstractmethod
    def initialize_reader(self, checkpoint_id: Optional[Union[Path, str]] = None) -> None:
        """
        Prepare the reader, optionally pointing it at a new checkpoint directory.

        Args:
            checkpoint_id (Optional[Union[Path, str]]): The ID/path of the
                checkpoint directory. When None, the previously configured
                checkpoint directory is reused. Default None.
        """

    @abc.abstractmethod
    def load_metadata(self, **kwargs) -> Metadata:
        """
        Read the checkpoint metadata from storage.

        Args:
            **kwargs: Additional keyword arguments (e.g., rank for rank-local
                metadata).

        Returns:
            Metadata: The loaded checkpoint metadata.
        """

    @abc.abstractmethod
    def configure_reader(self, metadata: Metadata, is_coordinator: bool, **kwargs) -> None:
        """
        Supply metadata and coordinator information to the reader.

        Args:
            metadata (Metadata): The checkpoint metadata.
            is_coordinator (bool): True when this rank acts as the coordinator.
            **kwargs: Additional keyword arguments (e.g., rank, use_collectives).
        """

    @abc.abstractmethod
    def optimize_local_plan(self, plan: LoadPlan) -> LoadPlan:
        """
        Apply backend-specific optimizations to this rank's load plan.

        Args:
            plan (LoadPlan): The local load plan to optimize.

        Returns:
            LoadPlan: The optimized local load plan.
        """

    @abc.abstractmethod
    def optimize_global_plan(self, plans: list[LoadPlan]) -> list[LoadPlan]:
        """
        Apply backend-specific optimizations across all ranks' load plans.

        Args:
            plans (list[LoadPlan]): List of local load plans from all ranks.

        Returns:
            list[LoadPlan]: List of optimized global load plans.
        """

    @abc.abstractmethod
    def execute_read(self, plan: LoadPlan, planner: LoadPlanner) -> None:
        """
        Read data from storage as directed by the load plan.

        Args:
            plan (LoadPlan): The load plan to execute.
            planner (LoadPlanner): The load planner instance used to apply the
                loaded data.
        """