Coverage for hyper_parallel/core/shard/ops/parallel_cumsum.py: 87%

30 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-03-01 07:33 +0800

1# Copyright 2026 Huawei Technologies Co., Ltd 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# ============================================================================ 

15""" 

16Distributed implementation for Cumsum operator. 

17""" 

18 

19from hyper_parallel.core.layout import Layout 

20from .parallel_ops import DistributedOp 

21 

class CumsumDistributedOp(DistributedOp):
    """Distributed implementation for torch.cumsum."""

    def infer_layout(self, layouts, extra_args=None):
        """
        Derive the output layout for a distributed torch.cumsum.

        torch.cumsum keeps the input shape, but the scan along ``dim`` is
        sequential: every element depends on all earlier elements of that
        dimension.  The scanned dimension therefore must not be sharded
        (its tensor-map entry must be -1); every other dimension keeps its
        sharding, so the output layout mirrors the input layout.

        Args:
            layouts (tuple): Input layouts; layouts[0] is the required
                Layout of the tensor being scanned.
            extra_args (tuple): extra_args[0] is the required int ``dim``
                along which the cumulative sum runs.

        Returns:
            Layout: Output layout, equivalent to the validated input layout.

        Raises:
            ValueError: If the input layout or ``dim`` is missing or invalid,
                or if the cumsum dimension is sharded.
        """
        if not layouts or layouts[0] is None:
            raise ValueError(
                f"Operation {self.op_name}: cumsum requires a valid input tensor layout."
            )
        src = layouts[0]
        tensor_map = src.tensor_map
        ndim = len(tensor_map)

        if not extra_args or extra_args[0] is None:
            raise ValueError(
                f"Operation {self.op_name}: cumsum requires 'dim' parameter in extra_args."
            )
        dim = extra_args[0]
        if not isinstance(dim, int):
            raise ValueError(
                f"Operation {self.op_name}: 'dim' must be an integer, got {type(dim)}."
            )

        # Accept negative indices, PyTorch-style.
        axis = dim + ndim if dim < 0 else dim
        if not 0 <= axis < ndim:
            raise ValueError(
                f"Operation {self.op_name}: Dimension {dim} out of range for "
                f"{ndim}-dimensional input tensor."
            )

        # The scan dimension must stay whole on every device.
        if tensor_map[axis] != -1:
            raise ValueError(
                f"Operation {self.op_name}: Cannot perform sharding on normalized dimension {axis}, "
                f"but found sharding assignment: {tensor_map[axis]}"
            )

        aliases = src.alias_name

        def _alias_of(idx):
            # -1 marks an unsharded dimension ("None" placeholder); otherwise
            # the mesh index maps to an alias, which are stored in reverse
            # order relative to tensor-map indices.
            return "None" if idx == -1 else aliases[len(aliases) - idx - 1]

        placement = tuple(_alias_of(idx) for idx in tensor_map)

        out_layout = Layout(
            mesh_shape=src.mesh_shape,
            alias_name=aliases,
            rank_list=src.rank_list,
        )
        return out_layout(*placement)
96 return output_layout