Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 271 additions & 0 deletions module/pkg/patterns/nccl_hang.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
// SPDX-License-Identifier: Apache-2.0

package patterns

import (
"fmt"
"sort"
"strings"
"time"
)

// DefaultHangThreshold is the wall-clock age past which a NCCL
// collective record in a non-completed state counts as hung. 5min
// matches NCCL_DESYNC_DEBUG's default heartbeat window and the
// operator-facing "did training stop progressing?" expectation —
// shorter would chase normal long-tail collectives (FSDP all-gather
// over a slow link can take 30-90s); longer makes a hang invisible
// for the entire on-call rotation. Operators raise this via
// NCCLHangDetector.HangThreshold when running with longer collectives.
const DefaultHangThreshold = 5 * time.Minute

// MinHangingRanks is the cohort floor — a single stuck rank may be a
// straggler, not a hang. Two ranks blocking the same collective_id
// past the threshold is the smallest cross-rank signal that proves
// the collective itself is not progressing.
const MinHangingRanks = 2

// NCCLFRRecord is the typed projection of one NCCL FlightRecorder
// ring-buffer entry the detector consumes. Field names mirror the
// upstream PyTorch FlightRecorder.hpp / fr_parser.Record shape, narrowed
// to the fields the hang detector actually reads. Detectors read
// NCCLFRRecord values directly — no plog grep — so a schema rename in
// the upstream receiver surfaces as a compile error.
type NCCLFRRecord struct {
// Rank is the global rank that emitted this record. The
// rankjoinprocessor stamps gen_ai.training.rank on the underlying
// log record; the patterndetectorprocessor projects it here.
Rank int64 `json:"rank"`

// PgID is the NCCL process-group id. Hang verdicts scope to one
// pg — a parallel pg's all-reduce on a different collective is
// unrelated.
PgID int64 `json:"pg_id"`

// CollectiveSeqID is the per-pg monotonic collective counter. The
// load-bearing join key for a cross-rank hang: ranks blocking the
// same CollectiveSeqID are blocking the same collective op.
CollectiveSeqID int64 `json:"collective_seq_id"`

// State is the FR-emitted state: "scheduled", "started",
// "completed". A rank with State != "completed" past the hang
// threshold counts as hung.
State string `json:"state"`

// ProfilingName is the collective op name (e.g. "nccl:all_reduce",
// "nccl:all_gather"). Renders into the verdict headline so the
// operator sees WHICH collective is stuck.
ProfilingName string `json:"profiling_name,omitempty"`

// TimeDiscoveredStartedNs is the wall-clock nanosecond stamp the
// rank's watchdog observed the collective start. The detector
// measures hang age as (Now - TimeDiscoveredStarted).
TimeDiscoveredStartedNs int64 `json:"time_discovered_started_ns,omitempty"`
}

// NCCLHangVerdict is the M19+ nccl_hang pattern output. Distinct
// shape from PodEvictedVerdict because the join key (collective + pg)
// and per-rank cohort have no analog on the pod-evicted side. JSON
// field names follow the verdict.schema.json snake-case convention.
type NCCLHangVerdict struct {
PatternID string `json:"pattern.id"`
Headline string `json:"headline"`
Remediation string `json:"remediation"`
EvidenceTrail []EvidenceRef `json:"evidence_trail"`

// PgID names the process group the hang is scoped to.
PgID int64 `json:"pg_id"`

// CollectiveSeqID is the collective_seq_id every hanging rank is
// blocked on. Load-bearing for operator triage — paste into
// py-spy / cuda-gdb queries.
CollectiveSeqID int64 `json:"collective_seq_id"`

// HangingRanks is the set of ranks blocking the collective past
// the hang threshold, sorted ascending for deterministic output.
HangingRanks []int64 `json:"hanging_ranks"`
}

// NCCLHangDetector is the nccl_hang pattern detector (NORTHSTAR
// pattern #2 in the v0.3.0 ladder). Zero-value usage is permitted —
// HangThreshold defaults to DefaultHangThreshold; Now defaults to
// time.Now() at Evaluate time.
type NCCLHangDetector struct {
// HangThreshold is the wall-clock age past which a non-completed
// rank record counts as hung. Zero means use DefaultHangThreshold.
HangThreshold time.Duration

// Now is the evaluation wall-clock. Overridable in tests so hang
// age doesn't depend on the wall clock. Zero means time.Now().
Now time.Time
}

