diff --git a/module/pkg/patterns/nccl_hang.go b/module/pkg/patterns/nccl_hang.go
new file mode 100644
index 00000000..a6d130d0
--- /dev/null
+++ b/module/pkg/patterns/nccl_hang.go
@@ -0,0 +1,285 @@
+// SPDX-License-Identifier: Apache-2.0
+
+package patterns
+
+import (
+	"fmt"
+	"sort"
+	"strings"
+	"time"
+)
+
+// DefaultHangThreshold is the wall-clock age past which a NCCL
+// collective record in a non-completed state counts as hung. 5min
+// matches NCCL_DESYNC_DEBUG's default heartbeat window and the
+// operator-facing "did training stop progressing?" expectation —
+// shorter would chase normal long-tail collectives (FSDP all-gather
+// over a slow link can take 30-90s); longer makes a hang invisible
+// for the entire on-call rotation. Operators raise this via
+// NCCLHangDetector.HangThreshold when running with longer collectives.
+const DefaultHangThreshold = 5 * time.Minute
+
+// MinHangingRanks is the cohort floor — a single stuck rank may be a
+// straggler, not a hang. Two ranks blocking the same collective_id
+// past the threshold is the smallest cross-rank signal that proves
+// the collective itself is not progressing.
+const MinHangingRanks = 2
+
+// NCCLFRRecord is the typed projection of one NCCL FlightRecorder
+// ring-buffer entry the detector consumes. Field names mirror the
+// upstream PyTorch FlightRecorder.hpp / fr_parser.Record shape, narrowed
+// to the fields the hang detector actually reads. Detectors read
+// NCCLFRRecord values directly — no plog grep — so a schema rename in
+// the upstream receiver surfaces as a compile error.
+type NCCLFRRecord struct {
+	// Rank is the global rank that emitted this record. The
+	// rankjoinprocessor stamps gen_ai.training.rank on the underlying
+	// log record; the patterndetectorprocessor projects it here.
+	Rank int64 `json:"rank"`
+
+	// PgID is the NCCL process-group id. Hang verdicts scope to one
+	// pg — a parallel pg's all-reduce on a different collective is
+	// unrelated.
+	PgID int64 `json:"pg_id"`
+
+	// CollectiveSeqID is the per-pg monotonic collective counter. The
+	// load-bearing join key for a cross-rank hang: ranks blocking the
+	// same CollectiveSeqID are blocking the same collective op.
+	CollectiveSeqID int64 `json:"collective_seq_id"`
+
+	// State is the FR-emitted state: "scheduled", "started",
+	// "completed". A rank with State != "completed" past the hang
+	// threshold counts as hung.
+	State string `json:"state"`
+
+	// ProfilingName is the collective op name (e.g. "nccl:all_reduce",
+	// "nccl:all_gather"). Renders into the verdict headline so the
+	// operator sees WHICH collective is stuck.
+	ProfilingName string `json:"profiling_name,omitempty"`
+
+	// TimeDiscoveredStartedNs is the wall-clock nanosecond stamp the
+	// rank's watchdog observed the collective start. The detector
+	// measures hang age as (Now - TimeDiscoveredStarted).
+	TimeDiscoveredStartedNs int64 `json:"time_discovered_started_ns,omitempty"`
+}
+
+// NCCLHangVerdict is the M19+ nccl_hang pattern output. Distinct
+// shape from PodEvictedVerdict because the join key (collective + pg)
+// and per-rank cohort have no analog on the pod-evicted side. JSON
+// field names follow the verdict.schema.json snake-case convention.
+type NCCLHangVerdict struct {
+	PatternID     string        `json:"pattern.id"`
+	Headline      string        `json:"headline"`
+	Remediation   string        `json:"remediation"`
+	EvidenceTrail []EvidenceRef `json:"evidence_trail"`
+
+	// PgID names the process group the hang is scoped to.
+	PgID int64 `json:"pg_id"`
+
+	// CollectiveSeqID is the collective_seq_id every hanging rank is
+	// blocked on. Load-bearing for operator triage — paste into
+	// py-spy / cuda-gdb queries.
+	CollectiveSeqID int64 `json:"collective_seq_id"`
+
+	// HangingRanks is the set of ranks blocking the collective past
+	// the hang threshold, sorted ascending for deterministic output.
+	HangingRanks []int64 `json:"hanging_ranks"`
+}
+
+// NCCLHangDetector is the nccl_hang pattern detector (NORTHSTAR
+// pattern #2 in the v0.3.0 ladder). Zero-value usage is permitted —
+// HangThreshold defaults to DefaultHangThreshold; Now defaults to
+// time.Now() at Evaluate time.
+type NCCLHangDetector struct {
+	// HangThreshold is the wall-clock age past which a non-completed
+	// rank record counts as hung. Zero or negative means DefaultHangThreshold.
+	HangThreshold time.Duration
+
+	// Now is the evaluation wall-clock. Overridable in tests so hang
+	// age doesn't depend on the wall clock. Zero means time.Now().
+	Now time.Time
+}
+
+// Evaluate scans cross-rank NCCL FR records and emits one
+// NCCLHangVerdict per (pg_id, collective_seq_id) where MinHangingRanks
+// or more ranks have a non-completed record older than HangThreshold.
+//
+// A "completed" entry for a (rank, pg, collective) triple supersedes
+// every "scheduled"/"started" entry for that triple, whatever their
+// order in records — the FR ring buffer holds the historical trail
+// and the detector must take the rank's final observed state.
+//
+// Output is sorted by (collective_seq_id, earliest_started_ns, pg_id)
+// so golden tests are stable across replay runs.
+func (d NCCLHangDetector) Evaluate(records []NCCLFRRecord) []NCCLHangVerdict {
+	threshold := d.HangThreshold
+	if threshold <= 0 {
+		threshold = DefaultHangThreshold
+	}
+	now := d.Now
+	if now.IsZero() {
+		now = time.Now()
+	}
+
+	// Take the latest record per (pg, collective, rank). "completed"
+	// is terminal: CollectiveSeqID is monotonic per pg, so a triple
+	// names one op instance and a completion supersedes any
+	// "scheduled"/"started" entry regardless of slice order or a
+	// missing timestamp. Otherwise the entry with the newest
+	// TimeDiscoveredStartedNs wins (ties go to the later entry).
+	type rankKey struct {
+		pg         int64
+		collective int64
+		rank       int64
+	}
+	latest := map[rankKey]NCCLFRRecord{}
+	for _, r := range records {
+		k := rankKey{r.PgID, r.CollectiveSeqID, r.Rank}
+		prev, ok := latest[k]
+		switch {
+		case !ok:
+			latest[k] = r
+		case prev.State == "completed":
+			// Terminal — nothing supersedes a recorded completion.
+		case r.State == "completed" || r.TimeDiscoveredStartedNs >= prev.TimeDiscoveredStartedNs:
+			latest[k] = r
+		}
+	}
+
+	// Group surviving (non-completed AND over-threshold) records by
+	// (pg, collective).
+	type cohortKey struct {
+		pg         int64
+		collective int64
+	}
+	cohorts := map[cohortKey][]NCCLFRRecord{}
+	cutoffNs := now.Add(-threshold).UnixNano()
+	for _, r := range latest {
+		if r.State == "completed" {
+			continue
+		}
+		if r.TimeDiscoveredStartedNs == 0 || r.TimeDiscoveredStartedNs > cutoffNs {
+			continue
+		}
+		k := cohortKey{r.PgID, r.CollectiveSeqID}
+		cohorts[k] = append(cohorts[k], r)
+	}
+
+	verdicts := make([]NCCLHangVerdict, 0, len(cohorts))
+	for _, cohort := range cohorts {
+		if len(cohort) < MinHangingRanks {
+			continue
+		}
+		verdicts = append(verdicts, buildNCCLHangVerdict(cohort))
+	}
+
+	// (pg, collective) is unique per verdict, so the PgID tie-break
+	// makes the order total — map iteration order never leaks out.
+	sort.SliceStable(verdicts, func(i, j int) bool {
+		a, b := verdicts[i], verdicts[j]
+		if a.CollectiveSeqID != b.CollectiveSeqID {
+			return a.CollectiveSeqID < b.CollectiveSeqID
+		}
+		if ta, tb := earliestStarted(a), earliestStarted(b); !ta.Equal(tb) {
+			return ta.Before(tb)
+		}
+		return a.PgID < b.PgID
+	})
+	return verdicts
+}
+
+// buildNCCLHangVerdict materializes one verdict from a same-collective
+// cohort. Sort the cohort by rank ascending so EvidenceTrail order and
+// HangingRanks order are both deterministic.
+func buildNCCLHangVerdict(cohort []NCCLFRRecord) NCCLHangVerdict {
+	sort.SliceStable(cohort, func(a, b int) bool { return cohort[a].Rank < cohort[b].Rank })
+
+	hanging := make([]int64, len(cohort))
+	evidence := make([]EvidenceRef, len(cohort))
+	for idx, rec := range cohort {
+		since := time.Unix(0, rec.TimeDiscoveredStartedNs).UTC()
+		uid := fmt.Sprintf("pg=%d/collective=%d/rank=%d", rec.PgID, rec.CollectiveSeqID, rec.Rank)
+		hanging[idx] = rec.Rank
+		evidence[idx] = EvidenceRef{
+			Kind:        EvidenceKindNCCLFR,
+			UID:         uid,
+			Timestamp:   since,
+			Description: ncclHangEvidenceDescription(rec, since),
+		}
+	}
+
+	// The lowest-ranked record supplies the verdict's pg and collective ids.
+	lead := cohort[0]
+	verdict := NCCLHangVerdict{
+		PatternID:     PatternIDNCCLHang,
+		HangingRanks:  hanging,
+		EvidenceTrail: evidence,
+	}
+	verdict.PgID, verdict.CollectiveSeqID = lead.PgID, lead.CollectiveSeqID
+	verdict.Headline = ncclHangHeadline(lead, len(hanging))
+	verdict.Remediation = ncclHangRemediation(lead, hanging)
+	return verdict
+}
+
+// ncclHangHeadline builds the one-line summary; an empty ProfilingName renders as "nccl:collective".
+func ncclHangHeadline(r NCCLFRRecord, n int) string {
+	name := "nccl:collective"
+	if r.ProfilingName != "" {
+		name = r.ProfilingName
+	}
+	return fmt.Sprintf("%d ranks hung on collective %d (%s) in pg %d", n, r.CollectiveSeqID, name, r.PgID)
+}
+
+// ncclHangRemediation builds the remediation prose shown to the
+// operator. The text embeds the rank list, pg id and collective seq
+// id so the alert payload stands alone — nobody has to walk the
+// evidence trail to find out which ranks to attach to.
+func ncclHangRemediation(r NCCLFRRecord, ranks []int64) string {
+	// Verbs, in order: rank list, pg id, collective seq id.
+	const playbook = "Inspect hung ranks (%s) on pg=%d collective_seq_id=%d: run py-spy dump --pid on each rank's training process, check NCCL_DEBUG=INFO logs around the collective, verify network reachability between ranks (ibstat / nccl-tests)."
+
+	targets := joinInts(ranks)
+	return fmt.Sprintf(playbook, targets, r.PgID, r.CollectiveSeqID)
+}
+
+// ncclHangEvidenceDescription renders the per-rank evidence line.
+func ncclHangEvidenceDescription(r NCCLFRRecord, startedAt time.Time) string {
+	op := r.ProfilingName
+	if op == "" {
+		op = "nccl:collective"
+	}
+	return fmt.Sprintf("Rank %d stuck on %s (collective_seq_id=%d, state=%s) since %s",
+		r.Rank, op, r.CollectiveSeqID, r.State, startedAt.Format(time.RFC3339))
+}
+
+// earliestStarted returns the earliest EvidenceTrail timestamp (the
+// zero time when the trail is empty) for stable sort ordering.
+func earliestStarted(v NCCLHangVerdict) time.Time {
+	if len(v.EvidenceTrail) == 0 {
+		return time.Time{}
+	}
+	earliest := v.EvidenceTrail[0].Timestamp
+	for _, e := range v.EvidenceTrail[1:] {
+		if e.Timestamp.Before(earliest) {
+			earliest = e.Timestamp
+		}
+	}
+	return earliest
+}
+
+// joinInts renders xs as a comma-separated list in the order given
+// (no sorting is done here) for the remediation prose.
+func joinInts(xs []int64) string {
+	parts := make([]string, len(xs))
+	for i, x := range xs {
+		parts[i] = fmt.Sprintf("%d", x)
+	}
+	return strings.Join(parts, ",")
+}
+
+// PatternIDNCCLHang is the nccl-hang pattern identifier.
+const PatternIDNCCLHang = "15"
+
+// EvidenceKindNCCLFR names the NCCL FlightRecorder evidence surface.
+// Hoisted alongside EvidenceKindPodEvent / EvidenceKindNodeCondition.
+const EvidenceKindNCCLFR = "nccl_fr"
diff --git a/module/pkg/patterns/nccl_hang_test.go b/module/pkg/patterns/nccl_hang_test.go
new file mode 100644
index 00000000..d8506fea
--- /dev/null
+++ b/module/pkg/patterns/nccl_hang_test.go
@@ -0,0 +1,306 @@
+// SPDX-License-Identifier: Apache-2.0
+
+package patterns_test
+
+import (
+	"encoding/json"
+	"os"
+	"path/filepath"
+	"regexp"
+	"testing"
+	"time"
+
+	"github.com/santhosh-tekuri/jsonschema/v6"
+	"github.com/stretchr/testify/require"
+
+	"github.com/tracecoreai/tracecore/module/pkg/patterns"
+)
+
+// nccl_hang detector test suite. The detector reads cross-rank NCCL
+// FlightRecorder Records and emits one NCCLHangVerdict per detected
+// collective hang. A hang is defined as: N>=2 ranks each hold a
+// non-completed record for the same (PgID, CollectiveSeqID) whose
+// discovered-started timestamp is older than HangThreshold measured
+// from the evaluation wall-clock (Now).
+
+// TestNCCLHangDetector_PositiveThreeRanksStuck pins the canonical hang
+// path: three ranks reach allreduce on collective 42, none progress
+// past it within the hang threshold. Detector emits one verdict naming
+// all three ranks and CollectiveSeqID=42.
+func TestNCCLHangDetector_PositiveThreeRanksStuck(t *testing.T) {
+	t.Parallel()
+
+	now := time.Unix(1_700_000_600, 0).UTC()
+	startedNs := now.Add(-10 * time.Minute).UnixNano()
+	records := []patterns.NCCLFRRecord{
+		{Rank: 0, PgID: 1, CollectiveSeqID: 42, State: "started", ProfilingName: "nccl:all_reduce", TimeDiscoveredStartedNs: startedNs},
+		{Rank: 1, PgID: 1, CollectiveSeqID: 42, State: "started", ProfilingName: "nccl:all_reduce", TimeDiscoveredStartedNs: startedNs},
+		{Rank: 2, PgID: 1, CollectiveSeqID: 42, State: "started", ProfilingName: "nccl:all_reduce", TimeDiscoveredStartedNs: startedNs},
+	}
+
+	verdicts := patterns.NCCLHangDetector{Now: now}.Evaluate(records)
+	require.Len(t, verdicts, 1)
+
+	v := verdicts[0]
+	require.Equal(t, patterns.PatternIDNCCLHang, v.PatternID)
+	require.Equal(t, int64(42), v.CollectiveSeqID)
+	require.Equal(t, int64(1), v.PgID)
+	require.ElementsMatch(t, []int64{0, 1, 2}, v.HangingRanks)
+
+	headlineRE := regexp.MustCompile(`3 ranks hung on collective 42 \(nccl:all_reduce\)`)
+	require.Regexp(t, headlineRE, v.Headline)
+
+	require.Regexp(t, regexp.MustCompile(`py-spy|inspect`), v.Remediation)
+	require.Len(t, v.EvidenceTrail, 3, "one evidence ref per hanging rank")
+	require.Equal(t, patterns.EvidenceKindNCCLFR, v.EvidenceTrail[0].Kind)
+}
+
+// TestNCCLHangDetector_NegativeAllCompleted asserts the rubric's
+// negative-fixture contract: when every rank has progressed past the
+// collective (state=completed), zero verdicts.
+func TestNCCLHangDetector_NegativeAllCompleted(t *testing.T) {
+	t.Parallel()
+
+	now := time.Unix(1_700_000_600, 0).UTC()
+	startedNs := now.Add(-10 * time.Minute).UnixNano()
+	records := []patterns.NCCLFRRecord{
+		{Rank: 0, PgID: 1, CollectiveSeqID: 42, State: "completed", TimeDiscoveredStartedNs: startedNs},
+		{Rank: 1, PgID: 1, CollectiveSeqID: 42, State: "completed", TimeDiscoveredStartedNs: startedNs},
+		{Rank: 2, PgID: 1, CollectiveSeqID: 42, State: "completed", TimeDiscoveredStartedNs: startedNs},
+	}
+
+	verdicts := patterns.NCCLHangDetector{Now: now}.Evaluate(records)
+	require.Empty(t, verdicts)
+}
+
+// TestNCCLHangDetector_EdgeStragglerBarelyPassed asserts that a
+// straggler whose started timestamp is JUST under the hang threshold
+// is NOT counted as hanging — even when 2 other ranks on the same
+// collective are well past it. Concretely: 2 ranks hung 10min, 1 rank
+// started 4min ago (default threshold 5min). The third rank hasn't
+// been stuck long enough to count, but the two over-threshold ranks
+// still meet the N>=2 bar on their own, so the detector emits one
+// verdict for the collective. The rubric's "all hanging ranks block
+// on the SAME collective_id" holds for those two; the straggler is
+// simply left out of HangingRanks and the evidence trail.
+func TestNCCLHangDetector_EdgeStragglerBarelyPassed(t *testing.T) { + t.Parallel() + + now := time.Unix(1_700_000_600, 0).UTC() + stuckNs := now.Add(-10 * time.Minute).UnixNano() + stragglerNs := now.Add(-4 * time.Minute).UnixNano() // under default 5min + records := []patterns.NCCLFRRecord{ + {Rank: 0, PgID: 1, CollectiveSeqID: 42, State: "started", ProfilingName: "nccl:all_reduce", TimeDiscoveredStartedNs: stuckNs}, + {Rank: 1, PgID: 1, CollectiveSeqID: 42, State: "started", ProfilingName: "nccl:all_reduce", TimeDiscoveredStartedNs: stuckNs}, + {Rank: 2, PgID: 1, CollectiveSeqID: 42, State: "started", ProfilingName: "nccl:all_reduce", TimeDiscoveredStartedNs: stragglerNs}, + } + + verdicts := patterns.NCCLHangDetector{Now: now}.Evaluate(records) + require.Len(t, verdicts, 1, "two over-threshold ranks on same collective is still a hang") + + v := verdicts[0] + require.ElementsMatch(t, []int64{0, 1}, v.HangingRanks, "straggler under threshold must be excluded") + require.Len(t, v.EvidenceTrail, 2) +} + +// TestNCCLHangDetector_SoloRankNotHang asserts that a single +// over-threshold rank does NOT trigger — a hang requires N>=2 ranks +// blocking on the same collective. +func TestNCCLHangDetector_SoloRankNotHang(t *testing.T) { + t.Parallel() + + now := time.Unix(1_700_000_600, 0).UTC() + stuckNs := now.Add(-10 * time.Minute).UnixNano() + records := []patterns.NCCLFRRecord{ + {Rank: 0, PgID: 1, CollectiveSeqID: 42, State: "started", TimeDiscoveredStartedNs: stuckNs}, + } + + verdicts := patterns.NCCLHangDetector{Now: now}.Evaluate(records) + require.Empty(t, verdicts) +} + +// TestNCCLHangDetector_ThresholdConfigurable asserts the +// HangThreshold field overrides the default 5min — operators on +// long-tail collectives raise it; tight inner-loop training lowers it. 
+func TestNCCLHangDetector_ThresholdConfigurable(t *testing.T) { + t.Parallel() + + now := time.Unix(1_700_000_600, 0).UTC() + startedNs := now.Add(-90 * time.Second).UnixNano() // 1.5min, under default + records := []patterns.NCCLFRRecord{ + {Rank: 0, PgID: 1, CollectiveSeqID: 7, State: "started", TimeDiscoveredStartedNs: startedNs}, + {Rank: 1, PgID: 1, CollectiveSeqID: 7, State: "started", TimeDiscoveredStartedNs: startedNs}, + } + + // Default threshold (5min): no verdict. + require.Empty(t, patterns.NCCLHangDetector{Now: now}.Evaluate(records)) + + // Tightened threshold (1min): hang fires. + v := patterns.NCCLHangDetector{Now: now, HangThreshold: time.Minute}.Evaluate(records) + require.Len(t, v, 1) +} + +// TestNCCLHangDetector_DeterministicOrder asserts the output is +// sorted by (collective_id, time) so golden tests stay reproducible +// when multiple hangs surface in one batch. +func TestNCCLHangDetector_DeterministicOrder(t *testing.T) { + t.Parallel() + + now := time.Unix(1_700_000_600, 0).UTC() + stuckNs := now.Add(-10 * time.Minute).UnixNano() + laterStuckNs := now.Add(-7 * time.Minute).UnixNano() + records := []patterns.NCCLFRRecord{ + // collective 99 — later start + {Rank: 0, PgID: 1, CollectiveSeqID: 99, State: "started", TimeDiscoveredStartedNs: laterStuckNs}, + {Rank: 1, PgID: 1, CollectiveSeqID: 99, State: "started", TimeDiscoveredStartedNs: laterStuckNs}, + // collective 42 — earlier start, must sort first + {Rank: 0, PgID: 1, CollectiveSeqID: 42, State: "started", TimeDiscoveredStartedNs: stuckNs}, + {Rank: 1, PgID: 1, CollectiveSeqID: 42, State: "started", TimeDiscoveredStartedNs: stuckNs}, + } + + verdicts := patterns.NCCLHangDetector{Now: now}.Evaluate(records) + require.Len(t, verdicts, 2) + require.Equal(t, int64(42), verdicts[0].CollectiveSeqID) + require.Equal(t, int64(99), verdicts[1].CollectiveSeqID) +} + +// TestNCCLHangDetector_DifferentCollectivesDoNotCrossJoin asserts that +// two ranks stuck on DIFFERENT collective_ids do NOT 
combine into a +// single verdict — the hang definition requires the cohort block on +// the SAME collective_id. +func TestNCCLHangDetector_DifferentCollectivesDoNotCrossJoin(t *testing.T) { + t.Parallel() + + now := time.Unix(1_700_000_600, 0).UTC() + stuckNs := now.Add(-10 * time.Minute).UnixNano() + records := []patterns.NCCLFRRecord{ + {Rank: 0, PgID: 1, CollectiveSeqID: 1, State: "started", TimeDiscoveredStartedNs: stuckNs}, + {Rank: 1, PgID: 1, CollectiveSeqID: 2, State: "started", TimeDiscoveredStartedNs: stuckNs}, + } + + verdicts := patterns.NCCLHangDetector{Now: now}.Evaluate(records) + require.Empty(t, verdicts, "cross-collective stragglers do not form a hang") +} + +// TestNCCLHangDetector_LaterCompletedRecordSupersedes asserts that if +// a rank has BOTH a started record on collective 42 AND a completed +// record on collective 42 (later in the ring buffer), the detector +// treats that rank as having progressed past 42. NCCL FlightRecorder +// emits a started entry then updates state to completed; in practice +// the receiver may forward both. The detector must take the latest +// state per (rank, collective_id) pair. +func TestNCCLHangDetector_LaterCompletedRecordSupersedes(t *testing.T) { + t.Parallel() + + now := time.Unix(1_700_000_600, 0).UTC() + startedNs := now.Add(-10 * time.Minute).UnixNano() + records := []patterns.NCCLFRRecord{ + // Rank 0: stuck, no completion. + {Rank: 0, PgID: 1, CollectiveSeqID: 42, State: "started", TimeDiscoveredStartedNs: startedNs}, + // Rank 1: started THEN completed in the same dump. + {Rank: 1, PgID: 1, CollectiveSeqID: 42, State: "started", TimeDiscoveredStartedNs: startedNs}, + {Rank: 1, PgID: 1, CollectiveSeqID: 42, State: "completed", TimeDiscoveredStartedNs: startedNs}, + // Rank 2: stuck. 
+ {Rank: 2, PgID: 1, CollectiveSeqID: 42, State: "started", TimeDiscoveredStartedNs: startedNs}, + } + + verdicts := patterns.NCCLHangDetector{Now: now}.Evaluate(records) + require.Len(t, verdicts, 1) + require.ElementsMatch(t, []int64{0, 2}, verdicts[0].HangingRanks) +} + +// TestNCCLHangVerdict_SchemaConformance pins the NCCLHangVerdict +// JSON shape against testdata/nccl_hang_verdict.schema.json — a struct +// drift or schema loosening fails this test before it ships. +func TestNCCLHangVerdict_SchemaConformance(t *testing.T) { + t.Parallel() + + schemaPath := filepath.Join("testdata", "nccl_hang_verdict.schema.json") + schemaBytes, err := os.ReadFile(schemaPath) //nolint:gosec // schemaPath is a test-local relative path + require.NoError(t, err) + + compiler := jsonschema.NewCompiler() + var schemaDoc any + require.NoError(t, json.Unmarshal(schemaBytes, &schemaDoc)) + require.NoError(t, compiler.AddResource(schemaPath, schemaDoc)) + schema, err := compiler.Compile(schemaPath) + require.NoError(t, err) + + now := time.Unix(1_700_000_600, 0).UTC() + stuckNs := now.Add(-10 * time.Minute).UnixNano() + records := []patterns.NCCLFRRecord{ + {Rank: 0, PgID: 1, CollectiveSeqID: 42, State: "started", ProfilingName: "nccl:all_reduce", TimeDiscoveredStartedNs: stuckNs}, + {Rank: 1, PgID: 1, CollectiveSeqID: 42, State: "started", ProfilingName: "nccl:all_reduce", TimeDiscoveredStartedNs: stuckNs}, + } + verdicts := patterns.NCCLHangDetector{Now: now}.Evaluate(records) + require.Len(t, verdicts, 1) + + bs, err := json.Marshal(verdicts[0]) + require.NoError(t, err) + var decoded any + require.NoError(t, json.Unmarshal(bs, &decoded)) + require.NoError(t, schema.Validate(decoded), + "NCCLHangVerdict shape failed schema validation; struct drifted or schema needs updating") +} + +// TestNCCLHangVerdict_SchemaRejectsDrift mirrors the pod-evicted drift- +// rejection battery for nccl_hang — each row is a falsifier for one +// schema constraint; removing the constraint flips the 
row to PASS. +func TestNCCLHangVerdict_SchemaRejectsDrift(t *testing.T) { + t.Parallel() + + schemaPath := filepath.Join("testdata", "nccl_hang_verdict.schema.json") + schemaBytes, err := os.ReadFile(schemaPath) //nolint:gosec // schemaPath is a test-local relative path + require.NoError(t, err) + + compiler := jsonschema.NewCompiler() + var schemaDoc any + require.NoError(t, json.Unmarshal(schemaBytes, &schemaDoc)) + require.NoError(t, compiler.AddResource(schemaPath, schemaDoc)) + schema, err := compiler.Compile(schemaPath) + require.NoError(t, err) + + validEvidence := []any{ + map[string]any{"kind": "nccl_fr", "uid": "u1", "timestamp": "2026-05-18T10:00:00Z", "description": "d"}, + map[string]any{"kind": "nccl_fr", "uid": "u2", "timestamp": "2026-05-18T10:00:00Z", "description": "d"}, + } + base := func() map[string]any { + return map[string]any{ + "pattern.id": "15", + "headline": "x", + "remediation": "y", + "pg_id": 1, + "collective_seq_id": 42, + "hanging_ranks": []any{0, 1}, + "evidence_trail": validEvidence, + } + } + cases := []struct { + name string + mutate func(map[string]any) + guardName string + }{ + {"extra_top_level_field", func(m map[string]any) { m["future_field"] = "rejected" }, "additionalProperties:false"}, + {"confidence_reintroduced", func(m map[string]any) { m["confidence"] = "full" }, "additionalProperties:false rejects re-add"}, + {"single_rank_cohort", func(m map[string]any) { m["hanging_ranks"] = []any{0} }, "hanging_ranks minItems:2"}, + {"pattern_id_numeric_not_string", func(m map[string]any) { m["pattern.id"] = 15 }, "pattern.id string const"}, + {"evidence_kind_outside_enum", func(m map[string]any) { + m["evidence_trail"] = []any{map[string]any{ + "kind": "pod_event", "uid": "u", "timestamp": "2026-05-18T10:00:00Z", "description": "d", + }, map[string]any{ + "kind": "pod_event", "uid": "u2", "timestamp": "2026-05-18T10:00:00Z", "description": "d", + }} + }, "evidence_trail.kind enum"}, + } + + for _, tc := range cases { + tc := tc 
+ t.Run(tc.name, func(t *testing.T) { + t.Parallel() + m := base() + tc.mutate(m) + require.Error(t, schema.Validate(m), + "schema must reject %s; guard %q regressed", tc.name, tc.guardName) + }) + } +} diff --git a/module/pkg/patterns/testdata/nccl_hang_verdict.schema.json b/module/pkg/patterns/testdata/nccl_hang_verdict.schema.json new file mode 100644 index 00000000..3de126c0 --- /dev/null +++ b/module/pkg/patterns/testdata/nccl_hang_verdict.schema.json @@ -0,0 +1,64 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://tracecore.ai/schemas/patterns/nccl_hang_verdict/v0", + "title": "NCCLHangVerdict", + "description": "v0.3.0 NORTHSTAR pattern #2 (nccl_hang) verdict shape. Pinned by TestNCCLHangVerdict_SchemaConformance.", + "type": "object", + "required": ["pattern.id", "headline", "remediation", "evidence_trail", "pg_id", "collective_seq_id", "hanging_ranks"], + "additionalProperties": false, + "properties": { + "pattern.id": { + "type": "string", + "const": "15", + "description": "nccl_hang pattern identifier; string-typed numeric uniform across pattern detectors." 
+ }, + "headline": { + "type": "string", + "minLength": 1 + }, + "remediation": { + "type": "string", + "minLength": 1 + }, + "pg_id": { + "type": "integer" + }, + "collective_seq_id": { + "type": "integer" + }, + "hanging_ranks": { + "type": "array", + "minItems": 2, + "items": { + "type": "integer" + } + }, + "evidence_trail": { + "type": "array", + "minItems": 1, + "items": { + "type": "object", + "required": ["kind", "uid", "timestamp", "description"], + "additionalProperties": false, + "properties": { + "kind": { + "type": "string", + "enum": ["nccl_fr"] + }, + "uid": { + "type": "string", + "minLength": 1 + }, + "timestamp": { + "type": "string", + "format": "date-time" + }, + "description": { + "type": "string", + "minLength": 1 + } + } + } + } + } +} diff --git a/module/processor/patterndetectorprocessor/config.go b/module/processor/patterndetectorprocessor/config.go index 253b4dee..3b44e8c3 100644 --- a/module/processor/patterndetectorprocessor/config.go +++ b/module/processor/patterndetectorprocessor/config.go @@ -12,6 +12,11 @@ import ( // without reaching into the library's constants. const DefaultJoinWindow = 30 * time.Second +// DefaultNCCLHangThreshold mirrors patterns.DefaultHangThreshold — +// the wall-clock age past which a non-completed FR record counts as +// hung. Operators override at the YAML level via nccl_hang_threshold. +const DefaultNCCLHangThreshold = 5 * time.Minute + // Config is the operator-facing YAML for the patterndetector processor. type Config struct { // JoinWindow is the max gap between a node-pressure condition's @@ -27,6 +32,12 @@ type Config struct { // Pointer so the YAML default is distinguishable from an // operator-set `false`. EmitPartialVerdicts *bool `yaml:"emit_partial_verdicts,omitempty" mapstructure:"emit_partial_verdicts"` + + // NCCLHangThreshold is the wall-clock age past which a non- + // completed NCCL FR record counts as hung. Zero means use + // DefaultNCCLHangThreshold. 
Floor 1s — sub-1s thresholds chase + // normal long-tail collectives and produce noise. + NCCLHangThreshold time.Duration `yaml:"nccl_hang_threshold,omitempty" mapstructure:"nccl_hang_threshold"` } // Validate enforces operator-actionable rules. @@ -34,6 +45,9 @@ func (c *Config) Validate() error { if c.JoinWindow != 0 && c.JoinWindow < time.Second { return fmt.Errorf("join_window: must be >= 1s, got %s", c.JoinWindow) } + if c.NCCLHangThreshold != 0 && c.NCCLHangThreshold < time.Second { + return fmt.Errorf("nccl_hang_threshold: must be >= 1s, got %s", c.NCCLHangThreshold) + } return nil } @@ -42,6 +56,7 @@ func defaultConfig() *Config { return &Config{ JoinWindow: DefaultJoinWindow, EmitPartialVerdicts: &t, + NCCLHangThreshold: DefaultNCCLHangThreshold, } } @@ -54,6 +69,9 @@ func (c *Config) withDefaults() *Config { t := true out.EmitPartialVerdicts = &t } + if out.NCCLHangThreshold == 0 { + out.NCCLHangThreshold = DefaultNCCLHangThreshold + } return &out } diff --git a/module/processor/patterndetectorprocessor/nccl_hang_test.go b/module/processor/patterndetectorprocessor/nccl_hang_test.go new file mode 100644 index 00000000..867ae299 --- /dev/null +++ b/module/processor/patterndetectorprocessor/nccl_hang_test.go @@ -0,0 +1,174 @@ +// SPDX-License-Identifier: Apache-2.0 + +package patterndetectorprocessor + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + + "github.com/tracecoreai/tracecore/module/pkg/patterns" +) + +// TestPatternDetector_NCCLHangWiringEmitsVerdict pins the wiring +// contract: when N>=2 NCCL FR records arrive on the same +// collective_seq_id past the hang threshold, the processor emits one +// nccl_hang verdict alongside any pod_evicted verdict path. 
+func TestPatternDetector_NCCLHangWiringEmitsVerdict(t *testing.T) { + t.Parallel() + + now := time.Now().UTC() + stuckTime := now.Add(-10 * time.Minute) + ld := plog.NewLogs() + for _, rank := range []int64{0, 1, 2} { + rl := ld.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("k8s.pod.name", "training-rank-"+itoa(rank)) + sl := rl.ScopeLogs().AppendEmpty() + lr := sl.LogRecords().AppendEmpty() + lr.SetTimestamp(pcommon.NewTimestampFromTime(stuckTime)) + a := lr.Attributes() + a.PutInt("gen_ai.training.rank", rank) + a.PutInt("nccl.fr.pg_id", 1) + a.PutInt("nccl.fr.collective_seq_id", 42) + a.PutStr("nccl.fr.profiling_name", "nccl:all_reduce") + a.PutStr("nccl.fr.state", "started") + a.PutInt("nccl.fr.time_discovered_started_ns", stuckTime.UnixNano()) + } + + sink := newLogsSink() + cfg := defaultConfig() + p := newProcessor(testSettings(), cfg, sink) + require.NoError(t, p.Start(context.Background(), componenttestHost{})) + t.Cleanup(func() { _ = p.Shutdown(context.Background()) }) + + require.NoError(t, p.ConsumeLogs(context.Background(), ld)) + + verdicts := extractNCCLHangVerdicts(t, sink.at(0)) + require.Len(t, verdicts, 1) + v := verdicts[0] + require.Equal(t, patterns.PatternIDNCCLHang, v.PatternID) + require.Equal(t, int64(42), v.CollectiveSeqID) + require.ElementsMatch(t, []int64{0, 1, 2}, v.HangingRanks) +} + +// TestPatternDetector_NCCLHangWiringHealthy pins the negative path: +// records with state=completed must NOT trigger an nccl_hang verdict. 
+func TestPatternDetector_NCCLHangWiringHealthy(t *testing.T) { + t.Parallel() + + now := time.Now().UTC() + stuckTime := now.Add(-10 * time.Minute) + completedTime := now.Add(-9 * time.Minute) + ld := plog.NewLogs() + for _, rank := range []int64{0, 1, 2} { + rl := ld.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("k8s.pod.name", "training-rank-"+itoa(rank)) + sl := rl.ScopeLogs().AppendEmpty() + lr := sl.LogRecords().AppendEmpty() + lr.SetTimestamp(pcommon.NewTimestampFromTime(completedTime)) + a := lr.Attributes() + a.PutInt("gen_ai.training.rank", rank) + a.PutInt("nccl.fr.pg_id", 1) + a.PutInt("nccl.fr.collective_seq_id", 42) + a.PutStr("nccl.fr.state", "completed") + a.PutInt("nccl.fr.time_discovered_started_ns", stuckTime.UnixNano()) + a.PutInt("nccl.fr.time_discovered_completed_ns", completedTime.UnixNano()) + } + + sink := newLogsSink() + p := newProcessor(testSettings(), defaultConfig(), sink) + require.NoError(t, p.Start(context.Background(), componenttestHost{})) + t.Cleanup(func() { _ = p.Shutdown(context.Background()) }) + + require.NoError(t, p.ConsumeLogs(context.Background(), ld)) + verdicts := extractNCCLHangVerdicts(t, sink.at(0)) + require.Empty(t, verdicts) +} + +// TestPatternDetector_NCCLHangThresholdConfigurable asserts the +// processor surfaces the detector's HangThreshold via YAML config. 
+func TestPatternDetector_NCCLHangThresholdConfigurable(t *testing.T) { + t.Parallel() + + now := time.Now().UTC() + stuckTime := now.Add(-90 * time.Second) // under default 5min + ld := plog.NewLogs() + for _, rank := range []int64{0, 1} { + rl := ld.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("k8s.pod.name", "training-rank-"+itoa(rank)) + sl := rl.ScopeLogs().AppendEmpty() + lr := sl.LogRecords().AppendEmpty() + lr.SetTimestamp(pcommon.NewTimestampFromTime(stuckTime)) + a := lr.Attributes() + a.PutInt("gen_ai.training.rank", rank) + a.PutInt("nccl.fr.pg_id", 1) + a.PutInt("nccl.fr.collective_seq_id", 7) + a.PutStr("nccl.fr.state", "started") + a.PutInt("nccl.fr.time_discovered_started_ns", stuckTime.UnixNano()) + } + + // Tightened threshold (1min): hang fires. + sink := newLogsSink() + cfg := &Config{NCCLHangThreshold: time.Minute} + p := newProcessor(testSettings(), cfg, sink) + require.NoError(t, p.Start(context.Background(), componenttestHost{})) + t.Cleanup(func() { _ = p.Shutdown(context.Background()) }) + require.NoError(t, p.ConsumeLogs(context.Background(), ld)) + require.Len(t, extractNCCLHangVerdicts(t, sink.at(0)), 1) +} + +// extractNCCLHangVerdicts walks the output plog.Logs and decodes the +// nccl_hang verdict JSON attribute on each verdict-shaped log record. 
+func extractNCCLHangVerdicts(t *testing.T, ld plog.Logs) []patterns.NCCLHangVerdict { + t.Helper() + out := []patterns.NCCLHangVerdict{} + for i := 0; i < ld.ResourceLogs().Len(); i++ { + rl := ld.ResourceLogs().At(i) + for j := 0; j < rl.ScopeLogs().Len(); j++ { + sl := rl.ScopeLogs().At(j) + if sl.Scope().Name() != instrumentationScope { + continue + } + for k := 0; k < sl.LogRecords().Len(); k++ { + lr := sl.LogRecords().At(k) + patternID, ok := lr.Attributes().Get(VerdictAttrPatternID) + if !ok || patternID.AsString() != patterns.PatternIDNCCLHang { + continue + } + js, ok := lr.Attributes().Get(VerdictAttrVerdictJSON) + if !ok { + continue + } + var v patterns.NCCLHangVerdict + require.NoError(t, json.Unmarshal([]byte(js.AsString()), &v)) + out = append(out, v) + } + } + } + return out +} + +func itoa(i int64) string { + if i == 0 { + return "0" + } + neg := false + if i < 0 { + neg = true + i = -i + } + var b []byte + for i > 0 { + b = append([]byte{byte('0' + i%10)}, b...) + i /= 10 + } + if neg { + b = append([]byte{'-'}, b...) + } + return string(b) +} diff --git a/module/processor/patterndetectorprocessor/patterndetector.go b/module/processor/patterndetectorprocessor/patterndetector.go index 6c83be71..d680f032 100644 --- a/module/processor/patterndetectorprocessor/patterndetector.go +++ b/module/processor/patterndetectorprocessor/patterndetector.go @@ -92,7 +92,7 @@ func (p *patterndetectorProcessor) Capabilities() consumer.Capabilities { // typed model, runs the configured detectors, and appends one verdict // record per match before forwarding downstream. 
func (p *patterndetectorProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error { - events, nodeConds := collectInputs(ld) + events, nodeConds, ncclRecs := collectInputs(ld) det := patterns.PodEvictedDetector{JoinWindow: p.cfg.JoinWindow} verdicts := det.Evaluate(events, nodeConds) @@ -103,6 +103,11 @@ func (p *patterndetectorProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs appendVerdict(ld, v, p.logger()) } + ncclDet := patterns.NCCLHangDetector{HangThreshold: p.cfg.NCCLHangThreshold} + for _, v := range ncclDet.Evaluate(ncclRecs) { + appendNCCLHangVerdict(ld, v, p.logger()) + } + if err := p.next.ConsumeLogs(ctx, ld); err != nil { return fmt.Errorf("patterndetector: next.ConsumeLogs: %w", err) } @@ -110,22 +115,27 @@ func (p *patterndetectorProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs } // collectInputs walks the incoming plog.Logs and projects each log -// record into either a patterns.Record (pod event) or a -// patterns.NodeRecord (node-condition transition). Records that look -// like neither are skipped — the processor is a NO-OP for them. +// record into one of three typed shapes: patterns.Record (pod event), +// patterns.NodeRecord (node-condition transition), or +// patterns.NCCLFRRecord (NCCL FlightRecorder entry). Records that +// look like none are skipped — the processor is a NO-OP for them. // // Classification is structural: // - pod events carry `k8s.event.hint` (any value); the projection // decodes that hint into patterns.Hint. // - node-condition records carry `k8s.node.name` AND // `k8s.node.condition.pressure` (no hint required). +// - NCCL FR records carry `nccl.fr.collective_seq_id` AND a rank +// attribute (gen_ai.training.rank / nccl.rank / nccl.fr.rank). // -// Records that match both classes (rare in practice — a Node-typed -// Event with a pod_evicted hint would be malformed upstream) are -// projected as pod events and the node-condition projection is skipped. 
-func collectInputs(ld plog.Logs) ([]patterns.Record, []patterns.NodeRecord) { +// Records that match multiple classes are projected as the first one +// matched (pod_event > node_condition > nccl_fr) — the priority +// reflects that a malformed upstream record is rarer than a real +// classification overlap. +func collectInputs(ld plog.Logs) ([]patterns.Record, []patterns.NodeRecord, []patterns.NCCLFRRecord) { var events []patterns.Record var nodes []patterns.NodeRecord + var nccl []patterns.NCCLFRRecord for i := 0; i < ld.ResourceLogs().Len(); i++ { rl := ld.ResourceLogs().At(i) resAttrs := rl.Resource().Attributes() @@ -139,11 +149,57 @@ func collectInputs(ld plog.Logs) ([]patterns.Record, []patterns.NodeRecord) { } if rec, ok := projectNodeCondition(lr, resAttrs); ok { nodes = append(nodes, rec) + continue + } + if rec, ok := projectNCCLFRRecord(lr); ok { + nccl = append(nccl, rec) } } } } - return events, nodes + return events, nodes, nccl +} + +// projectNCCLFRRecord reads OTel attributes off a log record and +// builds a patterns.NCCLFRRecord. The projection's gate is the +// presence of BOTH a rank attribute AND `nccl.fr.collective_seq_id` +// — those two together name a per-rank-per-collective FR entry. +// +// Rank lookup mirrors the rankjoinprocessor vocabulary: +// gen_ai.training.rank (M19-canonical), with nccl.rank / +// nccl.fr.rank fallbacks for direct-from-receiver paths. 
+func projectNCCLFRRecord(lr plog.LogRecord) (patterns.NCCLFRRecord, bool) { + attrs := lr.Attributes() + var rank int64 + hasRank := false + for _, key := range []string{"gen_ai.training.rank", "nccl.rank", "nccl.fr.rank"} { + if v, ok := attrs.Get(key); ok { + rank = v.Int() + hasRank = true + break + } + } + collective, hasCollective := attrs.Get("nccl.fr.collective_seq_id") + if !hasRank || !hasCollective { + return patterns.NCCLFRRecord{}, false + } + r := patterns.NCCLFRRecord{ + Rank: rank, + CollectiveSeqID: collective.Int(), + } + if v, ok := attrs.Get("nccl.fr.pg_id"); ok { + r.PgID = v.Int() + } + if v, ok := attrs.Get("nccl.fr.state"); ok { + r.State = v.AsString() + } + if v, ok := attrs.Get("nccl.fr.profiling_name"); ok { + r.ProfilingName = v.AsString() + } + if v, ok := attrs.Get("nccl.fr.time_discovered_started_ns"); ok { + r.TimeDiscoveredStartedNs = v.Int() + } + return r, true } // projectPodEvent reads OTel attributes off a log record and builds a @@ -290,3 +346,37 @@ func evidenceTimestamp(v patterns.PodEvictedVerdict) time.Time { } return time.Time{} } + +// appendNCCLHangVerdict adds a verdict log record for an nccl_hang +// match. Mirrors appendVerdict's wire-format contract — broken-out +// scalar attrs plus full pattern.verdict_json — so downstream +// consumers don't branch on pattern.id to find headline/remediation. 
+func appendNCCLHangVerdict(ld plog.Logs, v patterns.NCCLHangVerdict, logger *zap.Logger) { + rl := ld.ResourceLogs().AppendEmpty() + sl := rl.ScopeLogs().AppendEmpty() + sl.Scope().SetName(instrumentationScope) + lr := sl.LogRecords().AppendEmpty() + lr.Body().SetStr(v.Headline) + now := time.Now() + lr.SetObservedTimestamp(pcommon.NewTimestampFromTime(now)) + if t := ncclHangEvidenceTimestamp(v); !t.IsZero() { + lr.SetTimestamp(pcommon.NewTimestampFromTime(t)) + } else { + lr.SetTimestamp(pcommon.NewTimestampFromTime(now)) + } + lr.Attributes().PutStr(VerdictAttrPatternID, v.PatternID) + lr.Attributes().PutStr(VerdictAttrHeadline, v.Headline) + lr.Attributes().PutStr(VerdictAttrRemediation, v.Remediation) + if b, err := json.Marshal(v); err == nil { + lr.Attributes().PutStr(VerdictAttrVerdictJSON, string(b)) + } else if logger != nil { + logger.Warn("patterndetector: failed to marshal nccl_hang verdict JSON; broken-out attrs still emit", zap.Error(err)) + } +} + +func ncclHangEvidenceTimestamp(v patterns.NCCLHangVerdict) time.Time { + if len(v.EvidenceTrail) > 0 { + return v.EvidenceTrail[0].Timestamp + } + return time.Time{} +}