Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions module/pkg/patterns/checkpointer_hang.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sort"
"strings"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
)

// Design spec: docs/patterns/11-checkpointer-hang.md. The detector
Expand Down Expand Up @@ -266,6 +268,32 @@ type CheckpointerHangVerdict struct {
MissingLayers []string `json:"missing_layers,omitempty"`
}

// Common returns the shared verdict scalars; satisfies the
// patterndetectorprocessor's verdictAttrer interface so the generic
// appendVerdict helper emits checkpointer_hang verdicts without
// per-pattern boilerplate.
func (v CheckpointerHangVerdict) Common() VerdictCommon {
return VerdictCommon{
PatternID: v.PatternID,
Headline: v.Headline,
Remediation: v.Remediation,
EvidenceTrail: v.EvidenceTrail,
Kind: "checkpointer_hang",
}
}

// PutAttrs stamps the checkpointer_hang-specific promoted scalars
// onto the verdict log record's attribute map (issue #270 contract).
func (v CheckpointerHangVerdict) PutAttrs(attrs pcommon.Map) {
attrs.PutStr(verdictAttrConfidence, string(v.Confidence))
attrs.PutInt(verdictAttrCheckpointerStallSeconds, v.StallSeconds)
attrs.PutStr(verdictAttrCheckpointerPhase, string(v.Phase))
attrs.PutStr(verdictAttrCheckpointerStorageBackend, v.StorageBackend)
putStrIfSet(attrs, verdictAttrK8sPodName, v.PodName)
putStrIfSet(attrs, verdictAttrK8sPodNamespace, v.PodNamespace)
putStrIfSet(attrs, verdictAttrK8sNodeName, v.Node)
}

// CheckpointerHangDetector is the checkpointer_hang pattern
// detector (NORTHSTAR pattern #11). Zero-value usage is permitted
// — BackwardWindow defaults to
Expand Down
33 changes: 33 additions & 0 deletions module/pkg/patterns/cuda_oom.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"sort"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
)

// DefaultCUDAOOMCorrelationWindow is the maximum gap between a CUDA
Expand Down Expand Up @@ -191,6 +193,37 @@ type CUDAOOMVerdict struct {
MissingLayers []string `json:"missing_layers,omitempty"`
}

// Common returns the shared verdict scalars; satisfies the
// patterndetectorprocessor's verdictAttrer interface so the generic
// appendVerdict helper emits cuda_oom verdicts without per-pattern
// boilerplate.
func (v CUDAOOMVerdict) Common() VerdictCommon {
return VerdictCommon{
PatternID: v.PatternID,
Headline: v.Headline,
Remediation: v.Remediation,
EvidenceTrail: v.EvidenceTrail,
Kind: "cuda_oom",
}
}

// PutAttrs stamps the cuda_oom-specific promoted scalars onto the
// verdict log record's attribute map (issue #270 contract). Empty-
// string scalars (GPUID/Node/Pod*) skip via putStrIfSet so dashboards
// filtering by attribute presence don't silently match empty-filter
// queries.
func (v CUDAOOMVerdict) PutAttrs(attrs pcommon.Map) {
attrs.PutStr(verdictAttrConfidence, string(v.Confidence))
attrs.PutStr(verdictAttrCUDAOOMKind, string(v.Kind))
attrs.PutInt(verdictAttrCUDAOOMTriedAllocBytes, v.TriedAllocBytes)
attrs.PutInt(verdictAttrCUDAOOMFBFreeBytes, v.FBFreeBytes)
attrs.PutDouble(verdictAttrCUDAOOMFBFreeRatio, v.FBFreeRatio)
putStrIfSet(attrs, verdictAttrPCIeAERGPUID, v.GPUID)
putStrIfSet(attrs, verdictAttrK8sNodeName, v.Node)
putStrIfSet(attrs, verdictAttrK8sPodName, v.PodName)
putStrIfSet(attrs, verdictAttrK8sPodNamespace, v.PodNamespace)
}

// CUDAOOMDetector is the cuda_oom pattern detector (NORTHSTAR pattern
// #10). Zero-value usage is permitted — CorrelationWindow defaults to
// DefaultCUDAOOMCorrelationWindow; FBFreeFragmentationThreshold
Expand Down
41 changes: 41 additions & 0 deletions module/pkg/patterns/dataloader_hang.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"sort"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
)

// Design spec: docs/patterns/07-dataloader-hang.md. The detector joins
Expand Down Expand Up @@ -180,6 +182,45 @@ type DataLoaderHangVerdict struct {
EventReason string `json:"k8s.event.reason,omitempty"`
}

