From fad3bc39c8074eb4e0be1feea94c9d305f16e90c Mon Sep 17 00:00:00 2001 From: Tri Lam Date: Sun, 31 May 2026 00:51:04 -0700 Subject: [PATCH] feat(pivot): port containerstdout off internal pipeline+consumer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ports `components/receivers/containerstdout` off the v0.1.x internal facades (`internal/pipeline`, `internal/consumer`) onto upstream `go.opentelemetry.io/collector/{component,receiver,consumer}` v1.59.0 — follow-up to PR-F #197 (selftel + lifecycle port) and mirrors PR-B2 #201 (nccl_fr same swap). After this PR the receiver has zero `internal/*` imports, clearing the PR-I.1 submodule-extraction gate. Type-swap (per PR-B2 #201 reference table): internal/pipeline.Type → component.Type internal/pipeline.ReceiverFactory → receiver.Factory internal/pipeline.CreateSettings → receiver.Settings internal/pipeline.Config → component.Config (any) internal/pipeline.Receiver → receiver.Logs internal/pipeline.Host → component.Host internal/pipeline.ID → component.ID internal/consumer.Logs → consumer.Logs *slog.Logger → *zap.Logger Factory is now `receiver.NewFactory(componentType, createDefaultConfig, receiver.WithLogs(createLogs, component.StabilityLevelBeta))` — stability level preserved across the swap so OCB-surfaced metadata doesn't regress. The hand-rolled `Factory` package-var + `type factory struct{}` are deleted; each OCB-stitched pipeline gets a freshly-built factory via `containerstdout.NewFactory()`. The `CreateMetrics` / `CreateTraces` sentinel methods are no longer needed — receiver.NewFactory's default unimplemented behavior surfaces "signal not supported" naturally, so the two sentinel tests are removed. Receiver + noopReceiver no longer embed `pipeline.ComponentState` (upstream `component.Component` carries no equivalent Started/Stopped mixin; the runtime never read that bookkeeping on the upstream graph). Lifecycle bookkeeping the receiver actually needs lives in the in-package `lifecycle` helper added in PR-F #197. Logger swapped from `*slog.Logger` → upstream's `*zap.Logger` (the type carried in `receiver.Settings.Logger`). All log call sites converted to `zap.String/Int64/Float64/Bool/Error` fields; log messages and field names are byte-for-byte preserved so operator alerting on log content does not regress. Internal lifecycle helper, tailer, informer, pipeline, and receiver all converted in lockstep. Tests swap `pipelinetest.New(t)` → `receivertest.NewNopSettings(componentType())` + `componenttest.NewNopHost()`. A package-local `testSettings()` helper pins the ID to `containerstdout/test` so selftel label assertions stay deterministic; mirrors the sibling PR-B2 nccl_fr pattern. Hard gate (PR-I.1 submodule extraction): $ grep -rn 'internal/pipeline\|internal/consumer\|internal/runtime/lifecycle\|internal/selftelemetry' components/receivers/containerstdout/*.go (no matches) Comment-only historical references remain in factory.go, receiver.go, noop_receiver.go, selftel.go, kind_test.go, selftel_test.go documenting the v0.1.x → v0.2.0 migration; they are not imports. Compatibility note: `go.mod` promotes `go.opentelemetry.io/collector/component/componenttest` from indirect to direct (used by receiver_test.go + pipeline_test.go for componenttest.NewNopHost()). No transitive-dep churn beyond that. Test plan: - `make check` — gofumpt clean, golangci-lint 0 issues, vet clean, go.mod verified - `go test -race ./components/receivers/containerstdout/... -count=10` — all tests green under race, including stress runs of TestTailer_RotationStalledKind and TestContainerstdout_Lifecycle_ConcurrentAddDuringShutdown_NoPanic - `go test ./...` — full repo green except the pre-existing TestK8sevents_Lifecycle_ConcurrentAddDuringShutdown_NoPanic race-window flake (passes on retry; documented in PR-B2 #201 + PR-F #197 bodies) Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Tri Lam --- .../receivers/containerstdout/factory.go | 98 +++++++++---------- .../receivers/containerstdout/factory_test.go | 91 ++++++----------- .../failure_modes_internal_test.go | 9 +- .../receivers/containerstdout/informer.go | 12 +-- .../containerstdout/informer_test.go | 11 +-- .../receivers/containerstdout/lifecycle.go | 17 ++-- .../containerstdout/lifecycle_test.go | 23 ++--- .../containerstdout/noop_receiver.go | 40 ++++---- .../receivers/containerstdout/pipeline.go | 13 +-- .../containerstdout/pipeline_test.go | 10 +- .../receivers/containerstdout/receiver.go | 86 ++++++++-------- .../containerstdout/receiver_test.go | 45 ++++++--- .../receivers/containerstdout/selftel.go | 5 +- .../receivers/containerstdout/selftel_test.go | 7 +- .../receivers/containerstdout/tailer.go | 39 ++++---- go.mod | 2 +- 16 files changed, 244 insertions(+), 264 deletions(-) diff --git a/components/receivers/containerstdout/factory.go b/components/receivers/containerstdout/factory.go index f7089bde..e05ab895 100644 --- a/components/receivers/containerstdout/factory.go +++ b/components/receivers/containerstdout/factory.go @@ -6,8 +6,10 @@ import ( "context" "fmt" - "github.com/tracecoreai/tracecore/internal/consumer" - "github.com/tracecoreai/tracecore/internal/pipeline" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" ) // ComponentType is the canonical receiver-factory ID. Centralized so @@ -17,59 +19,55 @@ import ( // drift between this constant and the YAML key. const ComponentType = "containerstdout" -// componentType is wrapped in a function so the pipeline.MustNewType -// call is not a top-level side effect - defensive habit (the literal -// is regex-safe but the pattern is reusable; mirrors kernelevents + -// clockreceiver). -func componentType() pipeline.Type { return pipeline.MustNewType(ComponentType) } +// componentType is wrapped in a function so the component.MustNewType +// call is not a top-level side effect — defensive habit (the literal +// is regex-safe but the pattern is reusable; mirrors nccl_fr + +// clockreceiver after the PR-B2/PR-F.2 upstream port). +func componentType() component.Type { return component.MustNewType(ComponentType) } -// Factory is the package-scoped ReceiverFactory for containerstdout. -// Only CreateLogs returns a real Receiver; CreateMetrics and -// CreateTraces return pipeline.ErrSignalNotSupported. Mirrors the -// kernelevents + k8sevents shape - M15 receiver authors copy this -// pattern. -var Factory pipeline.ReceiverFactory = &factory{} +// stability is the OCB-surfaced stability level for containerstdout's +// logs signal. Beta tracks "metrics + label shape pinned; behavior may +// evolve" — same level the receiver has carried since the v0.1.x +// internal/pipeline factory; the PR-F.2 upstream swap preserves it so +// OCB-surfaced metadata doesn't regress. +const stability = component.StabilityLevelBeta -// NewFactory returns the package-var Factory. Required by -// tools/components-gen, which generates calls like -// `containerstdout.NewFactory()` against the codegen-emitted -// cmd/tracecore/components.go. -func NewFactory() pipeline.ReceiverFactory { return Factory } - -type factory struct{} - -func (*factory) Type() pipeline.Type { return componentType() } - -func (*factory) CreateDefaultConfig() pipeline.Config { return defaultConfig() } - -func (*factory) CreateMetrics(_ context.Context, _ pipeline.CreateSettings, _ pipeline.Config, _ consumer.Metrics) (pipeline.Receiver, error) { - return nil, pipeline.ErrSignalNotSupported +// NewFactory returns the upstream receiver.Factory for containerstdout. +// Mirrors the upstream-contrib pattern (otlpreceiver, filelogreceiver) +// and the sibling PR-B2 nccl_fr port — callers construct via +// `containerstdout.NewFactory()` rather than a package var, so each +// OCB-stitched pipeline gets a freshly-built factory and the package +// surface stays a single exported symbol. +// +// Only the logs signal returns a real Receiver; metrics + traces +// surface upstream's "signal not supported" naturally via +// receiver.NewFactory's default unimplemented behavior (no need for a +// hand-rolled sentinel like the v0.1.x `pipeline.ErrSignalNotSupported`). +func NewFactory() receiver.Factory { + return receiver.NewFactory( + componentType(), + createDefaultConfig, + receiver.WithLogs(createLogs, stability), + ) } -func (*factory) CreateTraces(_ context.Context, _ pipeline.CreateSettings, _ pipeline.Config, _ consumer.Traces) (pipeline.Receiver, error) { - return nil, pipeline.ErrSignalNotSupported -} +// createDefaultConfig matches upstream component.CreateDefaultConfigFunc. +func createDefaultConfig() component.Config { return defaultConfig() } -// CreateLogs wires the containerstdout receiver. The type-assertion -// guard returns a typed error rather than a panic so config-loader -// regressions surface as a clean exit-2. +// createLogs is the receiver.CreateLogsFunc wired by WithLogs. // // When cfg.Enabled is false the factory returns a no-op Receiver // (newNoopReceiver) so operators who leave the receiver off pay no // runtime cost beyond the type-resolution. When enabled the factory // validates the config (Validate short-circuits on Enabled=false, so // the call here only runs on the enabled branch) and constructs the -// real receiver stub - Phase 12 will replace newReceiver's -// Start/Shutdown body with the lifecycle wiring; this phase only -// has to satisfy the pipeline.Receiver contract. +// real receiver. // -// Self-telemetry follows the kernelevents sibling pattern: the noop -// default keeps the hot path nil-safe; the real Telemetry is wired -// when set.Telemetry.MeterProvider is non-nil; init failures are -// surfaced via recordInitError so operators see when they're running -// silent. RFC-0013 PR-F replaces the v0.1.x dependency on -// internal/selftelemetry with the receiver-scoped sibling here. -func (*factory) CreateLogs(ctx context.Context, set pipeline.CreateSettings, cfg pipeline.Config, next consumer.Logs) (pipeline.Receiver, error) { +// Self-telemetry follows the kernelevents/nccl_fr sibling pattern: +// the noop default keeps the hot path nil-safe; the real Telemetry is +// wired when set.MeterProvider is non-nil; init failures are surfaced +// via recordInitError so operators see when they're running silent. +func createLogs(ctx context.Context, set receiver.Settings, cfg component.Config, next consumer.Logs) (receiver.Logs, error) { c, ok := cfg.(*Config) if !ok { return nil, fmt.Errorf("containerstdout: unexpected config type %T", cfg) @@ -79,18 +77,18 @@ func (*factory) CreateLogs(ctx context.Context, set pipeline.CreateSettings, cfg } telemetry := newNoopTelemetry() - if set.Telemetry.MeterProvider != nil { - if rt, err := newTelemetry(set.ID, set.Telemetry.MeterProvider); err == nil { + if set.MeterProvider != nil { + if rt, err := newTelemetry(set.ID, set.MeterProvider); err == nil { telemetry = rt } else { - recordInitError(ctx, set.Telemetry.MeterProvider, + recordInitError(ctx, set.MeterProvider, "receiver", set.ID.String(), reasonInstrumentRegister) - if set.Telemetry.Logger != nil { - set.Telemetry.Logger.Warn("containerstdout self-telemetry init failed; using noop", "err", err) + if set.Logger != nil { + set.Logger.Warn("containerstdout self-telemetry init failed; using noop", zap.Error(err)) } } - } else if set.Telemetry.Logger != nil { - set.Telemetry.Logger.Warn("containerstdout: no MeterProvider; self-telemetry using noop") + } else if set.Logger != nil { + set.Logger.Warn("containerstdout: no MeterProvider; self-telemetry using noop") } if !c.Enabled { diff --git a/components/receivers/containerstdout/factory_test.go b/components/receivers/containerstdout/factory_test.go index 9c8c2f20..30cc5ace 100644 --- a/components/receivers/containerstdout/factory_test.go +++ b/components/receivers/containerstdout/factory_test.go @@ -7,12 +7,13 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/receiver/receivertest" "github.com/tracecoreai/tracecore/components/receivers/containerstdout" - "github.com/tracecoreai/tracecore/internal/consumer" - "github.com/tracecoreai/tracecore/internal/pipeline" - "github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest" ) // nopLogsConsumer is a test-only consumer.Logs that swallows pushes. @@ -23,34 +24,32 @@ func (nopLogsConsumer) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } -// wrongConfig is a pipeline.Config that is NOT *containerstdout.Config - +// wrongConfig is a component.Config that is NOT *containerstdout.Config — // used to exercise the factory's type-assertion guard. type wrongConfig struct{} -func (*wrongConfig) Validate() error { return nil } - // TestFactory_Type pins the registered component-type literal. The // string is operator-facing (YAML key) and load-bearing for // components.yaml registration; renames go through an RFC, not a // silent string drift. func TestFactory_Type(t *testing.T) { t.Parallel() - require.Equal(t, "containerstdout", containerstdout.Factory.Type().String()) + require.Equal(t, "containerstdout", containerstdout.NewFactory().Type().String()) } // TestFactory_CreateDefaultConfig pins that the factory returns // the same *Config shape that the package-private defaultConfig() -// would - equal by value, not just type-asserted. Catches drift +// would — equal by value, not just type-asserted. Catches drift // between the factory and the canonical defaults. func TestFactory_CreateDefaultConfig(t *testing.T) { t.Parallel() - cfg := containerstdout.Factory.CreateDefaultConfig() + cfg := containerstdout.NewFactory().CreateDefaultConfig() require.IsType(t, &containerstdout.Config{}, cfg) c, ok := cfg.(*containerstdout.Config) require.True(t, ok, "CreateDefaultConfig must return *Config") // Disabled by default (alpha-stability surface). require.False(t, c.Enabled, "default Enabled must be false (alpha; opt-in)") - // Validate must accept the default - operators who deploy with + // Validate must accept the default — operators who deploy with // `containerstdout: {}` (defaults only) must not get a // config-load error. require.NoError(t, c.Validate(), "default config must validate") @@ -58,24 +57,25 @@ func TestFactory_CreateDefaultConfig(t *testing.T) { // TestFactory_CreateLogs_DisabledShortCircuit pins the disabled // short-circuit: when Enabled=false the factory returns a real -// pipeline.Receiver whose Start/Shutdown are no-ops returning nil. +// receiver.Logs whose Start/Shutdown are no-ops returning nil. // Operators who leave the receiver off must not pay any runtime // cost beyond the type-resolution. func TestFactory_CreateLogs_DisabledShortCircuit(t *testing.T) { t.Parallel() - fx := pipelinetest.New(t) - cfg, ok := containerstdout.Factory.CreateDefaultConfig().(*containerstdout.Config) + factory := containerstdout.NewFactory() + set := receivertest.NewNopSettings(component.MustNewType(containerstdout.ComponentType)) + cfg, ok := factory.CreateDefaultConfig().(*containerstdout.Config) require.True(t, ok, "default config must be *Config") - // defaultConfig() leaves Enabled=false - assert it explicitly so + // defaultConfig() leaves Enabled=false — assert it explicitly so // a future default flip would surface here loud. require.False(t, cfg.Enabled, "test precondition: default must be disabled") - rcv, err := containerstdout.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, nopLogsConsumer{}) + rcv, err := factory.CreateLogs(t.Context(), set, cfg, nopLogsConsumer{}) require.NoError(t, err) require.NotNil(t, rcv) // Start + Shutdown must succeed without side effects. - require.NoError(t, rcv.Start(context.Background(), fx.Host)) + require.NoError(t, rcv.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, rcv.Shutdown(context.Background())) } @@ -86,13 +86,14 @@ func TestFactory_CreateLogs_DisabledShortCircuit(t *testing.T) { // short-circuits to nil per RFC-0010 §Configuration surface). func TestFactory_CreateLogs_RejectsInvalidConfig(t *testing.T) { t.Parallel() - fx := pipelinetest.New(t) - cfg, ok := containerstdout.Factory.CreateDefaultConfig().(*containerstdout.Config) + factory := containerstdout.NewFactory() + set := receivertest.NewNopSettings(component.MustNewType(containerstdout.ComponentType)) + cfg, ok := factory.CreateDefaultConfig().(*containerstdout.Config) require.True(t, ok, "default config must be *Config") cfg.Enabled = true cfg.RankSource = "not-a-real-source" // fails validateRankSource - rcv, err := containerstdout.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, nopLogsConsumer{}) + rcv, err := factory.CreateLogs(t.Context(), set, cfg, nopLogsConsumer{}) require.Nil(t, rcv) require.Error(t, err) require.ErrorContains(t, err, "containerstdout") @@ -102,15 +103,10 @@ func TestFactory_CreateLogs_RejectsInvalidConfig(t *testing.T) { // TestFactory_CreateLogs_WiresSelftelemetry pins the self-telemetry // wiring contract: a valid enabled config wired with a real // MeterProvider (the test fixture's noop MeterProvider counts as -// "real" for this contract - it satisfies the newTelemetry signature) +// "real" for this contract — it satisfies the newTelemetry signature) // produces a Receiver. The factory must not return nil or error on a // happy-path enabled config. // -// The factory wires a non-noop containerstdout Telemetry when -// set.Telemetry.MeterProvider is non-nil and the noop default -// otherwise - both branches must yield a non-nil internal field so -// hot-path callers don't nil-check. -// // Phase 12 added real Start prerequisites (NODE_NAME + reachable // in-cluster k8s client); the wiring proxy is now "CreateLogs returns // a non-nil Receiver and Start fails with a clean wrapped error @@ -126,19 +122,20 @@ func TestFactory_CreateLogs_WiresSelftelemetry(t *testing.T) { // missing-env error rather than an in-cluster-config failure that // depends on the host's filesystem state. t.Setenv("NODE_NAME", "") - fx := pipelinetest.New(t) - cfg, ok := containerstdout.Factory.CreateDefaultConfig().(*containerstdout.Config) + factory := containerstdout.NewFactory() + set := receivertest.NewNopSettings(component.MustNewType(containerstdout.ComponentType)) + cfg, ok := factory.CreateDefaultConfig().(*containerstdout.Config) require.True(t, ok, "default config must be *Config") cfg.Enabled = true // happy path - rcv, err := containerstdout.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, nopLogsConsumer{}) + rcv, err := factory.CreateLogs(t.Context(), set, cfg, nopLogsConsumer{}) require.NoError(t, err) require.NotNil(t, rcv) // Start must fail cleanly (NODE_NAME missing); the wrapped error - // confirms the receiver reached Start without nil-deref panics - + // confirms the receiver reached Start without nil-deref panics — // the M2 telemetry-wiring proxy. - startErr := rcv.Start(context.Background(), fx.Host) + startErr := rcv.Start(context.Background(), componenttest.NewNopHost()) require.Error(t, startErr) require.ErrorContains(t, startErr, "NODE_NAME") // Shutdown must remain idempotent even after a failed Start. @@ -150,39 +147,11 @@ func TestFactory_CreateLogs_WiresSelftelemetry(t *testing.T) { // operator-facing component name + "config type", not a panic. func TestFactory_CreateLogs_RejectsWrongConfigType(t *testing.T) { t.Parallel() - fx := pipelinetest.New(t) - rcv, err := containerstdout.Factory.CreateLogs(t.Context(), fx.CreateSettings, &wrongConfig{}, nopLogsConsumer{}) + factory := containerstdout.NewFactory() + set := receivertest.NewNopSettings(component.MustNewType(containerstdout.ComponentType)) + rcv, err := factory.CreateLogs(t.Context(), set, &wrongConfig{}, nopLogsConsumer{}) require.Nil(t, rcv) require.Error(t, err) require.ErrorContains(t, err, "containerstdout") require.ErrorContains(t, err, "config") } - -// TestFactory_CreateMetrics_Unsupported and TestFactory_CreateTraces_Unsupported -// pin that the logs-only factory returns the sentinel for the two -// unsupported signals - operators get a clear "signal not supported" -// rather than a panic-on-nil-receiver downstream. -func TestFactory_CreateMetrics_Unsupported(t *testing.T) { - t.Parallel() - fx := pipelinetest.New(t) - rcv, err := containerstdout.Factory.CreateMetrics(t.Context(), fx.CreateSettings, containerstdout.Factory.CreateDefaultConfig(), nil) - require.Nil(t, rcv) - require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) -} - -func TestFactory_CreateTraces_Unsupported(t *testing.T) { - t.Parallel() - fx := pipelinetest.New(t) - rcv, err := containerstdout.Factory.CreateTraces(t.Context(), fx.CreateSettings, containerstdout.Factory.CreateDefaultConfig(), nil) - require.Nil(t, rcv) - require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) -} - -// TestNewFactory_ReturnsPackageVarFactory pins that NewFactory and -// the package-var Factory are the same value. tools/components-gen -// emits `containerstdout.NewFactory()` against the generated -// components.go. -func TestNewFactory_ReturnsPackageVarFactory(t *testing.T) { - t.Parallel() - require.Same(t, containerstdout.Factory, containerstdout.NewFactory()) -} diff --git a/components/receivers/containerstdout/failure_modes_internal_test.go b/components/receivers/containerstdout/failure_modes_internal_test.go index 965e7da5..89f9bd6c 100644 --- a/components/receivers/containerstdout/failure_modes_internal_test.go +++ b/components/receivers/containerstdout/failure_modes_internal_test.go @@ -18,8 +18,6 @@ import ( "context" "errors" "fmt" - "io" - "log/slog" "os" "path/filepath" "reflect" @@ -30,6 +28,7 @@ import ( "time" "github.com/stretchr/testify/require" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -48,11 +47,11 @@ func (failureModesInternalStubSource) Lookup(_ PathInfo) (AttributionRef, error) return AttributionRef{Found: false}, nil } -// failureModesTestLogger returns a discard-output slog logger so the +// failureModesTestLogger returns a discard-output zap logger so the // informer-flap test's reflector spam doesn't pollute `go test -v` // output. -func failureModesTestLogger() *slog.Logger { - return slog.New(slog.NewTextHandler(io.Discard, nil)) +func failureModesTestLogger() *zap.Logger { + return zap.NewNop() } // TestFailure_AttributionCardinalityOverflow falsifier: insert 5 diff --git a/components/receivers/containerstdout/informer.go b/components/receivers/containerstdout/informer.go index 75b0ca7a..c7adabaa 100644 --- a/components/receivers/containerstdout/informer.go +++ b/components/receivers/containerstdout/informer.go @@ -6,10 +6,10 @@ import ( "context" "errors" "fmt" - "log/slog" "sync/atomic" "time" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -60,7 +60,7 @@ type PodInformer struct { nodeName string namespaces map[string]struct{} // empty = all telemetry Telemetry - logger *slog.Logger + logger *zap.Logger informer cache.SharedIndexInformer indexer cache.Indexer @@ -97,7 +97,7 @@ func NewPodInformer( nodeName string, namespaces []string, telemetry Telemetry, - logger *slog.Logger, + logger *zap.Logger, ) (*PodInformer, error) { if client == nil { return nil, errors.New("containerstdout: PodInformer requires a non-nil k8s client") @@ -161,7 +161,7 @@ func NewPodInformer( // continue; the receiver-level degraded signal will lag a // reconnect window in the unlikely failure case. logger.Warn("containerstdout: SetWatchErrorHandler returned error; watch-degraded reporting may be silent", - "err", err.Error()) + zap.Error(err)) } return p, nil @@ -273,8 +273,8 @@ func (p *PodInformer) onWatchError(_ *cache.Reflector, err error) { p.telemetry.IncError(KindWatch) } p.logger.Warn("containerstdout: pod informer watch error", - "err", err.Error(), - "node", p.nodeName) + zap.Error(err), + zap.String("node", p.nodeName)) } // filteredIndexer wraps a cache.Indexer to expose only objects that diff --git a/components/receivers/containerstdout/informer_test.go b/components/receivers/containerstdout/informer_test.go index f76853a3..e18dfac2 100644 --- a/components/receivers/containerstdout/informer_test.go +++ b/components/receivers/containerstdout/informer_test.go @@ -4,22 +4,21 @@ package containerstdout import ( "context" - "io" - "log/slog" "testing" "time" "github.com/stretchr/testify/require" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/fake" ) -// testLogger is a discard-output slog handler so the test binary -// doesn't spam stdout with informer log lines. -func testLogger() *slog.Logger { - return slog.New(slog.NewTextHandler(io.Discard, nil)) +// testLogger is a discard-output zap logger so the test binary doesn't +// spam stdout with informer log lines. +func testLogger() *zap.Logger { + return zap.NewNop() } // podOnNode builds a minimal Pod for the fake client. The node diff --git a/components/receivers/containerstdout/lifecycle.go b/components/receivers/containerstdout/lifecycle.go index 544023ac..24060e50 100644 --- a/components/receivers/containerstdout/lifecycle.go +++ b/components/receivers/containerstdout/lifecycle.go @@ -21,10 +21,11 @@ import ( "context" "errors" "fmt" - "log/slog" "runtime" "sync" "sync/atomic" + + "go.uber.org/zap" ) // errLifecycleAlreadyStarted is returned by lifecycle.Start when called @@ -45,7 +46,7 @@ type panicCallback func(panicValue any) // caller-ctx deadline) is stashed + returned by every subsequent // Shutdown so deadline failures aren't silently swallowed. type lifecycle struct { - logger *slog.Logger + logger *zap.Logger onPanic panicCallback mu sync.Mutex @@ -58,11 +59,11 @@ type lifecycle struct { } // newLifecycle constructs a lifecycle. logger may be nil (replaced with -// slog.Default for tests). onPanic may be nil — panics are still +// zap.NewNop for tests). onPanic may be nil — panics are still // recovered + logged at ERROR but no callback fires. -func newLifecycle(logger *slog.Logger, onPanic panicCallback) *lifecycle { +func newLifecycle(logger *zap.Logger, onPanic panicCallback) *lifecycle { if logger == nil { - logger = slog.Default() + logger = zap.NewNop() } return &lifecycle{logger: logger, onPanic: onPanic} } @@ -97,7 +98,7 @@ func (l *lifecycle) safeRun(ctx context.Context, run func(context.Context)) { defer l.wg.Done() defer func() { if rec := recover(); rec != nil { - l.logger.Error("containerstdout lifecycle: run panic recovered", "panic", fmt.Sprintf("%v", rec)) + l.logger.Error("containerstdout lifecycle: run panic recovered", zap.String("panic", fmt.Sprintf("%v", rec))) if l.onPanic != nil { l.onPanic(rec) } @@ -140,7 +141,7 @@ func (l *lifecycle) Shutdown(ctx context.Context) error { // it here lets operators eyeball whether the leak is plausibly // ours. l.logger.Warn("containerstdout lifecycle: shutdown deadline elapsed before goroutine exited", - "process_goroutines", runtime.NumGoroutine()) + zap.Int("process_goroutines", runtime.NumGoroutine())) err := fmt.Errorf("containerstdout lifecycle shutdown: %w", ctx.Err()) l.mu.Lock() l.shutdownErr = err @@ -171,7 +172,7 @@ func (l *lifecycle) Add(run func(context.Context)) { l.mu.Unlock() // Log so future authors don't hit silent-refusal traps. l.logger.Warn("containerstdout lifecycle.Add called outside running window — ignored", - "closed", l.closed, "started", l.internalCtx != nil) + zap.Bool("closed", l.closed), zap.Bool("started", l.internalCtx != nil)) return } ctx := l.internalCtx diff --git a/components/receivers/containerstdout/lifecycle_test.go b/components/receivers/containerstdout/lifecycle_test.go index 52242051..8e9560bc 100644 --- a/components/receivers/containerstdout/lifecycle_test.go +++ b/components/receivers/containerstdout/lifecycle_test.go @@ -5,19 +5,20 @@ package containerstdout import ( "context" "errors" - "log/slog" "runtime" "sync" "sync/atomic" "testing" "time" + + "go.uber.org/zap" ) // TestContainerstdout_Lifecycle_StartShutdown pins the happy path: // Start spawns the supplied run function, Shutdown cancels its ctx, // run returns, Shutdown returns nil. func TestContainerstdout_Lifecycle_StartShutdown(t *testing.T) { - lc := newLifecycle(slog.Default(), nil) + lc := newLifecycle(zap.NewNop(), nil) started := make(chan struct{}) stopped := make(chan struct{}) @@ -45,7 +46,7 @@ func TestContainerstdout_Lifecycle_StartShutdown(t *testing.T) { // The sentinel MUST be errors.Is-comparable so callers don't // string-match. func TestContainerstdout_Lifecycle_StartTwiceErrors(t *testing.T) { - lc := newLifecycle(slog.Default(), nil) + lc := newLifecycle(zap.NewNop(), nil) if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil { t.Fatalf("first Start: %v", err) } @@ -60,7 +61,7 @@ func TestContainerstdout_Lifecycle_StartTwiceErrors(t *testing.T) { // Shutdown returns the first call's error (typically nil), not // "double-close" or panic. func TestContainerstdout_Lifecycle_ShutdownIdempotent(t *testing.T) { - lc := newLifecycle(slog.Default(), nil) + lc := newLifecycle(zap.NewNop(), nil) if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil { t.Fatalf("Start: %v", err) } @@ -81,7 +82,7 @@ func TestContainerstdout_Lifecycle_PanicCallbackFires(t *testing.T) { var called atomic.Int32 var got any var mu sync.Mutex - lc := newLifecycle(slog.Default(), func(v any) { + lc := newLifecycle(zap.NewNop(), func(v any) { called.Add(1) mu.Lock() got = v @@ -123,7 +124,7 @@ func TestContainerstdout_Lifecycle_PanicCallbackFires(t *testing.T) { // honor ctx, so this is purely a contract pin so future authors don't // silently lose the deadline. func TestContainerstdout_Lifecycle_ShutdownDeadlineReturnsCtxErr(t *testing.T) { - lc := newLifecycle(slog.Default(), nil) + lc := newLifecycle(zap.NewNop(), nil) leak := make(chan struct{}) if err := lc.Start(context.Background(), func(context.Context) { <-leak // ignore ctx @@ -149,7 +150,7 @@ func TestContainerstdout_Lifecycle_ShutdownDeadlineReturnsCtxErr(t *testing.T) { // goroutine has observed ctx.Done. Tailers + informer + health/eviction // loops all rely on this. func TestContainerstdout_Lifecycle_AddRegistersUnderSameWaitGroup(t *testing.T) { - lc := newLifecycle(slog.Default(), nil) + lc := newLifecycle(zap.NewNop(), nil) if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil { t.Fatalf("Start: %v", err) } @@ -183,7 +184,7 @@ func TestContainerstdout_Lifecycle_AddPanicFiresCallback(t *testing.T) { var called atomic.Int32 var got any var mu sync.Mutex - lc := newLifecycle(slog.Default(), func(v any) { + lc := newLifecycle(zap.NewNop(), func(v any) { called.Add(1) mu.Lock() got = v @@ -220,7 +221,7 @@ func TestContainerstdout_Lifecycle_AddPanicFiresCallback(t *testing.T) { // Start would spawn a goroutine with no cancel hookup — a leak — so // the helper refuses. func TestContainerstdout_Lifecycle_AddBeforeStartIsNoop(t *testing.T) { - lc := newLifecycle(slog.Default(), nil) + lc := newLifecycle(zap.NewNop(), nil) var spawned atomic.Bool lc.Add(func(context.Context) { spawned.Store(true) }) // Give a generous window for any (incorrectly) spawned goroutine @@ -237,7 +238,7 @@ func TestContainerstdout_Lifecycle_AddBeforeStartIsNoop(t *testing.T) { // "sync: WaitGroup is reused before previous Wait has returned" if the // helper didn't guard against this. func TestContainerstdout_Lifecycle_AddAfterShutdownIsNoop(t *testing.T) { - lc := newLifecycle(slog.Default(), nil) + lc := newLifecycle(zap.NewNop(), nil) if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil { t.Fatalf("Start: %v", err) } @@ -275,7 +276,7 @@ func TestContainerstdout_Lifecycle_AddAfterShutdownIsNoop(t *testing.T) { // repo even though it lives in containerstdout-internal package. func TestContainerstdout_Lifecycle_ConcurrentAddDuringShutdown_NoPanic(t *testing.T) { const adders = 50 - lc := newLifecycle(slog.Default(), nil) + lc := newLifecycle(zap.NewNop(), nil) if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil { t.Fatalf("Start: %v", err) } diff --git a/components/receivers/containerstdout/noop_receiver.go b/components/receivers/containerstdout/noop_receiver.go index 01111ca1..c4c7c8c7 100644 --- a/components/receivers/containerstdout/noop_receiver.go +++ b/components/receivers/containerstdout/noop_receiver.go @@ -5,25 +5,32 @@ package containerstdout import ( "context" - "github.com/tracecoreai/tracecore/internal/pipeline" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/receiver" ) // noopReceiver is the disabled-path Receiver returned by the factory // when cfg.Enabled is false. Carrying the telemetry handle (even // though it's never written from this struct) keeps the wiring -// uniform - Phase 12 may extend the disabled path to record a -// "receiver_disabled" emission, and the factory wiring already -// produced the telemetry value. +// uniform — the factory already produced the telemetry value and a +// future disabled-path emission (e.g. "receiver_disabled") would have +// the sink ready. // -// Embeds pipeline.ComponentState so the lifecycle bookkeeping is -// available to the runtime - disabled receivers still flow through -// the same Start/Shutdown contract. +// Implements upstream receiver.Logs (alias for component.Component) +// directly; the v0.1.x pipeline.ComponentState embed was dropped in +// the PR-F.2 upstream-port (RFC-0013) because upstream's +// component.Component carries no equivalent mixin and the disabled +// path doesn't need Started()/Stopped() bookkeeping. type noopReceiver struct { - pipeline.ComponentState - telemetry Telemetry } +// Compile-time assertion that noopReceiver satisfies upstream's +// receiver.Logs (component.Component embedded in receiver.Logs). A +// breaking shape change in upstream surfaces here at build time rather +// than at runtime when OCB stitches the pipeline. +var _ receiver.Logs = (*noopReceiver)(nil) + // newNoopReceiver wires the disabled-path receiver. The factory // always supplies a non-nil telemetry value (noop or real); guard // here so a direct caller doesn't panic. @@ -34,15 +41,8 @@ func newNoopReceiver(telemetry Telemetry) *noopReceiver { return &noopReceiver{telemetry: telemetry} } -// Start delegates to ComponentState so Started() reports true after -// the runtime brings the receiver up; the disabled receiver does no -// work beyond that bookkeeping. -func (n *noopReceiver) Start(ctx context.Context, host pipeline.Host) error { - return n.ComponentState.Start(ctx, host) -} +// Start is a no-op — the disabled receiver does no work. +func (*noopReceiver) Start(context.Context, component.Host) error { return nil } -// Shutdown delegates to ComponentState. Returns nil - there is -// nothing to drain. -func (n *noopReceiver) Shutdown(ctx context.Context) error { - return n.ComponentState.Shutdown(ctx) -} +// Shutdown is a no-op — there is nothing to drain. +func (*noopReceiver) Shutdown(context.Context) error { return nil } diff --git a/components/receivers/containerstdout/pipeline.go b/components/receivers/containerstdout/pipeline.go index 709b75dd..9b26ddef 100644 --- a/components/receivers/containerstdout/pipeline.go +++ b/components/receivers/containerstdout/pipeline.go @@ -5,8 +5,9 @@ package containerstdout import ( "context" "errors" - "log/slog" "time" + + "go.uber.org/zap" ) // runTailerPipeline drains lines from a single Tailer through the full @@ -133,7 +134,7 @@ func (r *containerStdoutReceiver) emitRecord( if maxAttrs <= 0 { maxAttrs = DefaultMaxAttributes } - logs, dropped := newLogsForRecord(rec, maxAttrs, jsonAttrs, r.set.Telemetry.Resource) + logs, dropped := newLogsForRecord(rec, maxAttrs, jsonAttrs, r.set.Resource) if dropped > 0 { r.telemetry.IncError(KindCardinality) } @@ -247,8 +248,8 @@ func (r *containerStdoutReceiver) startTailerForPath(path string) *Tailer { // may legitimately glob over directories that include non-CRI // files during dev runs. r.logger().Debug("containerstdout: skipping non-CRI path", - slog.String("path", path), - slog.String("err", err.Error())) + zap.String("path", path), + zap.Error(err)) return nil } @@ -292,8 +293,8 @@ func (r *containerStdoutReceiver) startTailerForPath(path string) *Tailer { }) if err != nil { r.logger().Warn("containerstdout: NewTailer failed", - slog.String("path", path), - slog.String("err", err.Error())) + zap.String("path", path), + zap.Error(err)) return nil } diff --git a/components/receivers/containerstdout/pipeline_test.go b/components/receivers/containerstdout/pipeline_test.go index 9ea97853..0bb5b81c 100644 --- a/components/receivers/containerstdout/pipeline_test.go +++ b/components/receivers/containerstdout/pipeline_test.go @@ -12,6 +12,8 @@ import ( "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -19,9 +21,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" - - "github.com/tracecoreai/tracecore/internal/consumer" - "github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest" ) // testRankRegex is the body-match regex shared by the @@ -107,7 +106,6 @@ func newPipelineHarness(t *testing.T, pods []*corev1.Pod, cfgMods func(*Config)) podLogRoot = logRoot t.Cleanup(func() { podLogRoot = prev }) - fx := pipelinetest.New(t) cfg := defaultConfig() cfg.Enabled = true cfg.Cursor.Dir = filepath.Join(t.TempDir(), "cursors") @@ -123,7 +121,7 @@ func newPipelineHarness(t *testing.T, pods []*corev1.Pod, cfgMods func(*Config)) tel := NewCapturingTelemetry() cons := &fakeConsumer{} - r := newReceiver(fx.CreateSettings, cfg, cons, tel) + r := newReceiver(testSettings(), cfg, cons, tel) require.NotNil(t, r) objs := make([]runtime.Object, 0, len(pods)) @@ -138,7 +136,7 @@ func newPipelineHarness(t *testing.T, pods []*corev1.Pod, cfgMods func(*Config)) client := fake.NewSimpleClientset(objs...) r.clientBuilder = func() (kubernetes.Interface, error) { return client, nil } - require.NoError(t, r.Start(context.Background(), pipelinetest.NewHost())) + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() diff --git a/components/receivers/containerstdout/receiver.go b/components/receivers/containerstdout/receiver.go index 3d8d611a..b8abb4ff 100644 --- a/components/receivers/containerstdout/receiver.go +++ b/components/receivers/containerstdout/receiver.go @@ -6,20 +6,20 @@ import ( "context" "errors" "fmt" - "log/slog" "os" "path/filepath" "sync" "sync/atomic" "time" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - - "github.com/tracecoreai/tracecore/internal/consumer" - "github.com/tracecoreai/tracecore/internal/pipeline" ) // healthLoopInterval is the cadence on which the receiver recomputes @@ -69,21 +69,21 @@ func defaultClientBuilder() (kubernetes.Interface, error) { // containerStdoutReceiver is the M15 receiver. Phase 12 wires the full // lifecycle: pod informer, attribution cache, cursor store, rate // limiter, dataloader extractor + optional body matcher, all -// orchestrated under a single lifecycle.Lifecycle for clean cancel- -// and-wait shutdown semantics. +// orchestrated under a single lifecycle helper for clean cancel-and- +// wait shutdown semantics. // // The line-pipeline glue (tailer → CRI parse → attribution → emit) is // deferred to Phase 14. Phase 12 stands up COMPONENTS + LIFECYCLE only // so cross-cutting concerns (degraded aggregation, health loop) can be // asserted against in isolation before the integration code lands. // -// Embeds pipeline.ComponentState so Started()/Stopped() bookkeeping -// matches kernelevents + clockreceiver shape; the runtime uses those -// flags during graph shutdown to skip Components that never started. +// Implements upstream receiver.Logs (alias for component.Component) +// directly; the v0.1.x pipeline.ComponentState embed was dropped in +// the PR-F.2 upstream-port (RFC-0013) — upstream's component.Component +// carries no equivalent Started/Stopped mixin and the runtime never +// reads that bookkeeping on the upstream graph. type containerStdoutReceiver struct { - pipeline.ComponentState - - set pipeline.CreateSettings + set receiver.Settings cfg *Config next consumer.Logs telemetry Telemetry @@ -152,6 +152,12 @@ type containerStdoutReceiver struct { lastDegraded bool } +// Compile-time assertion that containerStdoutReceiver satisfies +// upstream's receiver.Logs (component.Component embedded in +// receiver.Logs). A breaking shape change in upstream surfaces here at +// build time rather than at runtime when OCB stitches the pipeline. +var _ receiver.Logs = (*containerStdoutReceiver)(nil) + // newReceiver constructs the enabled-path receiver. Components that // don't need a k8s client (CursorStore, RateLimiter, DataloaderExtractor, // BodyMatcher) are built here; components that do (PodInformer, @@ -164,7 +170,7 @@ type containerStdoutReceiver struct { // for an operator who can otherwise see fresh logs but loses one // restart's worth of resume-from-offset capability; the alpha-stability // trade-off favours degraded availability over hard refusal. -func newReceiver(set pipeline.CreateSettings, cfg *Config, next consumer.Logs, telemetry Telemetry) *containerStdoutReceiver { +func newReceiver(set receiver.Settings, cfg *Config, next consumer.Logs, telemetry Telemetry) *containerStdoutReceiver { if telemetry == nil { telemetry = newNoopTelemetry() } @@ -187,7 +193,7 @@ func newReceiver(set pipeline.CreateSettings, cfg *Config, next consumer.Logs, t cs, err := NewCursorStore(cfg.Cursor, telemetry) if err != nil { r.logger().Warn("containerstdout: cursor store unavailable; running in replay-on-restart mode", - "err", err.Error()) + zap.Error(err)) r.degradedCursor = true } else { r.cursors = cs @@ -206,7 +212,7 @@ func newReceiver(set pipeline.CreateSettings, cfg *Config, next consumer.Logs, t // and proceed with extraction disabled rather than refusing // Start. r.logger().Warn("containerstdout: dataloader extractor build failed; extraction disabled", - "err", err.Error()) + zap.Error(err)) } else { r.dataloader = de } @@ -221,7 +227,7 @@ func newReceiver(set pipeline.CreateSettings, cfg *Config, next consumer.Logs, t if err != nil { // Validate rejects bad regexes; this is a contract guard. r.logger().Warn("containerstdout: body matcher build failed; rank fallback disabled", - "err", err.Error()) + zap.Error(err)) } else { r.bodyMatcher = bm } @@ -246,13 +252,13 @@ func newReceiver(set pipeline.CreateSettings, cfg *Config, next consumer.Logs, t } // logger returns the receiver's structured logger, defaulting to -// slog.Default when the runtime did not inject one (tests typically -// pre-populate fx.CreateSettings.Telemetry.Logger). -func (r *containerStdoutReceiver) logger() *slog.Logger { - if r.set.Telemetry.Logger != nil { - return r.set.Telemetry.Logger +// zap.NewNop when the runtime did not inject one (tests typically +// pre-populate set.Logger via receivertest.NewNopSettings). +func (r *containerStdoutReceiver) logger() *zap.Logger { + if r.set.Logger != nil { + return r.set.Logger } - return slog.Default() + return zap.NewNop() } // Capabilities advertises the consumer-side contract. The receiver @@ -271,23 +277,17 @@ func (r *containerStdoutReceiver) Capabilities() consumer.Capabilities { // // Order of operations matters: // -// 1. ComponentState.Start so Started()=true before any goroutine -// spawns (the runtime gates Shutdown on Started()). -// 2. Build client. Failure here is fatal - the receiver cannot -// attribute logs without an informer. -// 3. Validate NODE_NAME. Production refuses to run without it: a +// 1. Validate NODE_NAME. Production refuses to run without it: a // daemonset replica with an unset env would re-attribute the // entire cluster's logs to a random pod, which is worse than // refusing to start. -// 4. Build informer + attribution chain. -// 5. lc.Start kicks the main run goroutine. lc.Add adds the +// 2. Build client. Failure here is fatal - the receiver cannot +// attribute logs without an informer. +// 3. Build informer + attribution chain. +// 4. lc.Start kicks the main run goroutine. lc.Add adds the // informer + eviction + health loops; Add MUST run AFTER Start // because lifecycle.Add silent-no-ops outside the running window. -func (r *containerStdoutReceiver) Start(ctx context.Context, host pipeline.Host) error { - if err := r.ComponentState.Start(ctx, host); err != nil { - return err - } - +func (r *containerStdoutReceiver) Start(ctx context.Context, _ component.Host) error { // NODE_NAME validation runs BEFORE client build so a misconfigured // daemonset gets the clearest error message - operators commonly // forget the downward-API env, and a "build clientset failed" @@ -360,10 +360,10 @@ func (r *containerStdoutReceiver) Start(ctx context.Context, host pipeline.Host) r.lc.Add(r.healthLoop) r.logger().Info("containerstdout started", - "node", nodeName, - "namespaces", r.cfg.Namespaces, - "rank_source", r.cfg.RankSource, - "include", r.cfg.Include) + zap.String("node", nodeName), + zap.Strings("namespaces", r.cfg.Namespaces), + zap.String("rank_source", r.cfg.RankSource), + zap.Strings("include", r.cfg.Include)) return nil } @@ -377,12 +377,12 @@ func (r *containerStdoutReceiver) Shutdown(ctx context.Context) error { if r.lc != nil { if err := r.lc.Shutdown(ctx); err != nil { r.logger().Warn("containerstdout: lifecycle shutdown returned error", - "err", err.Error()) + zap.Error(err)) } } r.logger().Info("containerstdout stopped", - "emitted", r.linesEmitted.Load()) - return r.ComponentState.Shutdown(ctx) + zap.Int64("emitted", r.linesEmitted.Load())) + return nil } // onPodAdd is the SharedIndexInformer ADD callback. Derives the @@ -549,8 +549,8 @@ func (r *containerStdoutReceiver) healthLoop(ctx context.Context) { r.recomputeDegraded() emitted := r.linesEmitted.Swap(0) r.logger().Debug("containerstdout: 15s window", - "lines_emitted", emitted, - "lines_per_s", float64(emitted)/healthLoopInterval.Seconds()) + zap.Int64("lines_emitted", emitted), + zap.Float64("lines_per_s", float64(emitted)/healthLoopInterval.Seconds())) r.rediscoverPaths() } } diff --git a/components/receivers/containerstdout/receiver_test.go b/components/receivers/containerstdout/receiver_test.go index ea2ca187..0bd33edb 100644 --- a/components/receivers/containerstdout/receiver_test.go +++ b/components/receivers/containerstdout/receiver_test.go @@ -10,13 +10,14 @@ import ( "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" - - "github.com/tracecoreai/tracecore/internal/consumer" - "github.com/tracecoreai/tracecore/internal/pipeline" - "github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest" ) // testNopConsumer is a Logs consumer that drops everything. Used by @@ -28,6 +29,18 @@ func (testNopConsumer) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } +// testSettings returns receiver.Settings with a deterministic +// component ID `containerstdout/test`. Mirrors the sibling PR-B2 +// nccl_fr pattern — selftel label assertions key on the ID, so a +// per-run UUID would defeat them. receivertest.NewNopSettings carries +// every field upstream adds (BuildInfo, TelemetrySettings) so we don't +// have to hand-roll the struct. +func testSettings() receiver.Settings { + set := receivertest.NewNopSettings(componentType()) + set.ID = component.NewIDWithName(componentType(), "test") + return set +} + // newReceiverWithFake builds a *containerStdoutReceiver wired with a fake // kubernetes clientset for tests. The cursor dir is scoped to t.TempDir // so each test gets a clean checkpoint surface. cfgMods lets a test @@ -35,7 +48,6 @@ func (testNopConsumer) Capabilities() consumer.Capabilities { // receiver is constructed. func newReceiverWithFake(t *testing.T, cfgMods func(*Config)) (*containerStdoutReceiver, kubernetes.Interface) { t.Helper() - fx := pipelinetest.New(t) cfg := defaultConfig() cfg.Enabled = true cfg.Cursor.Dir = filepath.Join(t.TempDir(), "cursors") @@ -45,7 +57,7 @@ func newReceiverWithFake(t *testing.T, cfgMods func(*Config)) (*containerStdoutR require.NoError(t, cfg.Validate(), "test config must validate") tel := NewCapturingTelemetry() - r := newReceiver(fx.CreateSettings, cfg, testNopConsumer{}, tel) + r := newReceiver(testSettings(), cfg, testNopConsumer{}, tel) require.NotNil(t, r) client := fake.NewSimpleClientset() @@ -63,7 +75,7 @@ func TestReceiver_StartShutdownClean(t *testing.T) { r, _ := newReceiverWithFake(t, nil) ctx := context.Background() - require.NoError(t, r.Start(ctx, pipelinetest.NewHost())) + require.NoError(t, r.Start(ctx, componenttest.NewNopHost())) // Brief settle so the informer's reflector at least gets to register // the initial-list. Short enough not to bloat test runtime. @@ -88,7 +100,7 @@ func TestReceiver_StartFailsOnMissingNodeName(t *testing.T) { r, _ := newReceiverWithFake(t, nil) - err := r.Start(context.Background(), pipelinetest.NewHost()) + err := r.Start(context.Background(), componenttest.NewNopHost()) require.Error(t, err) require.ErrorContains(t, err, "NODE_NAME") } @@ -103,7 +115,7 @@ func TestReceiver_DegradedFlagsCompose(t *testing.T) { r, _ := newReceiverWithFake(t, nil) tel, ok := r.telemetry.(*CapturingTelemetry) - require.True(t, ok, "test fixture must provide CapturingReceiver telemetry") + require.True(t, ok, "test fixture must provide CapturingTelemetry") // Initially all-false: recompute should leave the flag at false. r.recomputeDegraded() @@ -171,7 +183,7 @@ func TestReceiver_ShutdownTwiceIsSafe(t *testing.T) { t.Setenv("NODE_NAME", "test-node") r, _ := newReceiverWithFake(t, nil) - require.NoError(t, r.Start(context.Background(), pipelinetest.NewHost())) + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) time.Sleep(20 * time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) @@ -189,7 +201,7 @@ func TestReceiver_ConstructsAttribution(t *testing.T) { t.Setenv("NODE_NAME", "test-node") r, _ := newReceiverWithFake(t, nil) - require.NoError(t, r.Start(context.Background(), pipelinetest.NewHost())) + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { _ = r.Shutdown(context.Background()) }) @@ -253,7 +265,6 @@ func TestReceiver_ConstructsCursorStore(t *testing.T) { // failure. The receiver in this state runs in replay-on-restart mode // - an operator-visible degraded signal but not a refusal to start. func TestReceiver_CursorMkdirFailureSetsDegraded(t *testing.T) { - fx := pipelinetest.New(t) cfg := defaultConfig() cfg.Enabled = true // Empty cursor dir = NewCursorStore returns ("cursor: empty Dir") @@ -263,7 +274,7 @@ func TestReceiver_CursorMkdirFailureSetsDegraded(t *testing.T) { require.NoError(t, cfg.Validate()) tel := NewCapturingTelemetry() - r := newReceiver(fx.CreateSettings, cfg, testNopConsumer{}, tel) + r := newReceiver(testSettings(), cfg, testNopConsumer{}, tel) require.NotNil(t, r) require.Nil(t, r.cursors, "empty Dir → CursorStore construction must fail") require.True(t, r.degradedCursor, "construct failure must set degradedCursor=true") @@ -281,11 +292,15 @@ func TestReceiver_ClientBuilderError(t *testing.T) { wantErr := errors.New("simulated client build failure") r.clientBuilder = func() (kubernetes.Interface, error) { return nil, wantErr } - err := r.Start(context.Background(), pipelinetest.NewHost()) + err := r.Start(context.Background(), componenttest.NewNopHost()) require.Error(t, err) require.ErrorIs(t, err, wantErr) require.True(t, r.degradedInformer, "client-build failure must set degradedInformer=true") } -var _ pipeline.Receiver = (*containerStdoutReceiver)(nil) +// Compile-time assertion: the receiver satisfies upstream receiver.Logs +// (component.Component embedded in receiver.Logs). A breaking shape +// change in upstream surfaces here at build time rather than at runtime +// when OCB stitches the pipeline. +var _ receiver.Logs = (*containerStdoutReceiver)(nil) diff --git a/components/receivers/containerstdout/selftel.go b/components/receivers/containerstdout/selftel.go index 3237872e..109e9b07 100644 --- a/components/receivers/containerstdout/selftel.go +++ b/components/receivers/containerstdout/selftel.go @@ -48,10 +48,9 @@ import ( "sync/atomic" "time" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" - - "github.com/tracecoreai/tracecore/internal/pipeline" ) // Kind is a low-cardinality error-class identifier. Named type catches @@ -169,7 +168,7 @@ var _ Telemetry = noopTelemetry{} // instruments the v0.1.x internal selftelemetry package registered, so // scraped metric names + label shape are unchanged across the // migration. Unexported: factory is the only non-test caller. -func newTelemetry(id pipeline.ID, mp metric.MeterProvider) (Telemetry, error) { +func newTelemetry(id component.ID, mp metric.MeterProvider) (Telemetry, error) { if mp == nil { return nil, errNilMeterProvider } diff --git a/components/receivers/containerstdout/selftel_test.go b/components/receivers/containerstdout/selftel_test.go index 061ef4b1..5ac6c72b 100644 --- a/components/receivers/containerstdout/selftel_test.go +++ b/components/receivers/containerstdout/selftel_test.go @@ -10,11 +10,10 @@ import ( "testing" "time" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/otel/attribute" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" - - "github.com/tracecoreai/tracecore/internal/pipeline" ) // newTestMeterProvider builds an SDK MeterProvider backed by a @@ -79,8 +78,8 @@ func kvMatch(dp metricdata.DataPoint[int64], want map[string]string) bool { return true } -func selftelTestID() pipeline.ID { - return pipeline.MustNewID(pipeline.MustNewType("containerstdout"), "test") +func selftelTestID() component.ID { + return component.NewIDWithName(component.MustNewType("containerstdout"), "test") } func dumpNames(rm metricdata.ResourceMetrics) string { diff --git a/components/receivers/containerstdout/tailer.go b/components/receivers/containerstdout/tailer.go index 0b1c6a6d..8428703c 100644 --- a/components/receivers/containerstdout/tailer.go +++ b/components/receivers/containerstdout/tailer.go @@ -9,10 +9,11 @@ import ( "errors" "fmt" "io" - "log/slog" "os" "sync" "time" + + "go.uber.org/zap" ) // TruncationMarker is the suffix appended when a single log line @@ -105,10 +106,10 @@ type TailerOptions struct { // that don't care about telemetry can omit the field. Telemetry Telemetry - // Logger is the structured logger; nil falls back to a discard - // handler so the tailer never panics on a missing logger and tests + // Logger is the structured logger; nil falls back to a no-op zap + // logger so the tailer never panics on a missing logger and tests // don't have to stand up a log sink. - Logger *slog.Logger + Logger *zap.Logger // OnRotation is called when the file's fingerprint changes mid- // tail. Receivers typically wire this to CursorStore.Save with the @@ -227,7 +228,7 @@ func newTailer(_ context.Context, opts TailerOptions, nowFn func() time.Time) (* opts.Telemetry = newNoopTelemetry() } if opts.Logger == nil { - opts.Logger = slog.New(slog.NewTextHandler(io.Discard, nil)) + opts.Logger = zap.NewNop() } if nowFn == nil { nowFn = time.Now @@ -330,8 +331,8 @@ func (t *Tailer) poll(ctx context.Context) bool { fp, fpErr := t.readFingerprint() if fpErr != nil { t.opts.Logger.Warn("tailer: fingerprint failed", - slog.String("path", t.opts.Path), - slog.String("error", fpErr.Error()), + zap.String("path", t.opts.Path), + zap.Error(fpErr), ) t.closeFile() return true @@ -369,8 +370,8 @@ func (t *Tailer) ensureOpen() bool { if err != nil { if !os.IsNotExist(err) { t.opts.Logger.Warn("tailer: open failed", - slog.String("path", t.opts.Path), - slog.String("error", err.Error()), + zap.String("path", t.opts.Path), + zap.Error(err), ) } return false @@ -379,8 +380,8 @@ func (t *Tailer) ensureOpen() bool { if t.offset > 0 { if _, err := t.file.Seek(t.offset, io.SeekStart); err != nil { t.opts.Logger.Warn("tailer: seek failed", - slog.String("path", t.opts.Path), - slog.String("error", err.Error()), + zap.String("path", t.opts.Path), + zap.Error(err), ) t.closeFile() return false @@ -412,8 +413,8 @@ func (t *Tailer) handleRotation(fp string) bool { f, err := os.Open(t.opts.Path) //nolint:gosec // operator-configured log path if err != nil { t.opts.Logger.Warn("tailer: reopen after rotation failed", - slog.String("path", t.opts.Path), - slog.String("error", err.Error()), + zap.String("path", t.opts.Path), + zap.Error(err), ) return false } @@ -428,8 +429,8 @@ func (t *Tailer) statSize() (int64, bool) { info, err := t.file.Stat() if err != nil { t.opts.Logger.Warn("tailer: stat failed", - slog.String("path", t.opts.Path), - slog.String("error", err.Error()), + zap.String("path", t.opts.Path), + zap.Error(err), ) t.closeFile() return 0, false @@ -448,8 +449,8 @@ func (t *Tailer) handleTruncate(size int64) bool { t.lineBuf = t.lineBuf[:0] if _, err := t.file.Seek(0, io.SeekStart); err != nil { t.opts.Logger.Warn("tailer: seek-zero after truncate failed", - slog.String("path", t.opts.Path), - slog.String("error", err.Error()), + zap.String("path", t.opts.Path), + zap.Error(err), ) t.closeFile() return false @@ -477,8 +478,8 @@ func (t *Tailer) readAvail(ctx context.Context, size int64) (bool, int64) { if readErr != nil { if !errors.Is(readErr, io.EOF) { t.opts.Logger.Warn("tailer: read failed", - slog.String("path", t.opts.Path), - slog.String("error", readErr.Error()), + zap.String("path", t.opts.Path), + zap.Error(readErr), ) } break diff --git a/go.mod b/go.mod index 720466c3..e24062be 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/collector/component v1.59.0 + go.opentelemetry.io/collector/component/componenttest v0.153.0 go.opentelemetry.io/collector/consumer v1.59.0 go.opentelemetry.io/collector/pdata v1.59.0 go.opentelemetry.io/collector/receiver v1.59.0 @@ -229,7 +230,6 @@ require ( go-simpler.org/sloglint v0.11.0 // indirect go.augendre.info/fatcontext v0.8.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/collector/component/componenttest v0.153.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.153.0 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.153.0 // indirect go.opentelemetry.io/collector/featuregate v1.59.0 // indirect