Diff Coverage

Diff: origin/master...HEAD, staged and unstaged changes

| Source File | Diff Coverage (%) | Missing Lines |
|---|---|---|
| hyper_parallel/auto_parallel/hyper_offload/execution/replay/executor.py | 50.0% | 74 |
| hyper_parallel/auto_parallel/hyper_offload/planning/__init__.py | 100% | |
| hyper_parallel/auto_parallel/hyper_offload/planning/peak.py | 100% | |
| hyper_parallel/auto_parallel/hyper_offload/planning/peak_strategy/__init__.py | 100% | |
| hyper_parallel/auto_parallel/hyper_offload/planning/peak_strategy/logging_utils.py | 100% | |
| hyper_parallel/auto_parallel/hyper_offload/planning/peak_strategy/masking.py | 94.6% | 107,111,129,133 |
| hyper_parallel/auto_parallel/hyper_offload/planning/peak_strategy/modeling.py | 92.9% | 55,61,112,121-123,142 |
| hyper_parallel/auto_parallel/hyper_offload/planning/peak_strategy/planner.py | 97.6% | 63,75 |
| hyper_parallel/auto_parallel/hyper_offload/planning/peak_strategy/schedule_builder.py | 95.9% | 134,146 |
| hyper_parallel/auto_parallel/hyper_offload/planning/peak_strategy/selection.py | 92.1% | 34,51,60,199,212,238,270,318,324-328 |
| hyper_parallel/auto_parallel/hyper_offload/planning/peak_strategy/types.py | 95.8% | 90-94 |
hyper_parallel/auto_parallel/hyper_offload/execution/replay/executor.py
Lines 70-78 (uncovered: 74):
        for action in self._schedule.pre_actions(self.op_idx):
            if action.kind == ResidencyActionType.COPY_H2D:
                self.residency_manager.copy_h2d(action.storage_id)
            elif action.kind == ResidencyActionType.RELEASE_HOST:
                self.residency_manager.release_host(action.storage_id)
            else:
                raise RuntimeError(f"unsupported pre action {action} at op={self.op_idx}")

    def on_op_end(self, result) -> Any:
hyper_parallel/auto_parallel/hyper_offload/planning/peak_strategy/masking.py
Lines 103-115 (uncovered: 107, 111):

def find_latest_release_position(context: PositionSearchContext) -> int:
    """Binary-search the farthest safe release position."""
    if context.bound_position <= context.origin_position:
        return context.origin_position
    lower = context.origin_position
    upper = min(context.bound_position, len(context.resident_bytes) - 1)
    if upper <= lower:
        return context.origin_position

    left = lower
    right = upper
    best = context.origin_position
Lines 125-137 (uncovered: 129, 133):

def find_earliest_prefetch_position(context: PositionSearchContext) -> int:
    """Binary-search the earliest safe prefetch position."""
    if context.bound_position >= context.origin_position:
        return context.origin_position
    lower = max(context.bound_position, 0)
    upper = context.origin_position
    if lower >= upper:
        return context.origin_position

    left = lower
    right = upper
    best = context.origin_position
hyper_parallel/auto_parallel/hyper_offload/planning/peak_strategy/modeling.py
Lines 51-59 (uncovered: 55):

    for sid, accesses in accesses_by_storage.items():
        size_bytes = trace.storage_sizes.get(sid)
        if size_bytes is None or size_bytes <= 0 or not accesses:
            continue

        ordered = sorted(accesses, key=lambda access: access.op_id)
        first_op_id = ordered[0].op_id
        last_op_id = extend_last_use_for_alias_ops(trace, sid, ordered[-1].op_id)
Lines 57-65 (uncovered: 61):
        ordered = sorted(accesses, key=lambda access: access.op_id)
        first_op_id = ordered[0].op_id
        last_op_id = extend_last_use_for_alias_ops(trace, sid, ordered[-1].op_id)
        if first_op_id > last_op_id:
            continue

        _add_lifetime_bytes(resident_bytes, edge_bytes, first_op_id, last_op_id, size_bytes)
        if sid in trace.retained_sids:
            continue
Lines 108-116 (uncovered: 112):
    extended = last_op_id
    for idx in range(last_op_id + 1, len(trace.ops)):
        op = trace.ops[idx]
        if is_alias_forward_op(op, sid):
            extended = idx
    return extended


def is_alias_forward_op(op, sid: int) -> bool:
Lines 117-127 (uncovered: 121-123):
    """Infer view-like storage forwarding from the trace-local access pattern."""
    accesses = [access for access in op.accesses if access.storage_id == sid]
    if not accesses:
        return False
    op_storage_ids = {access.storage_id for access in op.accesses}
    access_kinds = {access.kind for access in accesses}
    return (
        len(op_storage_ids) == 1
        and AccessKind.READ in access_kinds
        and AccessKind.WRITE in access_kinds
    )
Lines 138-146 (uncovered: 142):
        for index, access in enumerate(context.ordered_accesses)
        if is_version_start_access(context.ordered_accesses, index, access)
    ]
    if not version_starts:
        version_starts = [0]

    for position, version_start in enumerate(version_starts):
        version_stop = version_stop_at(version_starts, position, len(context.ordered_accesses))
        version_candidates = build_interior_candidates(context, version_start, version_stop)
