Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 48 additions & 50 deletions components/receivers/containerstdout/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"context"
"fmt"

"github.com/tracecoreai/tracecore/internal/consumer"
"github.com/tracecoreai/tracecore/internal/pipeline"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
)

// ComponentType is the canonical receiver-factory ID. Centralized so
Expand All @@ -17,59 +19,55 @@ import (
// drift between this constant and the YAML key.
const ComponentType = "containerstdout"

// componentType is wrapped in a function so the pipeline.MustNewType
// call is not a top-level side effect - defensive habit (the literal
// is regex-safe but the pattern is reusable; mirrors kernelevents +
// clockreceiver).
func componentType() pipeline.Type { return pipeline.MustNewType(ComponentType) }
// componentType is wrapped in a function so the component.MustNewType
// call is not a top-level side effect defensive habit (the literal
// is regex-safe but the pattern is reusable; mirrors nccl_fr +
// clockreceiver after the PR-B2/PR-F.2 upstream port).
func componentType() component.Type { return component.MustNewType(ComponentType) }

// Factory is the package-scoped ReceiverFactory for containerstdout.
// Only CreateLogs returns a real Receiver; CreateMetrics and
// CreateTraces return pipeline.ErrSignalNotSupported. Mirrors the
// kernelevents + k8sevents shape - M15 receiver authors copy this
// pattern.
var Factory pipeline.ReceiverFactory = &factory{}
// stability is the OCB-surfaced stability level for containerstdout's
// logs signal. Beta tracks "metrics + label shape pinned; behavior may
// evolve" — same level the receiver has carried since the v0.1.x
// internal/pipeline factory; the PR-F.2 upstream swap preserves it so
// OCB-surfaced metadata doesn't regress.
const stability = component.StabilityLevelBeta

// NewFactory returns the package-var Factory. Required by
// tools/components-gen, which generates calls like
// `containerstdout.NewFactory()` against the codegen-emitted
// cmd/tracecore/components.go.
func NewFactory() pipeline.ReceiverFactory { return Factory }

type factory struct{}

func (*factory) Type() pipeline.Type { return componentType() }

func (*factory) CreateDefaultConfig() pipeline.Config { return defaultConfig() }

func (*factory) CreateMetrics(_ context.Context, _ pipeline.CreateSettings, _ pipeline.Config, _ consumer.Metrics) (pipeline.Receiver, error) {
return nil, pipeline.ErrSignalNotSupported
// NewFactory returns the upstream receiver.Factory for containerstdout.
// Mirrors the upstream-contrib pattern (otlpreceiver, filelogreceiver)
// and the sibling PR-B2 nccl_fr port — callers construct via
// `containerstdout.NewFactory()` rather than a package var, so each
// OCB-stitched pipeline gets a freshly-built factory and the package
// surface stays a single exported symbol.
//
// Only the logs signal returns a real Receiver; metrics + traces
// surface upstream's "signal not supported" naturally via
// receiver.NewFactory's default unimplemented behavior (no need for a
// hand-rolled sentinel like the v0.1.x `pipeline.ErrSignalNotSupported`).
func NewFactory() receiver.Factory {
return receiver.NewFactory(
componentType(),
createDefaultConfig,
receiver.WithLogs(createLogs, stability),
)
}

func (*factory) CreateTraces(_ context.Context, _ pipeline.CreateSettings, _ pipeline.Config, _ consumer.Traces) (pipeline.Receiver, error) {
return nil, pipeline.ErrSignalNotSupported
}
// createDefaultConfig matches upstream component.CreateDefaultConfigFunc.
func createDefaultConfig() component.Config { return defaultConfig() }

// CreateLogs wires the containerstdout receiver. The type-assertion
// guard returns a typed error rather than a panic so config-loader
// regressions surface as a clean exit-2.
// createLogs is the receiver.CreateLogsFunc wired by WithLogs.
//
// When cfg.Enabled is false the factory returns a no-op Receiver
// (newNoopReceiver) so operators who leave the receiver off pay no
// runtime cost beyond the type-resolution. When enabled the factory
// validates the config (Validate short-circuits on Enabled=false, so
// the call here only runs on the enabled branch) and constructs the
// real receiver stub - Phase 12 will replace newReceiver's
// Start/Shutdown body with the lifecycle wiring; this phase only
// has to satisfy the pipeline.Receiver contract.
// real receiver.
//
// Self-telemetry follows the kernelevents sibling pattern: the noop
// default keeps the hot path nil-safe; the real Telemetry is wired
// when set.Telemetry.MeterProvider is non-nil; init failures are
// surfaced via recordInitError so operators see when they're running
// silent. RFC-0013 PR-F replaces the v0.1.x dependency on
// internal/selftelemetry with the receiver-scoped sibling here.
func (*factory) CreateLogs(ctx context.Context, set pipeline.CreateSettings, cfg pipeline.Config, next consumer.Logs) (pipeline.Receiver, error) {
// Self-telemetry follows the kernelevents/nccl_fr sibling pattern:
// the noop default keeps the hot path nil-safe; the real Telemetry is
// wired when set.MeterProvider is non-nil; init failures are surfaced
// via recordInitError so operators see when they're running silent.
func createLogs(ctx context.Context, set receiver.Settings, cfg component.Config, next consumer.Logs) (receiver.Logs, error) {
c, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("containerstdout: unexpected config type %T", cfg)
Expand All @@ -79,18 +77,18 @@ func (*factory) CreateLogs(ctx context.Context, set pipeline.CreateSettings, cfg
}

telemetry := newNoopTelemetry()
if set.Telemetry.MeterProvider != nil {
if rt, err := newTelemetry(set.ID, set.Telemetry.MeterProvider); err == nil {
if set.MeterProvider != nil {
if rt, err := newTelemetry(set.ID, set.MeterProvider); err == nil {
telemetry = rt
} else {
recordInitError(ctx, set.Telemetry.MeterProvider,
recordInitError(ctx, set.MeterProvider,
"receiver", set.ID.String(), reasonInstrumentRegister)
if set.Telemetry.Logger != nil {
set.Telemetry.Logger.Warn("containerstdout self-telemetry init failed; using noop", "err", err)
if set.Logger != nil {
set.Logger.Warn("containerstdout self-telemetry init failed; using noop", zap.Error(err))
}
}
} else if set.Telemetry.Logger != nil {
set.Telemetry.Logger.Warn("containerstdout: no MeterProvider; self-telemetry using noop")
} else if set.Logger != nil {
set.Logger.Warn("containerstdout: no MeterProvider; self-telemetry using noop")
}

if !c.Enabled {
Expand Down
91 changes: 30 additions & 61 deletions components/receivers/containerstdout/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/receiver/receivertest"

"github.com/tracecoreai/tracecore/components/receivers/containerstdout"
"github.com/tracecoreai/tracecore/internal/consumer"
"github.com/tracecoreai/tracecore/internal/pipeline"
"github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest"
)

// nopLogsConsumer is a test-only consumer.Logs that swallows pushes.
Expand All @@ -23,59 +24,58 @@ func (nopLogsConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

// wrongConfig is a pipeline.Config that is NOT *containerstdout.Config -
// wrongConfig is a component.Config that is NOT *containerstdout.Config
// used to exercise the factory's type-assertion guard.
type wrongConfig struct{}

func (*wrongConfig) Validate() error { return nil }

// TestFactory_Type pins the registered component-type literal. The
// string is operator-facing (YAML key) and load-bearing for
// components.yaml registration; renames go through an RFC, not a
// silent string drift.
func TestFactory_Type(t *testing.T) {
t.Parallel()
require.Equal(t, "containerstdout", containerstdout.Factory.Type().String())
require.Equal(t, "containerstdout", containerstdout.NewFactory().Type().String())
}

// TestFactory_CreateDefaultConfig pins that the factory returns
// the same *Config shape that the package-private defaultConfig()
// would - equal by value, not just type-asserted. Catches drift
// would equal by value, not just type-asserted. Catches drift
// between the factory and the canonical defaults.
func TestFactory_CreateDefaultConfig(t *testing.T) {
t.Parallel()
cfg := containerstdout.Factory.CreateDefaultConfig()
cfg := containerstdout.NewFactory().CreateDefaultConfig()
require.IsType(t, &containerstdout.Config{}, cfg)
c, ok := cfg.(*containerstdout.Config)
require.True(t, ok, "CreateDefaultConfig must return *Config")
// Disabled by default (alpha-stability surface).
require.False(t, c.Enabled, "default Enabled must be false (alpha; opt-in)")
// Validate must accept the default - operators who deploy with
// Validate must accept the default operators who deploy with
// `containerstdout: {}` (defaults only) must not get a
// config-load error.
require.NoError(t, c.Validate(), "default config must validate")
}

// TestFactory_CreateLogs_DisabledShortCircuit pins the disabled
// short-circuit: when Enabled=false the factory returns a real
// pipeline.Receiver whose Start/Shutdown are no-ops returning nil.
// receiver.Logs whose Start/Shutdown are no-ops returning nil.
// Operators who leave the receiver off must not pay any runtime
// cost beyond the type-resolution.
func TestFactory_CreateLogs_DisabledShortCircuit(t *testing.T) {
t.Parallel()
fx := pipelinetest.New(t)
cfg, ok := containerstdout.Factory.CreateDefaultConfig().(*containerstdout.Config)
factory := containerstdout.NewFactory()
set := receivertest.NewNopSettings(component.MustNewType(containerstdout.ComponentType))
cfg, ok := factory.CreateDefaultConfig().(*containerstdout.Config)
require.True(t, ok, "default config must be *Config")
// defaultConfig() leaves Enabled=false - assert it explicitly so
// defaultConfig() leaves Enabled=false assert it explicitly so
// a future default flip would surface here loud.
require.False(t, cfg.Enabled, "test precondition: default must be disabled")

rcv, err := containerstdout.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, nopLogsConsumer{})
rcv, err := factory.CreateLogs(t.Context(), set, cfg, nopLogsConsumer{})
require.NoError(t, err)
require.NotNil(t, rcv)

// Start + Shutdown must succeed without side effects.
require.NoError(t, rcv.Start(context.Background(), fx.Host))
require.NoError(t, rcv.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, rcv.Shutdown(context.Background()))
}

Expand All @@ -86,13 +86,14 @@ func TestFactory_CreateLogs_DisabledShortCircuit(t *testing.T) {
// short-circuits to nil per RFC-0010 §Configuration surface).
func TestFactory_CreateLogs_RejectsInvalidConfig(t *testing.T) {
t.Parallel()
fx := pipelinetest.New(t)
cfg, ok := containerstdout.Factory.CreateDefaultConfig().(*containerstdout.Config)
factory := containerstdout.NewFactory()
set := receivertest.NewNopSettings(component.MustNewType(containerstdout.ComponentType))
cfg, ok := factory.CreateDefaultConfig().(*containerstdout.Config)
require.True(t, ok, "default config must be *Config")
cfg.Enabled = true
cfg.RankSource = "not-a-real-source" // fails validateRankSource

rcv, err := containerstdout.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, nopLogsConsumer{})
rcv, err := factory.CreateLogs(t.Context(), set, cfg, nopLogsConsumer{})
require.Nil(t, rcv)
require.Error(t, err)
require.ErrorContains(t, err, "containerstdout")
Expand All @@ -102,15 +103,10 @@ func TestFactory_CreateLogs_RejectsInvalidConfig(t *testing.T) {
// TestFactory_CreateLogs_WiresSelftelemetry pins the self-telemetry
// wiring contract: a valid enabled config wired with a real
// MeterProvider (the test fixture's noop MeterProvider counts as
// "real" for this contract - it satisfies the newTelemetry signature)
// "real" for this contract it satisfies the newTelemetry signature)
// produces a Receiver. The factory must not return nil or error on a
// happy-path enabled config.
//
// The factory wires a non-noop containerstdout Telemetry when
// set.Telemetry.MeterProvider is non-nil and the noop default
// otherwise - both branches must yield a non-nil internal field so
// hot-path callers don't nil-check.
//
// Phase 12 added real Start prerequisites (NODE_NAME + reachable
// in-cluster k8s client); the wiring proxy is now "CreateLogs returns
// a non-nil Receiver and Start fails with a clean wrapped error
Expand All @@ -126,19 +122,20 @@ func TestFactory_CreateLogs_WiresSelftelemetry(t *testing.T) {
// missing-env error rather than an in-cluster-config failure that
// depends on the host's filesystem state.
t.Setenv("NODE_NAME", "")
fx := pipelinetest.New(t)
cfg, ok := containerstdout.Factory.CreateDefaultConfig().(*containerstdout.Config)
factory := containerstdout.NewFactory()
set := receivertest.NewNopSettings(component.MustNewType(containerstdout.ComponentType))
cfg, ok := factory.CreateDefaultConfig().(*containerstdout.Config)
require.True(t, ok, "default config must be *Config")
cfg.Enabled = true // happy path

rcv, err := containerstdout.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, nopLogsConsumer{})
rcv, err := factory.CreateLogs(t.Context(), set, cfg, nopLogsConsumer{})
require.NoError(t, err)
require.NotNil(t, rcv)

// Start must fail cleanly (NODE_NAME missing); the wrapped error
// confirms the receiver reached Start without nil-deref panics -
// confirms the receiver reached Start without nil-deref panics
// the M2 telemetry-wiring proxy.
startErr := rcv.Start(context.Background(), fx.Host)
startErr := rcv.Start(context.Background(), componenttest.NewNopHost())
require.Error(t, startErr)
require.ErrorContains(t, startErr, "NODE_NAME")
// Shutdown must remain idempotent even after a failed Start.
Expand All @@ -150,39 +147,11 @@ func TestFactory_CreateLogs_WiresSelftelemetry(t *testing.T) {
// operator-facing component name + "config type", not a panic.
func TestFactory_CreateLogs_RejectsWrongConfigType(t *testing.T) {
t.Parallel()
fx := pipelinetest.New(t)
rcv, err := containerstdout.Factory.CreateLogs(t.Context(), fx.CreateSettings, &wrongConfig{}, nopLogsConsumer{})
factory := containerstdout.NewFactory()
set := receivertest.NewNopSettings(component.MustNewType(containerstdout.ComponentType))
rcv, err := factory.CreateLogs(t.Context(), set, &wrongConfig{}, nopLogsConsumer{})
require.Nil(t, rcv)
require.Error(t, err)
require.ErrorContains(t, err, "containerstdout")
require.ErrorContains(t, err, "config")
}

// TestFactory_CreateMetrics_Unsupported and TestFactory_CreateTraces_Unsupported
// pin that the logs-only factory returns the sentinel for the two
// unsupported signals - operators get a clear "signal not supported"
// rather than a panic-on-nil-receiver downstream.
func TestFactory_CreateMetrics_Unsupported(t *testing.T) {
t.Parallel()
fx := pipelinetest.New(t)
rcv, err := containerstdout.Factory.CreateMetrics(t.Context(), fx.CreateSettings, containerstdout.Factory.CreateDefaultConfig(), nil)
require.Nil(t, rcv)
require.ErrorIs(t, err, pipeline.ErrSignalNotSupported)
}

func TestFactory_CreateTraces_Unsupported(t *testing.T) {
t.Parallel()
fx := pipelinetest.New(t)
rcv, err := containerstdout.Factory.CreateTraces(t.Context(), fx.CreateSettings, containerstdout.Factory.CreateDefaultConfig(), nil)
require.Nil(t, rcv)
require.ErrorIs(t, err, pipeline.ErrSignalNotSupported)
}

// TestNewFactory_ReturnsPackageVarFactory pins that NewFactory and
// the package-var Factory are the same value. tools/components-gen
// emits `containerstdout.NewFactory()` against the generated
// components.go.
func TestNewFactory_ReturnsPackageVarFactory(t *testing.T) {
t.Parallel()
require.Same(t, containerstdout.Factory, containerstdout.NewFactory())
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"reflect"
Expand All @@ -30,6 +28,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -48,11 +47,11 @@ func (failureModesInternalStubSource) Lookup(_ PathInfo) (AttributionRef, error)
return AttributionRef{Found: false}, nil
}

// failureModesTestLogger returns a discard-output slog logger so the
// failureModesTestLogger returns a discard-output zap logger so the
// informer-flap test's reflector spam doesn't pollute `go test -v`
// output.
func failureModesTestLogger() *slog.Logger {
return slog.New(slog.NewTextHandler(io.Discard, nil))
func failureModesTestLogger() *zap.Logger {
return zap.NewNop()
}

// TestFailure_AttributionCardinalityOverflow falsifier: insert 5
Expand Down
Loading
Loading