// Common returns the shared verdict scalars; satisfies the
// patterndetectorprocessor's verdictAttrer interface so the generic
// appendVerdict helper emits dataloader_hang verdicts without
// per-pattern boilerplate.
func (v DataLoaderHangVerdict) Common() VerdictCommon {
return VerdictCommon{
PatternID: v.PatternID,
Headline: v.Headline,
Remediation: v.Remediation,
EvidenceTrail: v.EvidenceTrail,
Kind: "dataloader_hang",
}
}

// PutAttrs stamps the dataloader_hang-specific promoted scalars onto
// the verdict log record's attribute map (issue #270 contract). The
// discriminator-conditional fields (worker_pid / error_class /
// event.reason) emit only on the matching discriminator branch so the
// wire-format contract documented in
// docs/patterns/07-dataloader-hang.md §"Verdict attributes" is
// honored.
func (v DataLoaderHangVerdict) PutAttrs(attrs pcommon.Map) {
attrs.PutStr(verdictAttrConfidence, string(v.Confidence))
attrs.PutStr(verdictAttrDataLoaderHangDiscriminator, string(v.Discriminator))
attrs.PutInt(verdictAttrDataLoaderHangStallSeconds, v.StallSeconds)
putStrIfSet(attrs, verdictAttrK8sPodName, v.PodName)
putStrIfSet(attrs, verdictAttrK8sPodNamespace, v.PodNamespace)
putStrIfSet(attrs, verdictAttrK8sNodeName, v.NodeName)
switch v.Discriminator {
case DataLoaderHangDiscriminatorWorkerKilled:
if v.WorkerPID > 0 {
attrs.PutInt(verdictAttrDataLoaderWorkerPID, v.WorkerPID)
}
putStrIfSet(attrs, verdictAttrDataLoaderErrorClass, v.ErrorClass)
case DataLoaderHangDiscriminatorStorageEvent:
putStrIfSet(attrs, verdictAttrK8sEventReason, v.EventReason)
}
}

// DataLoaderHangDetector is the dataloader_hang pattern detector
// (NORTHSTAR pattern #7 per docs/patterns/07-dataloader-hang.md).
// Zero-value usage is permitted — StallThreshold defaults to
Expand Down
29 changes: 29 additions & 0 deletions module/pkg/patterns/nccl_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"sort"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
)

// Design spec: docs/patterns/09-nccl-bootstrap-timeout.md. The detector
Expand Down Expand Up @@ -219,6 +221,33 @@ type NCCLBootstrapTimeoutVerdict struct {
MissingLayers []string `json:"missing_layers,omitempty"`
}

// Common returns the shared verdict scalars; satisfies the
// patterndetectorprocessor's verdictAttrer interface so the generic
// appendVerdict helper emits nccl_bootstrap verdicts without
// per-pattern boilerplate.
func (v NCCLBootstrapTimeoutVerdict) Common() VerdictCommon {
return VerdictCommon{
PatternID: v.PatternID,
Headline: v.Headline,
Remediation: v.Remediation,
EvidenceTrail: v.EvidenceTrail,
Kind: "nccl_bootstrap",
}
}

// PutAttrs stamps the nccl_bootstrap-specific promoted scalars onto
// the verdict log record's attribute map (issue #270 contract).
// JobID skips via putStrIfSet because the namespace-only fallback
// path leaves it empty.
func (v NCCLBootstrapTimeoutVerdict) PutAttrs(attrs pcommon.Map) {
attrs.PutStr(verdictAttrConfidence, string(v.Confidence))
putStrIfSet(attrs, verdictAttrK8sNamespaceName, v.Namespace)
putStrIfSet(attrs, verdictAttrGenAITrainingJobID, v.JobID)
attrs.PutInt(verdictAttrNCCLBootstrapCohortSize, int64(v.CohortSize))
attrs.PutInt(verdictAttrNCCLBootstrapFailedRankCount, int64(v.FailedRankCount))
attrs.PutStr(verdictAttrNCCLBootstrapDiscriminator, string(v.Discriminator))
}

// NCCLBootstrapDetector is the nccl_bootstrap_timeout pattern detector
// (NORTHSTAR pattern #9, per docs/patterns/09-nccl-bootstrap-timeout.md).
// Zero-value usage is permitted — BootstrapDeadline defaults to
Expand Down
30 changes: 30 additions & 0 deletions module/pkg/patterns/silent_data_corruption.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"sort"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
)

// DefaultSDCAccuracyDropThreshold is the absolute accuracy regression
Expand Down Expand Up @@ -212,6 +214,34 @@ type SilentDataCorruptionVerdict struct {
MissingLayers []string `json:"missing_layers,omitempty"`
}

