diff --git a/internal/integration/doc.go b/internal/integration/doc.go deleted file mode 100644 index 1f81c96b..00000000 --- a/internal/integration/doc.go +++ /dev/null @@ -1,7 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -// Package integration holds end-to-end tests against the OCB-generated -// tracecore binary. Tests boot ./_build/tracecore against minimal configs -// and assert on operator-visible scrape output. Skipped if _build/tracecore -// is missing; run `make build` first. -package integration diff --git a/module/pkg/patterns/doc.go b/module/pkg/patterns/doc.go deleted file mode 100644 index 3a9e99be..00000000 --- a/module/pkg/patterns/doc.go +++ /dev/null @@ -1,17 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -// Package patterns implements tracecore's failure-pattern detectors. -// -// Each detector consumes typed records (today: Kubernetes Event -// Records and NodeRecords, sourced via upstream OTel -// k8sobjectsreceiver per RFC-0013 PR-J; M17/M18 will add NCCL -// FlightRecorder, kernel events via journaldreceiver, Kineto, py-spy) -// and emits a Verdict naming the pattern, the implicated rank/node, -// an ordered evidence trail, and a remediation hint. -// -// The package shape is deliberately minimal at v0 — three concrete -// types (Verdict, EvidenceRef, Confidence) plus one detector -// (PodEvicted, pattern #14). The generic Detector interface is -// deferred until a second detector lands; per PRINCIPLES §3 we wait -// for the rule of three before extracting an abstraction. -package patterns diff --git a/module/pkg/patterns/verdict.go b/module/pkg/patterns/verdict.go index 448b8e1a..6a3b5736 100644 --- a/module/pkg/patterns/verdict.go +++ b/module/pkg/patterns/verdict.go @@ -1,5 +1,19 @@ // SPDX-License-Identifier: Apache-2.0 +// Package patterns implements tracecore's failure-pattern detectors. +// +// Each detector consumes typed records (today: Kubernetes Event +// Records and NodeRecords, sourced via upstream OTel +// k8sobjectsreceiver per RFC-0013 PR-J; M17/M18 will add NCCL +// FlightRecorder, kernel events via journaldreceiver, Kineto, py-spy) +// and emits a Verdict naming the pattern, the implicated rank/node, +// an ordered evidence trail, and a remediation hint. +// +// The package shape is deliberately minimal at v0 — three concrete +// types (Verdict, EvidenceRef, Confidence) plus one detector +// (PodEvicted, pattern #14). The generic Detector interface is +// deferred until a second detector lands; per PRINCIPLES §3 we wait +// for the rule of three before extracting an abstraction. package patterns import "time" diff --git a/module/processor/patterndetectorprocessor/doc.go b/module/processor/patterndetectorprocessor/doc.go deleted file mode 100644 index dfbd0853..00000000 --- a/module/processor/patterndetectorprocessor/doc.go +++ /dev/null @@ -1,17 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -// Package patterndetectorprocessor is the OTel processor surface that -// wraps the pattern-detection library at module/pkg/patterns/. It -// receives Kubernetes Event + node-condition log records, projects -// them into the pattern library's typed model, runs the configured -// detectors, and emits one verdict log record per matched pattern. -// -// Today's detector set: pod-evicted (M19, pattern #14). M17/M18 + -// future detectors slot in here as additional cases in the dispatch -// switch; the processor surface is signal-agnostic over which -// detectors run. -// -// Cross-link: the detector library lives at module/pkg/patterns/; -// the hermetic replay corpus that anchors every detector's golden -// tests lives at module/pkg/replay/. -package patterndetectorprocessor diff --git a/module/processor/patterndetectorprocessor/factory.go b/module/processor/patterndetectorprocessor/factory.go deleted file mode 100644 index 78d3bbe6..00000000 --- a/module/processor/patterndetectorprocessor/factory.go +++ /dev/null @@ -1,42 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package patterndetectorprocessor - -import ( - "context" - "fmt" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/processor" -) - -// componentType is the kind name registered in components.yaml. -func componentType() component.Type { return component.MustNewType("patterndetector") } - -// stability is the OCB-surfaced stability level. Alpha — the verdict -// wire format and pattern set are pinned by tests but may grow. -const stability = component.StabilityLevelAlpha - -// NewFactory returns the upstream processor.Factory for patterndetector. -// Only logs are wired today; the detector set operates on log records. -func NewFactory() processor.Factory { - return processor.NewFactory( - componentType(), - createDefaultConfig, - processor.WithLogs(createLogs, stability), - ) -} - -func createDefaultConfig() component.Config { return defaultConfig() } - -func createLogs(_ context.Context, set processor.Settings, cfg component.Config, next consumer.Logs) (processor.Logs, error) { - c, ok := cfg.(*Config) - if !ok { - return nil, fmt.Errorf("patterndetector: unexpected config type %T", cfg) - } - if err := c.Validate(); err != nil { - return nil, fmt.Errorf("patterndetector: %w", err) - } - return newProcessor(set, c, next), nil -} diff --git a/module/processor/patterndetectorprocessor/hbm_ecc_test.go b/module/processor/patterndetectorprocessor/hbm_ecc_test.go index a1988419..70b4b1c8 100644 --- a/module/processor/patterndetectorprocessor/hbm_ecc_test.go +++ b/module/processor/patterndetectorprocessor/hbm_ecc_test.go @@ -272,11 +272,11 @@ func extractHBMECCVerdicts(t *testing.T, ld plog.Logs) []patterns.HBMECCVerdict } for k := 0; k < sl.LogRecords().Len(); k++ { lr := sl.LogRecords().At(k) - patternID, ok := lr.Attributes().Get(VerdictAttrPatternID) + patternID, ok := lr.Attributes().Get(verdictAttrPatternID) if !ok || patternID.AsString() != patterns.PatternIDHBMECC { continue } - js, ok := lr.Attributes().Get(VerdictAttrVerdictJSON) + js, ok := lr.Attributes().Get(verdictAttrVerdictJSON) if !ok { continue } diff --git a/module/processor/patterndetectorprocessor/nccl_hang_test.go b/module/processor/patterndetectorprocessor/nccl_hang_test.go index 867ae299..b3351e86 100644 --- a/module/processor/patterndetectorprocessor/nccl_hang_test.go +++ b/module/processor/patterndetectorprocessor/nccl_hang_test.go @@ -136,11 +136,11 @@ func extractNCCLHangVerdicts(t *testing.T, ld plog.Logs) []patterns.NCCLHangVerd } for k := 0; k < sl.LogRecords().Len(); k++ { lr := sl.LogRecords().At(k) - patternID, ok := lr.Attributes().Get(VerdictAttrPatternID) + patternID, ok := lr.Attributes().Get(verdictAttrPatternID) if !ok || patternID.AsString() != patterns.PatternIDNCCLHang { continue } - js, ok := lr.Attributes().Get(VerdictAttrVerdictJSON) + js, ok := lr.Attributes().Get(verdictAttrVerdictJSON) if !ok { continue } diff --git a/module/processor/patterndetectorprocessor/patterndetector.go b/module/processor/patterndetectorprocessor/patterndetector.go index 2a581513..a1a0b7c3 100644 --- a/module/processor/patterndetectorprocessor/patterndetector.go +++ b/module/processor/patterndetectorprocessor/patterndetector.go @@ -1,5 +1,19 @@ // SPDX-License-Identifier: Apache-2.0 +// Package patterndetectorprocessor is the OTel processor surface that +// wraps the pattern-detection library at module/pkg/patterns/. It +// receives Kubernetes Event + node-condition log records, projects +// them into the pattern library's typed model, runs the configured +// detectors, and emits one verdict log record per matched pattern. +// +// Today's detector set: pod-evicted (M19, pattern #14). M17/M18 + +// future detectors slot in here as additional cases in the dispatch +// switch; the processor surface is signal-agnostic over which +// detectors run. +// +// Cross-link: the detector library lives at module/pkg/patterns/; +// the hermetic replay corpus that anchors every detector's golden +// tests lives at module/pkg/replay/. package patterndetectorprocessor import ( @@ -18,30 +32,60 @@ import ( "github.com/tracecoreai/tracecore/module/pkg/patterns" ) +// componentType is the kind name registered in components.yaml. +func componentType() component.Type { return component.MustNewType("patterndetector") } + +// stability is the OCB-surfaced stability level. Alpha — the verdict +// wire format and pattern set are pinned by tests but may grow. +const stability = component.StabilityLevelAlpha + +// NewFactory returns the upstream processor.Factory for patterndetector. +// Only logs are wired today; the detector set operates on log records. +func NewFactory() processor.Factory { + return processor.NewFactory( + componentType(), + createDefaultConfig, + processor.WithLogs(createLogs, stability), + ) +} + +func createDefaultConfig() component.Config { return defaultConfig() } + +func createLogs(_ context.Context, set processor.Settings, cfg component.Config, next consumer.Logs) (processor.Logs, error) { + c, ok := cfg.(*Config) + if !ok { + return nil, fmt.Errorf("patterndetector: unexpected config type %T", cfg) + } + if err := c.Validate(); err != nil { + return nil, fmt.Errorf("patterndetector: %w", err) + } + return newProcessor(set, c, next), nil +} + // instrumentationScope pins the OTel scope name (per OTel convention, // the package's Go import path). const instrumentationScope = "github.com/tracecoreai/tracecore/module/processor/patterndetectorprocessor" -// VerdictAttrPatternID is the canonical pattern.id attribute on an +// verdictAttrPatternID is the canonical pattern.id attribute on an // emitted verdict log record. Mirrors the JSON field name in the // pattern library's PodEvictedVerdict shape. -const VerdictAttrPatternID = "pattern.id" +const verdictAttrPatternID = "pattern.id" -// VerdictAttrConfidence is the canonical confidence attribute on an +// verdictAttrConfidence is the canonical confidence attribute on an // emitted verdict log record. -const VerdictAttrConfidence = "pattern.confidence" +const verdictAttrConfidence = "pattern.confidence" -// VerdictAttrHeadline is the operator-facing one-line summary. -const VerdictAttrHeadline = "pattern.headline" +// verdictAttrHeadline is the operator-facing one-line summary. +const verdictAttrHeadline = "pattern.headline" -// VerdictAttrRemediation is the operator-actionable remediation prose. -const VerdictAttrRemediation = "pattern.remediation" +// verdictAttrRemediation is the operator-actionable remediation prose. +const verdictAttrRemediation = "pattern.remediation" -// VerdictAttrVerdictJSON is the full verdict marshalled to JSON for +// verdictAttrVerdictJSON is the full verdict marshalled to JSON for // downstream consumers that want the whole evidence trail. Carried as // a string-typed attribute so the OTel wire format can ship it // alongside the broken-out scalar fields above. -const VerdictAttrVerdictJSON = "pattern.verdict_json" +const verdictAttrVerdictJSON = "pattern.verdict_json" // Operator-facing scalar attributes promoted onto verdict log // records (issue #270). Dashboards table-aggregate and LogQL queries @@ -51,73 +95,73 @@ const VerdictAttrVerdictJSON = "pattern.verdict_json" // keys so a dashboard query reads the same name on the verdict as // on the underlying input record. const ( - // VerdictAttrK8sPodName is the evicted/affected Pod's + // verdictAttrK8sPodName is the evicted/affected Pod's // metadata.name. Emitted on pod_evicted and xid_correlation // verdicts. - VerdictAttrK8sPodName = "k8s.pod.name" + verdictAttrK8sPodName = "k8s.pod.name" - // VerdictAttrK8sPodNamespace is the evicted/affected Pod's - // metadata.namespace. Emitted alongside VerdictAttrK8sPodName. - VerdictAttrK8sPodNamespace = "k8s.pod.namespace" + // verdictAttrK8sPodNamespace is the evicted/affected Pod's + // metadata.namespace. Emitted alongside verdictAttrK8sPodName. + verdictAttrK8sPodNamespace = "k8s.pod.namespace" - // VerdictAttrK8sNodeName is the node the eviction/Xid/AER was + // verdictAttrK8sNodeName is the node the eviction/Xid/AER was // attributed to. Emitted on pod_evicted, xid_correlation, and // pcie_aer verdicts. - VerdictAttrK8sNodeName = "k8s.node.name" + verdictAttrK8sNodeName = "k8s.node.name" - // VerdictAttrK8sEventReason is the upstream Kubernetes Event + // verdictAttrK8sEventReason is the upstream Kubernetes Event // Reason (e.g. "Evicted"). Emitted on pod_evicted verdicts. - VerdictAttrK8sEventReason = "k8s.event.reason" + verdictAttrK8sEventReason = "k8s.event.reason" - // VerdictAttrNCCLPgID names the NCCL process-group id the hang + // verdictAttrNCCLPgID names the NCCL process-group id the hang // is scoped to. Emitted on nccl_hang verdicts. - VerdictAttrNCCLPgID = "nccl.fr.pg_id" + verdictAttrNCCLPgID = "nccl.fr.pg_id" - // VerdictAttrNCCLCollectiveSeqID names the collective_seq_id the + // verdictAttrNCCLCollectiveSeqID names the collective_seq_id the // hanging ranks are blocked on. Emitted on nccl_hang verdicts. - VerdictAttrNCCLCollectiveSeqID = "nccl.fr.collective_seq_id" + verdictAttrNCCLCollectiveSeqID = "nccl.fr.collective_seq_id" - // VerdictAttrNCCLHangingRanksCount is the cohort size — len of + // verdictAttrNCCLHangingRanksCount is the cohort size — len of // HangingRanks. Promoted as a scalar so dashboards can graph // hang severity without unmarshalling the rank list. - VerdictAttrNCCLHangingRanksCount = "nccl.fr.hanging_ranks_count" + verdictAttrNCCLHangingRanksCount = "nccl.fr.hanging_ranks_count" - // VerdictAttrKernelXid is the NVIDIA driver Xid event code that + // verdictAttrKernelXid is the NVIDIA driver Xid event code that // triggered the correlation. Emitted on xid_correlation // verdicts. - VerdictAttrKernelXid = "kernelevents.xid" + verdictAttrKernelXid = "kernelevents.xid" - // VerdictAttrThermalGPUCount is the cascade size — distinct + // verdictAttrThermalGPUCount is the cascade size — distinct // GPUs on the node that each cleared ThrottleDeltaThreshold. // Promoted as a scalar so dashboards can graph cascade severity // without unmarshalling gpu_ids. Emitted on thermal_throttle // verdicts. - VerdictAttrThermalGPUCount = "hw.gpu.throttle.cascade_size" + verdictAttrThermalGPUCount = "hw.gpu.throttle.cascade_size" - // VerdictAttrPCIeAERGPUID promotes the verdict's PCI BDF onto + // verdictAttrPCIeAERGPUID promotes the verdict's PCI BDF onto // the emitted log record so dashboards can table-aggregate by // GPU without parsing pattern.verdict_json (issue #270 // scalar-promotion contract). Mirrors the customer-stable // `gpu.id` contract from RFC-0013 §3 so a downstream LogQL // query joins this verdict to raw metric/kernel telemetry on // the same attribute name. Emitted on pcie_aer verdicts. - VerdictAttrPCIeAERGPUID = "gpu.id" + verdictAttrPCIeAERGPUID = "gpu.id" - // VerdictAttrPCIeAERSeverity promotes the AER severity + // verdictAttrPCIeAERSeverity promotes the AER severity // ("Corrected" / "Fatal" / "Uncorrected") so the dashboard can // fan out by severity (a "Fatal" verdict needs page-loud // routing; "Corrected" lands in the analytics drawer). - VerdictAttrPCIeAERSeverity = "kernelevents.pcie_aer.severity" + verdictAttrPCIeAERSeverity = "kernelevents.pcie_aer.severity" - // VerdictAttrPCIeAERType promotes the AER type ("Data Link + // verdictAttrPCIeAERType promotes the AER type ("Data Link // Layer" / "Physical Layer" / "Transaction Layer") so operators // can triage Physical Layer (= cable / connector) differently // from Data Link Layer (= switch / lane). - VerdictAttrPCIeAERType = "kernelevents.pcie_aer.type" + verdictAttrPCIeAERType = "kernelevents.pcie_aer.type" - // VerdictAttrPCIeAERDropRatio promotes the drop-ratio scalar so + // verdictAttrPCIeAERDropRatio promotes the drop-ratio scalar so // dashboards can render a histogram without parsing JSON. - VerdictAttrPCIeAERDropRatio = "tracecore.alert.pcie_rate_collapse.drop_ratio" + verdictAttrPCIeAERDropRatio = "tracecore.alert.pcie_rate_collapse.drop_ratio" ) // patterndetectorProcessor implements processor.Logs. Pure data-path @@ -779,12 +823,12 @@ func appendVerdictRecord( lr.SetTimestamp(pcommon.NewTimestampFromTime(now)) } attrs := lr.Attributes() - attrs.PutStr(VerdictAttrPatternID, c.PatternID) - attrs.PutStr(VerdictAttrHeadline, c.Headline) - attrs.PutStr(VerdictAttrRemediation, c.Remediation) + attrs.PutStr(verdictAttrPatternID, c.PatternID) + attrs.PutStr(verdictAttrHeadline, c.Headline) + attrs.PutStr(verdictAttrRemediation, c.Remediation) stampAttrs(attrs) if b, err := json.Marshal(vAny); err == nil { - attrs.PutStr(VerdictAttrVerdictJSON, string(b)) + attrs.PutStr(verdictAttrVerdictJSON, string(b)) } else if logger != nil { logger.Warn("patterndetector: failed to marshal verdict JSON; broken-out attrs still emit", zap.String("pattern", patternKindForLog), zap.Error(err)) @@ -825,11 +869,11 @@ func appendVerdict(ld plog.Logs, v patterns.PodEvictedVerdict, logger *zap.Logge Remediation: v.Remediation, EvidenceTrail: v.EvidenceTrail, }, v, "pod_evicted", func(attrs pcommon.Map) { - attrs.PutStr(VerdictAttrConfidence, string(v.Confidence)) - putStrIfSet(attrs, VerdictAttrK8sPodName, v.PodName) - putStrIfSet(attrs, VerdictAttrK8sPodNamespace, v.PodNamespace) - putStrIfSet(attrs, VerdictAttrK8sNodeName, v.NodeName) - putStrIfSet(attrs, VerdictAttrK8sEventReason, v.EventReason) + attrs.PutStr(verdictAttrConfidence, string(v.Confidence)) + putStrIfSet(attrs, verdictAttrK8sPodName, v.PodName) + putStrIfSet(attrs, verdictAttrK8sPodNamespace, v.PodNamespace) + putStrIfSet(attrs, verdictAttrK8sNodeName, v.NodeName) + putStrIfSet(attrs, verdictAttrK8sEventReason, v.EventReason) }) } @@ -845,9 +889,9 @@ func appendNCCLHangVerdict(ld plog.Logs, v patterns.NCCLHangVerdict, logger *zap Remediation: v.Remediation, EvidenceTrail: v.EvidenceTrail, }, v, "nccl_hang", func(attrs pcommon.Map) { - attrs.PutInt(VerdictAttrNCCLPgID, v.PgID) - attrs.PutInt(VerdictAttrNCCLCollectiveSeqID, v.CollectiveSeqID) - attrs.PutInt(VerdictAttrNCCLHangingRanksCount, int64(len(v.HangingRanks))) + attrs.PutInt(verdictAttrNCCLPgID, v.PgID) + attrs.PutInt(verdictAttrNCCLCollectiveSeqID, v.CollectiveSeqID) + attrs.PutInt(verdictAttrNCCLHangingRanksCount, int64(len(v.HangingRanks))) }) } @@ -860,10 +904,10 @@ func appendXidCorrelationVerdict(ld plog.Logs, v patterns.XidCorrelationVerdict, Remediation: v.Remediation, EvidenceTrail: v.EvidenceTrail, }, v, "xid_correlation", func(attrs pcommon.Map) { - attrs.PutInt(VerdictAttrKernelXid, int64(v.XidCode)) - putStrIfSet(attrs, VerdictAttrK8sNodeName, v.Node) - putStrIfSet(attrs, VerdictAttrK8sPodName, v.PodName) - putStrIfSet(attrs, VerdictAttrK8sPodNamespace, v.PodNamespace) + attrs.PutInt(verdictAttrKernelXid, int64(v.XidCode)) + putStrIfSet(attrs, verdictAttrK8sNodeName, v.Node) + putStrIfSet(attrs, verdictAttrK8sPodName, v.PodName) + putStrIfSet(attrs, verdictAttrK8sPodNamespace, v.PodNamespace) }) } @@ -891,8 +935,8 @@ func appendThermalThrottleVerdict(ld plog.Logs, v patterns.ThermalThrottleVerdic Remediation: v.Remediation, EvidenceTrail: v.EvidenceTrail, }, v, "thermal_throttle", func(attrs pcommon.Map) { - attrs.PutInt(VerdictAttrThermalGPUCount, int64(v.GPUCount)) - putStrIfSet(attrs, VerdictAttrK8sNodeName, v.Node) + attrs.PutInt(verdictAttrThermalGPUCount, int64(v.GPUCount)) + putStrIfSet(attrs, verdictAttrK8sNodeName, v.Node) }) } @@ -908,10 +952,10 @@ func appendPCIeAERVerdict(ld plog.Logs, v patterns.PCIeAERVerdict, logger *zap.L Remediation: v.Remediation, EvidenceTrail: v.EvidenceTrail, }, v, "pcie_aer", func(attrs pcommon.Map) { - putStrIfSet(attrs, VerdictAttrPCIeAERGPUID, v.GPUID) - putStrIfSet(attrs, VerdictAttrPCIeAERSeverity, v.Severity) - putStrIfSet(attrs, VerdictAttrPCIeAERType, v.AERType) - putStrIfSet(attrs, VerdictAttrK8sNodeName, v.Node) - attrs.PutDouble(VerdictAttrPCIeAERDropRatio, v.DropRatio) + putStrIfSet(attrs, verdictAttrPCIeAERGPUID, v.GPUID) + putStrIfSet(attrs, verdictAttrPCIeAERSeverity, v.Severity) + putStrIfSet(attrs, verdictAttrPCIeAERType, v.AERType) + putStrIfSet(attrs, verdictAttrK8sNodeName, v.Node) + attrs.PutDouble(verdictAttrPCIeAERDropRatio, v.DropRatio) }) } diff --git a/module/processor/patterndetectorprocessor/patterndetector_test.go b/module/processor/patterndetectorprocessor/patterndetector_test.go index 447bfe74..99065fe7 100644 --- a/module/processor/patterndetectorprocessor/patterndetector_test.go +++ b/module/processor/patterndetectorprocessor/patterndetector_test.go @@ -302,7 +302,7 @@ func extractVerdicts(t *testing.T, ld plog.Logs) []patterns.PodEvictedVerdict { } for k := 0; k < sl.LogRecords().Len(); k++ { lr := sl.LogRecords().At(k) - js, ok := lr.Attributes().Get(VerdictAttrVerdictJSON) + js, ok := lr.Attributes().Get(verdictAttrVerdictJSON) if !ok { continue } diff --git a/module/processor/patterndetectorprocessor/pcie_aer_test.go b/module/processor/patterndetectorprocessor/pcie_aer_test.go index 93ce0b9c..79fc35a5 100644 --- a/module/processor/patterndetectorprocessor/pcie_aer_test.go +++ b/module/processor/patterndetectorprocessor/pcie_aer_test.go @@ -276,11 +276,11 @@ func extractPCIeAERVerdicts(t *testing.T, ld plog.Logs) []patterns.PCIeAERVerdic } for k := 0; k < sl.LogRecords().Len(); k++ { lr := sl.LogRecords().At(k) - patternID, ok := lr.Attributes().Get(VerdictAttrPatternID) + patternID, ok := lr.Attributes().Get(verdictAttrPatternID) if !ok || patternID.AsString() != patterns.PatternIDPCIeAER { continue } - js, ok := lr.Attributes().Get(VerdictAttrVerdictJSON) + js, ok := lr.Attributes().Get(verdictAttrVerdictJSON) if !ok { continue } @@ -308,7 +308,7 @@ func extractPCIeAERPromotedAttrs(t *testing.T, ld plog.Logs) map[string]any { } for k := 0; k < sl.LogRecords().Len(); k++ { lr := sl.LogRecords().At(k) - patternID, ok := lr.Attributes().Get(VerdictAttrPatternID) + patternID, ok := lr.Attributes().Get(verdictAttrPatternID) if !ok || patternID.AsString() != patterns.PatternIDPCIeAER { continue } diff --git a/module/processor/patterndetectorprocessor/thermal_throttle_test.go b/module/processor/patterndetectorprocessor/thermal_throttle_test.go index c922d439..9ec82558 100644 --- a/module/processor/patterndetectorprocessor/thermal_throttle_test.go +++ b/module/processor/patterndetectorprocessor/thermal_throttle_test.go @@ -211,14 +211,14 @@ func TestPatternDetector_ThermalThrottleWiringPromotesScalarAttrs(t *testing.T) } for k := 0; k < sl.LogRecords().Len(); k++ { lr := sl.LogRecords().At(k) - patternID, ok := lr.Attributes().Get(VerdictAttrPatternID) + patternID, ok := lr.Attributes().Get(verdictAttrPatternID) if !ok || patternID.AsString() != patterns.PatternIDThermalThrottle { continue } - node, ok := lr.Attributes().Get(VerdictAttrK8sNodeName) + node, ok := lr.Attributes().Get(verdictAttrK8sNodeName) require.True(t, ok, "k8s.node.name MUST be promoted onto thermal_throttle verdict") require.Equal(t, "gpu-node-0001", node.AsString()) - cnt, ok := lr.Attributes().Get(VerdictAttrThermalGPUCount) + cnt, ok := lr.Attributes().Get(verdictAttrThermalGPUCount) require.True(t, ok, "cascade size MUST be promoted onto thermal_throttle verdict") require.Equal(t, int64(4), cnt.Int()) found = true @@ -286,11 +286,11 @@ func extractThermalThrottleVerdicts(t *testing.T, ld plog.Logs) []patterns.Therm } for k := 0; k < sl.LogRecords().Len(); k++ { lr := sl.LogRecords().At(k) - patternID, ok := lr.Attributes().Get(VerdictAttrPatternID) + patternID, ok := lr.Attributes().Get(verdictAttrPatternID) if !ok || patternID.AsString() != patterns.PatternIDThermalThrottle { continue } - js, ok := lr.Attributes().Get(VerdictAttrVerdictJSON) + js, ok := lr.Attributes().Get(verdictAttrVerdictJSON) if !ok { continue } diff --git a/module/processor/patterndetectorprocessor/verdict_attrs_test.go b/module/processor/patterndetectorprocessor/verdict_attrs_test.go index 6b1dbb97..e50a178a 100644 --- a/module/processor/patterndetectorprocessor/verdict_attrs_test.go +++ b/module/processor/patterndetectorprocessor/verdict_attrs_test.go @@ -61,13 +61,13 @@ func TestVerdictAttrs_PodEvicted(t *testing.T) { require.NoError(t, p.ConsumeLogs(context.Background(), ld)) lr := findVerdictRecord(t, sink.at(0), patterns.PatternIDPodEvicted) - assertStrAttr(t, lr, VerdictAttrK8sPodName, "job-rank-3") - assertStrAttr(t, lr, VerdictAttrK8sPodNamespace, "training") - assertStrAttr(t, lr, VerdictAttrK8sNodeName, "gpu-node-0007") - assertStrAttr(t, lr, VerdictAttrK8sEventReason, "Evicted") + assertStrAttr(t, lr, verdictAttrK8sPodName, "job-rank-3") + assertStrAttr(t, lr, verdictAttrK8sPodNamespace, "training") + assertStrAttr(t, lr, verdictAttrK8sNodeName, "gpu-node-0007") + assertStrAttr(t, lr, verdictAttrK8sEventReason, "Evicted") // pattern.verdict_json remains alongside the scalars — it's the // canonical full payload for advanced operators (issue #270 AC). - _, ok := lr.Attributes().Get(VerdictAttrVerdictJSON) + _, ok := lr.Attributes().Get(verdictAttrVerdictJSON) require.True(t, ok, "pattern.verdict_json must remain alongside scalar attrs") } @@ -102,10 +102,10 @@ func TestVerdictAttrs_NCCLHang(t *testing.T) { require.NoError(t, p.ConsumeLogs(context.Background(), ld)) lr := findVerdictRecord(t, sink.at(0), patterns.PatternIDNCCLHang) - assertIntAttr(t, lr, VerdictAttrNCCLPgID, 7) - assertIntAttr(t, lr, VerdictAttrNCCLCollectiveSeqID, 123) - assertIntAttr(t, lr, VerdictAttrNCCLHangingRanksCount, 4) - _, ok := lr.Attributes().Get(VerdictAttrVerdictJSON) + assertIntAttr(t, lr, verdictAttrNCCLPgID, 7) + assertIntAttr(t, lr, verdictAttrNCCLCollectiveSeqID, 123) + assertIntAttr(t, lr, verdictAttrNCCLHangingRanksCount, 4) + _, ok := lr.Attributes().Get(verdictAttrVerdictJSON) require.True(t, ok, "pattern.verdict_json must remain alongside scalar attrs") } @@ -149,11 +149,11 @@ func TestVerdictAttrs_XidCorrelation(t *testing.T) { require.NoError(t, p.ConsumeLogs(context.Background(), ld)) lr := findVerdictRecord(t, sink.at(0), patterns.PatternIDXidCorrelation) - assertIntAttr(t, lr, VerdictAttrKernelXid, 79) - assertStrAttr(t, lr, VerdictAttrK8sNodeName, "gpu-node-0042") - assertStrAttr(t, lr, VerdictAttrK8sPodName, "job-rank-7") - assertStrAttr(t, lr, VerdictAttrK8sPodNamespace, "training") - _, ok := lr.Attributes().Get(VerdictAttrVerdictJSON) + assertIntAttr(t, lr, verdictAttrKernelXid, 79) + assertStrAttr(t, lr, verdictAttrK8sNodeName, "gpu-node-0042") + assertStrAttr(t, lr, verdictAttrK8sPodName, "job-rank-7") + assertStrAttr(t, lr, verdictAttrK8sPodNamespace, "training") + _, ok := lr.Attributes().Get(verdictAttrVerdictJSON) require.True(t, ok, "pattern.verdict_json must remain alongside scalar attrs") } @@ -171,7 +171,7 @@ func findVerdictRecord(t *testing.T, ld plog.Logs, patternID string) plog.LogRec } for k := 0; k < sl.LogRecords().Len(); k++ { lr := sl.LogRecords().At(k) - id, ok := lr.Attributes().Get(VerdictAttrPatternID) + id, ok := lr.Attributes().Get(verdictAttrPatternID) if ok && id.AsString() == patternID { return lr } diff --git a/module/processor/patterndetectorprocessor/xid_correlation_test.go b/module/processor/patterndetectorprocessor/xid_correlation_test.go index 604c2bd3..4eb0e587 100644 --- a/module/processor/patterndetectorprocessor/xid_correlation_test.go +++ b/module/processor/patterndetectorprocessor/xid_correlation_test.go @@ -162,11 +162,11 @@ func extractXidCorrelationVerdicts(t *testing.T, ld plog.Logs) []patterns.XidCor } for k := 0; k < sl.LogRecords().Len(); k++ { lr := sl.LogRecords().At(k) - patternID, ok := lr.Attributes().Get(VerdictAttrPatternID) + patternID, ok := lr.Attributes().Get(verdictAttrPatternID) if !ok || patternID.AsString() != patterns.PatternIDXidCorrelation { continue } - js, ok := lr.Attributes().Get(VerdictAttrVerdictJSON) + js, ok := lr.Attributes().Get(verdictAttrVerdictJSON) if !ok { continue } diff --git a/module/processor/rankjoinprocessor/config.go b/module/processor/rankjoinprocessor/config.go index 735a7fc1..a60b8ac8 100644 --- a/module/processor/rankjoinprocessor/config.go +++ b/module/processor/rankjoinprocessor/config.go @@ -35,12 +35,6 @@ const AttrPodEvictedAt = "k8s.pod_evicted_at" // AttrEventHint is the canonical k8s.event.hint attribute key. const AttrEventHint = "k8s.event.hint" -// HintPodEvicted is the canonical k8s.event.hint value for a kubelet -// eviction. Mirrors module/pkg/patterns.HintPodEvicted; kept as a -// string literal here to avoid the import cycle the processor would -// otherwise have to dodge. -const HintPodEvicted = "pod_evicted" - // Config is the operator-facing YAML for the rankjoinprocessor. type Config struct { // EvictionMatchWindow is the max |evictionTime - rankRecordTime| diff --git a/module/processor/rankjoinprocessor/doc.go b/module/processor/rankjoinprocessor/doc.go deleted file mode 100644 index 8e25cef1..00000000 --- a/module/processor/rankjoinprocessor/doc.go +++ /dev/null @@ -1,19 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -// Package rankjoinprocessor is the windowed cross-signal join processor. -// It correlates NCCL FlightRecorder log records (carrying nccl.rank / -// nccl.fr.rank) against Kubernetes pod-evicted events (carrying -// k8s.event.hint=pod_evicted) within a configurable wall-clock window -// (default 5s, per RFC-0013 §M19) and stamps the matched pair with the -// canonical M19 join attributes so downstream consumers see one -// causally-linked record per evicted rank. -// -// The processor operates on the logs signal only. It is stateless -// across pipeline restarts — the join window is in-memory; restart -// drops in-flight pairs (acceptable: pod-evicted is a one-shot -// correlation, not a steady-state stream). -// -// Cross-link: the underlying pattern-detection library lives at -// module/pkg/patterns/; the replay corpus that anchors M19 golden -// tests lives at module/pkg/replay/pod_evicted/. -package rankjoinprocessor diff --git a/module/processor/rankjoinprocessor/factory.go b/module/processor/rankjoinprocessor/factory.go deleted file mode 100644 index 00218e34..00000000 --- a/module/processor/rankjoinprocessor/factory.go +++ /dev/null @@ -1,48 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package rankjoinprocessor - -import ( - "context" - "fmt" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/processor" -) - -// componentType is the kind name registered in components.yaml. -// Wrapped in a function so MustNewType is not a top-level side effect -// (mirrors the ncclfrreceiver pattern). -func componentType() component.Type { return component.MustNewType("rankjoin") } - -// stability is the OCB-surfaced stability level. Alpha — the M19 join -// shape is new in v0.2.x and may evolve once real-cluster validation -// lands. -const stability = component.StabilityLevelAlpha - -// NewFactory returns the upstream processor.Factory for rankjoin. -// Only the logs signal is wired; metrics + traces surface upstream's -// "signal not supported" via processor.NewFactory's defaults. -func NewFactory() processor.Factory { - return processor.NewFactory( - componentType(), - createDefaultConfig, - processor.WithLogs(createLogs, stability), - ) -} - -// createDefaultConfig matches upstream component.CreateDefaultConfigFunc. -func createDefaultConfig() component.Config { return defaultConfig() } - -// createLogs is the processor.CreateLogsFunc wired by WithLogs. -func createLogs(_ context.Context, set processor.Settings, cfg component.Config, next consumer.Logs) (processor.Logs, error) { - c, ok := cfg.(*Config) - if !ok { - return nil, fmt.Errorf("rankjoin: unexpected config type %T", cfg) - } - if err := c.Validate(); err != nil { - return nil, fmt.Errorf("rankjoin: %w", err) - } - return newProcessor(set, c, next), nil -} diff --git a/module/processor/rankjoinprocessor/rankjoin.go b/module/processor/rankjoinprocessor/rankjoin.go index cba4af32..ec8d86df 100644 --- a/module/processor/rankjoinprocessor/rankjoin.go +++ b/module/processor/rankjoinprocessor/rankjoin.go @@ -1,5 +1,21 @@ // SPDX-License-Identifier: Apache-2.0 +// Package rankjoinprocessor is the windowed cross-signal join processor. +// It correlates NCCL FlightRecorder log records (carrying nccl.rank / +// nccl.fr.rank) against Kubernetes pod-evicted events (carrying +// k8s.event.hint=pod_evicted) within a configurable wall-clock window +// (default 5s, per RFC-0013 §M19) and stamps the matched pair with the +// canonical M19 join attributes so downstream consumers see one +// causally-linked record per evicted rank. +// +// The processor operates on the logs signal only. It is stateless +// across pipeline restarts — the join window is in-memory; restart +// drops in-flight pairs (acceptable: pod-evicted is a one-shot +// correlation, not a steady-state stream). +// +// Cross-link: the underlying pattern-detection library lives at +// module/pkg/patterns/; the replay corpus that anchors M19 golden +// tests lives at module/pkg/replay/pod_evicted/. package rankjoinprocessor import ( @@ -14,8 +30,46 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/processor" "go.uber.org/zap" + + "github.com/tracecoreai/tracecore/module/pkg/patterns" ) +// componentType is the kind name registered in components.yaml. +// Wrapped in a function so MustNewType is not a top-level side effect +// (mirrors the ncclfrreceiver pattern). +func componentType() component.Type { return component.MustNewType("rankjoin") } + +// stability is the OCB-surfaced stability level. Alpha — the M19 join +// shape is new in v0.2.x and may evolve once real-cluster validation +// lands. +const stability = component.StabilityLevelAlpha + +// NewFactory returns the upstream processor.Factory for rankjoin. +// Only the logs signal is wired; metrics + traces surface upstream's +// "signal not supported" via processor.NewFactory's defaults. +func NewFactory() processor.Factory { + return processor.NewFactory( + componentType(), + createDefaultConfig, + processor.WithLogs(createLogs, stability), + ) +} + +// createDefaultConfig matches upstream component.CreateDefaultConfigFunc. +func createDefaultConfig() component.Config { return defaultConfig() } + +// createLogs is the processor.CreateLogsFunc wired by WithLogs. +func createLogs(_ context.Context, set processor.Settings, cfg component.Config, next consumer.Logs) (processor.Logs, error) { + c, ok := cfg.(*Config) + if !ok { + return nil, fmt.Errorf("rankjoin: unexpected config type %T", cfg) + } + if err := c.Validate(); err != nil { + return nil, fmt.Errorf("rankjoin: %w", err) + } + return newProcessor(set, c, next), nil +} + // instrumentationScope pins the OTel scope name (the package import // path, per OTel convention). const instrumentationScope = "github.com/tracecoreai/tracecore/module/processor/rankjoinprocessor" @@ -377,7 +431,7 @@ func isPodEvicted(attrs pcommon.Map) bool { if !ok { return false } - return v.AsString() == HintPodEvicted + return v.AsString() == string(patterns.HintPodEvicted) } // rankFromLogRecord returns the rank for a NCCL FR record. Prefers diff --git a/module/processor/rankjoinprocessor/rankjoin_test.go b/module/processor/rankjoinprocessor/rankjoin_test.go index a2c9b4dc..6468afa8 100644 --- a/module/processor/rankjoinprocessor/rankjoin_test.go +++ b/module/processor/rankjoinprocessor/rankjoin_test.go @@ -398,7 +398,7 @@ func newEvictionLogs(pod string, ts time.Time) plog.Logs { lr.SetTimestamp(pcommon.NewTimestampFromTime(ts)) lr.SetObservedTimestamp(pcommon.NewTimestampFromTime(ts)) lr.Body().SetStr("kubelet evicted pod") - lr.Attributes().PutStr(AttrEventHint, HintPodEvicted) + lr.Attributes().PutStr(AttrEventHint, string(patterns.HintPodEvicted)) lr.Attributes().PutStr("k8s.event.reason", "Evicted") return ld } @@ -442,7 +442,7 @@ func appendEvictionRecord(ld plog.Logs, pod string, ts time.Time) { lr.SetTimestamp(pcommon.NewTimestampFromTime(ts)) lr.SetObservedTimestamp(pcommon.NewTimestampFromTime(ts)) lr.Body().SetStr("kubelet evicted pod") - lr.Attributes().PutStr(AttrEventHint, HintPodEvicted) + lr.Attributes().PutStr(AttrEventHint, string(patterns.HintPodEvicted)) lr.Attributes().PutStr("k8s.event.reason", "Evicted") } diff --git a/module/receiver/ncclfrreceiver/doc.go b/module/receiver/ncclfrreceiver/doc.go deleted file mode 100644 index 54becb65..00000000 --- a/module/receiver/ncclfrreceiver/doc.go +++ /dev/null @@ -1,13 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -// Package ncclfrreceiver is the NCCL FlightRecorder dump-directory -// receiver. It watches a configured directory for pickle files written -// by PyTorch's FlightRecorder, decodes each one via the safe pickle -// parser in module/pkg/nccl/fr_parser, and emits one OTel log record -// per ring-buffer entry. -// -// The pickle parser is the security boundary: every dump-file byte -// flows through the default-deny opcode whitelist. See PRINCIPLES.md -// §1 ("never crash the workload"), §9 (typed errors), and the M11 -// rubric in MILESTONES.md. -package ncclfrreceiver diff --git a/module/receiver/ncclfrreceiver/nccl_fr.go b/module/receiver/ncclfrreceiver/nccl_fr.go index f408a8e4..33804b9b 100644 --- a/module/receiver/ncclfrreceiver/nccl_fr.go +++ b/module/receiver/ncclfrreceiver/nccl_fr.go @@ -1,5 +1,15 @@ // SPDX-License-Identifier: Apache-2.0 +// Package ncclfrreceiver is the NCCL FlightRecorder dump-directory +// receiver. It watches a configured directory for pickle files written +// by PyTorch's FlightRecorder, decodes each one via the safe pickle +// parser in module/pkg/nccl/fr_parser, and emits one OTel log record +// per ring-buffer entry. +// +// The pickle parser is the security boundary: every dump-file byte +// flows through the default-deny opcode whitelist. See PRINCIPLES.md +// §1 ("never crash the workload"), §9 (typed errors), and the M11 +// rubric in MILESTONES.md. package ncclfrreceiver import (