diff --git a/components/receivers/kernelevents/bench_test.go b/components/receivers/kernelevents/bench_test.go index 52232ca8..a94b98b3 100644 --- a/components/receivers/kernelevents/bench_test.go +++ b/components/receivers/kernelevents/bench_test.go @@ -6,7 +6,6 @@ import ( "context" "fmt" "io" - "log/slog" "runtime" "sort" "strings" @@ -16,13 +15,15 @@ import ( "time" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/zap" "github.com/tracecoreai/tracecore/components/receivers/kernelevents" - "github.com/tracecoreai/tracecore/internal/consumer" - "github.com/tracecoreai/tracecore/internal/pipeline" - "github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest" "github.com/tracecoreai/tracecore/internal/sli" ) @@ -55,16 +56,16 @@ func BenchmarkReceiver_KmsgWithDegradedJournald(b *testing.B) { func runBench(b *testing.B, kmsg, journald bool) { b.Helper() sink := &countingSink{} - cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config) cfg.Kmsg.Enabled = kmsg cfg.Journald.Enabled = journald cfg.MinSeverity = severityDebug // pass everything - // pipelinetest.New takes *testing.T; benchmarks fabricate - // minimal CreateSettings + Host inline. + // receivertest.NewNopSettings takes a component.Type; benchmarks + // fabricate a discard-logger settings + nop host inline so the + // bench loop doesn't surface log noise from the factory. settings, host := benchSettings() - _ = settings // referenced via rcv construction below - rcv, err := kernelevents.Factory.CreateLogs(b.Context(), settings, cfg, sink) + rcv, err := kernelevents.NewFactory().CreateLogs(b.Context(), settings, cfg, sink) require.NoError(b, err) // Feed kmsg from a memory reader. @@ -142,12 +143,11 @@ func TestReceiver_SLIBudget(t *testing.T) { const targetHeapMiB = 50.0 sink := &countingSink{latencies: make([]time.Duration, 0, totalEvents)} - cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config) cfg.Journald.Enabled = false cfg.MinSeverity = severityDebug - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink) + rcv, err := kernelevents.NewFactory().CreateLogs(t.Context(), testSettings(t), cfg, sink) require.NoError(t, err) fixture := strings.Repeat("3,1,1,-;NVRM: Xid (PCI:0000:65:00.0): 79, pid=0\n", totalEvents) @@ -155,7 +155,7 @@ func TestReceiver_SLIBudget(t *testing.T) { return io.NopCloser(strings.NewReader(fixture)), nil }) - require.NoError(t, rcv.Start(t.Context(), fx.Host)) + require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost())) t.Cleanup(func() { _ = rcv.Shutdown(t.Context()) }) startWall := time.Now() @@ -308,13 +308,11 @@ func cpuNs() int64 { // false negatives in CI. func isRaceBuild() bool { return raceEnabled } -// benchSettings fabricates a minimal CreateSettings + Host for -// benchmarks (pipelinetest.New is *testing.T-bound). -func benchSettings() (pipeline.CreateSettings, pipeline.Host) { - return pipeline.CreateSettings{ - Telemetry: pipeline.TelemetrySettings{ - Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), - Resource: pcommon.NewResource(), - }, - }, pipelinetest.NewHost() +// benchSettings fabricates a minimal receiver.Settings + Host for +// benchmarks (testSettings is *testing.T-bound). The logger is the +// zap.NewNop so the bench loop doesn't pay log-format costs. +func benchSettings() (receiver.Settings, component.Host) { + s := receivertest.NewNopSettings(component.MustNewType("kernelevents")) + s.Logger = zap.NewNop() + return s, componenttest.NewNopHost() } diff --git a/components/receivers/kernelevents/cascade_test.go b/components/receivers/kernelevents/cascade_test.go index 1aa7ccec..8dc1c26f 100644 --- a/components/receivers/kernelevents/cascade_test.go +++ b/components/receivers/kernelevents/cascade_test.go @@ -11,9 +11,9 @@ import ( "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" "github.com/tracecoreai/tracecore/components/receivers/kernelevents" - "github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest" ) // TestReceiver_CancelCascadesToSources is the A+ criterion #4 @@ -25,11 +25,10 @@ func TestReceiver_CancelCascadesToSources(t *testing.T) { t.Parallel() sink := newLogsSink() - cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config) cfg.Journald.Enabled = false // kmsg-only - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink) + rcv, err := kernelevents.NewFactory().CreateLogs(t.Context(), testSettings(t), cfg, sink) require.NoError(t, err) // Blocking reader: only Close() makes Read return. If ctx @@ -42,7 +41,7 @@ func TestReceiver_CancelCascadesToSources(t *testing.T) { }) startCtx, cancelStart := context.WithCancel(t.Context()) - require.NoError(t, rcv.Start(startCtx, fx.Host)) + require.NoError(t, rcv.Start(startCtx, componenttest.NewNopHost())) // Let the source begin reading (block in Read). time.Sleep(50 * time.Millisecond) @@ -93,10 +92,9 @@ func TestReceiver_BothSourcesEnabled_DegradedDualSource(t *testing.T) { t.Parallel() sink := newLogsSink() - cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config) - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink) + rcv, err := kernelevents.NewFactory().CreateLogs(t.Context(), testSettings(t), cfg, sink) require.NoError(t, err) kernelevents.SetKmsgOpenForTest(rcv, func() (io.ReadCloser, error) { @@ -106,6 +104,6 @@ func TestReceiver_BothSourcesEnabled_DegradedDualSource(t *testing.T) { return "", io.ErrClosedPipe }) - require.NoError(t, rcv.Start(t.Context(), fx.Host)) + require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost())) require.NoError(t, rcv.Shutdown(t.Context())) } diff --git a/components/receivers/kernelevents/cpu_linux_test.go b/components/receivers/kernelevents/cpu_linux_test.go index a4dc93d1..b2cfd0b3 100644 --- a/components/receivers/kernelevents/cpu_linux_test.go +++ b/components/receivers/kernelevents/cpu_linux_test.go @@ -14,9 +14,9 @@ import ( "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" "github.com/tracecoreai/tracecore/components/receivers/kernelevents" - "github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest" ) // TestReceiver_CPUBudget_Linux measures per-event CPU cost using @@ -49,12 +49,11 @@ func TestReceiver_CPUBudget_Linux(t *testing.T) { ) sink := &countingSink{latencies: make([]time.Duration, 0, targetEvents)} - cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config) cfg.Journald.Enabled = false cfg.MinSeverity = severityDebug - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink) + rcv, err := kernelevents.NewFactory().CreateLogs(t.Context(), testSettings(t), cfg, sink) require.NoError(t, err) fixture := strings.Repeat("3,1,1,-;NVRM: Xid (PCI:0000:65:00.0): 79, pid=0\n", targetEvents) @@ -62,7 +61,7 @@ func TestReceiver_CPUBudget_Linux(t *testing.T) { return io.NopCloser(strings.NewReader(fixture)), nil }) - require.NoError(t, rcv.Start(t.Context(), fx.Host)) + require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost())) t.Cleanup(func() { _ = rcv.Shutdown(t.Context()) }) cpuBefore := readCPUNs(t) diff --git a/components/receivers/kernelevents/example_test.go b/components/receivers/kernelevents/example_test.go index 9cb45349..24036a9a 100644 --- a/components/receivers/kernelevents/example_test.go +++ b/components/receivers/kernelevents/example_test.go @@ -27,7 +27,7 @@ func ExampleNewFactory() { // only one source enabled — the partial-source operating mode that // non-systemd hosts (Alpine, BSD) rely on. func Example_partialSource() { - cfg, ok := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + cfg, ok := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config) if !ok { fmt.Println("unexpected config type") return @@ -61,7 +61,7 @@ func TestExampleConfig_Parses(t *testing.T) { require.NoError(t, yaml.Unmarshal(bs, &doc), "example_config.yaml must parse as YAML") - cfg, ok := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + cfg, ok := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config) require.True(t, ok, "factory must produce *Config") require.NoError(t, doc.Receivers.Kernelevents.Decode(cfg), "receivers.kernelevents block must decode into Config") diff --git a/components/receivers/kernelevents/export_test.go b/components/receivers/kernelevents/export_test.go index 4a1efc0d..d105d371 100644 --- a/components/receivers/kernelevents/export_test.go +++ b/components/receivers/kernelevents/export_test.go @@ -4,12 +4,12 @@ package kernelevents import ( "io" - "log/slog" "sync" "sync/atomic" "time" - "github.com/tracecoreai/tracecore/internal/pipeline" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" ) // FakeTelemetry is a test-only captor for the receiver-scoped @@ -83,7 +83,7 @@ var _ selfTelemetry = (*FakeTelemetry)(nil) // WithSelfTelemetryForTest swaps the receiver's telemetry hook AND // every source's telemetry hook so external _test packages can assert // on counter calls. -func WithSelfTelemetryForTest(rcv pipeline.Receiver, t *FakeTelemetry) { +func WithSelfTelemetryForTest(rcv receiver.Logs, t *FakeTelemetry) { r, ok := rcv.(*kernelEventsReceiver) if !ok { return @@ -107,7 +107,7 @@ func NewKmsgSourceForTest(cfg *Config, open func() (io.ReadCloser, error), t *Fa if t == nil { telemetry = newNoopSelfTelemetry() } - s := newKmsgSource(cfg, slog.Default(), make(chan parsedRecord, 8), telemetry) + s := newKmsgSource(cfg, zap.NewNop(), make(chan parsedRecord, 8), telemetry) s.open = open return &KmsgSourceForTest{s: s} } @@ -124,7 +124,7 @@ func (k *KmsgSourceForTest) Name() string { return k.s.Name() } // SetKmsgOpenForTest swaps the receiver's kmsg-open hook so external // _test packages can drive the source from a fixture file or // in-memory reader instead of /dev/kmsg. -func SetKmsgOpenForTest(rcv pipeline.Receiver, open func() (io.ReadCloser, error)) { +func SetKmsgOpenForTest(rcv receiver.Logs, open func() (io.ReadCloser, error)) { r, ok := rcv.(*kernelEventsReceiver) if !ok { return @@ -133,7 +133,7 @@ func SetKmsgOpenForTest(rcv pipeline.Receiver, open func() (io.ReadCloser, error } // SetJournaldFinderForTest swaps the journald binary finder. -func SetJournaldFinderForTest(rcv pipeline.Receiver, find func() (string, error)) { +func SetJournaldFinderForTest(rcv receiver.Logs, find func() (string, error)) { r, ok := rcv.(*kernelEventsReceiver) if !ok { return diff --git a/components/receivers/kernelevents/factory.go b/components/receivers/kernelevents/factory.go index b72e8870..14b8b546 100644 --- a/components/receivers/kernelevents/factory.go +++ b/components/receivers/kernelevents/factory.go @@ -6,43 +6,47 @@ import ( "context" "fmt" - "github.com/tracecoreai/tracecore/internal/consumer" - "github.com/tracecoreai/tracecore/internal/pipeline" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" ) -// componentType is wrapped in a function so the pipeline.MustNewType -// call is not a top-level side effect — defensive habit (the literal -// is regex-safe but the pattern is reusable). -func componentType() pipeline.Type { return pipeline.MustNewType("kernelevents") } +// componentType is the kind name registered in components.yaml. Wrapped +// in a function so the MustNewType call is not a top-level side effect +// (mirrors the nccl_fr / clockreceiver pattern). +func componentType() component.Type { return component.MustNewType("kernelevents") } -// Factory is the package-scoped ReceiverFactory for kernelevents. -// Mirrors clockreceiver.Factory in shape — M10+ receiver authors who -// want a streaming-source (vs tick-based) exemplar copy this. -// -// Only CreateLogs returns a real Receiver; CreateMetrics and -// CreateTraces return pipeline.ErrSignalNotSupported. -var Factory pipeline.ReceiverFactory = &factory{} - -// NewFactory returns the package-var Factory. Required by -// tools/components-gen, which generates calls like -// `kernelevents.NewFactory()`. -func NewFactory() pipeline.ReceiverFactory { return Factory } - -type factory struct{} - -func (*factory) Type() pipeline.Type { return componentType() } +// stability is the OCB-surfaced stability level for kernelevents' logs +// signal. Beta tracks "metrics + label shape pinned; behavior may +// evolve" — same level the receiver has carried since the v0.1.x +// internal/pipeline factory; PR-B3 preserves it across the upstream +// swap. +const stability = component.StabilityLevelBeta -func (*factory) CreateDefaultConfig() pipeline.Config { return defaultConfig() } - -func (*factory) CreateMetrics(_ context.Context, _ pipeline.CreateSettings, _ pipeline.Config, _ consumer.Metrics) (pipeline.Receiver, error) { - return nil, pipeline.ErrSignalNotSupported +// NewFactory returns the upstream receiver.Factory for kernelevents. +// Mirrors the upstream-contrib pattern (otlpreceiver, filelogreceiver) +// and the PR-B2 sibling (nccl_fr) — callers construct via +// `kernelevents.NewFactory()` rather than a package var, so each +// OCB-stitched pipeline gets a freshly-built factory and the package +// surface stays a single exported symbol. +// +// Only the logs signal returns a real Receiver; metrics + traces +// surface upstream's "signal not supported" via receiver.NewFactory's +// default unimplemented behavior. +func NewFactory() receiver.Factory { + return receiver.NewFactory( + componentType(), + createDefaultConfig, + receiver.WithLogs(createLogs, stability), + ) } -func (*factory) CreateTraces(_ context.Context, _ pipeline.CreateSettings, _ pipeline.Config, _ consumer.Traces) (pipeline.Receiver, error) { - return nil, pipeline.ErrSignalNotSupported -} +// createDefaultConfig matches upstream component.CreateDefaultConfigFunc. +func createDefaultConfig() component.Config { return defaultConfig() } -func (*factory) CreateLogs(ctx context.Context, set pipeline.CreateSettings, cfg pipeline.Config, next consumer.Logs) (pipeline.Receiver, error) { +// createLogs is the receiver.CreateLogsFunc wired by WithLogs. +func createLogs(ctx context.Context, set receiver.Settings, cfg component.Config, next consumer.Logs) (receiver.Logs, error) { c, ok := cfg.(*Config) if !ok { return nil, fmt.Errorf("kernelevents: unexpected config type %T", cfg) @@ -52,18 +56,18 @@ func (*factory) CreateLogs(ctx context.Context, set pipeline.CreateSettings, cfg // safe and an init-error metric tracks the fallback so operators // see when they're running silent. r := newReceiver(set, c, next) - if set.Telemetry.MeterProvider != nil { - if rt, err := newSelfTelemetry(set.ID, set.Telemetry.MeterProvider); err == nil { + if set.MeterProvider != nil { + if rt, err := newSelfTelemetry(set.ID, set.MeterProvider); err == nil { r.telemetry = rt } else { - recordInitError(ctx, set.Telemetry.MeterProvider, + recordInitError(ctx, set.MeterProvider, "receiver", set.ID.String(), reasonInstrumentRegister) - if set.Telemetry.Logger != nil { - set.Telemetry.Logger.Warn("kernelevents self-telemetry init failed; using noop", "err", err) + if set.Logger != nil { + set.Logger.Warn("kernelevents self-telemetry init failed; using noop", zap.Error(err)) } } - } else if set.Telemetry.Logger != nil { - set.Telemetry.Logger.Warn("kernelevents: no MeterProvider; self-telemetry using noop") + } else if set.Logger != nil { + set.Logger.Warn("kernelevents: no MeterProvider; self-telemetry using noop") } return r, nil } diff --git a/components/receivers/kernelevents/factory_test.go b/components/receivers/kernelevents/factory_test.go index debc01be..2e6e022d 100644 --- a/components/receivers/kernelevents/factory_test.go +++ b/components/receivers/kernelevents/factory_test.go @@ -6,59 +6,64 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/receiver/receivertest" "github.com/tracecoreai/tracecore/components/receivers/kernelevents" - "github.com/tracecoreai/tracecore/internal/pipeline" - "github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest" ) func TestFactory_Type(t *testing.T) { t.Parallel() - require.Equal(t, "kernelevents", kernelevents.Factory.Type().String()) + require.Equal(t, "kernelevents", kernelevents.NewFactory().Type().String()) } func TestFactory_CreateDefaultConfig(t *testing.T) { t.Parallel() - cfg := kernelevents.Factory.CreateDefaultConfig() - require.NoError(t, cfg.Validate(), "default config must validate") + cfg := kernelevents.NewFactory().CreateDefaultConfig() + c, ok := cfg.(interface{ Validate() error }) + require.True(t, ok, "default config must satisfy Validate() error") + require.NoError(t, c.Validate(), "default config must validate") } -func TestFactory_CreateMetrics_Unsupported(t *testing.T) { +// TestFactory_LogsStability pins the OCB-surfaced stability level for +// kernelevents' logs signal: beta. A drift to alpha/stable here would +// flip operator expectations. +func TestFactory_LogsStability(t *testing.T) { t.Parallel() - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateMetrics(t.Context(), fx.CreateSettings, kernelevents.Factory.CreateDefaultConfig(), nil) - require.Nil(t, rcv) - require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) + f := kernelevents.NewFactory() + require.Equal(t, component.StabilityLevelBeta, f.LogsStability()) } -func TestFactory_CreateTraces_Unsupported(t *testing.T) { +// TestFactory_MetricsAndTracesUnsupported pins that the receiver only +// returns a real Receiver for logs. The upstream NewFactory machinery +// surfaces pipeline.ErrSignalNotSupported for unconfigured signals. +func TestFactory_MetricsAndTracesUnsupported(t *testing.T) { t.Parallel() - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateTraces(t.Context(), fx.CreateSettings, kernelevents.Factory.CreateDefaultConfig(), nil) - require.Nil(t, rcv) - require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) + f := kernelevents.NewFactory() + set := receivertest.NewNopSettings(f.Type()) + cfg := f.CreateDefaultConfig() + mrcv, mErr := f.CreateMetrics(t.Context(), set, cfg, nil) + require.Nil(t, mrcv) + require.ErrorIs(t, mErr, pipeline.ErrSignalNotSupported) + trcv, tErr := f.CreateTraces(t.Context(), set, cfg, nil) + require.Nil(t, trcv) + require.ErrorIs(t, tErr, pipeline.ErrSignalNotSupported) } // TestFactory_CreateLogs_RejectsWrongConfigType pins that the // type-assertion guard returns a typed error, not a panic. func TestFactory_CreateLogs_RejectsWrongConfigType(t *testing.T) { t.Parallel() - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, &wrongConfig{}, nil) + f := kernelevents.NewFactory() + set := receivertest.NewNopSettings(f.Type()) + rcv, err := f.CreateLogs(t.Context(), set, &wrongConfig{}, nil) require.Nil(t, rcv) require.Error(t, err) require.ErrorContains(t, err, "kernelevents: unexpected config type") } -// TestNewFactory_ReturnsPackageVarFactory pins that NewFactory and -// the package-var Factory are the same value. tools/components-gen -// uses NewFactory(). -func TestNewFactory_ReturnsPackageVarFactory(t *testing.T) { - t.Parallel() - require.Same(t, kernelevents.Factory, kernelevents.NewFactory()) -} - -// wrongConfig is a pipeline.Config that is NOT *kernelevents.Config — +// wrongConfig is a component.Config that is NOT *kernelevents.Config — // used to exercise the factory's type-assertion guard. type wrongConfig struct{} diff --git a/components/receivers/kernelevents/filters_test.go b/components/receivers/kernelevents/filters_test.go index 49552481..a6ed4e0c 100644 --- a/components/receivers/kernelevents/filters_test.go +++ b/components/receivers/kernelevents/filters_test.go @@ -9,9 +9,9 @@ import ( "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" "github.com/tracecoreai/tracecore/components/receivers/kernelevents" - "github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest" ) // TestFilter_IncludeFacility pins facility-include filtering. Only @@ -23,18 +23,17 @@ func TestFilter_IncludeFacility(t *testing.T) { // [user] (facility 1) must drop all of them. fixture := "4,1,1,-;some kern message\n4,2,2,-;another kern message\n" sink := newLogsSink() - cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config) cfg.Journald.Enabled = false cfg.IncludeFacilities = []string{"user"} - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink) + rcv, err := kernelevents.NewFactory().CreateLogs(t.Context(), testSettings(t), cfg, sink) require.NoError(t, err) kernelevents.SetKmsgOpenForTest(rcv, func() (io.ReadCloser, error) { return io.NopCloser(strings.NewReader(fixture)), nil }) - require.NoError(t, rcv.Start(t.Context(), fx.Host)) + require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost())) t.Cleanup(func() { _ = rcv.Shutdown(t.Context()) }) time.Sleep(200 * time.Millisecond) @@ -48,18 +47,17 @@ func TestFilter_RegexFilter(t *testing.T) { fixture := "4,1,1,-;NVRM: Xid (PCI:0000:65:00.0): 79\n4,2,2,-;mlx5 link up\n4,3,3,-;not matching\n" sink := newLogsSink() - cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config) cfg.Journald.Enabled = false cfg.RegexFilter = "NVRM|mlx5" require.NoError(t, cfg.Validate()) - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink) + rcv, err := kernelevents.NewFactory().CreateLogs(t.Context(), testSettings(t), cfg, sink) require.NoError(t, err) kernelevents.SetKmsgOpenForTest(rcv, func() (io.ReadCloser, error) { return io.NopCloser(strings.NewReader(fixture)), nil }) - require.NoError(t, rcv.Start(t.Context(), fx.Host)) + require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost())) t.Cleanup(func() { _ = rcv.Shutdown(t.Context()) }) require.Eventually(t, func() bool { return sink.count.Load() == 2 }, @@ -75,7 +73,7 @@ func TestFilter_RegexFilter(t *testing.T) { func TestRegex_NoBacktrackingDoS(t *testing.T) { t.Parallel() - cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config) cfg.RegexFilter = "^(a+)+$" require.NoError(t, cfg.Validate(), "RE2 catastrophic-pattern compiles fine; the runtime is bounded") } diff --git a/components/receivers/kernelevents/integration_test.go b/components/receivers/kernelevents/integration_test.go index 5c6ad750..c28bc43c 100644 --- a/components/receivers/kernelevents/integration_test.go +++ b/components/receivers/kernelevents/integration_test.go @@ -12,9 +12,9 @@ import ( "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" "github.com/tracecoreai/tracecore/components/receivers/kernelevents" - "github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest" ) // TestIntegration_KmsgRealHost opens /dev/kmsg on the host and @@ -30,14 +30,13 @@ func TestIntegration_KmsgRealHost(t *testing.T) { } sink := newLogsSink() - cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config) cfg.Journald.Enabled = false // kmsg-only for this test - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink) + rcv, err := kernelevents.NewFactory().CreateLogs(t.Context(), testSettings(t), cfg, sink) require.NoError(t, err) - require.NoError(t, rcv.Start(t.Context(), fx.Host)) + require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost())) t.Cleanup(func() { _ = rcv.Shutdown(t.Context()) }) // We don't assert a specific event — the kernel ring buffer @@ -70,7 +69,7 @@ func TestIntegration_KmsgWriteReadBehavioral(t *testing.T) { _ = probe.Close() sink := newLogsSink() - cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config) cfg.Journald.Enabled = false cfg.MinSeverity = "debug" // accept any priority // Pin the body via regex so we don't accidentally fire on @@ -79,10 +78,9 @@ func TestIntegration_KmsgWriteReadBehavioral(t *testing.T) { cfg.RegexFilter = "TRACECORE-E2E-XID-79-MARKER" require.NoError(t, cfg.Validate()) - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink) + rcv, err := kernelevents.NewFactory().CreateLogs(t.Context(), testSettings(t), cfg, sink) require.NoError(t, err) - require.NoError(t, rcv.Start(t.Context(), fx.Host)) + require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost())) t.Cleanup(func() { _ = rcv.Shutdown(t.Context()) }) // Give the receiver a moment to seek to end before we inject. @@ -136,14 +134,13 @@ func TestIntegration_JournaldRealHost(t *testing.T) { } sink := newLogsSink() - cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config) cfg.Kmsg.Enabled = false - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink) + rcv, err := kernelevents.NewFactory().CreateLogs(t.Context(), testSettings(t), cfg, sink) require.NoError(t, err) - require.NoError(t, rcv.Start(t.Context(), fx.Host)) + require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost())) shutdownStart := time.Now() require.NoError(t, rcv.Shutdown(t.Context())) require.Less(t, time.Since(shutdownStart), 2*time.Second, diff --git a/components/receivers/kernelevents/journald.go b/components/receivers/kernelevents/journald.go index ddbfcb3f..c7c9ed2c 100644 --- a/components/receivers/kernelevents/journald.go +++ b/components/receivers/kernelevents/journald.go @@ -8,7 +8,6 @@ import ( "errors" "fmt" "io" - "log/slog" "os" "os/exec" "regexp" @@ -16,6 +15,8 @@ import ( "strconv" "strings" "time" + + "go.uber.org/zap" ) // backoffCeiling is the hardcoded upper bound for the journalctl @@ -53,9 +54,9 @@ type journaldSource struct { lc *lifecycle } -func newJournaldSource(cfg *Config, logger *slog.Logger, out chan<- parsedRecord, telemetry selfTelemetry) *journaldSource { +func newJournaldSource(cfg *Config, logger *zap.Logger, out chan<- parsedRecord, telemetry selfTelemetry) *journaldSource { if logger == nil { - logger = slog.Default() + logger = zap.NewNop() } if telemetry == nil { telemetry = newNoopSelfTelemetry() @@ -134,7 +135,7 @@ func (s *journaldSource) runRestartLoop(ctx context.Context, binary string) { // markHealthy on the first successful record, so the // receiver re-arms naturally once journalctl recovers. s.markDegraded(fmt.Sprintf("journalctl subprocess crashed: %v", err)) - s.logger.Warn("journald subprocess ended unexpectedly", "err", err.Error(), "attempt", attempt+1) + s.logger.Warn("journald subprocess ended unexpectedly", zap.String("err", err.Error()), zap.Int("attempt", attempt+1)) if attempt >= maxRetries { if !s.slowRecoverySleep(ctx, maxRetries, window) { return @@ -237,7 +238,7 @@ func (s *journaldSource) drainStderr(r io.Reader) { if rec := recover(); rec != nil { s.telemetry.IncError(kindPanic) s.logger.Error("journald drainStderr panic recovered", - "panic", fmt.Sprintf("%v", rec)) + zap.String("panic", fmt.Sprintf("%v", rec))) } }() scanner := bufio.NewScanner(r) @@ -247,7 +248,7 @@ func (s *journaldSource) drainStderr(r io.Reader) { if line == "" { continue } - s.warnOnce("journalctl-stderr", "journalctl stderr (logged once; continues silently)", "line", line) + s.warnOnce("journalctl-stderr", "journalctl stderr (logged once; continues silently)", zap.String("line", line)) } } @@ -262,7 +263,7 @@ func (s *journaldSource) streamLines(ctx context.Context, r io.Reader) error { rec, err := parseJournaldJSON(line) if err != nil { s.telemetry.IncError(kindParse) - s.warnOnce("journald-parse", "journald parse error (logged once; counter continues)", "err", err.Error()) + s.warnOnce("journald-parse", "journald parse error (logged once; counter continues)", zap.String("err", err.Error())) continue } s.markHealthy() diff --git a/components/receivers/kernelevents/journald_test.go b/components/receivers/kernelevents/journald_test.go index b3604e58..df7cec6d 100644 --- a/components/receivers/kernelevents/journald_test.go +++ b/components/receivers/kernelevents/journald_test.go @@ -4,13 +4,13 @@ package kernelevents import ( "errors" - "log/slog" "os" "path/filepath" "testing" "time" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) // TestJournaldSource_StreamsMockOutput pins the happy path: the @@ -26,7 +26,7 @@ func TestJournaldSource_StreamsMockOutput(t *testing.T) { out := make(chan parsedRecord, 4) cfg := &Config{Journald: JournaldConfig{Enabled: true, StartAt: "end"}, JournaldRestart: JournaldRestartConfig{MaxRetries: 0, Window: time.Second, Backoff: time.Millisecond}} - src := newJournaldSource(cfg, slog.Default(), out, nil) + src := newJournaldSource(cfg, zap.NewNop(), out, nil) src.findBinary = func() (string, error) { return mock, nil } require.NoError(t, src.Start(t.Context())) @@ -49,7 +49,7 @@ func TestJournaldSource_DegradedWhenBinaryMissing(t *testing.T) { out := make(chan parsedRecord, 1) cfg := &Config{Journald: JournaldConfig{Enabled: true, StartAt: "end"}, JournaldRestart: JournaldRestartConfig{Window: time.Second, Backoff: time.Millisecond}} - src := newJournaldSource(cfg, slog.Default(), out, nil) + src := newJournaldSource(cfg, zap.NewNop(), out, nil) src.findBinary = func() (string, error) { return "", errors.New("journalctl: not found in PATH") } require.NoError(t, src.Start(t.Context())) @@ -75,7 +75,7 @@ func TestJournaldSource_RestartOnCrash(t *testing.T) { Backoff: 10 * time.Millisecond, }, } - src := newJournaldSource(cfg, slog.Default(), out, nil) + src := newJournaldSource(cfg, zap.NewNop(), out, nil) src.findBinary = func() (string, error) { return mock, nil } require.NoError(t, src.Start(t.Context())) @@ -129,7 +129,7 @@ func TestRunRestartLoop_DegradedOnFirstCrash(t *testing.T) { Backoff: 300 * time.Millisecond, }, } - src := newJournaldSource(cfg, slog.Default(), out, nil) + src := newJournaldSource(cfg, zap.NewNop(), out, nil) src.findBinary = func() (string, error) { return mock, nil } require.NoError(t, src.Start(t.Context())) @@ -148,7 +148,7 @@ func TestJournaldSource_ShutdownIsIdempotent(t *testing.T) { out := make(chan parsedRecord, 1) cfg := &Config{Journald: JournaldConfig{Enabled: true, StartAt: "end"}, JournaldRestart: JournaldRestartConfig{Window: time.Second, Backoff: time.Millisecond}} - src := newJournaldSource(cfg, slog.Default(), out, nil) + src := newJournaldSource(cfg, zap.NewNop(), out, nil) src.findBinary = func() (string, error) { return mock, nil } require.NoError(t, src.Start(t.Context())) @@ -172,7 +172,7 @@ func TestJournaldSource_VersionProbeRejectsOldSystemd(t *testing.T) { Journald: JournaldConfig{Enabled: true, StartAt: "end"}, JournaldRestart: JournaldRestartConfig{MaxRetries: 0, Window: time.Second, Backoff: time.Millisecond}, } - src := newJournaldSource(cfg, slog.Default(), out, nil) + src := newJournaldSource(cfg, zap.NewNop(), out, nil) src.findBinary = func() (string, error) { return stub, nil } require.NoError(t, src.Start(t.Context())) diff --git a/components/receivers/kernelevents/kernelevents.go b/components/receivers/kernelevents/kernelevents.go index 027f11c0..80817e48 100644 --- a/components/receivers/kernelevents/kernelevents.go +++ b/components/receivers/kernelevents/kernelevents.go @@ -6,29 +6,29 @@ import ( "context" "fmt" "io" - "log/slog" "regexp" "sync/atomic" "time" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" - - "github.com/tracecoreai/tracecore/internal/consumer" - "github.com/tracecoreai/tracecore/internal/pipeline" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" ) // kernelEventsReceiver wires N sources (kmsg + journald + future -// streaming inputs) into one consumer.Logs pipeline. Embeds -// pipeline.ComponentState for Started()/Stopped() bookkeeping. +// streaming inputs) into one consumer.Logs pipeline. Implements +// upstream receiver.Logs directly (alias for component.Component); the +// lifecycle bookkeeping is delegated to the sibling `lifecycle` helper, +// so this struct only carries fan-in / emit state. // -// The lifecycle plumbing (cancel + wg + panic-recovery) lives in -// the package-local `lifecycle` sibling type (see lifecycle.go). -// Adding a third source only requires implementing the `source` -// interface — no changes to Start / Shutdown / run below. +// The lifecycle plumbing (cancel + wg + panic-recovery) lives in the +// package-local `lifecycle` sibling type (see lifecycle.go). Adding a +// third source only requires implementing the `source` interface — no +// changes to Start / Shutdown / run below. type kernelEventsReceiver struct { - pipeline.ComponentState - - set pipeline.CreateSettings + set receiver.Settings cfg *Config next consumer.Logs telemetry selfTelemetry @@ -55,6 +55,12 @@ type kernelEventsReceiver struct { parseErrors atomic.Int64 } +// Compile-time assertion: kernelEventsReceiver satisfies the upstream +// receiver.Logs interface (component.Component embedded in receiver.Logs). +// A breaking shape change in upstream surfaces here at build time +// rather than at runtime when OCB stitches the pipeline. +var _ receiver.Logs = (*kernelEventsReceiver)(nil) + // receiverOption mutates the receiver during newReceiver — used to // inject the selftelemetry implementation. M2 supplies the real impl // via TelemetrySettings extension; until then the factory stays on @@ -74,7 +80,7 @@ func withSelfTelemetry(t selfTelemetry) receiverOption { // newReceiver constructs a kernelevents receiver. Telemetry defaults // to a no-op so every hot-path call is safe even before M2 lands the // real implementation. -func newReceiver(set pipeline.CreateSettings, cfg *Config, next consumer.Logs, opts ...receiverOption) *kernelEventsReceiver { +func newReceiver(set receiver.Settings, cfg *Config, next consumer.Logs, opts ...receiverOption) *kernelEventsReceiver { r := &kernelEventsReceiver{ set: set, cfg: cfg, @@ -87,22 +93,18 @@ func newReceiver(set pipeline.CreateSettings, cfg *Config, next consumer.Logs, o return r } -// logger returns the receiver's structured logger, falling back to -// the default slog when set.Telemetry is unset. -func (r *kernelEventsReceiver) logger() *slog.Logger { - if r.set.Telemetry.Logger != nil { - return r.set.Telemetry.Logger +// logger returns the receiver's structured logger, falling back to a +// no-op zap logger when set.Logger is unset. +func (r *kernelEventsReceiver) logger() *zap.Logger { + if r.set.Logger != nil { + return r.set.Logger } - return slog.Default() + return zap.NewNop() } // Start constructs the configured sources, starts each one under the // receiver's lifecycle, then spawns the run loop. -func (r *kernelEventsReceiver) Start(ctx context.Context, host pipeline.Host) error { - if err := r.ComponentState.Start(ctx, host); err != nil { - return err - } - +func (r *kernelEventsReceiver) Start(ctx context.Context, _ component.Host) error { // compiledRegex was set in Config.Validate; a nil regex_filter // keeps r.regexFilter nil, and dropByFilter short-circuits on // the nil check before any method call. @@ -148,7 +150,7 @@ func (r *kernelEventsReceiver) Start(ctx context.Context, host pipeline.Host) er s := src r.lc.Add(func(internalCtx context.Context) { //nolint:contextcheck // internalCtx is the lifecycle-supplied parent for the source. if err := s.Start(internalCtx); err != nil { - r.logger().Warn("kernelevents source start failed", "source", s.Name(), "err", err.Error()) + r.logger().Warn("kernelevents source start failed", zap.String("source", s.Name()), zap.String("err", err.Error())) return } <-internalCtx.Done() @@ -157,28 +159,35 @@ func (r *kernelEventsReceiver) Start(ctx context.Context, host pipeline.Host) er shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() if err := s.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck // fresh budget intentional. - r.logger().Warn("kernelevents source shutdown error", "source", s.Name(), "err", err.Error()) + r.logger().Warn("kernelevents source shutdown error", zap.String("source", s.Name()), zap.String("err", err.Error())) } }) } r.logger().Info("kernelevents started", - "sources", r.sourceNames(), - "min_severity", r.cfg.MinSeverity) + zap.Strings("sources", r.sourceNames()), + zap.String("min_severity", r.cfg.MinSeverity)) return nil } // Shutdown stops the lifecycle (which cancels both the run loop and // every source via the cascade). Logs a one-line graceful-shutdown -// summary so events don't silently vanish. +// summary so events don't silently vanish. Surfaces the caller's +// ctx-deadline error from the lifecycle so an operator-set +// shutdown_timeout isn't silently swallowed — same discipline as the +// PR-B2 nccl_fr sibling. func (r *kernelEventsReceiver) Shutdown(ctx context.Context) error { + var lcErr error if r.lc != nil { - _ = r.lc.Shutdown(ctx) + lcErr = r.lc.Shutdown(ctx) } r.logger().Info("kernelevents stopped", - "emitted", r.emittedCount.Load(), - "parse_errors", r.parseErrors.Load()) - return r.ComponentState.Shutdown(ctx) + zap.Int64("emitted", r.emittedCount.Load()), + zap.Int64("parse_errors", r.parseErrors.Load())) + if lcErr != nil { + return fmt.Errorf("kernelevents lifecycle shutdown: %w", lcErr) + } + return nil } // run is the receiver's hot loop. Reads from r.events (fanned by @@ -259,7 +268,7 @@ func (r *kernelEventsReceiver) emit(ctx context.Context, rec parsedRecord, maxAt ld := plog.NewLogs() rl := ld.ResourceLogs().AppendEmpty() - r.set.Telemetry.Resource.CopyTo(rl.Resource()) + r.set.Resource.CopyTo(rl.Resource()) // SchemaURL identifies the kernelevents-receiver-owned attribute // vocabulary. Downstream consumers dispatch transforms on this // URL; bumping the version is the deprecation-cycle hook for diff --git a/components/receivers/kernelevents/kernelevents_test.go b/components/receivers/kernelevents/kernelevents_test.go index 1516bb54..ee1761f9 100644 --- a/components/receivers/kernelevents/kernelevents_test.go +++ b/components/receivers/kernelevents/kernelevents_test.go @@ -15,11 +15,14 @@ import ( "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" "github.com/tracecoreai/tracecore/components/receivers/kernelevents" - "github.com/tracecoreai/tracecore/internal/consumer" - "github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest" ) // severityDebug is an in-test constant — goconst would otherwise @@ -34,13 +37,13 @@ func TestReceiver_EmitsKmsgRecordsToNextConsumer(t *testing.T) { t.Parallel() sink := newLogsSink() - cfg, ok := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + f := kernelevents.NewFactory() + cfg, ok := f.CreateDefaultConfig().(*kernelevents.Config) require.True(t, ok) cfg.Journald.Enabled = false // kmsg only for this test cfg.MinSeverity = severityDebug - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink) + rcv, err := f.CreateLogs(t.Context(), testSettings(t), cfg, sink) require.NoError(t, err) kernelevents.SetKmsgOpenForTest(rcv, func() (io.ReadCloser, error) { @@ -49,7 +52,7 @@ func TestReceiver_EmitsKmsgRecordsToNextConsumer(t *testing.T) { return f, ferr }) - require.NoError(t, rcv.Start(t.Context(), fx.Host)) + require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost())) t.Cleanup(func() { _ = rcv.Shutdown(t.Context()) }) require.Eventually(t, func() bool { @@ -69,19 +72,19 @@ func TestReceiver_FilterDropsBelowMinSeverity(t *testing.T) { fixture := "6,1,1,-;info\n3,2,2,-;err\n7,3,3,-;debug\n" sink := newLogsSink() - cfg, ok := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + f := kernelevents.NewFactory() + cfg, ok := f.CreateDefaultConfig().(*kernelevents.Config) require.True(t, ok) cfg.Journald.Enabled = false cfg.MinSeverity = "warning" - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink) + rcv, err := f.CreateLogs(t.Context(), testSettings(t), cfg, sink) require.NoError(t, err) kernelevents.SetKmsgOpenForTest(rcv, func() (io.ReadCloser, error) { return io.NopCloser(strings.NewReader(fixture)), nil }) - require.NoError(t, rcv.Start(t.Context(), fx.Host)) + require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost())) t.Cleanup(func() { _ = rcv.Shutdown(t.Context()) }) // Only the err-level record should arrive; debug + info filtered. @@ -98,24 +101,37 @@ func TestReceiver_ShutdownIsIdempotent(t *testing.T) { t.Parallel() sink := newLogsSink() - cfg, ok := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + f := kernelevents.NewFactory() + cfg, ok := f.CreateDefaultConfig().(*kernelevents.Config) require.True(t, ok) cfg.Journald.Enabled = false - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink) + rcv, err := f.CreateLogs(t.Context(), testSettings(t), cfg, sink) require.NoError(t, err) kernelevents.SetKmsgOpenForTest(rcv, func() (io.ReadCloser, error) { return io.NopCloser(strings.NewReader("")), nil }) - require.NoError(t, rcv.Start(t.Context(), fx.Host)) + require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost())) require.NoError(t, rcv.Shutdown(t.Context())) require.NoError(t, rcv.Shutdown(t.Context())) } // --- test helpers --- +// testSettings returns receiver.Settings sourced from receivertest's +// upstream nop helper (so BuildInfo + TelemetrySettings track upstream +// without manual updates), with the ID overridden to a stable +// "kernelevents/" so selftel assertions assert +// against a predictable label and tests don't get random UUIDs. +func testSettings(t *testing.T) receiver.Settings { + t.Helper() + s := receivertest.NewNopSettings(component.MustNewType("kernelevents")) + s.ID = component.NewIDWithName(component.MustNewType("kernelevents"), + strings.ReplaceAll(t.Name(), "/", "_")) + return s +} + type logsSink struct { mu sync.Mutex pushed []plog.Logs diff --git a/components/receivers/kernelevents/kmsg.go b/components/receivers/kernelevents/kmsg.go index 4d84ece4..3fce4529 100644 --- a/components/receivers/kernelevents/kmsg.go +++ b/components/receivers/kernelevents/kmsg.go @@ -8,9 +8,10 @@ import ( "errors" "fmt" "io" - "log/slog" "strconv" "strings" + + "go.uber.org/zap" ) // errKmsgNotLinux is the platform-stub sentinel surfaced by @@ -39,9 +40,9 @@ type kmsgSource struct { // separately so non-Linux platforms can ship a stub and tests can // inject a fixture-fed reader. telemetry must be non-nil; pass // newNoopSelfTelemetry() when no impl is provided. -func newKmsgSource(cfg *Config, logger *slog.Logger, out chan<- parsedRecord, telemetry selfTelemetry) *kmsgSource { +func newKmsgSource(cfg *Config, logger *zap.Logger, out chan<- parsedRecord, telemetry selfTelemetry) *kmsgSource { if logger == nil { - logger = slog.Default() + logger = zap.NewNop() } if telemetry == nil { telemetry = newNoopSelfTelemetry() @@ -114,7 +115,7 @@ func (s *kmsgSource) run(ctx context.Context) { if rec := recover(); rec != nil { s.telemetry.IncError(kindPanic) s.logger.Error("kmsg ctx-watcher panic recovered", - "panic", fmt.Sprintf("%v", rec)) + zap.String("panic", fmt.Sprintf("%v", rec))) } }() <-ctx.Done() @@ -136,7 +137,7 @@ func (s *kmsgSource) run(ctx context.Context) { rec, err := parseKmsgRecord(text) if err != nil { s.telemetry.IncError(kindParse) - s.warnOnce("kmsg-parse", "kmsg parse error (logged once; counter continues)", "err", err.Error()) + s.warnOnce("kmsg-parse", "kmsg parse error (logged once; counter continues)", zap.String("err", err.Error())) continue } s.markHealthy() diff --git a/components/receivers/kernelevents/kmsg_test.go b/components/receivers/kernelevents/kmsg_test.go index 262f3a67..84a5f674 100644 --- a/components/receivers/kernelevents/kmsg_test.go +++ b/components/receivers/kernelevents/kmsg_test.go @@ -6,7 +6,6 @@ import ( "context" "errors" "io" - "log/slog" "os" "strings" "sync/atomic" @@ -14,6 +13,7 @@ import ( "time" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) // TestKmsgSource_StreamsFixtureRecords feeds the captured kmsg-xid-79 @@ -27,7 +27,7 @@ func TestKmsgSource_StreamsFixtureRecords(t *testing.T) { require.NoError(t, err) out := make(chan parsedRecord, 8) - src := newKmsgSource(&Config{Kmsg: KmsgConfig{Enabled: true}}, slog.Default(), out, nil) + src := newKmsgSource(&Config{Kmsg: KmsgConfig{Enabled: true}}, zap.NewNop(), out, nil) src.open = func() (io.ReadCloser, error) { return io.NopCloser(strings.NewReader(string(raw))), nil } @@ -69,7 +69,7 @@ func TestKmsgSource_FastPathSeverityPrefilter(t *testing.T) { Kmsg: KmsgConfig{Enabled: true}, MinSeverity: "warning", } - src := newKmsgSource(cfg, slog.Default(), out, nil) + src := newKmsgSource(cfg, zap.NewNop(), out, nil) src.open = func() (io.ReadCloser, error) { return io.NopCloser(strings.NewReader(fixture)), nil } @@ -99,7 +99,7 @@ func TestKmsgSource_DegradedOnOpenError(t *testing.T) { t.Parallel() out := make(chan parsedRecord, 1) - src := newKmsgSource(&Config{Kmsg: KmsgConfig{Enabled: true}}, slog.Default(), out, nil) + src := newKmsgSource(&Config{Kmsg: KmsgConfig{Enabled: true}}, zap.NewNop(), out, nil) src.open = func() (io.ReadCloser, error) { return nil, errors.New("open /dev/kmsg: permission denied") } @@ -115,7 +115,7 @@ func TestKmsgSource_ShutdownIsIdempotent(t *testing.T) { t.Parallel() out := make(chan parsedRecord, 1) - src := newKmsgSource(&Config{Kmsg: KmsgConfig{Enabled: true}}, slog.Default(), out, nil) + src := newKmsgSource(&Config{Kmsg: KmsgConfig{Enabled: true}}, zap.NewNop(), out, nil) src.open = func() (io.ReadCloser, error) { // Block on a never-closed reader so the goroutine is alive. return &blockingReader{}, nil @@ -131,7 +131,7 @@ func TestKmsgSource_RestartGuard(t *testing.T) { t.Parallel() out := make(chan parsedRecord, 1) - src := newKmsgSource(&Config{Kmsg: KmsgConfig{Enabled: true}}, slog.Default(), out, nil) + src := newKmsgSource(&Config{Kmsg: KmsgConfig{Enabled: true}}, zap.NewNop(), out, nil) src.open = func() (io.ReadCloser, error) { return &blockingReader{}, nil } diff --git a/components/receivers/kernelevents/lifecycle.go b/components/receivers/kernelevents/lifecycle.go index 77bc7dd1..ecc92f40 100644 --- a/components/receivers/kernelevents/lifecycle.go +++ b/components/receivers/kernelevents/lifecycle.go @@ -1,13 +1,11 @@ // SPDX-License-Identifier: Apache-2.0 -// Receiver-scoped streaming-source lifecycle helper. Replaces the -// v0.1.x dependency on `internal/runtime/lifecycle`, which is slated -// for deletion in RFC-0013 PR-F. Owns cancel + WaitGroup + -// panic-recovery bookkeeping kernelevents' fan-in + per-source -// goroutines need, so each source author writes a body function, not -// the plumbing. +// Receiver-scoped streaming-source lifecycle helper. Owns cancel + +// WaitGroup + panic-recovery bookkeeping kernelevents' fan-in + per- +// source goroutines need, so each source author writes a body +// function, not the plumbing. // -// Multi-source: unlike PR-B1's nccl_fr sibling (single-source, no +// Multi-source: unlike the PR-B2 sibling (nccl_fr, single-source, no // Add), kernelevents keeps Add() — the receiver registers ONE goroutine // per source under the same WaitGroup so Shutdown waits for every // source to finish. @@ -23,10 +21,11 @@ import ( "context" "errors" "fmt" - "log/slog" "runtime" "sync" "sync/atomic" + + "go.uber.org/zap" ) // errLifecycleAlreadyStarted is returned by lifecycle.Start when called @@ -47,7 +46,7 @@ type panicCallback func(panicValue any) // caller-ctx deadline) is stashed + returned by every subsequent // Shutdown so deadline failures aren't silently swallowed. type lifecycle struct { - logger *slog.Logger + logger *zap.Logger onPanic panicCallback mu sync.Mutex @@ -60,11 +59,11 @@ type lifecycle struct { } // newLifecycle constructs a lifecycle. logger may be nil (replaced with -// slog.Default for tests). onPanic may be nil — panics are still +// zap.NewNop for tests). onPanic may be nil — panics are still // recovered + logged at ERROR but no callback fires. -func newLifecycle(logger *slog.Logger, onPanic panicCallback) *lifecycle { +func newLifecycle(logger *zap.Logger, onPanic panicCallback) *lifecycle { if logger == nil { - logger = slog.Default() + logger = zap.NewNop() } return &lifecycle{logger: logger, onPanic: onPanic} } @@ -86,8 +85,7 @@ func (l *lifecycle) Start(parent context.Context, run func(context.Context)) err // so a concurrent Shutdown sees the post-Add state. Without this // lock, Shutdown could observe cancel != nil, call wg.Wait at // counter=0, return immediately, and either trigger - // `sync: WaitGroup misuse` OR orphan the goroutine. (Mirrors the - // internal helper's fix.) + // `sync: WaitGroup misuse` OR orphan the goroutine. l.wg.Add(1) l.mu.Unlock() go l.safeRun(internalCtx, run) @@ -99,7 +97,7 @@ func (l *lifecycle) safeRun(ctx context.Context, run func(context.Context)) { defer l.wg.Done() defer func() { if rec := recover(); rec != nil { - l.logger.Error("kernelevents lifecycle: run panic recovered", "panic", fmt.Sprintf("%v", rec)) + l.logger.Error("kernelevents lifecycle: run panic recovered", zap.String("panic", fmt.Sprintf("%v", rec))) if l.onPanic != nil { l.onPanic(rec) } @@ -142,7 +140,7 @@ func (l *lifecycle) Shutdown(ctx context.Context) error { // it here lets operators eyeball whether the leak is plausibly // ours. l.logger.Warn("kernelevents lifecycle: shutdown deadline elapsed before goroutine exited", - "process_goroutines", runtime.NumGoroutine()) + zap.Int("process_goroutines", runtime.NumGoroutine())) err := fmt.Errorf("kernelevents lifecycle shutdown: %w", ctx.Err()) l.mu.Lock() l.shutdownErr = err @@ -173,7 +171,7 @@ func (l *lifecycle) Add(run func(context.Context)) { l.mu.Unlock() // Log so future authors don't hit silent-refusal traps. l.logger.Warn("kernelevents lifecycle.Add called outside running window — ignored", - "closed", l.closed, "started", l.internalCtx != nil) + zap.Bool("closed", l.closed), zap.Bool("started", l.internalCtx != nil)) return } ctx := l.internalCtx diff --git a/components/receivers/kernelevents/lifecycle_test.go b/components/receivers/kernelevents/lifecycle_test.go index a2137676..d0aecfd0 100644 --- a/components/receivers/kernelevents/lifecycle_test.go +++ b/components/receivers/kernelevents/lifecycle_test.go @@ -5,19 +5,20 @@ package kernelevents import ( "context" "errors" - "log/slog" "runtime" "sync" "sync/atomic" "testing" "time" + + "go.uber.org/zap" ) // TestLifecycle_StartShutdown pins the happy path: Start spawns the // supplied run function, Shutdown cancels its ctx, run returns, // Shutdown returns nil. func TestLifecycle_StartShutdown(t *testing.T) { - lc := newLifecycle(slog.Default(), nil) + lc := newLifecycle(zap.NewNop(), nil) started := make(chan struct{}) stopped := make(chan struct{}) @@ -44,7 +45,7 @@ func TestLifecycle_StartShutdown(t *testing.T) { // intervening Shutdown returns errLifecycleAlreadyStarted. The // sentinel MUST be errors.Is-comparable so callers don't string-match. func TestLifecycle_StartTwiceErrors(t *testing.T) { - lc := newLifecycle(slog.Default(), nil) + lc := newLifecycle(zap.NewNop(), nil) if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil { t.Fatalf("first Start: %v", err) } @@ -58,7 +59,7 @@ func TestLifecycle_StartTwiceErrors(t *testing.T) { // TestLifecycle_ShutdownIdempotent pins: a second Shutdown returns the // first call's error (typically nil), not "double-close" or panic. func TestLifecycle_ShutdownIdempotent(t *testing.T) { - lc := newLifecycle(slog.Default(), nil) + lc := newLifecycle(zap.NewNop(), nil) if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil { t.Fatalf("Start: %v", err) } @@ -79,7 +80,7 @@ func TestLifecycle_PanicCallbackFires(t *testing.T) { var called atomic.Int32 var got any var mu sync.Mutex - lc := newLifecycle(slog.Default(), func(v any) { + lc := newLifecycle(zap.NewNop(), func(v any) { called.Add(1) mu.Lock() got = v @@ -121,7 +122,7 @@ func TestLifecycle_PanicCallbackFires(t *testing.T) { // purely a contract pin so future authors don't silently lose the // deadline. func TestLifecycle_ShutdownDeadlineReturnsCtxErr(t *testing.T) { - lc := newLifecycle(slog.Default(), nil) + lc := newLifecycle(zap.NewNop(), nil) leak := make(chan struct{}) if err := lc.Start(context.Background(), func(context.Context) { <-leak // ignore ctx @@ -146,7 +147,7 @@ func TestLifecycle_ShutdownDeadlineReturnsCtxErr(t *testing.T) { // here because Shutdown returns before the goroutine has observed // ctx.Done. func TestLifecycle_AddRegistersUnderSameWaitGroup(t *testing.T) { - lc := newLifecycle(slog.Default(), nil) + lc := newLifecycle(zap.NewNop(), nil) if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil { t.Fatalf("Start: %v", err) } @@ -179,7 +180,7 @@ func TestLifecycle_AddPanicFiresCallback(t *testing.T) { var called atomic.Int32 var got any var mu sync.Mutex - lc := newLifecycle(slog.Default(), func(v any) { + lc := newLifecycle(zap.NewNop(), func(v any) { called.Add(1) mu.Lock() got = v @@ -216,7 +217,7 @@ func TestLifecycle_AddPanicFiresCallback(t *testing.T) { // spawn a goroutine with no cancel hookup — a leak — so the helper // refuses. func TestLifecycle_AddBeforeStartIsNoop(t *testing.T) { - lc := newLifecycle(slog.Default(), nil) + lc := newLifecycle(zap.NewNop(), nil) var spawned atomic.Bool lc.Add(func(context.Context) { spawned.Store(true) }) // Give a generous window for any (incorrectly) spawned goroutine @@ -233,7 +234,7 @@ func TestLifecycle_AddBeforeStartIsNoop(t *testing.T) { // before previous Wait has returned" if the helper didn't guard // against this. func TestLifecycle_AddAfterShutdownIsNoop(t *testing.T) { - lc := newLifecycle(slog.Default(), nil) + lc := newLifecycle(zap.NewNop(), nil) if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil { t.Fatalf("Start: %v", err) } @@ -271,7 +272,7 @@ func TestLifecycle_AddAfterShutdownIsNoop(t *testing.T) { // Naming: receiver-prefix per criterion 3 (cross-cut review). func TestKernelevents_Lifecycle_ConcurrentAddDuringShutdown_NoPanic(t *testing.T) { const adders = 50 - lc := newLifecycle(slog.Default(), nil) + lc := newLifecycle(zap.NewNop(), nil) if err := lc.Start(context.Background(), func(ctx context.Context) { <-ctx.Done() }); err != nil { t.Fatalf("Start: %v", err) } diff --git a/components/receivers/kernelevents/nullsource_test.go b/components/receivers/kernelevents/nullsource_test.go index 8bfafa13..fde40599 100644 --- a/components/receivers/kernelevents/nullsource_test.go +++ b/components/receivers/kernelevents/nullsource_test.go @@ -4,12 +4,12 @@ package kernelevents import ( "context" - "log/slog" "sync/atomic" "testing" "time" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) // nullSource is the falsifier for research § D1 ("source-abstraction @@ -26,7 +26,7 @@ type nullSource struct { func newNullSource(out chan<- parsedRecord) *nullSource { s := &nullSource{out: out} - s.lc = newLifecycle(slog.Default(), nil) + s.lc = newLifecycle(zap.NewNop(), nil) return s } diff --git a/components/receivers/kernelevents/otelcontrib_e2e_test.go b/components/receivers/kernelevents/otelcontrib_e2e_test.go index 4cc37f44..80594b29 100644 --- a/components/receivers/kernelevents/otelcontrib_e2e_test.go +++ b/components/receivers/kernelevents/otelcontrib_e2e_test.go @@ -27,10 +27,10 @@ import ( "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/pdata/plog" "github.com/tracecoreai/tracecore/components/receivers/kernelevents" - "github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest" ) // TestE2E_OTelContribAcceptsOTLPHTTP is the criterion-#4 falsifier: @@ -47,19 +47,18 @@ func TestE2E_OTelContribAcceptsOTLPHTTP(t *testing.T) { captured := make(chan plog.Logs, 4) sink := &captureSink{out: captured} - cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config) cfg.Journald.Enabled = false cfg.MinSeverity = severityDebug - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink) + rcv, err := kernelevents.NewFactory().CreateLogs(t.Context(), testSettings(t), cfg, sink) require.NoError(t, err) fixture := "3,2456,1209384738,-;NVRM: Xid (PCI:0000:65:00.0): 79, pid=0\n" kernelevents.SetKmsgOpenForTest(rcv, func() (io.ReadCloser, error) { return io.NopCloser(strings.NewReader(fixture)), nil }) - require.NoError(t, rcv.Start(t.Context(), fx.Host)) + require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost())) t.Cleanup(func() { _ = rcv.Shutdown(t.Context()) }) var emitted plog.Logs diff --git a/components/receivers/kernelevents/parser_test.go b/components/receivers/kernelevents/parser_test.go index 209fa0d0..6e7b0fb0 100644 --- a/components/receivers/kernelevents/parser_test.go +++ b/components/receivers/kernelevents/parser_test.go @@ -3,8 +3,6 @@ package kernelevents import ( - "bytes" - "log/slog" "os" "path/filepath" "sort" @@ -15,6 +13,8 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" ) // TestParseKmsgRecord_Xid79 asserts parseKmsgRecord against the @@ -66,20 +66,23 @@ func TestParseKmsgRecord_RejectsMalformed(t *testing.T) { // per-record parse-error Warns were ungated → log-storm risk. func TestSourceCommon_WarnOnce_GatesByKey(t *testing.T) { t.Parallel() - var buf bytes.Buffer - logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelWarn})) + // zaptest/observer captures every log entry so we can assert on + // counts per message string — no buffer-substring matching that + // would tangle with zap's JSON / console formatters. + core, recorded := observer.New(zap.WarnLevel) + logger := zap.New(core) c := &sourceCommon{name: "test", logger: logger} // 100 calls with the same key → 1 log line. for i := range 100 { - c.warnOnce("parse", "parse error", "iter", i) + c.warnOnce("parse", "parse error", zap.Int("iter", i)) } - require.Equal(t, 1, strings.Count(buf.String(), "parse error"), + require.Equal(t, 1, recorded.FilterMessage("parse error").Len(), "repeat warnOnce calls for the same key MUST collapse to one log line") // Different key still logs. c.warnOnce("connect", "connect error") - require.Equal(t, 1, strings.Count(buf.String(), "connect error"), + require.Equal(t, 1, recorded.FilterMessage("connect error").Len(), "distinct keys MUST each log once") } diff --git a/components/receivers/kernelevents/roundtrip_helpers_test.go b/components/receivers/kernelevents/roundtrip_helpers_test.go index 6e052a9e..c6e94064 100644 --- a/components/receivers/kernelevents/roundtrip_helpers_test.go +++ b/components/receivers/kernelevents/roundtrip_helpers_test.go @@ -7,8 +7,7 @@ import ( "testing" "github.com/stretchr/testify/require" - - "github.com/tracecoreai/tracecore/internal/consumer" + "go.opentelemetry.io/collector/consumer" ) // Ensure the consumer package's interface is satisfied by diff --git a/components/receivers/kernelevents/roundtrip_test.go b/components/receivers/kernelevents/roundtrip_test.go index f1657a5f..fb0911a1 100644 --- a/components/receivers/kernelevents/roundtrip_test.go +++ b/components/receivers/kernelevents/roundtrip_test.go @@ -10,11 +10,11 @@ import ( "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" "github.com/tracecoreai/tracecore/components/receivers/kernelevents" - "github.com/tracecoreai/tracecore/internal/consumer" - "github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest" ) // TestReceiver_OTLPRoundTrip is the A+ criterion #9: at least one @@ -32,12 +32,11 @@ func TestReceiver_OTLPRoundTrip(t *testing.T) { captured := make(chan plog.Logs, 4) sink := &captureSink{out: captured} - cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config) cfg.Journald.Enabled = false cfg.MinSeverity = severityDebug - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink) + rcv, err := kernelevents.NewFactory().CreateLogs(t.Context(), testSettings(t), cfg, sink) require.NoError(t, err) // Single Xid 79 fixture line. @@ -45,7 +44,7 @@ func TestReceiver_OTLPRoundTrip(t *testing.T) { kernelevents.SetKmsgOpenForTest(rcv, func() (io.ReadCloser, error) { return io.NopCloser(strings.NewReader(fixture)), nil }) - require.NoError(t, rcv.Start(t.Context(), fx.Host)) + require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost())) t.Cleanup(func() { _ = rcv.Shutdown(t.Context()) }) var emitted plog.Logs @@ -122,19 +121,18 @@ func TestReceiver_OTLPWireSize_CanonicalRecord(t *testing.T) { captured := make(chan plog.Logs, 4) sink := &captureSink{out: captured} - cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config) cfg.Journald.Enabled = false cfg.MinSeverity = severityDebug - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink) + rcv, err := kernelevents.NewFactory().CreateLogs(t.Context(), testSettings(t), cfg, sink) require.NoError(t, err) fixture := "3,2456,1209384738,-;NVRM: Xid (PCI:0000:65:00.0): 79, pid=0\n" kernelevents.SetKmsgOpenForTest(rcv, func() (io.ReadCloser, error) { return io.NopCloser(strings.NewReader(fixture)), nil }) - require.NoError(t, rcv.Start(t.Context(), fx.Host)) + require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost())) t.Cleanup(func() { _ = rcv.Shutdown(t.Context()) }) var emitted plog.Logs @@ -172,11 +170,10 @@ func TestReceiver_OTLPRoundTrip_TraceContext(t *testing.T) { captured := make(chan plog.Logs, 4) sink := &captureSink{out: captured} - cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config) cfg.Kmsg.Enabled = false // journald-only for trace context - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink) + rcv, err := kernelevents.NewFactory().CreateLogs(t.Context(), testSettings(t), cfg, sink) require.NoError(t, err) // Use the journald source via mock to inject a trace-context @@ -186,7 +183,7 @@ func TestReceiver_OTLPRoundTrip_TraceContext(t *testing.T) { t.Setenv("KERNELEVENTS_MOCK_FIXTURE", traceContextFixturePath(t)) kernelevents.SetJournaldFinderForTest(rcv, func() (string, error) { return mock, nil }) - require.NoError(t, rcv.Start(t.Context(), fx.Host)) + require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost())) t.Cleanup(func() { _ = rcv.Shutdown(t.Context()) }) var emitted plog.Logs diff --git a/components/receivers/kernelevents/selftel.go b/components/receivers/kernelevents/selftel.go index 9ea890d6..773b0a41 100644 --- a/components/receivers/kernelevents/selftel.go +++ b/components/receivers/kernelevents/selftel.go @@ -1,15 +1,14 @@ // SPDX-License-Identifier: Apache-2.0 // Receiver-scoped self-telemetry surface. Replaces the v0.1.x -// dependency on `internal/selftelemetry`, which is slated for deletion -// in RFC-0013 PR-F. Metric names + label shape are preserved -// (`tracecore.receiver.errors_total{kind,component_id}` and siblings) -// so dashboards / alerts don't regress. The instrumentation scope name -// is THIS receiver's Go import path — when the receiver moves to its -// own submodule in PR-I.2, the scope name moves with it, matching OTel -// convention. +// dependency on `internal/selftelemetry`. Metric names + label shape +// are preserved (`tracecore.receiver.errors_total{kind,component_id}` +// and siblings) so dashboards / alerts don't regress. The +// instrumentation scope name is THIS receiver's Go import path — +// when the receiver moves to its own submodule in PR-I.2, the scope +// name moves with it, matching OTel convention. // -// Mirrors the PR-B1 sibling design for components/receivers/nccl_fr, +// Mirrors the PR-B2 sibling design for components/receivers/nccl_fr, // adapted for kernelevents' multi-source shape: sources receive // `selfTelemetry` injected at construction (one shared per-receiver // instance, so kmsg + journald increments roll up to the same @@ -25,10 +24,9 @@ import ( "sync/atomic" "time" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" - - "github.com/tracecoreai/tracecore/internal/pipeline" ) // kind is a low-cardinality error-class identifier. Mirrors the @@ -99,7 +97,7 @@ var _ selfTelemetry = noopSelfTelemetry{} // `component_id` label on every emission. Registers the same five // instruments the v0.1.x internal selftelemetry package registered, so // scraped metric names + label shape are unchanged. -func newSelfTelemetry(id pipeline.ID, mp metric.MeterProvider) (selfTelemetry, error) { +func newSelfTelemetry(id component.ID, mp metric.MeterProvider) (selfTelemetry, error) { if mp == nil { return nil, errNilMeterProvider } diff --git a/components/receivers/kernelevents/selftel_test.go b/components/receivers/kernelevents/selftel_test.go index 814c85c1..5702ee1a 100644 --- a/components/receivers/kernelevents/selftel_test.go +++ b/components/receivers/kernelevents/selftel_test.go @@ -10,11 +10,10 @@ import ( "testing" "time" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/otel/attribute" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" - - "github.com/tracecoreai/tracecore/internal/pipeline" ) // newTestMeterProvider builds an SDK MeterProvider backed by a @@ -269,8 +268,8 @@ func TestRecordInitError_NilProviderIsSafe(t *testing.T) { recordInitError(context.Background(), nil, "receiver", "x/y", reasonInstrumentRegister) } -func testID() pipeline.ID { - return pipeline.MustNewID(pipeline.MustNewType("kernelevents"), "test") +func testID() component.ID { + return component.NewIDWithName(component.MustNewType("kernelevents"), "test") } func dumpNames(rm metricdata.ResourceMetrics) string { diff --git a/components/receivers/kernelevents/source.go b/components/receivers/kernelevents/source.go index 717e9fcb..e8f22b52 100644 --- a/components/receivers/kernelevents/source.go +++ b/components/receivers/kernelevents/source.go @@ -4,9 +4,10 @@ package kernelevents import ( "context" - "log/slog" "sync" "sync/atomic" + + "go.uber.org/zap" ) // sourceKmsg / sourceJournald are the canonical source-name @@ -78,7 +79,7 @@ type source interface { // don't re-implement the same `markDegraded` / `markHealthy` pair. type sourceCommon struct { name string - logger *slog.Logger + logger *zap.Logger telemetry selfTelemetry degraded atomic.Bool @@ -99,7 +100,7 @@ func (c *sourceCommon) Degraded() bool { return c.degraded.Load() } // log per actual fault). func (c *sourceCommon) markDegraded(reason string) { if c.degraded.CompareAndSwap(false, true) { - c.logger.Warn(c.name+" source degraded", "reason", reason) + c.logger.Warn(c.name+" source degraded", zap.String("reason", reason)) c.telemetry.SetDegraded(true) } } @@ -131,7 +132,7 @@ func (c *sourceCommon) markDegraded(reason string) { // occurrence after markHealthy. Documented as docs/FOLLOWUPS.md // row tracking R1.B7 review extension; trigger is an operator // reporting they lost a recovery breadcrumb. -func (c *sourceCommon) warnOnce(key string, msg string, fields ...any) { +func (c *sourceCommon) warnOnce(key string, msg string, fields ...zap.Field) { if _, loaded := c.warnedKinds.LoadOrStore(key, struct{}{}); loaded { return } diff --git a/components/receivers/kernelevents/source_template.go.example b/components/receivers/kernelevents/source_template.go.example index fc4c18b6..76c2cdc2 100644 --- a/components/receivers/kernelevents/source_template.go.example +++ b/components/receivers/kernelevents/source_template.go.example @@ -41,8 +41,9 @@ package kernelevents import ( "context" - "log/slog" "time" + + "go.uber.org/zap" ) // REPLACE-ME: source identifier constant. Add to source.go's @@ -81,7 +82,7 @@ type YourConfig struct { // out-channel, telemetry. The receiver's Start function calls this. func newYourSource( cfg YourConfig, - logger *slog.Logger, + logger *zap.Logger, out chan<- parsedRecord, telemetry selfTelemetry, ) *yourSource { diff --git a/components/receivers/kernelevents/source_template_test.go b/components/receivers/kernelevents/source_template_test.go index 2c8c8ae8..f19255ee 100644 --- a/components/receivers/kernelevents/source_template_test.go +++ b/components/receivers/kernelevents/source_template_test.go @@ -21,11 +21,11 @@ import ( // downstream inherits it. // // Gate is parser-only — no type checking — because the template -// imports `internal/runtime/lifecycle` and uses kernelevents- -// package-private types (sourceCommon, parsedRecord). Full type -// checking would require dropping the rename safety. The parse -// gate catches the class of bug an author would actually hit: -// missing brace, bad import, broken func signature. +// uses kernelevents-package-private types (sourceCommon, parsedRecord, +// the local lifecycle helper). Full type checking would require +// dropping the rename safety. The parse gate catches the class of +// bug an author would actually hit: missing brace, bad import, +// broken func signature. // // review acceptance criterion: source_template compiles after a naive // gofmt-and-rename. diff --git a/components/receivers/kernelevents/sustained_test.go b/components/receivers/kernelevents/sustained_test.go index f7180cf7..e9865f75 100644 --- a/components/receivers/kernelevents/sustained_test.go +++ b/components/receivers/kernelevents/sustained_test.go @@ -24,9 +24,9 @@ import ( "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" "github.com/tracecoreai/tracecore/components/receivers/kernelevents" - "github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest" ) // TestReceiver_SustainedLoad drives the receiver at ~1000 events/sec @@ -51,12 +51,11 @@ func TestReceiver_SustainedLoad(t *testing.T) { ) sink := &countingSink{latencies: make([]time.Duration, 0, 600000)} - cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config) + cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config) cfg.Journald.Enabled = false cfg.MinSeverity = severityDebug - fx := pipelinetest.New(t) - rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink) + rcv, err := kernelevents.NewFactory().CreateLogs(t.Context(), testSettings(t), cfg, sink) require.NoError(t, err) // Build a pre-allocated fixture chunk to keep the feeder loop @@ -72,7 +71,7 @@ func TestReceiver_SustainedLoad(t *testing.T) { return pr, nil }) - require.NoError(t, rcv.Start(t.Context(), fx.Host)) + require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost())) t.Cleanup(func() { _ = rcv.Shutdown(t.Context()) }) // Snapshot baseline heap after Start but before the load diff --git a/go.mod b/go.mod index 720466c3..81f83ea3 100644 --- a/go.mod +++ b/go.mod @@ -8,8 +8,10 @@ require ( github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/collector/component v1.59.0 + go.opentelemetry.io/collector/component/componenttest v0.153.0 go.opentelemetry.io/collector/consumer v1.59.0 go.opentelemetry.io/collector/pdata v1.59.0 + go.opentelemetry.io/collector/pipeline v1.59.0 go.opentelemetry.io/collector/receiver v1.59.0 go.opentelemetry.io/collector/receiver/receivertest v0.153.0 go.opentelemetry.io/otel v1.43.0 @@ -229,13 +231,11 @@ require ( go-simpler.org/sloglint v0.11.0 // indirect go.augendre.info/fatcontext v0.8.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/collector/component/componenttest v0.153.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.153.0 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.153.0 // indirect go.opentelemetry.io/collector/featuregate v1.59.0 // indirect go.opentelemetry.io/collector/internal/componentalias v0.153.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.153.0 // indirect - go.opentelemetry.io/collector/pipeline v1.59.0 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.153.0 // indirect go.opentelemetry.io/otel/sdk v1.43.0 // indirect go.opentelemetry.io/otel/trace v1.43.0 // indirect