diff --git a/module/pkg/patterns/testdata/xid_correlation_verdict.schema.json b/module/pkg/patterns/testdata/xid_correlation_verdict.schema.json new file mode 100644 index 00000000..f27e0a98 --- /dev/null +++ b/module/pkg/patterns/testdata/xid_correlation_verdict.schema.json @@ -0,0 +1,65 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://tracecore.ai/schemas/patterns/xid_correlation_verdict/v0", + "title": "XidCorrelationVerdict", + "description": "v0.3.0 NORTHSTAR pattern #3 (xid_correlation) verdict shape. Pinned by TestXidCorrelationVerdict_SchemaConformance.", + "type": "object", + "required": ["pattern.id", "headline", "remediation", "evidence_trail", "xid_code", "node", "evicted_pod"], + "additionalProperties": false, + "properties": { + "pattern.id": { + "type": "string", + "const": "16", + "description": "xid_correlation pattern identifier; string-typed numeric uniform across pattern detectors." + }, + "headline": { + "type": "string", + "minLength": 1 + }, + "remediation": { + "type": "string", + "minLength": 1 + }, + "xid_code": { + "type": "integer", + "description": "NVIDIA driver Xid event code (e.g. 79 = GPU fallen off the bus)." + }, + "node": { + "type": "string", + "minLength": 1, + "description": "Kubernetes node name where both the Xid and the eviction occurred." + }, + "evicted_pod": { + "type": "string", + "minLength": 1, + "description": "namespace/name of the evicted pod that correlated with the Xid." + }, + "evidence_trail": { + "type": "array", + "minItems": 2, + "items": { + "type": "object", + "required": ["kind", "uid", "timestamp", "description"], + "additionalProperties": false, + "properties": { + "kind": { + "type": "string", + "enum": ["kernel_event", "pod_event"] + }, + "uid": { + "type": "string", + "minLength": 1 + }, + "timestamp": { + "type": "string", + "format": "date-time" + }, + "description": { + "type": "string", + "minLength": 1 + } + } + } + } + } +} diff --git a/module/pkg/patterns/verdict.go b/module/pkg/patterns/verdict.go index 14799520..f8394466 100644 --- a/module/pkg/patterns/verdict.go +++ b/module/pkg/patterns/verdict.go @@ -24,8 +24,8 @@ const ( // bottom as the operator-facing timeline. type EvidenceRef struct { // Kind names the receiver-side surface the evidence came from. - // Today's vocabulary: "pod_event", "node_condition". M17/M18 will - // extend with "kernel_event", "nccl_fr", "kineto", "pyspy". + // Today's vocabulary: "pod_event", "node_condition", "nccl_fr", + // "kernel_event". Future extensions: "kineto", "pyspy". Kind string `json:"kind"` // UID is the upstream identifier — for Kubernetes Events diff --git a/module/pkg/patterns/xid_correlation.go b/module/pkg/patterns/xid_correlation.go new file mode 100644 index 00000000..765ef016 --- /dev/null +++ b/module/pkg/patterns/xid_correlation.go @@ -0,0 +1,252 @@ +// SPDX-License-Identifier: Apache-2.0 + +package patterns + +import ( + "fmt" + "sort" + "time" +) + +// DefaultXidCorrelationWindow is the maximum gap between an Xid +// kernel event and a downstream pod eviction on the same node for +// the detector to join them. 60s reflects the kubelet's typical +// reaction to a GPU-driven NotReady transition: the driver-side Xid +// fires immediately on the hardware fault, the node-status loop +// flips NotReady within ~10-20s, the eviction-manager begins evicting +// pods within tens of seconds after that. Operators on long-tail +// drain paths (eviction-soft, controller-managed graceful drain) +// raise this via XidCorrelationDetector.CorrelationWindow. +const DefaultXidCorrelationWindow = 60 * time.Second + +// XidRecord is the typed projection of one GPU Xid kernel event. The +// patterndetectorprocessor builds these from log records carrying +// the customer-stable `kernelevents.xid` attribute (per RFC-0013 §3 +// and the journald-kernel recipe); the detector reads XidRecord +// values directly — no plog grep — so a schema rename in the OTTL +// recipe surfaces as a compile error. +type XidRecord struct { + // Code is the NVIDIA driver Xid event code. 79 ("GPU has fallen + // off the bus") is the canonical hardware-failure trigger; the + // full enumeration is documented in the NVIDIA driver release + // notes. The detector doesn't gate on Code value — any Xid + + // downstream eviction is operator-relevant. + Code int `json:"code"` + + // Node is the Kubernetes node name the Xid fired on. Stamped by + // the k8sattributes processor on the DaemonSet's resource + // attributes (`k8s.node.name`); the patterndetectorprocessor + // hoists it onto each XidRecord. + Node string `json:"node"` + + // Timestamp is the kernel log record's wall-clock time — the + // moment the GPU driver emitted the Xid line. + Timestamp time.Time `json:"timestamp"` + + // Detail is the raw kernel log line body, e.g. "NVRM: Xid (PCI: + // 0000:3b:00): 79, GPU has fallen off the bus". Rendered into + // the verdict's evidence-trail description. + Detail string `json:"detail,omitempty"` +} + +// XidCorrelationVerdict is the v0.3.0 NORTHSTAR pattern #3 output. +// JSON field names follow the verdict.schema.json snake-case +// convention. Distinct shape from PodEvictedVerdict (no confidence) +// because the pattern's emission rule is "both layers joined or no +// verdict" — partial verdicts would be Xid-without-eviction, which +// operators already see via the raw kernelevents.xid telemetry. +type XidCorrelationVerdict struct { + PatternID string `json:"pattern.id"` + Headline string `json:"headline"` + Remediation string `json:"remediation"` + EvidenceTrail []EvidenceRef `json:"evidence_trail"` + + // XidCode is the NVIDIA driver Xid code that triggered the + // correlation. Load-bearing for operator triage — pasted into + // NVIDIA driver-bug lookups. + XidCode int `json:"xid_code"` + + // Node is the Kubernetes node where both the Xid and the + // eviction occurred. Pinned on the verdict so the alert payload + // is self-contained. + Node string `json:"node"` + + // EvictedPod is the namespace/name of the evicted pod. One + // verdict emits per evicted pod (not per Xid) so alert routing + // can fan out to per-pod owners. + EvictedPod string `json:"evicted_pod"` +} + +// XidCorrelationDetector is the xid_correlation pattern detector +// (NORTHSTAR pattern #3 in the v0.3.0 ladder). Zero-value usage is +// permitted — CorrelationWindow defaults to +// DefaultXidCorrelationWindow. +type XidCorrelationDetector struct { + // CorrelationWindow is the maximum (eviction.EventTime - + // xid.Timestamp) for a same-node Xid → eviction pair to join. + // Zero means use DefaultXidCorrelationWindow. + CorrelationWindow time.Duration +} + +// Evaluate scans Xid kernel events and Pod eviction records and +// emits one XidCorrelationVerdict per (xid → evicted_pod) pair where +// the same node hosted both within CorrelationWindow and the +// eviction's EventTime is >= the Xid's Timestamp (causality flows +// forward). When several Xids on the same node fall inside the +// window before an eviction, the verdict cites the MOST RECENT Xid +// as the proximate cause. +// +// Output is sorted by (eviction EventTime ascending, EventUID +// ascending) so the slice is deterministic for golden tests. +// +// Inputs are read-only snapshots; the detector does not mutate +// either slice. Order of inputs is not assumed. +func (d XidCorrelationDetector) Evaluate(xids []XidRecord, events []Record) []XidCorrelationVerdict { + window := d.CorrelationWindow + if window <= 0 { + window = DefaultXidCorrelationWindow + } + + // Index Xids by node, ascending Timestamp, so we can binary- + // search for the most-recent Xid <= an eviction's EventTime in + // O(log N) per eviction. + xidIdx := indexXidsByNode(xids) + + verdicts := make([]XidCorrelationVerdict, 0) + for i := range events { + ev := events[i] + if ev.Hint != HintPodEvicted { + continue + } + xid, ok := mostRecentXidWithin(xidIdx[ev.ReportingInstance], ev.EventTime, window) + if !ok { + continue + } + verdicts = append(verdicts, buildXidCorrelationVerdict(xid, ev)) + } + + sort.SliceStable(verdicts, func(i, j int) bool { + ti := verdicts[i].EvidenceTrail[1].Timestamp + tj := verdicts[j].EvidenceTrail[1].Timestamp + if !ti.Equal(tj) { + return ti.Before(tj) + } + return verdicts[i].EvidenceTrail[1].UID < verdicts[j].EvidenceTrail[1].UID + }) + return verdicts +} + +// indexXidsByNode groups Xid records by node name and sorts each +// bucket by Timestamp ascending so the detector can binary-search +// for the most-recent Xid before an eviction. +func indexXidsByNode(xids []XidRecord) map[string][]XidRecord { + idx := map[string][]XidRecord{} + for _, x := range xids { + idx[x.Node] = append(idx[x.Node], x) + } + for k := range idx { + recs := idx[k] + sort.SliceStable(recs, func(i, j int) bool { + return recs[i].Timestamp.Before(recs[j].Timestamp) + }) + idx[k] = recs + } + return idx +} + +// mostRecentXidWithin returns the most recent XidRecord in the +// per-node-sorted bucket whose Timestamp is <= evTime and +// >= evTime-window. Bucket is sorted ascending; binary-search finds +// the rightmost element <= evTime, then we verify the window bound. +// Mirrors mostRecentConditionWithin's shape on the pod_evicted side. +func mostRecentXidWithin(bucket []XidRecord, evTime time.Time, window time.Duration) (XidRecord, bool) { + if len(bucket) == 0 { + return XidRecord{}, false + } + i := sort.Search(len(bucket), func(i int) bool { + return bucket[i].Timestamp.After(evTime) + }) + if i == 0 { + return XidRecord{}, false + } + candidate := bucket[i-1] + if evTime.Sub(candidate.Timestamp) > window { + return XidRecord{}, false + } + return candidate, true +} + +// buildXidCorrelationVerdict materializes the verdict for one Xid → +// eviction join. Evidence trail is in causal order: kernel_event +// first (the hardware fault), pod_event second (the kubelet's +// response). +func buildXidCorrelationVerdict(xid XidRecord, ev Record) XidCorrelationVerdict { + pod := displayPodName(ev) + dt := ev.EventTime.Sub(xid.Timestamp) + return XidCorrelationVerdict{ + PatternID: PatternIDXidCorrelation, + XidCode: xid.Code, + Node: xid.Node, + EvictedPod: pod, + Headline: xidCorrelationHeadline(xid, pod, dt), + Remediation: xidCorrelationRemediation(xid, pod), + EvidenceTrail: []EvidenceRef{ + { + Kind: EvidenceKindXid, + UID: xidEvidenceUID(xid), + Timestamp: xid.Timestamp, + Description: xidEvidenceDescription(xid), + }, + { + Kind: EvidenceKindPodEvent, + UID: ev.EventUID, + Timestamp: ev.EventTime, + Description: fmt.Sprintf("Pod %s evicted on node %s", pod, ev.ReportingInstance), + }, + }, + } +} + +// xidEvidenceUID synthesizes a stable identifier for the Xid +// evidence ref. kmsg lines have no upstream UID — the (node, code, +// timestamp) triple is the smallest globally-unique key. +func xidEvidenceUID(xid XidRecord) string { + return fmt.Sprintf("%s/xid=%d/%d", xid.Node, xid.Code, xid.Timestamp.UnixNano()) +} + +// xidEvidenceDescription renders the operator-facing prose for the +// Xid evidence ref. Falls back to a generic shape when Detail is +// empty (e.g. a downstream stripped the body for PII reasons). +func xidEvidenceDescription(xid XidRecord) string { + if xid.Detail != "" { + return xid.Detail + } + return fmt.Sprintf("GPU Xid %d on node %s", xid.Code, xid.Node) +} + +// xidCorrelationHeadline renders the operator-facing one-liner. The +// shape is regex-asserted: /Xid \d+ on .* → .* evicted .*s later/. +func xidCorrelationHeadline(xid XidRecord, pod string, dt time.Duration) string { + return fmt.Sprintf("Xid %d on %s → %s evicted %ds later", xid.Code, xid.Node, pod, int(dt.Seconds())) +} + +// xidCorrelationRemediation returns the operator-actionable +// remediation prose. Pins the offending node + pod so the alert +// payload is self-contained. +func xidCorrelationRemediation(xid XidRecord, pod string) string { + return fmt.Sprintf( + "Likely GPU hardware failure on node %s (Xid %d). Drain the node, reseat or RMA the GPU, and reschedule pod %s once node returns Ready. Inspect dmesg / nvidia-smi -q for further driver state.", + xid.Node, xid.Code, pod, + ) +} + +// PatternIDXidCorrelation is the xid-correlation pattern identifier. +// String-typed numeric, uniform across pattern detectors. Unique +// vs PatternIDPodEvicted ("14") and PatternIDNCCLHang ("15"). +const PatternIDXidCorrelation = "16" + +// EvidenceKindXid names the GPU-Xid kernel-event surface. Hoisted +// alongside EvidenceKindPodEvent / EvidenceKindNodeCondition / +// EvidenceKindNCCLFR. The "kernel_event" wire value matches the +// vocabulary documented on verdict.go's EvidenceRef.Kind comment. +const EvidenceKindXid = "kernel_event" diff --git a/module/pkg/patterns/xid_correlation_test.go b/module/pkg/patterns/xid_correlation_test.go new file mode 100644 index 00000000..7df0a3dc --- /dev/null +++ b/module/pkg/patterns/xid_correlation_test.go @@ -0,0 +1,451 @@ +// SPDX-License-Identifier: Apache-2.0 + +package patterns_test + +import ( + "encoding/json" + "os" + "path/filepath" + "regexp" + "testing" + "time" + + "github.com/santhosh-tekuri/jsonschema/v6" + "github.com/stretchr/testify/require" + + "github.com/tracecoreai/tracecore/module/pkg/patterns" +) + +// xid_correlation detector test suite. The detector reads GPU Xid +// kernel events alongside Kubernetes Pod eviction Records and emits +// one XidCorrelationVerdict per (Xid → Pod-Evicted) join within a +// configurable correlation window. An Xid without a downstream +// eviction (on the same node, within the window) emits no verdict — +// the pattern's operator value is the correlation, not the Xid alone. + +// TestXidCorrelationDetector_PositiveXid79ThenEviction pins the +// canonical hardware-failure-to-eviction journey: an Xid 79 (GPU +// fallen off bus) lands on gpu-node-0001, 10s later the kubelet +// evicts a pod on that same node. The detector joins them and emits +// one verdict naming both surfaces in causal order. +func TestXidCorrelationDetector_PositiveXid79ThenEviction(t *testing.T) { + t.Parallel() + + xidAt := time.Unix(1_700_000_000, 0).UTC() + evictAt := xidAt.Add(10 * time.Second) + xids := []patterns.XidRecord{ + { + Code: 79, + Node: "gpu-node-0001", + Timestamp: xidAt, + Detail: "NVRM: Xid (PCI:0000:3b:00): 79, GPU has fallen off the bus.", + }, + } + events := []patterns.Record{ + { + EventUID: "evict-1", + Reason: "Evicted", + Hint: patterns.HintPodEvicted, + Note: "Pod evicted by kubelet after node went NotReady.", + ReportingInstance: "gpu-node-0001", + EventTime: evictAt, + Regarding: patterns.ObjectRef{Kind: "Pod", Namespace: "training", Name: "job-rank-3", UID: "pod-uid-3"}, + }, + } + + verdicts := patterns.XidCorrelationDetector{}.Evaluate(xids, events) + require.Len(t, verdicts, 1) + + v := verdicts[0] + require.Equal(t, patterns.PatternIDXidCorrelation, v.PatternID) + require.Equal(t, 79, v.XidCode) + require.Equal(t, "gpu-node-0001", v.Node) + require.Equal(t, "training/job-rank-3", v.EvictedPod) + + headlineRE := regexp.MustCompile(`Xid 79.*gpu-node-0001.*training/job-rank-3`) + require.Regexp(t, headlineRE, v.Headline, "headline must name the Xid code, node, and pod") + require.Regexp(t, regexp.MustCompile(`GPU|hardware|drain`), v.Remediation) + + require.Len(t, v.EvidenceTrail, 2, "two evidence layers: kernel_event + pod_event") + require.Equal(t, patterns.EvidenceKindXid, v.EvidenceTrail[0].Kind, "Xid surfaces first (causal order)") + require.Equal(t, patterns.EvidenceKindPodEvent, v.EvidenceTrail[1].Kind) + require.Equal(t, "evict-1", v.EvidenceTrail[1].UID) +} + +// TestXidCorrelationDetector_NegativeXidNoEviction pins the +// no-correlation contract: an Xid arrives but no pod on the same node +// is evicted in the window. Zero verdicts — the pattern's operator +// value is the join, not the kernel event alone (operators get raw +// Xid telemetry via the kernelevents.xid attribute already). +func TestXidCorrelationDetector_NegativeXidNoEviction(t *testing.T) { + t.Parallel() + + xidAt := time.Unix(1_700_000_000, 0).UTC() + xids := []patterns.XidRecord{ + {Code: 79, Node: "gpu-node-0001", Timestamp: xidAt, Detail: "NVRM: Xid 79"}, + } + verdicts := patterns.XidCorrelationDetector{}.Evaluate(xids, nil) + require.Empty(t, verdicts) +} + +// TestXidCorrelationDetector_NegativeCrossNode asserts the +// same-node-required contract: an Xid on node-A and a pod eviction +// on node-B inside the window do NOT correlate — eviction-on-B is +// not caused by GPU-failure-on-A. +func TestXidCorrelationDetector_NegativeCrossNode(t *testing.T) { + t.Parallel() + + xidAt := time.Unix(1_700_000_000, 0).UTC() + evictAt := xidAt.Add(10 * time.Second) + xids := []patterns.XidRecord{ + {Code: 79, Node: "node-A", Timestamp: xidAt, Detail: "NVRM: Xid 79"}, + } + events := []patterns.Record{ + { + EventUID: "evict-cross", + Reason: "Evicted", + Hint: patterns.HintPodEvicted, + ReportingInstance: "node-B", + EventTime: evictAt, + Regarding: patterns.ObjectRef{Kind: "Pod", Namespace: "t", Name: "p"}, + }, + } + + verdicts := patterns.XidCorrelationDetector{}.Evaluate(xids, events) + require.Empty(t, verdicts, "cross-node correlation MUST NOT fire") +} + +// TestXidCorrelationDetector_EdgeOutOfWindow pins the window bound: +// an Xid + eviction on the same node but separated by more than the +// default correlation window (60s) do NOT correlate. 61s is the +// just-over-the-edge falsifier. +func TestXidCorrelationDetector_EdgeOutOfWindow(t *testing.T) { + t.Parallel() + + xidAt := time.Unix(1_700_000_000, 0).UTC() + evictAt := xidAt.Add(61 * time.Second) + xids := []patterns.XidRecord{ + {Code: 79, Node: "gpu-node-0001", Timestamp: xidAt, Detail: "NVRM: Xid 79"}, + } + events := []patterns.Record{ + { + EventUID: "evict-late", + Reason: "Evicted", + Hint: patterns.HintPodEvicted, + ReportingInstance: "gpu-node-0001", + EventTime: evictAt, + Regarding: patterns.ObjectRef{Kind: "Pod", Namespace: "t", Name: "p"}, + }, + } + + verdicts := patterns.XidCorrelationDetector{}.Evaluate(xids, events) + require.Empty(t, verdicts, "eviction outside default 60s window MUST NOT correlate") +} + +// TestXidCorrelationDetector_EdgeAtWindowBoundary pins the inclusive +// side of the window: eviction exactly `window` after Xid MUST +// correlate (strict-greater-than check on the boundary). Paired with +// EdgeOutOfWindow to fence both sides — flipping the operator from +// `>` to `>=` in mostRecentXidWithin would now fail a test. +func TestXidCorrelationDetector_EdgeAtWindowBoundary(t *testing.T) { + t.Parallel() + + xidAt := time.Unix(1_700_000_000, 0).UTC() + evictAt := xidAt.Add(60 * time.Second) + xids := []patterns.XidRecord{ + {Code: 79, Node: "gpu-node-0001", Timestamp: xidAt, Detail: "NVRM: Xid 79"}, + } + events := []patterns.Record{ + { + EventUID: "evict-edge", + Reason: "Evicted", + Hint: patterns.HintPodEvicted, + ReportingInstance: "gpu-node-0001", + EventTime: evictAt, + Regarding: patterns.ObjectRef{Kind: "Pod", Namespace: "t", Name: "p"}, + }, + } + + verdicts := patterns.XidCorrelationDetector{}.Evaluate(xids, events) + require.Len(t, verdicts, 1, "eviction at exactly window boundary MUST correlate") +} + +// TestXidCorrelationDetector_MultiPodPerXid pins the multi-pod +// emission decision (commit body §): one verdict PER evicted pod. +// Per-pod is the operator-actionable shape — each verdict's +// Remediation pins a specific pod to drain/recreate; collapsing into +// one verdict would force the operator to parse a list and lose the +// alert-routing-by-pod path. 3 pods evicted on the same node after +// one Xid → 3 verdicts, deterministically sorted. +func TestXidCorrelationDetector_MultiPodPerXid(t *testing.T) { + t.Parallel() + + xidAt := time.Unix(1_700_000_000, 0).UTC() + xids := []patterns.XidRecord{ + {Code: 79, Node: "gpu-node-0001", Timestamp: xidAt, Detail: "NVRM: Xid 79"}, + } + events := []patterns.Record{ + { + EventUID: "evict-a", Reason: "Evicted", Hint: patterns.HintPodEvicted, + ReportingInstance: "gpu-node-0001", EventTime: xidAt.Add(5 * time.Second), + Regarding: patterns.ObjectRef{Kind: "Pod", Namespace: "t", Name: "pod-a"}, + }, + { + EventUID: "evict-b", Reason: "Evicted", Hint: patterns.HintPodEvicted, + ReportingInstance: "gpu-node-0001", EventTime: xidAt.Add(10 * time.Second), + Regarding: patterns.ObjectRef{Kind: "Pod", Namespace: "t", Name: "pod-b"}, + }, + { + EventUID: "evict-c", Reason: "Evicted", Hint: patterns.HintPodEvicted, + ReportingInstance: "gpu-node-0001", EventTime: xidAt.Add(20 * time.Second), + Regarding: patterns.ObjectRef{Kind: "Pod", Namespace: "t", Name: "pod-c"}, + }, + } + + verdicts := patterns.XidCorrelationDetector{}.Evaluate(xids, events) + require.Len(t, verdicts, 3, "one verdict per evicted pod") + require.Equal(t, "t/pod-a", verdicts[0].EvictedPod, "sorted by eviction EventTime") + require.Equal(t, "t/pod-b", verdicts[1].EvictedPod) + require.Equal(t, "t/pod-c", verdicts[2].EvictedPod) +} + +// TestXidCorrelationDetector_WindowConfigurable asserts the +// CorrelationWindow field overrides the default — operators on +// long-tail eviction paths (eviction-soft thresholds, controller +// drain) raise it. +func TestXidCorrelationDetector_WindowConfigurable(t *testing.T) { + t.Parallel() + + xidAt := time.Unix(1_700_000_000, 0).UTC() + evictAt := xidAt.Add(90 * time.Second) // 90s, over default 60s + xids := []patterns.XidRecord{ + {Code: 79, Node: "gpu-node-0001", Timestamp: xidAt, Detail: "NVRM: Xid 79"}, + } + events := []patterns.Record{ + { + EventUID: "evict-late", Reason: "Evicted", Hint: patterns.HintPodEvicted, + ReportingInstance: "gpu-node-0001", EventTime: evictAt, + Regarding: patterns.ObjectRef{Kind: "Pod", Namespace: "t", Name: "p"}, + }, + } + + // Default window (60s): no verdict. + require.Empty(t, patterns.XidCorrelationDetector{}.Evaluate(xids, events)) + + // Loosened window (2min): correlation fires. + v := patterns.XidCorrelationDetector{CorrelationWindow: 2 * time.Minute}.Evaluate(xids, events) + require.Len(t, v, 1) +} + +// TestXidCorrelationDetector_PreXidEvictionExcluded pins the +// causal-direction contract: eviction BEFORE the Xid does NOT +// correlate. Causality flows forward — a pod evicted at T-5s cannot +// have been caused by a GPU failure at T. Mirror of pod_evicted's +// future-transition exclusion. +func TestXidCorrelationDetector_PreXidEvictionExcluded(t *testing.T) { + t.Parallel() + + xidAt := time.Unix(1_700_000_000, 0).UTC() + evictAt := xidAt.Add(-5 * time.Second) // 5s BEFORE Xid + xids := []patterns.XidRecord{ + {Code: 79, Node: "gpu-node-0001", Timestamp: xidAt, Detail: "NVRM: Xid 79"}, + } + events := []patterns.Record{ + { + EventUID: "evict-pre", Reason: "Evicted", Hint: patterns.HintPodEvicted, + ReportingInstance: "gpu-node-0001", EventTime: evictAt, + Regarding: patterns.ObjectRef{Kind: "Pod", Namespace: "t", Name: "p"}, + }, + } + verdicts := patterns.XidCorrelationDetector{}.Evaluate(xids, events) + require.Empty(t, verdicts, "eviction before Xid MUST NOT correlate — causality flows forward") +} + +// TestXidCorrelationDetector_NonEvictedHintIgnored pins the +// hint-discriminator contract: only events with HintPodEvicted are +// candidates for correlation. Killing / Preempted / FailedScheduling +// events on the same node within the window do NOT correlate. +func TestXidCorrelationDetector_NonEvictedHintIgnored(t *testing.T) { + t.Parallel() + + xidAt := time.Unix(1_700_000_000, 0).UTC() + xids := []patterns.XidRecord{ + {Code: 79, Node: "gpu-node-0001", Timestamp: xidAt, Detail: "NVRM: Xid 79"}, + } + events := []patterns.Record{ + { + EventUID: "k1", Reason: "Killing", Hint: "", + ReportingInstance: "gpu-node-0001", EventTime: xidAt.Add(5 * time.Second), + Regarding: patterns.ObjectRef{Kind: "Pod", Name: "p1"}, + }, + { + EventUID: "f1", Reason: "FailedScheduling", Hint: patterns.HintScheduleFailure, + ReportingInstance: "gpu-node-0001", EventTime: xidAt.Add(10 * time.Second), + Regarding: patterns.ObjectRef{Kind: "Pod", Name: "p2"}, + }, + } + verdicts := patterns.XidCorrelationDetector{}.Evaluate(xids, events) + require.Empty(t, verdicts, "non-evicted hints must not correlate") +} + +// TestXidCorrelationDetector_DeterministicOrder pins the golden-test +// stability contract: output sorted by (eviction EventTime ascending, +// EventUID ascending) so replay runs are reproducible. +func TestXidCorrelationDetector_DeterministicOrder(t *testing.T) { + t.Parallel() + + xidAt := time.Unix(1_700_000_000, 0).UTC() + xids := []patterns.XidRecord{ + {Code: 79, Node: "node-1", Timestamp: xidAt, Detail: "x"}, + {Code: 48, Node: "node-2", Timestamp: xidAt.Add(time.Second), Detail: "x"}, + } + mk := func(uid, ns, name, node string, dt time.Duration) patterns.Record { + return patterns.Record{ + EventUID: uid, Reason: "Evicted", Hint: patterns.HintPodEvicted, + ReportingInstance: node, EventTime: xidAt.Add(dt), + Regarding: patterns.ObjectRef{Kind: "Pod", Namespace: ns, Name: name}, + } + } + // Intentionally out-of-order on input. + events := []patterns.Record{ + mk("c", "t", "pod-c", "node-1", 15*time.Second), + mk("a", "t", "pod-a", "node-1", 5*time.Second), + mk("b", "t", "pod-b", "node-2", 10*time.Second), + } + + verdicts := patterns.XidCorrelationDetector{}.Evaluate(xids, events) + require.Len(t, verdicts, 3) + require.Equal(t, "t/pod-a", verdicts[0].EvictedPod) + require.Equal(t, "t/pod-b", verdicts[1].EvictedPod) + require.Equal(t, "t/pod-c", verdicts[2].EvictedPod) +} + +// TestXidCorrelationDetector_MostRecentXidWins pins the +// multiple-Xid-then-eviction contract: when several Xids land on the +// same node in succession, the verdict cites the MOST RECENT Xid as +// the trigger — that's the proximate cause, even if earlier Xids on +// the same node also fall inside the window. +func TestXidCorrelationDetector_MostRecentXidWins(t *testing.T) { + t.Parallel() + + t0 := time.Unix(1_700_000_000, 0).UTC() + xids := []patterns.XidRecord{ + {Code: 48, Node: "n", Timestamp: t0, Detail: "Xid 48"}, + {Code: 79, Node: "n", Timestamp: t0.Add(20 * time.Second), Detail: "Xid 79"}, + } + events := []patterns.Record{ + { + EventUID: "evict-1", Reason: "Evicted", Hint: patterns.HintPodEvicted, + ReportingInstance: "n", EventTime: t0.Add(30 * time.Second), + Regarding: patterns.ObjectRef{Kind: "Pod", Namespace: "t", Name: "p"}, + }, + } + verdicts := patterns.XidCorrelationDetector{}.Evaluate(xids, events) + require.Len(t, verdicts, 1) + require.Equal(t, 79, verdicts[0].XidCode, "the most recent Xid is the proximate cause") +} + +// TestXidCorrelationVerdict_SchemaConformance pins the +// XidCorrelationVerdict JSON shape against +// testdata/xid_correlation_verdict.schema.json — a struct drift or +// schema loosening fails this test before it ships. +func TestXidCorrelationVerdict_SchemaConformance(t *testing.T) { + t.Parallel() + + schemaPath := filepath.Join("testdata", "xid_correlation_verdict.schema.json") + schemaBytes, err := os.ReadFile(schemaPath) //nolint:gosec // schemaPath is a test-local relative path + require.NoError(t, err) + + compiler := jsonschema.NewCompiler() + var schemaDoc any + require.NoError(t, json.Unmarshal(schemaBytes, &schemaDoc)) + require.NoError(t, compiler.AddResource(schemaPath, schemaDoc)) + schema, err := compiler.Compile(schemaPath) + require.NoError(t, err) + + xidAt := time.Unix(1_700_000_000, 0).UTC() + xids := []patterns.XidRecord{ + {Code: 79, Node: "gpu-node-0001", Timestamp: xidAt, Detail: "NVRM: Xid 79"}, + } + events := []patterns.Record{ + { + EventUID: "evict-1", Reason: "Evicted", Hint: patterns.HintPodEvicted, + ReportingInstance: "gpu-node-0001", EventTime: xidAt.Add(10 * time.Second), + Regarding: patterns.ObjectRef{Kind: "Pod", Namespace: "training", Name: "job-rank-3"}, + }, + } + verdicts := patterns.XidCorrelationDetector{}.Evaluate(xids, events) + require.Len(t, verdicts, 1) + + bs, err := json.Marshal(verdicts[0]) + require.NoError(t, err) + var decoded any + require.NoError(t, json.Unmarshal(bs, &decoded)) + require.NoError(t, schema.Validate(decoded), + "XidCorrelationVerdict shape failed schema validation; struct drifted or schema needs updating") +} + +// TestXidCorrelationVerdict_SchemaRejectsDrift is the drift-rejection +// battery. Each row is a falsifier for one schema constraint; +// removing the constraint flips the row to PASS. +func TestXidCorrelationVerdict_SchemaRejectsDrift(t *testing.T) { + t.Parallel() + + schemaPath := filepath.Join("testdata", "xid_correlation_verdict.schema.json") + schemaBytes, err := os.ReadFile(schemaPath) //nolint:gosec // schemaPath is a test-local relative path + require.NoError(t, err) + + compiler := jsonschema.NewCompiler() + var schemaDoc any + require.NoError(t, json.Unmarshal(schemaBytes, &schemaDoc)) + require.NoError(t, compiler.AddResource(schemaPath, schemaDoc)) + schema, err := compiler.Compile(schemaPath) + require.NoError(t, err) + + validEvidence := []any{ + map[string]any{"kind": "kernel_event", "uid": "u1", "timestamp": "2026-05-18T10:00:00Z", "description": "d"}, + map[string]any{"kind": "pod_event", "uid": "u2", "timestamp": "2026-05-18T10:00:10Z", "description": "d"}, + } + base := func() map[string]any { + return map[string]any{ + "pattern.id": "16", + "headline": "x", + "remediation": "y", + "xid_code": 79, + "node": "gpu-node-0001", + "evicted_pod": "training/job-rank-3", + "evidence_trail": validEvidence, + } + } + cases := []struct { + name string + mutate func(map[string]any) + guardName string + }{ + {"extra_top_level_field", func(m map[string]any) { m["future_field"] = "rejected" }, "additionalProperties:false"}, + {"confidence_reintroduced", func(m map[string]any) { m["confidence"] = "full" }, "additionalProperties:false rejects re-add"}, + {"missing_layers_reintroduced", func(m map[string]any) { m["missing_layers"] = []any{"node_condition"} }, "additionalProperties:false rejects re-add"}, + {"pattern_id_numeric_not_string", func(m map[string]any) { m["pattern.id"] = 16 }, "pattern.id string const"}, + {"xid_code_string_not_int", func(m map[string]any) { m["xid_code"] = "79" }, "xid_code integer"}, + {"evidence_kind_outside_enum", func(m map[string]any) { + m["evidence_trail"] = []any{ + map[string]any{"kind": "nccl_fr", "uid": "u", "timestamp": "2026-05-18T10:00:00Z", "description": "d"}, + map[string]any{"kind": "nccl_fr", "uid": "u2", "timestamp": "2026-05-18T10:00:00Z", "description": "d"}, + } + }, "evidence_trail.kind enum"}, + {"evidence_trail_under_min", func(m map[string]any) { + m["evidence_trail"] = []any{validEvidence[0]} + }, "evidence_trail minItems:2"}, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + m := base() + tc.mutate(m) + require.Error(t, schema.Validate(m), + "schema must reject %s; guard %q regressed", tc.name, tc.guardName) + }) + } +} diff --git a/module/processor/patterndetectorprocessor/config.go b/module/processor/patterndetectorprocessor/config.go index 3b44e8c3..2a4f2867 100644 --- a/module/processor/patterndetectorprocessor/config.go +++ b/module/processor/patterndetectorprocessor/config.go @@ -17,6 +17,12 @@ const DefaultJoinWindow = 30 * time.Second // hung. Operators override at the YAML level via nccl_hang_threshold. const DefaultNCCLHangThreshold = 5 * time.Minute +// DefaultXidCorrelationWindow mirrors +// patterns.DefaultXidCorrelationWindow — the max (eviction_time - +// xid_time) for a same-node Xid → eviction pair to correlate. +// Operators override via xid_correlation_window. +const DefaultXidCorrelationWindow = 60 * time.Second + // Config is the operator-facing YAML for the patterndetector processor. type Config struct { // JoinWindow is the max gap between a node-pressure condition's @@ -38,6 +44,12 @@ type Config struct { // DefaultNCCLHangThreshold. Floor 1s — sub-1s thresholds chase // normal long-tail collectives and produce noise. NCCLHangThreshold time.Duration `yaml:"nccl_hang_threshold,omitempty" mapstructure:"nccl_hang_threshold"` + + // XidCorrelationWindow is the max (eviction_time - xid_time) + // for a same-node Xid → eviction pair to correlate. Zero means + // use DefaultXidCorrelationWindow. Floor 1s — sub-1s windows + // almost never see a kubelet-driven eviction. + XidCorrelationWindow time.Duration `yaml:"xid_correlation_window,omitempty" mapstructure:"xid_correlation_window"` } // Validate enforces operator-actionable rules. @@ -48,15 +60,19 @@ func (c *Config) Validate() error { if c.NCCLHangThreshold != 0 && c.NCCLHangThreshold < time.Second { return fmt.Errorf("nccl_hang_threshold: must be >= 1s, got %s", c.NCCLHangThreshold) } + if c.XidCorrelationWindow != 0 && c.XidCorrelationWindow < time.Second { + return fmt.Errorf("xid_correlation_window: must be >= 1s, got %s", c.XidCorrelationWindow) + } return nil } func defaultConfig() *Config { t := true return &Config{ - JoinWindow: DefaultJoinWindow, - EmitPartialVerdicts: &t, - NCCLHangThreshold: DefaultNCCLHangThreshold, + JoinWindow: DefaultJoinWindow, + EmitPartialVerdicts: &t, + NCCLHangThreshold: DefaultNCCLHangThreshold, + XidCorrelationWindow: DefaultXidCorrelationWindow, } } @@ -72,6 +88,9 @@ func (c *Config) withDefaults() *Config { if out.NCCLHangThreshold == 0 { out.NCCLHangThreshold = DefaultNCCLHangThreshold } + if out.XidCorrelationWindow == 0 { + out.XidCorrelationWindow = DefaultXidCorrelationWindow + } return &out } diff --git a/module/processor/patterndetectorprocessor/example_config.yaml b/module/processor/patterndetectorprocessor/example_config.yaml index 690274b4..6883c4df 100644 --- a/module/processor/patterndetectorprocessor/example_config.yaml +++ b/module/processor/patterndetectorprocessor/example_config.yaml @@ -21,6 +21,8 @@ processors: patterndetector: join_window: 30s emit_partial_verdicts: true + nccl_hang_threshold: 5m + xid_correlation_window: 60s exporters: debug: diff --git a/module/processor/patterndetectorprocessor/patterndetector.go b/module/processor/patterndetectorprocessor/patterndetector.go index d680f032..e4ef3ce9 100644 --- a/module/processor/patterndetectorprocessor/patterndetector.go +++ b/module/processor/patterndetectorprocessor/patterndetector.go @@ -92,7 +92,7 @@ func (p *patterndetectorProcessor) Capabilities() consumer.Capabilities { // typed model, runs the configured detectors, and appends one verdict // record per match before forwarding downstream. func (p *patterndetectorProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error { - events, nodeConds, ncclRecs := collectInputs(ld) + events, nodeConds, ncclRecs, xidRecs := collectInputs(ld) det := patterns.PodEvictedDetector{JoinWindow: p.cfg.JoinWindow} verdicts := det.Evaluate(events, nodeConds) @@ -108,6 +108,11 @@ func (p *patterndetectorProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs appendNCCLHangVerdict(ld, v, p.logger()) } + xidDet := patterns.XidCorrelationDetector{CorrelationWindow: p.cfg.XidCorrelationWindow} + for _, v := range xidDet.Evaluate(xidRecs, events) { + appendXidCorrelationVerdict(ld, v, p.logger()) + } + if err := p.next.ConsumeLogs(ctx, ld); err != nil { return fmt.Errorf("patterndetector: next.ConsumeLogs: %w", err) } @@ -115,10 +120,11 @@ func (p *patterndetectorProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs } // collectInputs walks the incoming plog.Logs and projects each log -// record into one of three typed shapes: patterns.Record (pod event), -// patterns.NodeRecord (node-condition transition), or -// patterns.NCCLFRRecord (NCCL FlightRecorder entry). Records that -// look like none are skipped — the processor is a NO-OP for them. +// record into one of four typed shapes: patterns.Record (pod event), +// patterns.NodeRecord (node-condition transition), +// patterns.NCCLFRRecord (NCCL FlightRecorder entry), or +// patterns.XidRecord (GPU Xid kernel event). Records that look like +// none are skipped — the processor is a NO-OP for them. // // Classification is structural: // - pod events carry `k8s.event.hint` (any value); the projection @@ -127,15 +133,19 @@ func (p *patterndetectorProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs // `k8s.node.condition.pressure` (no hint required). // - NCCL FR records carry `nccl.fr.collective_seq_id` AND a rank // attribute (gen_ai.training.rank / nccl.rank / nccl.fr.rank). +// - Xid records carry the customer-stable `kernelevents.xid` +// attribute (per RFC-0013 §3 and the journald-kernel recipe); +// node is read from resource `k8s.node.name`. // // Records that match multiple classes are projected as the first one -// matched (pod_event > node_condition > nccl_fr) — the priority -// reflects that a malformed upstream record is rarer than a real -// classification overlap. -func collectInputs(ld plog.Logs) ([]patterns.Record, []patterns.NodeRecord, []patterns.NCCLFRRecord) { +// matched (pod_event > node_condition > nccl_fr > xid) — the +// priority reflects that a malformed upstream record is rarer than a +// real classification overlap. +func collectInputs(ld plog.Logs) ([]patterns.Record, []patterns.NodeRecord, []patterns.NCCLFRRecord, []patterns.XidRecord) { var events []patterns.Record var nodes []patterns.NodeRecord var nccl []patterns.NCCLFRRecord + var xids []patterns.XidRecord for i := 0; i < ld.ResourceLogs().Len(); i++ { rl := ld.ResourceLogs().At(i) resAttrs := rl.Resource().Attributes() @@ -153,11 +163,50 @@ func collectInputs(ld plog.Logs) ([]patterns.Record, []patterns.NodeRecord, []pa } if rec, ok := projectNCCLFRRecord(lr); ok { nccl = append(nccl, rec) + continue + } + if rec, ok := projectXidRecord(lr, resAttrs); ok { + xids = append(xids, rec) } } } } - return events, nodes, nccl + return events, nodes, nccl, xids +} + +// projectXidRecord reads OTel attributes off a log record and builds +// a patterns.XidRecord. The projection's gate is the customer-stable +// `kernelevents.xid` attribute (RFC-0013 §3, emitted by the OTTL +// transform in the journald-kernel recipe). Node is read from the +// enclosing ResourceLogs's `k8s.node.name` (stamped by +// k8sattributes on a DaemonSet); a record without a node is dropped +// because the correlation pattern requires same-node attribution. +func projectXidRecord(lr plog.LogRecord, resAttrs pcommon.Map) (patterns.XidRecord, bool) { + xid, ok := lr.Attributes().Get("kernelevents.xid") + if !ok { + return patterns.XidRecord{}, false + } + node, ok := resAttrs.Get("k8s.node.name") + if !ok { + if v, lrOK := lr.Attributes().Get("k8s.node.name"); lrOK { + node = v + ok = true + } + } + if !ok { + return patterns.XidRecord{}, false + } + r := patterns.XidRecord{ + Code: int(xid.Int()), + Node: node.AsString(), + Detail: lr.Body().AsString(), + } + if t := lr.Timestamp(); t != 0 { + r.Timestamp = t.AsTime() + } else { + r.Timestamp = lr.ObservedTimestamp().AsTime() + } + return r, true } // projectNCCLFRRecord reads OTel attributes off a log record and @@ -380,3 +429,38 @@ func ncclHangEvidenceTimestamp(v patterns.NCCLHangVerdict) time.Time { } return time.Time{} } + +// appendXidCorrelationVerdict adds a verdict log record for an +// xid_correlation match. Mirrors appendVerdict's wire-format contract +// — broken-out scalar attrs plus full pattern.verdict_json — so +// downstream consumers don't branch on pattern.id to find +// headline/remediation. +func appendXidCorrelationVerdict(ld plog.Logs, v patterns.XidCorrelationVerdict, logger *zap.Logger) { + rl := ld.ResourceLogs().AppendEmpty() + sl := rl.ScopeLogs().AppendEmpty() + sl.Scope().SetName(instrumentationScope) + lr := sl.LogRecords().AppendEmpty() + lr.Body().SetStr(v.Headline) + now := time.Now() + lr.SetObservedTimestamp(pcommon.NewTimestampFromTime(now)) + if t := xidCorrelationEvidenceTimestamp(v); !t.IsZero() { + lr.SetTimestamp(pcommon.NewTimestampFromTime(t)) + } else { + lr.SetTimestamp(pcommon.NewTimestampFromTime(now)) + } + lr.Attributes().PutStr(VerdictAttrPatternID, v.PatternID) + lr.Attributes().PutStr(VerdictAttrHeadline, v.Headline) + lr.Attributes().PutStr(VerdictAttrRemediation, v.Remediation) + if b, err := json.Marshal(v); err == nil { + lr.Attributes().PutStr(VerdictAttrVerdictJSON, string(b)) + } else if logger != nil { + logger.Warn("patterndetector: failed to marshal xid_correlation verdict JSON; broken-out attrs still emit", zap.Error(err)) + } +} + +func xidCorrelationEvidenceTimestamp(v patterns.XidCorrelationVerdict) time.Time { + if len(v.EvidenceTrail) > 0 { + return v.EvidenceTrail[0].Timestamp + } + return time.Time{} +} diff --git a/module/processor/patterndetectorprocessor/patterndetector_test.go b/module/processor/patterndetectorprocessor/patterndetector_test.go index 2c8429b5..e086b8e1 100644 --- a/module/processor/patterndetectorprocessor/patterndetector_test.go +++ b/module/processor/patterndetectorprocessor/patterndetector_test.go @@ -141,10 +141,11 @@ func TestConfig_Validate(t *testing.T) { cfg Config wantErr bool }{ - "defaults": {cfg: *defaultConfig()}, - "zero-window": {cfg: Config{}}, - "floor-window": {cfg: Config{JoinWindow: time.Second}}, - "sub-1s-window": {cfg: Config{JoinWindow: 500 * time.Millisecond}, wantErr: true}, + "defaults": {cfg: *defaultConfig()}, + "zero-window": {cfg: Config{}}, + "floor-window": {cfg: Config{JoinWindow: time.Second}}, + "sub-1s-window": {cfg: Config{JoinWindow: 500 * time.Millisecond}, wantErr: true}, + "sub-1s-xid-window": {cfg: Config{XidCorrelationWindow: 500 * time.Millisecond}, wantErr: true}, } for name, tc := range cases { tc := tc @@ -164,6 +165,8 @@ func TestFactory_Surface(t *testing.T) { require.Equal(t, componentType(), f.Type()) cfg := f.CreateDefaultConfig().(*Config) require.Equal(t, DefaultJoinWindow, cfg.JoinWindow) + require.Equal(t, DefaultNCCLHangThreshold, cfg.NCCLHangThreshold) + require.Equal(t, DefaultXidCorrelationWindow, cfg.XidCorrelationWindow) require.True(t, cfg.emitPartialEnabled()) } diff --git a/module/processor/patterndetectorprocessor/xid_correlation_test.go b/module/processor/patterndetectorprocessor/xid_correlation_test.go new file mode 100644 index 00000000..604c2bd3 --- /dev/null +++ b/module/processor/patterndetectorprocessor/xid_correlation_test.go @@ -0,0 +1,180 @@ +// SPDX-License-Identifier: Apache-2.0 + +package patterndetectorprocessor + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + + "github.com/tracecoreai/tracecore/module/pkg/patterns" +) + +// TestPatternDetector_XidCorrelationWiringEmitsVerdict pins the +// wiring contract: when a Xid log record (carrying kernelevents.xid) +// arrives in the same plog.Logs batch as a Pod-Evicted event on the +// same node within the correlation window, the processor emits one +// xid_correlation verdict alongside any pod_evicted/nccl_hang +// verdicts. +func TestPatternDetector_XidCorrelationWiringEmitsVerdict(t *testing.T) { + t.Parallel() + + xidAt := mustParseTime(t, "2026-05-31T10:00:00Z") + evictAt := xidAt.Add(10 * time.Second) + ld := plog.NewLogs() + + // Xid kernel-event log record: resource carries k8s.node.name + // (stamped by k8sattributes on the DaemonSet); attrs carry the + // customer-stable kernelevents.xid. + xidRL := ld.ResourceLogs().AppendEmpty() + xidRL.Resource().Attributes().PutStr("k8s.node.name", "gpu-node-0001") + xidSL := xidRL.ScopeLogs().AppendEmpty() + xidLR := xidSL.LogRecords().AppendEmpty() + xidLR.SetTimestamp(pcommon.NewTimestampFromTime(xidAt)) + xidLR.Body().SetStr("NVRM: Xid (PCI:0000:3b:00): 79, GPU has fallen off the bus.") + xidLR.Attributes().PutInt("kernelevents.xid", 79) + + // Pod eviction event. + evRL := ld.ResourceLogs().AppendEmpty() + evRL.Resource().Attributes().PutStr("k8s.namespace.name", "training") + evRL.Resource().Attributes().PutStr("k8s.pod.name", "job-rank-3") + evSL := evRL.ScopeLogs().AppendEmpty() + evLR := evSL.LogRecords().AppendEmpty() + evLR.SetTimestamp(pcommon.NewTimestampFromTime(evictAt)) + ev := evLR.Attributes() + ev.PutStr("k8s.event.uid", "evict-1") + ev.PutStr("k8s.event.hint", string(patterns.HintPodEvicted)) + ev.PutStr("k8s.event.reason", "Evicted") + ev.PutStr("k8s.event.reporting_instance", "gpu-node-0001") + ev.PutStr("k8s.regarding.kind", "Pod") + ev.PutStr("k8s.regarding.namespace", "training") + ev.PutStr("k8s.regarding.name", "job-rank-3") + + sink := newLogsSink() + p := newProcessor(testSettings(), defaultConfig(), sink) + require.NoError(t, p.Start(context.Background(), componenttestHost{})) + t.Cleanup(func() { _ = p.Shutdown(context.Background()) }) + + require.NoError(t, p.ConsumeLogs(context.Background(), ld)) + + verdicts := extractXidCorrelationVerdicts(t, sink.at(0)) + require.Len(t, verdicts, 1) + v := verdicts[0] + require.Equal(t, patterns.PatternIDXidCorrelation, v.PatternID) + require.Equal(t, 79, v.XidCode) + require.Equal(t, "gpu-node-0001", v.Node) + require.Equal(t, "training/job-rank-3", v.EvictedPod) +} + +// TestPatternDetector_XidCorrelationWiringHealthy pins the negative +// path: an Xid kernel event with NO downstream eviction on the same +// node MUST NOT emit a verdict. +func TestPatternDetector_XidCorrelationWiringHealthy(t *testing.T) { + t.Parallel() + + xidAt := mustParseTime(t, "2026-05-31T10:00:00Z") + ld := plog.NewLogs() + xidRL := ld.ResourceLogs().AppendEmpty() + xidRL.Resource().Attributes().PutStr("k8s.node.name", "gpu-node-0001") + xidSL := xidRL.ScopeLogs().AppendEmpty() + xidLR := xidSL.LogRecords().AppendEmpty() + xidLR.SetTimestamp(pcommon.NewTimestampFromTime(xidAt)) + xidLR.Body().SetStr("NVRM: Xid 79") + xidLR.Attributes().PutInt("kernelevents.xid", 79) + + sink := newLogsSink() + p := newProcessor(testSettings(), defaultConfig(), sink) + require.NoError(t, p.Start(context.Background(), componenttestHost{})) + t.Cleanup(func() { _ = p.Shutdown(context.Background()) }) + + require.NoError(t, p.ConsumeLogs(context.Background(), ld)) + require.Empty(t, extractXidCorrelationVerdicts(t, sink.at(0))) +} + +// TestPatternDetector_XidCorrelationWindowConfigurable asserts the +// processor surfaces the detector's CorrelationWindow via YAML +// config. +func TestPatternDetector_XidCorrelationWindowConfigurable(t *testing.T) { + t.Parallel() + + xidAt := mustParseTime(t, "2026-05-31T10:00:00Z") + evictAt := xidAt.Add(90 * time.Second) // over default 60s + + build := func() plog.Logs { + ld := plog.NewLogs() + xidRL := ld.ResourceLogs().AppendEmpty() + xidRL.Resource().Attributes().PutStr("k8s.node.name", "n") + xidSL := xidRL.ScopeLogs().AppendEmpty() + xidLR := xidSL.LogRecords().AppendEmpty() + xidLR.SetTimestamp(pcommon.NewTimestampFromTime(xidAt)) + xidLR.Body().SetStr("NVRM: Xid 79") + xidLR.Attributes().PutInt("kernelevents.xid", 79) + + evRL := ld.ResourceLogs().AppendEmpty() + evSL := evRL.ScopeLogs().AppendEmpty() + evLR := evSL.LogRecords().AppendEmpty() + evLR.SetTimestamp(pcommon.NewTimestampFromTime(evictAt)) + a := evLR.Attributes() + a.PutStr("k8s.event.uid", "evict-late") + a.PutStr("k8s.event.hint", string(patterns.HintPodEvicted)) + a.PutStr("k8s.event.reporting_instance", "n") + a.PutStr("k8s.regarding.kind", "Pod") + a.PutStr("k8s.regarding.namespace", "t") + a.PutStr("k8s.regarding.name", "p") + return ld + } + + // Default window (60s): no verdict. + sink1 := newLogsSink() + p1 := newProcessor(testSettings(), defaultConfig(), sink1) + require.NoError(t, p1.Start(context.Background(), componenttestHost{})) + t.Cleanup(func() { _ = p1.Shutdown(context.Background()) }) + require.NoError(t, p1.ConsumeLogs(context.Background(), build())) + require.Empty(t, extractXidCorrelationVerdicts(t, sink1.at(0))) + + // Loosened window (2min): correlation fires. + sink2 := newLogsSink() + cfg := &Config{XidCorrelationWindow: 2 * time.Minute} + p2 := newProcessor(testSettings(), cfg, sink2) + require.NoError(t, p2.Start(context.Background(), componenttestHost{})) + t.Cleanup(func() { _ = p2.Shutdown(context.Background()) }) + require.NoError(t, p2.ConsumeLogs(context.Background(), build())) + require.Len(t, extractXidCorrelationVerdicts(t, sink2.at(0)), 1) +} + +// extractXidCorrelationVerdicts walks the output plog.Logs and +// decodes the xid_correlation verdict JSON attribute on each +// verdict-shaped log record. +func extractXidCorrelationVerdicts(t *testing.T, ld plog.Logs) []patterns.XidCorrelationVerdict { + t.Helper() + out := []patterns.XidCorrelationVerdict{} + for i := 0; i < ld.ResourceLogs().Len(); i++ { + rl := ld.ResourceLogs().At(i) + for j := 0; j < rl.ScopeLogs().Len(); j++ { + sl := rl.ScopeLogs().At(j) + if sl.Scope().Name() != instrumentationScope { + continue + } + for k := 0; k < sl.LogRecords().Len(); k++ { + lr := sl.LogRecords().At(k) + patternID, ok := lr.Attributes().Get(VerdictAttrPatternID) + if !ok || patternID.AsString() != patterns.PatternIDXidCorrelation { + continue + } + js, ok := lr.Attributes().Get(VerdictAttrVerdictJSON) + if !ok { + continue + } + var v patterns.XidCorrelationVerdict + require.NoError(t, json.Unmarshal([]byte(js.AsString()), &v)) + out = append(out, v) + } + } + } + return out +}