// Common returns the shared verdict scalars; satisfies the
// patterndetectorprocessor's verdictAttrer interface so the generic
// appendVerdict helper emits silent_data_corruption verdicts without
// per-pattern boilerplate.
func (v SilentDataCorruptionVerdict) Common() VerdictCommon {
return VerdictCommon{
PatternID: v.PatternID,
Headline: v.Headline,
Remediation: v.Remediation,
EvidenceTrail: v.EvidenceTrail,
Kind: "silent_data_corruption",
}
}

// PutAttrs stamps the silent_data_corruption-specific promoted
// scalars onto the verdict log record's attribute map (issue #270
// contract). SuspectGPUID/SuspectNode skip via putStrIfSet because
// the partial-confidence (Kind=accuracy_only) verdict carries no
// suspect hardware.
func (v SilentDataCorruptionVerdict) PutAttrs(attrs pcommon.Map) {
attrs.PutStr(verdictAttrConfidence, string(v.Confidence))
attrs.PutStr(verdictAttrSDCKind, string(v.Kind))
attrs.PutDouble(verdictAttrSDCAccuracyDrop, v.AccuracyDrop)
putStrIfSet(attrs, verdictAttrSDCSuspectGPUID, v.SuspectGPUID)
putStrIfSet(attrs, verdictAttrSDCSuspectNode, v.SuspectNode)
putStrIfSet(attrs, verdictAttrGenAITrainingJobID, v.JobID)
}

// SilentDataCorruptionDetector is the silent_data_corruption pattern
// detector (pattern #13). Zero-value usage is permitted —
// AccuracyDropThreshold defaults to DefaultSDCAccuracyDropThreshold;
Expand Down
45 changes: 37 additions & 8 deletions module/pkg/patterns/verdict.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,17 @@ type VerdictCommon struct {
Kind string
}