hyper_parallel/auto_parallel/hyper_offload/planning/peak_strategy/planner.py
Lines 59-67 (uncovered: 63):
            "score_weight_neighbor_impact": "neighbor_impact",
        }
        unknown_keys = set(score_weights) - set(allowed_keys)
        if unknown_keys:
            raise TypeError(f"Unexpected score weight keyword(s): {sorted(unknown_keys)}")

        values = {
            field_name: score_weights.get(option_name, 1.0)
            for option_name, field_name in allowed_keys.items()
Lines 71-79 (uncovered: 75):
    def build(self, trace: ActivationTrace) -> ResidencySchedule:
        """Build a residency schedule from a captured activation trace."""
        limit = trace.memory_limit_bytes if trace.memory_limit_bytes is not None else float("inf")
        if not trace.ops or limit == float("inf"):
            return ResidencySchedule()

        accesses_by_storage = self._group_accesses_by_storage(trace)
        profile = self._build_profile_and_candidates(trace, accesses_by_storage)
        unavoidable_peak = self._compute_unavoidable_peak(profile.resident_bytes, profile.candidates)
hyper_parallel/auto_parallel/hyper_offload/planning/peak_strategy/schedule_builder.py
Lines 130-137 (uncovered: 134):
        order += 1

    for sid, access in context.releasable.items():
        if sid in context.retained_sids:
            continue
        post_actions.append((access.op_id, 20, order, ResidencyActionType.RELEASE_DEVICE, sid))
        order += 1
    return post_actions
Lines 142-148 (uncovered: 146):
    emitted: set[tuple[int, ResidencyActionType, int]] = set()
    for op_id, _, _, kind, sid in sorted(post_actions):
        action_key = (op_id, kind, sid)
        if action_key in emitted:
            continue
        schedule.add_post(op_id, kind, sid)
        emitted.add(action_key)
hyper_parallel/auto_parallel/hyper_offload/planning/peak_strategy/selection.py
Lines 30-38 (uncovered: 34):

def find_global_peak(resident_bytes: list[int]) -> tuple[int, int] | None:
    """Return ``(op_id, resident_bytes)`` for the current global peak."""
    if not resident_bytes:
        return None

    peak_idx = 0
    peak_val = resident_bytes[0]
    for idx in range(1, len(resident_bytes)):
Lines 47-55 (uncovered: 51):
    candidates: list[PeakCandidate],
) -> tuple[int, int, int] | None:
    """Return the highest resident floor that candidate selection cannot remove."""
    if not resident_bytes:
        return None

    reducible_bytes = [0] * len(resident_bytes)
    covered_storage_ids = [set() for _ in resident_bytes]
    for candidate in candidates:
Lines 56-64 (uncovered: 60):
        start = max(0, candidate.start_interior)
        end = min(len(resident_bytes) - 1, candidate.end_interior)
        for op_id in range(start, end + 1):
            if candidate.storage_id in covered_storage_ids[op_id]:
                continue
            covered_storage_ids[op_id].add(candidate.storage_id)
            reducible_bytes[op_id] += candidate.size_bytes

    peak_op = 0
Lines 195-203 (uncovered: 199):
    if best is None:
        return True
    if score != best_score:
        return score > best_score
    return candidate.peak_reduction_score > best.peak_reduction_score


def is_candidate_eligible(
    candidate: PeakCandidate,
Lines 208-215 (uncovered: 212):
    """Return whether a candidate can reduce the current peak slot."""
    if candidate.selection_key in selected_candidate_keys:
        return False
    if candidate.first_op_id >= peak_idx:
        return False
    if not candidate.start_interior <= peak_idx <= candidate.end_interior:
        return False
    return peak_idx < len(resident_bytes) and resident_bytes[peak_idx] > 0
Lines 234-242 (uncovered: 238):
    limit: float,
) -> list[PeakCandidate]:
    """Select adjacent-op gaps that only free memory between operator slots."""
    if limit == float("inf") or not edge_bytes:
        return []

    selected: list[PeakCandidate] = []
    unavailable_keys = set(selected_candidate_keys)
    while True:
Lines 266-274 (uncovered: 270):
    """Score a boundary candidate by its simulated edge-memory reduction."""
    start = candidate.release_start.op_id
    end = candidate.end.op_id
    if end - start != 1 or start < 0 or start >= len(edge_bytes):
        return (0, 0, 0, 0)

    before = edge_bytes[start]
    after = max(0, before - candidate.size_bytes)
    peak_reduction = max(0, before - max(after, limit))
Lines 314-322 (uncovered: 318):
        overlap_end = min(candidate.last_op_id, end)
        if overlap_start < overlap_end:
            overlaps.append((overlap_start, overlap_end))
    if not overlaps:
        return 0.0

    overlaps.sort()
    merged = 0
    current_start, current_end = overlaps[0]
Lines 320-331 (uncovered: 324-328):
    overlaps.sort()
    merged = 0
    current_start, current_end = overlaps[0]
    for start, end in overlaps[1:]:
        if start <= current_end:
            current_end = max(current_end, end)
            continue
        merged += current_end - current_start
        current_start, current_end = start, end
    merged += current_end - current_start

    return merged / max(1, candidate.last_op_id - candidate.first_op_id)
hyper_parallel/auto_parallel/hyper_offload/planning/peak_strategy/types.py
Lines 86-98 (uncovered: 90-94):
    releasable: dict[int, StorageAccess]

    def __iter__(self):
        """Keep compatibility with the historical tuple return shape."""
        yield self.resident_bytes
        yield self.edge_bytes
        yield self.candidates
        yield self.boundary_candidates
        yield self.releasable


@dataclass(frozen=True)
class ScoreWeights: