Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions components/receivers/clockreceiver/clockreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"go.uber.org/zap"
)

// Receiver emits a gauge metric (`tracecore.clock.now`) on a
// clockReceiver emits a gauge metric (`tracecore.clock.now`) on a
// fixed-interval ticker. It exists to exercise the upstream
// receiver.Metrics contract end-to-end, not to be useful in
// production — the metric value is the current Unix time, which any
Expand All @@ -28,7 +28,13 @@ import (
// Self-telemetry is wired via the receiver-local `selfTelemetry`
// sibling, which replaces the v0.1.x dependency on
// `internal/selftelemetry`.
type Receiver struct {
//
// Package-private (lowercase) mirrors peer receivers in this repo
// (containerStdoutReceiver, k8sEventsReceiver, kernelEventsReceiver,
// ncclfrReceiver, pyspyReceiver): the factory is the only exported
// constructor surface — callers stitch via OCB, not by importing the
// concrete struct.
type clockReceiver struct {
set receiver.Settings
interval time.Duration
next consumer.Metrics
Expand All @@ -41,15 +47,15 @@ type Receiver struct {
lc *lifecycle
}

// Compile-time assertion: Receiver satisfies the upstream
// Compile-time assertion: clockReceiver satisfies the upstream
// receiver.Metrics interface (component.Component embedded in
// receiver.Metrics). A breaking shape change in upstream surfaces
// here at build time rather than at runtime when OCB stitches the
// pipeline.
var _ receiver.Metrics = (*Receiver)(nil)
var _ receiver.Metrics = (*clockReceiver)(nil)

func newReceiver(set receiver.Settings, cfg *Config, next consumer.Metrics) *Receiver {
r := &Receiver{
func newReceiver(set receiver.Settings, cfg *Config, next consumer.Metrics) *clockReceiver {
r := &clockReceiver{
set: set,
interval: cfg.Interval,
next: next,
Expand All @@ -66,7 +72,7 @@ func newReceiver(set receiver.Settings, cfg *Config, next consumer.Metrics) *Rec
// logger returns the receiver-scoped zap logger. Falls back to
// zap.NewNop when the runtime didn't supply one, so call sites never
// nil-check before logging.
func (r *Receiver) logger() *zap.Logger {
func (r *clockReceiver) logger() *zap.Logger {
if r.set.Logger != nil {
return r.set.Logger
}
Expand All @@ -76,7 +82,7 @@ func (r *Receiver) logger() *zap.Logger {
// Start spawns the ticker goroutine via the lifecycle helper. The
// caller's ctx authorizes the start; the ticker's lifetime is bound
// to Shutdown via lifecycle's internal cancel.
func (r *Receiver) Start(ctx context.Context, _ component.Host) error {
func (r *clockReceiver) Start(ctx context.Context, _ component.Host) error {
if err := r.lc.Start(ctx, r.run); err != nil {
return err //nolint:wrapcheck // lifecycle error already carries enough context
}
Expand All @@ -89,12 +95,12 @@ func (r *Receiver) Start(ctx context.Context, _ component.Host) error {
// already logs its own deadline-elapsed warning; the error is
// surfaced to the caller (the runtime) so a missed deadline isn't
// silently swallowed.
func (r *Receiver) Shutdown(ctx context.Context) error {
func (r *clockReceiver) Shutdown(ctx context.Context) error {
return r.lc.Shutdown(ctx)
}

// run emits one metric per tick until the lifecycle's ctx fires.
func (r *Receiver) run(ctx context.Context) {
func (r *clockReceiver) run(ctx context.Context) {
ticker := time.NewTicker(r.interval)
defer ticker.Stop()

Expand All @@ -111,7 +117,7 @@ func (r *Receiver) run(ctx context.Context) {
// emit constructs a single pmetric.Metrics with one gauge data point
// and pushes it to the next consumer. Errors are logged but do not
// stop the ticker — the next tick gets a fresh attempt.
func (r *Receiver) emit(ctx context.Context, now time.Time) {
func (r *clockReceiver) emit(ctx context.Context, now time.Time) {
md := pmetric.NewMetrics()
rm := md.ResourceMetrics().AppendEmpty()
r.set.Resource.CopyTo(rm.Resource())
Expand Down
12 changes: 3 additions & 9 deletions components/receivers/clockreceiver/clockreceiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
Expand Down Expand Up @@ -125,7 +126,7 @@ func TestReceiver_EmitsMetric(t *testing.T) {
rcv, err := clockreceiver.NewFactory().CreateMetrics(t.Context(), testSettings(), cfg, sink)
require.NoError(t, err)

require.NoError(t, rcv.Start(t.Context(), componenttest{}))
require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost()))

// Block on the first push instead of sleeping for a fixed duration.
select {
Expand All @@ -152,7 +153,7 @@ func TestReceiver_ShutdownStopsGoroutineWithin100ms(t *testing.T) {

rcv, err := clockreceiver.NewFactory().CreateMetrics(t.Context(), testSettings(), cfg, sink)
require.NoError(t, err)
require.NoError(t, rcv.Start(t.Context(), componenttest{}))
require.NoError(t, rcv.Start(t.Context(), componenttest.NewNopHost()))

// Wait for at least one push so we know the goroutine is alive.
<-sink.pushed
Expand Down Expand Up @@ -208,10 +209,3 @@ func (s *metricsSink) ConsumeMetrics(_ context.Context, md pmetric.Metrics) erro
}

func (*metricsSink) Capabilities() consumer.Capabilities { return consumer.Capabilities{} }

// componenttest is a do-nothing component.Host used by Receiver.Start in
// tests; the receiver doesn't call GetExtensions / ReportComponentStatus
// on the host so an empty struct suffices.
type componenttest struct{}

func (componenttest) GetExtensions() map[component.ID]component.Component { return nil }
2 changes: 1 addition & 1 deletion components/receivers/clockreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const stability = component.StabilityLevelBeta
// package var, so each OCB-stitched pipeline gets a freshly-built
// factory and the package surface stays a single exported symbol.
//
// Only the metrics signal returns a real Receiver; traces + logs
// Only the metrics signal returns a real clockReceiver; traces + logs
// surface upstream's "signal not supported" via receiver.NewFactory's
// default unimplemented behavior.
func NewFactory() receiver.Factory {
Expand Down
4 changes: 2 additions & 2 deletions components/receivers/clockreceiver/selftel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,9 @@ func TestFactory_FallsBackToNoopWhenMeterFails(t *testing.T) {
if err != nil {
t.Fatalf("CreateMetrics: %v", err)
}
recv, ok := r.(*Receiver)
recv, ok := r.(*clockReceiver)
if !ok {
t.Fatalf("receiver type: got %T, want *Receiver", r)
t.Fatalf("receiver type: got %T, want *clockReceiver", r)
}
if recv.telemetry == nil {
t.Fatal("telemetry field nil after failed wiring; must fall back to noop")
Expand Down
Loading