Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 21 additions & 23 deletions components/receivers/kernelevents/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"context"
"fmt"
"io"
"log/slog"
"runtime"
"sort"
"strings"
Expand All @@ -16,13 +15,15 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.uber.org/zap"

"github.com/tracecoreai/tracecore/components/receivers/kernelevents"
"github.com/tracecoreai/tracecore/internal/consumer"
"github.com/tracecoreai/tracecore/internal/pipeline"
"github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest"
"github.com/tracecoreai/tracecore/internal/sli"
)

Expand Down Expand Up @@ -55,16 +56,16 @@ func BenchmarkReceiver_KmsgWithDegradedJournald(b *testing.B) {
func runBench(b *testing.B, kmsg, journald bool) {
b.Helper()
sink := &countingSink{}
cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config)
cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config)
cfg.Kmsg.Enabled = kmsg
cfg.Journald.Enabled = journald
cfg.MinSeverity = severityDebug // pass everything

// pipelinetest.New takes *testing.T; benchmarks fabricate
// minimal CreateSettings + Host inline.
// receivertest.NewNopSettings takes a component.Type; benchmarks
// fabricate a discard-logger settings + nop host inline so the
// bench loop doesn't surface log noise from the factory.
settings, host := benchSettings()
_ = settings // referenced via rcv construction below
rcv, err := kernelevents.Factory.CreateLogs(b.Context(), settings, cfg, sink)
rcv, err := kernelevents.NewFactory().CreateLogs(b.Context(), settings, cfg, sink)
require.NoError(b, err)

// Feed kmsg from a memory reader.
Expand Down Expand Up @@ -142,20 +143,19 @@ func TestReceiver_SLIBudget(t *testing.T) {
const targetHeapMiB = 50.0

sink := &countingSink{latencies: make([]time.Duration, 0, totalEvents)}
cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config)
cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config)
cfg.Journald.Enabled = false
cfg.MinSeverity = severityDebug

fx := pipelinetest.New(t)
rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink)
rcv, err := kernelevents.NewFactory().CreateLogs(t.Context(), testSettings(t), cfg, sink)
require.NoError(t, err)

fixture := strings.Repeat("3,1,1,-;NVRM: Xid (PCI:0000:65:00.0): 79, pid=0\n", totalEvents)
kernelevents.SetKmsgOpenForTest(rcv, func() (io.ReadCloser, error) {
return io.NopCloser(strings.NewReader(fixture)), nil
})

require.NoError(t, rcv.Start(t.Context(), fx.Host))
require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost()))
t.Cleanup(func() { _ = rcv.Shutdown(t.Context()) })

startWall := time.Now()
Expand Down Expand Up @@ -308,13 +308,11 @@ func cpuNs() int64 {
// false negatives in CI.
func isRaceBuild() bool { return raceEnabled }

// benchSettings fabricates a minimal CreateSettings + Host for
// benchmarks (pipelinetest.New is *testing.T-bound).
func benchSettings() (pipeline.CreateSettings, pipeline.Host) {
return pipeline.CreateSettings{
Telemetry: pipeline.TelemetrySettings{
Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
Resource: pcommon.NewResource(),
},
}, pipelinetest.NewHost()
// benchSettings fabricates a minimal receiver.Settings + Host for
// benchmarks (testSettings is *testing.T-bound). The logger is the
// zap.NewNop so the bench loop doesn't pay log-format costs.
func benchSettings() (receiver.Settings, component.Host) {
s := receivertest.NewNopSettings(component.MustNewType("kernelevents"))
s.Logger = zap.NewNop()
return s, componenttest.NewNopHost()
}
16 changes: 7 additions & 9 deletions components/receivers/kernelevents/cascade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"

"github.com/tracecoreai/tracecore/components/receivers/kernelevents"
"github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest"
)

// TestReceiver_CancelCascadesToSources is the A+ criterion #4
Expand All @@ -25,11 +25,10 @@ func TestReceiver_CancelCascadesToSources(t *testing.T) {
t.Parallel()

sink := newLogsSink()
cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config)
cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config)
cfg.Journald.Enabled = false // kmsg-only

fx := pipelinetest.New(t)
rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink)
rcv, err := kernelevents.NewFactory().CreateLogs(t.Context(), testSettings(t), cfg, sink)
require.NoError(t, err)

// Blocking reader: only Close() makes Read return. If ctx
Expand All @@ -42,7 +41,7 @@ func TestReceiver_CancelCascadesToSources(t *testing.T) {
})

startCtx, cancelStart := context.WithCancel(t.Context())
require.NoError(t, rcv.Start(startCtx, fx.Host))
require.NoError(t, rcv.Start(startCtx, componenttest.NewNopHost()))

// Let the source begin reading (block in Read).
time.Sleep(50 * time.Millisecond)
Expand Down Expand Up @@ -93,10 +92,9 @@ func TestReceiver_BothSourcesEnabled_DegradedDualSource(t *testing.T) {
t.Parallel()

sink := newLogsSink()
cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config)
cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config)

fx := pipelinetest.New(t)
rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink)
rcv, err := kernelevents.NewFactory().CreateLogs(t.Context(), testSettings(t), cfg, sink)
require.NoError(t, err)

