Coverage for hyper_parallel/core/shard/ops/parallel_cumsum.py: 87%
30 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-03-01 07:33 +0800
1# Copyright 2026 Huawei Technologies Co., Ltd
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ============================================================================
15"""
16Distributed implementation for Cumsum operator.
17"""
19from hyper_parallel.core.layout import Layout
20from .parallel_ops import DistributedOp
class CumsumDistributedOp(DistributedOp):
    """Distributed implementation for torch.cumsum."""

    def infer_layout(self, layouts, extra_args=None):
        """
        Derive the output layout of a sharded torch.cumsum.

        cumsum keeps the input shape and scans sequentially along `dim`,
        so every element depends on all preceding elements of that axis.
        Consequently the scanned dimension must not be sharded (its
        tensor_map entry must be -1); every other axis keeps its sharding,
        and the output layout mirrors the input layout.

        Args:
            layouts (tuple): Input layouts; layouts[0] (Layout) is the
                required input tensor layout.
            extra_args (tuple): extra_args[0] (int) is the required
                cumsum dimension; negative values are normalized.

        Returns:
            Layout: Output layout, identical to the validated input layout.

        Raises:
            ValueError: If the input layout or `dim` is missing, `dim` is
                not an int or out of range, or the cumsum axis is sharded.
        """
        # Guard: a usable input layout is mandatory.
        if not layouts or layouts[0] is None:
            raise ValueError(
                f"Operation {self.op_name}: cumsum requires a valid input tensor layout."
            )
        src_layout = layouts[0]
        tmap = src_layout.tensor_map
        ndim = len(tmap)

        # Guard: the scan dimension must be supplied and well-typed.
        if not extra_args or extra_args[0] is None:
            raise ValueError(
                f"Operation {self.op_name}: cumsum requires 'dim' parameter in extra_args."
            )
        dim = extra_args[0]
        if not isinstance(dim, int):
            raise ValueError(
                f"Operation {self.op_name}: 'dim' must be an integer, got {type(dim)}."
            )

        # Wrap negative dims, then bounds-check against the tensor rank.
        axis = dim + ndim if dim < 0 else dim
        if not 0 <= axis < ndim:
            raise ValueError(
                f"Operation {self.op_name}: Dimension {dim} out of range for "
                f"{ndim}-dimensional input tensor."
            )

        # The sequential scan axis cannot be split across devices.
        if tmap[axis] != -1:
            raise ValueError(
                f"Operation {self.op_name}: Cannot perform sharding on normalized dimension {axis}, "
                f"but found sharding assignment: {tmap[axis]}"
            )

        aliases = src_layout.alias_name

        # Translate each tensor_map entry into the alias string the Layout
        # call protocol expects: -1 -> "None" (replicated), otherwise the
        # alias looked up from the reversed alias list.
        placement = tuple(
            "None" if entry == -1 else aliases[len(aliases) - entry - 1]
            for entry in tmap
        )

        # Rebuild an identical device mesh and apply the placement to it.
        mirror = Layout(
            mesh_shape=src_layout.mesh_shape,
            alias_name=aliases,
            rank_list=src_layout.rank_list
        )
        return mirror(*placement)