Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions internal/synthesis/patterns/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// SPDX-License-Identifier: Apache-2.0

package patterns

import "time"

// The types in this file are the pattern library's own model — the
// shape pattern detectors (and the replay fixture corpus) read from.
// Per RFC-0013, the pattern library is the v0.3 moat; it travels with
// the project regardless of which receivers are in the active build,
// so its input types live next to the detectors instead of being
// re-imported from any one upstream receiver.
//
// Field names and tags mirror the OTel attribute keys a Kubernetes-
// event source would stamp on its emitted log records — so a fixture
// JSON authored against the upstream k8sevents receiver schema
// unmarshals into these types unchanged.

// Record is the typed representation of a single Kubernetes Event
// consumed by a pattern detector. Detectors read Record fields
// directly (no plog.LogRecord grep), so renaming a field on this
// type surfaces as a compile error in the detectors rather than as
// a silent pattern-evaluation regression.
//
// NOTE(review): because this type is local to the pattern library, a
// key rename in an upstream emitter is NOT a compile error here — the
// renamed key would simply fail to populate the field on
// json.Unmarshal. Fixture/replay tests are the guard for that case.
type Record struct {
// EventUID is the upstream Event object's metadata.uid — globally
// unique per Event, even across resyncs.
EventUID string `json:"event_uid,omitempty"`

// Action is the events.k8s.io/v1 Event.Action field — what the
// reporter did ("Binding", "Killing", "Pulled", ...). Empty for
// "synthetic" Events the kubelet/controllers emit without a
// distinct action.
Action string `json:"action,omitempty"`

// Reason is the short, machine-readable cause ("Evicted",
// "FailedScheduling", "SystemOOM", ...). Drives Hint.
Reason string `json:"reason,omitempty"`

// Hint is the tracecore-canonical `k8s.event.hint` value. The
// named type keeps it from being mixed with plain `string`
// variables without an explicit conversion. Untyped string
// literals still convert implicitly in Go (`ev.Hint == "x"`
// compiles), so detectors should compare against the exported
// `Hint*` constants, where a typo is an undefined-identifier
// compile error.
Hint Hint `json:"hint,omitempty"`

// Regarding identifies the object the Event is about
// (events.k8s.io/v1 Event.Regarding). The tag carries no
// omitempty, so the key is always present when marshalling.
Regarding ObjectRef `json:"regarding"`

// ReportingController is the controller name that wrote the
// Event ("kubelet", "default-scheduler", "deployment-controller").
ReportingController string `json:"reporting_controller,omitempty"`

// ReportingInstance is the controller-instance identifier. For
// kubelet-emitted Events (Evicted, NodeHasDiskPressure, etc.)
// this is the node name — load-bearing for the pod-evicted
// detector, which uses it to join an Evicted pod against its
// node's recent pressure transitions.
ReportingInstance string `json:"reporting_instance,omitempty"`

// Note is the human-readable message body. Bounded by the
// upstream API server's 1KiB limit; we don't trim further.
Note string `json:"note,omitempty"`

// SeriesCount is the number of times this Event has fired since
// the upstream API server started compressing repeats. 0 when
// the Event is not in a Series.
SeriesCount int32 `json:"series_count,omitempty"`

// EventTime is the events.k8s.io/v1 Event.EventTime, falling back
// to DeprecatedFirstTimestamp / DeprecatedLastTimestamp on
// kubelet builds that haven't switched to EventTime.
//
// NOTE(review): encoding/json's `omitempty` never omits a struct
// value such as time.Time, so a zero EventTime is still marshalled
// (as "0001-01-01T00:00:00Z"). Use IsZero() to detect "unset".
EventTime time.Time `json:"event_time,omitempty"`

// Type is `Normal` or `Warning` (see EventTypeNormal /
// EventTypeWarning). Preserved on the record for downstream
// detectors and the min_event_type filter on the emitter side.
Type string `json:"type,omitempty"`
}