// Verdict attribute keys shared across the seven patterns whose
// PutAttrs methods live in this package. Co-located with the verdict
// types so the wire-format key vocabulary is owned by the producer
// (patterns) rather than the consumer (patterndetectorprocessor) —
// the processor's existing private constants intentionally mirror
// these for the five other detectors that still build the attribute
// map inline. Keep the two sets in sync when the wire contract
// changes (see docs/ATTRIBUTES.md for the rename / removal policy).
// Verdict attribute keys owned by the producer (patterns package) so
// the wire-format vocabulary lives next to the verdict type that emits
// it. Every verdict's PutAttrs method stamps from this block; the
// patterndetectorprocessor's generic appendVerdict[V verdictAttrer]
// helper drives PutAttrs without needing to know the keys itself. See
// docs/ATTRIBUTES.md for the rename / removal policy.
const (
verdictAttrConfidence = "pattern.confidence"
verdictAttrK8sPodName = "k8s.pod.name"
verdictAttrK8sPodNamespace = "k8s.pod.namespace"
verdictAttrK8sNamespaceName = "k8s.namespace.name"
verdictAttrK8sNodeName = "k8s.node.name"
verdictAttrK8sEventReason = "k8s.event.reason"
verdictAttrNCCLPgID = "nccl.fr.pg_id"
Expand All @@ -75,6 +74,36 @@ const (
verdictAttrIBHCADevice = "hw.network.ib.device"
verdictAttrIBPortNum = "hw.network.ib.port.num"
verdictAttrIBTransitionCount = "tracecore.alert.ib_link_flap.transition_count"
verdictAttrGenAITrainingJobID = "gen_ai.training.job_id"

// CUDA OOM (pattern #10) promoted scalars. Customer-stable per
// docs/patterns/10-cuda-oom-deceptive.md §"Verdict attributes".
verdictAttrCUDAOOMKind = "cuda_oom.kind"
verdictAttrCUDAOOMTriedAllocBytes = "cuda_oom.tried_alloc_bytes"
verdictAttrCUDAOOMFBFreeBytes = "cuda_oom.fb_free_bytes"
verdictAttrCUDAOOMFBFreeRatio = "cuda_oom.fb_free_ratio"

// Checkpointer hang promoted scalars.
verdictAttrCheckpointerStallSeconds = "tracecore.alert.checkpointer_hang.stall_seconds"
verdictAttrCheckpointerPhase = "tracecore.alert.checkpointer_hang.phase"
verdictAttrCheckpointerStorageBackend = "tracecore.alert.checkpointer_hang.storage_backend"

// DataLoader hang promoted scalars.
verdictAttrDataLoaderHangDiscriminator = "tracecore.alert.dataloader_hang.discriminator"
verdictAttrDataLoaderHangStallSeconds = "tracecore.alert.dataloader_hang.stall_seconds"
verdictAttrDataLoaderWorkerPID = "dataloader.worker_pid"
verdictAttrDataLoaderErrorClass = "dataloader.error_class"

// NCCL bootstrap promoted scalars.
verdictAttrNCCLBootstrapCohortSize = "tracecore.alert.nccl_bootstrap_timeout.cohort_size"
verdictAttrNCCLBootstrapFailedRankCount = "tracecore.alert.nccl_bootstrap_timeout.failed_rank_count"
verdictAttrNCCLBootstrapDiscriminator = "tracecore.alert.nccl_bootstrap_timeout.discriminator"

// Silent data corruption promoted scalars.
verdictAttrSDCKind = "tracecore.alert.silent_data_corruption.kind"
verdictAttrSDCAccuracyDrop = "tracecore.alert.silent_data_corruption.accuracy_drop"
verdictAttrSDCSuspectGPUID = "tracecore.alert.silent_data_corruption.suspect_gpu_id"
verdictAttrSDCSuspectNode = "tracecore.alert.silent_data_corruption.suspect_node"
)

// putStrIfSet skips empty-string scalars so dashboards filtering by
Expand Down
49 changes: 7 additions & 42 deletions module/processor/patterndetectorprocessor/checkpointer_hang.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,12 @@ import (
"github.com/tracecoreai/tracecore/module/pkg/patterns"
)

// Checkpointer hang (pattern #11) verdict-attribute namespace. The
// attribute keys mirror the docs/patterns/11-checkpointer-hang.md
// §"Verdict attributes" table. Promoted onto the verdict log record
// per the issue #270 scalar-promotion contract so dashboards
// table-aggregate on stall_seconds / phase / storage_backend
// without server-side JSON parsing of pattern.verdict_json.
const (
// verdictAttrCheckpointerStallSeconds promotes the wall-clock
// stall duration so dashboards graph hang severity.
verdictAttrCheckpointerStallSeconds = "tracecore.alert.checkpointer_hang.stall_seconds"

// verdictAttrCheckpointerPhase promotes the checkpoint phase
// (plan / write / barrier) so operators triage by phase
// (write ⇒ storage; barrier ⇒ slow rank; plan ⇒ metadata).
verdictAttrCheckpointerPhase = "tracecore.alert.checkpointer_hang.phase"

// verdictAttrCheckpointerStorageBackend promotes the inferred
// storage backend (lustre / fsx / weka / nfs / s3 / unknown).
verdictAttrCheckpointerStorageBackend = "tracecore.alert.checkpointer_hang.storage_backend"
)
// Checkpointer hang (pattern #11) verdict-attribute keys live next
// to the verdict type — see module/pkg/patterns/verdict.go for the
// canonical constant block and CheckpointerHangVerdict.PutAttrs for
// the stamper. Co-locating keys + stamper with the producer side
// ensures the wire-format vocabulary is owned by the patterns
// package, not duplicated here.

// Customer-stable attribute family the OTTL recipes (sibling to
// future filelogreceiver + prometheusreceiver + journaldreceiver
Expand Down Expand Up @@ -126,27 +112,6 @@ func projectStoragePressureRecord(lr plog.LogRecord, resAttrs pcommon.Map) (patt
return r, true
}

// appendCheckpointerHangVerdict emits a checkpointer_hang verdict
// log record. Promoted attrs: k8s.pod.name, k8s.pod.namespace,
// k8s.node.name, the stall_seconds / phase / storage_backend
// scalars, and pattern.confidence (issue #270 contract).
func appendCheckpointerHangVerdict(ld plog.Logs, v patterns.CheckpointerHangVerdict, logger *zap.Logger) {
appendVerdictRecord(ld, logger, verdictCommon{
PatternID: v.PatternID,
Headline: v.Headline,
Remediation: v.Remediation,
EvidenceTrail: v.EvidenceTrail,
}, v, "checkpointer_hang", func(attrs pcommon.Map) {
attrs.PutStr(verdictAttrConfidence, string(v.Confidence))
attrs.PutInt(verdictAttrCheckpointerStallSeconds, v.StallSeconds)
attrs.PutStr(verdictAttrCheckpointerPhase, string(v.Phase))
attrs.PutStr(verdictAttrCheckpointerStorageBackend, v.StorageBackend)
putStrIfSet(attrs, verdictAttrK8sPodName, v.PodName)
putStrIfSet(attrs, verdictAttrK8sPodNamespace, v.PodNamespace)
putStrIfSet(attrs, verdictAttrK8sNodeName, v.Node)
})
}

// collectCheckpointerHangInputs walks the incoming plog.Logs and
// projects the three input shapes the detector consumes. Hoisted
// into the checkpointer-hang-specific file so the cross-cutting
Expand Down Expand Up @@ -217,7 +182,7 @@ func runCheckpointerHangDetector(ld plog.Logs, cfg *Config, emitPartial bool, lo
if v.Confidence == patterns.ConfidencePartial && !emitPartial {
continue
}
appendCheckpointerHangVerdict(ld, v, logger)
appendVerdict(ld, v, logger)
}
}

Expand Down
Loading