From f24f5ba52e7e8437d5b613fe7b420d066adaffa1 Mon Sep 17 00:00:00 2001 From: Tri Lam Date: Sat, 30 May 2026 21:00:51 -0700 Subject: [PATCH] feat(pivot): port clockreceiver off internal selftel + lifecycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors PR-B1 (nccl_fr, #184) for clockreceiver, per RFC-0013 §migration PR-B. Helpers travel as siblings (`selftel.go` + `lifecycle.go`) co-located with the receiver. Drops imports of `internal/selftelemetry` + `internal/runtime/lifecycle` — only doc-comment mentions remain. Lifecycle sibling is slimmer than the internal helper: drops `Add()` + the post-Shutdown silent-no-op path because clockreceiver is single-source (same shape as the nccl_fr sibling). `kind` enum trimmed to the two values clockreceiver actually emits (`downstream`, `panic`) — no speculative kinds. Metric names + label shape preserved (`tracecore.receiver.{errors_total,emissions_total,collection_latency_seconds, degraded_seconds_total,last_activity_unix_seconds}` with `component_id` + `kind`), so dashboards / alerts and the `cmd/tracecore` integration test that asserts these names by string still pass. Instrumentation scope is the receiver's Go import path (`github.com/tracecoreai/tracecore/components/receivers/clockreceiver`). clockreceiver is scheduled for deletion in PR-K. Porting first is still worth it: the port is mechanical (mirror of PR-B1), and it unblocks the `internal/selftelemetry` + `internal/runtime/lifecycle` deletes in PR-F earlier than PR-K would. Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Tri Lam --- .../receivers/clockreceiver/clockreceiver.go | 64 +-- .../receivers/clockreceiver/lifecycle.go | 138 +++++++ .../receivers/clockreceiver/lifecycle_test.go | 137 +++++++ components/receivers/clockreceiver/selftel.go | 258 ++++++++++++ .../receivers/clockreceiver/selftel_test.go | 381 ++++++++++++++++++ 5 files changed, 946 insertions(+), 32 deletions(-) create mode 100644 components/receivers/clockreceiver/lifecycle.go create mode 100644 components/receivers/clockreceiver/lifecycle_test.go create mode 100644 components/receivers/clockreceiver/selftel.go create mode 100644 components/receivers/clockreceiver/selftel_test.go diff --git a/components/receivers/clockreceiver/clockreceiver.go b/components/receivers/clockreceiver/clockreceiver.go index 8483042e..2d37f248 100644 --- a/components/receivers/clockreceiver/clockreceiver.go +++ b/components/receivers/clockreceiver/clockreceiver.go @@ -12,8 +12,6 @@ import ( "github.com/tracecoreai/tracecore/internal/consumer" "github.com/tracecoreai/tracecore/internal/pipeline" - "github.com/tracecoreai/tracecore/internal/runtime/lifecycle" - "github.com/tracecoreai/tracecore/internal/selftelemetry" ) // clockReceiver emits a gauge metric (`tracecore.clock.now`) on a @@ -23,12 +21,14 @@ import ( // their own clock. // // Embeds pipeline.ComponentState so Started()/Stopped() accessors -// work for tests. Delegates goroutine cancel + WaitGroup + -// panic-recovery to `internal/runtime/lifecycle.Lifecycle` — the -// same helper kernelevents (M9) uses, so the two receivers share -// one mechanism for streaming/tick lifecycle (PRINCIPLES.md §3). -// Self-telemetry wired via the M2 surface (errors / emissions / -// latency / activity) through `selftelemetry.Receiver`. +// work for tests. Cancel + WaitGroup + panic-recovery are delegated +// to the receiver-local `lifecycle` sibling (RFC-0013 PR-B), which +// replaces the v0.1.x dependency on `internal/runtime/lifecycle`. +// Self-telemetry is wired via the receiver-local `selfTelemetry` +// sibling (`tracecore.receiver.{errors_total,emissions_total, +// collection_latency_seconds,degraded_seconds_total, +// last_activity_unix_seconds}`), which replaces the v0.1.x dependency +// on `internal/selftelemetry`. type clockReceiver struct { pipeline.ComponentState @@ -38,43 +38,43 @@ type clockReceiver struct { next consumer.Metrics // telemetry records self-telemetry (errors, emissions, latency, - // activity) per the M2 wiring pattern. Always non-nil — the - // factory substitutes a noop if construction fails. - telemetry selftelemetry.Receiver + // activity). Always non-nil — the factory substitutes a noop if + // construction fails. + telemetry selfTelemetry - lc *lifecycle.Lifecycle + lc *lifecycle } func newReceiver(ctx context.Context, set pipeline.CreateSettings, cfg *Config, next consumer.Metrics) *clockReceiver { - // Construct the real selftelemetry.Receiver. If MeterProvider is - // somehow nil or construction fails, fall back to the noop so the - // hot path doesn't have to nil-check. The fallback emits a + // Construct the real selfTelemetry. If MeterProvider is somehow nil + // or construction fails, fall back to the noop so the hot path + // doesn't have to nil-check. The fallback emits a // `tracecore.selftelemetry.init_errors_total` tick so operators // see when a component is silently running with a noop. - sr := selftelemetry.NewNoopReceiver() + logger := set.Telemetry.Logger + if logger == nil { + logger = slog.Default() + } + st := newNoopSelfTelemetry() if set.Telemetry.MeterProvider == nil { - if set.Telemetry.Logger != nil { - set.Telemetry.Logger.Warn("self-telemetry: no MeterProvider; using noop") - } - } else if r, err := selftelemetry.NewReceiver(set.ID, set.Telemetry.MeterProvider); err == nil { - sr = r + logger.Warn("clockreceiver self-telemetry: no MeterProvider; using noop") + } else if rt, err := newSelfTelemetry(set.ID, set.Telemetry.MeterProvider); err == nil { + st = rt } else { - selftelemetry.RecordInitError(ctx, set.Telemetry.MeterProvider, - "receiver", set.ID.String(), selftelemetry.ReasonInstrumentRegister) - if set.Telemetry.Logger != nil { - set.Telemetry.Logger.Warn("self-telemetry init failed; using noop", "err", err) - } + recordInitError(ctx, set.Telemetry.MeterProvider, + "receiver", set.ID.String(), reasonInstrumentRegister) + logger.Warn("clockreceiver self-telemetry init failed; using noop", "err", err) } r := &clockReceiver{ - logger: set.Telemetry.Logger, + logger: logger, resource: set.Telemetry.Resource, interval: cfg.Interval, next: next, - telemetry: sr, + telemetry: st, } - r.lc = lifecycle.New(r.logger, func(_ any) { - r.telemetry.IncError(selftelemetry.KindPanic) + r.lc = newLifecycle(r.logger, func(_ any) { + r.telemetry.IncError(kindPanic) // SetDegraded(true) after panic: goroutine is dead, no recovery path. r.telemetry.SetDegraded(true) }) @@ -83,7 +83,7 @@ func newReceiver(ctx context.Context, set pipeline.CreateSettings, cfg *Config, // Start spawns the ticker goroutine via the lifecycle helper. The // caller's ctx authorizes the start; the ticker's lifetime is bound -// to Shutdown via lifecycle.Lifecycle's internal cancel. +// to Shutdown via lifecycle's internal cancel. func (r *clockReceiver) Start(ctx context.Context, host pipeline.Host) error { if err := r.ComponentState.Start(ctx, host); err != nil { return err @@ -145,7 +145,7 @@ func (r *clockReceiver) emit(ctx context.Context, now time.Time) { r.telemetry.ObserveLatency(time.Since(start)) if err != nil { - r.telemetry.IncError(selftelemetry.KindDownstream) + r.telemetry.IncError(kindDownstream) r.logger.Warn("clockreceiver: downstream rejected push", "err", err) return } diff --git a/components/receivers/clockreceiver/lifecycle.go b/components/receivers/clockreceiver/lifecycle.go new file mode 100644 index 00000000..827bd4cb --- /dev/null +++ b/components/receivers/clockreceiver/lifecycle.go @@ -0,0 +1,138 @@ +// SPDX-License-Identifier: Apache-2.0 + +// Receiver-scoped tick-source lifecycle helper. Replaces the v0.1.x +// dependency on `internal/runtime/lifecycle`, which is slated for +// deletion in RFC-0013 PR-F. Owns the cancel + WaitGroup + +// panic-recovery bookkeeping clockreceiver's ticker goroutine needs. +// Slimmer than the internal helper: no Add() (clockreceiver is +// single-source), no post-Shutdown Add silent-no-op path. + +package clockreceiver + +import ( + "context" + "errors" + "fmt" + "log/slog" + "runtime" + "sync" + "sync/atomic" +) + +// errLifecycleAlreadyStarted is returned by lifecycle.Start when called +// twice without an intervening Shutdown. errors.Is-comparable so callers +// don't string-match the message. +var errLifecycleAlreadyStarted = errors.New("clockreceiver lifecycle: already started") + +// panicCallback is invoked once if the Run function panics. The helper +// recovers the panic so the receiver never crashes the workload +// (PRINCIPLES.md §1). clockreceiver wires this to IncError(kindPanic) + +// SetDegraded(true). +type panicCallback func(panicValue any) + +// lifecycle bundles cancel + WaitGroup + started-flag for a tick +// source. Zero-value is NOT useful; use newLifecycle. +// +// Shutdown is idempotent. The FIRST Shutdown's error (typically a +// caller-ctx deadline) is stashed + returned by every subsequent +// Shutdown so deadline failures aren't silently swallowed. +type lifecycle struct { + logger *slog.Logger + onPanic panicCallback + + mu sync.Mutex + cancel context.CancelFunc + closed bool + shutdownErr error + wg sync.WaitGroup + started atomic.Bool +} + +// newLifecycle constructs a lifecycle. logger may be nil (replaced with +// slog.Default for tests). onPanic may be nil — panics are still +// recovered + logged at ERROR but no callback fires. +func newLifecycle(logger *slog.Logger, onPanic panicCallback) *lifecycle { + if logger == nil { + logger = slog.Default() + } + return &lifecycle{logger: logger, onPanic: onPanic} +} + +// Start spawns run in a goroutine. The ctx passed to run is derived +// from parent via context.WithCancel — so the receiver-level parent's +// cancellation cascades into the goroutine without an explicit Shutdown +// call. Idempotent: a second Start without an intervening Shutdown +// returns errLifecycleAlreadyStarted. +func (l *lifecycle) Start(parent context.Context, run func(context.Context)) error { + if !l.started.CompareAndSwap(false, true) { + return errLifecycleAlreadyStarted + } + l.mu.Lock() + internalCtx, cancel := context.WithCancel(parent) + l.cancel = cancel + // wg.Add(1) MUST happen under the same mutex as cancel so a + // concurrent Shutdown sees the post-Add state. Without this lock, + // Shutdown could observe cancel != nil, call wg.Wait at counter=0, + // return immediately, and either trigger `sync: WaitGroup misuse` + // OR orphan the goroutine. (Mirrors the internal helper's fix.) + l.wg.Add(1) + l.mu.Unlock() + go l.safeRun(internalCtx, run) + return nil +} + +// safeRun wraps run with panic recovery + wg.Done bookkeeping. +func (l *lifecycle) safeRun(ctx context.Context, run func(context.Context)) { + defer l.wg.Done() + defer func() { + if rec := recover(); rec != nil { + l.logger.Error("clockreceiver lifecycle: run panic recovered", "panic", fmt.Sprintf("%v", rec)) + if l.onPanic != nil { + l.onPanic(rec) + } + } + }() + run(ctx) +} + +// Shutdown cancels the internal ctx + waits for the goroutine to exit, +// honoring the caller's ctx deadline. Idempotent: subsequent calls +// return the FIRST call's error so a missed deadline isn't silently +// swallowed. +func (l *lifecycle) Shutdown(ctx context.Context) error { + l.mu.Lock() + if l.closed { + err := l.shutdownErr + l.mu.Unlock() + return err + } + cancel := l.cancel + l.cancel = nil + l.closed = true + l.mu.Unlock() + if cancel == nil { + return nil + } + cancel() + + done := make(chan struct{}) + go func() { + l.wg.Wait() + close(done) + }() + select { + case <-done: + return nil + case <-ctx.Done(): + // NumGoroutine is process-wide, not lifecycle-local; surfacing + // it here lets operators eyeball whether the leak is plausibly + // ours. + l.logger.Warn("clockreceiver lifecycle: shutdown deadline elapsed before goroutine exited", + "process_goroutines", runtime.NumGoroutine()) + err := fmt.Errorf("clockreceiver lifecycle shutdown: %w", ctx.Err()) + l.mu.Lock() + l.shutdownErr = err + l.mu.Unlock() + return err + } +} diff --git a/components/receivers/clockreceiver/lifecycle_test.go b/components/receivers/clockreceiver/lifecycle_test.go new file mode 100644 index 00000000..c0f3eeb2 --- /dev/null +++ b/components/receivers/clockreceiver/lifecycle_test.go @@ -0,0 +1,137 @@ +// SPDX-License-Identifier: Apache-2.0 + +package clockreceiver + +import ( + "context" + "errors" + "log/slog" + "sync" + "sync/atomic" + "testing" + "time" +) + +// TestLifecycle_StartShutdown pins the happy path: Start spawns the +// supplied run function, Shutdown cancels its ctx, run returns, Shutdown +// returns nil. +func TestLifecycle_StartShutdown(t *testing.T) { + lc := newLifecycle(slog.Default(), nil) + started := make(chan struct{}) + stopped := make(chan struct{}) + + err := lc.Start(context.Background(), func(ctx context.Context) { + close(started) + <-ctx.Done() + close(stopped) + }) + if err != nil { + t.Fatalf("Start: %v", err) + } + <-started + if err := lc.Shutdown(context.Background()); err != nil { + t.Fatalf("Shutdown: %v", err) + } + select { + case <-stopped: + case <-time.After(time.Second): + t.Fatal("run did not exit within 1s of Shutdown") + } +} + +// TestLifecycle_StartTwiceErrors pins: a second Start without an +// intervening Shutdown returns errLifecycleAlreadyStarted. The sentinel +// MUST be errors.Is-comparable so callers don't string-match. +func TestLifecycle_StartTwiceErrors(t *testing.T) { + lc := newLifecycle(slog.Default(), nil) + if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil { + t.Fatalf("first Start: %v", err) + } + defer func() { _ = lc.Shutdown(context.Background()) }() + err := lc.Start(context.Background(), func(context.Context) {}) + if !errors.Is(err, errLifecycleAlreadyStarted) { + t.Errorf("second Start err: got %v, want errLifecycleAlreadyStarted", err) + } +} + +// TestLifecycle_ShutdownIdempotent pins: a second Shutdown returns the +// first call's error (typically nil), not "double-close" or panic. +func TestLifecycle_ShutdownIdempotent(t *testing.T) { + lc := newLifecycle(slog.Default(), nil) + if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil { + t.Fatalf("Start: %v", err) + } + if err := lc.Shutdown(context.Background()); err != nil { + t.Fatalf("first Shutdown: %v", err) + } + if err := lc.Shutdown(context.Background()); err != nil { + t.Errorf("second Shutdown: got %v, want nil (idempotent)", err) + } +} + +// TestLifecycle_PanicCallbackFires pins: if the run goroutine panics, +// the supplied onPanic callback fires exactly once with the panic value +// and the panic is recovered (the process doesn't crash). This is the +// behaviour clockreceiver depends on to translate panic → IncError(kindPanic). +func TestLifecycle_PanicCallbackFires(t *testing.T) { + var called atomic.Int32 + var got any + var mu sync.Mutex + lc := newLifecycle(slog.Default(), func(v any) { + called.Add(1) + mu.Lock() + got = v + mu.Unlock() + }) + done := make(chan struct{}) + err := lc.Start(context.Background(), func(context.Context) { + defer close(done) + panic("boom") + }) + if err != nil { + t.Fatalf("Start: %v", err) + } + <-done + // Give safeRun's deferred recover time to invoke the callback after + // run returns; the WaitGroup decrement happens BEFORE the recover's + // callback in some race orderings. + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) && called.Load() == 0 { + time.Sleep(5 * time.Millisecond) + } + if called.Load() != 1 { + t.Errorf("onPanic call count: got %d, want 1", called.Load()) + } + mu.Lock() + defer mu.Unlock() + if got != "boom" { + t.Errorf("onPanic payload: got %v, want \"boom\"", got) + } + // Shutdown after panic must not block / error. + if err := lc.Shutdown(context.Background()); err != nil { + t.Errorf("Shutdown after panic: %v", err) + } +} + +// TestLifecycle_ShutdownDeadlineReturnsCtxErr pins: a run goroutine +// that ignores ctx cancellation causes Shutdown to return the caller's +// ctx err (wrapping). clockreceiver's run loop honors ctx, so this is +// purely a contract pin so future authors don't silently lose the +// deadline. +func TestLifecycle_ShutdownDeadlineReturnsCtxErr(t *testing.T) { + lc := newLifecycle(slog.Default(), nil) + leak := make(chan struct{}) + if err := lc.Start(context.Background(), func(context.Context) { + <-leak // ignore ctx + }); err != nil { + t.Fatalf("Start: %v", err) + } + t.Cleanup(func() { close(leak) }) + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + err := lc.Shutdown(shutdownCtx) + if !errors.Is(err, context.DeadlineExceeded) { + t.Errorf("Shutdown err: got %v, want wrapped DeadlineExceeded", err) + } +} diff --git a/components/receivers/clockreceiver/selftel.go b/components/receivers/clockreceiver/selftel.go new file mode 100644 index 00000000..acc1b5ee --- /dev/null +++ b/components/receivers/clockreceiver/selftel.go @@ -0,0 +1,258 @@ +// SPDX-License-Identifier: Apache-2.0 + +// Receiver-scoped self-telemetry surface. Replaces the v0.1.x +// dependency on `internal/selftelemetry`, which is slated for deletion +// in RFC-0013 PR-F. Metric names + label shape are preserved +// (`tracecore.receiver.errors_total{kind,component_id}` and siblings) +// so dashboards / alerts don't regress. The instrumentation scope name +// is THIS receiver's Go import path — when clockreceiver is deleted in +// PR-K this file goes with it; until then it carries the same shape as +// the nccl_fr sibling so reviewers diff one pattern, not two. + +package clockreceiver + +import ( + "context" + "errors" + "fmt" + "sync/atomic" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/tracecoreai/tracecore/internal/pipeline" +) + +// kind is a low-cardinality error-class identifier. Mirrors the +// internal/selftelemetry.Kind type so the migration is mechanical. +// clockreceiver only ever ticks downstream / panic, so the local set +// is trimmed to those two — adding a third should land alongside the +// hot-path call site that emits it (no speculative kinds). +type kind string + +const ( + kindDownstream kind = "downstream" // next consumer's ConsumeMetrics errored + kindPanic kind = "panic" // recovered panic in hot path +) + +// reasonInstrumentRegister labels init_errors_total ticks when OTel +// instrument registration failed at construction time. +const reasonInstrumentRegister = "instrument_register" + +// instrumentationScope pins the OTel scope name. Per OTel convention, +// the scope is the package's Go import path. clockreceiver is slated +// for deletion in PR-K; until then its scope follows the same standard +// the nccl_fr sibling pins. +const instrumentationScope = "github.com/tracecoreai/tracecore/components/receivers/clockreceiver" + +// errNilMeterProvider mirrors selftelemetry.ErrNilMeterProvider — the +// factory is responsible for substituting the noop fallback + ticking +// init_errors_total. Returning a sentinel rather than a generic error +// lets the factory distinguish "wire-up bug" from "instrument register +// failure" if it ever needs to. +var errNilMeterProvider = errors.New("clockreceiver: MeterProvider is nil") + +// selfTelemetry is the receiver-scoped self-health surface. Methods are +// non-blocking + safe for concurrent use; the noop impl discards. +// Trimmed to the exact surface clockreceiver uses. +type selfTelemetry interface { + IncError(k kind) + IncEmissions(n int64) + ObserveLatency(d time.Duration) + SetDegraded(degraded bool) + MarkActivity() +} + +// noopSelfTelemetry discards every call. +type noopSelfTelemetry struct{} + +func newNoopSelfTelemetry() selfTelemetry { return noopSelfTelemetry{} } + +func (noopSelfTelemetry) IncError(kind) {} +func (noopSelfTelemetry) IncEmissions(int64) {} +func (noopSelfTelemetry) ObserveLatency(time.Duration) {} +func (noopSelfTelemetry) SetDegraded(bool) {} +func (noopSelfTelemetry) MarkActivity() {} + +var _ selfTelemetry = noopSelfTelemetry{} + +// newSelfTelemetry returns a real selfTelemetry backed by OTel metric +// instruments acquired from mp. The component's id is attached as the +// `component_id` label on every emission. Registers the same five +// instruments the v0.1.x internal selftelemetry package registered, so +// scraped metric names + label shape are unchanged. +func newSelfTelemetry(id pipeline.ID, mp metric.MeterProvider) (selfTelemetry, error) { + if mp == nil { + return nil, errNilMeterProvider + } + meter := mp.Meter(instrumentationScope) + attrSet := attribute.NewSet(attribute.String("component_id", id.String())) + + errsCtr, err := meter.Int64Counter( + "tracecore.receiver.errors_total", + metric.WithDescription("Errors observed by a receiver, partitioned by kind"), + ) + if err != nil { + return nil, fmt.Errorf("errors_total counter: %w", err) + } + emissionsCtr, err := meter.Int64Counter( + "tracecore.receiver.emissions_total", + metric.WithDescription("Data points / events emitted by a receiver"), + ) + if err != nil { + return nil, fmt.Errorf("emissions_total counter: %w", err) + } + latencyHist, err := meter.Float64Histogram( + "tracecore.receiver.collection_latency_seconds", + metric.WithDescription("Receiver collection cycle latency in seconds"), + metric.WithUnit("s"), + // Bucket boundaries chosen for sub-millisecond tick cycles up + // to 10s slow paths; mirrors the internal/selftelemetry shape + // so histograms remain comparable across the migration. + metric.WithExplicitBucketBoundaries( + 0.0001, 0.001, 0.005, 0.01, 0.05, + 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, + ), + ) + if err != nil { + return nil, fmt.Errorf("collection_latency_seconds histogram: %w", err) + } + + st := &selfTelemetryImpl{ + componentID: id.String(), + attrs: attrSet, + errors: errsCtr, + emissions: emissionsCtr, + latency: latencyHist, + } + // Seed last-activity to construction time so a `time() - last_activity + // > N` alert doesn't fire on the zero-valued gauge during boot. + st.activityUnix.Store(time.Now().Unix()) + + if _, err := meter.Float64ObservableCounter( + "tracecore.receiver.degraded_seconds_total", + metric.WithDescription("Cumulative seconds the receiver has been in the degraded state"), + metric.WithUnit("s"), + metric.WithFloat64Callback(func(_ context.Context, obs metric.Float64Observer) error { + obs.Observe(st.degradedTotalSeconds(), metric.WithAttributeSet(attrSet)) + return nil + }), + ); err != nil { + return nil, fmt.Errorf("degraded_seconds_total observable: %w", err) + } + + if _, err := meter.Int64ObservableGauge( + "tracecore.receiver.last_activity_unix_seconds", + metric.WithDescription("Unix-second timestamp of the receiver's last successful activity"), + metric.WithInt64Callback(func(_ context.Context, obs metric.Int64Observer) error { + obs.Observe(st.activityUnix.Load(), metric.WithAttributeSet(attrSet)) + return nil + }), + ); err != nil { + return nil, fmt.Errorf("last_activity_unix_seconds observable: %w", err) + } + + return st, nil +} + +var _ selfTelemetry = (*selfTelemetryImpl)(nil) + +type selfTelemetryImpl struct { + componentID string + attrs attribute.Set + errors metric.Int64Counter + emissions metric.Int64Counter + latency metric.Float64Histogram + + // degradedAt holds the time of the most recent SetDegraded(true); + // nil pointer = not currently degraded. Atomic so SetDegraded is + // lock-free and the observable callback reads a stable snapshot. + degradedAt atomic.Pointer[time.Time] + + // accumulated holds nanoseconds spent degraded across completed + // degrade→recover cycles; degradedTotalSeconds adds the open + // interval at observation time. + accumulated atomic.Uint64 + + // activityUnix holds the Unix-second timestamp of the most recent + // MarkActivity (seeded to construction time). + activityUnix atomic.Int64 +} + +func (s *selfTelemetryImpl) IncError(k kind) { + // Emit component_id + kind in one WithAttributes call rather than + // merging two attribute sets — avoids relying on SDK merge semantics + // that vary across OTel versions. + s.errors.Add(context.Background(), 1, metric.WithAttributes( + attribute.String("component_id", s.componentID), + attribute.String("kind", string(k)), + )) +} + +func (s *selfTelemetryImpl) IncEmissions(n int64) { + if n < 0 { + return + } + s.emissions.Add(context.Background(), n, metric.WithAttributeSet(s.attrs)) +} + +func (s *selfTelemetryImpl) ObserveLatency(d time.Duration) { + s.latency.Record(context.Background(), d.Seconds(), metric.WithAttributeSet(s.attrs)) +} + +// SetDegraded transitions degraded state. Lock-free: enter via +// CAS(nil → &now), exit via Swap → nil + accumulate the elapsed +// interval. Microsecond-scale under-count on concurrent transitions is +// tolerated; self-corrects on the next scrape. +func (s *selfTelemetryImpl) SetDegraded(degraded bool) { + if degraded { + now := time.Now() + s.degradedAt.CompareAndSwap(nil, &now) + return + } + if old := s.degradedAt.Swap(nil); old != nil { + elapsed := time.Since(*old) + if elapsed > 0 { + s.accumulated.Add(uint64(elapsed.Nanoseconds())) + } + } +} + +func (s *selfTelemetryImpl) MarkActivity() { + s.activityUnix.Store(time.Now().Unix()) +} + +func (s *selfTelemetryImpl) degradedTotalSeconds() float64 { + acc := time.Duration(s.accumulated.Load()) + if openStart := s.degradedAt.Load(); openStart != nil { + acc += time.Since(*openStart) + } + return acc.Seconds() +} + +// recordInitError ticks tracecore.selftelemetry.init_errors_total when +// receiver wiring falls back to noop telemetry. Operators alert on +// `> 0` to learn that self-telemetry isn't really plugged in. Panics +// from a broken MeterProvider are swallowed — recordInitError IS the +// degraded-path fallback; crashing here would turn a partial outage +// into a process kill. +func recordInitError(ctx context.Context, mp metric.MeterProvider, kindLabel, componentID, reason string) { + defer func() { _ = recover() }() + if mp == nil { + return + } + meter := mp.Meter(instrumentationScope) + c, err := meter.Int64Counter( + "tracecore.selftelemetry.init_errors_total", + metric.WithDescription("Counter of self-telemetry construction failures that fell back to the noop implementation."), + ) + if err != nil { + return + } + c.Add(ctx, 1, metric.WithAttributes( + attribute.String("kind", kindLabel), + attribute.String("component_id", componentID), + attribute.String("reason", reason), + )) +} diff --git a/components/receivers/clockreceiver/selftel_test.go b/components/receivers/clockreceiver/selftel_test.go new file mode 100644 index 00000000..7d52593a --- /dev/null +++ b/components/receivers/clockreceiver/selftel_test.go @@ -0,0 +1,381 @@ +// SPDX-License-Identifier: Apache-2.0 + +package clockreceiver + +import ( + "context" + "errors" + "fmt" + "strings" + "testing" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/embedded" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + + "github.com/tracecoreai/tracecore/internal/pipeline" +) + +// newTestMeterProvider builds an SDK MeterProvider backed by a ManualReader +// so tests can collect metricdata.ResourceMetrics deterministically without +// the Prometheus exporter or any internal/telemetry plumbing — the receiver +// package must stay decoupled from internal/* so PR-F can delete those +// packages without touching this test file. +func newTestMeterProvider(t *testing.T) (*sdkmetric.MeterProvider, *sdkmetric.ManualReader) { + t.Helper() + rdr := sdkmetric.NewManualReader() + mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)) + t.Cleanup(func() { _ = mp.Shutdown(context.Background()) }) + return mp, rdr +} + +func collectMetrics(t *testing.T, rdr *sdkmetric.ManualReader) metricdata.ResourceMetrics { + t.Helper() + var rm metricdata.ResourceMetrics + if err := rdr.Collect(context.Background(), &rm); err != nil { + t.Fatalf("collect: %v", err) + } + return rm +} + +// findInstrument returns the first metricdata.Metrics whose Name matches the +// supplied OTel-dot name. Scope-agnostic. +func findInstrument(rm metricdata.ResourceMetrics, name string) (metricdata.Metrics, bool) { + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return m, true + } + } + } + return metricdata.Metrics{}, false +} + +// scopeOf returns the instrumentation scope name that emitted the supplied +// metric. Used to pin the scope-name standard for PR-B (clockreceiver). +func scopeOf(rm metricdata.ResourceMetrics, name string) (string, bool) { + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return sm.Scope.Name, true + } + } + } + return "", false +} + +// kvMatch returns true if every want key's value matches the int64 +// datapoint's attribute set. +func kvMatch(dp metricdata.DataPoint[int64], want map[string]string) bool { + for k, v := range want { + got, ok := dp.Attributes.Value(attribute.Key(k)) + if !ok || got.AsString() != v { + return false + } + } + return true +} + +// TestSelfTelemetry_NoopAlwaysSafe pins: newNoopSelfTelemetry returns a +// value whose hot-path methods never panic and silently discard. The +// clockreceiver hot path calls into selfTelemetry without nil-checks; the +// noop must be a real value. +func TestSelfTelemetry_NoopAlwaysSafe(t *testing.T) { + st := newNoopSelfTelemetry() + defer func() { + if r := recover(); r != nil { + t.Fatalf("noop panicked: %v", r) + } + }() + st.IncError(kindDownstream) + st.IncError(kindPanic) + st.IncEmissions(42) + st.IncEmissions(-1) + st.ObserveLatency(15 * time.Millisecond) + st.SetDegraded(true) + st.SetDegraded(false) + st.MarkActivity() +} + +// TestSelfTelemetry_NewReceiver_NilProviderErrors pins: newSelfTelemetry +// returns errNilMeterProvider when called with a nil provider rather than +// silently substituting noop — the factory is responsible for the fallback +// + the recordInitError tick. +func TestSelfTelemetry_NewReceiver_NilProviderErrors(t *testing.T) { + _, err := newSelfTelemetry(testID(), nil) + if !errors.Is(err, errNilMeterProvider) { + t.Fatalf("err = %v, want errNilMeterProvider", err) + } +} + +// TestSelfTelemetry_EmitsErrorsTotal_WithKindAndComponentID pins the M2 +// metric contract. After IncError(kindDownstream) ×2 + IncError(kindPanic) ×1, +// the ManualReader collects tracecore.receiver.errors_total with datapoints +// partitioned by kind and labeled with the component_id. A regression that +// drops the kind label, the component_id label, or the metric-name prefix +// fails here. +func TestSelfTelemetry_EmitsErrorsTotal_WithKindAndComponentID(t *testing.T) { + mp, rdr := newTestMeterProvider(t) + st, err := newSelfTelemetry(testID(), mp) + if err != nil { + t.Fatalf("newSelfTelemetry: %v", err) + } + st.IncError(kindDownstream) + st.IncError(kindDownstream) + st.IncError(kindPanic) + + rm := collectMetrics(t, rdr) + m, ok := findInstrument(rm, "tracecore.receiver.errors_total") + if !ok { + t.Fatalf("metric tracecore.receiver.errors_total absent; have: %s", dumpNames(rm)) + } + sum, ok := m.Data.(metricdata.Sum[int64]) + if !ok { + t.Fatalf("errors_total data shape: got %T, want metricdata.Sum[int64]", m.Data) + } + gotDown, foundDown := 0, false + gotPanic, foundPanic := 0, false + for _, dp := range sum.DataPoints { + if !kvMatch(dp, map[string]string{"component_id": "clockreceiver/test"}) { + t.Errorf("datapoint missing component_id=clockreceiver/test: %v", dp.Attributes) + continue + } + kind, _ := dp.Attributes.Value("kind") + switch kind.AsString() { + case "downstream": + gotDown = int(dp.Value) + foundDown = true + case "panic": + gotPanic = int(dp.Value) + foundPanic = true + } + } + if !foundDown || gotDown != 2 { + t.Errorf("downstream count: got %d (found=%v), want 2", gotDown, foundDown) + } + if !foundPanic || gotPanic != 1 { + t.Errorf("panic count: got %d (found=%v), want 1", gotPanic, foundPanic) + } +} + +// TestSelfTelemetry_EmitsEmissionsTotal pins: IncEmissions surfaces a +// monotonic counter; negative values are silently discarded per the +// interface contract (kept identical to internal/selftelemetry to avoid +// regression). +func TestSelfTelemetry_EmitsEmissionsTotal(t *testing.T) { + mp, rdr := newTestMeterProvider(t) + st, err := newSelfTelemetry(testID(), mp) + if err != nil { + t.Fatalf("newSelfTelemetry: %v", err) + } + st.IncEmissions(3) + st.IncEmissions(5) + st.IncEmissions(-1) + rm := collectMetrics(t, rdr) + m, ok := findInstrument(rm, "tracecore.receiver.emissions_total") + if !ok { + t.Fatalf("metric tracecore.receiver.emissions_total absent; have: %s", dumpNames(rm)) + } + sum, ok := m.Data.(metricdata.Sum[int64]) + if !ok { + t.Fatalf("emissions_total data shape: got %T, want metricdata.Sum[int64]", m.Data) + } + if len(sum.DataPoints) != 1 { + t.Fatalf("emissions datapoints: got %d, want 1", len(sum.DataPoints)) + } + if sum.DataPoints[0].Value != 8 { + t.Errorf("emissions value: got %d, want 8", sum.DataPoints[0].Value) + } +} + +// TestSelfTelemetry_ScopeNameIsReceiverImportPath pins the OTel scope-name +// standard: instrumentation scope = receiver's Go import path. Anchors the +// PR-B decision (vs reusing the deleted internal/selftelemetry scope) so a +// future drift back to the internal name fails here. +func TestSelfTelemetry_ScopeNameIsReceiverImportPath(t *testing.T) { + mp, rdr := newTestMeterProvider(t) + st, err := newSelfTelemetry(testID(), mp) + if err != nil { + t.Fatalf("newSelfTelemetry: %v", err) + } + st.IncEmissions(1) + rm := collectMetrics(t, rdr) + scope, ok := scopeOf(rm, "tracecore.receiver.emissions_total") + if !ok { + t.Fatalf("emissions_total absent") + } + const wantScope = "github.com/tracecoreai/tracecore/components/receivers/clockreceiver" + if scope != wantScope { + t.Errorf("instrumentation scope: got %q, want %q", scope, wantScope) + } +} + +// TestRecordInitError_TicksInitErrorsCounter pins: when factory wiring +// fails (newSelfTelemetry returns an error), recordInitError surfaces a +// tracecore.selftelemetry.init_errors_total tick with kind="receiver", +// the component_id label, and reason="instrument_register". Dropping the +// recordInitError call must fail this test. +func TestRecordInitError_TicksInitErrorsCounter(t *testing.T) { + mp, rdr := newTestMeterProvider(t) + recordInitError(context.Background(), mp, "receiver", testID().String(), reasonInstrumentRegister) + + rm := collectMetrics(t, rdr) + m, ok := findInstrument(rm, "tracecore.selftelemetry.init_errors_total") + if !ok { + t.Fatalf("init_errors_total absent; have: %s", dumpNames(rm)) + } + sum, ok := m.Data.(metricdata.Sum[int64]) + if !ok { + t.Fatalf("init_errors_total data shape: got %T, want metricdata.Sum[int64]", m.Data) + } + if len(sum.DataPoints) != 1 { + t.Fatalf("init_errors datapoints: got %d, want 1", len(sum.DataPoints)) + } + dp := sum.DataPoints[0] + want := map[string]string{ + "kind": "receiver", + "component_id": "clockreceiver/test", + "reason": reasonInstrumentRegister, + } + if !kvMatch(dp, want) { + t.Errorf("init_errors attrs: got %v, want %v", dp.Attributes, want) + } + if dp.Value != 1 { + t.Errorf("init_errors value: got %d, want 1", dp.Value) + } +} + +// TestRecordInitError_NilProviderIsSafe pins: a nil MeterProvider must +// not panic — recordInitError IS the fallback path; crashing here would +// turn a partial degradation into a process kill. +func TestRecordInitError_NilProviderIsSafe(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Fatalf("recordInitError(nil) panicked: %v", r) + } + }() + recordInitError(context.Background(), nil, "receiver", "x/y", reasonInstrumentRegister) +} + +// TestFactory_FallsBackToNoopWhenMeterFails pins the factory +// observability contract end-to-end: when newSelfTelemetry returns an +// error (synthetic register failure for every tracecore.receiver.* +// instrument), the factory MUST (1) leave the receiver with a working +// noop telemetry field (no nil, no panic on hot-path calls), AND (2) +// tick tracecore.selftelemetry.init_errors_total via recordInitError. +func TestFactory_FallsBackToNoopWhenMeterFails(t *testing.T) { + mp, rdr := newTestMeterProvider(t) + failing := &failingReceiverMP{real: mp} + + set := pipeline.CreateSettings{ + ID: pipeline.MustNewID(pipeline.MustNewType("clockreceiver"), "test"), + } + set.Telemetry.MeterProvider = failing + cfg := &Config{Interval: 1 * time.Second} + r := newReceiver(context.Background(), set, cfg, nil) + if r.telemetry == nil { + t.Fatal("telemetry field nil after failed wiring; must fall back to noop") + } + // Hot-path call must not panic + must not surface (noop discards). + r.telemetry.IncError(kindDownstream) + + rm := collectMetrics(t, rdr) + if m, ok := findInstrument(rm, "tracecore.receiver.errors_total"); ok { + if sum, ok := m.Data.(metricdata.Sum[int64]); ok && len(sum.DataPoints) > 0 { + t.Errorf("noop fallback leaked IncError into errors_total datapoints: %v", sum.DataPoints) + } + } + m, ok := findInstrument(rm, "tracecore.selftelemetry.init_errors_total") + if !ok { + t.Fatalf("init_errors_total absent after factory fallback; have: %s", dumpNames(rm)) + } + sum, ok := m.Data.(metricdata.Sum[int64]) + if !ok { + t.Fatalf("init_errors_total data shape: got %T", m.Data) + } + if len(sum.DataPoints) != 1 || sum.DataPoints[0].Value != 1 { + t.Errorf("init_errors_total: want 1 datapoint value=1, got %v", sum.DataPoints) + } +} + +func testID() pipeline.ID { + return pipeline.MustNewID(pipeline.MustNewType("clockreceiver"), "test") +} + +func dumpNames(rm metricdata.ResourceMetrics) string { + var b strings.Builder + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + fmt.Fprintf(&b, " %s@%s", m.Name, sm.Scope.Name) + } + } + return b.String() +} + +// failingReceiverMP wraps a real MeterProvider but fails every instrument +// registration whose name starts with "tracecore.receiver.". Mirrors the +// nccl_fr sibling seam so a future refactor that reorders the +// newSelfTelemetry constructor doesn't silently bypass coverage. +type failingReceiverMP struct { + embedded.MeterProvider + real metric.MeterProvider +} + +func (p *failingReceiverMP) Meter(name string, opts ...metric.MeterOption) metric.Meter { + return &failingReceiverMeter{Meter: p.real.Meter(name, opts...)} +} + +type failingReceiverMeter struct { + metric.Meter +} + +const receiverInstrumentPrefix = "tracecore.receiver." + +var errSyntheticReceiverFailure = errors.New("synthetic: receiver instrument registration failed") + +func (m *failingReceiverMeter) Int64Counter(name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) { + if strings.HasPrefix(name, receiverInstrumentPrefix) { + return nil, errSyntheticReceiverFailure + } + c, err := m.Meter.Int64Counter(name, opts...) + if err != nil { + return nil, fmt.Errorf("failingReceiverMeter passthrough: %w", err) + } + return c, nil +} + +func (m *failingReceiverMeter) Float64Histogram(name string, opts ...metric.Float64HistogramOption) (metric.Float64Histogram, error) { + if strings.HasPrefix(name, receiverInstrumentPrefix) { + return nil, errSyntheticReceiverFailure + } + h, err := m.Meter.Float64Histogram(name, opts...) + if err != nil { + return nil, fmt.Errorf("failingReceiverMeter passthrough: %w", err) + } + return h, nil +} + +func (m *failingReceiverMeter) Float64ObservableCounter(name string, opts ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) { + if strings.HasPrefix(name, receiverInstrumentPrefix) { + return nil, errSyntheticReceiverFailure + } + c, err := m.Meter.Float64ObservableCounter(name, opts...) + if err != nil { + return nil, fmt.Errorf("failingReceiverMeter passthrough: %w", err) + } + return c, nil +} + +func (m *failingReceiverMeter) Int64ObservableGauge(name string, opts ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) { + if strings.HasPrefix(name, receiverInstrumentPrefix) { + return nil, errSyntheticReceiverFailure + } + g, err := m.Meter.Int64ObservableGauge(name, opts...) + if err != nil { + return nil, fmt.Errorf("failingReceiverMeter passthrough: %w", err) + } + return g, nil +}