Diff Coverage

Diff: origin/master...HEAD, staged and unstaged changes

| Source File | Diff Coverage (%) | Missing Lines |
|---|---|---|
| hyper_parallel/core/moe_utils.py | 98.2% | 115 |
| hyper_parallel/platform/torch/common/moe.py | 95.7% | 490-491 |
| hyper_parallel/trainer/callbacks/base.py | 0.0% | 228-231,667,669-670,673,676-677,680-688,692-693,695-697,699,708-709,711-713,718,720,723-724,726 |
| hyper_parallel/trainer/config.py | 0.0% | 279-280 |
hyper_parallel/core/moe_utils.py
111
112
113
114
115
116
117
118
119

    if hasattr(group, "group"):
        return group

    return SimpleNamespace(group=group)


def _get_moe_layers(model: "nn.Module") -> list:
    """Collect all MoE sub-modules with ``enable_expert_bias=True``.
hyper_parallel/platform/torch/common/moe.py
486
487
488
489
490
491
492
493
494
495
    expert_fraction.scatter_add_(0, flat_experts, torch.ones_like(flat_scores))

    num_sub_sequence = 1
    if sequence_partition_group is not None:
        num_sub_sequence = dist.get_world_size(sequence_partition_group)
        dist.all_reduce(expert_fraction, group=sequence_partition_group)

    expert_fraction = expert_fraction / (num_tokens * num_sub_sequence * top_k)

    expert_prob = torch.zeros(
hyper_parallel/trainer/callbacks/base.py
224
225
226
227
228
229
230
231
232
233
234
235
                metrics["tflops"] = f"{observed_tflops:.1f}"
                metrics["mfu"] = f"{mfu * 100:.1f}%"

        # Include aux_loss from MoEMonitorCallback when available.
        moe_cb = self.trainer.moe_monitor_callback
        aux_loss = getattr(moe_cb, 'last_mean_aux_loss', None)
        if aux_loss is not None:
            metrics["aux_loss"] = f"{aux_loss:.6f}"

        logger.info_rank0(" | ".join(f"{k}={v}" for k, v in metrics.items()))

        record = {
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
        """Initialize MoEMonitorCallback from trainer config."""
        super().__init__(trainer)
        moe_cfg = getattr(trainer.args, 'moe_monitor', None)
        self.enabled = getattr(moe_cfg, 'enabled', False) if moe_cfg else False
        self._impl = None

        if self.enabled:
            from hyper_parallel.core.moe_utils import (  # pylint: disable=C0415
                MoEMonitorCallback as _CoreMoEMonitorCallback,
            )
            from hyper_parallel.core.fully_shard.hsdp_utils import (  # pylint: disable=C0415
                GroupInfo,
            )
            lr = getattr(moe_cfg, 'lr', 1e-3)
            num_recomputations = getattr(moe_cfg, 'num_recomputations', 1)

            # Resolve DP/TP/CP groups from trainer's device mesh.
            dp_group = getattr(self.trainer, '_dp_group_info', None)
            tp_group = None
            cp_group = None
            mesh = getattr(self.trainer, 'mesh', None)
            if mesh is not None:
                for name, attr_name in [("tp", "tp_group"), ("cp", "cp_group")]:
                    try:
                        raw_group = mesh.get_group(name)
                        group_info = GroupInfo(
                            group_name=name, group=raw_group,
                            rank_size=raw_group.size(),
                        )
                        if attr_name == "tp_group":
                            tp_group = group_info
                        else:
                            cp_group = group_info
                    except (KeyError, ValueError, AttributeError):
                        pass

            self._impl = _CoreMoEMonitorCallback(
                model=self.trainer.model,
                lr=lr,
                dp_group=dp_group,
                tp_group=tp_group,
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
                cp_group=cp_group,
                num_recomputations=num_recomputations,
            )

    @property
    def last_mean_aux_loss(self) -> Optional[float]:
        """Mean aux_loss across MoE layers from the last ``on_step_end``.

        Returns:
            The ``last_mean_aux_loss`` value exposed by the wrapped core
            monitor, or ``None`` when no core monitor has been created
            (``self._impl`` is ``None``).
        """
        core_monitor = self._impl
        return None if core_monitor is None else core_monitor.last_mean_aux_loss

    def on_train_begin(self, state: "TrainerState", **kwargs) -> None:
        """Announce once, on rank 0 only, that MoE monitoring is active.

        Nothing is logged when monitoring is disabled. ``state`` and
        ``kwargs`` are accepted but not used by this hook.
        """
        # Check the flag first so the rank is never queried when disabled.
        if not self.enabled:
            return
        if platform.get_rank() != 0:
            return
        logger.info("MoEMonitorCallback: MoE expert-load monitoring enabled")

    def on_step_end(self, state: "TrainerState", *, loss: float = None,
                    grad_norm: float = None, **kwargs) -> None:
        """Forward the end-of-step event to the core MoEMonitorCallback.

        The core monitor's ``on_step_end`` is called with no arguments;
        ``state``, ``loss``, ``grad_norm`` and ``kwargs`` are accepted but not
        passed on. This is a no-op when no core monitor was created.
        """
        core_monitor = self._impl
        if core_monitor is None:
            return
        core_monitor.on_step_end()

    def on_substep_end(self, state: "TrainerState", **kwargs) -> None:
        """No-op; expert bias updates happen in ``on_step_end``.

        This hook has no body beyond this docstring: ``state`` and ``kwargs``
        are accepted and ignored, and the wrapped core monitor is not
        touched here.
        """

class GradientHealthCallback(Callback):
    """Detect NaN / Inf grad_norm and raise / warn.
hyper_parallel/trainer/config.py
275
276
277
278
279
280
281
282
283
284
        num_recomputations: Number of forward executions per optimizer step.
            Default ``1``. Set to ``2`` when activation checkpoint is enabled.
    """
    enabled: bool = False
    lr: float = 1e-3
    num_recomputations: int = 1

@dataclass
class EvalConfig:
    """``train.eval.*`` — eval cadence + dataset."""