From e24e72e624ba74ce69a68c7e858c5698ce28adb7 Mon Sep 17 00:00:00 2001 From: Tri Lam Date: Mon, 1 Jun 2026 01:26:52 -0700 Subject: [PATCH 1/5] =?UTF-8?q?test(nccl-boot):=20RED=20=E2=80=94=20patter?= =?UTF-8?q?n-9=20bootstrap-timeout=20detector=20tests=20+=20schema?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Tri Lam --- module/pkg/patterns/nccl_bootstrap_test.go | 453 ++++++++++++++++++ .../nccl_bootstrap_verdict.schema.json | 96 ++++ 2 files changed, 549 insertions(+) create mode 100644 module/pkg/patterns/nccl_bootstrap_test.go create mode 100644 module/pkg/patterns/testdata/nccl_bootstrap_verdict.schema.json diff --git a/module/pkg/patterns/nccl_bootstrap_test.go b/module/pkg/patterns/nccl_bootstrap_test.go new file mode 100644 index 00000000..4131f13d --- /dev/null +++ b/module/pkg/patterns/nccl_bootstrap_test.go @@ -0,0 +1,453 @@ +// SPDX-License-Identifier: Apache-2.0 + +package patterns_test + +import ( + "encoding/json" + "os" + "path/filepath" + "regexp" + "testing" + "time" + + "github.com/santhosh-tekuri/jsonschema/v6" + "github.com/stretchr/testify/require" + + "github.com/tracecoreai/tracecore/module/pkg/patterns" +) + +// nccl_bootstrap detector test suite (NORTHSTAR pattern #9). The +// detector reads: +// - TrainingPodRecord (one per training-pod that became Ready) — +// carries the namespace, job id (or fallback grouping key), rank, +// and pod-ready timestamp. +// - NCCLFRRecord (NCCL FlightRecorder entries; reused from +// nccl_hang.go) — a rank that never emitted ANY FR record past +// BootstrapDeadline is a bootstrap-failed rank. +// - CNINetworkEventRecord (one per K8s event with reason in +// {FailedCreatePodSandBox, NetworkNotReady, CNIError}) — joined +// against the cohort namespace to promote a partial verdict to +// full and to stamp the discriminator. +// +// Spec: docs/patterns/09-nccl-bootstrap-timeout.md §"Detector +// evaluation rule". 
+ +// TestNCCLBootstrapDetector_FullCorrelationFires pins the canonical +// pattern-#9 trigger: a training cohort whose pod-ready timestamps are +// all older than BootstrapDeadline, at least one rank with no NCCL FR +// record, and a same-namespace CNI / network-readiness event in window +// emits exactly one confidence=full verdict with discriminator=cni_error. +func TestNCCLBootstrapDetector_FullCorrelationFires(t *testing.T) { + t.Parallel() + + now := mustParseTime(t, "2026-06-01T10:00:00Z") + readyAt := now.Add(-10 * time.Minute) // way past default 5min deadline + + pods := []patterns.TrainingPodRecord{ + {JobID: "job-llama-70b", Namespace: "training", PodName: "trainer-rank-0", Rank: 0, Node: "gpu-node-0001", ReadyAt: readyAt}, + {JobID: "job-llama-70b", Namespace: "training", PodName: "trainer-rank-1", Rank: 1, Node: "gpu-node-0002", ReadyAt: readyAt}, + {JobID: "job-llama-70b", Namespace: "training", PodName: "trainer-rank-2", Rank: 2, Node: "gpu-node-0003", ReadyAt: readyAt}, + {JobID: "job-llama-70b", Namespace: "training", PodName: "trainer-rank-3", Rank: 3, Node: "gpu-node-0004", ReadyAt: readyAt}, + } + // No NCCL FR records arrived for any rank — every rank is bootstrap- + // failed. + var ncclRecs []patterns.NCCLFRRecord + cniEvents := []patterns.CNINetworkEventRecord{ + { + EventUID: "ev-cni-1", + Namespace: "training", + PodName: "trainer-rank-0", + Reason: "FailedCreatePodSandBox", + Note: "failed to set up sandbox container ... 
network not ready", + Timestamp: readyAt.Add(30 * time.Second), + }, + } + + d := patterns.NCCLBootstrapDetector{Now: now} + verdicts := d.Evaluate(pods, ncclRecs, cniEvents) + require.Len(t, verdicts, 1) + + v := verdicts[0] + require.Equal(t, patterns.PatternIDNCCLBootstrap, v.PatternID) + require.Equal(t, patterns.ConfidenceFull, v.Confidence) + require.Equal(t, "training", v.Namespace) + require.Equal(t, "job-llama-70b", v.JobID) + require.Equal(t, 4, v.CohortSize) + require.Equal(t, 4, v.FailedRankCount) + require.Equal(t, patterns.NCCLBootstrapDiscriminatorCNIError, v.Discriminator) + require.Equal(t, []int64{0, 1, 2, 3}, v.FailedRanks, "every rank failed bootstrap") + + require.Len(t, v.EvidenceTrail, 2, "trail = training-pod cohort + CNI event") + require.Equal(t, patterns.EvidenceKindTrainingPod, v.EvidenceTrail[0].Kind) + require.Equal(t, patterns.EvidenceKindCNINetworkEvent, v.EvidenceTrail[1].Kind) + + require.Regexp(t, regexp.MustCompile(`(?i)NCCL|bootstrap`), v.Headline) + require.Regexp(t, regexp.MustCompile(`(?i)NCCL_SOCKET_IFNAME|init-container|NetworkAttachmentDefinition`), v.Remediation, + "remediation must point at the spec's three remediation hooks") +} + +// TestNCCLBootstrapDetector_PartialWhenAllRanksFailedNoNetworkEvent +// pins the partial-confidence branch from the spec: every rank in the +// cohort failed bootstrap but no CNI / network event joined → partial +// verdict (discriminator=unknown). 
+func TestNCCLBootstrapDetector_PartialWhenAllRanksFailedNoNetworkEvent(t *testing.T) { + t.Parallel() + + now := mustParseTime(t, "2026-06-01T10:00:00Z") + readyAt := now.Add(-10 * time.Minute) + + pods := []patterns.TrainingPodRecord{ + {JobID: "job-bert", Namespace: "training", PodName: "trainer-rank-0", Rank: 0, Node: "gpu-node-0001", ReadyAt: readyAt}, + {JobID: "job-bert", Namespace: "training", PodName: "trainer-rank-1", Rank: 1, Node: "gpu-node-0002", ReadyAt: readyAt}, + } + verdicts := patterns.NCCLBootstrapDetector{Now: now}.Evaluate(pods, nil, nil) + require.Len(t, verdicts, 1) + require.Equal(t, patterns.ConfidencePartial, verdicts[0].Confidence) + require.Equal(t, patterns.NCCLBootstrapDiscriminatorUnknown, verdicts[0].Discriminator) + require.Equal(t, []string{patterns.EvidenceKindCNINetworkEvent}, verdicts[0].MissingLayers) +} + +// TestNCCLBootstrapDetector_NormalStartupNoFire pins the false-positive +// guard: a cohort where every rank emitted an NCCL FR record (normal +// successful bootstrap) MUST NOT emit a verdict — even with a stale +// CNI event lying around in the namespace. +func TestNCCLBootstrapDetector_NormalStartupNoFire(t *testing.T) { + t.Parallel() + + now := mustParseTime(t, "2026-06-01T10:00:00Z") + readyAt := now.Add(-10 * time.Minute) + frStarted := readyAt.Add(30 * time.Second).UnixNano() + + pods := []patterns.TrainingPodRecord{ + {JobID: "job-ok", Namespace: "training", PodName: "trainer-rank-0", Rank: 0, Node: "gpu-node-0001", ReadyAt: readyAt}, + {JobID: "job-ok", Namespace: "training", PodName: "trainer-rank-1", Rank: 1, Node: "gpu-node-0002", ReadyAt: readyAt}, + } + // Every rank emitted at least one FR record — bootstrap succeeded. 
+ nccl := []patterns.NCCLFRRecord{ + {Rank: 0, Node: "gpu-node-0001", PgID: 1, CollectiveSeqID: 0, State: "completed", TimeDiscoveredStartedNs: frStarted}, + {Rank: 1, Node: "gpu-node-0002", PgID: 1, CollectiveSeqID: 0, State: "completed", TimeDiscoveredStartedNs: frStarted}, + } + // Stale CNI event in the namespace must not flip the verdict. + cni := []patterns.CNINetworkEventRecord{ + {EventUID: "stale", Namespace: "training", Reason: "FailedCreatePodSandBox", Timestamp: readyAt}, + } + require.Empty(t, patterns.NCCLBootstrapDetector{Now: now}.Evaluate(pods, nccl, cni), + "normal-startup cohort with FR records on every rank MUST NOT fire") +} + +// TestNCCLBootstrapDetector_DeadlineNotYetReached pins the deadline +// gate: a cohort whose pod-ready timestamps are within BootstrapDeadline +// of Now is still in its normal startup window and MUST NOT emit a +// verdict. +func TestNCCLBootstrapDetector_DeadlineNotYetReached(t *testing.T) { + t.Parallel() + + now := mustParseTime(t, "2026-06-01T10:00:00Z") + readyAt := now.Add(-2 * time.Minute) // under default 5min deadline + + pods := []patterns.TrainingPodRecord{ + {JobID: "job-young", Namespace: "training", PodName: "trainer-rank-0", Rank: 0, Node: "gpu-node-0001", ReadyAt: readyAt}, + {JobID: "job-young", Namespace: "training", PodName: "trainer-rank-1", Rank: 1, Node: "gpu-node-0002", ReadyAt: readyAt}, + } + // No FR records yet — but pods are still inside the deadline. + require.Empty(t, patterns.NCCLBootstrapDetector{Now: now}.Evaluate(pods, nil, nil), + "cohort younger than BootstrapDeadline MUST NOT fire") +} + +// TestNCCLBootstrapDetector_PartialFailureCohort pins the +// heterogeneous-failure path from the spec: some ranks bootstrap fine +// (have FR records) and others don't, AND a CNI / network event is +// present → confidence=full (this is the discriminator that separates +// pattern #9 from pattern #8 — network event present routes here). 
+func TestNCCLBootstrapDetector_PartialFailureCohort(t *testing.T) { + t.Parallel() + + now := mustParseTime(t, "2026-06-01T10:00:00Z") + readyAt := now.Add(-10 * time.Minute) + frStarted := readyAt.Add(30 * time.Second).UnixNano() + + pods := []patterns.TrainingPodRecord{ + {JobID: "job-mix", Namespace: "training", PodName: "trainer-rank-0", Rank: 0, Node: "gpu-node-0001", ReadyAt: readyAt}, + {JobID: "job-mix", Namespace: "training", PodName: "trainer-rank-1", Rank: 1, Node: "gpu-node-0002", ReadyAt: readyAt}, + {JobID: "job-mix", Namespace: "training", PodName: "trainer-rank-2", Rank: 2, Node: "gpu-node-0003", ReadyAt: readyAt}, + {JobID: "job-mix", Namespace: "training", PodName: "trainer-rank-3", Rank: 3, Node: "gpu-node-0004", ReadyAt: readyAt}, + } + // Ranks 0 and 1 bootstrapped; ranks 2 and 3 didn't. + nccl := []patterns.NCCLFRRecord{ + {Rank: 0, Node: "gpu-node-0001", PgID: 1, CollectiveSeqID: 0, State: "started", TimeDiscoveredStartedNs: frStarted}, + {Rank: 1, Node: "gpu-node-0002", PgID: 1, CollectiveSeqID: 0, State: "started", TimeDiscoveredStartedNs: frStarted}, + } + cni := []patterns.CNINetworkEventRecord{ + {EventUID: "ev-1", Namespace: "training", PodName: "trainer-rank-2", Reason: "NetworkNotReady", Timestamp: readyAt.Add(10 * time.Second)}, + } + verdicts := patterns.NCCLBootstrapDetector{Now: now}.Evaluate(pods, nccl, cni) + require.Len(t, verdicts, 1) + + v := verdicts[0] + require.Equal(t, patterns.ConfidenceFull, v.Confidence) + require.Equal(t, 4, v.CohortSize) + require.Equal(t, 2, v.FailedRankCount) + require.Equal(t, []int64{2, 3}, v.FailedRanks) + require.Equal(t, patterns.NCCLBootstrapDiscriminatorCNIError, v.Discriminator) +} + +// TestNCCLBootstrapDetector_DifferentJobsDoNotMerge pins the cohort +// grouping rule: two distinct training jobs in the same namespace, both +// stuck in bootstrap, emit two separate verdicts — not one combined. 
+func TestNCCLBootstrapDetector_DifferentJobsDoNotMerge(t *testing.T) { + t.Parallel() + + now := mustParseTime(t, "2026-06-01T10:00:00Z") + readyAt := now.Add(-10 * time.Minute) + + pods := []patterns.TrainingPodRecord{ + // job-a: stuck. + {JobID: "job-a", Namespace: "training", PodName: "a-rank-0", Rank: 0, Node: "n0", ReadyAt: readyAt}, + {JobID: "job-a", Namespace: "training", PodName: "a-rank-1", Rank: 1, Node: "n1", ReadyAt: readyAt}, + // job-b: also stuck, different cohort. + {JobID: "job-b", Namespace: "training", PodName: "b-rank-0", Rank: 0, Node: "n2", ReadyAt: readyAt}, + {JobID: "job-b", Namespace: "training", PodName: "b-rank-1", Rank: 1, Node: "n3", ReadyAt: readyAt}, + } + verdicts := patterns.NCCLBootstrapDetector{Now: now}.Evaluate(pods, nil, nil) + require.Len(t, verdicts, 2, "two distinct jobs → two distinct verdicts") + + jobs := []string{verdicts[0].JobID, verdicts[1].JobID} + require.ElementsMatch(t, []string{"job-a", "job-b"}, jobs) + for _, v := range verdicts { + require.Equal(t, 2, v.CohortSize, "each cohort holds its own ranks") + } +} + +// TestNCCLBootstrapDetector_NamespaceFallbackGrouping pins the spec's +// "open question #1" fallback: when no `gen_ai.training.job_id` is +// stamped (the attribute is still alpha per ATTRIBUTES.md), the +// detector groups pods by namespace alone and the verdict's JobID is +// empty. 
+func TestNCCLBootstrapDetector_NamespaceFallbackGrouping(t *testing.T) { + t.Parallel() + + now := mustParseTime(t, "2026-06-01T10:00:00Z") + readyAt := now.Add(-10 * time.Minute) + + pods := []patterns.TrainingPodRecord{ + {JobID: "", Namespace: "training", PodName: "trainer-rank-0", Rank: 0, Node: "n0", ReadyAt: readyAt}, + {JobID: "", Namespace: "training", PodName: "trainer-rank-1", Rank: 1, Node: "n1", ReadyAt: readyAt}, + } + verdicts := patterns.NCCLBootstrapDetector{Now: now}.Evaluate(pods, nil, nil) + require.Len(t, verdicts, 1, "namespace-only fallback must still cohort the pods") + require.Empty(t, verdicts[0].JobID, "empty job id signals fallback path") + require.Equal(t, "training", verdicts[0].Namespace) +} + +// TestNCCLBootstrapDetector_CNIEventDifferentNamespaceDoesNotJoin pins +// the namespace-scope rule for the join: a CNI event in a DIFFERENT +// namespace must NOT promote the cohort's partial verdict to full. +func TestNCCLBootstrapDetector_CNIEventDifferentNamespaceDoesNotJoin(t *testing.T) { + t.Parallel() + + now := mustParseTime(t, "2026-06-01T10:00:00Z") + readyAt := now.Add(-10 * time.Minute) + + pods := []patterns.TrainingPodRecord{ + {JobID: "job-x", Namespace: "training", PodName: "trainer-rank-0", Rank: 0, Node: "n0", ReadyAt: readyAt}, + {JobID: "job-x", Namespace: "training", PodName: "trainer-rank-1", Rank: 1, Node: "n1", ReadyAt: readyAt}, + } + cni := []patterns.CNINetworkEventRecord{ + // CNI event in an UNRELATED namespace.
+ {EventUID: "ev-other", Namespace: "kube-system", Reason: "CNIError", Timestamp: readyAt.Add(10 * time.Second)}, + } + verdicts := patterns.NCCLBootstrapDetector{Now: now}.Evaluate(pods, nil, cni) + require.Len(t, verdicts, 1) + require.Equal(t, patterns.ConfidencePartial, verdicts[0].Confidence, + "CNI event in different namespace MUST NOT promote to full") +} + +// TestNCCLBootstrapDetector_DeadlineConfigurable pins the operator +// escape hatch: a tighter BootstrapDeadline (1min) emits when the +// default (5min) does not. +func TestNCCLBootstrapDetector_DeadlineConfigurable(t *testing.T) { + t.Parallel() + + now := mustParseTime(t, "2026-06-01T10:00:00Z") + readyAt := now.Add(-2 * time.Minute) // under default 5m, over 1m + + pods := []patterns.TrainingPodRecord{ + {JobID: "job-fast", Namespace: "training", PodName: "trainer-rank-0", Rank: 0, Node: "n0", ReadyAt: readyAt}, + {JobID: "job-fast", Namespace: "training", PodName: "trainer-rank-1", Rank: 1, Node: "n1", ReadyAt: readyAt}, + } + + // Default 5min: no fire. + require.Empty(t, patterns.NCCLBootstrapDetector{Now: now}.Evaluate(pods, nil, nil)) + + // Tightened 1min: fires. + v := patterns.NCCLBootstrapDetector{Now: now, BootstrapDeadline: time.Minute}.Evaluate(pods, nil, nil) + require.Len(t, v, 1) +} + +// TestNCCLBootstrapDetector_DeterministicOrdering pins the sort order: +// multiple cohort verdicts arrive sorted by (namespace, job id) so +// golden tests are stable across fixture iteration order. +func TestNCCLBootstrapDetector_DeterministicOrdering(t *testing.T) { + t.Parallel() + + now := mustParseTime(t, "2026-06-01T10:00:00Z") + readyAt := now.Add(-10 * time.Minute) + + pods := []patterns.TrainingPodRecord{ + // job-z in zeta namespace. + {JobID: "job-z", Namespace: "zeta", PodName: "z-rank-0", Rank: 0, Node: "n", ReadyAt: readyAt}, + {JobID: "job-z", Namespace: "zeta", PodName: "z-rank-1", Rank: 1, Node: "n", ReadyAt: readyAt}, + // job-a in alpha namespace — sorts first. 
+ {JobID: "job-a", Namespace: "alpha", PodName: "a-rank-0", Rank: 0, Node: "n", ReadyAt: readyAt}, + {JobID: "job-a", Namespace: "alpha", PodName: "a-rank-1", Rank: 1, Node: "n", ReadyAt: readyAt}, + } + verdicts := patterns.NCCLBootstrapDetector{Now: now}.Evaluate(pods, nil, nil) + require.Len(t, verdicts, 2) + require.Equal(t, "alpha", verdicts[0].Namespace, "alphabetically-first namespace sorts first") + require.Equal(t, "zeta", verdicts[1].Namespace) +} + +// TestNCCLBootstrapDetector_MaxPodReadyDrivesAge pins the spec's +// pod_ready_time = MAX over the cohort rule: one rank that became Ready +// recently (under deadline) bumps the cohort's effective ready +// timestamp forward and suppresses the verdict — bootstrap age must be +// measured from the LAST pod to become Ready, not the first. +func TestNCCLBootstrapDetector_MaxPodReadyDrivesAge(t *testing.T) { + t.Parallel() + + now := mustParseTime(t, "2026-06-01T10:00:00Z") + + pods := []patterns.TrainingPodRecord{ + // Older ranks: 10 min ago. + {JobID: "job-slow", Namespace: "training", PodName: "trainer-rank-0", Rank: 0, Node: "n0", ReadyAt: now.Add(-10 * time.Minute)}, + {JobID: "job-slow", Namespace: "training", PodName: "trainer-rank-1", Rank: 1, Node: "n1", ReadyAt: now.Add(-10 * time.Minute)}, + // One late rank: just 2min ago — should bump effective ready to + // 2min, below the 5min default deadline. + {JobID: "job-slow", Namespace: "training", PodName: "trainer-rank-2", Rank: 2, Node: "n2", ReadyAt: now.Add(-2 * time.Minute)}, + } + require.Empty(t, patterns.NCCLBootstrapDetector{Now: now}.Evaluate(pods, nil, nil), + "max(ReadyAt) must drive the deadline; the late rank suppresses the verdict") +} + +// TestNCCLBootstrapVerdict_SchemaConformance pins the +// NCCLBootstrapTimeoutVerdict JSON shape against testdata/ +// nccl_bootstrap_verdict.schema.json. Struct drift or schema loosening +// fails this test before it ships. 
+func TestNCCLBootstrapVerdict_SchemaConformance(t *testing.T) { + t.Parallel() + + schemaPath := filepath.Join("testdata", "nccl_bootstrap_verdict.schema.json") + schemaBytes, err := os.ReadFile(schemaPath) //nolint:gosec // schemaPath is a test-local relative path + require.NoError(t, err) + + compiler := jsonschema.NewCompiler() + var schemaDoc any + require.NoError(t, json.Unmarshal(schemaBytes, &schemaDoc)) + require.NoError(t, compiler.AddResource(schemaPath, schemaDoc)) + schema, err := compiler.Compile(schemaPath) + require.NoError(t, err) + + now := mustParseTime(t, "2026-06-01T10:00:00Z") + readyAt := now.Add(-10 * time.Minute) + pods := []patterns.TrainingPodRecord{ + {JobID: "job-llama", Namespace: "training", PodName: "trainer-rank-0", Rank: 0, Node: "gpu-node-0001", ReadyAt: readyAt}, + {JobID: "job-llama", Namespace: "training", PodName: "trainer-rank-1", Rank: 1, Node: "gpu-node-0002", ReadyAt: readyAt}, + } + cni := []patterns.CNINetworkEventRecord{ + {EventUID: "ev-1", Namespace: "training", PodName: "trainer-rank-0", Reason: "FailedCreatePodSandBox", Timestamp: readyAt.Add(30 * time.Second)}, + } + verdicts := patterns.NCCLBootstrapDetector{Now: now}.Evaluate(pods, nil, cni) + require.Len(t, verdicts, 1) + + bs, err := json.Marshal(verdicts[0]) + require.NoError(t, err) + var decoded any + require.NoError(t, json.Unmarshal(bs, &decoded)) + require.NoError(t, schema.Validate(decoded), + "NCCLBootstrapTimeoutVerdict shape failed schema validation; struct drifted or schema needs updating") +} + +// TestNCCLBootstrapVerdict_SchemaRejectsDrift is the drift-rejection +// battery — each row is a falsifier for one schema constraint; removing +// the constraint lets the mutated document validate, failing that row.
+func TestNCCLBootstrapVerdict_SchemaRejectsDrift(t *testing.T) { + t.Parallel() + + schemaPath := filepath.Join("testdata", "nccl_bootstrap_verdict.schema.json") + schemaBytes, err := os.ReadFile(schemaPath) //nolint:gosec // schemaPath is a test-local relative path + require.NoError(t, err) + + compiler := jsonschema.NewCompiler() + var schemaDoc any + require.NoError(t, json.Unmarshal(schemaBytes, &schemaDoc)) + require.NoError(t, compiler.AddResource(schemaPath, schemaDoc)) + schema, err := compiler.Compile(schemaPath) + require.NoError(t, err) + + validEvidence := []any{ + map[string]any{"kind": "training_pod", "uid": "u1", "timestamp": "2026-06-01T10:00:00Z", "description": "d"}, + map[string]any{"kind": "cni_network_event", "uid": "u2", "timestamp": "2026-06-01T10:00:30Z", "description": "d"}, + } + base := func() map[string]any { + return map[string]any{ + "pattern.id": "9", + "headline": "x", + "remediation": "y", + "confidence": "full", + "k8s.namespace.name": "training", + "gen_ai.training.job_id": "job-llama", + "tracecore.alert.nccl_bootstrap_timeout.cohort_size": 4, + "tracecore.alert.nccl_bootstrap_timeout.failed_rank_count": 4, + "tracecore.alert.nccl_bootstrap_timeout.discriminator": "cni_error", + "evidence_trail": validEvidence, + } + } + cases := []struct { + name string + mutate func(map[string]any) + guardName string + }{ + {"extra_top_level_field", func(m map[string]any) { m["future_field"] = "rejected" }, "additionalProperties:false"}, + {"pattern_id_numeric_not_string", func(m map[string]any) { m["pattern.id"] = 9 }, "pattern.id string const"}, + {"pattern_id_wrong_value", func(m map[string]any) { m["pattern.id"] = "99" }, "pattern.id const guard"}, + {"confidence_outside_enum", func(m map[string]any) { m["confidence"] = "maybe" }, "confidence enum"}, + {"discriminator_outside_enum", func(m map[string]any) { + m["tracecore.alert.nccl_bootstrap_timeout.discriminator"] = "catastrophic" + }, "discriminator enum"}, + {"namespace_empty", func(m 
map[string]any) { m["k8s.namespace.name"] = "" }, "namespace minLength"}, + {"cohort_size_zero", func(m map[string]any) { + m["tracecore.alert.nccl_bootstrap_timeout.cohort_size"] = 0 + }, "cohort_size minimum:1"}, + {"failed_rank_count_zero", func(m map[string]any) { + m["tracecore.alert.nccl_bootstrap_timeout.failed_rank_count"] = 0 + }, "failed_rank_count minimum:1"}, + {"evidence_kind_outside_enum", func(m map[string]any) { + m["evidence_trail"] = []any{map[string]any{ + "kind": "kernel_event", "uid": "u", "timestamp": "2026-06-01T10:00:00Z", "description": "d", + }} + }, "evidence_trail.kind enum"}, + {"evidence_trail_empty", func(m map[string]any) { + m["evidence_trail"] = []any{} + }, "evidence_trail minItems:1"}, + } + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + m := base() + tc.mutate(m) + require.Error(t, schema.Validate(m), + "schema must reject %s; guard %q regressed", tc.name, tc.guardName) + }) + } +} + +// mustParseTime is a local time-parse helper for the bootstrap tests. +// Distinct from the same-named helper in the patterndetectorprocessor +// package — that one is package-private. +func mustParseTime(t *testing.T, s string) time.Time { + t.Helper() + v, err := time.Parse(time.RFC3339Nano, s) + require.NoError(t, err) + return v +} diff --git a/module/pkg/patterns/testdata/nccl_bootstrap_verdict.schema.json b/module/pkg/patterns/testdata/nccl_bootstrap_verdict.schema.json new file mode 100644 index 00000000..b74ae6cf --- /dev/null +++ b/module/pkg/patterns/testdata/nccl_bootstrap_verdict.schema.json @@ -0,0 +1,96 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://tracecore.ai/schemas/patterns/nccl_bootstrap_verdict/v0", + "title": "NCCLBootstrapTimeoutVerdict", + "description": "NORTHSTAR pattern #9 (NCCL bootstrap timeout) verdict shape. Pinned by TestNCCLBootstrapVerdict_SchemaConformance. 
Emitted when a training-job cohort has at least one rank with no NCCL FlightRecorder record past BootstrapDeadline-from-pod-ready, optionally joined with a K8s CNI / network-readiness event in the same namespace within the correlation window. The discriminator names the proximate cause family (cni_error / socket_ifname_mismatch / rendezvous_unreachable / unknown).", + "type": "object", + "required": ["pattern.id", "headline", "remediation", "confidence", "evidence_trail", "k8s.namespace.name", "tracecore.alert.nccl_bootstrap_timeout.cohort_size", "tracecore.alert.nccl_bootstrap_timeout.failed_rank_count", "tracecore.alert.nccl_bootstrap_timeout.discriminator"], + "additionalProperties": false, + "properties": { + "pattern.id": { + "type": "string", + "const": "9", + "description": "nccl_bootstrap pattern identifier; string-typed numeric uniform across pattern detectors." + }, + "headline": { + "type": "string", + "minLength": 1 + }, + "remediation": { + "type": "string", + "minLength": 1 + }, + "confidence": { + "type": "string", + "enum": ["full", "partial"], + "description": "full = at least one bootstrap-failed rank joined a same-namespace CNI / network-readiness event within the correlation window; partial = every rank in the cohort failed bootstrap but no network event joined." + }, + "gen_ai.training.job_id": { + "type": "string", + "description": "Training job identifier used to group the cohort. Omitted when the cohort was grouped on the namespace-only fallback (per spec open question #1)." + }, + "k8s.namespace.name": { + "type": "string", + "minLength": 1, + "description": "Kubernetes namespace hosting the affected training cohort." + }, + "tracecore.alert.nccl_bootstrap_timeout.cohort_size": { + "type": "integer", + "minimum": 1, + "description": "Number of ranks in the cohort the detector observed pod-ready signals for."
+ }, + "tracecore.alert.nccl_bootstrap_timeout.failed_rank_count": { + "type": "integer", + "minimum": 1, + "description": "Number of ranks in the cohort with no NCCL FlightRecorder record past BootstrapDeadline-from-pod-ready." + }, + "tracecore.alert.nccl_bootstrap_timeout.discriminator": { + "type": "string", + "enum": ["cni_error", "socket_ifname_mismatch", "rendezvous_unreachable", "unknown"], + "description": "Proximate-cause family. cni_error when a FailedCreatePodSandBox / NetworkNotReady / CNIError event joined; unknown otherwise." + }, + "failed_ranks": { + "type": "array", + "items": { + "type": "integer", + "minimum": 0 + }, + "description": "Sorted-ascending list of rank ordinals that never emitted an FR record. Omitted by the marshaller when empty." + }, + "missing_layers": { + "type": "array", + "items": { + "type": "string", + "enum": ["cni_network_event"] + }, + "description": "Evidence layers that did not join. Populated when confidence=partial." + }, + "evidence_trail": { + "type": "array", + "minItems": 1, + "items": { + "type": "object", + "required": ["kind", "uid", "timestamp", "description"], + "additionalProperties": false, + "properties": { + "kind": { + "type": "string", + "enum": ["training_pod", "cni_network_event"] + }, + "uid": { + "type": "string", + "minLength": 1 + }, + "timestamp": { + "type": "string", + "format": "date-time" + }, + "description": { + "type": "string", + "minLength": 1 + } + } + } + } + } +} From e66982aa75c517b17f0991e3c95425d59d678e52 Mon Sep 17 00:00:00 2001 From: Tri Lam Date: Mon, 1 Jun 2026 01:31:18 -0700 Subject: [PATCH 2/5] =?UTF-8?q?feat(nccl-boot):=20GREEN=20=E2=80=94=20patt?= =?UTF-8?q?ern-9=20bootstrap-timeout=20detector?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Tri Lam --- module/pkg/patterns/nccl_bootstrap.go | 523 +++++++++++++++++++++ module/pkg/patterns/nccl_bootstrap_test.go | 2 +- 2 files changed, 524 insertions(+), 1 deletion(-)
create mode 100644 module/pkg/patterns/nccl_bootstrap.go diff --git a/module/pkg/patterns/nccl_bootstrap.go b/module/pkg/patterns/nccl_bootstrap.go new file mode 100644 index 00000000..bb442fe1 --- /dev/null +++ b/module/pkg/patterns/nccl_bootstrap.go @@ -0,0 +1,523 @@ +// SPDX-License-Identifier: Apache-2.0 + +package patterns + +import ( + "fmt" + "sort" + "time" +) + +// Design spec: docs/patterns/09-nccl-bootstrap-timeout.md. The detector +// fires at job-start (sibling to pattern #8 — which fires mid-run): a +// training cohort whose pods are Ready past BootstrapDeadline but where +// at least one rank never emitted an NCCL FlightRecorder record is +// stuck in NCCL bootstrap. A joined K8s CNI / network-readiness event +// in the same namespace promotes the verdict to full confidence and +// stamps the proximate-cause discriminator. Without the network signal +// the verdict is partial — the detector can prove the bootstrap stuck +// but cannot name the cause family. +// +// Cohort grouping: gen_ai.training.job_id is the load-bearing key per +// the spec, with namespace-only fallback per the spec's open question +// #1 (the attribute is still alpha in ATTRIBUTES.md). Empty JobID on a +// TrainingPodRecord triggers the fallback path; the verdict's JobID +// field is empty to signal the fallback grouping. + +// DefaultBootstrapDeadline is the spec's BootstrapDeadline default — +// 5 min from pod-ready past which a rank with no NCCL FR record counts +// as bootstrap-stuck. 5 min matches the spec's reasoning: tight enough +// to catch a stuck cohort within the on-call rotation, loose enough to +// tolerate cold-cache image-pull lag on the first job after a node +// warm-up. Operators with very-slow image-pull paths raise this via +// NCCLBootstrapDetector.BootstrapDeadline. 
+const DefaultBootstrapDeadline = 5 * time.Minute + +// DefaultNCCLBootstrapCorrelationWindow is the max span between a +// cohort pod's ReadyAt and a same-namespace CNI / network event for +// the network signal to count toward the verdict's discriminator. The +// CNI event is most often emitted BEFORE the pod transitions to Ready +// (or shortly after, when the kubelet retries pod-sandbox setup). The +// window is anchored from the cohort's earliest ReadyAt and extends +// CorrelationWindow on either side. 10 min covers a typical CNI plugin +// retry budget plus a re-watch lag spike. +const DefaultNCCLBootstrapCorrelationWindow = 10 * time.Minute + +// PatternIDNCCLBootstrap is the nccl-bootstrap-timeout pattern +// identifier. Matches the NORTHSTAR pattern number (#9). +const PatternIDNCCLBootstrap = "9" + +// EvidenceKindTrainingPod names the per-pod training-cohort evidence +// surface. The "training_pod" wire value mirrors the k8sobjectsreceiver +// pod-lifecycle stream the cohort projection derives from. +const EvidenceKindTrainingPod = "training_pod" + +// EvidenceKindCNINetworkEvent names the K8s CNI / network-readiness +// event evidence surface — the K8s Event objects with reason in +// {FailedCreatePodSandBox, NetworkNotReady, CNIError}. +const EvidenceKindCNINetworkEvent = "cni_network_event" + +// NCCLBootstrapDiscriminator enumerates the verdict's proximate-cause +// family. Untyped string literals still convert to the named type, so +// use the constants below. Cause taxonomy mirrors the spec's discriminator column. +type NCCLBootstrapDiscriminator string + +// Canonical NCCLBootstrapDiscriminator values. +const ( + // NCCLBootstrapDiscriminatorCNIError names the CNI-plugin / + // pod-sandbox failure branch — a same-namespace K8s event with + // reason in {FailedCreatePodSandBox, NetworkNotReady, CNIError} + // joined the cohort.
+ NCCLBootstrapDiscriminatorCNIError NCCLBootstrapDiscriminator = "cni_error" + + // NCCLBootstrapDiscriminatorSocketIfnameMismatch names the + // NCCL_SOCKET_IFNAME-pointing-at-a-non-existent-interface branch. + // Reserved for future use — requires the filelogreceiver OTTL + // recipe to surface the NCCL bootstrap log line shape. + NCCLBootstrapDiscriminatorSocketIfnameMismatch NCCLBootstrapDiscriminator = "socket_ifname_mismatch" + + // NCCLBootstrapDiscriminatorRendezvousUnreachable names the + // torchelastic / c10d rendezvous-server-unreachable branch. + // Reserved for future use. + NCCLBootstrapDiscriminatorRendezvousUnreachable NCCLBootstrapDiscriminator = "rendezvous_unreachable" + + // NCCLBootstrapDiscriminatorUnknown names the partial-verdict + // branch: every cohort rank failed bootstrap but no network event + // joined — the detector cannot name the cause family. + NCCLBootstrapDiscriminatorUnknown NCCLBootstrapDiscriminator = "unknown" +) + +// TrainingPodRecord is the typed projection of one training-cohort pod +// the detector consumes. Built by the patterndetectorprocessor from +// k8sobjectsreceiver pod-lifecycle log records (each pod-Ready +// transition becomes one record). Field names mirror the OTel resource +// attribute keys the upstream receiver stamps (k8s.* + gen_ai.*). +type TrainingPodRecord struct { + // JobID is the training-job identifier — the cohort key. Sourced + // from the gen_ai.training.job_id resource attribute (alpha per + // ATTRIBUTES.md per spec open question #1). Empty triggers the + // namespace-only fallback grouping; the verdict's JobID is then + // empty to signal the fallback path. + JobID string `json:"job_id,omitempty"` + + // Namespace is the Kubernetes namespace hosting the cohort. + // Load-bearing: the CNI / network event join is namespace-scoped, + // and the fallback cohort grouping uses (namespace) as the key. + Namespace string `json:"namespace"` + + // PodName is the per-pod k8s.pod.name. 
Carried for operator-facing + // prose; not part of the join key. + PodName string `json:"pod_name,omitempty"` + + // Rank is the per-pod gen_ai.training.rank ordinal. Load-bearing: + // drives the "which ranks failed bootstrap" cohort accounting. + Rank int64 `json:"rank"` + + // Node is the k8s.node.name the pod scheduled on. Carried for + // operator triage prose; not part of the join key. + Node string `json:"node,omitempty"` + + // ReadyAt is the wall-clock time the pod transitioned to Ready + // (k8s.pod.ready_time stamp; recipe-defined). Per the spec's edge + // case "cold cache prefetch", the BootstrapDeadline is measured + // from THIS time, not from job-creation, so a slow image pull does + // not produce a false-positive. + ReadyAt time.Time `json:"ready_at"` +} + +// CNINetworkEventRecord is the typed projection of one K8s Event with +// reason in {FailedCreatePodSandBox, NetworkNotReady, CNIError} — +// the events the spec's "network_event_present" predicate keys off. +// Built by the patterndetectorprocessor from k8sobjectsreceiver Event +// stream log records. +type CNINetworkEventRecord struct { + // EventUID is the upstream Event's metadata.uid — globally unique. + // Required for the evidence-trail UID. + EventUID string `json:"event_uid"` + + // Namespace is the Event's metadata.namespace. Load-bearing: the + // join with the training cohort is namespace-scoped. + Namespace string `json:"namespace"` + + // PodName is the events.k8s.io/v1 regarding.name when the Event is + // about a pod (e.g. FailedCreatePodSandBox is per-pod). Carried for + // operator-facing prose; not part of the join key. + PodName string `json:"pod_name,omitempty"` + + // Reason is the K8s Event Reason — one of FailedCreatePodSandBox / + // NetworkNotReady / CNIError per the spec's network_event_present + // vocabulary. + Reason string `json:"reason"` + + // Note is the human-readable Event message body. Rendered into the + // evidence-trail description. 
+ Note string `json:"note,omitempty"` + + // Timestamp is the Event.EventTime. Used to bound the + // correlation-window check against the cohort's pod-ready times. + Timestamp time.Time `json:"timestamp"` +} + +// CNINetworkEventReasons is the canonical set of K8s Event Reasons the +// detector accepts as a "network_event_present" signal. Mirrors the +// spec's evaluation rule. +var CNINetworkEventReasons = map[string]struct{}{ + "FailedCreatePodSandBox": {}, + "NetworkNotReady": {}, + "CNIError": {}, +} + +// IsCNINetworkEventReason returns whether the given K8s Event Reason +// counts as a CNI / network-readiness signal for this detector. +// Exposed so the patterndetectorprocessor projection can gate without +// duplicating the vocabulary. +func IsCNINetworkEventReason(reason string) bool { + _, ok := CNINetworkEventReasons[reason] + return ok +} + +// NCCLBootstrapTimeoutVerdict is the NORTHSTAR pattern #9 output. JSON +// field names follow the spec's verdict-attributes table — the +// tracecore.alert.nccl_bootstrap_timeout.* namespace mirrors the +// other alert-bridge attributes (pcie_rate_collapse, ib_link_flap). +type NCCLBootstrapTimeoutVerdict struct { + PatternID string `json:"pattern.id"` + Headline string `json:"headline"` + Remediation string `json:"remediation"` + Confidence Confidence `json:"confidence"` + EvidenceTrail []EvidenceRef `json:"evidence_trail"` + + // JobID is the gen_ai.training.job_id of the affected cohort. + // Empty when the cohort was grouped on the namespace-only fallback. + JobID string `json:"gen_ai.training.job_id,omitempty"` + + // Namespace is the K8s namespace hosting the cohort. Always + // populated. + Namespace string `json:"k8s.namespace.name"` + + // CohortSize is the number of distinct ranks the detector saw a + // pod-Ready signal for. + CohortSize int `json:"tracecore.alert.nccl_bootstrap_timeout.cohort_size"` + + // FailedRankCount is the number of cohort ranks with no NCCL FR + // record past BootstrapDeadline. 
+ FailedRankCount int `json:"tracecore.alert.nccl_bootstrap_timeout.failed_rank_count"` + + // Discriminator names the proximate-cause family — see the + // NCCLBootstrapDiscriminator* constants. + Discriminator NCCLBootstrapDiscriminator `json:"tracecore.alert.nccl_bootstrap_timeout.discriminator"` + + // FailedRanks is the sorted-ascending list of rank ordinals that + // never emitted an FR record. Carried so operators can attach to + // specific ranks for triage. Empty omitted by the marshaller. + FailedRanks []int64 `json:"failed_ranks,omitempty"` + + // MissingLayers names the evidence layers that did not join. + // Empty when Confidence==Full. Populated to + // ["cni_network_event"] when the CNI / network-event layer was + // missing. + MissingLayers []string `json:"missing_layers,omitempty"` +} + +// NCCLBootstrapDetector is the nccl_bootstrap_timeout pattern detector +// (NORTHSTAR pattern #9, per docs/patterns/09-nccl-bootstrap-timeout.md). +// Zero-value usage is permitted — BootstrapDeadline defaults to +// DefaultBootstrapDeadline; CorrelationWindow defaults to +// DefaultNCCLBootstrapCorrelationWindow; Now defaults to time.Now() at +// Evaluate time. +type NCCLBootstrapDetector struct { + // BootstrapDeadline is the max age past pod-ready past which a + // rank with no NCCL FR record counts as bootstrap-stuck. Zero + // means use DefaultBootstrapDeadline. + BootstrapDeadline time.Duration + + // CorrelationWindow is the max span between the cohort's earliest + // ReadyAt and a same-namespace CNI / network event for the event + // to count toward the verdict's discriminator. Zero means use + // DefaultNCCLBootstrapCorrelationWindow. + CorrelationWindow time.Duration + + // Now is the evaluation wall-clock. Overridable in tests so age + // counting doesn't depend on the wall clock. Zero means time.Now(). 
+ Now time.Time +} + +// Evaluate scans the training-cohort projection and emits one +// NCCLBootstrapTimeoutVerdict per cohort whose pod-ready age has +// elapsed BootstrapDeadline AND at least one rank in the cohort has no +// NCCL FR record. +// +// Confidence is full when a same-namespace CNI / network event lands +// inside the correlation window; partial otherwise (discriminator = +// unknown). The spec's "all ranks failed" partial-confidence branch is +// also routed here when no network event joins. +// +// Output is sorted by (namespace ascending, job_id ascending) so +// golden tests are stable across input iteration order. +// +// Inputs are read-only snapshots; the detector does not mutate any +// slice. +func (d NCCLBootstrapDetector) Evaluate( + pods []TrainingPodRecord, + ncclRecs []NCCLFRRecord, + cniEvents []CNINetworkEventRecord, +) []NCCLBootstrapTimeoutVerdict { + deadline := d.BootstrapDeadline + if deadline <= 0 { + deadline = DefaultBootstrapDeadline + } + window := d.CorrelationWindow + if window <= 0 { + window = DefaultNCCLBootstrapCorrelationWindow + } + now := d.Now + if now.IsZero() { + now = time.Now() + } + + // Index "did this (node, rank) emit ANY NCCL FR record?" The + // (node, rank) tuple is the natural per-pod key — the + // rankjoinprocessor + k8sattributes stamp k8s.node.name on the FR + // log record's resource, and the projection promotes it onto + // NCCLFRRecord.Node (see projectNCCLFRRecord in the processor). + // (node, rank) avoids cross-job contamination: two cohorts in the + // same namespace land on different nodes, so a rank-0 FR record on + // gpu-node-0001 does not falsely clear rank-0 on gpu-node-0042 for + // a different cohort. FR records with empty Node are skipped from + // the index — without a Node we cannot decide which cohort the FR + // record belongs to, and conservatively counting it against every + // rank-0 cohort would cause cross-job false-negatives (suppressing + // real bootstrap failures). 
The receiver projection always stamps + // Node when available; absence is a wiring gap, not the norm. + type frankKey struct { + node string + rank int64 + } + frSeen := map[frankKey]struct{}{} + for _, r := range ncclRecs { + if r.Node == "" { + continue + } + frSeen[frankKey{r.Node, r.Rank}] = struct{}{} + } + + // Group pods by cohort key. (JobID, Namespace) when JobID is + // stamped; (Namespace) alone otherwise. The verdict's JobID is + // empty on the fallback path to signal the alpha-attribute gap. + type cohortKey struct { + jobID string + namespace string + } + cohorts := map[cohortKey][]TrainingPodRecord{} + for _, p := range pods { + k := cohortKey{p.JobID, p.Namespace} + cohorts[k] = append(cohorts[k], p) + } + + // Index CNI events by namespace so the per-cohort join is O(1) + // per cohort + O(|events_in_namespace|) per match. + eventsByNamespace := map[string][]CNINetworkEventRecord{} + for _, e := range cniEvents { + if !IsCNINetworkEventReason(e.Reason) { + // Defensive: the projection should have filtered, but if a + // caller hands us off-vocabulary events we don't treat them + // as network signals. + continue + } + eventsByNamespace[e.Namespace] = append(eventsByNamespace[e.Namespace], e) + } + + verdicts := make([]NCCLBootstrapTimeoutVerdict, 0, len(cohorts)) + for k, cohort := range cohorts { + // Per the spec: pod_ready_time = max(cohort.pods.ReadyTimestamp). + // A late-joining rank pushes the deadline forward — protects + // against a cold-cache rolling-readiness scenario. + maxReady := cohort[0].ReadyAt + for _, p := range cohort[1:] { + if p.ReadyAt.After(maxReady) { + maxReady = p.ReadyAt + } + } + if now.Sub(maxReady) < deadline { + continue + } + + // Identify bootstrap-failed ranks: those with no FR record on + // (namespace, rank). 
+ failedRanks := []int64{} + seenRanks := map[int64]struct{}{} + for _, p := range cohort { + if _, dupe := seenRanks[p.Rank]; dupe { + continue + } + seenRanks[p.Rank] = struct{}{} + if _, ok := frSeen[frankKey{p.Node, p.Rank}]; !ok { + failedRanks = append(failedRanks, p.Rank) + } + } + if len(failedRanks) == 0 { + // Every rank bootstrapped — normal startup, no verdict. + continue + } + sort.Slice(failedRanks, func(i, j int) bool { return failedRanks[i] < failedRanks[j] }) + + // CNI event join: any in-namespace event whose Timestamp is + // within CorrelationWindow of the cohort's earliest ReadyAt + // (which anchors the start of the bootstrap interval). + minReady := cohort[0].ReadyAt + for _, p := range cohort[1:] { + if p.ReadyAt.Before(minReady) { + minReady = p.ReadyAt + } + } + var joinedEvent *CNINetworkEventRecord + for i := range eventsByNamespace[k.namespace] { + e := &eventsByNamespace[k.namespace][i] + if absDuration(e.Timestamp.Sub(minReady)) <= window { + joinedEvent = e + break + } + } + + verdicts = append(verdicts, buildNCCLBootstrapVerdict( + k.jobID, k.namespace, cohort, failedRanks, joinedEvent, maxReady, + )) + } + + sort.SliceStable(verdicts, func(i, j int) bool { + if verdicts[i].Namespace != verdicts[j].Namespace { + return verdicts[i].Namespace < verdicts[j].Namespace + } + return verdicts[i].JobID < verdicts[j].JobID + }) + return verdicts +} + +// absDuration returns |d|. +func absDuration(d time.Duration) time.Duration { + if d < 0 { + return -d + } + return d +} + +// buildNCCLBootstrapVerdict materializes one verdict from a cohort, +// the bootstrap-failed rank list, and (optionally) a joined CNI event. +// joinedEvent nil signals no network signal → confidence=partial, +// discriminator=unknown. 
+func buildNCCLBootstrapVerdict( + jobID, namespace string, + cohort []TrainingPodRecord, + failedRanks []int64, + joinedEvent *CNINetworkEventRecord, + maxReady time.Time, +) NCCLBootstrapTimeoutVerdict { + v := NCCLBootstrapTimeoutVerdict{ + PatternID: PatternIDNCCLBootstrap, + JobID: jobID, + Namespace: namespace, + CohortSize: distinctRankCount(cohort), + FailedRankCount: len(failedRanks), + FailedRanks: failedRanks, + } + + // Always lead the evidence trail with the cohort summary; the CNI + // event (when joined) comes second. Order mirrors causal order — + // the cohort observation is the pattern's primary trigger; the + // network event is the cross-layer evidence. + v.EvidenceTrail = []EvidenceRef{ + { + Kind: EvidenceKindTrainingPod, + UID: fmt.Sprintf("ns=%s/job=%s/ranks=%d/failed=%d", namespace, jobID, distinctRankCount(cohort), len(failedRanks)), + Timestamp: maxReady, + Description: trainingPodEvidenceDescription(namespace, jobID, distinctRankCount(cohort), failedRanks), + }, + } + + if joinedEvent != nil { + v.Confidence = ConfidenceFull + v.Discriminator = NCCLBootstrapDiscriminatorCNIError + v.EvidenceTrail = append(v.EvidenceTrail, EvidenceRef{ + Kind: EvidenceKindCNINetworkEvent, + UID: joinedEvent.EventUID, + Timestamp: joinedEvent.Timestamp, + Description: cniEventEvidenceDescription(*joinedEvent), + }) + } else { + v.Confidence = ConfidencePartial + v.Discriminator = NCCLBootstrapDiscriminatorUnknown + v.MissingLayers = []string{EvidenceKindCNINetworkEvent} + } + + v.Headline = nccBootstrapHeadline(v) + v.Remediation = nccBootstrapRemediation(v) + return v +} + +// distinctRankCount counts the number of distinct rank ordinals in the +// cohort — two TrainingPodRecord entries with the same rank are +// counted once (the same pod re-watched). Cohort size is the operator- +// visible quantity, so dedup matters. 
+func distinctRankCount(cohort []TrainingPodRecord) int { + seen := map[int64]struct{}{} + for _, p := range cohort { + seen[p.Rank] = struct{}{} + } + return len(seen) +} + +// trainingPodEvidenceDescription renders the cohort summary for the +// evidence trail. +func trainingPodEvidenceDescription(namespace, jobID string, cohortSize int, failedRanks []int64) string { + jobPart := jobID + if jobPart == "" { + jobPart = "" + } + return fmt.Sprintf( + "Training cohort %s in namespace %s: %d ranks observed Ready, %d failed bootstrap (ranks %s)", + jobPart, namespace, cohortSize, len(failedRanks), joinInts(failedRanks), + ) +} + +// cniEventEvidenceDescription renders the CNI / network event summary +// for the evidence trail. +func cniEventEvidenceDescription(e CNINetworkEventRecord) string { + target := e.PodName + if target == "" { + target = "(namespace-scoped)" + } + note := e.Note + if note == "" { + note = "no message body" + } + return fmt.Sprintf("K8s event %s in ns=%s on %s: %s", e.Reason, e.Namespace, target, note) +} + +// nccBootstrapHeadline renders the operator-facing one-liner. +func nccBootstrapHeadline(v NCCLBootstrapTimeoutVerdict) string { + job := v.JobID + if job == "" { + job = "(namespace-scoped cohort)" + } + if v.Confidence == ConfidenceFull { + return fmt.Sprintf( + "%d of %d ranks failed NCCL bootstrap in %s/%s; CNI / pod-network signal present.", + v.FailedRankCount, v.CohortSize, v.Namespace, job, + ) + } + return fmt.Sprintf( + "%d of %d ranks failed NCCL bootstrap in %s/%s; no concurrent CNI / network event observed.", + v.FailedRankCount, v.CohortSize, v.Namespace, job, + ) +} + +// nccBootstrapRemediation renders the operator-actionable remediation +// prose. Mirrors the spec's remediation column — directs operators +// to the three most common root causes and the multus / NetworkAttach- +// mentDefinition edge case. 
+func nccBootstrapRemediation(v NCCLBootstrapTimeoutVerdict) string { + return fmt.Sprintf( + "NCCL bootstrap stuck for %d of %d ranks in namespace %s. Verify pod-network readiness on the affected nodes; check NCCL_SOCKET_IFNAME points at an interface that exists inside the pod netns; confirm init-container ordering completes before the NCCL launcher; if using multus, inspect the NetworkAttachmentDefinition and per-pod annotations.", + v.FailedRankCount, v.CohortSize, v.Namespace, + ) +} diff --git a/module/pkg/patterns/nccl_bootstrap_test.go b/module/pkg/patterns/nccl_bootstrap_test.go index 4131f13d..59e723a6 100644 --- a/module/pkg/patterns/nccl_bootstrap_test.go +++ b/module/pkg/patterns/nccl_bootstrap_test.go @@ -396,7 +396,7 @@ func TestNCCLBootstrapVerdict_SchemaRejectsDrift(t *testing.T) { "confidence": "full", "k8s.namespace.name": "training", "gen_ai.training.job_id": "job-llama", - "tracecore.alert.nccl_bootstrap_timeout.cohort_size": 4, + "tracecore.alert.nccl_bootstrap_timeout.cohort_size": 4, "tracecore.alert.nccl_bootstrap_timeout.failed_rank_count": 4, "tracecore.alert.nccl_bootstrap_timeout.discriminator": "cni_error", "evidence_trail": validEvidence, From 156e90a6f26a17b42b93b79f9e5abf9da88f092d Mon Sep 17 00:00:00 2001 From: Tri Lam Date: Mon, 1 Jun 2026 01:39:25 -0700 Subject: [PATCH 3/5] feat(nccl-boot): wire pattern-9 detector + docs + ATTRIBUTES Signed-off-by: Tri Lam --- docs/ATTRIBUTES.md | 17 +- docs/patterns/09-nccl-bootstrap-timeout.md | 13 +- docs/patterns/README.md | 2 +- .../patterndetectorprocessor/config.go | 39 +++ .../example_config.yaml | 9 + .../nccl_bootstrap.go | 309 ++++++++++++++++++ .../nccl_bootstrap_test.go | 299 +++++++++++++++++ .../patterndetector.go | 2 + 8 files changed, 682 insertions(+), 8 deletions(-) create mode 100644 module/processor/patterndetectorprocessor/nccl_bootstrap.go create mode 100644 module/processor/patterndetectorprocessor/nccl_bootstrap_test.go diff --git a/docs/ATTRIBUTES.md b/docs/ATTRIBUTES.md 
index caf9e0ff..e76df10b 100644 --- a/docs/ATTRIBUTES.md +++ b/docs/ATTRIBUTES.md @@ -112,6 +112,9 @@ hardware signal. | `tracecore.alert.pcie_rate_collapse.direction` | string | tracecore-ext | alpha | `transmit` or `receive` — falls back to upstream `network.io.direction` if absent | OTTL metrics→logs recipe | `projectPCIeIORecord` | | `tracecore.alert.pcie_rate_collapse.drop_ratio` | double | tracecore-ext | alpha | Promoted drop-ratio scalar on `pcie_aer` verdicts so dashboards render histograms without parsing JSON | `patterndetectorprocessor.appendPCIeAERVerdict` | Operator dashboards (verdict-stream tier) | | `tracecore.alert.ib_link_flap.transition_count` | int | tracecore-ext | alpha | In-window ACTIVE→DOWN transition count promoted on `ib_link_flap` verdicts so dashboards distinguish "noisy 4 flaps" from "thrashing 40 flaps" | `patterndetectorprocessor.appendIBLinkFlapVerdict` | Operator dashboards | +| `tracecore.alert.nccl_bootstrap_timeout.cohort_size` | int | tracecore-ext | alpha | Distinct-rank count the pattern-#9 detector observed pod-Ready signals for, promoted on `nccl_bootstrap` verdicts | `patterndetectorprocessor.appendNCCLBootstrapVerdict` | Operator dashboards | +| `tracecore.alert.nccl_bootstrap_timeout.failed_rank_count` | int | tracecore-ext | alpha | Number of cohort ranks with no NCCL FR record past `BootstrapDeadline` | `patterndetectorprocessor.appendNCCLBootstrapVerdict` | Operator dashboards | +| `tracecore.alert.nccl_bootstrap_timeout.discriminator` | string | tracecore-ext | alpha | Pattern-#9 discriminator branch (`cni_error` / `socket_ifname_mismatch` / `rendezvous_unreachable` / `unknown`) | `patterndetectorprocessor.appendNCCLBootstrapVerdict` | Operator dashboards | --- @@ -207,6 +210,7 @@ extracted by the `k8sobjects-events` recipe's OTTL transform. 
| `k8s.regarding.namespace` | string | k8s-semconv | stable | Regarding-object namespace | k8sobjectsreceiver | `projectObjectRef` | | `k8s.regarding.uid` | string | k8s-semconv | stable | Regarding-object uid | k8sobjectsreceiver | `projectObjectRef` | | `k8s.pod_evicted_at` | string (RFC3339Nano) | tracecore-ext | stable | Eviction timestamp stamped onto rank records by the rank-join processor when an eviction window aligns with the rank's NCCL FlightRecorder activity | `rankjoinprocessor` (`AttrPodEvictedAt`) | Cross-signal joins, dashboards | +| `k8s.pod.ready_time` | int (unix-nano) \| string (RFC3339Nano) | tracecore-ext (k8s.* extension) | alpha | Pod `Ready` condition LastTransitionTime, stamped per training-pod log record by the k8sobjectsreceiver+OTTL recipe so pattern #9 (nccl_bootstrap) can measure BootstrapDeadline from pod-ready (not job-creation, per spec edge case "cold cache prefetch"). int-nanos preferred for OTel wire-format precision; RFC3339Nano string accepted as a fallback for recipe authors who stamp human-readable. | k8sobjectsreceiver+OTTL recipe (sibling to the nccl_bootstrap detector PR) | `projectTrainingPodRecord` | --- @@ -256,14 +260,16 @@ is no equivalent upstream contract. Upstream OTel `gen_ai` namespace. We use it as the cross-receiver join surface for distributed training workloads (rank, job id). -Today only `gen_ai.training.rank` is consumed; `gen_ai.training.job_id` -is contracted in RFC-0013 §3 but not yet wired through the pattern -library (future cross-receiver join surface). +`gen_ai.training.rank` is the primary rank key consumed across the +pattern library; `gen_ai.training.job_id` is consumed by the pattern-#9 +(nccl_bootstrap) detector as the cohort-grouping key, with namespace- +only fallback when the attribute is unstamped (per the spec's open- +questions resolution). 
| Attribute | Type | Source | Stability | Description | Emitted by | Consumed by | |---|---|---|---|---|---|---| -| `gen_ai.training.rank` | int | upstream-semconv (alpha) | development | Rank index — canonical per M19 | `rankjoinprocessor` (`module/processor/rankjoinprocessor`) | `projectNCCLFRRecord` (preferred over `nccl.rank` / `nccl.fr.rank`) | -| `gen_ai.training.job_id` | string | upstream-semconv (alpha) | alpha | Training-job id — contracted but not wired into pattern library yet | (future) | (future) | +| `gen_ai.training.rank` | int | upstream-semconv (alpha) | development | Rank index — canonical per M19 | `rankjoinprocessor` (`module/processor/rankjoinprocessor`) | `projectNCCLFRRecord` (preferred over `nccl.rank` / `nccl.fr.rank`); `projectTrainingPodRecord` | +| `gen_ai.training.job_id` | string | upstream-semconv (alpha) | alpha | Training-job id — cohort key for pattern #9 (nccl_bootstrap); empty falls back to namespace-only grouping | k8sobjectsreceiver + future OTTL recipe | `projectTrainingPodRecord`; `appendNCCLBootstrapVerdict` | --- @@ -303,6 +309,7 @@ need to fire?" without reading source. 
| `thermal_throttle` | `hw.gpu.throttle.duration.delta` + `hw.gpu.throttle.reason` + `gpu.id` | `hw.gpu.index`, `k8s.node.name` | `pattern.*`, `hw.gpu.throttle.cascade_size` | | `pcie_aer` *(wiring on a follow-up PR — projections present in library)* | `kernelevents.pcie_aer.severity` + `gpu.id` OR `tracecore.alert.pcie_rate_collapse.bytes_per_second` + `gpu.id` | `kernelevents.pcie_aer.type`, `network.io.direction`, `tracecore.alert.pcie_rate_collapse.{baseline_bytes_per_second,direction}` | `pattern.*` | | `nccl_hang` | `nccl.fr.collective_seq_id` + (one of `gen_ai.training.rank` \| `nccl.rank` \| `nccl.fr.rank`) | `nccl.fr.{pg_id,state,profiling_name,time_discovered_started_ns}` | `pattern.*`, `nccl.fr.{pg_id,collective_seq_id,hanging_ranks_count}` | +| `nccl_bootstrap` | (pod input) `k8s.pod.ready_time` + `gen_ai.training.rank` + `k8s.namespace.name`; (event input) `k8s.event.reason` ∈ `{FailedCreatePodSandBox, NetworkNotReady, CNIError}` + namespace | `gen_ai.training.job_id` (cohort grouping), `k8s.pod.name`, `k8s.node.name`, `k8s.event.{uid,note}`, `k8s.regarding.name` | `pattern.*`, `k8s.namespace.name`, `gen_ai.training.job_id`, `tracecore.alert.nccl_bootstrap_timeout.{cohort_size,failed_rank_count,discriminator}` | | `node_condition` (input to multiple patterns) | `k8s.node.name` + `k8s.node.condition.pressure` | `k8s.node.{uid,condition.message}` | (no direct verdict — feeds pod-eviction correlation) | --- diff --git a/docs/patterns/09-nccl-bootstrap-timeout.md b/docs/patterns/09-nccl-bootstrap-timeout.md index f5876e80..a2e38141 100644 --- a/docs/patterns/09-nccl-bootstrap-timeout.md +++ b/docs/patterns/09-nccl-bootstrap-timeout.md @@ -1,8 +1,8 @@ # Pattern #9 — NCCL bootstrap timeout -**Status:** ☐ planned (no detector implementation yet) +**Status:** ☑ shipped — detector at [`module/pkg/patterns/nccl_bootstrap.go`](../../module/pkg/patterns/nccl_bootstrap.go); wiring at 
[`module/processor/patterndetectorprocessor/nccl_bootstrap.go`](../../module/processor/patterndetectorprocessor/nccl_bootstrap.go). Verdict schema [`module/pkg/patterns/testdata/nccl_bootstrap_verdict.schema.json`](../../module/pkg/patterns/testdata/nccl_bootstrap_verdict.schema.json). -Design spec for the pattern-#9 detector. Sibling to pattern #8 — fires at job start, not mid-run. +Design spec retained below for the operator-facing walkthrough. Sibling to pattern #8 — fires at job start, not mid-run. ## Symptom @@ -73,3 +73,12 @@ Replay fixture under `module/pkg/replay/nccl_bootstrap_timeout/` (planned). Seed 2. **`BootstrapDeadline` default.** 5 min is reasonable for warm-cache clusters, too tight for cold-cache. Per-job override via annotation? 3. **CNI signal vocabulary.** Each CNI (Cilium, Calico, multus, ENI, GKE-native) emits different error strings on the same fault. Single OTTL recipe stanza or per-CNI? Pre-impl decision. 4. **Cohort-size discovery.** Without a `gen_ai.training.world_size` attribute, the detector cannot tell "3 of 8 failed" from "3 of 3 succeeded, 5 pods never started." Need a canonical world-size signal. + +## Implementation notes (shipped detector) + +The v0 detector resolves the open questions above with the most-conservative interpretation: + +1. **Cohort grouping.** Pods are grouped by `(gen_ai.training.job_id, k8s.namespace.name)` when the alpha `gen_ai.training.job_id` resource attribute is stamped; otherwise the detector falls back to namespace-only grouping and emits the verdict with an empty `gen_ai.training.job_id`. The empty job-id is the operator-visible signal that the fallback path fired. +2. **`BootstrapDeadline` default.** Ships at 5 min (`DefaultBootstrapDeadline`); operators raise it via `nccl_bootstrap_deadline` for cold-cache clusters. +3. 
**CNI signal vocabulary.** v0 ships the K8s-event-level vocabulary only (`FailedCreatePodSandBox` / `NetworkNotReady` / `CNIError` — the K8s control-plane-visible reasons that every CNI emits via the kubelet sandbox-setup path). Per-CNI raw-error parsing (Cilium / Calico / multus / ENI / GKE-native distinct strings) is a follow-up that lights up the `socket_ifname_mismatch` and `rendezvous_unreachable` discriminator branches. +4. **Cohort-size discovery.** v0 cohort size is the count of distinct ranks the detector observed pod-Ready signals for. Pods that never reached Ready (image-pull stuck) don't enter the cohort — they belong to pattern #15 (pod-evicted / scheduled-but-not-Ready). The verdict's `tracecore.alert.nccl_bootstrap_timeout.cohort_size` is the post-Ready count; the operator can compare it to the underlying `gen_ai.training.world_size` (when stamped) to detect a sub-quorum "pods never started" subcase. diff --git a/docs/patterns/README.md b/docs/patterns/README.md index b7187c98..c17caa6d 100644 --- a/docs/patterns/README.md +++ b/docs/patterns/README.md @@ -55,7 +55,7 @@ Engineering-facing pattern-design specs for the 8 unspec'd v1 patterns. 
Each fol | #2 InfiniBand link flap | [02-ib-link-flap.md](02-ib-link-flap.md) | ☐ planned | | #7 Dataloader hang | [07-dataloader-hang.md](07-dataloader-hang.md) | ☐ planned | | #8 NCCL timeout, no hardware cause | [08-nccl-timeout-no-hw.md](08-nccl-timeout-no-hw.md) | ☐ planned | -| #9 NCCL bootstrap timeout | [09-nccl-bootstrap-timeout.md](09-nccl-bootstrap-timeout.md) | ☐ planned | +| #9 NCCL bootstrap timeout | [09-nccl-bootstrap-timeout.md](09-nccl-bootstrap-timeout.md) | ☑ shipped | | #10 CUDA OOM, deceptive allocator | [10-cuda-oom-deceptive.md](10-cuda-oom-deceptive.md) | ☐ planned ([#303](https://github.com/TraceCoreAI/tracecore/issues/303) filed) | | #11 Checkpointer hang | [11-checkpointer-hang.md](11-checkpointer-hang.md) | ☐ planned | | #12 Loss spike → NaN | [12-loss-spike-nan.md](12-loss-spike-nan.md) | ☐ planned | diff --git a/module/processor/patterndetectorprocessor/config.go b/module/processor/patterndetectorprocessor/config.go index 809e6e74..5eaf1413 100644 --- a/module/processor/patterndetectorprocessor/config.go +++ b/module/processor/patterndetectorprocessor/config.go @@ -91,6 +91,19 @@ const DefaultIBLinkFlapMinTransitions = 2 // raise this via cuda_oom_correlation_window. const DefaultCUDAOOMCorrelationWindow = 2 * time.Minute +// DefaultNCCLBootstrapDeadline mirrors patterns.DefaultBootstrapDeadline +// — the max age past pod-ready past which a rank with no NCCL FR +// record counts as bootstrap-stuck. Operators on cold-cache clusters +// (slow image pulls) raise this via nccl_bootstrap_deadline. +const DefaultNCCLBootstrapDeadline = 5 * time.Minute + +// DefaultNCCLBootstrapCorrelationWindow mirrors +// patterns.DefaultNCCLBootstrapCorrelationWindow — the max span +// between the cohort's earliest pod-Ready and a same-namespace CNI / +// network event for the event to count toward the verdict's +// discriminator. Operators override via nccl_bootstrap_correlation_window. 
+const DefaultNCCLBootstrapCorrelationWindow = 10 * time.Minute + // DefaultCUDAOOMFBFreeFragmentationThreshold mirrors // patterns.DefaultCUDAOOMFBFreeFragmentationThreshold — the FB-free- // ratio floor at which the discriminator branches from `true_oom` to @@ -200,6 +213,18 @@ type Config struct { // [0, 1] — outside the range is unreachable on a well-formed // FB record. CUDAOOMFBFreeFragmentationThreshold float64 `yaml:"cuda_oom_fb_free_fragmentation_threshold,omitempty" mapstructure:"cuda_oom_fb_free_fragmentation_threshold"` + + // NCCLBootstrapDeadline is the max age past pod-ready past which a + // rank with no NCCL FR record counts as bootstrap-stuck. Zero + // means use DefaultNCCLBootstrapDeadline. Floor 1s — sub-1s + // deadlines chase normal pod-startup paths and produce noise. + NCCLBootstrapDeadline time.Duration `yaml:"nccl_bootstrap_deadline,omitempty" mapstructure:"nccl_bootstrap_deadline"` + + // NCCLBootstrapCorrelationWindow is the max span between the + // cohort's earliest pod-Ready and a same-namespace CNI / network + // event for the event to count toward the verdict's discriminator. + // Zero means use DefaultNCCLBootstrapCorrelationWindow. Floor 1s. + NCCLBootstrapCorrelationWindow time.Duration `yaml:"nccl_bootstrap_correlation_window,omitempty" mapstructure:"nccl_bootstrap_correlation_window"` } // Validate enforces operator-actionable rules. 
@@ -246,6 +271,12 @@ func (c *Config) Validate() error { if c.CUDAOOMFBFreeFragmentationThreshold < 0 || c.CUDAOOMFBFreeFragmentationThreshold > 1 { return fmt.Errorf("cuda_oom_fb_free_fragmentation_threshold: must be in [0, 1], got %v", c.CUDAOOMFBFreeFragmentationThreshold) } + if c.NCCLBootstrapDeadline != 0 && c.NCCLBootstrapDeadline < time.Second { + return fmt.Errorf("nccl_bootstrap_deadline: must be >= 1s, got %s", c.NCCLBootstrapDeadline) + } + if c.NCCLBootstrapCorrelationWindow != 0 && c.NCCLBootstrapCorrelationWindow < time.Second { + return fmt.Errorf("nccl_bootstrap_correlation_window: must be >= 1s, got %s", c.NCCLBootstrapCorrelationWindow) + } return nil } @@ -267,6 +298,8 @@ func defaultConfig() *Config { IBLinkFlapMinTransitions: DefaultIBLinkFlapMinTransitions, CUDAOOMCorrelationWindow: DefaultCUDAOOMCorrelationWindow, CUDAOOMFBFreeFragmentationThreshold: DefaultCUDAOOMFBFreeFragmentationThreshold, + NCCLBootstrapDeadline: DefaultNCCLBootstrapDeadline, + NCCLBootstrapCorrelationWindow: DefaultNCCLBootstrapCorrelationWindow, } } @@ -318,6 +351,12 @@ func (c *Config) withDefaults() *Config { if out.IBLinkFlapMinTransitions == 0 { out.IBLinkFlapMinTransitions = DefaultIBLinkFlapMinTransitions } + if out.NCCLBootstrapDeadline == 0 { + out.NCCLBootstrapDeadline = DefaultNCCLBootstrapDeadline + } + if out.NCCLBootstrapCorrelationWindow == 0 { + out.NCCLBootstrapCorrelationWindow = DefaultNCCLBootstrapCorrelationWindow + } return &out } diff --git a/module/processor/patterndetectorprocessor/example_config.yaml b/module/processor/patterndetectorprocessor/example_config.yaml index 43be7954..7ae71011 100644 --- a/module/processor/patterndetectorprocessor/example_config.yaml +++ b/module/processor/patterndetectorprocessor/example_config.yaml @@ -52,6 +52,15 @@ processors: # the window emits a partial verdict (kind=unknown). 
cuda_oom_correlation_window: 2m cuda_oom_fb_free_fragmentation_threshold: 0.05 + # nccl_bootstrap pattern (#9): max age past pod-Ready past which a + # rank with no NCCL FlightRecorder record counts as bootstrap-stuck + # (5min covers warm-cache clusters; raise on cold-cache clusters + # with slow image pulls). Correlation window bounds the + # same-namespace CNI / network-event join — a FailedCreatePodSandBox + # / NetworkNotReady / CNIError event in window promotes the verdict + # from partial to full and stamps discriminator=cni_error. + nccl_bootstrap_deadline: 5m + nccl_bootstrap_correlation_window: 10m exporters: debug: diff --git a/module/processor/patterndetectorprocessor/nccl_bootstrap.go b/module/processor/patterndetectorprocessor/nccl_bootstrap.go new file mode 100644 index 00000000..73076da4 --- /dev/null +++ b/module/processor/patterndetectorprocessor/nccl_bootstrap.go @@ -0,0 +1,309 @@ +// SPDX-License-Identifier: Apache-2.0 + +package patterndetectorprocessor + +import ( + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/zap" + + "github.com/tracecoreai/tracecore/module/pkg/patterns" +) + +// NCCL bootstrap-timeout (pattern #9) processor surface. +// +// Two input projections + one writer + one runner — same shape as the +// cuda_oom file (also a multi-input pattern): +// - projectTrainingPodRecord: k8sobjectsreceiver pod-Ready log lines +// → patterns.TrainingPodRecord +// - projectCNINetworkEventRecord: k8sobjectsreceiver Event log lines +// with reason in {FailedCreatePodSandBox, NetworkNotReady, CNIError} +// → patterns.CNINetworkEventRecord +// - appendNCCLBootstrapVerdict: writes one verdict log record with +// the promoted-scalar attrs per issue #270 +// - runNCCLBootstrapDetector: wires the detector against Config + +// NCCL bootstrap (pattern #9) verdict-attribute namespace. 
Promoted +// onto the verdict log record per issue #270 scalar-promotion contract +// so dashboards table-aggregate by namespace + discriminator + cohort +// size without server-side parsing of pattern.verdict_json. +const ( + // verdictAttrK8sNamespaceName promotes the k8s namespace onto the + // verdict log record. The pod_evicted / xid_correlation verdicts + // would have benefited from the same vocabulary; today the field + // is pattern-#9-only because no other detector needed namespace + // disambiguation (pod-evicted promotes k8s.pod.namespace which is + // the same value via a different OTel-semconv name). + verdictAttrK8sNamespaceName = "k8s.namespace.name" + + // verdictAttrGenAITrainingJobID promotes the cohort's training- + // job id (the alpha gen_ai.training.job_id per ATTRIBUTES.md) onto + // the verdict. Empty (the namespace-only fallback) suppresses the + // stamp via putStrIfSet. + verdictAttrGenAITrainingJobID = "gen_ai.training.job_id" + + // verdictAttrNCCLBootstrapCohortSize promotes the cohort size + // scalar (number of distinct ranks the detector observed Ready) + // so dashboards can graph cohort fanout without parsing JSON. + verdictAttrNCCLBootstrapCohortSize = "tracecore.alert.nccl_bootstrap_timeout.cohort_size" + + // verdictAttrNCCLBootstrapFailedRankCount promotes the failed- + // rank-count scalar so dashboards can show "3 of 8 failed" without + // parsing JSON. + verdictAttrNCCLBootstrapFailedRankCount = "tracecore.alert.nccl_bootstrap_timeout.failed_rank_count" + + // verdictAttrNCCLBootstrapDiscriminator promotes the discriminator + // branch so dashboards can fan out by cause family (cni_error / + // socket_ifname_mismatch / rendezvous_unreachable / unknown). + verdictAttrNCCLBootstrapDiscriminator = "tracecore.alert.nccl_bootstrap_timeout.discriminator" +) + +// projectTrainingPodRecord reads OTel attributes off a log record and +// builds a patterns.TrainingPodRecord.
The projection's gate is the +// presence of BOTH `k8s.pod.ready_time` (the recipe-stamped pod-Ready +// timestamp, the spec's "ReadyTimestamp") AND `gen_ai.training.rank` +// (the per-pod rank ordinal that drives cohort accounting; read from the record, then from the resource). Namespace +// is read from the enclosing ResourceLogs's `k8s.namespace.name` +// (always present per k8sobjectsreceiver canonical resource attrs), falling back to the per-record attribute; a record with neither is rejected. +// +// `gen_ai.training.job_id` is optional per spec open question #1 — +// when absent the detector falls back to namespace-only cohort +// grouping; the projection passes empty through so the detector +// branch is exercised on real-world telemetry where the alpha +// attribute is not yet stamped. +// +// The integration-gap follow-up is the k8sobjectsreceiver+OTTL stanza +// that derives `k8s.pod.ready_time` from the pod Ready condition's +// LastTransitionTime; until that recipe lands this projection runs +// against hand-crafted log records (the wiring tests) and stays a +// NO-OP on real receiver output. +func projectTrainingPodRecord(lr plog.LogRecord, resAttrs pcommon.Map) (patterns.TrainingPodRecord, bool) { + attrs := lr.Attributes() + readyTime, ok := attrs.Get("k8s.pod.ready_time") + if !ok { + return patterns.TrainingPodRecord{}, false + } + rank, ok := attrs.Get("gen_ai.training.rank") + if !ok { + if v, rOK := resAttrs.Get("gen_ai.training.rank"); rOK { + rank = v + ok = true + } + } + if !ok { + return patterns.TrainingPodRecord{}, false + } + namespace, ok := resAttrs.Get("k8s.namespace.name") + if !ok { + if v, lrOK := attrs.Get("k8s.namespace.name"); lrOK { + namespace = v + ok = true + } + } + if !ok { + return patterns.TrainingPodRecord{}, false + } + r := patterns.TrainingPodRecord{ + Namespace: namespace.AsString(), + Rank: rank.Int(), + } + // Parse k8s.pod.ready_time.
The recipe emits this as an int64 unix- + // nano scalar so the OTel wire format can carry it without string- + // parse ambiguity (string ISO-8601 also accepted as a fallback for + // recipe authors who prefer human-readable stamping). + switch readyTime.Type() { + case pcommon.ValueTypeInt: + r.ReadyAt = time.Unix(0, readyTime.Int()).UTC() + case pcommon.ValueTypeStr: + if t, err := time.Parse(time.RFC3339Nano, readyTime.AsString()); err == nil { + r.ReadyAt = t + } else { + return patterns.TrainingPodRecord{}, false + } + default: + return patterns.TrainingPodRecord{}, false + } + if v, ok := resAttrs.Get("gen_ai.training.job_id"); ok { + r.JobID = v.AsString() + } else if v, ok := attrs.Get("gen_ai.training.job_id"); ok { + r.JobID = v.AsString() + } + if v, ok := resAttrs.Get("k8s.pod.name"); ok { + r.PodName = v.AsString() + } else if v, ok := attrs.Get("k8s.pod.name"); ok { + r.PodName = v.AsString() + } + if v, ok := resAttrs.Get("k8s.node.name"); ok { + r.Node = v.AsString() + } else if v, ok := attrs.Get("k8s.node.name"); ok { + r.Node = v.AsString() + } + return r, true +} + +// projectCNINetworkEventRecord reads OTel attributes off a log record +// and builds a patterns.CNINetworkEventRecord. The projection's gate +// is the presence of `k8s.event.reason` with a value in the +// patterns.CNINetworkEventReasons set (FailedCreatePodSandBox / +// NetworkNotReady / CNIError) AND a namespace identifier (resource +// `k8s.namespace.name` preferred; per-record fallback to +// `k8s.regarding.namespace`). +// +// Distinct from projectPodEvent: projectPodEvent gates on +// `k8s.event.hint` (which the recipe stamps for pattern-#14 pod_evicted +// signals). CNI / network events don't have a tracecore hint vocabulary +// today; gating on the raw upstream `k8s.event.reason` avoids +// expanding the Hint enum just for this detector. 
The two projections +// are mutually exclusive on real input: an Event either has a tracecore +// hint stamped (pod_evicted path) or carries only the upstream Reason. +func projectCNINetworkEventRecord(lr plog.LogRecord, resAttrs pcommon.Map) (patterns.CNINetworkEventRecord, bool) { + attrs := lr.Attributes() + reason, ok := attrs.Get("k8s.event.reason") + if !ok { + return patterns.CNINetworkEventRecord{}, false + } + reasonStr := reason.AsString() + if !patterns.IsCNINetworkEventReason(reasonStr) { + return patterns.CNINetworkEventRecord{}, false + } + namespace := "" + if v, ok := resAttrs.Get("k8s.namespace.name"); ok { + namespace = v.AsString() + } else if v, ok := attrs.Get("k8s.namespace.name"); ok { + namespace = v.AsString() + } else if v, ok := attrs.Get("k8s.regarding.namespace"); ok { + namespace = v.AsString() + } + if namespace == "" { + return patterns.CNINetworkEventRecord{}, false + } + r := patterns.CNINetworkEventRecord{ + Namespace: namespace, + Reason: reasonStr, + } + if v, ok := attrs.Get("k8s.event.uid"); ok { + r.EventUID = v.AsString() + } + if v, ok := attrs.Get("k8s.event.note"); ok { + r.Note = v.AsString() + } + if v, ok := attrs.Get("k8s.regarding.name"); ok { + r.PodName = v.AsString() + } else if v, ok := resAttrs.Get("k8s.pod.name"); ok { + r.PodName = v.AsString() + } + if t := lr.Timestamp(); t != 0 { + r.Timestamp = t.AsTime() + } else { + r.Timestamp = lr.ObservedTimestamp().AsTime() + } + return r, true +} + +// collectNCCLBootstrapInputs walks the incoming plog.Logs and projects +// training-pod-Ready records + CNI / network event records out. +// Mirrors the cuda_oom collector's shape so the cross-cutting +// collectInputs doesn't have to grow another two return values. +// +// NCCL FR records are read off the existing nccl input (collected by +// the cross-cutting collectInputs) — the bootstrap detector consumes +// them as a third input. 
Avoids double-projection of FR records that +// would happen if this collector also walked the tree for them. +func collectNCCLBootstrapInputs(ld plog.Logs) ([]patterns.TrainingPodRecord, []patterns.CNINetworkEventRecord) { + var pods []patterns.TrainingPodRecord + var cnis []patterns.CNINetworkEventRecord + for i := 0; i < ld.ResourceLogs().Len(); i++ { + rl := ld.ResourceLogs().At(i) + resAttrs := rl.Resource().Attributes() + for j := 0; j < rl.ScopeLogs().Len(); j++ { + sl := rl.ScopeLogs().At(j) + // Skip records the processor itself emitted (the verdict + // scope) — a downstream re-injection of a verdict must + // not be re-projected as a bootstrap input. + if sl.Scope().Name() == instrumentationScope { + continue + } + for k := 0; k < sl.LogRecords().Len(); k++ { + lr := sl.LogRecords().At(k) + if rec, ok := projectTrainingPodRecord(lr, resAttrs); ok { + pods = append(pods, rec) + continue + } + if rec, ok := projectCNINetworkEventRecord(lr, resAttrs); ok { + cnis = append(cnis, rec) + } + } + } + } + return pods, cnis +} + +// appendNCCLBootstrapVerdict emits a nccl_bootstrap verdict log record. +// Promoted attrs: pattern.confidence, k8s.namespace.name, +// gen_ai.training.job_id, cohort_size, failed_rank_count, discriminator. +// +// putStrIfSet guards optional scalars (gen_ai.training.job_id is empty +// on the namespace-only fallback path). 
+func appendNCCLBootstrapVerdict(ld plog.Logs, v patterns.NCCLBootstrapTimeoutVerdict, logger *zap.Logger) { + appendVerdictRecord(ld, logger, verdictCommon{ + PatternID: v.PatternID, + Headline: v.Headline, + Remediation: v.Remediation, + EvidenceTrail: v.EvidenceTrail, + }, v, "nccl_bootstrap", func(attrs pcommon.Map) { + attrs.PutStr(verdictAttrConfidence, string(v.Confidence)) + putStrIfSet(attrs, verdictAttrK8sNamespaceName, v.Namespace) + putStrIfSet(attrs, verdictAttrGenAITrainingJobID, v.JobID) + attrs.PutInt(verdictAttrNCCLBootstrapCohortSize, int64(v.CohortSize)) + attrs.PutInt(verdictAttrNCCLBootstrapFailedRankCount, int64(v.FailedRankCount)) + attrs.PutStr(verdictAttrNCCLBootstrapDiscriminator, string(v.Discriminator)) + }) +} + +// runNCCLBootstrapDetector wires the patterns.NCCLBootstrapDetector +// against the processor config and emits one nccl_bootstrap verdict +// per match. Consumes NCCL FR records from the cross-cutting +// collectInputs (passed in via the runner's signature) to avoid +// double-projection. Hoisted out of patterndetectorProcessor.ConsumeLogs +// so the bootstrap-specific surface stays in one file. +func runNCCLBootstrapDetector( + ld plog.Logs, + ncclRecs []patterns.NCCLFRRecord, + cfg *Config, + emitPartial bool, + logger *zap.Logger, +) { + pods, cnis := collectNCCLBootstrapInputs(ld) + if len(pods) == 0 { + return + } + det := patterns.NCCLBootstrapDetector{ + BootstrapDeadline: nccBootstrapDeadline(cfg), + CorrelationWindow: nccBootstrapWindow(cfg), + } + for _, v := range det.Evaluate(pods, ncclRecs, cnis) { + if v.Confidence == patterns.ConfidencePartial && !emitPartial { + continue + } + appendNCCLBootstrapVerdict(ld, v, logger) + } +} + +// nccBootstrapDeadline + nccBootstrapWindow pull from Config with the +// library-level defaults as the fallback. Hoisted so the +// runNCCLBootstrapDetector call site is one expression per knob. 
+func nccBootstrapDeadline(cfg *Config) time.Duration { + if cfg == nil || cfg.NCCLBootstrapDeadline <= 0 { + return patterns.DefaultBootstrapDeadline + } + return cfg.NCCLBootstrapDeadline +} + +func nccBootstrapWindow(cfg *Config) time.Duration { + if cfg == nil || cfg.NCCLBootstrapCorrelationWindow <= 0 { + return patterns.DefaultNCCLBootstrapCorrelationWindow + } + return cfg.NCCLBootstrapCorrelationWindow +} diff --git a/module/processor/patterndetectorprocessor/nccl_bootstrap_test.go b/module/processor/patterndetectorprocessor/nccl_bootstrap_test.go new file mode 100644 index 00000000..01fc0ee8 --- /dev/null +++ b/module/processor/patterndetectorprocessor/nccl_bootstrap_test.go @@ -0,0 +1,299 @@ +// SPDX-License-Identifier: Apache-2.0 + +package patterndetectorprocessor + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + + "github.com/tracecoreai/tracecore/module/pkg/patterns" +) + +// nccl_bootstrap wiring tests (pattern #9). Each test builds the plog +// fixture an upstream k8sobjectsreceiver + (optionally) the NCCL FR +// recipe would emit, runs the processor end-to-end, and asserts the +// emitted verdict log record matches the spec. + +// TestPatternDetector_NCCLBootstrapWiringEmitsFullVerdict pins the +// canonical wiring contract: a training cohort of pod-Ready records + +// no NCCL FR records + a same-namespace CNI / network event emits one +// nccl_bootstrap verdict with Confidence=full and +// discriminator=cni_error. 
+func TestPatternDetector_NCCLBootstrapWiringEmitsFullVerdict(t *testing.T) { + t.Parallel() + + now := time.Now().UTC() + readyAt := now.Add(-10 * time.Minute) // past 5min default deadline + ld := plog.NewLogs() + + addPod := func(rank int64, podName, node string) { + rl := ld.ResourceLogs().AppendEmpty() + ra := rl.Resource().Attributes() + ra.PutStr("k8s.namespace.name", "training") + ra.PutStr("k8s.pod.name", podName) + ra.PutStr("k8s.node.name", node) + ra.PutStr("gen_ai.training.job_id", "job-llama-70b") + sl := rl.ScopeLogs().AppendEmpty() + lr := sl.LogRecords().AppendEmpty() + lr.SetTimestamp(pcommon.NewTimestampFromTime(readyAt)) + a := lr.Attributes() + a.PutInt("gen_ai.training.rank", rank) + a.PutInt("k8s.pod.ready_time", readyAt.UnixNano()) + } + addPod(0, "trainer-rank-0", "gpu-node-0001") + addPod(1, "trainer-rank-1", "gpu-node-0002") + addPod(2, "trainer-rank-2", "gpu-node-0003") + addPod(3, "trainer-rank-3", "gpu-node-0004") + + // CNI / network event in same namespace, in window. + evRL := ld.ResourceLogs().AppendEmpty() + evRL.Resource().Attributes().PutStr("k8s.namespace.name", "training") + evSL := evRL.ScopeLogs().AppendEmpty() + evLR := evSL.LogRecords().AppendEmpty() + evLR.SetTimestamp(pcommon.NewTimestampFromTime(readyAt.Add(30 * time.Second))) + ea := evLR.Attributes() + ea.PutStr("k8s.event.uid", "ev-cni-1") + ea.PutStr("k8s.event.reason", "FailedCreatePodSandBox") + ea.PutStr("k8s.event.note", "failed to set up sandbox container ... 
network not ready") + ea.PutStr("k8s.regarding.name", "trainer-rank-0") + + sink := newLogsSink() + p := newProcessor(testSettings(), defaultConfig(), sink) + require.NoError(t, p.Start(context.Background(), componenttestHost{})) + t.Cleanup(func() { _ = p.Shutdown(context.Background()) }) + require.NoError(t, p.ConsumeLogs(context.Background(), ld)) + + verdicts := extractNCCLBootstrapVerdicts(t, sink.at(0)) + require.Len(t, verdicts, 1) + + v := verdicts[0] + require.Equal(t, patterns.PatternIDNCCLBootstrap, v.PatternID) + require.Equal(t, patterns.ConfidenceFull, v.Confidence) + require.Equal(t, "training", v.Namespace) + require.Equal(t, "job-llama-70b", v.JobID) + require.Equal(t, 4, v.CohortSize) + require.Equal(t, 4, v.FailedRankCount) + require.Equal(t, patterns.NCCLBootstrapDiscriminatorCNIError, v.Discriminator) + + // Promoted scalars land on the verdict log record (issue #270). + promoted := extractNCCLBootstrapPromotedAttrs(t, sink.at(0)) + require.Equal(t, "full", promoted["pattern.confidence"]) + require.Equal(t, "training", promoted["k8s.namespace.name"]) + require.Equal(t, "job-llama-70b", promoted["gen_ai.training.job_id"]) + require.Equal(t, int64(4), promoted["tracecore.alert.nccl_bootstrap_timeout.cohort_size"]) + require.Equal(t, int64(4), promoted["tracecore.alert.nccl_bootstrap_timeout.failed_rank_count"]) + require.Equal(t, "cni_error", promoted["tracecore.alert.nccl_bootstrap_timeout.discriminator"]) +} + +// TestPatternDetector_NCCLBootstrapWiringPartialWhenNoCNIEvent pins the +// partial-verdict path: a cohort whose pods are past deadline with no +// FR records, AND no concurrent CNI event in the namespace, emits a +// partial-confidence verdict with discriminator=unknown. 
+func TestPatternDetector_NCCLBootstrapWiringPartialWhenNoCNIEvent(t *testing.T) { + t.Parallel() + + now := time.Now().UTC() + readyAt := now.Add(-10 * time.Minute) + ld := plog.NewLogs() + for rank, node := range []string{"gpu-node-0001", "gpu-node-0002"} { + rl := ld.ResourceLogs().AppendEmpty() + ra := rl.Resource().Attributes() + ra.PutStr("k8s.namespace.name", "training") + ra.PutStr("k8s.node.name", node) + ra.PutStr("gen_ai.training.job_id", "job-bert") + sl := rl.ScopeLogs().AppendEmpty() + lr := sl.LogRecords().AppendEmpty() + lr.SetTimestamp(pcommon.NewTimestampFromTime(readyAt)) + a := lr.Attributes() + a.PutInt("gen_ai.training.rank", int64(rank)) + a.PutInt("k8s.pod.ready_time", readyAt.UnixNano()) + } + + sink := newLogsSink() + p := newProcessor(testSettings(), defaultConfig(), sink) + require.NoError(t, p.Start(context.Background(), componenttestHost{})) + t.Cleanup(func() { _ = p.Shutdown(context.Background()) }) + require.NoError(t, p.ConsumeLogs(context.Background(), ld)) + + verdicts := extractNCCLBootstrapVerdicts(t, sink.at(0)) + require.Len(t, verdicts, 1) + require.Equal(t, patterns.ConfidencePartial, verdicts[0].Confidence) + require.Equal(t, patterns.NCCLBootstrapDiscriminatorUnknown, verdicts[0].Discriminator) +} + +// TestPatternDetector_NCCLBootstrapWiringSuppressesPartialWhenDisabled +// pins the emit_partial_verdicts=false toggle: a partial-confidence +// verdict is swallowed when the operator opts out. 
+func TestPatternDetector_NCCLBootstrapWiringSuppressesPartialWhenDisabled(t *testing.T) { + t.Parallel() + + now := time.Now().UTC() + readyAt := now.Add(-10 * time.Minute) + ld := plog.NewLogs() + for rank, node := range []string{"gpu-node-0001", "gpu-node-0002"} { + rl := ld.ResourceLogs().AppendEmpty() + ra := rl.Resource().Attributes() + ra.PutStr("k8s.namespace.name", "training") + ra.PutStr("k8s.node.name", node) + ra.PutStr("gen_ai.training.job_id", "job-quiet") + sl := rl.ScopeLogs().AppendEmpty() + lr := sl.LogRecords().AppendEmpty() + lr.SetTimestamp(pcommon.NewTimestampFromTime(readyAt)) + a := lr.Attributes() + a.PutInt("gen_ai.training.rank", int64(rank)) + a.PutInt("k8s.pod.ready_time", readyAt.UnixNano()) + } + + f := false + cfg := defaultConfig() + cfg.EmitPartialVerdicts = &f + sink := newLogsSink() + p := newProcessor(testSettings(), cfg, sink) + require.NoError(t, p.Start(context.Background(), componenttestHost{})) + t.Cleanup(func() { _ = p.Shutdown(context.Background()) }) + require.NoError(t, p.ConsumeLogs(context.Background(), ld)) + + require.Empty(t, extractNCCLBootstrapVerdicts(t, sink.at(0)), + "partial-confidence nccl_bootstrap verdict suppressed when emit_partial_verdicts=false") +} + +// TestPatternDetector_NCCLBootstrapWiringNormalStartupNoFire pins the +// no-FP guard: a cohort whose ranks all emitted at least one NCCL FR +// record MUST NOT emit a verdict — even past deadline. 
+func TestPatternDetector_NCCLBootstrapWiringNormalStartupNoFire(t *testing.T) { + t.Parallel() + + now := time.Now().UTC() + readyAt := now.Add(-10 * time.Minute) + frStarted := readyAt.Add(30 * time.Second) + ld := plog.NewLogs() + + for rank, node := range []string{"gpu-node-0001", "gpu-node-0002"} { + rl := ld.ResourceLogs().AppendEmpty() + ra := rl.Resource().Attributes() + ra.PutStr("k8s.namespace.name", "training") + ra.PutStr("k8s.node.name", node) + ra.PutStr("gen_ai.training.job_id", "job-ok") + sl := rl.ScopeLogs().AppendEmpty() + lr := sl.LogRecords().AppendEmpty() + lr.SetTimestamp(pcommon.NewTimestampFromTime(readyAt)) + a := lr.Attributes() + a.PutInt("gen_ai.training.rank", int64(rank)) + a.PutInt("k8s.pod.ready_time", readyAt.UnixNano()) + } + // FR records on both ranks — bootstrap succeeded. + for rank, node := range []string{"gpu-node-0001", "gpu-node-0002"} { + rl := ld.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("k8s.node.name", node) + sl := rl.ScopeLogs().AppendEmpty() + lr := sl.LogRecords().AppendEmpty() + lr.SetTimestamp(pcommon.NewTimestampFromTime(frStarted)) + a := lr.Attributes() + a.PutInt("gen_ai.training.rank", int64(rank)) + a.PutInt("nccl.fr.pg_id", 1) + a.PutInt("nccl.fr.collective_seq_id", 0) + a.PutStr("nccl.fr.state", "completed") + a.PutInt("nccl.fr.time_discovered_started_ns", frStarted.UnixNano()) + } + + sink := newLogsSink() + p := newProcessor(testSettings(), defaultConfig(), sink) + require.NoError(t, p.Start(context.Background(), componenttestHost{})) + t.Cleanup(func() { _ = p.Shutdown(context.Background()) }) + require.NoError(t, p.ConsumeLogs(context.Background(), ld)) + + require.Empty(t, extractNCCLBootstrapVerdicts(t, sink.at(0)), + "normal-startup cohort with FR records on every rank MUST NOT emit a bootstrap verdict") +} + +// TestConfig_NCCLBootstrapDeadlineValidationRejectsSubSecond pins the +// Validate guard: nccl_bootstrap_deadline under 1s is malconfig. 
+func TestConfig_NCCLBootstrapDeadlineValidationRejectsSubSecond(t *testing.T) { + t.Parallel() + cfg := &Config{NCCLBootstrapDeadline: 500 * time.Millisecond} + require.Error(t, cfg.Validate()) +} + +// TestConfig_NCCLBootstrapWindowValidationRejectsSubSecond pins the +// Validate guard on the correlation window. +func TestConfig_NCCLBootstrapWindowValidationRejectsSubSecond(t *testing.T) { + t.Parallel() + cfg := &Config{NCCLBootstrapCorrelationWindow: 500 * time.Millisecond} + require.Error(t, cfg.Validate()) +} + +// extractNCCLBootstrapVerdicts walks the output plog.Logs and decodes +// the nccl_bootstrap verdict JSON attribute on each verdict-shaped log +// record. +func extractNCCLBootstrapVerdicts(t *testing.T, ld plog.Logs) []patterns.NCCLBootstrapTimeoutVerdict { + t.Helper() + out := []patterns.NCCLBootstrapTimeoutVerdict{} + for i := 0; i < ld.ResourceLogs().Len(); i++ { + rl := ld.ResourceLogs().At(i) + for j := 0; j < rl.ScopeLogs().Len(); j++ { + sl := rl.ScopeLogs().At(j) + if sl.Scope().Name() != instrumentationScope { + continue + } + for k := 0; k < sl.LogRecords().Len(); k++ { + lr := sl.LogRecords().At(k) + patternID, ok := lr.Attributes().Get(verdictAttrPatternID) + if !ok || patternID.AsString() != patterns.PatternIDNCCLBootstrap { + continue + } + js, ok := lr.Attributes().Get(verdictAttrVerdictJSON) + if !ok { + continue + } + var v patterns.NCCLBootstrapTimeoutVerdict + require.NoError(t, json.Unmarshal([]byte(js.AsString()), &v)) + out = append(out, v) + } + } + } + return out +} + +// extractNCCLBootstrapPromotedAttrs walks the output plog.Logs and +// returns the promoted-scalar attrs on the first nccl_bootstrap verdict +// record. Used to assert the issue #270 scalar-promotion contract. 
+func extractNCCLBootstrapPromotedAttrs(t *testing.T, ld plog.Logs) map[string]any { + t.Helper() + for i := 0; i < ld.ResourceLogs().Len(); i++ { + rl := ld.ResourceLogs().At(i) + for j := 0; j < rl.ScopeLogs().Len(); j++ { + sl := rl.ScopeLogs().At(j) + if sl.Scope().Name() != instrumentationScope { + continue + } + for k := 0; k < sl.LogRecords().Len(); k++ { + lr := sl.LogRecords().At(k) + patternID, ok := lr.Attributes().Get(verdictAttrPatternID) + if !ok || patternID.AsString() != patterns.PatternIDNCCLBootstrap { + continue + } + out := map[string]any{} + lr.Attributes().Range(func(k string, v pcommon.Value) bool { + switch v.Type() { + case pcommon.ValueTypeStr: + out[k] = v.AsString() + case pcommon.ValueTypeInt: + out[k] = v.Int() + case pcommon.ValueTypeDouble: + out[k] = v.Double() + } + return true + }) + return out + } + } + } + t.Fatal("no nccl_bootstrap verdict record found in output") + return nil +} diff --git a/module/processor/patterndetectorprocessor/patterndetector.go b/module/processor/patterndetectorprocessor/patterndetector.go index 0dba29ea..c1a63198 100644 --- a/module/processor/patterndetectorprocessor/patterndetector.go +++ b/module/processor/patterndetectorprocessor/patterndetector.go @@ -316,6 +316,8 @@ func (p *patterndetectorProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs runCUDAOOMDetector(ld, p.cfg, p.cfg.emitPartialEnabled(), p.logger()) + runNCCLBootstrapDetector(ld, ncclRecs, p.cfg, p.cfg.emitPartialEnabled(), p.logger()) + if err := p.next.ConsumeLogs(ctx, ld); err != nil { return fmt.Errorf("patterndetector: next.ConsumeLogs: %w", err) } From 071a3e806e80604fb116e176581c8c3bee52734f Mon Sep 17 00:00:00 2001 From: Tri Lam Date: Mon, 1 Jun 2026 01:51:52 -0700 Subject: [PATCH 4/5] fix(nccl-boot): use min(ReadyAt) for deadline; max for evidence anchor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous max(ReadyAt) deadline gate silently SUPPRESSED verdicts for 
genuinely-stuck early ranks whenever a late-joining rank pushed max(ReadyAt) forward — a 15-min-ready stuck rank would be masked by a peer ready 2min ago. Switch the deadline gate to min(ReadyAt) so the bootstrap window is measured from the FIRST-ready rank's perspective — which is the rank whose bootstrap is genuinely stuck. max(ReadyAt) is retained for evidence-trail timestamp anchoring as the cohort's last-known-good Ready signal — the operator-visible "most recent Ready event on this cohort" surface. The slow-image-pull guard the original max() phrasing was meant to provide is naturally handled upstream: pods that haven't reached Ready don't enter the cohort at all (spec edge case "Slow image pull"). Tests: * Rename TestNCCLBootstrapDetector_MaxPodReadyDrivesAge → MinPodReadyDrivesAge with inverted assertion (verdict now fires). * New TestNCCLBootstrapDetector_LateJoinerDoesNotMaskStuckRank pins the load-bearing property with heterogeneous ReadyAt. * New TestNCCLBootstrapDetector_MaxPodReadyAnchorsEvidence pins max(ReadyAt) as the evidence-trail anchor. Signed-off-by: Tri Lam --- module/pkg/patterns/nccl_bootstrap.go | 43 +++++++---- module/pkg/patterns/nccl_bootstrap_test.go | 89 +++++++++++++++++++--- 2 files changed, 107 insertions(+), 25 deletions(-) diff --git a/module/pkg/patterns/nccl_bootstrap.go b/module/pkg/patterns/nccl_bootstrap.go index bb442fe1..a5b3e2ab 100644 --- a/module/pkg/patterns/nccl_bootstrap.go +++ b/module/pkg/patterns/nccl_bootstrap.go @@ -285,10 +285,14 @@ func (d NCCLBootstrapDetector) Evaluate( // gpu-node-0001 does not falsely clear rank-0 on gpu-node-0042 for // a different cohort. FR records with empty Node are skipped from // the index — without a Node we cannot decide which cohort the FR - // record belongs to, and conservatively counting it against every - // rank-0 cohort would cause cross-job false-negatives (suppressing - // real bootstrap failures). 
The receiver projection always stamps - // Node when available; absence is a wiring gap, not the norm. + // record belongs to. Skipping STRENGTHENS the absence signal (the + // rank stays "no FR seen" and counts as bootstrap-failed), which + // biases toward FALSE-POSITIVES rather than false-negatives: a + // real FR record from a wiring-gapped path won't clear the rank. + // Acceptable today because the receiver projection always stamps + // Node when available; absence is a wiring gap, not the norm. If + // the FP rate from this path proves material, the alternative is + // to fall back to a (namespace, rank) index for empty-Node records. type frankKey struct { node string rank int64 @@ -329,16 +333,33 @@ func (d NCCLBootstrapDetector) Evaluate( verdicts := make([]NCCLBootstrapTimeoutVerdict, 0, len(cohorts)) for k, cohort := range cohorts { - // Per the spec: pod_ready_time = max(cohort.pods.ReadyTimestamp). - // A late-joining rank pushes the deadline forward — protects - // against a cold-cache rolling-readiness scenario. + // Deadline check uses min(ReadyAt) so a stuck EARLY rank is + // detected even when later ranks join after the bootstrap + // window. The spec's pod_ready_time = max(ReadyAt) phrasing + // was intended to gate against slow-image-pull false positives + // (a cold-cache rolling-readiness scenario), but max() also + // silently SUPPRESSES verdicts when one late rank joins past + // an already-stuck early rank: a rank ready for 15 min that + // genuinely never bootstrapped would be masked by a peer that + // became Ready 2 min ago. min(ReadyAt) measures the actual + // bootstrap window from the FIRST-ready rank's perspective — + // which is the rank whose bootstrap is genuinely stuck. The + // slow-image-pull guard is naturally handled upstream: pods + // that haven't reached Ready don't enter the cohort at all + // (see spec edge case "Slow image pull"). 
max(ReadyAt) is + // retained for evidence-trail timestamp anchoring as the + // cohort's last-known-good Ready signal. + minReady := cohort[0].ReadyAt maxReady := cohort[0].ReadyAt for _, p := range cohort[1:] { + if p.ReadyAt.Before(minReady) { + minReady = p.ReadyAt + } if p.ReadyAt.After(maxReady) { maxReady = p.ReadyAt } } - if now.Sub(maxReady) < deadline { + if now.Sub(minReady) < deadline { continue } @@ -364,12 +385,6 @@ func (d NCCLBootstrapDetector) Evaluate( // CNI event join: any in-namespace event whose Timestamp is // within CorrelationWindow of the cohort's earliest ReadyAt // (which anchors the start of the bootstrap interval). - minReady := cohort[0].ReadyAt - for _, p := range cohort[1:] { - if p.ReadyAt.Before(minReady) { - minReady = p.ReadyAt - } - } var joinedEvent *CNINetworkEventRecord for i := range eventsByNamespace[k.namespace] { e := &eventsByNamespace[k.namespace][i] diff --git a/module/pkg/patterns/nccl_bootstrap_test.go b/module/pkg/patterns/nccl_bootstrap_test.go index 59e723a6..f8f5d3f2 100644 --- a/module/pkg/patterns/nccl_bootstrap_test.go +++ b/module/pkg/patterns/nccl_bootstrap_test.go @@ -307,26 +307,93 @@ func TestNCCLBootstrapDetector_DeterministicOrdering(t *testing.T) { require.Equal(t, "zeta", verdicts[1].Namespace) } -// TestNCCLBootstrapDetector_MaxPodReadyDrivesAge pins the spec's -// pod_ready_time = MAX over the cohort rule: one rank that became Ready -// recently (under deadline) bumps the cohort's effective ready -// timestamp forward and suppresses the verdict — bootstrap age must be -// measured from the LAST pod to become Ready, not the first. -func TestNCCLBootstrapDetector_MaxPodReadyDrivesAge(t *testing.T) { +// TestNCCLBootstrapDetector_MinPodReadyDrivesAge pins the deadline-gate +// semantics: pod_ready_time is measured from MIN(ReadyAt) over the +// cohort so a genuinely-stuck early rank is detected even when later +// ranks join after the bootstrap window. 
min() measures the actual +// bootstrap window from the FIRST-ready rank's perspective. +// +// This is a deliberate departure from the spec's original +// "max(ReadyAt)" phrasing — see the implementation comment in +// nccl_bootstrap.go and the spec's Implementation notes. The +// max-version silently masked a stuck early rank whenever a later rank +// joined past the early rank's ready window. +func TestNCCLBootstrapDetector_MinPodReadyDrivesAge(t *testing.T) { t.Parallel() now := mustParseTime(t, "2026-06-01T10:00:00Z") pods := []patterns.TrainingPodRecord{ - // Older ranks: 10 min ago. + // Two ranks ready 10 min ago — past the 5min default deadline. {JobID: "job-slow", Namespace: "training", PodName: "trainer-rank-0", Rank: 0, Node: "n0", ReadyAt: now.Add(-10 * time.Minute)}, {JobID: "job-slow", Namespace: "training", PodName: "trainer-rank-1", Rank: 1, Node: "n1", ReadyAt: now.Add(-10 * time.Minute)}, - // One late rank: just 2min ago — should bump effective ready to - // 2min, below the 5min default deadline. + // One late rank: just 2min ago. Under max() semantics this + // would suppress the verdict; under min() semantics the early + // ranks' 10-min stuck window still fires the verdict. 
{JobID: "job-slow", Namespace: "training", PodName: "trainer-rank-2", Rank: 2, Node: "n2", ReadyAt: now.Add(-2 * time.Minute)}, } - require.Empty(t, patterns.NCCLBootstrapDetector{Now: now}.Evaluate(pods, nil, nil), - "max(ReadyAt) must drive the deadline; the late rank suppresses the verdict") + verdicts := patterns.NCCLBootstrapDetector{Now: now}.Evaluate(pods, nil, nil) + require.Len(t, verdicts, 1, + "min(ReadyAt) must drive the deadline; an early stuck rank fires even when a later rank joins") + require.Equal(t, 3, verdicts[0].CohortSize) + require.Equal(t, 3, verdicts[0].FailedRankCount) +} + +// TestNCCLBootstrapDetector_LateJoinerDoesNotMaskStuckRank pins the +// load-bearing property of the min(ReadyAt) deadline check: a rank +// that has been Ready well past the bootstrap deadline is detected as +// stuck even when a peer rank joins the cohort after the deadline +// elapsed. This is the scenario the reviewer flagged on the original +// max(ReadyAt)-driven implementation. +func TestNCCLBootstrapDetector_LateJoinerDoesNotMaskStuckRank(t *testing.T) { + t.Parallel() + + now := mustParseTime(t, "2026-06-01T10:00:00Z") + + pods := []patterns.TrainingPodRecord{ + // rank-0 ready 10 min before "now" — 14 min ago by the time + // we evaluate at now+4min. Well past the 5min deadline. + {JobID: "job-late-join", Namespace: "training", PodName: "trainer-rank-0", Rank: 0, Node: "n0", ReadyAt: now.Add(-10 * time.Minute)}, + // rank-1 ready 2 min AFTER "now" — late joiner. Under + // max(ReadyAt) the deadline gate would treat this as + // "cohort just got Ready", suppressing the verdict.
+ {JobID: "job-late-join", Namespace: "training", PodName: "trainer-rank-1", Rank: 1, Node: "n1", ReadyAt: now.Add(2 * time.Minute)}, + } + + // Evaluate at now+4min: min(ReadyAt) age = 14min > 5min deadline + // while max(ReadyAt) age = 2min < 5min deadline, so only a + // min(ReadyAt)-driven gate fires here; a max(ReadyAt)-driven + // gate would suppress the verdict and fail this test. + verdicts := patterns.NCCLBootstrapDetector{Now: now.Add(4 * time.Minute)}.Evaluate(pods, nil, nil) + require.Len(t, verdicts, 1, + "a late-joining rank MUST NOT mask a stuck early rank; min(ReadyAt) drives the deadline") + require.Equal(t, 2, verdicts[0].CohortSize) + require.Equal(t, 2, verdicts[0].FailedRankCount) +} + +// TestNCCLBootstrapDetector_MaxPodReadyAnchorsEvidence pins the +// evidence-trail anchoring semantic: max(ReadyAt) is the cohort's +// last-known-good Ready timestamp and is used to stamp the +// training_pod evidence record's Timestamp. min(ReadyAt) is used for +// the deadline gate (see TestNCCLBootstrapDetector_MinPodReadyDrivesAge); +// max(ReadyAt) is retained for evidence-trail anchoring so operators +// see the most-recent Ready event on the cohort.
+func TestNCCLBootstrapDetector_MaxPodReadyAnchorsEvidence(t *testing.T) { + t.Parallel() + + now := mustParseTime(t, "2026-06-01T10:00:00Z") + earlyReady := now.Add(-10 * time.Minute) + lateReady := now.Add(-6 * time.Minute) // still past 5min deadline + + pods := []patterns.TrainingPodRecord{ + {JobID: "job-anchor", Namespace: "training", PodName: "trainer-rank-0", Rank: 0, Node: "n0", ReadyAt: earlyReady}, + {JobID: "job-anchor", Namespace: "training", PodName: "trainer-rank-1", Rank: 1, Node: "n1", ReadyAt: lateReady}, + } + verdicts := patterns.NCCLBootstrapDetector{Now: now}.Evaluate(pods, nil, nil) + require.Len(t, verdicts, 1) + require.Len(t, verdicts[0].EvidenceTrail, 1) + require.Equal(t, lateReady, verdicts[0].EvidenceTrail[0].Timestamp, + "training_pod evidence Timestamp must anchor on max(ReadyAt) — the cohort's last-known-good Ready signal") } // TestNCCLBootstrapVerdict_SchemaConformance pins the From cf66328024c4ce2c7fd642b0a80d22ea34bf9997 Mon Sep 17 00:00:00 2001 From: Tri Lam Date: Mon, 1 Jun 2026 01:52:24 -0700 Subject: [PATCH 5/5] fix(nccl-boot): tighten schema + spec + empty-Node comment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reviewer cleanup pass on yellow-tier findings: * Schema: tighten gen_ai.training.job_id to minLength:1 and document the fallback-grouping semantic explicitly — the field is OMITTED (not empty-string) on the namespace-only fallback path. Downstream consumers must treat ABSENCE as the explicit fallback signal, not silent exclusion. processor already uses putStrIfSet to suppress empty-string variants. * Spec eval rule: clarify Pattern #8 (NCCL hang) vs Pattern #9 (NCCL bootstrap) trigger disjoint-ness — hang fires on PRESENCE of non-completed FR records mid-run; bootstrap fires on ABSENCE of any FR record past deadline. Both can fire on the same cohort during a heterogeneous bootstrap by design. 
* Spec impl-notes: add note #5 documenting the min(ReadyAt) / max(ReadyAt) split with the late-joiner-masks-stuck-rank scenario as the rationale. * Empty-Node skip comment in detector: previous comment claimed the skip biases toward false-negatives; in fact it STRENGTHENS the absence signal (rank stays "no FR seen" → counted as failed), biasing toward false-positives. Correct the directionality and call out the fallback-to-(namespace, rank) escape hatch. Signed-off-by: Tri Lam --- docs/patterns/09-nccl-bootstrap-timeout.md | 7 +++++-- .../patterns/testdata/nccl_bootstrap_verdict.schema.json | 3 ++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/docs/patterns/09-nccl-bootstrap-timeout.md b/docs/patterns/09-nccl-bootstrap-timeout.md index a2e38141..083a654d 100644 --- a/docs/patterns/09-nccl-bootstrap-timeout.md +++ b/docs/patterns/09-nccl-bootstrap-timeout.md @@ -26,11 +26,11 @@ A new training job starts; NCCL never completes `ncclCommInitRank`. Logs show `N ``` for each TrainingJobCohort cohort (grouped by gen_ai.training.job_id): - pod_ready_time = max(cohort.pods.ReadyTimestamp) + pod_ready_time = min(cohort.pods.ReadyTimestamp) # see Impl note 5 age_since_ready = Now - pod_ready_time if age_since_ready < BootstrapDeadline: skip bootstrap_failed_ranks = ranks where - no NCCLFRRecord exists for rank + no NCCLFRRecord exists for rank # ABSENCE signal OR the only NCCL log lines are bootstrap-phase, no collective records network_event_present = exists k8sobjects event in cohort with reason in {FailedCreatePodSandBox, NetworkNotReady, CNIError} @@ -41,6 +41,8 @@ for each TrainingJobCohort cohort (grouped by gen_ai.training.job_id): emit NCCLBootstrapTimeoutVerdict (confidence: partial) ``` +**Pattern #8 (NCCL hang) vs Pattern #9 (NCCL bootstrap) — disjoint triggers.** Pattern #8 fires on the **PRESENCE** of non-completed FR records (a collective started but never completed mid-run); pattern #9 fires on the **ABSENCE** of any FR record for a rank past 
the bootstrap deadline. Both patterns can fire concurrently on the same cohort when a heterogeneous bootstrap leaves some ranks past `collective_seq_id == 0` and others not — by design, this is the operator-visible signal that a hetero-bootstrap is in progress, not a duplication bug. + ## Verdict attributes | Key | Type | Description | @@ -82,3 +84,4 @@ The v0 detector resolves the open questions above with the most-conservative int 2. **`BootstrapDeadline` default.** Ships at 5 min (`DefaultBootstrapDeadline`); operators raise it via `nccl_bootstrap_deadline` for cold-cache clusters. 3. **CNI signal vocabulary.** v0 ships the K8s-event-level vocabulary only (`FailedCreatePodSandBox` / `NetworkNotReady` / `CNIError` — the K8s control-plane-visible reasons that every CNI emits via the kubelet sandbox-setup path). Per-CNI raw-error parsing (Cilium / Calico / multus / ENI / GKE-native distinct strings) is a follow-up that lights up the `socket_ifname_mismatch` and `rendezvous_unreachable` discriminator branches. 4. **Cohort-size discovery.** v0 cohort size is the count of distinct ranks the detector observed pod-Ready signals for. Pods that never reached Ready (image-pull stuck) don't enter the cohort — they belong to pattern #15 (pod-evicted / scheduled-but-not-Ready). The verdict's `tracecore.alert.nccl_bootstrap_timeout.cohort_size` is the post-Ready count; the operator can compare it to the underlying `gen_ai.training.world_size` (when stamped) to detect a sub-quorum "pods never started" subcase. +5. **Deadline gate uses `min(ReadyAt)`; `max(ReadyAt)` anchors evidence.** The pseudocode above says `pod_ready_time = min(cohort.pods.ReadyTimestamp)` — a deliberate departure from an earlier draft that used `max(ReadyAt)`. The intent of the original `max()` phrasing was to gate against slow-image-pull false positives (a cold-cache rolling-readiness cohort). 
The problem with `max()`: a late-joining rank pushes the deadline forward and silently SUPPRESSES verdicts for genuinely-stuck early ranks. Concretely — rank-0 ReadyAt = T−10min, rank-1 ReadyAt = T+2min, deadline = 5min: `max(ReadyAt) = T+2min` means `age = Now − (T+2min)`, which stays under the 5min gate until Now = T+7min; at Now = T+5min rank-0 has been Ready 15min and is genuinely stuck, yet the cohort's age is only 3min and no verdict fires. The detector therefore uses `min(ReadyAt)` for the deadline gate — measuring the bootstrap window from the FIRST rank to become Ready, which is the rank whose bootstrap is genuinely stuck. The slow-image-pull guard is naturally handled because pods that haven't reached Ready don't enter the cohort at all (edge case "Slow image pull"). `max(ReadyAt)` is retained as the cohort's last-known-good Ready signal for evidence-trail timestamp anchoring — the operator-visible "most recent Ready event on this cohort" surface. diff --git a/module/pkg/patterns/testdata/nccl_bootstrap_verdict.schema.json b/module/pkg/patterns/testdata/nccl_bootstrap_verdict.schema.json index b74ae6cf..2598b618 100644 --- a/module/pkg/patterns/testdata/nccl_bootstrap_verdict.schema.json +++ b/module/pkg/patterns/testdata/nccl_bootstrap_verdict.schema.json @@ -27,7 +27,8 @@ }, "gen_ai.training.job_id": { "type": "string", - "description": "Training job identifier used to group the cohort. Empty when the cohort was grouped on the namespace + job-label fallback (per spec open question #1)." + "minLength": 1, + "description": "Training job identifier used to group the cohort. OPTIONAL by design — the verdict OMITS this field entirely (it does NOT emit an empty string) when the cohort was grouped on the namespace-only fallback path (per spec open question #1: the alpha `gen_ai.training.job_id` resource attribute is not yet stamped). Downstream consumers MUST treat field absence as the explicit fallback-grouping signal — not a silent exclusion.
When present the value is non-empty (minLength:1); the processor uses putStrIfSet to suppress the empty-string variant." }, "k8s.namespace.name": { "type": "string",