Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions components/receivers/nccl_fr/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/tracecoreai/tracecore/internal/consumer"
"github.com/tracecoreai/tracecore/internal/pipeline"
"github.com/tracecoreai/tracecore/internal/selftelemetry"
)

func componentType() pipeline.Type { return pipeline.MustNewType("nccl_fr") }
Expand Down Expand Up @@ -44,11 +43,11 @@ func (*factory) CreateLogs(ctx context.Context, set pipeline.CreateSettings, cfg
}
r := newReceiver(set, c, next)
if set.Telemetry.MeterProvider != nil {
if rt, err := selftelemetry.NewReceiver(set.ID, set.Telemetry.MeterProvider); err == nil {
if rt, err := newSelfTelemetry(set.ID, set.Telemetry.MeterProvider); err == nil {
r.telemetry = rt
} else {
selftelemetry.RecordInitError(ctx, set.Telemetry.MeterProvider,
"receiver", set.ID.String(), selftelemetry.ReasonInstrumentRegister)
recordInitError(ctx, set.Telemetry.MeterProvider,
"receiver", set.ID.String(), reasonInstrumentRegister)
if set.Telemetry.Logger != nil {
set.Telemetry.Logger.Warn("nccl_fr self-telemetry init failed; using noop", "err", err)
}
Expand Down
139 changes: 139 additions & 0 deletions components/receivers/nccl_fr/lifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// SPDX-License-Identifier: Apache-2.0

// Receiver-scoped streaming-source lifecycle helper. Replaces the
// v0.1.x dependency on `internal/runtime/lifecycle`, which is slated
// for deletion in RFC-0013 PR-F. Owns the cancel + WaitGroup +
// panic-recovery bookkeeping nccl_fr's poller goroutine needs, so the
// receiver author writes the body function, not the plumbing. Slimmer
// than the internal helper: no Add() (nccl_fr is single-source), no
// post-Shutdown Add silent-no-op path.

package ncclfr

import (
"context"
"errors"
"fmt"
"log/slog"
"runtime"
"sync"
"sync/atomic"
)

// errLifecycleAlreadyStarted is returned by lifecycle.Start when called
// twice without an intervening Shutdown. errors.Is-comparable so callers
// don't string-match the message.
var errLifecycleAlreadyStarted = errors.New("nccl_fr lifecycle: already started")

// panicCallback is invoked once if the Run function panics. The helper
// recovers the panic so the receiver never crashes the workload
// (PRINCIPLES.md §1). nccl_fr wires this to IncError(kindPanic) +
// SetDegraded(true).
type panicCallback func(panicValue any)

// lifecycle bundles cancel + WaitGroup + started-flag for a streaming
// source. Zero-value is NOT useful; use newLifecycle.
//
// Shutdown is idempotent. The FIRST Shutdown's error (typically a
// caller-ctx deadline) is stashed + returned by every subsequent
// Shutdown so deadline failures aren't silently swallowed.
type lifecycle struct {
logger *slog.Logger
onPanic panicCallback

mu sync.Mutex
cancel context.CancelFunc
closed bool
shutdownErr error
wg sync.WaitGroup
started atomic.Bool
}

// newLifecycle constructs a lifecycle. logger may be nil (replaced with
// slog.Default for tests). onPanic may be nil — panics are still
// recovered + logged at ERROR but no callback fires.
func newLifecycle(logger *slog.Logger, onPanic panicCallback) *lifecycle {
if logger == nil {
logger = slog.Default()
}
return &lifecycle{logger: logger, onPanic: onPanic}
}

// Start spawns run in a goroutine. The ctx passed to run is derived
// from parent via context.WithCancel — so the receiver-level parent's
// cancellation cascades into the goroutine without an explicit Shutdown
// call. Idempotent: a second Start without an intervening Shutdown
// returns errLifecycleAlreadyStarted.
func (l *lifecycle) Start(parent context.Context, run func(context.Context)) error {
if !l.started.CompareAndSwap(false, true) {
return errLifecycleAlreadyStarted
}
l.mu.Lock()
internalCtx, cancel := context.WithCancel(parent)
l.cancel = cancel
// wg.Add(1) MUST happen under the same mutex as cancel so a
// concurrent Shutdown sees the post-Add state. Without this lock,
// Shutdown could observe cancel != nil, call wg.Wait at counter=0,
// return immediately, and either trigger `sync: WaitGroup misuse`
// OR orphan the goroutine. (Mirrors the internal helper's fix.)
l.wg.Add(1)
l.mu.Unlock()
go l.safeRun(internalCtx, run)
return nil
}

// safeRun wraps run with panic recovery + wg.Done bookkeeping.
func (l *lifecycle) safeRun(ctx context.Context, run func(context.Context)) {
defer l.wg.Done()
defer func() {
if rec := recover(); rec != nil {
l.logger.Error("nccl_fr lifecycle: run panic recovered", "panic", fmt.Sprintf("%v", rec))
if l.onPanic != nil {
l.onPanic(rec)
}
}
}()
run(ctx)
}

// Shutdown cancels the internal ctx + waits for the goroutine to exit,
// honoring the caller's ctx deadline. Idempotent: subsequent calls
// return the FIRST call's error so a missed deadline isn't silently
// swallowed.
func (l *lifecycle) Shutdown(ctx context.Context) error {
l.mu.Lock()
if l.closed {
err := l.shutdownErr
l.mu.Unlock()
return err
}
cancel := l.cancel
l.cancel = nil
l.closed = true
l.mu.Unlock()
if cancel == nil {
return nil
}
cancel()

done := make(chan struct{})
go func() {
l.wg.Wait()
close(done)
}()
select {
case <-done:
return nil
case <-ctx.Done():
// NumGoroutine is process-wide, not lifecycle-local; surfacing
// it here lets operators eyeball whether the leak is plausibly
// ours.
l.logger.Warn("nccl_fr lifecycle: shutdown deadline elapsed before goroutine exited",
"process_goroutines", runtime.NumGoroutine())
err := fmt.Errorf("nccl_fr lifecycle shutdown: %w", ctx.Err())
l.mu.Lock()
l.shutdownErr = err
l.mu.Unlock()
return err
}
}
136 changes: 136 additions & 0 deletions components/receivers/nccl_fr/lifecycle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// SPDX-License-Identifier: Apache-2.0

package ncclfr

import (
"context"
"errors"
"log/slog"
"sync"
"sync/atomic"
"testing"
"time"
)

// TestLifecycle_StartShutdown pins the happy path: Start spawns the
// supplied run function, Shutdown cancels its ctx, run returns, Shutdown
// returns nil.
func TestLifecycle_StartShutdown(t *testing.T) {
lc := newLifecycle(slog.Default(), nil)
started := make(chan struct{})
stopped := make(chan struct{})

err := lc.Start(context.Background(), func(ctx context.Context) {
close(started)
<-ctx.Done()
close(stopped)
})
if err != nil {
t.Fatalf("Start: %v", err)
}
<-started
if err := lc.Shutdown(context.Background()); err != nil {
t.Fatalf("Shutdown: %v", err)
}
select {
case <-stopped:
case <-time.After(time.Second):
t.Fatal("run did not exit within 1s of Shutdown")
}
}

// TestLifecycle_StartTwiceErrors pins: a second Start without an
// intervening Shutdown returns errLifecycleAlreadyStarted. The sentinel
// MUST be errors.Is-comparable so callers don't string-match.
func TestLifecycle_StartTwiceErrors(t *testing.T) {
lc := newLifecycle(slog.Default(), nil)
if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil {
t.Fatalf("first Start: %v", err)
}
defer func() { _ = lc.Shutdown(context.Background()) }()
err := lc.Start(context.Background(), func(context.Context) {})
if !errors.Is(err, errLifecycleAlreadyStarted) {
t.Errorf("second Start err: got %v, want errLifecycleAlreadyStarted", err)
}
}

// TestLifecycle_ShutdownIdempotent pins: a second Shutdown returns the
// first call's error (typically nil), not "double-close" or panic.
func TestLifecycle_ShutdownIdempotent(t *testing.T) {
lc := newLifecycle(slog.Default(), nil)
if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil {
t.Fatalf("Start: %v", err)
}
if err := lc.Shutdown(context.Background()); err != nil {
t.Fatalf("first Shutdown: %v", err)
}
if err := lc.Shutdown(context.Background()); err != nil {
t.Errorf("second Shutdown: got %v, want nil (idempotent)", err)
}
}

// TestLifecycle_PanicCallbackFires pins: if the run goroutine panics,
// the supplied onPanic callback fires exactly once with the panic value
// and the panic is recovered (the process doesn't crash). This is the
// behaviour nccl_fr depends on to translate panic → IncError(kindPanic).
func TestLifecycle_PanicCallbackFires(t *testing.T) {
var called atomic.Int32
var got any
var mu sync.Mutex
lc := newLifecycle(slog.Default(), func(v any) {
called.Add(1)
mu.Lock()
got = v
mu.Unlock()
})
done := make(chan struct{})
err := lc.Start(context.Background(), func(context.Context) {
defer close(done)
panic("boom")
})
if err != nil {
t.Fatalf("Start: %v", err)
}
<-done
// Give safeRun's deferred recover time to invoke the callback after
// run returns; the WaitGroup decrement happens BEFORE the recover's
// callback in some race orderings.
deadline := time.Now().Add(time.Second)
for time.Now().Before(deadline) && called.Load() == 0 {
time.Sleep(5 * time.Millisecond)
}
if called.Load() != 1 {
t.Errorf("onPanic call count: got %d, want 1", called.Load())
}
mu.Lock()
defer mu.Unlock()
if got != "boom" {
t.Errorf("onPanic payload: got %v, want \"boom\"", got)
}
// Shutdown after panic must not block / error.
if err := lc.Shutdown(context.Background()); err != nil {
t.Errorf("Shutdown after panic: %v", err)
}
}

// TestLifecycle_ShutdownDeadlineReturnsCtxErr pins: a run goroutine
// that ignores ctx cancellation causes Shutdown to return the caller's
// ctx err (wrapping). nccl_fr's run loop honors ctx, so this is purely
// a contract pin so future authors don't silently lose the deadline.
func TestLifecycle_ShutdownDeadlineReturnsCtxErr(t *testing.T) {
lc := newLifecycle(slog.Default(), nil)
leak := make(chan struct{})
if err := lc.Start(context.Background(), func(context.Context) {
<-leak // ignore ctx
}); err != nil {
t.Fatalf("Start: %v", err)
}
t.Cleanup(func() { close(leak) })

shutdownCtx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err := lc.Shutdown(shutdownCtx)
if !errors.Is(err, context.DeadlineExceeded) {
t.Errorf("Shutdown err: got %v, want wrapped DeadlineExceeded", err)
}
}
26 changes: 12 additions & 14 deletions components/receivers/nccl_fr/nccl_fr.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (

"github.com/tracecoreai/tracecore/internal/consumer"
"github.com/tracecoreai/tracecore/internal/pipeline"
"github.com/tracecoreai/tracecore/internal/runtime/lifecycle"
"github.com/tracecoreai/tracecore/internal/selftelemetry"
frparser "github.com/tracecoreai/tracecore/pkg/nccl/fr_parser"
)

Expand All @@ -42,9 +40,9 @@ type receiver struct {
set pipeline.CreateSettings
cfg *Config
next consumer.Logs
telemetry selftelemetry.Receiver
telemetry selfTelemetry

lc *lifecycle.Lifecycle
lc *lifecycle

mu sync.Mutex
processed map[string]int64 // file path → mtime ns; replays only when mtime advances
Expand All @@ -58,11 +56,11 @@ func newReceiver(set pipeline.CreateSettings, cfg *Config, next consumer.Logs) *
set: set,
cfg: cfg.withDefaults(),
next: next,
telemetry: selftelemetry.NewNoopReceiver(),
telemetry: newNoopSelfTelemetry(),
processed: map[string]int64{},
}
r.lc = lifecycle.New(r.logger(), func(_ any) {
r.telemetry.IncError(selftelemetry.KindPanic)
r.lc = newLifecycle(r.logger(), func(_ any) {
r.telemetry.IncError(kindPanic)
r.telemetry.SetDegraded(true)
})
return r
Expand Down Expand Up @@ -154,7 +152,7 @@ func (r *receiver) scan(ctx context.Context) {
// failures within the streak are silent (the warn rate-limit is
// implicit — we log only on the transition).
func (r *receiver) handleDirError(err error) {
r.telemetry.IncError(selftelemetry.KindEnumerate)
r.telemetry.IncError(kindEnumerate)
r.mu.Lock()
firstFailure := r.firstDirErrAt.IsZero()
if firstFailure {
Expand Down Expand Up @@ -199,7 +197,7 @@ func (r *receiver) handleDirRecovery() {

func (r *receiver) processFile(ctx context.Context, path string, size int64) {
if r.cfg.MaxFileBytes > 0 && size > r.cfg.MaxFileBytes {
r.telemetry.IncError(selftelemetry.KindEnumerate)
r.telemetry.IncError(kindEnumerate)
r.logger().Warn("nccl_fr: file exceeds max_file_bytes; skipping",
"path", path,
"size", size,
Expand All @@ -209,7 +207,7 @@ func (r *receiver) processFile(ctx context.Context, path string, size int64) {
}
raw, err := os.ReadFile(path) //nolint:gosec // dump dir is operator-configured
if err != nil {
r.telemetry.IncError(selftelemetry.KindRead)
r.telemetry.IncError(kindRead)
r.logger().Warn("nccl_fr: read file", "path", path, "err", err)
return
}
Expand All @@ -221,28 +219,28 @@ func (r *receiver) processFile(ctx context.Context, path string, size int64) {
// next mtime bump will trigger a retry.
r.logger().Info("nccl_fr: pickle truncated; retrying when the writer finishes", "path", path, "err", err)
case errors.Is(err, frparser.ErrUnsafeOpcode):
r.telemetry.IncError(selftelemetry.KindParse)
r.telemetry.IncError(kindParse)
r.logger().Error("nccl_fr: unsafe pickle opcode rejected",
"path", path,
"err", err,
"security.event", true)
default:
r.telemetry.IncError(selftelemetry.KindParse)
r.telemetry.IncError(kindParse)
r.logger().Warn("nccl_fr: parse failed", "path", path, "err", err)
}
return
}
records, version, err := frparser.DecodeRecords(v)
if err != nil {
r.telemetry.IncError(selftelemetry.KindParse)
r.telemetry.IncError(kindParse)
r.logger().Warn("nccl_fr: decode records", "path", path, "err", err)
return
}
if len(records) == 0 {
return
}
if err := r.emit(ctx, records, version, path); err != nil {
r.telemetry.IncError(selftelemetry.KindDownstream)
r.telemetry.IncError(kindDownstream)
r.logger().Warn("nccl_fr: emit", "path", path, "err", err)
return
}
Expand Down
Loading
Loading