Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 40 additions & 28 deletions components/receivers/nccl_fr/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,46 @@ import (
"context"
"fmt"

"github.com/tracecoreai/tracecore/internal/consumer"
"github.com/tracecoreai/tracecore/internal/pipeline"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
)

func componentType() pipeline.Type { return pipeline.MustNewType("nccl_fr") }
// componentType is the kind name registered in components.yaml.
// Wrapped in a function so the MustNewType call is not a top-level side
// effect (mirrors the clockreceiver / kernelevents pattern).
func componentType() component.Type { return component.MustNewType("nccl_fr") }

// Factory is the package-scoped ReceiverFactory for nccl_fr.
var Factory pipeline.ReceiverFactory = &factory{}
// stability is the OCB-surfaced stability level for nccl_fr's logs
// signal. Beta tracks "metrics + label shape pinned; behavior may
// evolve" — same level the receiver has carried since the v0.1.x
// internal/pipeline factory; PR-B2 preserves it across the upstream
// swap.
const stability = component.StabilityLevelBeta

// NewFactory returns the package-var Factory. Required by
// tools/components-gen.
func NewFactory() pipeline.ReceiverFactory { return Factory }

type factory struct{}

func (*factory) Type() pipeline.Type { return componentType() }

func (*factory) CreateDefaultConfig() pipeline.Config { return defaultConfig() }