kernelevents.SetKmsgOpenForTest(rcv, func() (io.ReadCloser, error) {
Expand All @@ -106,6 +104,6 @@ func TestReceiver_BothSourcesEnabled_DegradedDualSource(t *testing.T) {
return "", io.ErrClosedPipe
})

require.NoError(t, rcv.Start(t.Context(), fx.Host))
require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost()))
require.NoError(t, rcv.Shutdown(t.Context()))
}
9 changes: 4 additions & 5 deletions components/receivers/kernelevents/cpu_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"

"github.com/tracecoreai/tracecore/components/receivers/kernelevents"
"github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest"
)

// TestReceiver_CPUBudget_Linux measures per-event CPU cost using
Expand Down Expand Up @@ -49,20 +49,19 @@ func TestReceiver_CPUBudget_Linux(t *testing.T) {
)

sink := &countingSink{latencies: make([]time.Duration, 0, targetEvents)}
cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config)
cfg, _ := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config)
cfg.Journald.Enabled = false
cfg.MinSeverity = severityDebug

fx := pipelinetest.New(t)
rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink)
rcv, err := kernelevents.NewFactory().CreateLogs(t.Context(), testSettings(t), cfg, sink)
require.NoError(t, err)

fixture := strings.Repeat("3,1,1,-;NVRM: Xid (PCI:0000:65:00.0): 79, pid=0\n", targetEvents)
kernelevents.SetKmsgOpenForTest(rcv, func() (io.ReadCloser, error) {
return io.NopCloser(strings.NewReader(fixture)), nil
})

require.NoError(t, rcv.Start(t.Context(), fx.Host))
require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost()))
t.Cleanup(func() { _ = rcv.Shutdown(t.Context()) })

