Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 32 additions & 32 deletions components/receivers/clockreceiver/clockreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (

"github.com/tracecoreai/tracecore/internal/consumer"
"github.com/tracecoreai/tracecore/internal/pipeline"
"github.com/tracecoreai/tracecore/internal/runtime/lifecycle"
"github.com/tracecoreai/tracecore/internal/selftelemetry"
)

// clockReceiver emits a gauge metric (`tracecore.clock.now`) on a
Expand All @@ -23,12 +21,14 @@ import (
// their own clock.
//
// Embeds pipeline.ComponentState so Started()/Stopped() accessors
// work for tests. Delegates goroutine cancel + WaitGroup +
// panic-recovery to `internal/runtime/lifecycle.Lifecycle` — the
// same helper kernelevents (M9) uses, so the two receivers share
// one mechanism for streaming/tick lifecycle (PRINCIPLES.md §3).
// Self-telemetry wired via the M2 surface (errors / emissions /
// latency / activity) through `selftelemetry.Receiver`.
// work for tests. Cancel + WaitGroup + panic-recovery are delegated
// to the receiver-local `lifecycle` sibling (RFC-0013 PR-B), which
// replaces the v0.1.x dependency on `internal/runtime/lifecycle`.
// Self-telemetry is wired via the receiver-local `selfTelemetry`
// sibling (`tracecore.receiver.{errors_total,emissions_total,
// collection_latency_seconds,degraded_seconds_total,
// last_activity_unix_seconds}`), which replaces the v0.1.x dependency
// on `internal/selftelemetry`.
type clockReceiver struct {
pipeline.ComponentState

Expand All @@ -38,43 +38,43 @@ type clockReceiver struct {
next consumer.Metrics

// telemetry records self-telemetry (errors, emissions, latency,
// activity) per the M2 wiring pattern. Always non-nil — the
// factory substitutes a noop if construction fails.
telemetry selftelemetry.Receiver
// activity). Always non-nil — the factory substitutes a noop if
// construction fails.
telemetry selfTelemetry

lc *lifecycle.Lifecycle
lc *lifecycle
}

func newReceiver(ctx context.Context, set pipeline.CreateSettings, cfg *Config, next consumer.Metrics) *clockReceiver {
// Construct the real selftelemetry.Receiver. If MeterProvider is
// somehow nil or construction fails, fall back to the noop so the
// hot path doesn't have to nil-check. The fallback emits a
// Construct the real selfTelemetry. If MeterProvider is somehow nil
// or construction fails, fall back to the noop so the hot path
// doesn't have to nil-check. The fallback emits a
// `tracecore.selftelemetry.init_errors_total` tick so operators
// see when a component is silently running with a noop.
sr := selftelemetry.NewNoopReceiver()
logger := set.Telemetry.Logger
if logger == nil {
logger = slog.Default()
}
st := newNoopSelfTelemetry()
if set.Telemetry.MeterProvider == nil {
if set.Telemetry.Logger != nil {
set.Telemetry.Logger.Warn("self-telemetry: no MeterProvider; using noop")
}
} else if r, err := selftelemetry.NewReceiver(set.ID, set.Telemetry.MeterProvider); err == nil {
sr = r
logger.Warn("clockreceiver self-telemetry: no MeterProvider; using noop")
} else if rt, err := newSelfTelemetry(set.ID, set.Telemetry.MeterProvider); err == nil {
st = rt
} else {
selftelemetry.RecordInitError(ctx, set.Telemetry.MeterProvider,
"receiver", set.ID.String(), selftelemetry.ReasonInstrumentRegister)
if set.Telemetry.Logger != nil {
set.Telemetry.Logger.Warn("self-telemetry init failed; using noop", "err", err)
}
recordInitError(ctx, set.Telemetry.MeterProvider,
"receiver", set.ID.String(), reasonInstrumentRegister)
logger.Warn("clockreceiver self-telemetry init failed; using noop", "err", err)
}

r := &clockReceiver{
logger: set.Telemetry.Logger,
logger: logger,
resource: set.Telemetry.Resource,
interval: cfg.Interval,
next: next,
telemetry: sr,
telemetry: st,
}
r.lc = lifecycle.New(r.logger, func(_ any) {
r.telemetry.IncError(selftelemetry.KindPanic)
r.lc = newLifecycle(r.logger, func(_ any) {
r.telemetry.IncError(kindPanic)
// SetDegraded(true) after panic: goroutine is dead, no recovery path.
r.telemetry.SetDegraded(true)
})
Expand All @@ -83,7 +83,7 @@ func newReceiver(ctx context.Context, set pipeline.CreateSettings, cfg *Config,

// Start spawns the ticker goroutine via the lifecycle helper. The
// caller's ctx authorizes the start; the ticker's lifetime is bound
// to Shutdown via lifecycle.Lifecycle's internal cancel.
// to Shutdown via lifecycle's internal cancel.
func (r *clockReceiver) Start(ctx context.Context, host pipeline.Host) error {
if err := r.ComponentState.Start(ctx, host); err != nil {
return err
Expand Down Expand Up @@ -145,7 +145,7 @@ func (r *clockReceiver) emit(ctx context.Context, now time.Time) {
r.telemetry.ObserveLatency(time.Since(start))

if err != nil {
r.telemetry.IncError(selftelemetry.KindDownstream)
r.telemetry.IncError(kindDownstream)
r.logger.Warn("clockreceiver: downstream rejected push", "err", err)
return
}
Expand Down
138 changes: 138 additions & 0 deletions components/receivers/clockreceiver/lifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// SPDX-License-Identifier: Apache-2.0

// Receiver-scoped tick-source lifecycle helper. Replaces the v0.1.x
// dependency on `internal/runtime/lifecycle`, which is slated for
// deletion in RFC-0013 PR-F. Owns the cancel + WaitGroup +
// panic-recovery bookkeeping clockreceiver's ticker goroutine needs.
// Slimmer than the internal helper: no Add() (clockreceiver is
// single-source), no post-Shutdown Add silent-no-op path.

package clockreceiver

import (
"context"
"errors"
"fmt"
"log/slog"
"runtime"
"sync"
"sync/atomic"
)

// errLifecycleAlreadyStarted is returned by lifecycle.Start when called
// twice without an intervening Shutdown. errors.Is-comparable so callers
// don't string-match the message.
var errLifecycleAlreadyStarted = errors.New("clockreceiver lifecycle: already started")

// panicCallback is invoked once if the Run function panics. The helper
// recovers the panic so the receiver never crashes the workload
// (PRINCIPLES.md §1). clockreceiver wires this to IncError(kindPanic) +
// SetDegraded(true).
type panicCallback func(panicValue any)

// lifecycle bundles cancel + WaitGroup + started-flag for a tick
// source. Zero-value is NOT useful; use newLifecycle.
//
// Shutdown is idempotent. The FIRST Shutdown's error (typically a
// caller-ctx deadline) is stashed + returned by every subsequent
// Shutdown so deadline failures aren't silently swallowed.
type lifecycle struct {
logger *slog.Logger
onPanic panicCallback

mu sync.Mutex
cancel context.CancelFunc
closed bool
shutdownErr error
wg sync.WaitGroup
started atomic.Bool
}

// newLifecycle constructs a lifecycle. logger may be nil (replaced with
// slog.Default for tests). onPanic may be nil — panics are still
// recovered + logged at ERROR but no callback fires.
func newLifecycle(logger *slog.Logger, onPanic panicCallback) *lifecycle {
if logger == nil {
logger = slog.Default()
}
return &lifecycle{logger: logger, onPanic: onPanic}
}

// Start spawns run in a goroutine. The ctx passed to run is derived
// from parent via context.WithCancel — so the receiver-level parent's
// cancellation cascades into the goroutine without an explicit Shutdown
// call. Idempotent: a second Start without an intervening Shutdown
// returns errLifecycleAlreadyStarted.
func (l *lifecycle) Start(parent context.Context, run func(context.Context)) error {
if !l.started.CompareAndSwap(false, true) {
return errLifecycleAlreadyStarted
}
l.mu.Lock()
internalCtx, cancel := context.WithCancel(parent)
l.cancel = cancel
// wg.Add(1) MUST happen under the same mutex as cancel so a
// concurrent Shutdown sees the post-Add state. Without this lock,
// Shutdown could observe cancel != nil, call wg.Wait at counter=0,
// return immediately, and either trigger `sync: WaitGroup misuse`
// OR orphan the goroutine. (Mirrors the internal helper's fix.)
l.wg.Add(1)
l.mu.Unlock()
go l.safeRun(internalCtx, run)
return nil
}

// safeRun wraps run with panic recovery + wg.Done bookkeeping.
func (l *lifecycle) safeRun(ctx context.Context, run func(context.Context)) {
defer l.wg.Done()
defer func() {
if rec := recover(); rec != nil {
l.logger.Error("clockreceiver lifecycle: run panic recovered", "panic", fmt.Sprintf("%v", rec))
if l.onPanic != nil {
l.onPanic(rec)
}
}
}()
run(ctx)
}

// Shutdown cancels the internal ctx + waits for the goroutine to exit,
// honoring the caller's ctx deadline. Idempotent: subsequent calls
// return the FIRST call's error so a missed deadline isn't silently
// swallowed.
func (l *lifecycle) Shutdown(ctx context.Context) error {
l.mu.Lock()
if l.closed {
err := l.shutdownErr
l.mu.Unlock()
return err
}
cancel := l.cancel
l.cancel = nil
l.closed = true
l.mu.Unlock()
if cancel == nil {
return nil
}
cancel()

done := make(chan struct{})
go func() {
l.wg.Wait()
close(done)
}()
select {
case <-done:
return nil
case <-ctx.Done():
// NumGoroutine is process-wide, not lifecycle-local; surfacing
// it here lets operators eyeball whether the leak is plausibly
// ours.
l.logger.Warn("clockreceiver lifecycle: shutdown deadline elapsed before goroutine exited",
"process_goroutines", runtime.NumGoroutine())
err := fmt.Errorf("clockreceiver lifecycle shutdown: %w", ctx.Err())
l.mu.Lock()
l.shutdownErr = err
l.mu.Unlock()
return err
}
}
137 changes: 137 additions & 0 deletions components/receivers/clockreceiver/lifecycle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// SPDX-License-Identifier: Apache-2.0

package clockreceiver

import (
"context"
"errors"
"log/slog"
"sync"
"sync/atomic"
"testing"
"time"
)

// TestLifecycle_StartShutdown pins the happy path: Start spawns the
// supplied run function, Shutdown cancels its ctx, run returns, Shutdown
// returns nil.
func TestLifecycle_StartShutdown(t *testing.T) {
lc := newLifecycle(slog.Default(), nil)
started := make(chan struct{})
stopped := make(chan struct{})

err := lc.Start(context.Background(), func(ctx context.Context) {
close(started)
<-ctx.Done()
close(stopped)
})
if err != nil {
t.Fatalf("Start: %v", err)
}
<-started
if err := lc.Shutdown(context.Background()); err != nil {
t.Fatalf("Shutdown: %v", err)
}
select {
case <-stopped:
case <-time.After(time.Second):
t.Fatal("run did not exit within 1s of Shutdown")
}
}

// TestLifecycle_StartTwiceErrors pins: a second Start without an
// intervening Shutdown returns errLifecycleAlreadyStarted. The sentinel
// MUST be errors.Is-comparable so callers don't string-match.
func TestLifecycle_StartTwiceErrors(t *testing.T) {
lc := newLifecycle(slog.Default(), nil)
if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil {
t.Fatalf("first Start: %v", err)
}
defer func() { _ = lc.Shutdown(context.Background()) }()
err := lc.Start(context.Background(), func(context.Context) {})
if !errors.Is(err, errLifecycleAlreadyStarted) {
t.Errorf("second Start err: got %v, want errLifecycleAlreadyStarted", err)
}
}

// TestLifecycle_ShutdownIdempotent pins: a second Shutdown returns the
// first call's error (typically nil), not "double-close" or panic.
func TestLifecycle_ShutdownIdempotent(t *testing.T) {
lc := newLifecycle(slog.Default(), nil)
if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil {
t.Fatalf("Start: %v", err)
}
if err := lc.Shutdown(context.Background()); err != nil {
t.Fatalf("first Shutdown: %v", err)
}
if err := lc.Shutdown(context.Background()); err != nil {
t.Errorf("second Shutdown: got %v, want nil (idempotent)", err)
}
}

// TestLifecycle_PanicCallbackFires pins: if the run goroutine panics,
// the supplied onPanic callback fires exactly once with the panic value
// and the panic is recovered (the process doesn't crash). This is the
// behaviour clockreceiver depends on to translate panic → IncError(kindPanic).
func TestLifecycle_PanicCallbackFires(t *testing.T) {
var called atomic.Int32
var got any
var mu sync.Mutex
lc := newLifecycle(slog.Default(), func(v any) {
called.Add(1)
mu.Lock()
got = v
mu.Unlock()
})
done := make(chan struct{})
err := lc.Start(context.Background(), func(context.Context) {
defer close(done)
panic("boom")
})
if err != nil {
t.Fatalf("Start: %v", err)
}
<-done
// Give safeRun's deferred recover time to invoke the callback after
// run returns; the WaitGroup decrement happens BEFORE the recover's
// callback in some race orderings.
deadline := time.Now().Add(time.Second)
for time.Now().Before(deadline) && called.Load() == 0 {
time.Sleep(5 * time.Millisecond)
}
if called.Load() != 1 {
t.Errorf("onPanic call count: got %d, want 1", called.Load())
}
mu.Lock()
defer mu.Unlock()
if got != "boom" {
t.Errorf("onPanic payload: got %v, want \"boom\"", got)
}
// Shutdown after panic must not block / error.
if err := lc.Shutdown(context.Background()); err != nil {
t.Errorf("Shutdown after panic: %v", err)
}
}

// TestLifecycle_ShutdownDeadlineReturnsCtxErr pins: a run goroutine
// that ignores ctx cancellation causes Shutdown to return the caller's
// ctx err (wrapping). clockreceiver's run loop honors ctx, so this is
// purely a contract pin so future authors don't silently lose the
// deadline.
func TestLifecycle_ShutdownDeadlineReturnsCtxErr(t *testing.T) {
lc := newLifecycle(slog.Default(), nil)
leak := make(chan struct{})
if err := lc.Start(context.Background(), func(context.Context) {
<-leak // ignore ctx
}); err != nil {
t.Fatalf("Start: %v", err)
}
t.Cleanup(func() { close(leak) })

shutdownCtx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err := lc.Shutdown(shutdownCtx)
if !errors.Is(err, context.DeadlineExceeded) {
t.Errorf("Shutdown err: got %v, want wrapped DeadlineExceeded", err)
}
}
Loading
Loading