// Evaluate scans cross-rank NCCL FR records and emits one
// NCCLHangVerdict per (pg_id, collective_seq_id) where MinHangingRanks
// or more ranks have a non-completed record older than HangThreshold.
//
// Records that have later "completed" entries for the same (rank,
// pg, collective) supersede the earlier "started" — the FR ring
// buffer holds the historical trail and the detector must take the
// rank's most recent observed state.
//
// Output is sorted by (collective_seq_id, earliest_started_ns) so
// golden tests are stable across replay runs.
func (d NCCLHangDetector) Evaluate(records []NCCLFRRecord) []NCCLHangVerdict {
threshold := d.HangThreshold
if threshold <= 0 {
threshold = DefaultHangThreshold
}
now := d.Now
if now.IsZero() {
now = time.Now()
}

// Take the latest record per (pg, collective, rank). A later
// "completed" entry for the same triple supersedes an earlier
// "started" — the rank progressed past the op.
type rankKey struct {
pg int64
collective int64
rank int64
}
latest := map[rankKey]NCCLFRRecord{}
for _, r := range records {
k := rankKey{r.PgID, r.CollectiveSeqID, r.Rank}
prev, ok := latest[k]
if !ok || r.TimeDiscoveredStartedNs >= prev.TimeDiscoveredStartedNs {
latest[k] = r
}
}

// Group surviving (non-completed AND over-threshold) records by
// (pg, collective).
type cohortKey struct {
pg int64
collective int64
}
cohorts := map[cohortKey][]NCCLFRRecord{}
cutoffNs := now.Add(-threshold).UnixNano()
for _, r := range latest {
if r.State == "completed" {
continue
}
if r.TimeDiscoveredStartedNs == 0 || r.TimeDiscoveredStartedNs > cutoffNs {
continue
}
k := cohortKey{r.PgID, r.CollectiveSeqID}
cohorts[k] = append(cohorts[k], r)
}

verdicts := make([]NCCLHangVerdict, 0, len(cohorts))
for _, cohort := range cohorts {
if len(cohort) < MinHangingRanks {
continue
}
verdicts = append(verdicts, buildNCCLHangVerdict(cohort))
}

sort.SliceStable(verdicts, func(i, j int) bool {
if verdicts[i].CollectiveSeqID != verdicts[j].CollectiveSeqID {
return verdicts[i].CollectiveSeqID < verdicts[j].CollectiveSeqID
}
return earliestStarted(verdicts[i]).Before(earliestStarted(verdicts[j]))
})
return verdicts
}

// buildNCCLHangVerdict materializes one verdict from a same-collective
// cohort. Sort the cohort by rank ascending so EvidenceTrail order and
// HangingRanks order are both deterministic.
func buildNCCLHangVerdict(cohort []NCCLFRRecord) NCCLHangVerdict {
sort.SliceStable(cohort, func(i, j int) bool {
return cohort[i].Rank < cohort[j].Rank
})

ranks := make([]int64, 0, len(cohort))
trail := make([]EvidenceRef, 0, len(cohort))
for _, r := range cohort {
ranks = append(ranks, r.Rank)
startedAt := time.Unix(0, r.TimeDiscoveredStartedNs).UTC()
trail = append(trail, EvidenceRef{
Kind: EvidenceKindNCCLFR,
UID: fmt.Sprintf("pg=%d/collective=%d/rank=%d", r.PgID, r.CollectiveSeqID, r.Rank),
Timestamp: startedAt,
Description: ncclHangEvidenceDescription(r, startedAt),
})
}

first := cohort[0]
return NCCLHangVerdict{
PatternID: PatternIDNCCLHang,
PgID: first.PgID,
CollectiveSeqID: first.CollectiveSeqID,
HangingRanks: ranks,
Headline: ncclHangHeadline(first, len(ranks)),
Remediation: ncclHangRemediation(first, ranks),
EvidenceTrail: trail,
}
}

// ncclHangHeadline renders the operator-facing one-liner.
func ncclHangHeadline(r NCCLFRRecord, n int) string {
op := r.ProfilingName
if op == "" {
op = "nccl:collective"
}
return fmt.Sprintf("%d ranks hung on collective %d (%s) in pg %d", n, r.CollectiveSeqID, op, r.PgID)
}

// ncclHangRemediation returns the operator-actionable remediation
// prose. Pins the offending collective + rank list so the alert
// payload is self-contained — the operator doesn't have to cross-walk
// the evidence trail to learn which ranks to attach to.
func ncclHangRemediation(r NCCLFRRecord, ranks []int64) string {
rankList := joinInts(ranks)
return fmt.Sprintf(
"Inspect hung ranks (%s) on pg=%d collective_seq_id=%d: run py-spy dump --pid on each rank's training process, check NCCL_DEBUG=INFO logs around the collective, verify network reachability between ranks (ibstat / nccl-tests).",
rankList, r.PgID, r.CollectiveSeqID,
)
}

// ncclHangEvidenceDescription renders the per-rank evidence line.
func ncclHangEvidenceDescription(r NCCLFRRecord, startedAt time.Time) string {
op := r.ProfilingName
if op == "" {
op = "nccl:collective"
}
return fmt.Sprintf("Rank %d stuck on %s (collective_seq_id=%d, state=%s) since %s",
r.Rank, op, r.CollectiveSeqID, r.State, startedAt.Format(time.RFC3339))
}

// earliestStarted returns the verdict's earliest-stuck timestamp for
// stable sort ordering.
func earliestStarted(v NCCLHangVerdict) time.Time {
if len(v.EvidenceTrail) == 0 {
return time.Time{}
}
earliest := v.EvidenceTrail[0].Timestamp
for _, e := range v.EvidenceTrail[1:] {
if e.Timestamp.Before(earliest) {
earliest = e.Timestamp
}
}
return earliest
}

// joinInts renders a sorted-int slice as a comma-separated list for
// the remediation prose.
func joinInts(xs []int64) string {
parts := make([]string, len(xs))
for i, x := range xs {
parts[i] = fmt.Sprintf("%d", x)
}
return strings.Join(parts, ",")
}

// PatternIDNCCLHang is the nccl-hang pattern identifier.
const PatternIDNCCLHang = "15"

// EvidenceKindNCCLFR names the NCCL FlightRecorder evidence surface.
// Hoisted alongside EvidenceKindPodEvent / EvidenceKindNodeCondition.
const EvidenceKindNCCLFR = "nccl_fr"
Loading
Loading