// ObjectRef mirrors the events.k8s.io/v1 ObjectReference subset
// pattern detectors need. Kept distinct from upstream
// k8s.io/api/core/v1.ObjectReference so the pattern library does not
// drag the full client-go API surface into its compile graph.
//
// Every field is a plain string tagged omitempty, so an empty field
// is dropped from the marshalled JSON.
type ObjectRef struct {
// Kind is the referenced object's kind (e.g. "Pod").
Kind string `json:"kind,omitempty"`
// Namespace is the referenced object's namespace; may be empty
// (presumably for cluster-scoped objects — confirm with emitter).
Namespace string `json:"namespace,omitempty"`
// Name is the referenced object's name.
Name string `json:"name,omitempty"`
// UID is the referenced object's UID.
UID string `json:"uid,omitempty"`
}

// NodeRecord is the typed representation of a single node-condition
// transition. Sibling to Record; pattern detectors read NodeRecord
// values directly.
type NodeRecord struct {
// NodeName is the upstream Node object's metadata.name. Stable
// across condition transitions on the same Node.
NodeName string `json:"node_name,omitempty"`

// NodeUID is the Node's metadata.uid — globally unique.
NodeUID string `json:"node_uid,omitempty"`

// Hint is the canonical k8s.event.hint value. For NodeRecord
// the value is always HintNodePressure (memory/disk/pid) or
// HintNodeUnhealthy (NotReady); the Pressure field carries the
// finer-grained kind.
Hint Hint `json:"hint,omitempty"`

// Pressure is the specific condition kind that transitioned
// True. It is a named type, so it cannot be mixed with plain
// `string` variables without a conversion. A switch arm written
// as an untyped literal (`case "memroy":`) still compiles, so
// arms should use the exported Pressure* constants, where a typo
// is an undefined-identifier compile error.
Pressure NodePressureKind `json:"pressure,omitempty"`

// TransitionAt is the LastTransitionTime of the condition.
// Used by the pod-evicted detector to bound the join window.
//
// NOTE(review): encoding/json's `omitempty` never omits a struct
// value such as time.Time, so a zero TransitionAt is still
// marshalled. Use IsZero() to detect "unset".
TransitionAt time.Time `json:"transition_at,omitempty"`

// Message is the condition's Message field — human-readable
// detail of why the pressure tripped (e.g.
// "imagefs.available<15%"). Bounded by upstream apiserver
// limits; we don't trim further.
Message string `json:"message,omitempty"`
}

// Hint is the typed form of the `k8s.event.hint` value. As a distinct
// named type it cannot be assigned from, or compared with, a plain
// `string` variable without an explicit conversion.
//
// Untyped string literals still convert to Hint implicitly, so a
// downstream `case "pod_evicted":` does compile. Detectors should
// use the exported `Hint*` constants instead: a misspelled constant
// name is a compile error, a misspelled literal is not.
type Hint string

// Canonical Hint values, mirroring the taxonomy emitters stamp on
// records. The list is deliberately limited to the values pattern
// detectors switch on; a new pattern that needs another Hint adds
// its constant in the same change.
const (
	// HintPodEvicted marks an Event reporting a pod eviction.
	HintPodEvicted = Hint("pod_evicted")
	// HintNodePressure marks a node memory/disk/pid pressure transition.
	HintNodePressure = Hint("node_pressure")
	// HintNodeUnhealthy marks a node NotReady transition.
	HintNodeUnhealthy = Hint("node_unhealthy")
	// HintScheduleFailure marks a scheduling failure.
	HintScheduleFailure = Hint("schedule_failure")
)

// NodePressureKind enumerates the four node-condition transitions
// pattern detectors recognize. As a distinct named type it cannot be
// mixed with plain `string` variables without an explicit conversion.
//
// Untyped string literals still convert implicitly, so a downstream
// `case "memory":` does compile. Consumers should use the exported
// constants instead: a misspelled constant name is a compile error,
// a misspelled literal is not.
type NodePressureKind string

