From 7e9946bb0b99dba4c3cd9f765d438d9716e94154 Mon Sep 17 00:00:00 2001 From: Tri Lam Date: Sat, 30 May 2026 23:22:11 -0700 Subject: [PATCH 1/2] feat(pivot): PR-F port containerstdout off internal selftel + lc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Receiver-scoped sibling for `internal/selftelemetry` and `internal/runtime/lifecycle` so RFC-0013 PR-F can delete the internal moats. Mirrors the kernelevents (PR #187) multi-source sibling pattern + the dcgm (PR #188) co-located Kind block. 14 production .go files rewired; zero remaining imports of the internal packages in containerstdout. - selftel.go: exported Telemetry interface, Kind type (canonical mirrors + receiver-local kinds co-located in one const block), NewTelemetry / NewNoopTelemetry / NewCapturingTelemetry, recordInitError. Scope name pinned to the receiver's import path. Constructors stay exported because external _test packages consume them via NewCursorStore / NewCache / NewRateLimiter / NewPodInformer / TailerOptions.Telemetry. - lifecycle.go: receiver-scoped lifecycle with Add() (multi-source: per-tailer Run + per-tailer pipeline + informer + health + idleEvict loops all join the same WaitGroup). TOCTOU-safe Add. - New tests (TDD): selftel_test pins noop safety across every Kind, nil-MP error sentinel, errors_total + scope name, every receiver-local kind routes through the same counter, init_errors_total tick, nil-MP recordInitError safety, CapturingTelemetry round-trip. lifecycle_test pins happy/idempotent/panic-cb/deadline/Add-WG-share/Add-panic-cb/ Add-before-start-noop/Add-after-shutdown-noop/concurrent-Add- during-Shutdown (TOCTOU, race-detector verified, scheduler- hardened with a shutdownGate so the test exercises the race window deterministically rather than flaking on fast machines). - TestTailer_RotationStalledKind stays green via the new sibling (the pre-existing assertion that KindRotationStalled fires after stall must hold; verified under -race × 10). LOC: +1419 new selftel/lifecycle (incl. tests) / -193 in 23 rewired sites = net +1226 across the receiver. Eliminates 24 import sites of the deleted-target packages. Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Tri Lam --- .../receivers/containerstdout/attribution.go | 8 +- .../containerstdout/attribution_test.go | 15 +- .../containerstdout/bench_attribution_test.go | 8 +- .../bench_channel_depth_test.go | 4 +- .../containerstdout/bench_hot_path_test.go | 8 +- .../receivers/containerstdout/cursor.go | 8 +- .../receivers/containerstdout/cursor_test.go | 11 +- .../receivers/containerstdout/factory.go | 18 +- .../receivers/containerstdout/factory_test.go | 10 +- .../failure_modes_internal_test.go | 18 +- .../receivers/containerstdout/informer.go | 6 +- .../containerstdout/informer_test.go | 16 +- components/receivers/containerstdout/kind.go | 43 -- .../receivers/containerstdout/kind_test.go | 16 +- .../receivers/containerstdout/lifecycle.go | 181 +++++++ .../containerstdout/lifecycle_test.go | 368 ++++++++++++++ .../containerstdout/noop_receiver.go | 9 +- .../receivers/containerstdout/pipeline.go | 10 +- .../containerstdout/pipeline_test.go | 5 +- .../receivers/containerstdout/ratelimit.go | 8 +- .../containerstdout/ratelimit_test.go | 25 +- .../receivers/containerstdout/receiver.go | 30 +- .../containerstdout/receiver_test.go | 7 +- .../receivers/containerstdout/selftel.go | 460 ++++++++++++++++++ .../receivers/containerstdout/selftel_test.go | 409 ++++++++++++++++ .../receivers/containerstdout/tailer.go | 6 +- .../receivers/containerstdout/tailer_test.go | 21 +- 27 files changed, 1536 insertions(+), 192 deletions(-) delete mode 100644 components/receivers/containerstdout/kind.go create mode 100644 components/receivers/containerstdout/lifecycle.go create mode 100644 components/receivers/containerstdout/lifecycle_test.go create mode 100644 components/receivers/containerstdout/selftel.go create mode 100644 components/receivers/containerstdout/selftel_test.go diff --git a/components/receivers/containerstdout/attribution.go b/components/receivers/containerstdout/attribution.go index a8b2f76b..f50707b5 100644 --- a/components/receivers/containerstdout/attribution.go +++ b/components/receivers/containerstdout/attribution.go @@ -5,8 +5,6 @@ package containerstdout import ( "container/list" "sync" - - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // AttributionRef bundles K8s + training identity for a log line source. @@ -50,7 +48,7 @@ type AttributionSource interface { type Cache struct { src AttributionSource cap int - telemetry selftelemetry.Receiver + telemetry Telemetry mu sync.Mutex ll *list.List @@ -78,14 +76,14 @@ func entryOf(el *list.Element) *cacheEntry { // capacity. A non-positive cap is normalised to 1 so the cache stays // well-defined; operators should never see this - Validate enforces // AttributionConfig.LRUCap > 0 at config load. -func NewCache(src AttributionSource, capacity int, telemetry selftelemetry.Receiver) *Cache { +func NewCache(src AttributionSource, capacity int, telemetry Telemetry) *Cache { if capacity <= 0 { capacity = 1 } if telemetry == nil { // Defensive - Cache's hot path must never panic on a nil // telemetry sink. Cardinality events are silently dropped. - telemetry = selftelemetry.NewNoopReceiver() + telemetry = NewNoopTelemetry() } return &Cache{ src: src, diff --git a/components/receivers/containerstdout/attribution_test.go b/components/receivers/containerstdout/attribution_test.go index 700387ce..eaf66ad8 100644 --- a/components/receivers/containerstdout/attribution_test.go +++ b/components/receivers/containerstdout/attribution_test.go @@ -12,7 +12,6 @@ import ( "github.com/stretchr/testify/require" "github.com/tracecoreai/tracecore/components/receivers/containerstdout" - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // stubSource is a test-only AttributionSource that returns a pre-canned @@ -73,7 +72,7 @@ func TestAttributionCache_HitMissEviction(t *testing.T) { Found: true, } } - tel := selftelemetry.NewCapturingReceiver() + tel := containerstdout.NewCapturingTelemetry() cache := containerstdout.NewCache(src, 3, tel) // Cold inserts: 3 misses → 3 source calls, cache full. @@ -113,7 +112,7 @@ func TestAttributionCache_CardinalityKindRecorded(t *testing.T) { for i := range 5 { src.byPath[mkPath(i)] = containerstdout.AttributionRef{Found: true} } - tel := selftelemetry.NewCapturingReceiver() + tel := containerstdout.NewCapturingTelemetry() cache := containerstdout.NewCache(src, 2, tel) // Two cold inserts fill the cap - no cardinality event. @@ -123,12 +122,12 @@ func TestAttributionCache_CardinalityKindRecorded(t *testing.T) { // Third insert pushes past cap → one eviction → one cardinality event. _, _ = cache.Lookup(mkPath(2)) - require.Equal(t, []selftelemetry.Kind{containerstdout.KindAttributionCardinality}, tel.Errors()) + require.Equal(t, []containerstdout.Kind{containerstdout.KindAttributionCardinality}, tel.Errors()) // Fourth insert → another eviction → another event. _, _ = cache.Lookup(mkPath(3)) require.Equal(t, - []selftelemetry.Kind{ + []containerstdout.Kind{ containerstdout.KindAttributionCardinality, containerstdout.KindAttributionCardinality, }, @@ -146,7 +145,7 @@ func TestAttributionCache_LookupErrPropagates(t *testing.T) { src := newStubSource() sentinel := errors.New("informer transient") src.err = sentinel - tel := selftelemetry.NewCapturingReceiver() + tel := containerstdout.NewCapturingTelemetry() cache := containerstdout.NewCache(src, 4, tel) _, err := cache.Lookup(mkPath(0)) @@ -168,7 +167,7 @@ func TestAttributionCache_NotFoundCached(t *testing.T) { src := newStubSource() // No byPath entry → stub returns {Found:false}, nil. - tel := selftelemetry.NewCapturingReceiver() + tel := containerstdout.NewCapturingTelemetry() cache := containerstdout.NewCache(src, 4, tel) ref, err := cache.Lookup(mkPath(0)) @@ -197,7 +196,7 @@ func TestAttributionCache_ConcurrentSafe(t *testing.T) { for i := range keys { src.byPath[mkPath(i)] = containerstdout.AttributionRef{Found: true} } - tel := selftelemetry.NewCapturingReceiver() + tel := containerstdout.NewCapturingTelemetry() cache := containerstdout.NewCache(src, 8, tel) var wg sync.WaitGroup diff --git a/components/receivers/containerstdout/bench_attribution_test.go b/components/receivers/containerstdout/bench_attribution_test.go index c4e483c1..540ea3e6 100644 --- a/components/receivers/containerstdout/bench_attribution_test.go +++ b/components/receivers/containerstdout/bench_attribution_test.go @@ -2,11 +2,7 @@ package containerstdout -import ( - "testing" - - "github.com/tracecoreai/tracecore/internal/selftelemetry" -) +import "testing" // BenchmarkAttributionLookup measures Cache.Lookup with 100% hit rate // (steady state - the receiver settles here after the first few lines @@ -36,7 +32,7 @@ func BenchmarkAttributionLookup(b *testing.B) { Found: true, }, }) - cache := NewCache(src, 64, selftelemetry.NewNoopReceiver()) + cache := NewCache(src, 64, NewNoopTelemetry()) if _, err := cache.Lookup(path); err != nil { b.Fatalf("prewarm cache: %v", err) } diff --git a/components/receivers/containerstdout/bench_channel_depth_test.go b/components/receivers/containerstdout/bench_channel_depth_test.go index 762a51bb..607a8b63 100644 --- a/components/receivers/containerstdout/bench_channel_depth_test.go +++ b/components/receivers/containerstdout/bench_channel_depth_test.go @@ -5,8 +5,6 @@ package containerstdout import ( "testing" "time" - - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // BenchmarkChannelDepthUnderEviction measures rate-limit reject overhead @@ -27,7 +25,7 @@ func BenchmarkChannelDepthUnderEviction(b *testing.B) { LRUCap: 16, LRUEvictAfter: time.Minute, } - rl := NewRateLimiter(cfg, selftelemetry.NewNoopReceiver()) + rl := NewRateLimiter(cfg, NewNoopTelemetry()) // Prime the bucket so the loop measures the steady-state reject // path, not the one-shot admission + accept. diff --git a/components/receivers/containerstdout/bench_hot_path_test.go b/components/receivers/containerstdout/bench_hot_path_test.go index c06b0c16..83b5d935 100644 --- a/components/receivers/containerstdout/bench_hot_path_test.go +++ b/components/receivers/containerstdout/bench_hot_path_test.go @@ -2,11 +2,7 @@ package containerstdout -import ( - "testing" - - "github.com/tracecoreai/tracecore/internal/selftelemetry" -) +import "testing" // benchDataloaderRegex is the canonical two-named-capture pattern from // dataloader_test.go - same shape RFC-0010 §Dataloader regex @@ -60,7 +56,7 @@ func newBenchHotPathFixture(tb testing.TB) *benchHotPathFixture { Found: true, }, }) - cache := NewCache(src, 64, selftelemetry.NewNoopReceiver()) + cache := NewCache(src, 64, NewNoopTelemetry()) // Pre-populate so the first Lookup in the bench loop is a hit. if _, err := cache.Lookup(path); err != nil { diff --git a/components/receivers/containerstdout/cursor.go b/components/receivers/containerstdout/cursor.go index c08e95a4..24a21c8f 100644 --- a/components/receivers/containerstdout/cursor.go +++ b/components/receivers/containerstdout/cursor.go @@ -10,8 +10,6 @@ import ( "path/filepath" "strings" "time" - - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // cursorFileExt is the extension worn by every persisted cursor file. @@ -63,7 +61,7 @@ type CursorStore struct { dir string fileMode os.FileMode dirMode os.FileMode - telemetry selftelemetry.Receiver + telemetry Telemetry } // cursorOnDisk is the JSON wire format for cursor files. Kept distinct @@ -86,7 +84,7 @@ type cursorOnDisk struct { // A nil telemetry sink is replaced with a no-op so Save never panics // on a missing receiver - symmetric with attribution.Cache and // RateLimiter, the other receiver-internal components. -func NewCursorStore(cfg CursorConfig, telemetry selftelemetry.Receiver) (*CursorStore, error) { +func NewCursorStore(cfg CursorConfig, telemetry Telemetry) (*CursorStore, error) { if cfg.Dir == "" { return nil, errors.New("cursor: empty Dir") } @@ -101,7 +99,7 @@ func NewCursorStore(cfg CursorConfig, telemetry selftelemetry.Receiver) (*Cursor return nil, fmt.Errorf("cursor: chmod %q: %w", cfg.Dir, err) } if telemetry == nil { - telemetry = selftelemetry.NewNoopReceiver() + telemetry = NewNoopTelemetry() } return &CursorStore{ dir: cfg.Dir, diff --git a/components/receivers/containerstdout/cursor_test.go b/components/receivers/containerstdout/cursor_test.go index 0d33cbfa..a581974f 100644 --- a/components/receivers/containerstdout/cursor_test.go +++ b/components/receivers/containerstdout/cursor_test.go @@ -13,7 +13,6 @@ import ( "github.com/stretchr/testify/require" "github.com/tracecoreai/tracecore/components/receivers/containerstdout" - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // goosWindows centralises the runtime.GOOS guard string used by the @@ -25,10 +24,10 @@ const goosWindows = "windows" // t.TempDir() with default modes and a capturing telemetry sink. The // returned dir + tel let each test assert against persisted state and // recorded self-telemetry without rebuilding the harness boilerplate. -func newCursorStoreT(t *testing.T) (*containerstdout.CursorStore, string, *selftelemetry.CapturingReceiver) { +func newCursorStoreT(t *testing.T) (*containerstdout.CursorStore, string, *containerstdout.CapturingTelemetry) { t.Helper() dir := filepath.Join(t.TempDir(), "cursors") - tel := selftelemetry.NewCapturingReceiver() + tel := containerstdout.NewCapturingTelemetry() store, err := containerstdout.NewCursorStore(containerstdout.CursorConfig{ Dir: dir, FileMode: 0o600, @@ -276,7 +275,7 @@ func TestCursorStore_DirCreatedWithMode(t *testing.T) { Dir: dir, FileMode: 0o600, DirMode: 0o700, - }, selftelemetry.NewCapturingReceiver()) + }, containerstdout.NewCapturingTelemetry()) require.NoError(t, err) info, err := os.Stat(dir) @@ -301,7 +300,7 @@ func TestCursorStore_WriteFailureRecordsKind(t *testing.T) { } dir := filepath.Join(t.TempDir(), "ro-cursors") - tel := selftelemetry.NewCapturingReceiver() + tel := containerstdout.NewCapturingTelemetry() store, err := containerstdout.NewCursorStore(containerstdout.CursorConfig{ Dir: dir, FileMode: 0o600, @@ -323,7 +322,7 @@ func TestCursorStore_WriteFailureRecordsKind(t *testing.T) { }) require.Error(t, err, "Save must fail on read-only dir") require.Equal(t, - []selftelemetry.Kind{containerstdout.KindCursorWriteFailed}, + []containerstdout.Kind{containerstdout.KindCursorWriteFailed}, tel.Errors(), "write failure records exactly one KindCursorWriteFailed", ) diff --git a/components/receivers/containerstdout/factory.go b/components/receivers/containerstdout/factory.go index 5968eea6..b9562d67 100644 --- a/components/receivers/containerstdout/factory.go +++ b/components/receivers/containerstdout/factory.go @@ -8,7 +8,6 @@ import ( "github.com/tracecoreai/tracecore/internal/consumer" "github.com/tracecoreai/tracecore/internal/pipeline" - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // ComponentType is the canonical receiver-factory ID. Centralized so @@ -64,11 +63,12 @@ func (*factory) CreateTraces(_ context.Context, _ pipeline.CreateSettings, _ pip // Start/Shutdown body with the lifecycle wiring; this phase only // has to satisfy the pipeline.Receiver contract. // -// Self-telemetry follows the kernelevents M2 pattern: the noop -// default keeps the hot path nil-safe; the real Receiver is wired +// Self-telemetry follows the kernelevents sibling pattern: the noop +// default keeps the hot path nil-safe; the real Telemetry is wired // when set.Telemetry.MeterProvider is non-nil; init failures are -// surfaced via selftelemetry.RecordInitError so operators see when -// they're running silent. +// surfaced via recordInitError so operators see when they're running +// silent. RFC-0013 PR-F replaces the v0.1.x dependency on +// internal/selftelemetry with the receiver-scoped sibling here. func (*factory) CreateLogs(ctx context.Context, set pipeline.CreateSettings, cfg pipeline.Config, next consumer.Logs) (pipeline.Receiver, error) { c, ok := cfg.(*Config) if !ok { @@ -78,13 +78,13 @@ func (*factory) CreateLogs(ctx context.Context, set pipeline.CreateSettings, cfg return nil, fmt.Errorf("containerstdout: invalid config: %w", err) } - telemetry := selftelemetry.NewNoopReceiver() + telemetry := NewNoopTelemetry() if set.Telemetry.MeterProvider != nil { - if rt, err := selftelemetry.NewReceiver(set.ID, set.Telemetry.MeterProvider); err == nil { + if rt, err := NewTelemetry(set.ID, set.Telemetry.MeterProvider); err == nil { telemetry = rt } else { - selftelemetry.RecordInitError(ctx, set.Telemetry.MeterProvider, - "receiver", set.ID.String(), selftelemetry.ReasonInstrumentRegister) + recordInitError(ctx, set.Telemetry.MeterProvider, + "receiver", set.ID.String(), reasonInstrumentRegister) if set.Telemetry.Logger != nil { set.Telemetry.Logger.Warn("containerstdout self-telemetry init failed; using noop", "err", err) } diff --git a/components/receivers/containerstdout/factory_test.go b/components/receivers/containerstdout/factory_test.go index 25116a2f..1a6fc99a 100644 --- a/components/receivers/containerstdout/factory_test.go +++ b/components/receivers/containerstdout/factory_test.go @@ -99,14 +99,14 @@ func TestFactory_CreateLogs_RejectsInvalidConfig(t *testing.T) { require.ErrorContains(t, err, "rank_source") } -// TestFactory_CreateLogs_WiresSelftelemetry pins the M2 self-telemetry +// TestFactory_CreateLogs_WiresSelftelemetry pins the self-telemetry // wiring contract: a valid enabled config wired with a real // MeterProvider (the test fixture's noop MeterProvider counts as -// "real" for this contract - it satisfies the selftelemetry.NewReceiver -// signature) produces a Receiver. The factory must not return nil or -// error on a happy-path enabled config. +// "real" for this contract - it satisfies the NewTelemetry signature) +// produces a Receiver. The factory must not return nil or error on a +// happy-path enabled config. // -// The factory wires a non-noop selftelemetry.Receiver when +// The factory wires a non-noop containerstdout.Telemetry when // set.Telemetry.MeterProvider is non-nil and the noop default // otherwise - both branches must yield a non-nil internal field so // hot-path callers don't nil-check. diff --git a/components/receivers/containerstdout/failure_modes_internal_test.go b/components/receivers/containerstdout/failure_modes_internal_test.go index 7e3843f8..965e7da5 100644 --- a/components/receivers/containerstdout/failure_modes_internal_test.go +++ b/components/receivers/containerstdout/failure_modes_internal_test.go @@ -36,8 +36,6 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes/fake" ktesting "k8s.io/client-go/testing" - - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // failureModesInternalStubSource is a test-only AttributionSource that @@ -67,7 +65,7 @@ func failureModesTestLogger() *slog.Logger { func TestFailure_AttributionCardinalityOverflow(t *testing.T) { t.Parallel() - tel := selftelemetry.NewCapturingReceiver() + tel := NewCapturingTelemetry() cache := NewCache(failureModesInternalStubSource{}, 4, tel) for i := 0; i < 5; i++ { @@ -106,7 +104,7 @@ func TestFailure_AttributionCardinalityOverflow(t *testing.T) { func TestFailure_RateLimitCardinalityOverflow(t *testing.T) { t.Parallel() - tel := selftelemetry.NewCapturingReceiver() + tel := NewCapturingTelemetry() rl := NewRateLimiter(EgressRateLimitConfig{ Rate: 1000, Burst: 100, @@ -155,7 +153,7 @@ func TestFailure_CursorWriteFailedReadOnlyFs(t *testing.T) { } dir := filepath.Join(t.TempDir(), "ro-cursors") - tel := selftelemetry.NewCapturingReceiver() + tel := NewCapturingTelemetry() store, err := NewCursorStore(CursorConfig{ Dir: dir, FileMode: 0o600, @@ -202,7 +200,7 @@ func TestFailure_WatchAPIServerFlap(t *testing.T) { return true, nil, errors.New("flap: simulated apiserver watch failure") }) - tel := selftelemetry.NewCapturingReceiver() + tel := NewCapturingTelemetry() p, err := NewPodInformer(client, "", nil, tel, failureModesTestLogger()) require.NoError(t, err) @@ -307,7 +305,7 @@ func TestFailure_OversizeLineTruncatedWithMarker(t *testing.T) { timeout = 2 * time.Second ) - tel := selftelemetry.NewCapturingReceiver() + tel := NewCapturingTelemetry() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -364,7 +362,7 @@ func TestFailure_OrphanCursorFilesIgnored(t *testing.T) { t.Parallel() dir := filepath.Join(t.TempDir(), "orphan-cursors") - tel := selftelemetry.NewCapturingReceiver() + tel := NewCapturingTelemetry() store, err := NewCursorStore(CursorConfig{ Dir: dir, FileMode: 0o600, @@ -420,7 +418,7 @@ func TestFailure_RankRegressionOnRestart(t *testing.T) { }, }, } - cache := NewCache(src, 8, selftelemetry.NewCapturingReceiver()) + cache := NewCache(src, 8, NewCapturingTelemetry()) keyOld := PathInfo{Namespace: "ns", PodName: "trainer-0", PodUID: "uid-1", ContainerName: "trainer", LogIndex: 0} keyNew := PathInfo{Namespace: "ns", PodName: "trainer-0", PodUID: "uid-2", ContainerName: "trainer", LogIndex: 0} @@ -504,7 +502,7 @@ var ( _ corev1.Pod = corev1.Pod{} _ metav1.ObjectMeta = metav1.ObjectMeta{} _ types.UID = types.UID("") - _ selftelemetry.Kind = KindAttributionCardinality + _ Kind = KindAttributionCardinality _ TrainingRef = TrainingRef{} _ context.CancelFunc = func() {} _ failureModesInternalUnused diff --git a/components/receivers/containerstdout/informer.go b/components/receivers/containerstdout/informer.go index 243a735f..75b0ca7a 100644 --- a/components/receivers/containerstdout/informer.go +++ b/components/receivers/containerstdout/informer.go @@ -16,8 +16,6 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" - - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // podInformerResync is the SharedInformerFactory resync interval. The @@ -61,7 +59,7 @@ type PodInformer struct { client kubernetes.Interface nodeName string namespaces map[string]struct{} // empty = all - telemetry selftelemetry.Receiver + telemetry Telemetry logger *slog.Logger informer cache.SharedIndexInformer @@ -98,7 +96,7 @@ func NewPodInformer( client kubernetes.Interface, nodeName string, namespaces []string, - telemetry selftelemetry.Receiver, + telemetry Telemetry, logger *slog.Logger, ) (*PodInformer, error) { if client == nil { diff --git a/components/receivers/containerstdout/informer_test.go b/components/receivers/containerstdout/informer_test.go index 4e218987..f76853a3 100644 --- a/components/receivers/containerstdout/informer_test.go +++ b/components/receivers/containerstdout/informer_test.go @@ -14,8 +14,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/fake" - - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // testLogger is a discard-output slog handler so the test binary @@ -98,7 +96,7 @@ func TestPodInformer_FiltersByNodeName(t *testing.T) { podOnNode("ns2", "pod-d", "nodeB"), ) - telemetry := selftelemetry.NewCapturingReceiver() + telemetry := NewCapturingTelemetry() p, err := NewPodInformer(client, "nodeA", nil, telemetry, testLogger()) require.NoError(t, err) @@ -134,7 +132,7 @@ func TestPodInformer_FiltersByNamespaceAllowlist(t *testing.T) { podOnNode("kube-system", "pod-3", "nodeA"), ) - telemetry := selftelemetry.NewCapturingReceiver() + telemetry := NewCapturingTelemetry() p, err := NewPodInformer(client, "nodeA", []string{"team-a"}, telemetry, testLogger()) require.NoError(t, err) @@ -166,7 +164,7 @@ func TestPodInformer_EmptyAllowlistMeansAll(t *testing.T) { podOnNode("kube-system", "pod-3", "nodeA"), ) - telemetry := selftelemetry.NewCapturingReceiver() + telemetry := NewCapturingTelemetry() p, err := NewPodInformer(client, "nodeA", nil, telemetry, testLogger()) require.NoError(t, err) @@ -189,7 +187,7 @@ func TestPodInformer_GetByKeyHitMiss(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset(podOnNode("ns1", "pod-a", "nodeA")) - telemetry := selftelemetry.NewCapturingReceiver() + telemetry := NewCapturingTelemetry() p, err := NewPodInformer(client, "nodeA", nil, telemetry, testLogger()) require.NoError(t, err) @@ -221,7 +219,7 @@ func TestPodInformer_SyncedAfterRun(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset(podOnNode("ns1", "pod-a", "nodeA")) - telemetry := selftelemetry.NewCapturingReceiver() + telemetry := NewCapturingTelemetry() p, err := NewPodInformer(client, "nodeA", nil, telemetry, testLogger()) require.NoError(t, err) @@ -243,7 +241,7 @@ func TestPodInformer_RunReturnsOnCtxCancel(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset(podOnNode("ns1", "pod-a", "nodeA")) - telemetry := selftelemetry.NewCapturingReceiver() + telemetry := NewCapturingTelemetry() p, err := NewPodInformer(client, "nodeA", nil, telemetry, testLogger()) require.NoError(t, err) @@ -283,7 +281,7 @@ func TestPodInformer_IndexerReturnsCacheIndexer(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset(podOnNode("ns1", "pod-a", "nodeA")) - telemetry := selftelemetry.NewCapturingReceiver() + telemetry := NewCapturingTelemetry() p, err := NewPodInformer(client, "nodeA", nil, telemetry, testLogger()) require.NoError(t, err) require.NotNil(t, p.Indexer(), "Indexer() must surface the underlying cache.Indexer") diff --git a/components/receivers/containerstdout/kind.go b/components/receivers/containerstdout/kind.go deleted file mode 100644 index 863b8e7d..00000000 --- a/components/receivers/containerstdout/kind.go +++ /dev/null @@ -1,43 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -package containerstdout - -import "github.com/tracecoreai/tracecore/internal/selftelemetry" - -// Receiver-local Kind values for failures the canonical Kind* set -// doesn't cover. containerstdout-specific; do not promote to -// selftelemetry without an RFC amendment. Mirrors kernelevents/dcgm -// receiver patterns so cross-receiver dashboards on -// `tracecore_receiver_errors_total{kind=…}` share vocabulary. -// -// Operators see these in the RUNBOOK error-message index (RUNBOOK.md -// will land in M15 Phase 14) and in alert rules. -const ( - // KindRotationStalled - a file open in the tailer hasn't grown - // for longer than the rotation-detect window; cursor write - // retries are running against a likely-deleted inode. - KindRotationStalled = selftelemetry.Kind("rotation_stalled") - // KindCursorWriteFailed - atomic-rename cursor write failed. - // Increment once per failed checkpoint attempt; bounded retry - // in the lifecycle. - KindCursorWriteFailed = selftelemetry.Kind("cursor_write_failed") - // KindBackpressureDrop - tailer→emit channel was full; the - // dropped line is also counted in the per-key dropped_lines - // gauge for tail-receiver SLO accounting. - KindBackpressureDrop = selftelemetry.Kind("backpressure_drop") - // KindFingerprintCardinality - the fingerprint→stream identity - // LRU rolled over a key; future scan may double-emit lines on - // the evicted stream until the cursor resyncs. - KindFingerprintCardinality = selftelemetry.Kind("fingerprint_cardinality") - // KindAttributionCardinality - the per-(pod_uid, container) - // attribution LRU rolled over; emissions for the evicted key - // fall back to namespace+container until the informer re-keys. - KindAttributionCardinality = selftelemetry.Kind("attribution_cardinality") - // KindRateLimitCardinality - the per-(pod_uid, container) - // rate-limit LRU rolled over; the evicted key emits without - // budget until the namespace fallback claims it. - KindRateLimitCardinality = selftelemetry.Kind("rate_limit_cardinality") - // KindWatch - the per-node Pod informer reported a watch error. - // Set degraded; retries are driven by client-go backoff. - KindWatch = selftelemetry.Kind("watch") -) diff --git a/components/receivers/containerstdout/kind_test.go b/components/receivers/containerstdout/kind_test.go index 792a373c..293fc3ec 100644 --- a/components/receivers/containerstdout/kind_test.go +++ b/components/receivers/containerstdout/kind_test.go @@ -2,16 +2,12 @@ package containerstdout -import ( - "testing" - - "github.com/tracecoreai/tracecore/internal/selftelemetry" -) +import "testing" func TestKinds_StringValues(t *testing.T) { for _, tt := range []struct { name string - got selftelemetry.Kind + got Kind want string }{ {"rotation_stalled", KindRotationStalled, "rotation_stalled"}, @@ -21,6 +17,14 @@ func TestKinds_StringValues(t *testing.T) { {"attribution_cardinality", KindAttributionCardinality, "attribution_cardinality"}, {"rate_limit_cardinality", KindRateLimitCardinality, "rate_limit_cardinality"}, {"watch", KindWatch, "watch"}, + // Canonical mirrors (string values must stay byte-for-byte + // identical to the deleted internal/selftelemetry Kind* set — + // dashboards/alerts grep on the exact strings). + {"parse", KindParse, "parse"}, + {"read", KindRead, "read"}, + {"cardinality", KindCardinality, "cardinality"}, + {"downstream", KindDownstream, "downstream"}, + {"panic", KindPanic, "panic"}, } { t.Run(tt.name, func(t *testing.T) { if string(tt.got) != tt.want { diff --git a/components/receivers/containerstdout/lifecycle.go b/components/receivers/containerstdout/lifecycle.go new file mode 100644 index 00000000..544023ac --- /dev/null +++ b/components/receivers/containerstdout/lifecycle.go @@ -0,0 +1,181 @@ +// SPDX-License-Identifier: Apache-2.0 + +// Receiver-scoped streaming-source lifecycle helper. Replaces the +// v0.1.x dependency on `internal/runtime/lifecycle`, which is slated +// for deletion in RFC-0013 PR-F. Owns cancel + WaitGroup + +// panic-recovery bookkeeping containerstdout's fan-in needs (the main +// run goroutine + the PodInformer goroutine + the per-tailer Run +// goroutines + the per-tailer pipeline goroutines + the health + +// eviction loops), so each source author writes a body function, not +// the plumbing. +// +// Multi-source: containerstdout keeps Add() — the receiver registers +// ONE goroutine per source under the same WaitGroup so Shutdown waits +// for every source to finish (mirrors the kernelevents sibling). The +// pyspy / nccl_fr siblings dropped Add() because they have a single +// source; containerstdout's tailers + informer make Add() load-bearing. + +package containerstdout + +import ( + "context" + "errors" + "fmt" + "log/slog" + "runtime" + "sync" + "sync/atomic" +) + +// errLifecycleAlreadyStarted is returned by lifecycle.Start when called +// twice without an intervening Shutdown. errors.Is-comparable so callers +// don't string-match the message. +var errLifecycleAlreadyStarted = errors.New("containerstdout lifecycle: already started") + +// panicCallback is invoked once if a Run function panics. The helper +// recovers the panic so the receiver never crashes the workload +// (PRINCIPLES.md §1). containerstdout wires this to IncError(KindPanic) + +// SetDegraded(true). +type panicCallback func(panicValue any) + +// lifecycle bundles cancel + WaitGroup + started-flag for one or more +// streaming-source goroutines. Zero-value is NOT useful; use newLifecycle. +// +// Shutdown is idempotent. The FIRST Shutdown's error (typically a +// caller-ctx deadline) is stashed + returned by every subsequent +// Shutdown so deadline failures aren't silently swallowed. +type lifecycle struct { + logger *slog.Logger + onPanic panicCallback + + mu sync.Mutex + cancel context.CancelFunc + internalCtx context.Context //nolint:containedctx // ctx held so Add() can spawn goroutines under the same parent. + closed bool // set by Shutdown so post-Shutdown Add silently no-ops. + shutdownErr error + wg sync.WaitGroup + started atomic.Bool +} + +// newLifecycle constructs a lifecycle. logger may be nil (replaced with +// slog.Default for tests). onPanic may be nil — panics are still +// recovered + logged at ERROR but no callback fires. +func newLifecycle(logger *slog.Logger, onPanic panicCallback) *lifecycle { + if logger == nil { + logger = slog.Default() + } + return &lifecycle{logger: logger, onPanic: onPanic} +} + +// Start spawns run in a goroutine. The ctx passed to run is derived +// from parent via context.WithCancel — so the receiver-level parent's +// cancellation cascades into the goroutine without an explicit Shutdown +// call. Idempotent: a second Start without an intervening Shutdown +// returns errLifecycleAlreadyStarted. +func (l *lifecycle) Start(parent context.Context, run func(context.Context)) error { + if !l.started.CompareAndSwap(false, true) { + return errLifecycleAlreadyStarted + } + l.mu.Lock() + internalCtx, cancel := context.WithCancel(parent) + l.cancel = cancel + l.internalCtx = internalCtx + // wg.Add(1) MUST happen under the same mutex as cancel/internalCtx + // so a concurrent Shutdown sees the post-Add state. Without this + // lock, Shutdown could observe cancel != nil, call wg.Wait at + // counter=0, return immediately, and either trigger + // `sync: WaitGroup misuse` OR orphan the goroutine. (Mirrors the + // internal helper's fix.) + l.wg.Add(1) + l.mu.Unlock() + go l.safeRun(internalCtx, run) + return nil +} + +// safeRun wraps run with panic recovery + wg.Done bookkeeping. +func (l *lifecycle) safeRun(ctx context.Context, run func(context.Context)) { + defer l.wg.Done() + defer func() { + if rec := recover(); rec != nil { + l.logger.Error("containerstdout lifecycle: run panic recovered", "panic", fmt.Sprintf("%v", rec)) + if l.onPanic != nil { + l.onPanic(rec) + } + } + }() + run(ctx) +} + +// Shutdown cancels the internal ctx + waits for every Add'd / Start'd +// goroutine to exit, honoring the caller's ctx deadline. Idempotent: +// subsequent calls return the FIRST call's error so a missed deadline +// isn't silently swallowed. +func (l *lifecycle) Shutdown(ctx context.Context) error { + l.mu.Lock() + if l.closed { + err := l.shutdownErr + l.mu.Unlock() + return err + } + cancel := l.cancel + l.cancel = nil + l.internalCtx = nil + l.closed = true + l.mu.Unlock() + if cancel == nil { + return nil + } + cancel() + + done := make(chan struct{}) + go func() { + l.wg.Wait() + close(done) + }() + select { + case <-done: + return nil + case <-ctx.Done(): + // NumGoroutine is process-wide, not lifecycle-local; surfacing + // it here lets operators eyeball whether the leak is plausibly + // ours. + l.logger.Warn("containerstdout lifecycle: shutdown deadline elapsed before goroutine exited", + "process_goroutines", runtime.NumGoroutine()) + err := fmt.Errorf("containerstdout lifecycle shutdown: %w", ctx.Err()) + l.mu.Lock() + l.shutdownErr = err + l.mu.Unlock() + return err + } +} + +// Add registers an additional goroutine under the same WaitGroup, so +// auxiliary watchers + per-source driver goroutines participate in +// Shutdown waiting. The goroutine receives the lifecycle's internal +// ctx so Shutdown's cancel reaches it. A panic inside `run` is +// recovered (same contract as Start). +// +// Refusal modes (silent, but logged at WARN so callers don't lose +// the goroutine to invisibility): +// - lifecycle has not been Started: the callback would never see +// a cancel; spawning is a leak hazard. +// - lifecycle has already been Shutdown: wg.Wait may have +// returned; a fresh wg.Add(1) would panic. +// +// TOCTOU safety: wg.Add(1) happens under the same mutex as the +// post-Shutdown `closed` check, so Add never races a concurrent +// Shutdown into wg.Add-after-wg.Wait-returned panic territory. +func (l *lifecycle) Add(run func(context.Context)) { + l.mu.Lock() + if l.closed || l.internalCtx == nil { + l.mu.Unlock() + // Log so future authors don't hit silent-refusal traps. + l.logger.Warn("containerstdout lifecycle.Add called outside running window — ignored", + "closed", l.closed, "started", l.internalCtx != nil) + return + } + ctx := l.internalCtx + l.wg.Add(1) + l.mu.Unlock() + go l.safeRun(ctx, run) +} diff --git a/components/receivers/containerstdout/lifecycle_test.go b/components/receivers/containerstdout/lifecycle_test.go new file mode 100644 index 00000000..52242051 --- /dev/null +++ b/components/receivers/containerstdout/lifecycle_test.go @@ -0,0 +1,368 @@ +// SPDX-License-Identifier: Apache-2.0 + +package containerstdout + +import ( + "context" + "errors" + "log/slog" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" +) + +// TestContainerstdout_Lifecycle_StartShutdown pins the happy path: +// Start spawns the supplied run function, Shutdown cancels its ctx, +// run returns, Shutdown returns nil. +func TestContainerstdout_Lifecycle_StartShutdown(t *testing.T) { + lc := newLifecycle(slog.Default(), nil) + started := make(chan struct{}) + stopped := make(chan struct{}) + + err := lc.Start(context.Background(), func(ctx context.Context) { + close(started) + <-ctx.Done() + close(stopped) + }) + if err != nil { + t.Fatalf("Start: %v", err) + } + <-started + if err := lc.Shutdown(context.Background()); err != nil { + t.Fatalf("Shutdown: %v", err) + } + select { + case <-stopped: + case <-time.After(time.Second): + t.Fatal("run did not exit within 1s of Shutdown") + } +} + +// TestContainerstdout_Lifecycle_StartTwiceErrors pins: a second Start +// without an intervening Shutdown returns errLifecycleAlreadyStarted. +// The sentinel MUST be errors.Is-comparable so callers don't +// string-match. +func TestContainerstdout_Lifecycle_StartTwiceErrors(t *testing.T) { + lc := newLifecycle(slog.Default(), nil) + if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil { + t.Fatalf("first Start: %v", err) + } + defer func() { _ = lc.Shutdown(context.Background()) }() + err := lc.Start(context.Background(), func(context.Context) {}) + if !errors.Is(err, errLifecycleAlreadyStarted) { + t.Errorf("second Start err: got %v, want errLifecycleAlreadyStarted", err) + } +} + +// TestContainerstdout_Lifecycle_ShutdownIdempotent pins: a second +// Shutdown returns the first call's error (typically nil), not +// "double-close" or panic. +func TestContainerstdout_Lifecycle_ShutdownIdempotent(t *testing.T) { + lc := newLifecycle(slog.Default(), nil) + if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil { + t.Fatalf("Start: %v", err) + } + if err := lc.Shutdown(context.Background()); err != nil { + t.Fatalf("first Shutdown: %v", err) + } + if err := lc.Shutdown(context.Background()); err != nil { + t.Errorf("second Shutdown: got %v, want nil (idempotent)", err) + } +} + +// TestContainerstdout_Lifecycle_PanicCallbackFires pins: if the run +// goroutine panics, the supplied onPanic callback fires exactly once +// with the panic value and the panic is recovered (the process doesn't +// crash). This is the behaviour the containerstdout receiver depends +// on to translate panic → IncError(KindPanic) + degraded-flag flip. +func TestContainerstdout_Lifecycle_PanicCallbackFires(t *testing.T) { + var called atomic.Int32 + var got any + var mu sync.Mutex + lc := newLifecycle(slog.Default(), func(v any) { + called.Add(1) + mu.Lock() + got = v + mu.Unlock() + }) + done := make(chan struct{}) + err := lc.Start(context.Background(), func(context.Context) { + defer close(done) + panic("boom") + }) + if err != nil { + t.Fatalf("Start: %v", err) + } + <-done + // Give safeRun's deferred recover time to invoke the callback after + // run returns; the WaitGroup decrement happens BEFORE the recover's + // callback in some race orderings. + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) && called.Load() == 0 { + time.Sleep(5 * time.Millisecond) + } + if called.Load() != 1 { + t.Errorf("onPanic call count: got %d, want 1", called.Load()) + } + mu.Lock() + defer mu.Unlock() + if got != "boom" { + t.Errorf("onPanic payload: got %v, want \"boom\"", got) + } + // Shutdown after panic must not block / error. + if err := lc.Shutdown(context.Background()); err != nil { + t.Errorf("Shutdown after panic: %v", err) + } +} + +// TestContainerstdout_Lifecycle_ShutdownDeadlineReturnsCtxErr pins: a +// run goroutine that ignores ctx cancellation causes Shutdown to return +// the caller's ctx err (wrapping). Containerstdout's run loops all +// honor ctx, so this is purely a contract pin so future authors don't +// silently lose the deadline. +func TestContainerstdout_Lifecycle_ShutdownDeadlineReturnsCtxErr(t *testing.T) { + lc := newLifecycle(slog.Default(), nil) + leak := make(chan struct{}) + if err := lc.Start(context.Background(), func(context.Context) { + <-leak // ignore ctx + }); err != nil { + t.Fatalf("Start: %v", err) + } + t.Cleanup(func() { close(leak) }) + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + err := lc.Shutdown(shutdownCtx) + if !errors.Is(err, context.DeadlineExceeded) { + t.Errorf("Shutdown err: got %v, want wrapped DeadlineExceeded", err) + } +} + +// TestContainerstdout_Lifecycle_AddRegistersUnderSameWaitGroup pins the +// multi-source contract containerstdout needs (vs the slimmer +// single-source siblings that dropped Add): Add'd goroutines are +// bookkept under the SAME WaitGroup, so Shutdown blocks until each one +// returns. A regression that spawns a bare `go func()` instead of +// routing through Add fails here because Shutdown returns before the +// goroutine has observed ctx.Done. Tailers + informer + health/eviction +// loops all rely on this. +func TestContainerstdout_Lifecycle_AddRegistersUnderSameWaitGroup(t *testing.T) { + lc := newLifecycle(slog.Default(), nil) + if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil { + t.Fatalf("Start: %v", err) + } + var addedExited atomic.Bool + added := make(chan struct{}) + lc.Add(func(ctx context.Context) { + close(added) + <-ctx.Done() + // Sleep BEFORE flipping exit bit so a buggy Shutdown that + // doesn't wait for Add'd goroutines races + observes + // addedExited=false. + time.Sleep(20 * time.Millisecond) + addedExited.Store(true) + }) + <-added // wait for Add'd goroutine to be live + if err := lc.Shutdown(context.Background()); err != nil { + t.Fatalf("Shutdown: %v", err) + } + if !addedExited.Load() { + t.Error("Add'd goroutine did not finish before Shutdown returned — WaitGroup not shared") + } +} + +// TestContainerstdout_Lifecycle_AddPanicFiresCallback pins: a panic +// inside an Add'd goroutine MUST be recovered + fire the same onPanic +// callback as a Start'd panic. The containerstdout receiver wires the +// panic callback to IncError(KindPanic) + degraded-flag flip; silently +// swallowing an Add'd panic would hide that one of the per-source +// drivers (tailer, informer, healthLoop) died. +func TestContainerstdout_Lifecycle_AddPanicFiresCallback(t *testing.T) { + var called atomic.Int32 + var got any + var mu sync.Mutex + lc := newLifecycle(slog.Default(), func(v any) { + called.Add(1) + mu.Lock() + got = v + mu.Unlock() + }) + if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil { + t.Fatalf("Start: %v", err) + } + done := make(chan struct{}) + lc.Add(func(context.Context) { + defer close(done) + panic("added-boom") + }) + <-done + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) && called.Load() == 0 { + time.Sleep(5 * time.Millisecond) + } + if called.Load() != 1 { + t.Errorf("onPanic call count for Add'd goroutine: got %d, want 1", called.Load()) + } + mu.Lock() + if got != "added-boom" { + t.Errorf("onPanic payload: got %v, want \"added-boom\"", got) + } + mu.Unlock() + if err := lc.Shutdown(context.Background()); err != nil { + t.Errorf("Shutdown after Add panic: %v", err) + } +} + +// TestContainerstdout_Lifecycle_AddBeforeStartIsNoop pins: Add called +// before Start silently no-ops (logged at WARN). Calling Add before +// Start would spawn a goroutine with no cancel hookup — a leak — so +// the helper refuses. +func TestContainerstdout_Lifecycle_AddBeforeStartIsNoop(t *testing.T) { + lc := newLifecycle(slog.Default(), nil) + var spawned atomic.Bool + lc.Add(func(context.Context) { spawned.Store(true) }) + // Give a generous window for any (incorrectly) spawned goroutine + // to flip the bit. + time.Sleep(50 * time.Millisecond) + if spawned.Load() { + t.Error("Add before Start spawned a goroutine; should silently refuse") + } +} + +// TestContainerstdout_Lifecycle_AddAfterShutdownIsNoop pins: Add called +// after Shutdown silently no-ops (logged at WARN). wg.Wait may have +// returned already, so a fresh wg.Add(1) would panic with +// "sync: WaitGroup is reused before previous Wait has returned" if the +// helper didn't guard against this. +func TestContainerstdout_Lifecycle_AddAfterShutdownIsNoop(t *testing.T) { + lc := newLifecycle(slog.Default(), nil) + if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil { + t.Fatalf("Start: %v", err) + } + if err := lc.Shutdown(context.Background()); err != nil { + t.Fatalf("Shutdown: %v", err) + } + var spawned atomic.Bool + lc.Add(func(context.Context) { spawned.Store(true) }) + time.Sleep(50 * time.Millisecond) + if spawned.Load() { + t.Error("Add after Shutdown spawned a goroutine; should silently refuse") + } +} + +// TestContainerstdout_Lifecycle_ConcurrentAddDuringShutdown_NoPanic +// pins the TOCTOU-safety contract: the mutex guard around +// (closed-check, wg.Add(1)) ensures that for an Add racing a concurrent +// Shutdown, exactly one of two outcomes holds: +// +// (a) Add lands before Shutdown flips `closed` → the goroutine is +// registered under the same WaitGroup, Shutdown's cancel reaches +// it, and Shutdown waits for it to exit. No leak. +// (b) Shutdown flips `closed` before Add takes the mutex → Add +// silently no-ops (logged WARN). No `sync: WaitGroup misuse` +// panic from wg.Add-after-wg.Wait-returned. +// +// Without the mutex guard, Add could observe `closed=false`, race the +// lock, call wg.Add(1) after wg.Wait has returned, and panic. This +// test stresses that interleaving with 50 concurrent Adders and a +// single Shutdowner. Must be run under -race to catch the underlying +// data race; the panic itself fires deterministically once the race +// lands. +// +// Naming: receiver-prefix so the test is grep-discoverable across the +// repo even though it lives in containerstdout-internal package. +func TestContainerstdout_Lifecycle_ConcurrentAddDuringShutdown_NoPanic(t *testing.T) { + const adders = 50 + lc := newLifecycle(slog.Default(), nil) + if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil { + t.Fatalf("Start: %v", err) + } + + type addState struct { + registered atomic.Bool + finished atomic.Bool + } + states := make([]*addState, adders) + for i := range states { + states[i] = &addState{} + } + + release := make(chan struct{}) + shutdownGate := make(chan struct{}) + var launched sync.WaitGroup + launched.Add(adders + 1) + + for i := range adders { + go func() { + launched.Done() + <-release + runtime.Gosched() + st := states[i] + lc.Add(func(ctx context.Context) { + st.registered.Store(true) + <-ctx.Done() + st.finished.Store(true) + }) + }() + } + + var shutdownErr error + shutdownDone := make(chan struct{}) + go func() { + launched.Done() + <-release + // Wait for shutdownGate to open before calling Shutdown. The + // gate fires AFTER the adders have been released for a brief + // scheduling window, which guarantees the race window straddles + // adders-in-flight + Shutdown rather than letting the + // shutdowner trivially win and force every Add into the + // closed-guard no-op branch. (Without this, on fast machines + // every adder consistently no-ops, defeating the test's + // TOCTOU-coverage purpose. Kernelevents's parallel test has + // the same brittleness; we tightened the gate here so a + // stress-loop hold-down doesn't flake.) + <-shutdownGate + runtime.Gosched() + shutdownErr = lc.Shutdown(context.Background()) + close(shutdownDone) + }() + + launched.Wait() + close(release) + // Give the adders a short scheduling window to land their Add() + // calls. The window is intentionally tiny — the race detector + + // the scheduler still interleave so SOME adders will no-op (which + // is also a tested outcome) — but it prevents the trivial + // shutdowner-wins-everything degenerate. + time.Sleep(50 * time.Microsecond) + close(shutdownGate) + + select { + case <-shutdownDone: + case <-time.After(5 * time.Second): + t.Fatal("Shutdown did not return within 5s — likely a leaked goroutine joined the WaitGroup but never observed ctx.Done") + } + + if shutdownErr != nil { + t.Errorf("Shutdown err: got %v, want nil (no goroutine should outlive cancel)", shutdownErr) + } + + var registeredCount, finishedCount int + for i, st := range states { + if st.registered.Load() { + registeredCount++ + if st.finished.Load() { + finishedCount++ + } else { + t.Errorf("Add #%d registered but did not finish before Shutdown returned — WaitGroup join missed", i) + } + } + } + if registeredCount == 0 { + t.Errorf("no Add'd goroutines registered across %d attempts — race window collapsed, test no longer exercises the TOCTOU path", adders) + } + t.Logf("registered=%d finished=%d of %d adders (remainder no-op'd via closed-guard)", + registeredCount, finishedCount, adders) +} diff --git a/components/receivers/containerstdout/noop_receiver.go b/components/receivers/containerstdout/noop_receiver.go index 7a78c655..f9f2568b 100644 --- a/components/receivers/containerstdout/noop_receiver.go +++ b/components/receivers/containerstdout/noop_receiver.go @@ -6,11 +6,10 @@ import ( "context" "github.com/tracecoreai/tracecore/internal/pipeline" - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // noopReceiver is the disabled-path Receiver returned by the factory -// when cfg.Enabled is false. Carrying the selftelemetry handle (even +// when cfg.Enabled is false. Carrying the telemetry handle (even // though it's never written from this struct) keeps the wiring // uniform - Phase 12 may extend the disabled path to record a // "receiver_disabled" emission, and the factory wiring already @@ -22,15 +21,15 @@ import ( type noopReceiver struct { pipeline.ComponentState - telemetry selftelemetry.Receiver + telemetry Telemetry } // newNoopReceiver wires the disabled-path receiver. The factory // always supplies a non-nil telemetry value (noop or real); guard // here so a direct caller doesn't panic. -func newNoopReceiver(telemetry selftelemetry.Receiver) *noopReceiver { +func newNoopReceiver(telemetry Telemetry) *noopReceiver { if telemetry == nil { - telemetry = selftelemetry.NewNoopReceiver() + telemetry = NewNoopTelemetry() } return &noopReceiver{telemetry: telemetry} } diff --git a/components/receivers/containerstdout/pipeline.go b/components/receivers/containerstdout/pipeline.go index 7f72de01..709b75dd 100644 --- a/components/receivers/containerstdout/pipeline.go +++ b/components/receivers/containerstdout/pipeline.go @@ -7,8 +7,6 @@ import ( "errors" "log/slog" "time" - - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // runTailerPipeline drains lines from a single Tailer through the full @@ -73,7 +71,7 @@ func (r *containerStdoutReceiver) handleLine( func (r *containerStdoutReceiver) parseAndStitch(line TailedLine) ([]byte, time.Time, StreamType, bool) { ts, stream, tag, msg, err := parseCRILine(line.Bytes) if err != nil { - r.telemetry.IncError(selftelemetry.KindParse) + r.telemetry.IncError(KindParse) return nil, time.Time{}, StreamUnknown, false } body, ok := r.stitcher.Add(line.Fingerprint, tag, msg) @@ -90,7 +88,7 @@ func (r *containerStdoutReceiver) parseAndStitch(line TailedLine) ([]byte, time. func (r *containerStdoutReceiver) attributeAndEnrich(pathInfo PathInfo, body []byte) (AttributionRef, bool) { ref, err := r.attribution.Lookup(pathInfo) if err != nil { - r.telemetry.IncError(selftelemetry.KindRead) + r.telemetry.IncError(KindRead) return AttributionRef{}, false } // Body-match runs even when ref.Found=false to cover the @@ -137,7 +135,7 @@ func (r *containerStdoutReceiver) emitRecord( } logs, dropped := newLogsForRecord(rec, maxAttrs, jsonAttrs, r.set.Telemetry.Resource) if dropped > 0 { - r.telemetry.IncError(selftelemetry.KindCardinality) + r.telemetry.IncError(KindCardinality) } start := time.Now() @@ -145,7 +143,7 @@ func (r *containerStdoutReceiver) emitRecord( if errors.Is(err, context.Canceled) { return } - r.telemetry.IncError(selftelemetry.KindDownstream) + r.telemetry.IncError(KindDownstream) return } r.telemetry.ObserveLatency(time.Since(start)) diff --git a/components/receivers/containerstdout/pipeline_test.go b/components/receivers/containerstdout/pipeline_test.go index f4769f22..9ea97853 100644 --- a/components/receivers/containerstdout/pipeline_test.go +++ b/components/receivers/containerstdout/pipeline_test.go @@ -22,7 +22,6 @@ import ( "github.com/tracecoreai/tracecore/internal/consumer" "github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest" - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // testRankRegex is the body-match regex shared by the @@ -90,7 +89,7 @@ func (c *fakeConsumer) recordCount() int { type pipelineHarness struct { r *containerStdoutReceiver cons *fakeConsumer - tel *selftelemetry.CapturingReceiver + tel *CapturingTelemetry logRoot string } @@ -122,7 +121,7 @@ func newPipelineHarness(t *testing.T, pods []*corev1.Pod, cfgMods func(*Config)) } require.NoError(t, cfg.Validate()) - tel := selftelemetry.NewCapturingReceiver() + tel := NewCapturingTelemetry() cons := &fakeConsumer{} r := newReceiver(fx.CreateSettings, cfg, cons, tel) require.NotNil(t, r) diff --git a/components/receivers/containerstdout/ratelimit.go b/components/receivers/containerstdout/ratelimit.go index 98d92f5f..b3d18e90 100644 --- a/components/receivers/containerstdout/ratelimit.go +++ b/components/receivers/containerstdout/ratelimit.go @@ -8,8 +8,6 @@ import ( "time" "golang.org/x/time/rate" - - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // RateLimiter caps egress per (namespace, rank) via independent @@ -35,7 +33,7 @@ type RateLimiter struct { nsBudgets map[string]NamespaceBudget cap int evictAfter time.Duration - telemetry selftelemetry.Receiver + telemetry Telemetry // now is the clock injection point for tests. Production uses // time.Now; EvictIdle tests overwrite via the package-private @@ -81,13 +79,13 @@ func bucketEntryOf(el *list.Element) *bucketEntry { // // A nil telemetry sink is replaced with a no-op so Allow never panics // on a missing receiver - defensive symmetry with attribution.Cache. -func NewRateLimiter(cfg EgressRateLimitConfig, telemetry selftelemetry.Receiver) *RateLimiter { +func NewRateLimiter(cfg EgressRateLimitConfig, telemetry Telemetry) *RateLimiter { capN := cfg.LRUCap if capN <= 0 { capN = 1 } if telemetry == nil { - telemetry = selftelemetry.NewNoopReceiver() + telemetry = NewNoopTelemetry() } return &RateLimiter{ defaultRate: rate.Limit(cfg.Rate), diff --git a/components/receivers/containerstdout/ratelimit_test.go b/components/receivers/containerstdout/ratelimit_test.go index 6b66fe8b..b6a35696 100644 --- a/components/receivers/containerstdout/ratelimit_test.go +++ b/components/receivers/containerstdout/ratelimit_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/require" "github.com/tracecoreai/tracecore/components/receivers/containerstdout" - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // TestRateLimiter_AllowsUnderRate verifies that with burst=10, ten @@ -26,7 +25,7 @@ func TestRateLimiter_AllowsUnderRate(t *testing.T) { LRUCap: 128, LRUEvictAfter: time.Minute, } - tel := selftelemetry.NewCapturingReceiver() + tel := containerstdout.NewCapturingTelemetry() rl := containerstdout.NewRateLimiter(cfg, tel) for i := 0; i < 10; i++ { @@ -46,7 +45,7 @@ func TestRateLimiter_RejectsOverBurst(t *testing.T) { LRUCap: 16, LRUEvictAfter: time.Minute, } - tel := selftelemetry.NewCapturingReceiver() + tel := containerstdout.NewCapturingTelemetry() rl := containerstdout.NewRateLimiter(cfg, tel) require.True(t, rl.Allow("ns", 0)) @@ -65,7 +64,7 @@ func TestRateLimiter_BackpressureKindRecorded(t *testing.T) { LRUCap: 16, LRUEvictAfter: time.Minute, } - tel := selftelemetry.NewCapturingReceiver() + tel := containerstdout.NewCapturingTelemetry() rl := containerstdout.NewRateLimiter(cfg, tel) require.True(t, rl.Allow("ns", 0)) @@ -73,7 +72,7 @@ func TestRateLimiter_BackpressureKindRecorded(t *testing.T) { require.False(t, rl.Allow("ns", 0)) require.Equal(t, - []selftelemetry.Kind{containerstdout.KindBackpressureDrop}, + []containerstdout.Kind{containerstdout.KindBackpressureDrop}, tel.Errors(), ) } @@ -89,7 +88,7 @@ func TestRateLimiter_PerKeyIsolation(t *testing.T) { LRUCap: 16, LRUEvictAfter: time.Minute, } - tel := selftelemetry.NewCapturingReceiver() + tel := containerstdout.NewCapturingTelemetry() rl := containerstdout.NewRateLimiter(cfg, tel) require.True(t, rl.Allow("a", 0)) @@ -115,7 +114,7 @@ func TestRateLimiter_NamespaceBudgetOverride(t *testing.T) { }, LRUEvictAfter: time.Minute, } - tel := selftelemetry.NewCapturingReceiver() + tel := containerstdout.NewCapturingTelemetry() rl := containerstdout.NewRateLimiter(cfg, tel) // Default ns: burst=2, third call rejects. @@ -142,7 +141,7 @@ func TestRateLimiter_LRUEvictsOldestOnCap(t *testing.T) { LRUCap: 2, LRUEvictAfter: time.Minute, } - tel := selftelemetry.NewCapturingReceiver() + tel := containerstdout.NewCapturingTelemetry() rl := containerstdout.NewRateLimiter(cfg, tel) require.True(t, rl.Allow("a", 0)) @@ -151,7 +150,7 @@ func TestRateLimiter_LRUEvictsOldestOnCap(t *testing.T) { require.True(t, rl.Allow("c", 0)) require.Equal(t, - []selftelemetry.Kind{containerstdout.KindRateLimitCardinality}, + []containerstdout.Kind{containerstdout.KindRateLimitCardinality}, tel.Errors(), "admitting past cap records exactly one cardinality event", ) @@ -170,7 +169,7 @@ func TestRateLimiter_EvictIdleByTime(t *testing.T) { LRUCap: 128, LRUEvictAfter: 5 * time.Minute, } - tel := selftelemetry.NewCapturingReceiver() + tel := containerstdout.NewCapturingTelemetry() rl := containerstdout.NewRateLimiter(cfg, tel) // Fake clock at t0. @@ -201,7 +200,7 @@ func TestRateLimiter_ConcurrentAllow_NoRace(t *testing.T) { LRUCap: 64, LRUEvictAfter: time.Minute, } - tel := selftelemetry.NewCapturingReceiver() + tel := containerstdout.NewCapturingTelemetry() rl := containerstdout.NewRateLimiter(cfg, tel) var wg sync.WaitGroup @@ -232,7 +231,7 @@ func TestRateLimiter_LRURecencyOnAllow(t *testing.T) { LRUCap: 3, LRUEvictAfter: time.Minute, } - tel := selftelemetry.NewCapturingReceiver() + tel := containerstdout.NewCapturingTelemetry() rl := containerstdout.NewRateLimiter(cfg, tel) require.True(t, rl.Allow("A", 0)) @@ -247,7 +246,7 @@ func TestRateLimiter_LRURecencyOnAllow(t *testing.T) { require.True(t, rl.Allow("D", 0)) require.Equal(t, 3, containerstdout.RateLimiterLenForTest(rl)) require.Equal(t, - []selftelemetry.Kind{containerstdout.KindRateLimitCardinality}, + []containerstdout.Kind{containerstdout.KindRateLimitCardinality}, tel.Errors(), ) diff --git a/components/receivers/containerstdout/receiver.go b/components/receivers/containerstdout/receiver.go index 888e514e..f5d35c80 100644 --- a/components/receivers/containerstdout/receiver.go +++ b/components/receivers/containerstdout/receiver.go @@ -20,8 +20,6 @@ import ( "github.com/tracecoreai/tracecore/internal/consumer" "github.com/tracecoreai/tracecore/internal/pipeline" - "github.com/tracecoreai/tracecore/internal/runtime/lifecycle" - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // healthLoopInterval is the cadence on which the receiver recomputes @@ -88,7 +86,7 @@ type containerStdoutReceiver struct { set pipeline.CreateSettings cfg *Config next consumer.Logs - telemetry selftelemetry.Receiver + telemetry Telemetry // Composed components. CursorStore, RateLimiter, BodyMatcher, // and DataloaderExtractor are constructed in newReceiver (they @@ -115,8 +113,8 @@ type containerStdoutReceiver struct { tailers map[string]*Tailer // linesEmitted is the running per-15s lines-emitted count, drained - // by healthLoop into a log line (the selftelemetry interface - // doesn't expose a Gauge today, so the value is published via the + // by healthLoop into a log line (the Telemetry interface doesn't + // expose a Gauge today, so the value is published via the // structured log + IncEmissions counter - the rate is derivable // from the cumulative emissions metric). linesEmitted atomic.Int64 @@ -124,7 +122,7 @@ type containerStdoutReceiver struct { // Lifecycle orchestrator: owns cancel + WaitGroup + panic // recovery for the long-lived informer goroutine and the // receiver's own health/eviction loops. - lc *lifecycle.Lifecycle + lc *lifecycle // clientBuilder is the k8s client constructor seam. Production // uses defaultClientBuilder; tests override directly via field @@ -134,9 +132,9 @@ type containerStdoutReceiver struct { clientBuilder clientBuilder // Per-source degraded flags. The receiver-wide flag exposed via - // selftelemetry.SetDegraded is the OR of these three; a one- - // sided failure must keep the receiver degraded even when the - // other sources are green. Mirrors k8sevents.recomputeDegraded. + // Telemetry.SetDegraded is the OR of these three; a one-sided + // failure must keep the receiver degraded even when the other + // sources are green. Mirrors k8sevents.recomputeDegraded. // // Guarded by mu (NOT atomic.Bool) because recomputeDegraded must // read all three under a consistent snapshot to avoid an @@ -166,9 +164,9 @@ type containerStdoutReceiver struct { // for an operator who can otherwise see fresh logs but loses one // restart's worth of resume-from-offset capability; the alpha-stability // trade-off favours degraded availability over hard refusal. -func newReceiver(set pipeline.CreateSettings, cfg *Config, next consumer.Logs, telemetry selftelemetry.Receiver) *containerStdoutReceiver { +func newReceiver(set pipeline.CreateSettings, cfg *Config, next consumer.Logs, telemetry Telemetry) *containerStdoutReceiver { if telemetry == nil { - telemetry = selftelemetry.NewNoopReceiver() + telemetry = NewNoopTelemetry() } maxLine := cfg.MaxLogSize if maxLine <= 0 { @@ -234,8 +232,8 @@ func newReceiver(set pipeline.CreateSettings, cfg *Config, next consumer.Logs, t // receiver degraded regardless of which goroutine crashed - // recovery is the next successful health-loop tick (which // clears bits a healthy source reports clear). - r.lc = lifecycle.New(r.logger(), func(_ any) { - r.telemetry.IncError(selftelemetry.KindPanic) + r.lc = newLifecycle(r.logger(), func(_ any) { + r.telemetry.IncError(KindPanic) r.mu.Lock() r.degradedInformer = true r.degradedCursor = true @@ -559,15 +557,15 @@ func (r *containerStdoutReceiver) healthLoop(ctx context.Context) { } // recomputeDegraded ORs the three per-source flags and pushes the -// resulting receiver-wide flag through selftelemetry.SetDegraded ONLY -// on a state change. Mirrors k8sevents.recomputeDegraded exactly so +// resulting receiver-wide flag through Telemetry.SetDegraded ONLY on a +// state change. Mirrors k8sevents.recomputeDegraded exactly so // cross-receiver alerts that key on degraded transitions share the // same edge-trigger semantics. // // Called whenever any per-source flag transitions (via the // setDegradedX helpers) and on the health-loop tick (so a flag flipped // from a goroutine that didn't call back through recomputeDegraded -// still reaches selftelemetry eventually). +// still reaches Telemetry eventually). func (r *containerStdoutReceiver) recomputeDegraded() { r.mu.Lock() now := r.degradedInformer || r.degradedCursor || r.degradedTailers diff --git a/components/receivers/containerstdout/receiver_test.go b/components/receivers/containerstdout/receiver_test.go index fe87c1f1..ea2ca187 100644 --- a/components/receivers/containerstdout/receiver_test.go +++ b/components/receivers/containerstdout/receiver_test.go @@ -17,7 +17,6 @@ import ( "github.com/tracecoreai/tracecore/internal/consumer" "github.com/tracecoreai/tracecore/internal/pipeline" "github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest" - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // testNopConsumer is a Logs consumer that drops everything. Used by @@ -45,7 +44,7 @@ func newReceiverWithFake(t *testing.T, cfgMods func(*Config)) (*containerStdoutR } require.NoError(t, cfg.Validate(), "test config must validate") - tel := selftelemetry.NewCapturingReceiver() + tel := NewCapturingTelemetry() r := newReceiver(fx.CreateSettings, cfg, testNopConsumer{}, tel) require.NotNil(t, r) @@ -103,7 +102,7 @@ func TestReceiver_DegradedFlagsCompose(t *testing.T) { t.Setenv("NODE_NAME", "test-node") r, _ := newReceiverWithFake(t, nil) - tel, ok := r.telemetry.(*selftelemetry.CapturingReceiver) + tel, ok := r.telemetry.(*CapturingTelemetry) require.True(t, ok, "test fixture must provide CapturingReceiver telemetry") // Initially all-false: recompute should leave the flag at false. @@ -263,7 +262,7 @@ func TestReceiver_CursorMkdirFailureSetsDegraded(t *testing.T) { cfg.Cursor.Dir = "" require.NoError(t, cfg.Validate()) - tel := selftelemetry.NewCapturingReceiver() + tel := NewCapturingTelemetry() r := newReceiver(fx.CreateSettings, cfg, testNopConsumer{}, tel) require.NotNil(t, r) require.Nil(t, r.cursors, "empty Dir → CursorStore construction must fail") diff --git a/components/receivers/containerstdout/selftel.go b/components/receivers/containerstdout/selftel.go new file mode 100644 index 00000000..c63c28f1 --- /dev/null +++ b/components/receivers/containerstdout/selftel.go @@ -0,0 +1,460 @@ +// SPDX-License-Identifier: Apache-2.0 + +// Receiver-scoped self-telemetry surface. Replaces the v0.1.x +// dependency on `internal/selftelemetry`, which is slated for deletion +// in RFC-0013 PR-F. Metric names + label shape are preserved +// (`tracecore.receiver.errors_total{kind,component_id}` and siblings) +// so dashboards / alerts don't regress. The instrumentation scope name +// is THIS receiver's Go import path — when the receiver moves to its +// own submodule in PR-I.3, the scope name moves with it, matching OTel +// convention. +// +// Mirrors the kernelevents sibling design — multi-source (tailers + +// informer), so this surface is shared per-receiver across every +// tailer + the PodInformer; per-kind increments fan into the same +// component_id'd counter. +// +// Receiver-local Kinds (KindRotationStalled, KindCursorWriteFailed, +// KindBackpressureDrop, KindFingerprintCardinality, +// KindAttributionCardinality, KindRateLimitCardinality, KindWatch) +// are co-located in the same const block as the canonical mirrors +// (KindParse, KindRead, KindCardinality, KindDownstream, KindPanic) +// per the dcgm sibling pattern: one place to grep for "every kind +// containerstdout emits." +// +// EXPORTED surface: unlike the kernelevents sibling (which keeps +// `selfTelemetry` + `kind` unexported), containerstdout's constructors +// (NewCursorStore, NewCache, NewRateLimiter, NewPodInformer, TailerOptions) +// are PUBLIC and accept the telemetry sink as a parameter. External +// _test packages (containerstdout_test) construct receivers via these +// APIs, so the Telemetry interface + the Kind type + the +// CapturingTelemetry test helper MUST be exported for the migration to +// preserve the test surface without rewriting every external test into +// the internal package. + +package containerstdout + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/tracecoreai/tracecore/internal/pipeline" +) + +// Kind is a low-cardinality error-class identifier. Named type catches +// renames + typos at compile time so they can't silently drift +// `tracecore_receiver_errors_total{kind=…}`. Never derive a Kind from +// an error message — it explodes cardinality. +type Kind string + +// Canonical mirrors of internal/selftelemetry's Kind* set, scoped here +// so containerstdout has zero dependency on the deleted package. Only +// the subset this receiver actually emits is mirrored — KindConnect, +// KindEnumerate, KindInit are not used by containerstdout and stay +// off this surface. +const ( + // KindParse — CRI line failed structural parse (malformed timestamp, + // missing stream marker). + KindParse Kind = "parse" + // KindRead — attribution-source indexer call failed transiently. + // Mirrors the canonical "read from open source failed" semantics. + KindRead Kind = "read" + // KindCardinality — emit-side dropped attributes to enforce the + // per-record cap. + KindCardinality Kind = "cardinality" + // KindDownstream — next consumer's ConsumeLogs returned an error + // (other than context.Canceled). + KindDownstream Kind = "downstream" + // KindPanic — recovered panic in a lifecycle-spawned goroutine. + KindPanic Kind = "panic" +) + +// Receiver-local Kind values for failures the canonical set doesn't +// cover. containerstdout-specific; do not promote to a shared package +// without an RFC amendment. Mirrors the dcgm + kernelevents receiver +// pattern so cross-receiver dashboards on +// `tracecore_receiver_errors_total{kind=…}` share vocabulary. +// +// Operators see these in RUNBOOK.md's error-message index and in +// alert rules (prometheus-alerts.example.yaml). +const ( + // KindRotationStalled — a file open in the tailer hasn't grown for + // longer than the rotation-detect window; cursor write retries are + // running against a likely-deleted inode. + KindRotationStalled Kind = "rotation_stalled" + // KindCursorWriteFailed — atomic-rename cursor write failed. + // Increment once per failed checkpoint attempt; bounded retry in + // the lifecycle. + KindCursorWriteFailed Kind = "cursor_write_failed" + // KindBackpressureDrop — tailer→emit channel was full; the dropped + // line is also counted in the per-key dropped_lines gauge for + // tail-receiver SLO accounting. + KindBackpressureDrop Kind = "backpressure_drop" + // KindFingerprintCardinality — the fingerprint→stream identity LRU + // rolled over a key; future scan may double-emit lines on the + // evicted stream until the cursor resyncs. + KindFingerprintCardinality Kind = "fingerprint_cardinality" + // KindAttributionCardinality — the per-(pod_uid, container) + // attribution LRU rolled over; emissions for the evicted key fall + // back to namespace+container until the informer re-keys. + KindAttributionCardinality Kind = "attribution_cardinality" + // KindRateLimitCardinality — the per-(pod_uid, container) + // rate-limit LRU rolled over; the evicted key emits without budget + // until the namespace fallback claims it. + KindRateLimitCardinality Kind = "rate_limit_cardinality" + // KindWatch — the per-node Pod informer reported a watch error. + // Set degraded; retries are driven by client-go backoff. + KindWatch Kind = "watch" +) + +// reasonInstrumentRegister labels init_errors_total ticks when OTel +// instrument registration failed at construction time. +const reasonInstrumentRegister = "instrument_register" + +// instrumentationScope pins the OTel scope name. Per OTel convention, +// the scope is the package's Go import path; PR-I.3 changes this when +// the receiver moves to its own submodule. +const instrumentationScope = "github.com/tracecoreai/tracecore/components/receivers/containerstdout" + +// ErrNilMeterProvider mirrors selftelemetry.ErrNilMeterProvider — the +// factory is responsible for substituting the noop fallback + ticking +// init_errors_total. Returning a sentinel rather than a generic error +// lets the factory distinguish "wire-up bug" from "instrument register +// failure." +var ErrNilMeterProvider = errors.New("containerstdout: MeterProvider is nil") + +// Telemetry is the receiver-scoped self-health surface. Methods are +// non-blocking + safe for concurrent use; the noop impl discards. +// Mirrors the trimmed internal/selftelemetry.Receiver interface. Shared +// between every tailer + the PodInformer + the receiver itself so per- +// kind increments fan into the same component_id'd counter. +type Telemetry interface { + IncError(k Kind) + IncEmissions(n int64) + ObserveLatency(d time.Duration) + SetDegraded(degraded bool) + MarkActivity() +} + +// noopTelemetry discards every call. +type noopTelemetry struct{} + +// NewNoopTelemetry returns a Telemetry whose methods discard. Use in +// tests + as the factory fallback when the runtime didn't supply a +// MeterProvider. Exported so external _test packages (containerstdout_test) +// can substitute when they don't need to assert on telemetry. +func NewNoopTelemetry() Telemetry { return noopTelemetry{} } + +func (noopTelemetry) IncError(Kind) {} +func (noopTelemetry) IncEmissions(int64) {} +func (noopTelemetry) ObserveLatency(time.Duration) {} +func (noopTelemetry) SetDegraded(bool) {} +func (noopTelemetry) MarkActivity() {} + +var _ Telemetry = noopTelemetry{} + +// NewTelemetry returns a real Telemetry backed by OTel metric +// instruments acquired from mp. The component's id is attached as the +// `component_id` label on every emission. Registers the same five +// instruments the v0.1.x internal selftelemetry package registered, so +// scraped metric names + label shape are unchanged across the migration. +// +// Exported because the factory calls it; renaming to lowercase would +// move the factory's CreateLogs into selftel.go (which it doesn't +// belong in) or force a per-receiver internal package, neither of +// which is worth the symbol-hiding gain when the factory is the only +// non-test caller. +func NewTelemetry(id pipeline.ID, mp metric.MeterProvider) (Telemetry, error) { + if mp == nil { + return nil, ErrNilMeterProvider + } + meter := mp.Meter(instrumentationScope) + attrSet := attribute.NewSet(attribute.String("component_id", id.String())) + + errsCtr, err := meter.Int64Counter( + "tracecore.receiver.errors_total", + metric.WithDescription("Errors observed by a receiver, partitioned by kind"), + ) + if err != nil { + return nil, fmt.Errorf("errors_total counter: %w", err) + } + emissionsCtr, err := meter.Int64Counter( + "tracecore.receiver.emissions_total", + metric.WithDescription("Data points / events emitted by a receiver"), + ) + if err != nil { + return nil, fmt.Errorf("emissions_total counter: %w", err) + } + latencyHist, err := meter.Float64Histogram( + "tracecore.receiver.collection_latency_seconds", + metric.WithDescription("Receiver collection cycle latency in seconds"), + metric.WithUnit("s"), + // Bucket boundaries chosen for sub-millisecond per-record emit + // cycles up to 10s slow paths; mirrors the internal/selftelemetry + // shape so histograms remain comparable across the migration. + metric.WithExplicitBucketBoundaries( + 0.0001, 0.001, 0.005, 0.01, 0.05, + 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, + ), + ) + if err != nil { + return nil, fmt.Errorf("collection_latency_seconds histogram: %w", err) + } + + st := &telemetryImpl{ + componentID: id.String(), + attrs: attrSet, + errors: errsCtr, + emissions: emissionsCtr, + latency: latencyHist, + } + // Seed last-activity to construction time so a + // `time() - last_activity > N` alert doesn't fire on the zero-valued + // gauge during boot. + st.activityUnix.Store(time.Now().Unix()) + + if _, err := meter.Float64ObservableCounter( + "tracecore.receiver.degraded_seconds_total", + metric.WithDescription("Cumulative seconds the receiver has been in the degraded state"), + metric.WithUnit("s"), + metric.WithFloat64Callback(func(_ context.Context, obs metric.Float64Observer) error { + obs.Observe(st.degradedTotalSeconds(), metric.WithAttributeSet(attrSet)) + return nil + }), + ); err != nil { + return nil, fmt.Errorf("degraded_seconds_total observable: %w", err) + } + + if _, err := meter.Int64ObservableGauge( + "tracecore.receiver.last_activity_unix_seconds", + metric.WithDescription("Unix-second timestamp of the receiver's last successful activity"), + metric.WithInt64Callback(func(_ context.Context, obs metric.Int64Observer) error { + obs.Observe(st.activityUnix.Load(), metric.WithAttributeSet(attrSet)) + return nil + }), + ); err != nil { + return nil, fmt.Errorf("last_activity_unix_seconds observable: %w", err) + } + + return st, nil +} + +var _ Telemetry = (*telemetryImpl)(nil) + +type telemetryImpl struct { + componentID string + attrs attribute.Set + errors metric.Int64Counter + emissions metric.Int64Counter + latency metric.Float64Histogram + + // degradedAt holds the time of the most recent SetDegraded(true); + // nil pointer = not currently degraded. Atomic so SetDegraded is + // lock-free and the observable callback reads a stable snapshot. + degradedAt atomic.Pointer[time.Time] + + // accumulated holds nanoseconds spent degraded across completed + // degrade→recover cycles; degradedTotalSeconds adds the open + // interval at observation time. + accumulated atomic.Uint64 + + // activityUnix holds the Unix-second timestamp of the most recent + // MarkActivity (seeded to construction time). + activityUnix atomic.Int64 +} + +func (s *telemetryImpl) IncError(k Kind) { + // Emit component_id + kind in one WithAttributes call rather than + // merging two attribute sets — avoids relying on SDK merge semantics + // that vary across OTel versions. + s.errors.Add(context.Background(), 1, metric.WithAttributes( + attribute.String("component_id", s.componentID), + attribute.String("kind", string(k)), + )) +} + +func (s *telemetryImpl) IncEmissions(n int64) { + if n < 0 { + return + } + s.emissions.Add(context.Background(), n, metric.WithAttributeSet(s.attrs)) +} + +func (s *telemetryImpl) ObserveLatency(d time.Duration) { + s.latency.Record(context.Background(), d.Seconds(), metric.WithAttributeSet(s.attrs)) +} + +// SetDegraded transitions degraded state. Lock-free: enter via +// CAS(nil → &now), exit via Swap → nil + accumulate the elapsed +// interval. Microsecond-scale under-count on concurrent transitions is +// tolerated; self-corrects on the next scrape. +func (s *telemetryImpl) SetDegraded(degraded bool) { + if degraded { + now := time.Now() + s.degradedAt.CompareAndSwap(nil, &now) + return + } + if old := s.degradedAt.Swap(nil); old != nil { + elapsed := time.Since(*old) + if elapsed > 0 { + s.accumulated.Add(uint64(elapsed.Nanoseconds())) + } + } +} + +func (s *telemetryImpl) MarkActivity() { + s.activityUnix.Store(time.Now().Unix()) +} + +func (s *telemetryImpl) degradedTotalSeconds() float64 { + acc := time.Duration(s.accumulated.Load()) + if openStart := s.degradedAt.Load(); openStart != nil { + acc += time.Since(*openStart) + } + return acc.Seconds() +} + +// recordInitError ticks tracecore.selftelemetry.init_errors_total when +// receiver wiring falls back to noop telemetry. Operators alert on +// `> 0` to learn that self-telemetry isn't really plugged in. Panics +// from a broken MeterProvider are swallowed — recordInitError IS the +// degraded-path fallback; crashing here would turn a partial outage +// into a process kill. +func recordInitError(ctx context.Context, mp metric.MeterProvider, kindLabel, componentID, reason string) { + defer func() { _ = recover() }() + if mp == nil { + return + } + meter := mp.Meter(instrumentationScope) + c, err := meter.Int64Counter( + "tracecore.selftelemetry.init_errors_total", + metric.WithDescription("Counter of self-telemetry construction failures that fell back to the noop implementation."), + ) + if err != nil { + return + } + c.Add(ctx, 1, metric.WithAttributes( + attribute.String("kind", kindLabel), + attribute.String("component_id", componentID), + attribute.String("reason", reason), + )) +} + +// CapturingTelemetry records every call for test assertion. Use in +// receiver unit tests when you want to verify the receiver calls the +// right self-telemetry method with the right kind/value. +// +// Concurrent-safe; tests spawning receiver goroutines can read the +// accessor methods from the main test goroutine. Mirrors the +// internal/selftelemetry.CapturingReceiver surface so external _test +// packages can swap `selftelemetry.NewCapturingReceiver()` → +// `containerstdout.NewCapturingTelemetry()` mechanically. +// +// Lives in the production file (not _test.go) so external test +// packages can import it without a build-tag dance — selftel.go has +// no test-only deps so the production binary cost is just the struct's +// vtable, which Go's deadcode pass strips. +type CapturingTelemetry struct { + mu sync.Mutex + errors []Kind + emissions []int64 + latencies []time.Duration + degraded []bool + activityHits int +} + +// NewCapturingTelemetry returns a CapturingTelemetry ready for use. +func NewCapturingTelemetry() *CapturingTelemetry { return &CapturingTelemetry{} } + +// IncError records the kind. +func (c *CapturingTelemetry) IncError(k Kind) { + c.mu.Lock() + defer c.mu.Unlock() + c.errors = append(c.errors, k) +} + +// IncEmissions records n. Negative values are discarded per the +// Telemetry contract. +func (c *CapturingTelemetry) IncEmissions(n int64) { + if n < 0 { + return + } + c.mu.Lock() + defer c.mu.Unlock() + c.emissions = append(c.emissions, n) +} + +// ObserveLatency records d. +func (c *CapturingTelemetry) ObserveLatency(d time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.latencies = append(c.latencies, d) +} + +// SetDegraded records every transition value (does not collapse no-op +// repeats — tests see exactly what the receiver called). +func (c *CapturingTelemetry) SetDegraded(degraded bool) { + c.mu.Lock() + defer c.mu.Unlock() + c.degraded = append(c.degraded, degraded) +} + +// MarkActivity increments the activity counter. +func (c *CapturingTelemetry) MarkActivity() { + c.mu.Lock() + defer c.mu.Unlock() + c.activityHits++ +} + +// Errors returns the Kind argument of every IncError call in order. +// Returned slice is a copy; callers may mutate freely. +func (c *CapturingTelemetry) Errors() []Kind { + c.mu.Lock() + defer c.mu.Unlock() + out := make([]Kind, len(c.errors)) + copy(out, c.errors) + return out +} + +// Emissions returns the n value of every IncEmissions call. +func (c *CapturingTelemetry) Emissions() []int64 { + c.mu.Lock() + defer c.mu.Unlock() + out := make([]int64, len(c.emissions)) + copy(out, c.emissions) + return out +} + +// Latencies returns every ObserveLatency duration. +func (c *CapturingTelemetry) Latencies() []time.Duration { + c.mu.Lock() + defer c.mu.Unlock() + out := make([]time.Duration, len(c.latencies)) + copy(out, c.latencies) + return out +} + +// DegradedTransitions returns every SetDegraded argument in order. +func (c *CapturingTelemetry) DegradedTransitions() []bool { + c.mu.Lock() + defer c.mu.Unlock() + out := make([]bool, len(c.degraded)) + copy(out, c.degraded) + return out +} + +// ActivityHits returns the cumulative MarkActivity call count. +func (c *CapturingTelemetry) ActivityHits() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.activityHits +} + +var _ Telemetry = (*CapturingTelemetry)(nil) diff --git a/components/receivers/containerstdout/selftel_test.go b/components/receivers/containerstdout/selftel_test.go new file mode 100644 index 00000000..281a08a0 --- /dev/null +++ b/components/receivers/containerstdout/selftel_test.go @@ -0,0 +1,409 @@ +// SPDX-License-Identifier: Apache-2.0 + +package containerstdout + +import ( + "context" + "errors" + "fmt" + "strings" + "testing" + "time" + + "go.opentelemetry.io/otel/attribute" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + + "github.com/tracecoreai/tracecore/internal/pipeline" +) + +// newTestMeterProvider builds an SDK MeterProvider backed by a +// ManualReader so tests can collect metricdata.ResourceMetrics +// deterministically without the Prometheus exporter or any +// internal/telemetry plumbing — the receiver package must stay +// decoupled from internal/* so PR-F can delete those packages without +// touching this test file. +func newTestMeterProvider(t *testing.T) (*sdkmetric.MeterProvider, *sdkmetric.ManualReader) { + t.Helper() + rdr := sdkmetric.NewManualReader() + mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)) + t.Cleanup(func() { _ = mp.Shutdown(context.Background()) }) + return mp, rdr +} + +func collectRM(t *testing.T, rdr *sdkmetric.ManualReader) metricdata.ResourceMetrics { + t.Helper() + var rm metricdata.ResourceMetrics + if err := rdr.Collect(context.Background(), &rm); err != nil { + t.Fatalf("collect: %v", err) + } + return rm +} + +// findInstrument returns the first metricdata.Metrics whose Name matches +// the supplied OTel-dot name. Returns (nil, false) if absent. +// Scope-agnostic: walks all scope metrics. +func findInstrument(rm metricdata.ResourceMetrics, name string) (metricdata.Metrics, bool) { + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return m, true + } + } + } + return metricdata.Metrics{}, false +} + +// scopeOf returns the instrumentation scope name that emitted the +// supplied metric name. Used to pin the scope-name standard. +func scopeOf(rm metricdata.ResourceMetrics, name string) (string, bool) { + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return sm.Scope.Name, true + } + } + } + return "", false +} + +// kvMatch returns true if every want key's value matches the int64 +// datapoint's attribute set. +func kvMatch(dp metricdata.DataPoint[int64], want map[string]string) bool { + for k, v := range want { + got, ok := dp.Attributes.Value(attribute.Key(k)) + if !ok || got.AsString() != v { + return false + } + } + return true +} + +func selftelTestID() pipeline.ID { + return pipeline.MustNewID(pipeline.MustNewType("containerstdout"), "test") +} + +func dumpNames(rm metricdata.ResourceMetrics) string { + var b strings.Builder + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + fmt.Fprintf(&b, " %s@%s", m.Name, sm.Scope.Name) + } + } + return b.String() +} + +// TestSelfTel_NoopAlwaysSafe pins: NewNoopTelemetry returns a value +// whose hot-path methods never panic and silently discard. Every +// containerstdout hot path calls into the Telemetry surface; nil-checks +// at each call site are forbidden, so the noop must be a real value. +// Covers EVERY kind containerstdout emits (canonical mirrors + +// receiver-local), so an accidental drop of a kind constant from the +// const block in selftel.go fails this test instead of silently +// breaking the alert grep against KindRotationStalled etc. +func TestSelfTel_NoopAlwaysSafe(t *testing.T) { + st := NewNoopTelemetry() + defer func() { + if r := recover(); r != nil { + t.Fatalf("noop panicked: %v", r) + } + }() + for _, k := range []Kind{ + // Canonical mirrors. + KindParse, KindRead, KindCardinality, KindDownstream, KindPanic, + // Receiver-local. + KindRotationStalled, KindCursorWriteFailed, KindBackpressureDrop, + KindFingerprintCardinality, KindAttributionCardinality, + KindRateLimitCardinality, KindWatch, + } { + st.IncError(k) + } + st.IncEmissions(42) + st.IncEmissions(-1) + st.ObserveLatency(15 * time.Millisecond) + st.SetDegraded(true) + st.SetDegraded(false) + st.MarkActivity() +} + +// TestSelfTel_NewTelemetry_NilProviderErrors pins: NewTelemetry returns +// ErrNilMeterProvider when called with a nil provider rather than +// silently substituting noop — the factory is responsible for the +// fallback + the recordInitError tick. +func TestSelfTel_NewTelemetry_NilProviderErrors(t *testing.T) { + _, err := NewTelemetry(selftelTestID(), nil) + if !errors.Is(err, ErrNilMeterProvider) { + t.Fatalf("err = %v, want ErrNilMeterProvider", err) + } +} + +// TestSelfTel_EmitsErrorsTotal_WithKindAndComponentID pins the metric +// contract. After IncError ×2 of a canonical Kind + ×1 of a +// receiver-local Kind, the ManualReader collects +// tracecore.receiver.errors_total with datapoints partitioned by kind +// and labeled with the component_id. A regression that drops the kind +// label, the component_id label, or the metric-name prefix fails here. +// Pins receiver-local Kind (rotation_stalled) is plumbed through the +// same counter as the canonical Kind (parse) — proves the dcgm-style +// co-located const block actually shares one counter, not two. +func TestSelfTel_EmitsErrorsTotal_WithKindAndComponentID(t *testing.T) { + mp, rdr := newTestMeterProvider(t) + st, err := NewTelemetry(selftelTestID(), mp) + if err != nil { + t.Fatalf("NewTelemetry: %v", err) + } + st.IncError(KindParse) + st.IncError(KindParse) + st.IncError(KindRotationStalled) + + rm := collectRM(t, rdr) + m, ok := findInstrument(rm, "tracecore.receiver.errors_total") + if !ok { + t.Fatalf("metric tracecore.receiver.errors_total absent; have: %s", dumpNames(rm)) + } + sum, ok := m.Data.(metricdata.Sum[int64]) + if !ok { + t.Fatalf("errors_total data shape: got %T, want metricdata.Sum[int64]", m.Data) + } + gotParse, foundParse := 0, false + gotStall, foundStall := 0, false + for _, dp := range sum.DataPoints { + if !kvMatch(dp, map[string]string{"component_id": "containerstdout/test"}) { + t.Errorf("datapoint missing component_id=containerstdout/test: %v", dp.Attributes) + continue + } + kindAttr, _ := dp.Attributes.Value("kind") + switch kindAttr.AsString() { + case "parse": + gotParse = int(dp.Value) + foundParse = true + case "rotation_stalled": + gotStall = int(dp.Value) + foundStall = true + } + } + if !foundParse || gotParse != 2 { + t.Errorf("parse count: got %d (found=%v), want 2", gotParse, foundParse) + } + if !foundStall || gotStall != 1 { + t.Errorf("rotation_stalled count: got %d (found=%v), want 1", gotStall, foundStall) + } +} + +// TestSelfTel_EveryKind_RoutesThroughErrorsTotal pins that every Kind +// containerstdout declares routes through the same counter. The +// canonical KindParse path is exercised above; this test exhaustively +// covers receiver-local KindRotationStalled / KindCursorWriteFailed / +// KindBackpressureDrop / KindFingerprintCardinality / +// KindAttributionCardinality / KindRateLimitCardinality / KindWatch. +// Falsifier: introducing a separate counter for "watch" (a future +// drift toward two surfaces) fails here because all 7 distinct kinds +// must surface as distinct datapoints on ONE metric. +func TestSelfTel_EveryKind_RoutesThroughErrorsTotal(t *testing.T) { + mp, rdr := newTestMeterProvider(t) + st, err := NewTelemetry(selftelTestID(), mp) + if err != nil { + t.Fatalf("NewTelemetry: %v", err) + } + receiverLocal := []Kind{ + KindRotationStalled, KindCursorWriteFailed, KindBackpressureDrop, + KindFingerprintCardinality, KindAttributionCardinality, + KindRateLimitCardinality, KindWatch, + } + for _, k := range receiverLocal { + st.IncError(k) + } + + rm := collectRM(t, rdr) + m, ok := findInstrument(rm, "tracecore.receiver.errors_total") + if !ok { + t.Fatalf("metric tracecore.receiver.errors_total absent; have: %s", dumpNames(rm)) + } + sum, ok := m.Data.(metricdata.Sum[int64]) + if !ok { + t.Fatalf("errors_total data shape: got %T", m.Data) + } + seen := map[string]int{} + for _, dp := range sum.DataPoints { + kindAttr, _ := dp.Attributes.Value("kind") + seen[kindAttr.AsString()] = int(dp.Value) + } + for _, k := range receiverLocal { + if seen[string(k)] != 1 { + t.Errorf("kind=%s: got count %d, want 1 (seen: %v)", k, seen[string(k)], seen) + } + } +} + +// TestSelfTel_EmitsEmissionsTotal pins: IncEmissions surfaces a +// monotonic counter; negative values are silently discarded per the +// interface contract (kept identical to internal/selftelemetry to +// avoid regression in receivers that pass negative debug values). +func TestSelfTel_EmitsEmissionsTotal(t *testing.T) { + mp, rdr := newTestMeterProvider(t) + st, err := NewTelemetry(selftelTestID(), mp) + if err != nil { + t.Fatalf("NewTelemetry: %v", err) + } + st.IncEmissions(3) + st.IncEmissions(5) + st.IncEmissions(-1) + rm := collectRM(t, rdr) + m, ok := findInstrument(rm, "tracecore.receiver.emissions_total") + if !ok { + t.Fatalf("metric tracecore.receiver.emissions_total absent; have: %s", dumpNames(rm)) + } + sum, ok := m.Data.(metricdata.Sum[int64]) + if !ok { + t.Fatalf("emissions_total data shape: got %T, want metricdata.Sum[int64]", m.Data) + } + if len(sum.DataPoints) != 1 { + t.Fatalf("emissions datapoints: got %d, want 1", len(sum.DataPoints)) + } + if sum.DataPoints[0].Value != 8 { + t.Errorf("emissions value: got %d, want 8", sum.DataPoints[0].Value) + } +} + +// TestSelfTel_ScopeNameIsReceiverImportPath pins the OTel scope-name +// standard: instrumentation scope = receiver's Go import path. Anchors +// the RFC-0013 PR-B / PR-F decision (vs reusing the deleted +// internal/selftelemetry scope) so a future drift back to the internal +// name fails here. +func TestSelfTel_ScopeNameIsReceiverImportPath(t *testing.T) { + mp, rdr := newTestMeterProvider(t) + st, err := NewTelemetry(selftelTestID(), mp) + if err != nil { + t.Fatalf("NewTelemetry: %v", err) + } + st.IncEmissions(1) + rm := collectRM(t, rdr) + scope, ok := scopeOf(rm, "tracecore.receiver.emissions_total") + if !ok { + t.Fatalf("emissions_total absent") + } + const wantScope = "github.com/tracecoreai/tracecore/components/receivers/containerstdout" + if scope != wantScope { + t.Errorf("instrumentation scope: got %q, want %q", scope, wantScope) + } +} + +// TestSelfTel_RecordInitError_TicksInitErrorsCounter pins: when +// factory wiring fails (NewTelemetry returns an error), recordInitError +// surfaces a tracecore.selftelemetry.init_errors_total tick with +// kind="receiver", the component_id label, and +// reason="instrument_register". This is the only signal that a +// receiver fell back to noop telemetry; dropping the recordInitError +// call must fail this test. +func TestSelfTel_RecordInitError_TicksInitErrorsCounter(t *testing.T) { + mp, rdr := newTestMeterProvider(t) + recordInitError(context.Background(), mp, "receiver", selftelTestID().String(), reasonInstrumentRegister) + + rm := collectRM(t, rdr) + m, ok := findInstrument(rm, "tracecore.selftelemetry.init_errors_total") + if !ok { + t.Fatalf("init_errors_total absent; have: %s", dumpNames(rm)) + } + sum, ok := m.Data.(metricdata.Sum[int64]) + if !ok { + t.Fatalf("init_errors_total data shape: got %T, want metricdata.Sum[int64]", m.Data) + } + if len(sum.DataPoints) != 1 { + t.Fatalf("init_errors datapoints: got %d, want 1", len(sum.DataPoints)) + } + dp := sum.DataPoints[0] + want := map[string]string{ + "kind": "receiver", + "component_id": "containerstdout/test", + "reason": reasonInstrumentRegister, + } + if !kvMatch(dp, want) { + t.Errorf("init_errors attrs: got %v, want %v", dp.Attributes, want) + } + if dp.Value != 1 { + t.Errorf("init_errors value: got %d, want 1", dp.Value) + } +} + +// TestSelfTel_RecordInitError_NilProviderIsSafe pins: a nil +// MeterProvider must not panic — recordInitError IS the fallback path; +// crashing here would turn a partial degradation into a process kill. +func TestSelfTel_RecordInitError_NilProviderIsSafe(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Fatalf("recordInitError(nil) panicked: %v", r) + } + }() + recordInitError(context.Background(), nil, "receiver", "x/y", reasonInstrumentRegister) +} + +// TestSelfTel_CapturingTelemetry_RecordsAllCalls pins the test-helper +// contract used by every external _test in containerstdout_test: +// IncError / IncEmissions / ObserveLatency / SetDegraded / MarkActivity +// each round-trip through their accessor. Negative IncEmissions is +// discarded (matches the production Telemetry contract). +func TestSelfTel_CapturingTelemetry_RecordsAllCalls(t *testing.T) { + tel := NewCapturingTelemetry() + tel.IncError(KindParse) + tel.IncError(KindWatch) + tel.IncEmissions(7) + tel.IncEmissions(-1) // must be discarded + tel.IncEmissions(2) + tel.ObserveLatency(3 * time.Millisecond) + tel.SetDegraded(true) + tel.SetDegraded(false) + tel.MarkActivity() + tel.MarkActivity() + + if got, want := tel.Errors(), []Kind{KindParse, KindWatch}; !equalKinds(got, want) { + t.Errorf("Errors: got %v want %v", got, want) + } + if got, want := tel.Emissions(), []int64{7, 2}; !equalInt64s(got, want) { + t.Errorf("Emissions: got %v want %v (negative must be discarded)", got, want) + } + if got := tel.Latencies(); len(got) != 1 || got[0] != 3*time.Millisecond { + t.Errorf("Latencies: got %v want [3ms]", got) + } + if got, want := tel.DegradedTransitions(), []bool{true, false}; !equalBools(got, want) { + t.Errorf("DegradedTransitions: got %v want %v", got, want) + } + if got := tel.ActivityHits(); got != 2 { + t.Errorf("ActivityHits: got %d want 2", got) + } +} + +func equalKinds(a, b []Kind) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + +func equalInt64s(a, b []int64) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + +func equalBools(a, b []bool) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} diff --git a/components/receivers/containerstdout/tailer.go b/components/receivers/containerstdout/tailer.go index b2c5203f..6d856495 100644 --- a/components/receivers/containerstdout/tailer.go +++ b/components/receivers/containerstdout/tailer.go @@ -13,8 +13,6 @@ import ( "os" "sync" "time" - - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // TruncationMarker is the suffix appended when a single log line @@ -105,7 +103,7 @@ type TailerOptions struct { // today; future kinds for read errors and oversize-line counters). // A nil sink is replaced with the no-op receiver so test harnesses // that don't care about telemetry can omit the field. - Telemetry selftelemetry.Receiver + Telemetry Telemetry // Logger is the structured logger; nil falls back to a discard // handler so the tailer never panics on a missing logger and tests @@ -226,7 +224,7 @@ func newTailer(_ context.Context, opts TailerOptions, nowFn func() time.Time) (* opts.MaxLineSize = DefaultMaxLogSize } if opts.Telemetry == nil { - opts.Telemetry = selftelemetry.NewNoopReceiver() + opts.Telemetry = NewNoopTelemetry() } if opts.Logger == nil { opts.Logger = slog.New(slog.NewTextHandler(io.Discard, nil)) diff --git a/components/receivers/containerstdout/tailer_test.go b/components/receivers/containerstdout/tailer_test.go index 8209f2a8..cd11322b 100644 --- a/components/receivers/containerstdout/tailer_test.go +++ b/components/receivers/containerstdout/tailer_test.go @@ -18,7 +18,6 @@ import ( "github.com/stretchr/testify/require" "github.com/tracecoreai/tracecore/components/receivers/containerstdout" - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // tailerTestPoll is the per-file poll interval used across the tailer @@ -135,7 +134,7 @@ func TestTailer_ReadsAppendedLines(t *testing.T) { path := filepath.Join(dir, "0.log") require.NoError(t, os.WriteFile(path, nil, 0o600)) - tel := selftelemetry.NewCapturingReceiver() + tel := containerstdout.NewCapturingTelemetry() tailer, stop := runTailer(t, containerstdout.TailerOptions{ Path: path, PollInterval: tailerTestPoll, @@ -174,7 +173,7 @@ func TestTailer_StartOffsetSkipsPriorBytes(t *testing.T) { PollInterval: tailerTestPoll, FingerprintSize: 16, MaxLineSize: 1024, - Telemetry: selftelemetry.NewCapturingReceiver(), + Telemetry: containerstdout.NewCapturingTelemetry(), }) defer stop() @@ -200,7 +199,7 @@ func TestTailer_HandlesEmptyFile(t *testing.T) { PollInterval: tailerTestPoll, FingerprintSize: 16, MaxLineSize: 1024, - Telemetry: selftelemetry.NewCapturingReceiver(), + Telemetry: containerstdout.NewCapturingTelemetry(), }) defer stop() @@ -234,7 +233,7 @@ func TestTailer_MissingFileNoCrash(t *testing.T) { PollInterval: tailerTestPoll, FingerprintSize: 16, MaxLineSize: 1024, - Telemetry: selftelemetry.NewCapturingReceiver(), + Telemetry: containerstdout.NewCapturingTelemetry(), }) defer stop() @@ -277,7 +276,7 @@ func TestTailer_DetectsRotation(t *testing.T) { PollInterval: tailerTestPoll, FingerprintSize: fpSize, MaxLineSize: 1024, - Telemetry: selftelemetry.NewCapturingReceiver(), + Telemetry: containerstdout.NewCapturingTelemetry(), OnRotation: func(oldFp, newFp, p string) { rotMu.Lock() defer rotMu.Unlock() @@ -324,7 +323,7 @@ func TestTailer_TruncationWithoutRotation(t *testing.T) { PollInterval: tailerTestPoll, FingerprintSize: fpSize, MaxLineSize: 1024, - Telemetry: selftelemetry.NewCapturingReceiver(), + Telemetry: containerstdout.NewCapturingTelemetry(), OnTruncate: func(string) { select { case truncated <- struct{}{}: @@ -373,7 +372,7 @@ func TestTailer_OversizeLineTruncated(t *testing.T) { PollInterval: tailerTestPoll, FingerprintSize: 8, MaxLineSize: maxLine, - Telemetry: selftelemetry.NewCapturingReceiver(), + Telemetry: containerstdout.NewCapturingTelemetry(), }) defer stop() @@ -398,7 +397,7 @@ func TestTailer_RotationStalledKind(t *testing.T) { path := filepath.Join(dir, "0.log") require.NoError(t, os.WriteFile(path, nil, 0o600)) - tel := selftelemetry.NewCapturingReceiver() + tel := containerstdout.NewCapturingTelemetry() // Inject a fake clock so we can advance past the stall threshold // without sleeping. Each call to the returned function increments @@ -488,7 +487,7 @@ func TestTailer_StopClosesChannel(t *testing.T) { PollInterval: tailerTestPoll, FingerprintSize: 16, MaxLineSize: 1024, - Telemetry: selftelemetry.NewCapturingReceiver(), + Telemetry: containerstdout.NewCapturingTelemetry(), }) require.NoError(t, err) @@ -526,7 +525,7 @@ func TestTailer_ContextCancelStops(t *testing.T) { PollInterval: tailerTestPoll, FingerprintSize: 16, MaxLineSize: 1024, - Telemetry: selftelemetry.NewCapturingReceiver(), + Telemetry: containerstdout.NewCapturingTelemetry(), }) require.NoError(t, err) From 9eae369a6565cdde811cd45df0f00ec5dcba22b6 Mon Sep 17 00:00:00 2001 From: Tri Lam Date: Sat, 30 May 2026 23:54:31 -0700 Subject: [PATCH 2/2] fix(containerstdout): drop dead kind + prefix tests + trim exports MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reviewer findings on PR #197: 1. Delete KindFingerprintCardinality. The kind had zero IncError call sites in production — only defensive coverage in TestSelfTel_* and a t.Skip()'d "Phase 14" deferred test. Per the dcgm sibling rule "every kind has an impl call site" + [[no-bloat]], deleted the const, the failure_modes_test.go skipped test, kind_test.go row, RUNBOOK section, README alert row, prometheus-alerts entry, RFC-0010 enumeration, and docs/followups/M15.md grep example. The original tailer-pool LRU it was meant to instrument never materialised (footnoted in RFC-0010 §M15). 2. Rename 9 TestSelfTel_* → TestContainerstdout_SelfTel_* so test names disambiguate from sibling receivers' selftel tests (matches the lifecycle-test prefix convention reviewer flagged). 3. Reduce export surface — NewTelemetry, NewNoopTelemetry, and ErrNilMeterProvider had zero external callers (factory + selftel_test only, both same-package), so unexported to newTelemetry, newNoopTelemetry, errNilMeterProvider. Telemetry interface, Kind type, KindXxx constants, and CapturingTelemetry/NewCapturingTelemetry stay EXPORTED — they are used by ~10 external test files in package containerstdout_test (cursor_test, attribution_test, ratelimit_test, tailer_test, factory_test, etc.) and operators grep KindXxx names in dashboards/alert rules per RFC-0010. Unexporting them would force converting every external test into the internal package, which is net bloat against the symbol-hiding gain. make check + go test -race ./components/receivers/containerstdout/... + go test ./... all green (excluding pre-existing TestK8sevents_ Lifecycle_ConcurrentAddDuringShutdown_NoPanic flake from #196). Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Tri Lam --- .../receivers/containerstdout/README.md | 1 - .../receivers/containerstdout/RUNBOOK.md | 36 ++--- .../receivers/containerstdout/attribution.go | 2 +- .../containerstdout/bench_attribution_test.go | 2 +- .../bench_channel_depth_test.go | 2 +- .../containerstdout/bench_hot_path_test.go | 2 +- .../receivers/containerstdout/cursor.go | 2 +- .../receivers/containerstdout/factory.go | 4 +- .../receivers/containerstdout/factory_test.go | 4 +- .../containerstdout/failure_modes_test.go | 9 -- .../receivers/containerstdout/kind_test.go | 1 - .../containerstdout/noop_receiver.go | 2 +- .../prometheus-alerts.example.yaml | 19 --- .../receivers/containerstdout/ratelimit.go | 2 +- .../receivers/containerstdout/receiver.go | 2 +- .../receivers/containerstdout/selftel.go | 72 +++++----- .../receivers/containerstdout/selftel_test.go | 135 +++++++++--------- .../receivers/containerstdout/tailer.go | 2 +- docs/followups/M15.md | 2 +- .../0010-containerstdout-receiver-scope.md | 5 +- 20 files changed, 128 insertions(+), 178 deletions(-) diff --git a/components/receivers/containerstdout/README.md b/components/receivers/containerstdout/README.md index 061c8225..2a1d762f 100644 --- a/components/receivers/containerstdout/README.md +++ b/components/receivers/containerstdout/README.md @@ -64,7 +64,6 @@ Per-Kind alert mapping: | `backpressure_drop` | `ContainerStdoutBackpressure` | warning | | `cursor_write_failed` | `ContainerStdoutCursorWriteFailed` | critical | | `watch` | `ContainerStdoutWatchFlap` | warning | -| `fingerprint_cardinality` | `ContainerStdoutCardinalityFingerprint` | info | | `attribution_cardinality` | `ContainerStdoutCardinalityAttribution` | info | | `rate_limit_cardinality` | `ContainerStdoutCardinalityRateLimit` | info | | (composite OR of source flags) | `ContainerStdoutDegraded` | warning | diff --git a/components/receivers/containerstdout/RUNBOOK.md b/components/receivers/containerstdout/RUNBOOK.md index 1bfb22f4..acad9b72 100644 --- a/components/receivers/containerstdout/RUNBOOK.md +++ b/components/receivers/containerstdout/RUNBOOK.md @@ -163,27 +163,6 @@ test that pins its behaviour, and the operator remediation path. reconnect; the degraded gauge clears on the next successful watch. -### KindFingerprintCardinality - -- **Alert:** `ContainerStdoutCardinalityFingerprint` (severity: info) -- **Test:** `TestFailure_FingerprintCardinalityOverflow` -- **Means:** the fingerprint→stream identity LRU rolled over a key. - Future scans MAY double-emit lines on the evicted stream until the - cursor resyncs. -- **Likely root cause:** pod churn exceeded the LRU capacity - (default 8192 entries). Normal during cluster autoscaling or - large rolling deploys; sustained signal indicates the cap is - undersized for the cluster. -- **Diagnose:** - ```sh - curl localhost:8888/metrics | grep 'kind="fingerprint_cardinality"' - # Compare with pod churn rate. - kubectl get events -A --field-selector reason=Created | wc -l - ``` -- **Mitigation:** signal-only; no action required for a transient - spike. For a sustained rate, raise the fingerprint LRU cap via the - `config:` override (`containerstdout.attribution_lru_cap`). - ### KindAttributionCardinality - **Alert:** `ContainerStdoutCardinalityAttribution` (severity: info) @@ -191,11 +170,15 @@ test that pins its behaviour, and the operator remediation path. - **Means:** the per-`(pod_uid, container)` attribution LRU rolled over. Emissions for the evicted key fall back to namespace + container until the informer re-keys. -- **Likely root cause:** same as fingerprint cardinality - - short-lived pods rotating through the receiver faster than the - LRU clears. -- **Diagnose + Mitigation:** same as KindFingerprintCardinality; - the two LRUs share a cap. +- **Likely root cause:** short-lived pods rotating through the + receiver faster than the LRU clears. +- **Diagnose:** + ```sh + curl localhost:8888/metrics | grep 'kind="attribution_cardinality"' + ``` +- **Mitigation:** signal-only; no action required for a transient + spike. For a sustained rate, raise the attribution LRU cap via the + `config:` override (`containerstdout.attribution_lru_cap`). ### KindRateLimitCardinality @@ -268,7 +251,6 @@ cleans them up or re-enables the receiver. | Slow downstream consumer | `backpressure_drop` | Drop with `kind="backpressure_drop"`; tailer never blocks. | `TestFailure_BackpressureDropOnSlowConsumer` | | Cursor write to read-only fs | `cursor_write_failed` | Bounded retry; receiver stays alive; restart replays from older cursor. | `TestFailure_CursorWriteFailedReadOnlyFs` | | Pod-informer watch error | `watch` | `Degraded()=true`; client-go backoff (1s→30s); receiver stays alive. | `TestFailure_WatchAPIServerFlap` | -| Fingerprint LRU overflow | `fingerprint_cardinality` | Evicted key may double-emit until cursor resyncs. | `TestFailure_FingerprintCardinalityOverflow` | | Attribution LRU overflow | `attribution_cardinality` | Evicted key falls back to namespace+container. | `TestFailure_AttributionCardinalityOverflow` | | Rate-limit LRU overflow | `rate_limit_cardinality` | Evicted key emits without budget until namespace fallback claims it. | `TestFailure_RateLimitCardinalityOverflow` | | Corrupt CRI line | (no Kind - line skipped) | Skipped + emission-stat tick; receiver stays alive. | `TestFailure_CorruptCRILineSkipped` | diff --git a/components/receivers/containerstdout/attribution.go b/components/receivers/containerstdout/attribution.go index f50707b5..127a006a 100644 --- a/components/receivers/containerstdout/attribution.go +++ b/components/receivers/containerstdout/attribution.go @@ -83,7 +83,7 @@ func NewCache(src AttributionSource, capacity int, telemetry Telemetry) *Cache { if telemetry == nil { // Defensive - Cache's hot path must never panic on a nil // telemetry sink. Cardinality events are silently dropped. - telemetry = NewNoopTelemetry() + telemetry = newNoopTelemetry() } return &Cache{ src: src, diff --git a/components/receivers/containerstdout/bench_attribution_test.go b/components/receivers/containerstdout/bench_attribution_test.go index 540ea3e6..1291f0eb 100644 --- a/components/receivers/containerstdout/bench_attribution_test.go +++ b/components/receivers/containerstdout/bench_attribution_test.go @@ -32,7 +32,7 @@ func BenchmarkAttributionLookup(b *testing.B) { Found: true, }, }) - cache := NewCache(src, 64, NewNoopTelemetry()) + cache := NewCache(src, 64, newNoopTelemetry()) if _, err := cache.Lookup(path); err != nil { b.Fatalf("prewarm cache: %v", err) } diff --git a/components/receivers/containerstdout/bench_channel_depth_test.go b/components/receivers/containerstdout/bench_channel_depth_test.go index 607a8b63..d06704e6 100644 --- a/components/receivers/containerstdout/bench_channel_depth_test.go +++ b/components/receivers/containerstdout/bench_channel_depth_test.go @@ -25,7 +25,7 @@ func BenchmarkChannelDepthUnderEviction(b *testing.B) { LRUCap: 16, LRUEvictAfter: time.Minute, } - rl := NewRateLimiter(cfg, NewNoopTelemetry()) + rl := NewRateLimiter(cfg, newNoopTelemetry()) // Prime the bucket so the loop measures the steady-state reject // path, not the one-shot admission + accept. diff --git a/components/receivers/containerstdout/bench_hot_path_test.go b/components/receivers/containerstdout/bench_hot_path_test.go index 83b5d935..abb3d4da 100644 --- a/components/receivers/containerstdout/bench_hot_path_test.go +++ b/components/receivers/containerstdout/bench_hot_path_test.go @@ -56,7 +56,7 @@ func newBenchHotPathFixture(tb testing.TB) *benchHotPathFixture { Found: true, }, }) - cache := NewCache(src, 64, NewNoopTelemetry()) + cache := NewCache(src, 64, newNoopTelemetry()) // Pre-populate so the first Lookup in the bench loop is a hit. if _, err := cache.Lookup(path); err != nil { diff --git a/components/receivers/containerstdout/cursor.go b/components/receivers/containerstdout/cursor.go index 24a21c8f..564656e4 100644 --- a/components/receivers/containerstdout/cursor.go +++ b/components/receivers/containerstdout/cursor.go @@ -99,7 +99,7 @@ func NewCursorStore(cfg CursorConfig, telemetry Telemetry) (*CursorStore, error) return nil, fmt.Errorf("cursor: chmod %q: %w", cfg.Dir, err) } if telemetry == nil { - telemetry = NewNoopTelemetry() + telemetry = newNoopTelemetry() } return &CursorStore{ dir: cfg.Dir, diff --git a/components/receivers/containerstdout/factory.go b/components/receivers/containerstdout/factory.go index b9562d67..f7089bde 100644 --- a/components/receivers/containerstdout/factory.go +++ b/components/receivers/containerstdout/factory.go @@ -78,9 +78,9 @@ func (*factory) CreateLogs(ctx context.Context, set pipeline.CreateSettings, cfg return nil, fmt.Errorf("containerstdout: invalid config: %w", err) } - telemetry := NewNoopTelemetry() + telemetry := newNoopTelemetry() if set.Telemetry.MeterProvider != nil { - if rt, err := NewTelemetry(set.ID, set.Telemetry.MeterProvider); err == nil { + if rt, err := newTelemetry(set.ID, set.Telemetry.MeterProvider); err == nil { telemetry = rt } else { recordInitError(ctx, set.Telemetry.MeterProvider, diff --git a/components/receivers/containerstdout/factory_test.go b/components/receivers/containerstdout/factory_test.go index 1a6fc99a..9c8c2f20 100644 --- a/components/receivers/containerstdout/factory_test.go +++ b/components/receivers/containerstdout/factory_test.go @@ -102,11 +102,11 @@ func TestFactory_CreateLogs_RejectsInvalidConfig(t *testing.T) { // TestFactory_CreateLogs_WiresSelftelemetry pins the self-telemetry // wiring contract: a valid enabled config wired with a real // MeterProvider (the test fixture's noop MeterProvider counts as -// "real" for this contract - it satisfies the NewTelemetry signature) +// "real" for this contract - it satisfies the newTelemetry signature) // produces a Receiver. The factory must not return nil or error on a // happy-path enabled config. // -// The factory wires a non-noop containerstdout.Telemetry when +// The factory wires a non-noop containerstdout Telemetry when // set.Telemetry.MeterProvider is non-nil and the noop default // otherwise - both branches must yield a non-nil internal field so // hot-path callers don't nil-check. diff --git a/components/receivers/containerstdout/failure_modes_test.go b/components/receivers/containerstdout/failure_modes_test.go index fedb67e6..2e4023ed 100644 --- a/components/receivers/containerstdout/failure_modes_test.go +++ b/components/receivers/containerstdout/failure_modes_test.go @@ -58,12 +58,3 @@ func TestFailure_RotationStalledSurfacesKind(t *testing.T) { func TestFailure_BackpressureDropOnSlowConsumer(t *testing.T) { t.Skip("Phase 14 integration glue: needs receiver+consumer integration to drive the slow-consumer path") } - -// TestFailure_FingerprintCardinalityOverflow - DEFERRED. Asserts that -// when 10K distinct fingerprints arrive (e.g. churning test pods) the -// tailer-pool LRU evicts and KindFingerprintCardinality is recorded. -// The tailer pool itself is wired in Phase 14 - until then there is -// no fingerprint-keyed LRU to overflow. -func TestFailure_FingerprintCardinalityOverflow(t *testing.T) { - t.Skip("Phase 14 integration glue: tailer pool fingerprint LRU lands in Phase 14") -} diff --git a/components/receivers/containerstdout/kind_test.go b/components/receivers/containerstdout/kind_test.go index 293fc3ec..e73a5798 100644 --- a/components/receivers/containerstdout/kind_test.go +++ b/components/receivers/containerstdout/kind_test.go @@ -13,7 +13,6 @@ func TestKinds_StringValues(t *testing.T) { {"rotation_stalled", KindRotationStalled, "rotation_stalled"}, {"cursor_write_failed", KindCursorWriteFailed, "cursor_write_failed"}, {"backpressure_drop", KindBackpressureDrop, "backpressure_drop"}, - {"fingerprint_cardinality", KindFingerprintCardinality, "fingerprint_cardinality"}, {"attribution_cardinality", KindAttributionCardinality, "attribution_cardinality"}, {"rate_limit_cardinality", KindRateLimitCardinality, "rate_limit_cardinality"}, {"watch", KindWatch, "watch"}, diff --git a/components/receivers/containerstdout/noop_receiver.go b/components/receivers/containerstdout/noop_receiver.go index f9f2568b..01111ca1 100644 --- a/components/receivers/containerstdout/noop_receiver.go +++ b/components/receivers/containerstdout/noop_receiver.go @@ -29,7 +29,7 @@ type noopReceiver struct { // here so a direct caller doesn't panic. func newNoopReceiver(telemetry Telemetry) *noopReceiver { if telemetry == nil { - telemetry = NewNoopTelemetry() + telemetry = newNoopTelemetry() } return &noopReceiver{telemetry: telemetry} } diff --git a/components/receivers/containerstdout/prometheus-alerts.example.yaml b/components/receivers/containerstdout/prometheus-alerts.example.yaml index f402eb26..d555395e 100644 --- a/components/receivers/containerstdout/prometheus-alerts.example.yaml +++ b/components/receivers/containerstdout/prometheus-alerts.example.yaml @@ -23,7 +23,6 @@ # | backpressure_drop | warning | Rate-limit working; investigate sustained rate | # | cursor_write_failed | critical | Disk/permissions failure; data loss on restart | # | watch | warning | API-server flap; client-go retries | -# | fingerprint_cardinality | info | Pod churn signal | # | attribution_cardinality | info | LRU saturating; tune cap or namespace scope | # | rate_limit_cardinality | info | LRU saturating | # | degraded | warning | Composite OR of source flags | @@ -131,24 +130,6 @@ groups: receiver sets degraded and client-go drives reconnect. runbook_url: https://github.com/TraceCoreAI/tracecore/blob/main/components/receivers/containerstdout/RUNBOOK.md#kindwatch - - alert: ContainerStdoutCardinalityFingerprint - expr: | - rate(tracecore_receiver_errors_total{ - component_id=~"containerstdout/.*", kind="fingerprint_cardinality" - }[5m]) > 1 - for: 10m - labels: - severity: info - receiver_id: containerstdout - annotations: - summary: "containerstdout fingerprint LRU saturating on {{ $labels.host }}" - description: | - The fingerprint→stream-identity LRU is rolling over keys. - Future scans MAY double-emit lines on the evicted stream - until the cursor resyncs. Signal of pod churn; raise the - LRU cap if sustained. - runbook_url: https://github.com/TraceCoreAI/tracecore/blob/main/components/receivers/containerstdout/RUNBOOK.md#kindfingerprintcardinality - - alert: ContainerStdoutCardinalityAttribution expr: | rate(tracecore_receiver_errors_total{ diff --git a/components/receivers/containerstdout/ratelimit.go b/components/receivers/containerstdout/ratelimit.go index b3d18e90..62213b77 100644 --- a/components/receivers/containerstdout/ratelimit.go +++ b/components/receivers/containerstdout/ratelimit.go @@ -85,7 +85,7 @@ func NewRateLimiter(cfg EgressRateLimitConfig, telemetry Telemetry) *RateLimiter capN = 1 } if telemetry == nil { - telemetry = NewNoopTelemetry() + telemetry = newNoopTelemetry() } return &RateLimiter{ defaultRate: rate.Limit(cfg.Rate), diff --git a/components/receivers/containerstdout/receiver.go b/components/receivers/containerstdout/receiver.go index f5d35c80..3d8d611a 100644 --- a/components/receivers/containerstdout/receiver.go +++ b/components/receivers/containerstdout/receiver.go @@ -166,7 +166,7 @@ type containerStdoutReceiver struct { // trade-off favours degraded availability over hard refusal. func newReceiver(set pipeline.CreateSettings, cfg *Config, next consumer.Logs, telemetry Telemetry) *containerStdoutReceiver { if telemetry == nil { - telemetry = NewNoopTelemetry() + telemetry = newNoopTelemetry() } maxLine := cfg.MaxLogSize if maxLine <= 0 { diff --git a/components/receivers/containerstdout/selftel.go b/components/receivers/containerstdout/selftel.go index c63c28f1..3237872e 100644 --- a/components/receivers/containerstdout/selftel.go +++ b/components/receivers/containerstdout/selftel.go @@ -15,22 +15,28 @@ // component_id'd counter. // // Receiver-local Kinds (KindRotationStalled, KindCursorWriteFailed, -// KindBackpressureDrop, KindFingerprintCardinality, -// KindAttributionCardinality, KindRateLimitCardinality, KindWatch) -// are co-located in the same const block as the canonical mirrors -// (KindParse, KindRead, KindCardinality, KindDownstream, KindPanic) -// per the dcgm sibling pattern: one place to grep for "every kind -// containerstdout emits." +// KindBackpressureDrop, KindAttributionCardinality, +// KindRateLimitCardinality, KindWatch) are co-located in the same +// const block as the canonical mirrors (KindParse, KindRead, +// KindCardinality, KindDownstream, KindPanic) per the dcgm sibling +// pattern: one place to grep for "every kind containerstdout emits." // -// EXPORTED surface: unlike the kernelevents sibling (which keeps -// `selfTelemetry` + `kind` unexported), containerstdout's constructors -// (NewCursorStore, NewCache, NewRateLimiter, NewPodInformer, TailerOptions) -// are PUBLIC and accept the telemetry sink as a parameter. External -// _test packages (containerstdout_test) construct receivers via these -// APIs, so the Telemetry interface + the Kind type + the -// CapturingTelemetry test helper MUST be exported for the migration to -// preserve the test surface without rewriting every external test into -// the internal package. +// EXPORTED surface (post reviewer pass): +// - Telemetry interface + Kind type + KindXxx constants + +// CapturingTelemetry / NewCapturingTelemetry: external `_test` +// packages (containerstdout_test) construct receivers via the +// exported constructors (NewCursorStore, NewCache, NewRateLimiter, +// NewPodInformer, TailerOptions) and assert on the telemetry +// surface, so these must stay public. Operators also grep KindXxx +// names in dashboards / alert rules per RFC-0010. +// - newTelemetry / newNoopTelemetry / errNilMeterProvider: +// unexported — factory is the only non-test caller and tests live +// in the same package. +// +// Mirrors the dcgm sibling rule "every kind has an IncError call site +// in production" — KindFingerprintCardinality was dropped in the +// PR-F port because its emitter (tailer-pool LRU) never materialised +// (RFC-0010 §M15 footnote). package containerstdout @@ -97,10 +103,6 @@ const ( // line is also counted in the per-key dropped_lines gauge for // tail-receiver SLO accounting. KindBackpressureDrop Kind = "backpressure_drop" - // KindFingerprintCardinality — the fingerprint→stream identity LRU - // rolled over a key; future scan may double-emit lines on the - // evicted stream until the cursor resyncs. - KindFingerprintCardinality Kind = "fingerprint_cardinality" // KindAttributionCardinality — the per-(pod_uid, container) // attribution LRU rolled over; emissions for the evicted key fall // back to namespace+container until the informer re-keys. @@ -123,12 +125,12 @@ const reasonInstrumentRegister = "instrument_register" // the receiver moves to its own submodule. const instrumentationScope = "github.com/tracecoreai/tracecore/components/receivers/containerstdout" -// ErrNilMeterProvider mirrors selftelemetry.ErrNilMeterProvider — the +// errNilMeterProvider mirrors selftelemetry.ErrNilMeterProvider — the // factory is responsible for substituting the noop fallback + ticking // init_errors_total. Returning a sentinel rather than a generic error // lets the factory distinguish "wire-up bug" from "instrument register -// failure." -var ErrNilMeterProvider = errors.New("containerstdout: MeterProvider is nil") +// failure." Unexported: factory + selftel_test.go are the only callers. +var errNilMeterProvider = errors.New("containerstdout: MeterProvider is nil") // Telemetry is the receiver-scoped self-health surface. Methods are // non-blocking + safe for concurrent use; the noop impl discards. @@ -146,11 +148,12 @@ type Telemetry interface { // noopTelemetry discards every call. type noopTelemetry struct{} -// NewNoopTelemetry returns a Telemetry whose methods discard. Use in -// tests + as the factory fallback when the runtime didn't supply a -// MeterProvider. Exported so external _test packages (containerstdout_test) -// can substitute when they don't need to assert on telemetry. -func NewNoopTelemetry() Telemetry { return noopTelemetry{} } +// newNoopTelemetry returns a Telemetry whose methods discard. Used as +// the factory fallback when the runtime didn't supply a MeterProvider +// and as the per-component fallback inside NewCache / NewRateLimiter +// when a nil telemetry sink is passed. Unexported — external test +// packages that need a no-op pass NewCapturingTelemetry() instead. +func newNoopTelemetry() Telemetry { return noopTelemetry{} } func (noopTelemetry) IncError(Kind) {} func (noopTelemetry) IncEmissions(int64) {} @@ -160,20 +163,15 @@ func (noopTelemetry) MarkActivity() {} var _ Telemetry = noopTelemetry{} -// NewTelemetry returns a real Telemetry backed by OTel metric +// newTelemetry returns a real Telemetry backed by OTel metric // instruments acquired from mp. The component's id is attached as the // `component_id` label on every emission. Registers the same five // instruments the v0.1.x internal selftelemetry package registered, so -// scraped metric names + label shape are unchanged across the migration. -// -// Exported because the factory calls it; renaming to lowercase would -// move the factory's CreateLogs into selftel.go (which it doesn't -// belong in) or force a per-receiver internal package, neither of -// which is worth the symbol-hiding gain when the factory is the only -// non-test caller. -func NewTelemetry(id pipeline.ID, mp metric.MeterProvider) (Telemetry, error) { +// scraped metric names + label shape are unchanged across the +// migration. Unexported: factory is the only non-test caller. +func newTelemetry(id pipeline.ID, mp metric.MeterProvider) (Telemetry, error) { if mp == nil { - return nil, ErrNilMeterProvider + return nil, errNilMeterProvider } meter := mp.Meter(instrumentationScope) attrSet := attribute.NewSet(attribute.String("component_id", id.String())) diff --git a/components/receivers/containerstdout/selftel_test.go b/components/receivers/containerstdout/selftel_test.go index 281a08a0..061ef4b1 100644 --- a/components/receivers/containerstdout/selftel_test.go +++ b/components/receivers/containerstdout/selftel_test.go @@ -93,16 +93,17 @@ func dumpNames(rm metricdata.ResourceMetrics) string { return b.String() } -// TestSelfTel_NoopAlwaysSafe pins: NewNoopTelemetry returns a value -// whose hot-path methods never panic and silently discard. Every -// containerstdout hot path calls into the Telemetry surface; nil-checks -// at each call site are forbidden, so the noop must be a real value. -// Covers EVERY kind containerstdout emits (canonical mirrors + -// receiver-local), so an accidental drop of a kind constant from the -// const block in selftel.go fails this test instead of silently -// breaking the alert grep against KindRotationStalled etc. -func TestSelfTel_NoopAlwaysSafe(t *testing.T) { - st := NewNoopTelemetry() +// TestContainerstdout_SelfTel_NoopAlwaysSafe pins: newNoopTelemetry +// returns a value whose hot-path methods never panic and silently +// discard. Every containerstdout hot path calls into the Telemetry +// surface; nil-checks at each call site are forbidden, so the noop +// must be a real value. Covers EVERY kind containerstdout emits +// (canonical mirrors + receiver-local), so an accidental drop of a +// kind constant from the const block in selftel.go fails this test +// instead of silently breaking the alert grep against +// KindRotationStalled etc. +func TestContainerstdout_SelfTel_NoopAlwaysSafe(t *testing.T) { + st := newNoopTelemetry() defer func() { if r := recover(); r != nil { t.Fatalf("noop panicked: %v", r) @@ -113,8 +114,7 @@ func TestSelfTel_NoopAlwaysSafe(t *testing.T) { KindParse, KindRead, KindCardinality, KindDownstream, KindPanic, // Receiver-local. KindRotationStalled, KindCursorWriteFailed, KindBackpressureDrop, - KindFingerprintCardinality, KindAttributionCardinality, - KindRateLimitCardinality, KindWatch, + KindAttributionCardinality, KindRateLimitCardinality, KindWatch, } { st.IncError(k) } @@ -126,31 +126,31 @@ func TestSelfTel_NoopAlwaysSafe(t *testing.T) { st.MarkActivity() } -// TestSelfTel_NewTelemetry_NilProviderErrors pins: NewTelemetry returns -// ErrNilMeterProvider when called with a nil provider rather than -// silently substituting noop — the factory is responsible for the -// fallback + the recordInitError tick. -func TestSelfTel_NewTelemetry_NilProviderErrors(t *testing.T) { - _, err := NewTelemetry(selftelTestID(), nil) - if !errors.Is(err, ErrNilMeterProvider) { - t.Fatalf("err = %v, want ErrNilMeterProvider", err) +// TestContainerstdout_SelfTel_NewTelemetry_NilProviderErrors pins: +// newTelemetry returns errNilMeterProvider when called with a nil +// provider rather than silently substituting noop — the factory is +// responsible for the fallback + the recordInitError tick. +func TestContainerstdout_SelfTel_NewTelemetry_NilProviderErrors(t *testing.T) { + _, err := newTelemetry(selftelTestID(), nil) + if !errors.Is(err, errNilMeterProvider) { + t.Fatalf("err = %v, want errNilMeterProvider", err) } } -// TestSelfTel_EmitsErrorsTotal_WithKindAndComponentID pins the metric -// contract. After IncError ×2 of a canonical Kind + ×1 of a -// receiver-local Kind, the ManualReader collects +// TestContainerstdout_SelfTel_EmitsErrorsTotal_WithKindAndComponentID +// pins the metric contract. After IncError ×2 of a canonical Kind + +// ×1 of a receiver-local Kind, the ManualReader collects // tracecore.receiver.errors_total with datapoints partitioned by kind // and labeled with the component_id. A regression that drops the kind // label, the component_id label, or the metric-name prefix fails here. // Pins receiver-local Kind (rotation_stalled) is plumbed through the // same counter as the canonical Kind (parse) — proves the dcgm-style // co-located const block actually shares one counter, not two. -func TestSelfTel_EmitsErrorsTotal_WithKindAndComponentID(t *testing.T) { +func TestContainerstdout_SelfTel_EmitsErrorsTotal_WithKindAndComponentID(t *testing.T) { mp, rdr := newTestMeterProvider(t) - st, err := NewTelemetry(selftelTestID(), mp) + st, err := newTelemetry(selftelTestID(), mp) if err != nil { - t.Fatalf("NewTelemetry: %v", err) + t.Fatalf("newTelemetry: %v", err) } st.IncError(KindParse) st.IncError(KindParse) @@ -190,25 +190,24 @@ func TestSelfTel_EmitsErrorsTotal_WithKindAndComponentID(t *testing.T) { } } -// TestSelfTel_EveryKind_RoutesThroughErrorsTotal pins that every Kind -// containerstdout declares routes through the same counter. The -// canonical KindParse path is exercised above; this test exhaustively -// covers receiver-local KindRotationStalled / KindCursorWriteFailed / -// KindBackpressureDrop / KindFingerprintCardinality / +// TestContainerstdout_SelfTel_EveryKind_RoutesThroughErrorsTotal pins +// that every Kind containerstdout declares routes through the same +// counter. The canonical KindParse path is exercised above; this test +// exhaustively covers receiver-local KindRotationStalled / +// KindCursorWriteFailed / KindBackpressureDrop / // KindAttributionCardinality / KindRateLimitCardinality / KindWatch. // Falsifier: introducing a separate counter for "watch" (a future -// drift toward two surfaces) fails here because all 7 distinct kinds +// drift toward two surfaces) fails here because all 6 distinct kinds // must surface as distinct datapoints on ONE metric. -func TestSelfTel_EveryKind_RoutesThroughErrorsTotal(t *testing.T) { +func TestContainerstdout_SelfTel_EveryKind_RoutesThroughErrorsTotal(t *testing.T) { mp, rdr := newTestMeterProvider(t) - st, err := NewTelemetry(selftelTestID(), mp) + st, err := newTelemetry(selftelTestID(), mp) if err != nil { - t.Fatalf("NewTelemetry: %v", err) + t.Fatalf("newTelemetry: %v", err) } receiverLocal := []Kind{ KindRotationStalled, KindCursorWriteFailed, KindBackpressureDrop, - KindFingerprintCardinality, KindAttributionCardinality, - KindRateLimitCardinality, KindWatch, + KindAttributionCardinality, KindRateLimitCardinality, KindWatch, } for _, k := range receiverLocal { st.IncError(k) @@ -235,15 +234,15 @@ func TestSelfTel_EveryKind_RoutesThroughErrorsTotal(t *testing.T) { } } -// TestSelfTel_EmitsEmissionsTotal pins: IncEmissions surfaces a -// monotonic counter; negative values are silently discarded per the -// interface contract (kept identical to internal/selftelemetry to -// avoid regression in receivers that pass negative debug values). -func TestSelfTel_EmitsEmissionsTotal(t *testing.T) { +// TestContainerstdout_SelfTel_EmitsEmissionsTotal pins: IncEmissions +// surfaces a monotonic counter; negative values are silently discarded +// per the interface contract (kept identical to internal/selftelemetry +// to avoid regression in receivers that pass negative debug values). +func TestContainerstdout_SelfTel_EmitsEmissionsTotal(t *testing.T) { mp, rdr := newTestMeterProvider(t) - st, err := NewTelemetry(selftelTestID(), mp) + st, err := newTelemetry(selftelTestID(), mp) if err != nil { - t.Fatalf("NewTelemetry: %v", err) + t.Fatalf("newTelemetry: %v", err) } st.IncEmissions(3) st.IncEmissions(5) @@ -265,16 +264,16 @@ func TestSelfTel_EmitsEmissionsTotal(t *testing.T) { } } -// TestSelfTel_ScopeNameIsReceiverImportPath pins the OTel scope-name -// standard: instrumentation scope = receiver's Go import path. Anchors -// the RFC-0013 PR-B / PR-F decision (vs reusing the deleted -// internal/selftelemetry scope) so a future drift back to the internal -// name fails here. -func TestSelfTel_ScopeNameIsReceiverImportPath(t *testing.T) { +// TestContainerstdout_SelfTel_ScopeNameIsReceiverImportPath pins the +// OTel scope-name standard: instrumentation scope = receiver's Go +// import path. Anchors the RFC-0013 PR-B / PR-F decision (vs reusing +// the deleted internal/selftelemetry scope) so a future drift back to +// the internal name fails here. +func TestContainerstdout_SelfTel_ScopeNameIsReceiverImportPath(t *testing.T) { mp, rdr := newTestMeterProvider(t) - st, err := NewTelemetry(selftelTestID(), mp) + st, err := newTelemetry(selftelTestID(), mp) if err != nil { - t.Fatalf("NewTelemetry: %v", err) + t.Fatalf("newTelemetry: %v", err) } st.IncEmissions(1) rm := collectRM(t, rdr) @@ -288,14 +287,14 @@ func TestSelfTel_ScopeNameIsReceiverImportPath(t *testing.T) { } } -// TestSelfTel_RecordInitError_TicksInitErrorsCounter pins: when -// factory wiring fails (NewTelemetry returns an error), recordInitError -// surfaces a tracecore.selftelemetry.init_errors_total tick with -// kind="receiver", the component_id label, and +// TestContainerstdout_SelfTel_RecordInitError_TicksInitErrorsCounter +// pins: when factory wiring fails (newTelemetry returns an error), +// recordInitError surfaces a tracecore.selftelemetry.init_errors_total +// tick with kind="receiver", the component_id label, and // reason="instrument_register". This is the only signal that a // receiver fell back to noop telemetry; dropping the recordInitError // call must fail this test. -func TestSelfTel_RecordInitError_TicksInitErrorsCounter(t *testing.T) { +func TestContainerstdout_SelfTel_RecordInitError_TicksInitErrorsCounter(t *testing.T) { mp, rdr := newTestMeterProvider(t) recordInitError(context.Background(), mp, "receiver", selftelTestID().String(), reasonInstrumentRegister) @@ -325,10 +324,11 @@ func TestSelfTel_RecordInitError_TicksInitErrorsCounter(t *testing.T) { } } -// TestSelfTel_RecordInitError_NilProviderIsSafe pins: a nil -// MeterProvider must not panic — recordInitError IS the fallback path; -// crashing here would turn a partial degradation into a process kill. -func TestSelfTel_RecordInitError_NilProviderIsSafe(t *testing.T) { +// TestContainerstdout_SelfTel_RecordInitError_NilProviderIsSafe pins: +// a nil MeterProvider must not panic — recordInitError IS the fallback +// path; crashing here would turn a partial degradation into a process +// kill. +func TestContainerstdout_SelfTel_RecordInitError_NilProviderIsSafe(t *testing.T) { defer func() { if r := recover(); r != nil { t.Fatalf("recordInitError(nil) panicked: %v", r) @@ -337,12 +337,13 @@ func TestSelfTel_RecordInitError_NilProviderIsSafe(t *testing.T) { recordInitError(context.Background(), nil, "receiver", "x/y", reasonInstrumentRegister) } -// TestSelfTel_CapturingTelemetry_RecordsAllCalls pins the test-helper -// contract used by every external _test in containerstdout_test: -// IncError / IncEmissions / ObserveLatency / SetDegraded / MarkActivity -// each round-trip through their accessor. Negative IncEmissions is -// discarded (matches the production Telemetry contract). -func TestSelfTel_CapturingTelemetry_RecordsAllCalls(t *testing.T) { +// TestContainerstdout_SelfTel_CapturingTelemetry_RecordsAllCalls pins +// the test-helper contract used by every external _test in +// containerstdout_test: IncError / IncEmissions / ObserveLatency / +// SetDegraded / MarkActivity each round-trip through their accessor. +// Negative IncEmissions is discarded (matches the production Telemetry +// contract). +func TestContainerstdout_SelfTel_CapturingTelemetry_RecordsAllCalls(t *testing.T) { tel := NewCapturingTelemetry() tel.IncError(KindParse) tel.IncError(KindWatch) diff --git a/components/receivers/containerstdout/tailer.go b/components/receivers/containerstdout/tailer.go index 6d856495..0b1c6a6d 100644 --- a/components/receivers/containerstdout/tailer.go +++ b/components/receivers/containerstdout/tailer.go @@ -224,7 +224,7 @@ func newTailer(_ context.Context, opts TailerOptions, nowFn func() time.Time) (* opts.MaxLineSize = DefaultMaxLogSize } if opts.Telemetry == nil { - opts.Telemetry = NewNoopTelemetry() + opts.Telemetry = newNoopTelemetry() } if opts.Logger == nil { opts.Logger = slog.New(slog.NewTextHandler(io.Discard, nil)) diff --git a/docs/followups/M15.md b/docs/followups/M15.md index b10b1fc8..939ff75f 100644 --- a/docs/followups/M15.md +++ b/docs/followups/M15.md @@ -129,7 +129,7 @@ scope. Each carries the trigger that should reopen the question. Fingerprint/Attribution Cardinality split) but does not name the runbook/alerts files as Phase-4 deliverables. *Trigger:* M15 Phase-4 alpha-promotion PR. Falsifying-enforcement: - `grep -c "KindRotationStalled\|KindCursorWriteFailed\|KindBackpressureDrop\|KindFingerprintCardinality\|KindAttributionCardinality\|KindWatch" docs/FAILURE-MODES.md` + `grep -c "KindRotationStalled\|KindCursorWriteFailed\|KindBackpressureDrop\|KindAttributionCardinality\|KindWatch" docs/FAILURE-MODES.md` returns ≥6.~~ **Resolved by RFC-0010 §Operator surfaces** (this PR; promoted inline during Phase-4 A+ pass with the KindRateLimitCardinality addition from Phase 3 → 7 alert rows). diff --git a/docs/rfcs/0010-containerstdout-receiver-scope.md b/docs/rfcs/0010-containerstdout-receiver-scope.md index d3c07862..f98a22e1 100644 --- a/docs/rfcs/0010-containerstdout-receiver-scope.md +++ b/docs/rfcs/0010-containerstdout-receiver-scope.md @@ -188,9 +188,8 @@ Mirrors k8sevents pattern: - `KindRotationStalled` — kubelet rotation has not happened for ≥30 s after `0.log` exceeded `containerLogMaxSize`. - `KindCursorWriteFailed` — host FS at cursor dir failed; in-memory tailing continues. - `KindBackpressureDrop` — per-key rate-limit dropped a record (sampled). - - `KindFingerprintCardinality` — fingerprint LRU (`max_concurrent_files` cap) exceeded; remediation is bumping `max_concurrent_files`. - `KindAttributionCardinality` — attribution LRU (informer cache cap) exceeded; remediation is bumping `attribution.lru_cap`. - - `KindRateLimitCardinality` — egress-rate-limit per-key LRU (`egress_rate_limit.lru_cap`) exceeded; remediation is bumping `egress_rate_limit.lru_cap`. The three-way split is justified by three independent overflow surfaces with three distinct remediations; merging them into the canonical `selftelemetry.KindCardinality` would force on-call to consult source `IncError` call sites (precedent in `k8sevents` / `dcgm`) where this receiver's three LRUs make that lookup ambiguous. + - `KindRateLimitCardinality` — egress-rate-limit per-key LRU (`egress_rate_limit.lru_cap`) exceeded; remediation is bumping `egress_rate_limit.lru_cap`. The two-way split is justified by two independent overflow surfaces with two distinct remediations; merging them into the canonical `selftelemetry.KindCardinality` would force on-call to consult source `IncError` call sites (precedent in `k8sevents` / `dcgm`) where this receiver's two LRUs make that lookup ambiguous. (M15 originally enumerated a third `KindFingerprintCardinality` for the tailer-pool LRU; that pool never materialised — emission was dropped during the PR-F selftel port per `[[no-bloat]]` "every kind has impl call site".) - `KindWatch` — Pod informer error. **Kind name re-used from `k8sevents` intentionally.** Alert rules in `prometheus-alerts.example.yaml` MUST scope by `receiver_id="containerstdout"` to disambiguate from `k8sevents.KindWatch`; a global panel on `tracecore_receiver_errors_total{kind="watch"}` would conflate two distinct degraded conditions. `KindBackpressureDrop` follows the same convention (alias of `k8sevents.KindBackpressureDrop`); `receiver_id` scoping is required for both. ### RBAC @@ -395,7 +394,7 @@ Backward-compat is opt-in (per PRINCIPLES §11). Config field names may rename t - **Cursor directory:** M15 owns `/var/lib/tracecore/container_stdout/`. Future siblings reserve their own subdirectories. - **Self-telemetry namespace:** all receivers emit `tracecore_receiver_*` metrics partitioned by `receiver_id`. New M15 kinds (`KindRotationStalled`, `KindCursorWriteFailed`) must not alias kernelevents or k8sevents kinds — grep at PR time. - **RBAC:** unique ClusterRole name `tracecore-containerstdout` (no `-clusterrole` suffix, matches k8sevents `tracecore-k8sevents`). -- **Self-telemetry kinds:** M15 introduces `KindRotationStalled`, `KindCursorWriteFailed`, `KindFingerprintCardinality`, `KindAttributionCardinality`, `KindRateLimitCardinality`. Intentional aliases of `k8sevents` kinds: `KindWatch`, `KindBackpressureDrop` — alert rules MUST scope by `receiver_id`. +- **Self-telemetry kinds:** M15 introduces `KindRotationStalled`, `KindCursorWriteFailed`, `KindAttributionCardinality`, `KindRateLimitCardinality`. Intentional aliases of `k8sevents` kinds: `KindWatch`, `KindBackpressureDrop` — alert rules MUST scope by `receiver_id`. ### Upgrade across vendored `pkg/stanza/fileconsumer` releases