Diff Coverage

Diff: origin/master...HEAD, staged and unstaged changes

| Source File | Diff Coverage (%) | Missing Lines |
| --- | --- | --- |
| hyper_parallel/core/pipeline_parallel/scheduler.py | 0.0% | 1143, 1163, 1247 |
| hyper_parallel/core/pipeline_parallel/stage.py | 0.0% | 309, 312, 360, 364 |
| hyper_parallel/platform/mindspore/platform.py | 100% | |
| hyper_parallel/platform/torch/platform.py | 50.0% | 589 |
hyper_parallel/core/pipeline_parallel/scheduler.py
1139
1140
1141
1142
1143
1144
1145
1146
1147
        # by 2 slots so the wrap-around grad (rank 0 stage ``rs`` -> rank
        # last_stage stage ``rs - 1``) lands AFTER its producer in column-scan
        # time.  Matches the +2 cooldown-rhythm offset that non-short Interleaved
        # 1F1B naturally has from extra 1F1B ops on rank last_stage.
        bubble = 2 * self._trailing_bubble()
        for op_idx in range(warmup_ops + fwd_bwd_ops, total_ops):
            ops.append(None)
            bwd_stage_idx = self.backward_stage_index(op_idx - warmup_ops, stage_index)
            bwd_micro_idx = bwd_stage_micro_index[bwd_stage_idx]
1159
1160
1161
1162
1163
1164
1165
1166
1167
        last_micro = self.micro_batch_num - 1
        last_stage = self.real_stage_num - 1
        # Double the bubble at the 1F1B->cooldown chunk boundary on rank
        # last_stage; see :meth:`_emit_cooldown_ops` for the alignment rationale.
        bubble = 2 * self._trailing_bubble()
        for op_idx in range(warmup_ops, warmup_ops + fwd_bwd_ops):
            fwd_stage_idx = self.forward_stage_index(op_idx, stage_index)
            fwd_micro_idx = fwd_stage_micro_index[fwd_stage_idx]
            ops.append(MetaStep(fwd_micro_idx, MetaStepType.FWD, fwd_stage_idx))
1243
1244
1245
1246
1247
1248
1249
1250
1251
        if self._short_micro() and stage_index == last_stage and bwd_steps:
            if bwd_steps[-1].micro_index == self.micro_batch_num - 1:
                # Double the bubble at the 1F1B->cooldown chunk boundary;
                # see :meth:`_emit_cooldown_ops` for the alignment rationale.
                ops.extend([None] * (2 * self._trailing_bubble()))
        return ops

    def construct_stage_exec_order(self, stage_index):
        """Construct the execution order for ``stage_index``.
hyper_parallel/core/pipeline_parallel/stage.py
305
306
307
308
309
310
311
312
313
314
315
316
        """
        requires_grad = bool(meta[-1])
        if len(meta) == 4:
            self._update_layout(meta[2])
            buffer = DTensor.from_local(platform.empty(meta[0], dtype=meta[1],
                                                       device=self.device), meta[2].mesh, meta[2].alias_placements)
        else:
            buffer = platform.empty(meta[0], dtype=meta[1], device=self.device)
        buffer.requires_grad = requires_grad
        if micro_index in self.args_recv_info:
            recv_info = self.args_recv_info[micro_index][idx]
            recv_info.buffer = buffer
356
357
358
359
360
361
362
363
364
365
366
367
368
    def _construct_backward_recv_info(self, micro_index, idx, global_rank, tensor_send):
        """Prepare the buffer that receives the gradient matching ``tensor_send``.

        The buffer is created with ``platform.empty`` on ``self.device`` using
        the dtype of ``tensor_send`` and its shape (``local_shape`` when
        ``tensor_send`` is a ``DTensor``, ``shape`` otherwise).

        Args:
            micro_index: Key looked up in ``self.grad_recv_info``.
            idx: Position of the entry inside ``self.grad_recv_info[micro_index]``.
            global_rank: Forwarded as the first argument of ``_RecvInfo`` when a
                new entry has to be created.
            tensor_send: Tensor whose shape and dtype the buffer mirrors.

        Returns:
            A new ``_RecvInfo(global_rank, buffer)`` when ``micro_index`` has no
            entry in ``self.grad_recv_info``; otherwise ``None``, after the
            ``buffer`` of the existing entry has been replaced.
        """
        def _allocate():
            # A DTensor contributes only its local shard shape.
            if isinstance(tensor_send, DTensor):
                dims = tensor_send.local_shape
            else:
                dims = tensor_send.shape
            return platform.empty(dims, dtype=tensor_send.dtype, device=self.device)

        if micro_index in self.grad_recv_info:
            # Resolve the existing entry before allocating so a bad ``idx``
            # fails before any buffer is created.
            existing = self.grad_recv_info[micro_index][idx]
            existing.buffer = _allocate()
            return None

        fresh_buffer = _allocate()
        return _RecvInfo(global_rank, fresh_buffer)

    def _extract_meta_from_tensor(self, tensor):
        """
hyper_parallel/platform/torch/platform.py
585
586
587
588
589
590
591
592
593

        Returns:
            Tensor: An uninitialized tensor.
        """
        return torch.empty(size, dtype=dtype, device=device)

    @staticmethod
    def get_rank():
        """