func (*factory) CreateMetrics(_ context.Context, _ pipeline.CreateSettings, _ pipeline.Config, _ consumer.Metrics) (pipeline.Receiver, error) {
return nil, pipeline.ErrSignalNotSupported
// NewFactory returns the upstream receiver.Factory for nccl_fr.
// Mirrors the upstream-contrib pattern (otlpreceiver, filelogreceiver) —
// callers construct via `ncclfr.NewFactory()` rather than a package var,
// so each OCB-stitched pipeline gets a freshly-built factory and the
// package surface stays a single exported symbol.
//
// Only the logs signal returns a real Receiver; metrics + traces
// surface upstream's "signal not supported" via receiver.NewFactory's
// default unimplemented behavior.
func NewFactory() receiver.Factory {
return receiver.NewFactory(
componentType(),
createDefaultConfig,
receiver.WithLogs(createLogs, stability),
)
}

func (*factory) CreateTraces(_ context.Context, _ pipeline.CreateSettings, _ pipeline.Config, _ consumer.Traces) (pipeline.Receiver, error) {
return nil, pipeline.ErrSignalNotSupported
}
// createDefaultConfig matches upstream component.CreateDefaultConfigFunc.
func createDefaultConfig() component.Config { return defaultConfig() }

func (*factory) CreateLogs(ctx context.Context, set pipeline.CreateSettings, cfg pipeline.Config, next consumer.Logs) (pipeline.Receiver, error) {
// createLogs is the receiver.CreateLogsFunc wired by WithLogs.
func createLogs(ctx context.Context, set receiver.Settings, cfg component.Config, next consumer.Logs) (receiver.Logs, error) {
c, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("nccl_fr: unexpected config type %T", cfg)
Expand All @@ -42,18 +54,18 @@ func (*factory) CreateLogs(ctx context.Context, set pipeline.CreateSettings, cfg
return nil, fmt.Errorf("nccl_fr: %w", err)
}
r := newReceiver(set, c, next)
if set.Telemetry.MeterProvider != nil {
if rt, err := newSelfTelemetry(set.ID, set.Telemetry.MeterProvider); err == nil {
if set.MeterProvider != nil {
if rt, err := newSelfTelemetry(set.ID, set.MeterProvider); err == nil {
r.telemetry = rt
} else {
recordInitError(ctx, set.Telemetry.MeterProvider,
recordInitError(ctx, set.MeterProvider,
"receiver", set.ID.String(), reasonInstrumentRegister)
if set.Telemetry.Logger != nil {
set.Telemetry.Logger.Warn("nccl_fr self-telemetry init failed; using noop", "err", err)
if set.Logger != nil {
set.Logger.Warn("nccl_fr self-telemetry init failed; using noop", zap.Error(err))
}
}
} else if set.Telemetry.Logger != nil {
set.Telemetry.Logger.Warn("nccl_fr: no MeterProvider; self-telemetry using noop")
} else if set.Logger != nil {
set.Logger.Warn("nccl_fr: no MeterProvider; self-telemetry using noop")
}
return r, nil
}
15 changes: 8 additions & 7 deletions components/receivers/nccl_fr/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"runtime"
"sync"
"sync/atomic"

"go.uber.org/zap"
)

// errLifecycleAlreadyStarted is returned by lifecycle.Start when called
Expand All @@ -38,7 +39,7 @@ type panicCallback func(panicValue any)
// caller-ctx deadline) is stashed + returned by every subsequent
// Shutdown so deadline failures aren't silently swallowed.
type lifecycle struct {
logger *slog.Logger
logger *zap.Logger
onPanic panicCallback

mu sync.Mutex
Expand All @@ -50,11 +51,11 @@ type lifecycle struct {
}

// newLifecycle constructs a lifecycle. logger may be nil (replaced with
// slog.Default for tests). onPanic may be nil — panics are still
// zap.NewNop for tests). onPanic may be nil — panics are still
// recovered + logged at ERROR but no callback fires.
func newLifecycle(logger *slog.Logger, onPanic panicCallback) *lifecycle {
func newLifecycle(logger *zap.Logger, onPanic panicCallback) *lifecycle {
if logger == nil {
logger = slog.Default()
logger = zap.NewNop()
}
return &lifecycle{logger: logger, onPanic: onPanic}
}
Expand Down Expand Up @@ -87,7 +88,7 @@ func (l *lifecycle) safeRun(ctx context.Context, run func(context.Context)) {
defer l.wg.Done()
defer func() {
if rec := recover(); rec != nil {
l.logger.Error("nccl_fr lifecycle: run panic recovered", "panic", fmt.Sprintf("%v", rec))
l.logger.Error("nccl_fr lifecycle: run panic recovered", zap.String("panic", fmt.Sprintf("%v", rec)))
if l.onPanic != nil {
l.onPanic(rec)
}
Expand Down Expand Up @@ -129,7 +130,7 @@ func (l *lifecycle) Shutdown(ctx context.Context) error {
// it here lets operators eyeball whether the leak is plausibly
// ours.
l.logger.Warn("nccl_fr lifecycle: shutdown deadline elapsed before goroutine exited",
"process_goroutines", runtime.NumGoroutine())
zap.Int("process_goroutines", runtime.NumGoroutine()))
err := fmt.Errorf("nccl_fr lifecycle shutdown: %w", ctx.Err())
l.mu.Lock()
l.shutdownErr = err
Expand Down
13 changes: 7 additions & 6 deletions components/receivers/nccl_fr/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@ package ncclfr
import (
"context"
"errors"
"log/slog"
"sync"
"sync/atomic"
"testing"
"time"

"go.uber.org/zap"
)

// TestLifecycle_StartShutdown pins the happy path: Start spawns the
// supplied run function, Shutdown cancels its ctx, run returns, Shutdown
// returns nil.
func TestLifecycle_StartShutdown(t *testing.T) {
lc := newLifecycle(slog.Default(), nil)
lc := newLifecycle(zap.NewNop(), nil)
started := make(chan struct{})
stopped := make(chan struct{})

Expand All @@ -43,7 +44,7 @@ func TestLifecycle_StartShutdown(t *testing.T) {
// intervening Shutdown returns errLifecycleAlreadyStarted. The sentinel
// MUST be errors.Is-comparable so callers don't string-match.
func TestLifecycle_StartTwiceErrors(t *testing.T) {
lc := newLifecycle(slog.Default(), nil)
lc := newLifecycle(zap.NewNop(), nil)
if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil {
t.Fatalf("first Start: %v", err)
}
Expand All @@ -57,7 +58,7 @@ func TestLifecycle_StartTwiceErrors(t *testing.T) {
// TestLifecycle_ShutdownIdempotent pins: a second Shutdown returns the
// first call's error (typically nil), not "double-close" or panic.
func TestLifecycle_ShutdownIdempotent(t *testing.T) {
lc := newLifecycle(slog.Default(), nil)
lc := newLifecycle(zap.NewNop(), nil)
if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil {
t.Fatalf("Start: %v", err)
}
Expand All @@ -77,7 +78,7 @@ func TestLifecycle_PanicCallbackFires(t *testing.T) {
var called atomic.Int32
var got any
var mu sync.Mutex
lc := newLifecycle(slog.Default(), func(v any) {
lc := newLifecycle(zap.NewNop(), func(v any) {
called.Add(1)
mu.Lock()
got = v
Expand Down Expand Up @@ -118,7 +119,7 @@ func TestLifecycle_PanicCallbackFires(t *testing.T) {
// ctx err (wrapping). nccl_fr's run loop honors ctx, so this is purely
// a contract pin so future authors don't silently lose the deadline.
func TestLifecycle_ShutdownDeadlineReturnsCtxErr(t *testing.T) {
lc := newLifecycle(slog.Default(), nil)
lc := newLifecycle(zap.NewNop(), nil)
leak := make(chan struct{})
if err := lc.Start(context.Background(), func(context.Context) {
<-leak // ignore ctx
Expand Down
Loading
Loading