1 change: 0 additions & 1 deletion components/receivers/containerstdout/README.md
@@ -64,7 +64,6 @@ Per-Kind alert mapping:
| `backpressure_drop` | `ContainerStdoutBackpressure` | warning |
| `cursor_write_failed` | `ContainerStdoutCursorWriteFailed` | critical |
| `watch` | `ContainerStdoutWatchFlap` | warning |
| `fingerprint_cardinality` | `ContainerStdoutCardinalityFingerprint` | info |
| `attribution_cardinality` | `ContainerStdoutCardinalityAttribution` | info |
| `rate_limit_cardinality` | `ContainerStdoutCardinalityRateLimit` | info |
| (composite OR of source flags) | `ContainerStdoutDegraded` | warning |
36 changes: 9 additions & 27 deletions components/receivers/containerstdout/RUNBOOK.md
@@ -163,39 +163,22 @@ test that pins its behaviour, and the operator remediation path.
reconnect; the degraded gauge clears on the next successful
watch.

### KindFingerprintCardinality

- **Alert:** `ContainerStdoutCardinalityFingerprint` (severity: info)
- **Test:** `TestFailure_FingerprintCardinalityOverflow`
- **Means:** the fingerprint→stream identity LRU rolled over a key.
Future scans MAY double-emit lines on the evicted stream until the
cursor resyncs.
- **Likely root cause:** pod churn exceeded the LRU capacity
(default 8192 entries). Normal during cluster autoscaling or
large rolling deploys; sustained signal indicates the cap is
undersized for the cluster.
- **Diagnose:**
```sh
curl localhost:8888/metrics | grep 'kind="fingerprint_cardinality"'
# Compare with pod churn rate.
kubectl get events -A --field-selector reason=Created | wc -l
```
- **Mitigation:** signal-only; no action required for a transient
spike. For a sustained rate, raise the fingerprint LRU cap via the
`config:` override (`containerstdout.attribution_lru_cap`).

### KindAttributionCardinality

- **Alert:** `ContainerStdoutCardinalityAttribution` (severity: info)
- **Test:** `TestFailure_AttributionCardinalityOverflow`
- **Means:** the per-`(pod_uid, container)` attribution LRU rolled
over. Emissions for the evicted key fall back to namespace +
container until the informer re-keys.
- **Likely root cause:** same as fingerprint cardinality -
short-lived pods rotating through the receiver faster than the
LRU clears.
- **Diagnose + Mitigation:** same as KindFingerprintCardinality;
the two LRUs share a cap.
- **Likely root cause:** short-lived pods rotating through the
receiver faster than the LRU clears.
- **Diagnose:**
```sh
curl localhost:8888/metrics | grep 'kind="attribution_cardinality"'
```
- **Mitigation:** signal-only; no action required for a transient
spike. For a sustained rate, raise the attribution LRU cap via the
`config:` override (`containerstdout.attribution_lru_cap`).
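
The eviction behaviour described for the attribution LRU can be illustrated with a minimal sketch. It assumes a one-method `Telemetry` sink with a hypothetical `RecordError` method (the receiver's real interface is not shown in this diff) and a string-keyed cache built on `container/list`; `lru`, `newLRU`, `Touch`, and `countingTelemetry` are illustrative names, not the receiver's API.

```go
package main

import (
	"container/list"
	"sync"
)

// Kind labels a self-telemetry event. The value mirrors the
// kind="attribution_cardinality" metric label used in the runbook.
type Kind string

const KindAttributionCardinality Kind = "attribution_cardinality"

// Telemetry is a hypothetical one-method sink; the receiver's real
// interface is not visible in this diff.
type Telemetry interface {
	RecordError(k Kind)
}

// countingTelemetry remembers every recorded Kind (illustrative only).
type countingTelemetry struct {
	mu    sync.Mutex
	kinds []Kind
}

func (c *countingTelemetry) RecordError(k Kind) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.kinds = append(c.kinds, k)
}

// lru is a minimal bounded LRU keyed by string.
type lru struct {
	mu    sync.Mutex
	cap   int
	ll    *list.List               // front = most recently used
	items map[string]*list.Element // key -> element holding that key
	tel   Telemetry
}

// newLRU expects a non-nil tel; a non-positive capacity becomes 1.
func newLRU(capacity int, tel Telemetry) *lru {
	if capacity <= 0 {
		capacity = 1
	}
	return &lru{
		cap:   capacity,
		ll:    list.New(),
		items: map[string]*list.Element{},
		tel:   tel,
	}
}

// Touch marks key as used and reports whether it was already cached.
// Inserting past the cap evicts the least recently used key and
// records exactly one cardinality event.
func (c *lru) Touch(key string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if el, ok := c.items[key]; ok {
		c.ll.MoveToFront(el)
		return true
	}
	c.items[key] = c.ll.PushFront(key)
	if c.ll.Len() > c.cap {
		oldest := c.ll.Back()
		c.ll.Remove(oldest)
		delete(c.items, oldest.Value.(string))
		c.tel.RecordError(KindAttributionCardinality)
	}
	return false
}
```

With a cap of 2, touching three distinct keys records one event. A sustained event rate in this model means the working set of keys is larger than the cap, which is the situation the mitigation above addresses by raising `containerstdout.attribution_lru_cap`.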

### KindRateLimitCardinality

@@ -268,7 +251,6 @@ cleans them up or re-enables the receiver.
| Slow downstream consumer | `backpressure_drop` | Drop with `kind="backpressure_drop"`; tailer never blocks. | `TestFailure_BackpressureDropOnSlowConsumer` |
| Cursor write to read-only fs | `cursor_write_failed` | Bounded retry; receiver stays alive; restart replays from older cursor. | `TestFailure_CursorWriteFailedReadOnlyFs` |
| Pod-informer watch error | `watch` | `Degraded()=true`; client-go backoff (1s→30s); receiver stays alive. | `TestFailure_WatchAPIServerFlap` |
| Fingerprint LRU overflow | `fingerprint_cardinality` | Evicted key may double-emit until cursor resyncs. | `TestFailure_FingerprintCardinalityOverflow` |
| Attribution LRU overflow | `attribution_cardinality` | Evicted key falls back to namespace+container. | `TestFailure_AttributionCardinalityOverflow` |
| Rate-limit LRU overflow | `rate_limit_cardinality` | Evicted key emits without budget until namespace fallback claims it. | `TestFailure_RateLimitCardinalityOverflow` |
| Corrupt CRI line | (no Kind - line skipped) | Skipped + emission-stat tick; receiver stays alive. | `TestFailure_CorruptCRILineSkipped` |
8 changes: 3 additions & 5 deletions components/receivers/containerstdout/attribution.go
@@ -5,8 +5,6 @@ package containerstdout
import (
"container/list"
"sync"

"github.com/tracecoreai/tracecore/internal/selftelemetry"
)

// AttributionRef bundles K8s + training identity for a log line source.
@@ -50,7 +48,7 @@ type AttributionSource interface {
type Cache struct {
src AttributionSource
cap int
telemetry selftelemetry.Receiver
telemetry Telemetry

mu sync.Mutex
ll *list.List
@@ -78,14 +76,14 @@ func entryOf(el *list.Element) *cacheEntry {
// capacity. A non-positive cap is normalised to 1 so the cache stays
// well-defined; operators should never see this - Validate enforces
// AttributionConfig.LRUCap > 0 at config load.
func NewCache(src AttributionSource, capacity int, telemetry selftelemetry.Receiver) *Cache {
func NewCache(src AttributionSource, capacity int, telemetry Telemetry) *Cache {
if capacity <= 0 {
capacity = 1
}
if telemetry == nil {
// Defensive - Cache's hot path must never panic on a nil
// telemetry sink. Cardinality events are silently dropped.
telemetry = selftelemetry.NewNoopReceiver()
telemetry = newNoopTelemetry()
}
return &Cache{
src: src,
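
The nil guard in `NewCache` can be sketched in isolation. This is a simplified illustration, not the receiver's code: the `Telemetry` method set is not shown in this diff, so `RecordError` is a hypothetical method, and `Cache` here carries only the two fields the sketch needs.

```go
package main

// Kind names a self-telemetry event category.
type Kind string

// Telemetry is a hypothetical sink interface; the real method set is
// not shown in this diff.
type Telemetry interface {
	RecordError(k Kind)
}

// noopTelemetry discards every event.
type noopTelemetry struct{}

func (noopTelemetry) RecordError(Kind) {}

func newNoopTelemetry() Telemetry { return noopTelemetry{} }

// Cache holds only the fields needed for this sketch.
type Cache struct {
	cap       int
	telemetry Telemetry
}

// NewCache normalises a non-positive capacity to 1 and swaps a nil
// sink for the no-op, so later calls never dereference a nil sink.
func NewCache(capacity int, telemetry Telemetry) *Cache {
	if capacity <= 0 {
		capacity = 1
	}
	if telemetry == nil {
		telemetry = newNoopTelemetry()
	}
	return &Cache{cap: capacity, telemetry: telemetry}
}
```

One caveat with this pattern in Go: `telemetry == nil` is only true for an untyped nil interface value. A nil pointer of a concrete type passed as `Telemetry` is a non-nil interface and bypasses the guard, so callers should pass a literal `nil` or a valid sink.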
15 changes: 7 additions & 8 deletions components/receivers/containerstdout/attribution_test.go
@@ -12,7 +12,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/tracecoreai/tracecore/components/receivers/containerstdout"
"github.com/tracecoreai/tracecore/internal/selftelemetry"
)

// stubSource is a test-only AttributionSource that returns a pre-canned
@@ -73,7 +72,7 @@ func TestAttributionCache_HitMissEviction(t *testing.T) {
Found: true,
}
}
tel := selftelemetry.NewCapturingReceiver()
tel := containerstdout.NewCapturingTelemetry()
cache := containerstdout.NewCache(src, 3, tel)

// Cold inserts: 3 misses → 3 source calls, cache full.
@@ -113,7 +112,7 @@ func TestAttributionCache_CardinalityKindRecorded(t *testing.T) {
for i := range 5 {
src.byPath[mkPath(i)] = containerstdout.AttributionRef{Found: true}
}
tel := selftelemetry.NewCapturingReceiver()
tel := containerstdout.NewCapturingTelemetry()
cache := containerstdout.NewCache(src, 2, tel)

// Two cold inserts fill the cap - no cardinality event.
@@ -123,12 +122,12 @@ func TestAttributionCache_CardinalityKindRecorded(t *testing.T) {

// Third insert pushes past cap → one eviction → one cardinality event.
_, _ = cache.Lookup(mkPath(2))
require.Equal(t, []selftelemetry.Kind{containerstdout.KindAttributionCardinality}, tel.Errors())
require.Equal(t, []containerstdout.Kind{containerstdout.KindAttributionCardinality}, tel.Errors())

// Fourth insert → another eviction → another event.
_, _ = cache.Lookup(mkPath(3))
require.Equal(t,
[]selftelemetry.Kind{
[]containerstdout.Kind{
containerstdout.KindAttributionCardinality,
containerstdout.KindAttributionCardinality,
},
@@ -146,7 +145,7 @@ func TestAttributionCache_LookupErrPropagates(t *testing.T) {
src := newStubSource()
sentinel := errors.New("informer transient")
src.err = sentinel
tel := selftelemetry.NewCapturingReceiver()
tel := containerstdout.NewCapturingTelemetry()
cache := containerstdout.NewCache(src, 4, tel)

_, err := cache.Lookup(mkPath(0))
@@ -168,7 +167,7 @@ func TestAttributionCache_NotFoundCached(t *testing.T) {

src := newStubSource()
// No byPath entry → stub returns {Found:false}, nil.
tel := selftelemetry.NewCapturingReceiver()
tel := containerstdout.NewCapturingTelemetry()
cache := containerstdout.NewCache(src, 4, tel)

ref, err := cache.Lookup(mkPath(0))
@@ -197,7 +196,7 @@ func TestAttributionCache_ConcurrentSafe(t *testing.T) {
for i := range keys {
src.byPath[mkPath(i)] = containerstdout.AttributionRef{Found: true}
}
tel := selftelemetry.NewCapturingReceiver()
tel := containerstdout.NewCapturingTelemetry()
cache := containerstdout.NewCache(src, 8, tel)

var wg sync.WaitGroup
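
The tests above depend on a capturing sink whose `Errors()` returns the recorded kinds in order. A minimal sketch of such a test double follows. Only `NewCapturingTelemetry`, `CapturingTelemetry`, `Kind`, and `Errors()` appear in this diff; the recording method `RecordError` and the internals are assumptions.

```go
package main

import "sync"

// Kind names a self-telemetry event category.
type Kind string

// CapturingTelemetry records every Kind it is given, in call order.
// The internals here are assumed, not taken from the receiver.
type CapturingTelemetry struct {
	mu   sync.Mutex
	errs []Kind
}

func NewCapturingTelemetry() *CapturingTelemetry {
	return &CapturingTelemetry{}
}

// RecordError is a hypothetical recording method; it is safe for
// concurrent use.
func (c *CapturingTelemetry) RecordError(k Kind) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.errs = append(c.errs, k)
}

// Errors returns a copy of the recorded kinds so callers cannot
// mutate the sink's internal slice.
func (c *CapturingTelemetry) Errors() []Kind {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := make([]Kind, len(c.errs))
	copy(out, c.errs)
	return out
}
```

Returning a copy keeps assertions stable in a test like `TestAttributionCache_ConcurrentSafe`, where other goroutines may still be recording while the slice is inspected.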
@@ -2,11 +2,7 @@

package containerstdout

import (
"testing"

"github.com/tracecoreai/tracecore/internal/selftelemetry"
)
import "testing"

// BenchmarkAttributionLookup measures Cache.Lookup with 100% hit rate
// (steady state - the receiver settles here after the first few lines
@@ -36,7 +32,7 @@ func BenchmarkAttributionLookup(b *testing.B) {
Found: true,
},
})
cache := NewCache(src, 64, selftelemetry.NewNoopReceiver())
cache := NewCache(src, 64, newNoopTelemetry())
if _, err := cache.Lookup(path); err != nil {
b.Fatalf("prewarm cache: %v", err)
}
@@ -5,8 +5,6 @@ package containerstdout
import (
"testing"
"time"

"github.com/tracecoreai/tracecore/internal/selftelemetry"
)

// BenchmarkChannelDepthUnderEviction measures rate-limit reject overhead
@@ -27,7 +25,7 @@ func BenchmarkChannelDepthUnderEviction(b *testing.B) {
LRUCap: 16,
LRUEvictAfter: time.Minute,
}
rl := NewRateLimiter(cfg, selftelemetry.NewNoopReceiver())
rl := NewRateLimiter(cfg, newNoopTelemetry())

// Prime the bucket so the loop measures the steady-state reject
// path, not the one-shot admission + accept.
8 changes: 2 additions & 6 deletions components/receivers/containerstdout/bench_hot_path_test.go
@@ -2,11 +2,7 @@

package containerstdout

import (
"testing"

"github.com/tracecoreai/tracecore/internal/selftelemetry"
)
import "testing"

// benchDataloaderRegex is the canonical two-named-capture pattern from
// dataloader_test.go - same shape RFC-0010 §Dataloader regex
@@ -60,7 +56,7 @@ func newBenchHotPathFixture(tb testing.TB) *benchHotPathFixture {
Found: true,
},
})
cache := NewCache(src, 64, selftelemetry.NewNoopReceiver())
cache := NewCache(src, 64, newNoopTelemetry())

// Pre-populate so the first Lookup in the bench loop is a hit.
if _, err := cache.Lookup(path); err != nil {
8 changes: 3 additions & 5 deletions components/receivers/containerstdout/cursor.go
@@ -10,8 +10,6 @@ import (
"path/filepath"
"strings"
"time"

"github.com/tracecoreai/tracecore/internal/selftelemetry"
)

// cursorFileExt is the extension worn by every persisted cursor file.
@@ -63,7 +61,7 @@ type CursorStore struct {
dir string
fileMode os.FileMode
dirMode os.FileMode
telemetry selftelemetry.Receiver
telemetry Telemetry
}

// cursorOnDisk is the JSON wire format for cursor files. Kept distinct
@@ -86,7 +84,7 @@ type cursorOnDisk struct {
// A nil telemetry sink is replaced with a no-op so Save never panics
// on a missing receiver - symmetric with attribution.Cache and
// RateLimiter, the other receiver-internal components.
func NewCursorStore(cfg CursorConfig, telemetry selftelemetry.Receiver) (*CursorStore, error) {
func NewCursorStore(cfg CursorConfig, telemetry Telemetry) (*CursorStore, error) {
if cfg.Dir == "" {
return nil, errors.New("cursor: empty Dir")
}
@@ -101,7 +99,7 @@ func NewCursorStore(cfg CursorConfig, telemetry selftelemetry.Receiver) (*Cursor
return nil, fmt.Errorf("cursor: chmod %q: %w", cfg.Dir, err)
}
if telemetry == nil {
telemetry = selftelemetry.NewNoopReceiver()
telemetry = newNoopTelemetry()
}
return &CursorStore{
dir: cfg.Dir,
11 changes: 5 additions & 6 deletions components/receivers/containerstdout/cursor_test.go
@@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/tracecoreai/tracecore/components/receivers/containerstdout"
"github.com/tracecoreai/tracecore/internal/selftelemetry"
)

// goosWindows centralises the runtime.GOOS guard string used by the
@@ -25,10 +24,10 @@ const goosWindows = "windows"
// t.TempDir() with default modes and a capturing telemetry sink. The
// returned dir + tel let each test assert against persisted state and
// recorded self-telemetry without rebuilding the harness boilerplate.
func newCursorStoreT(t *testing.T) (*containerstdout.CursorStore, string, *selftelemetry.CapturingReceiver) {
func newCursorStoreT(t *testing.T) (*containerstdout.CursorStore, string, *containerstdout.CapturingTelemetry) {
t.Helper()
dir := filepath.Join(t.TempDir(), "cursors")
tel := selftelemetry.NewCapturingReceiver()
tel := containerstdout.NewCapturingTelemetry()
store, err := containerstdout.NewCursorStore(containerstdout.CursorConfig{
Dir: dir,
FileMode: 0o600,
@@ -276,7 +275,7 @@ func TestCursorStore_DirCreatedWithMode(t *testing.T) {
Dir: dir,
FileMode: 0o600,
DirMode: 0o700,
}, selftelemetry.NewCapturingReceiver())
}, containerstdout.NewCapturingTelemetry())
require.NoError(t, err)

info, err := os.Stat(dir)
@@ -301,7 +300,7 @@ func TestCursorStore_WriteFailureRecordsKind(t *testing.T) {
}

dir := filepath.Join(t.TempDir(), "ro-cursors")
tel := selftelemetry.NewCapturingReceiver()
tel := containerstdout.NewCapturingTelemetry()
store, err := containerstdout.NewCursorStore(containerstdout.CursorConfig{
Dir: dir,
FileMode: 0o600,
@@ -323,7 +322,7 @@ func TestCursorStore_WriteFailureRecordsKind(t *testing.T) {
})
require.Error(t, err, "Save must fail on read-only dir")
require.Equal(t,
[]selftelemetry.Kind{containerstdout.KindCursorWriteFailed},
[]containerstdout.Kind{containerstdout.KindCursorWriteFailed},
tel.Errors(),
"write failure records exactly one KindCursorWriteFailed",
)
18 changes: 9 additions & 9 deletions components/receivers/containerstdout/factory.go
@@ -8,7 +8,6 @@ import (

"github.com/tracecoreai/tracecore/internal/consumer"
"github.com/tracecoreai/tracecore/internal/pipeline"
"github.com/tracecoreai/tracecore/internal/selftelemetry"
)

// ComponentType is the canonical receiver-factory ID. Centralized so
@@ -64,11 +63,12 @@ func (*factory) CreateTraces(_ context.Context, _ pipeline.CreateSettings, _ pip
// Start/Shutdown body with the lifecycle wiring; this phase only
// has to satisfy the pipeline.Receiver contract.
//
// Self-telemetry follows the kernelevents M2 pattern: the noop
// default keeps the hot path nil-safe; the real Receiver is wired
// Self-telemetry follows the kernelevents sibling pattern: the noop
// default keeps the hot path nil-safe; the real Telemetry is wired
// when set.Telemetry.MeterProvider is non-nil; init failures are
// surfaced via selftelemetry.RecordInitError so operators see when
// they're running silent.
// surfaced via recordInitError so operators see when they're running
// silent. RFC-0013 PR-F replaces the v0.1.x dependency on
// internal/selftelemetry with the receiver-scoped sibling here.
func (*factory) CreateLogs(ctx context.Context, set pipeline.CreateSettings, cfg pipeline.Config, next consumer.Logs) (pipeline.Receiver, error) {
c, ok := cfg.(*Config)
if !ok {
@@ -78,13 +78,13 @@ func (*factory) CreateLogs(ctx context.Context, set pipeline.CreateSettings, cfg
return nil, fmt.Errorf("containerstdout: invalid config: %w", err)
}

telemetry := selftelemetry.NewNoopReceiver()
telemetry := newNoopTelemetry()
if set.Telemetry.MeterProvider != nil {
if rt, err := selftelemetry.NewReceiver(set.ID, set.Telemetry.MeterProvider); err == nil {
if rt, err := newTelemetry(set.ID, set.Telemetry.MeterProvider); err == nil {
telemetry = rt
} else {
selftelemetry.RecordInitError(ctx, set.Telemetry.MeterProvider,
"receiver", set.ID.String(), selftelemetry.ReasonInstrumentRegister)
recordInitError(ctx, set.Telemetry.MeterProvider,
"receiver", set.ID.String(), reasonInstrumentRegister)
if set.Telemetry.Logger != nil {
set.Telemetry.Logger.Warn("containerstdout self-telemetry init failed; using noop", "err", err)
}
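
The wiring in `CreateLogs` follows a "noop by default, upgrade if possible, report if not" shape. The sketch below isolates that control flow under simplifying assumptions: `haveProvider` stands in for the `set.Telemetry.MeterProvider != nil` check, `onInitErr` stands in for the `recordInitError` call plus the warning log, and `newTelemetry` here is a stub that fails on an empty ID rather than the receiver's real constructor. `wireTelemetry` and `RecordError` are hypothetical names.

```go
package main

import "errors"

// Telemetry is a hypothetical sink interface for this sketch.
type Telemetry interface {
	RecordError(kind string)
}

// noopTelemetry discards every event.
type noopTelemetry struct{}

func (noopTelemetry) RecordError(string) {}

// realTelemetry stands in for a sink backed by registered instruments.
type realTelemetry struct{ id string }

func (*realTelemetry) RecordError(string) {}

// newTelemetry is a stub for instrument registration. It fails on an
// empty id purely so the fallback path can be exercised.
func newTelemetry(id string) (Telemetry, error) {
	if id == "" {
		return nil, errors.New("register instruments: empty id")
	}
	return &realTelemetry{id: id}, nil
}

// wireTelemetry returns the real sink when a provider is available
// and registration succeeds; otherwise it keeps the no-op and reports
// the failure through onInitErr so the operator can see it.
func wireTelemetry(id string, haveProvider bool, onInitErr func(error)) Telemetry {
	var telemetry Telemetry = noopTelemetry{}
	if !haveProvider {
		return telemetry
	}
	rt, err := newTelemetry(id)
	if err != nil {
		if onInitErr != nil {
			onInitErr(err)
		}
		return telemetry
	}
	return rt
}
```

The point of the shape is that every branch returns a usable sink, so hot-path callers never nil-check, while an init failure is still surfaced once instead of being silently swallowed.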
10 changes: 5 additions & 5 deletions components/receivers/containerstdout/factory_test.go
@@ -99,14 +99,14 @@ func TestFactory_CreateLogs_RejectsInvalidConfig(t *testing.T) {
require.ErrorContains(t, err, "rank_source")
}

// TestFactory_CreateLogs_WiresSelftelemetry pins the M2 self-telemetry
// TestFactory_CreateLogs_WiresSelftelemetry pins the self-telemetry
// wiring contract: a valid enabled config wired with a real
// MeterProvider (the test fixture's noop MeterProvider counts as
// "real" for this contract - it satisfies the selftelemetry.NewReceiver
// signature) produces a Receiver. The factory must not return nil or
// error on a happy-path enabled config.
// "real" for this contract - it satisfies the newTelemetry signature)
// produces a Receiver. The factory must not return nil or error on a
// happy-path enabled config.
//
// The factory wires a non-noop selftelemetry.Receiver when
// The factory wires a non-noop containerstdout Telemetry when
// set.Telemetry.MeterProvider is non-nil and the noop default
// otherwise - both branches must yield a non-nil internal field so
// hot-path callers don't nil-check.