cpuBefore := readCPUNs(t)
Expand Down
4 changes: 2 additions & 2 deletions components/receivers/kernelevents/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func ExampleNewFactory() {
// only one source enabled — the partial-source operating mode that
// non-systemd hosts (Alpine, BSD) rely on.
func Example_partialSource() {
cfg, ok := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config)
cfg, ok := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config)
if !ok {
fmt.Println("unexpected config type")
return
Expand Down Expand Up @@ -61,7 +61,7 @@ func TestExampleConfig_Parses(t *testing.T) {
require.NoError(t, yaml.Unmarshal(bs, &doc),
"example_config.yaml must parse as YAML")

cfg, ok := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config)
cfg, ok := kernelevents.NewFactory().CreateDefaultConfig().(*kernelevents.Config)
require.True(t, ok, "factory must produce *Config")
require.NoError(t, doc.Receivers.Kernelevents.Decode(cfg),
"receivers.kernelevents block must decode into Config")
Expand Down
12 changes: 6 additions & 6 deletions components/receivers/kernelevents/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ package kernelevents

import (
"io"
"log/slog"
"sync"
"sync/atomic"
"time"

"github.com/tracecoreai/tracecore/internal/pipeline"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
)

// FakeTelemetry is a test-only captor for the receiver-scoped
Expand Down Expand Up @@ -83,7 +83,7 @@ var _ selfTelemetry = (*FakeTelemetry)(nil)
// WithSelfTelemetryForTest swaps the receiver's telemetry hook AND
// every source's telemetry hook so external _test packages can assert
// on counter calls.
func WithSelfTelemetryForTest(rcv pipeline.Receiver, t *FakeTelemetry) {
func WithSelfTelemetryForTest(rcv receiver.Logs, t *FakeTelemetry) {
r, ok := rcv.(*kernelEventsReceiver)
if !ok {
return
Expand All @@ -107,7 +107,7 @@ func NewKmsgSourceForTest(cfg *Config, open func() (io.ReadCloser, error), t *Fa
if t == nil {
telemetry = newNoopSelfTelemetry()
}
s := newKmsgSource(cfg, slog.Default(), make(chan parsedRecord, 8), telemetry)
s := newKmsgSource(cfg, zap.NewNop(), make(chan parsedRecord, 8), telemetry)
s.open = open
return &KmsgSourceForTest{s: s}
}
Expand All @@ -124,7 +124,7 @@ func (k *KmsgSourceForTest) Name() string { return k.s.Name() }
// SetKmsgOpenForTest swaps the receiver's kmsg-open hook so external
// _test packages can drive the source from a fixture file or
// in-memory reader instead of /dev/kmsg.
func SetKmsgOpenForTest(rcv pipeline.Receiver, open func() (io.ReadCloser, error)) {
func SetKmsgOpenForTest(rcv receiver.Logs, open func() (io.ReadCloser, error)) {
r, ok := rcv.(*kernelEventsReceiver)
if !ok {
return
Expand All @@ -133,7 +133,7 @@ func SetKmsgOpenForTest(rcv pipeline.Receiver, open func() (io.ReadCloser, error
}

// SetJournaldFinderForTest swaps the journald binary finder.
func SetJournaldFinderForTest(rcv pipeline.Receiver, find func() (string, error)) {
func SetJournaldFinderForTest(rcv receiver.Logs, find func() (string, error)) {
r, ok := rcv.(*kernelEventsReceiver)
if !ok {
return
Expand Down
78 changes: 41 additions & 37 deletions components/receivers/kernelevents/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,47 @@ import (
"context"
"fmt"

"github.com/tracecoreai/tracecore/internal/consumer"
"github.com/tracecoreai/tracecore/internal/pipeline"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
)

// componentType is wrapped in a function so the pipeline.MustNewType
// call is not a top-level side effect — defensive habit (the literal
// is regex-safe but the pattern is reusable).
func componentType() pipeline.Type { return pipeline.MustNewType("kernelevents") }
// componentType is the kind name registered in components.yaml. Wrapped
// in a function so the MustNewType call is not a top-level side effect
// (mirrors the nccl_fr / clockreceiver pattern).
func componentType() component.Type { return component.MustNewType("kernelevents") }

// Factory is the package-scoped ReceiverFactory for kernelevents.
// Mirrors clockreceiver.Factory in shape — M10+ receiver authors who
// want a streaming-source (vs tick-based) exemplar copy this.
//
// Only CreateLogs returns a real Receiver; CreateMetrics and
// CreateTraces return pipeline.ErrSignalNotSupported.
var Factory pipeline.ReceiverFactory = &factory{}

// NewFactory returns the package-var Factory. Required by
// tools/components-gen, which generates calls like
// `kernelevents.NewFactory()`.
func NewFactory() pipeline.ReceiverFactory { return Factory }

type factory struct{}

func (*factory) Type() pipeline.Type { return componentType() }
// stability is the OCB-surfaced stability level for kernelevents' logs
// signal. Beta tracks "metrics + label shape pinned; behavior may
// evolve" — same level the receiver has carried since the v0.1.x
// internal/pipeline factory; PR-B3 preserves it across the upstream
// swap.
const stability = component.StabilityLevelBeta

func (*factory) CreateDefaultConfig() pipeline.Config { return defaultConfig() }

func (*factory) CreateMetrics(_ context.Context, _ pipeline.CreateSettings, _ pipeline.Config, _ consumer.Metrics) (pipeline.Receiver, error) {
return nil, pipeline.ErrSignalNotSupported
// NewFactory returns the upstream receiver.Factory for kernelevents.
// Mirrors the upstream-contrib pattern (otlpreceiver, filelogreceiver)
// and the PR-B2 sibling (nccl_fr) — callers construct via
// `kernelevents.NewFactory()` rather than a package var, so each
// OCB-stitched pipeline gets a freshly-built factory and the package
// surface stays a single exported symbol.
//
// Only the logs signal returns a real Receiver; metrics + traces
// surface upstream's "signal not supported" via receiver.NewFactory's
// default unimplemented behavior.
func NewFactory() receiver.Factory {
return receiver.NewFactory(
componentType(),
createDefaultConfig,
receiver.WithLogs(createLogs, stability),
)
}

func (*factory) CreateTraces(_ context.Context, _ pipeline.CreateSettings, _ pipeline.Config, _ consumer.Traces) (pipeline.Receiver, error) {
return nil, pipeline.ErrSignalNotSupported
}
// createDefaultConfig matches upstream component.CreateDefaultConfigFunc.
func createDefaultConfig() component.Config { return defaultConfig() }

func (*factory) CreateLogs(ctx context.Context, set pipeline.CreateSettings, cfg pipeline.Config, next consumer.Logs) (pipeline.Receiver, error) {
// createLogs is the receiver.CreateLogsFunc wired by WithLogs.
func createLogs(ctx context.Context, set receiver.Settings, cfg component.Config, next consumer.Logs) (receiver.Logs, error) {
c, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("kernelevents: unexpected config type %T", cfg)
Expand All @@ -52,18 +56,18 @@ func (*factory) CreateLogs(ctx context.Context, set pipeline.CreateSettings, cfg
// safe and an init-error metric tracks the fallback so operators
// see when they're running silent.
r := newReceiver(set, c, next)
if set.Telemetry.MeterProvider != nil {
if rt, err := newSelfTelemetry(set.ID, set.Telemetry.MeterProvider); err == nil {
if set.MeterProvider != nil {
if rt, err := newSelfTelemetry(set.ID, set.MeterProvider); err == nil {
r.telemetry = rt
} else {
recordInitError(ctx, set.Telemetry.MeterProvider,
recordInitError(ctx, set.MeterProvider,
"receiver", set.ID.String(), reasonInstrumentRegister)
if set.Telemetry.Logger != nil {
set.Telemetry.Logger.Warn("kernelevents self-telemetry init failed; using noop", "err", err)
if set.Logger != nil {
set.Logger.Warn("kernelevents self-telemetry init failed; using noop", zap.Error(err))
}
}
} else if set.Telemetry.Logger != nil {
set.Telemetry.Logger.Warn("kernelevents: no MeterProvider; self-telemetry using noop")
} else if set.Logger != nil {
set.Logger.Warn("kernelevents: no MeterProvider; self-telemetry using noop")
}
return r, nil
}
Loading
Loading