// Canonical NodePressureKind values.
const (
	// PressureMemory is the memory-pressure condition.
	PressureMemory = NodePressureKind("memory")
	// PressureDisk is the disk-pressure condition.
	PressureDisk = NodePressureKind("disk")
	// PressurePID is the pid-pressure condition.
	PressurePID = NodePressureKind("pid")
	// PressureNotReady is the node NotReady condition.
	PressureNotReady = NodePressureKind("notready")
)

// Event Type values mirror the upstream events.k8s.io/v1 Event.Type
// enum — `Normal` or `Warning`. Hoisted here so fixture authors and
// detector tests share one source of truth.
//
// These are untyped string constants, so they compare directly
// against the plain-string Record.Type field without a conversion.
const (
// EventTypeNormal is the Event.Type value "Normal".
EventTypeNormal = "Normal"
// EventTypeWarning is the Event.Type value "Warning".
EventTypeWarning = "Warning"
)
54 changes: 26 additions & 28 deletions internal/synthesis/patterns/pod_evicted.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"sort"
"strings"
"time"

"github.com/tracecoreai/tracecore/components/receivers/k8sevents"
)

// DefaultJoinWindow is the maximum gap between a node-pressure
Expand All @@ -22,8 +20,8 @@ import (
const DefaultJoinWindow = 30 * time.Second

// PodEvictedDetector is the M19 pattern-#14 detector. Cross-receiver
// query is structured (typed accessor against M10's exported Record
// and NodeRecord types per MILESTONES §M19); no string grep against
// query is structured (typed accessor against the local Record and
// NodeRecord types in model.go); no string grep against
// plog.LogRecord attributes.
//
// Zero-value usage is permitted — JoinWindow defaults to
Expand All @@ -35,9 +33,9 @@ type PodEvictedDetector struct {
JoinWindow time.Duration
}

// Evaluate scans events for k8sevents Pod-Evicted records and joins
// each against the most recent matching node-condition record within
// the JoinWindow. Returns one PodEvictedVerdict per evicted pod, in
// Evaluate scans events for Pod-Evicted records and joins each
// against the most recent matching node-condition record within the
// JoinWindow. Returns one PodEvictedVerdict per evicted pod, in
// EventTime ascending order so the slice is deterministic for golden
// tests.
//
Expand All @@ -49,7 +47,7 @@ type PodEvictedDetector struct {
// at the Hint check and never appear in the output. This satisfies
// the rubric's negative-fixture gate: Killing/Preempted/FailedScheduling
// map to non-pod_evicted Hints and so emit zero verdicts.
func (d PodEvictedDetector) Evaluate(events []k8sevents.Record, nodeConds []k8sevents.NodeRecord) []PodEvictedVerdict {
func (d PodEvictedDetector) Evaluate(events []Record, nodeConds []NodeRecord) []PodEvictedVerdict {
window := d.JoinWindow
if window <= 0 {
window = DefaultJoinWindow
Expand All @@ -63,7 +61,7 @@ func (d PodEvictedDetector) Evaluate(events []k8sevents.Record, nodeConds []k8se
verdicts := make([]PodEvictedVerdict, 0)
for i := range events {
ev := events[i]
if ev.Hint != k8sevents.HintPodEvicted {
if ev.Hint != HintPodEvicted {
continue
}
verdicts = append(verdicts, buildVerdict(ev, condIdx, window))
Expand All @@ -78,8 +76,8 @@ func (d PodEvictedDetector) Evaluate(events []k8sevents.Record, nodeConds []k8se
// indexNodeConds groups records by node name and sorts each bucket
// by TransitionAt ascending so the detector can binary-search for
// the most-recent transition before an eviction.
func indexNodeConds(recs []k8sevents.NodeRecord) map[string][]k8sevents.NodeRecord {
idx := map[string][]k8sevents.NodeRecord{}
func indexNodeConds(recs []NodeRecord) map[string][]NodeRecord {
idx := map[string][]NodeRecord{}
for _, r := range recs {
idx[r.NodeName] = append(idx[r.NodeName], r)
}
Expand All @@ -97,7 +95,7 @@ func indexNodeConds(recs []k8sevents.NodeRecord) map[string][]k8sevents.NodeReco
// condition index and produces the verdict. Confidence is Full iff a
// node-condition record was found within the window; Partial otherwise
// with MissingLayers=["node_condition"].
func buildVerdict(ev k8sevents.Record, condIdx map[string][]k8sevents.NodeRecord, window time.Duration) PodEvictedVerdict {
func buildVerdict(ev Record, condIdx map[string][]NodeRecord, window time.Duration) PodEvictedVerdict {
v := PodEvictedVerdict{
PatternID: PatternIDPodEvicted,
EvidenceTrail: []EvidenceRef{
Expand Down Expand Up @@ -152,19 +150,19 @@ func annotateRemediationWithNode(remediation, nodeName string) string {
// the per-node-sorted slice whose TransitionAt is <= evTime and
// >= evTime-window. Bucket is sorted ascending; binary-search finds
// the rightmost element <= evTime, then we verify the window bound.
func mostRecentConditionWithin(bucket []k8sevents.NodeRecord, evTime time.Time, window time.Duration) (k8sevents.NodeRecord, bool) {
func mostRecentConditionWithin(bucket []NodeRecord, evTime time.Time, window time.Duration) (NodeRecord, bool) {
if len(bucket) == 0 {
return k8sevents.NodeRecord{}, false
return NodeRecord{}, false
}
i := sort.Search(len(bucket), func(i int) bool {
return bucket[i].TransitionAt.After(evTime)
})
if i == 0 {
return k8sevents.NodeRecord{}, false
return NodeRecord{}, false
}
candidate := bucket[i-1]
if evTime.Sub(candidate.TransitionAt) > window {
return k8sevents.NodeRecord{}, false
return NodeRecord{}, false
}
return candidate, true
}
Expand All @@ -173,7 +171,7 @@ func mostRecentConditionWithin(bucket []k8sevents.NodeRecord, evTime time.Time,
// joined NodeRecord. Omits the ": <message>" suffix when the
// upstream Message is empty so the description never ends with a
// dangling colon.
func nodeConditionDescription(cond k8sevents.NodeRecord) string {
func nodeConditionDescription(cond NodeRecord) string {
base := fmt.Sprintf("Node %s entered %s pressure", cond.NodeName, cond.Pressure)
if cond.Message == "" {
return base
Expand All @@ -183,7 +181,7 @@ func nodeConditionDescription(cond k8sevents.NodeRecord) string {

// displayPodName returns "namespace/name" for a Pod-shaped Record;
// falls back to just the name (or "<unknown>") otherwise.
func displayPodName(ev k8sevents.Record) string {
func displayPodName(ev Record) string {
if ev.Regarding.Namespace != "" && ev.Regarding.Name != "" {
return ev.Regarding.Namespace + "/" + ev.Regarding.Name
}
Expand All @@ -209,7 +207,7 @@ func formatTimestamp(t time.Time) string {
// vocabulary verbatim (signals + nodeConditionMessageFmt + the
// ephemeral-storage / EmptyDir paths). Non-kubelet drivers
// (descheduler, custom controllers) intentionally land in "unknown".
func pressureFromNote(note string) k8sevents.NodePressureKind {
func pressureFromNote(note string) NodePressureKind {
low := strings.ToLower(note)
for _, m := range pressureMatchers {
for _, anchor := range m.anchors {
Expand All @@ -226,11 +224,11 @@ func pressureFromNote(note string) k8sevents.NodePressureKind {
// "ephemeral storage" beats a hypothetical message that mentions
// both ephemeral-storage and memory.
var pressureMatchers = []struct {
kind k8sevents.NodePressureKind
kind NodePressureKind
anchors []string
}{
{
kind: k8sevents.PressureDisk,
kind: PressureDisk,
anchors: []string{
"nodefs",
"imagefs",
Expand All @@ -243,7 +241,7 @@ var pressureMatchers = []struct {
},
},
{
kind: k8sevents.PressureMemory,
kind: PressureMemory,
anchors: []string{
"memory.available",
"memory pressure",
Expand All @@ -252,7 +250,7 @@ var pressureMatchers = []struct {
},
},
{
kind: k8sevents.PressurePID,
kind: PressurePID,
anchors: []string{
"pid.available",
"pid pressure",
Expand All @@ -267,15 +265,15 @@ var pressureMatchers = []struct {
// memory or add headroom; pid → cap fork rate; unknown → the
// documented "investigate" fallback that satisfies the rubric's
// partial-verdict path.
func remediationFor(p k8sevents.NodePressureKind) string {
func remediationFor(p NodePressureKind) string {
switch p {
case k8sevents.PressureDisk:
case PressureDisk:
return "Free imagefs or relocate the training write path to NVMe; tighten kubelet --eviction-hard nodefs.available."
case k8sevents.PressureMemory:
case PressureMemory:
return "Reduce per-rank memory footprint or add node headroom; review pod requests vs allocatable."
case k8sevents.PressurePID:
case PressurePID:
return "Cap fork rate (training-process spawn) or raise the node pid limit."
case k8sevents.PressureNotReady:
case PressureNotReady:
return "Inspect kubelet health, network plumbing, and CSI mounts on the node."
}
return "Inspect kubelet eviction logs on the node; the pressure root was not captured in the join window."
Expand Down
17 changes: 8 additions & 9 deletions internal/synthesis/patterns/pod_evicted_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"
"time"

"github.com/tracecoreai/tracecore/components/receivers/k8sevents"
"github.com/tracecoreai/tracecore/internal/synthesis/patterns"
)

Expand All @@ -19,36 +18,36 @@ func BenchmarkPodEvictedDetector_1kEventWindow(b *testing.B) {
now := time.Unix(1_700_000_000, 0).UTC()
const n = 1024

events := make([]k8sevents.Record, 0, n)
nodes := make([]k8sevents.NodeRecord, 0, 64)
events := make([]patterns.Record, 0, n)
nodes := make([]patterns.NodeRecord, 0, 64)

for i := 0; i < 64; i++ {
nodes = append(nodes, k8sevents.NodeRecord{
nodes = append(nodes, patterns.NodeRecord{
NodeName: "node-" + strconv.Itoa(i),
NodeUID: "uid-" + strconv.Itoa(i),
Pressure: k8sevents.PressureDisk,
Pressure: patterns.PressureDisk,
TransitionAt: now.Add(time.Duration(-i) * time.Second),
Message: "imagefs.available<15%",
})
}
for i := 0; i < n; i++ {
nodeIdx := i % 64
if i%5 == 0 {
events = append(events, k8sevents.Record{
events = append(events, patterns.Record{
EventUID: "noise-" + strconv.Itoa(i),
Reason: "Killing",
EventTime: now,
})
continue
}
events = append(events, k8sevents.Record{
events = append(events, patterns.Record{
EventUID: "evict-" + strconv.Itoa(i),
Reason: "Evicted",
Hint: k8sevents.HintPodEvicted,
Hint: patterns.HintPodEvicted,
Note: "The node was low on resource: imagefs.available.",
ReportingInstance: "node-" + strconv.Itoa(nodeIdx),
EventTime: now,
Regarding: k8sevents.ObjectRef{Kind: "Pod", Name: "p-" + strconv.Itoa(i)},
Regarding: patterns.ObjectRef{Kind: "Pod", Name: "p-" + strconv.Itoa(i)},
})
}

Expand Down
Loading
Loading