diff --git a/components/exporters/otlphttp/selftel.go b/components/exporters/otlphttp/selftel.go index 62e68056..817a1076 100644 --- a/components/exporters/otlphttp/selftel.go +++ b/components/exporters/otlphttp/selftel.go @@ -1,35 +1,35 @@ // SPDX-License-Identifier: Apache-2.0 -// Exporter-scoped self-telemetry surface. Replaces the v0.1.x -// dependency on `internal/selftelemetry`. Metric names follow the -// upstream OTel collector `otelcol___` -// convention per RFC-0013 §migration v0.1.0 namespace alignment: +// Exporter-scoped self-telemetry surface. Thin wrapper over +// module/pkg/selftel that pins this exporter's scope-name + instrument +// name + the kind enum. Metric names follow the upstream OTel collector +// `otelcol___` convention per RFC-0013 +// §migration v0.1.0 namespace alignment: // `otelcol.exporter.otlphttp.calls_total{result,kind,component_id}` -// (Prometheus exporter renders the dots as underscores). Label shape -// is preserved (`component_id`) so multi-instance disambiguation in -// dashboards is unchanged from v0.1.x. The instrumentation scope name -// is THIS exporter's Go import path — when the exporter moves under -// `module/` in PR-I, the scope name moves with it, matching OTel -// convention. +// (the Prometheus exporter renders the dots as underscores). Label +// shape is preserved (`component_id`) so multi-instance disambiguation +// in dashboards is unchanged from v0.1.x. The instrumentation scope +// name is THIS exporter's Go import path. // -// Mirrors `components/exporters/stdoutexporter/selftel.go` (PR-B1). +// Mirrors `components/exporters/stdoutexporter/selftel.go`. Shared +// plumbing (the OTel counter, the noop fallback, the init-error +// fallback counter) lives in module/pkg/selftel. package otlphttp import ( "context" - "errors" - "fmt" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + + "github.com/tracecoreai/tracecore/module/pkg/selftel" ) // kind is a low-cardinality error-class identifier for exporter failures. -// Mirrors the internal/selftelemetry.Kind type so the migration is -// mechanical; exporter-local because the canonical-Kind enforcement that -// the internal package owned moves into RFC-0013 PR-I's submodule. +// Exporter-local so the wire-format strings stay owned by the package +// that emits them; the canonical-Kind enforcement the deleted +// internal/selftelemetry package owned moves into RFC-0013's submodule. type kind string const ( @@ -47,38 +47,29 @@ const ( kindDownstream kind = "downstream" ) -// reasonInstrumentRegister labels init_errors_total ticks when OTel -// instrument registration failed at construction time. -const reasonInstrumentRegister = "instrument_register" - // instrumentationScope pins the OTel scope name. Per OTel convention, -// the scope is the package's Go import path; PR-I changes this when -// the exporter moves into the module/ submodule. +// the scope is the package's Go import path. const instrumentationScope = "github.com/tracecoreai/tracecore/components/exporters/otlphttp" -// errNilMeterProvider mirrors selftelemetry.ErrNilMeterProvider — the -// factory is responsible for substituting the noop fallback + ticking -// init_errors_total. Returning a sentinel rather than a generic error -// lets the factory distinguish "wire-up bug" from "instrument register -// failure" if it ever needs to. -var errNilMeterProvider = errors.New("otlphttp: MeterProvider is nil") - -// selfExporter is the exporter-scoped self-health surface. Methods are -// non-blocking + safe for concurrent use; the noop impl discards. -// Mirrors the internal/selftelemetry.Exporter interface but trimmed to -// the exact surface otlphttp uses — no FailureRateReader (the -// per-exporter failure_rate aggregation contract is intentionally -// dropped by this port; see the package comment on SelfExporter -// removal in otlphttp.go). -// -// Why drop FailureRateReader / ExporterCarrier: -// - The runtime path that consumed ExporterCarrier -// (`cmd/tracecore.collect.collectFailureRateReaders` in v0.1.x) -// was deleted by RFC-0013 PR-A2 along with the hand-wired entry -// point, so the carrier interface has no remaining consumer. -// - `internal/selftelemetry` (which owned the carrier) was deleted -// by RFC-0013 PR-F.1. Operators rate-derive failure rate via -// PromQL `rate(otelcol_exporter_otlphttp_calls_total{result="error"}[5m])`. +// callsTotalName is the operator-facing metric name for this exporter's +// per-call counter. Kept here (not in module/pkg/selftel) so the +// shared package stays unaware of caller-specific name choices. +const callsTotalName = "otelcol.exporter.otlphttp.calls_total" + +// reasonInstrumentRegister is the wire-format label value for +// init_errors_total ticks when OTel instrument registration failed at +// construction time. Re-exported from the shared package so this +// package's factory + tests don't import selftel just for the const. +const reasonInstrumentRegister = selftel.ReasonInstrumentRegister + +// errNilMeterProvider is the sentinel returned by newSelfExporter when +// called with a nil MeterProvider. Aliased to the shared sentinel so +// the factory's errors.Is check survives the migration. +var errNilMeterProvider = selftel.ErrNilMeterProvider + +// selfExporter is the exporter-scoped self-health surface used by +// otlphttp hot paths. Mirrors selftel.Exporter but carries the +// package-local `kind` type so call sites stay type-checked. type selfExporter interface { IncCallSuccess() IncCallFailure(k kind) @@ -94,79 +85,33 @@ func (noopSelfExporter) IncCallFailure(kind) {} var _ selfExporter = noopSelfExporter{} -// newSelfExporter returns a real selfExporter backed by an OTel counter -// `otelcol.exporter.otlphttp.calls_total{result, kind, component_id}` acquired -// from mp. The component's id is attached as the `component_id` label -// on every emission. Metric name + label shape preserved from the -// v0.1.x internal selftelemetry package so dashboards / alerts don't -// regress. +// newSelfExporter returns a real selfExporter backed by the shared +// selftel.Exporter wired at this package's scope + calls_total name. +// Returns errNilMeterProvider (== selftel.ErrNilMeterProvider) when mp +// is nil; the factory is responsible for the noop fallback + the +// init_errors_total tick via recordInitError. func newSelfExporter(id component.ID, mp metric.MeterProvider) (selfExporter, error) { - if mp == nil { - return nil, errNilMeterProvider - } - meter := mp.Meter(instrumentationScope) - - calls, err := meter.Int64Counter( - "otelcol.exporter.otlphttp.calls_total", - metric.WithDescription("Exporter Consume* calls partitioned by result"), - ) + inner, err := selftel.NewExporter(id.String(), instrumentationScope, callsTotalName, mp) if err != nil { - return nil, fmt.Errorf("exporter.calls_total counter: %w", err) + return nil, err } - - return &selfExporterImpl{ - componentID: id.String(), - calls: calls, - }, nil + return &selfExporterImpl{inner: inner}, nil } -var _ selfExporter = (*selfExporterImpl)(nil) - +// selfExporterImpl casts the package-local `kind` to string at the +// shared-package seam. Zero-cost — the cast is a compile-time op. type selfExporterImpl struct { - componentID string - calls metric.Int64Counter + inner selftel.Exporter } -func (e *selfExporterImpl) IncCallSuccess() { - // Emit component_id + result in one WithAttributes call rather than - // merging two attribute sets — avoids relying on SDK merge semantics - // that vary across OTel versions. Mirrors the stdoutexporter sibling. - e.calls.Add(context.Background(), 1, metric.WithAttributes( - attribute.String("component_id", e.componentID), - attribute.String("result", "success"), - )) -} +var _ selfExporter = (*selfExporterImpl)(nil) -func (e *selfExporterImpl) IncCallFailure(k kind) { - e.calls.Add(context.Background(), 1, metric.WithAttributes( - attribute.String("component_id", e.componentID), - attribute.String("result", "failure"), - attribute.String("kind", string(k)), - )) -} +func (e *selfExporterImpl) IncCallSuccess() { e.inner.IncCallSuccess() } +func (e *selfExporterImpl) IncCallFailure(k kind) { e.inner.IncCallFailure(string(k)) } -// recordInitError ticks otelcol.selftelemetry.init_errors_total when -// exporter wiring falls back to noop telemetry. Operators alert on -// `> 0` to learn that self-telemetry isn't really plugged in. Panics -// from a broken MeterProvider are swallowed — recordInitError IS the -// degraded-path fallback; crashing here would turn a partial outage -// into a process kill. +// recordInitError forwards to the shared selftel.RecordInitError with +// this package's scope. Kept as a thin wrapper so the factory's call +// site stays identical to the pre-refactor shape. func recordInitError(ctx context.Context, mp metric.MeterProvider, kindLabel, componentID, reason string) { - defer func() { _ = recover() }() - if mp == nil { - return - } - meter := mp.Meter(instrumentationScope) - c, err := meter.Int64Counter( - "otelcol.selftelemetry.init_errors_total", - metric.WithDescription("Counter of self-telemetry construction failures that fell back to the noop implementation."), - ) - if err != nil { - return - } - c.Add(ctx, 1, metric.WithAttributes( - attribute.String("kind", kindLabel), - attribute.String("component_id", componentID), - attribute.String("reason", reason), - )) + selftel.RecordInitError(ctx, mp, instrumentationScope, kindLabel, componentID, reason) } diff --git a/components/exporters/otlphttp/selftel_test.go b/components/exporters/otlphttp/selftel_test.go index f70b789e..c592f331 100644 --- a/components/exporters/otlphttp/selftel_test.go +++ b/components/exporters/otlphttp/selftel_test.go @@ -5,82 +5,15 @@ package otlphttp import ( "context" "errors" - "fmt" - "strings" "testing" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/embedded" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" -) - -// newTestMeterProvider builds an SDK MeterProvider backed by a ManualReader -// so tests can collect metricdata.ResourceMetrics deterministically without -// the Prometheus exporter or any internal/telemetry plumbing — the exporter -// package must stay decoupled from internal/* so PR-F can delete those -// packages without touching this test file. Mirrors the PR-B1 sibling -// pattern (`components/exporters/stdoutexporter/selftel_test.go`). -func newTestMeterProvider(t *testing.T) (*sdkmetric.MeterProvider, *sdkmetric.ManualReader) { - t.Helper() - rdr := sdkmetric.NewManualReader() - mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)) - t.Cleanup(func() { _ = mp.Shutdown(context.Background()) }) - return mp, rdr -} - -func collectRM(t *testing.T, rdr *sdkmetric.ManualReader) metricdata.ResourceMetrics { - t.Helper() - var rm metricdata.ResourceMetrics - if err := rdr.Collect(context.Background(), &rm); err != nil { - t.Fatalf("collect: %v", err) - } - return rm -} - -// findInstrument returns the first metricdata.Metrics whose Name matches the -// supplied OTel-dot name (e.g. "otelcol.exporter.otlphttp.calls_total"). Returns -// (nil, false) if absent. Scope-agnostic: walks all scope metrics. -func findInstrument(rm metricdata.ResourceMetrics, name string) (metricdata.Metrics, bool) { - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - if m.Name == name { - return m, true - } - } - } - return metricdata.Metrics{}, false -} - -// scopeOf returns the instrumentation scope name that emitted the supplied -// metric name. Used to pin the scope-name standard for PR-B1 (sibling -// exporter port). -func scopeOf(rm metricdata.ResourceMetrics, name string) (string, bool) { - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - if m.Name == name { - return sm.Scope.Name, true - } - } - } - return "", false -} -// kvMatch returns true if every want key's value matches the int64 -// datapoint's attribute set. -func kvMatch(dp metricdata.DataPoint[int64], want map[string]string) bool { - for k, v := range want { - got, ok := dp.Attributes.Value(attribute.Key(k)) - if !ok || got.AsString() != v { - return false - } - } - return true -} + selftelutil "github.com/tracecoreai/tracecore/module/pkg/testutil/selftel" +) // TestOtlphttp_NoopAlwaysSafe pins: newNoopSelfExporter returns a // value whose hot-path methods never panic and silently discard. Every @@ -120,7 +53,7 @@ func TestOtlphttp_NewExporter_NilProviderErrors(t *testing.T) { // drops the kind label, the component_id label, the result label, or the // metric-name prefix fails here. func TestOtlphttp_EmitsCallsTotal_WithResultKindAndComponentID(t *testing.T) { - mp, rdr := newTestMeterProvider(t) + mp, rdr := selftelutil.NewTestMeterProvider(t) se, err := newSelfExporter(testID(), mp) if err != nil { t.Fatalf("newSelfExporter: %v", err) @@ -131,10 +64,10 @@ func TestOtlphttp_EmitsCallsTotal_WithResultKindAndComponentID(t *testing.T) { se.IncCallFailure(kindIO) se.IncCallFailure(kindDownstream) - rm := collectRM(t, rdr) - m, ok := findInstrument(rm, "otelcol.exporter.otlphttp.calls_total") + rm := selftelutil.CollectRM(t, rdr) + m, ok := selftelutil.FindInstrument(rm, "otelcol.exporter.otlphttp.calls_total") if !ok { - t.Fatalf("metric otelcol.exporter.otlphttp.calls_total absent; have: %s", dumpNames(rm)) + t.Fatalf("metric otelcol.exporter.otlphttp.calls_total absent; have: %s", selftelutil.DumpNames(rm)) } sum, ok := m.Data.(metricdata.Sum[int64]) if !ok { @@ -146,7 +79,7 @@ func TestOtlphttp_EmitsCallsTotal_WithResultKindAndComponentID(t *testing.T) { // budget. component_id is asserted per-datapoint inside the loop. got := map[string]int{} for _, dp := range sum.DataPoints { - if !kvMatch(dp, map[string]string{"component_id": "otlphttp/test"}) { + if !selftelutil.KVMatch(dp, map[string]string{"component_id": "otlphttp/test"}) { t.Errorf("datapoint missing component_id=otlphttp/test: %v", dp.Attributes) continue } @@ -173,14 +106,14 @@ func TestOtlphttp_EmitsCallsTotal_WithResultKindAndComponentID(t *testing.T) { // the PR-B1-style decision (vs reusing the deleted internal/selftelemetry // scope) so a future drift back to the internal name fails here. func TestOtlphttp_ScopeNameIsExporterImportPath(t *testing.T) { - mp, rdr := newTestMeterProvider(t) + mp, rdr := selftelutil.NewTestMeterProvider(t) se, err := newSelfExporter(testID(), mp) if err != nil { t.Fatalf("newSelfExporter: %v", err) } se.IncCallSuccess() - rm := collectRM(t, rdr) - scope, ok := scopeOf(rm, "otelcol.exporter.otlphttp.calls_total") + rm := selftelutil.CollectRM(t, rdr) + scope, ok := selftelutil.ScopeOf(rm, "otelcol.exporter.otlphttp.calls_total") if !ok { t.Fatalf("calls_total absent") } @@ -197,13 +130,13 @@ func TestOtlphttp_ScopeNameIsExporterImportPath(t *testing.T) { // only signal that an exporter fell back to noop telemetry; dropping the // recordInitError call must fail this test. func TestOtlphttp_RecordInitError_TicksInitErrorsCounter(t *testing.T) { - mp, rdr := newTestMeterProvider(t) + mp, rdr := selftelutil.NewTestMeterProvider(t) recordInitError(context.Background(), mp, "exporter", testID().String(), reasonInstrumentRegister) - rm := collectRM(t, rdr) - m, ok := findInstrument(rm, "otelcol.selftelemetry.init_errors_total") + rm := selftelutil.CollectRM(t, rdr) + m, ok := selftelutil.FindInstrument(rm, "otelcol.selftelemetry.init_errors_total") if !ok { - t.Fatalf("init_errors_total absent; have: %s", dumpNames(rm)) + t.Fatalf("init_errors_total absent; have: %s", selftelutil.DumpNames(rm)) } sum, ok := m.Data.(metricdata.Sum[int64]) if !ok { @@ -218,7 +151,7 @@ func TestOtlphttp_RecordInitError_TicksInitErrorsCounter(t *testing.T) { "component_id": "otlphttp/test", "reason": reasonInstrumentRegister, } - if !kvMatch(dp, want) { + if !selftelutil.KVMatch(dp, want) { t.Errorf("init_errors attrs: got %v, want %v", dp.Attributes, want) } if dp.Value != 1 { @@ -246,8 +179,8 @@ func TestOtlphttp_RecordInitError_NilProviderIsSafe(t *testing.T) { // tick otelcol.selftelemetry.init_errors_total via recordInitError. // Mirrors the stdoutexporter sibling test seam. func TestOtlphttp_FallsBackToNoopWhenMeterFails(t *testing.T) { - mp, rdr := newTestMeterProvider(t) - failing := &failingExporterMP{real: mp} + mp, rdr := selftelutil.NewTestMeterProvider(t) + failing := selftelutil.NewFailingMeterProvider(mp, "otelcol.exporter.otlphttp.") set := testSettings() set.MeterProvider = failing @@ -267,15 +200,15 @@ func TestOtlphttp_FallsBackToNoopWhenMeterFails(t *testing.T) { exp.telemetry.IncCallSuccess() exp.telemetry.IncCallFailure(kindIO) - rm := collectRM(t, rdr) - if m, ok := findInstrument(rm, "otelcol.exporter.otlphttp.calls_total"); ok { + rm := selftelutil.CollectRM(t, rdr) + if m, ok := selftelutil.FindInstrument(rm, "otelcol.exporter.otlphttp.calls_total"); ok { if sum, ok := m.Data.(metricdata.Sum[int64]); ok && len(sum.DataPoints) > 0 { t.Errorf("noop fallback leaked Inc* into calls_total datapoints: %v", sum.DataPoints) } } - m, ok := findInstrument(rm, "otelcol.selftelemetry.init_errors_total") + m, ok := selftelutil.FindInstrument(rm, "otelcol.selftelemetry.init_errors_total") if !ok { - t.Fatalf("init_errors_total absent after factory fallback; have: %s", dumpNames(rm)) + t.Fatalf("init_errors_total absent after factory fallback; have: %s", selftelutil.DumpNames(rm)) } sum, ok := m.Data.(metricdata.Sum[int64]) if !ok { @@ -297,7 +230,7 @@ func TestOtlphttp_FallsBackToNoopWhenMeterFails(t *testing.T) { // wired but instrument registration failed; a nil provider means the // operator opted out of telemetry entirely, so a phantom counter would // be noise. Mirrors `TestOtlphttp_FallsBackToNoopWhenMeterFails` minus -// the failingExporterMP wrapper. +// the failing wrapper. func TestOtlphttp_FallsBackToNoopWhenMeterProviderIsNil(t *testing.T) { set := testSettings() set.MeterProvider = nil @@ -373,45 +306,3 @@ func testSettings() exporter.Settings { func testID() component.ID { return component.NewIDWithName(componentType(), "test") } - -func dumpNames(rm metricdata.ResourceMetrics) string { - var b strings.Builder - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - fmt.Fprintf(&b, " %s@%s", m.Name, sm.Scope.Name) - } - } - return b.String() -} - -// failingExporterMP wraps a real MeterProvider but fails every instrument -// registration whose name starts with "otelcol.exporter.otlphttp.". -// Mirrors the stdoutexporter sibling test seam so a future refactor that -// reorders the newSelfExporter constructor doesn't silently bypass coverage. -type failingExporterMP struct { - embedded.MeterProvider - real metric.MeterProvider -} - -func (p *failingExporterMP) Meter(name string, opts ...metric.MeterOption) metric.Meter { - return &failingExporterMeter{Meter: p.real.Meter(name, opts...)} -} - -type failingExporterMeter struct { - metric.Meter -} - -const exporterInstrumentPrefix = "otelcol.exporter.otlphttp." - -var errSyntheticExporterFailure = errors.New("synthetic: exporter instrument registration failed") - -func (m *failingExporterMeter) Int64Counter(name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) { - if strings.HasPrefix(name, exporterInstrumentPrefix) { - return nil, errSyntheticExporterFailure - } - c, err := m.Meter.Int64Counter(name, opts...) - if err != nil { - return nil, fmt.Errorf("failingExporterMeter passthrough: %w", err) - } - return c, nil -} diff --git a/components/exporters/stdoutexporter/selftel.go b/components/exporters/stdoutexporter/selftel.go index 53d01496..48baf415 100644 --- a/components/exporters/stdoutexporter/selftel.go +++ b/components/exporters/stdoutexporter/selftel.go @@ -1,35 +1,34 @@ // SPDX-License-Identifier: Apache-2.0 -// Exporter-scoped self-telemetry surface. Replaces the v0.1.x -// dependency on `internal/selftelemetry`. Metric names follow the -// upstream OTel collector `otelcol___` -// convention per RFC-0013 §migration v0.1.0 namespace alignment: +// Exporter-scoped self-telemetry surface. Thin wrapper over +// module/pkg/selftel that pins this exporter's scope-name + instrument +// name + the kind enum. Metric names follow the upstream OTel collector +// `otelcol___` convention per RFC-0013 +// §migration v0.1.0 namespace alignment: // `otelcol.exporter.stdoutexporter.calls_total{result,kind,component_id}` -// (Prometheus exporter renders the dots as underscores). Label shape -// is preserved (`component_id`) so multi-instance disambiguation in -// dashboards is unchanged from v0.1.x. The instrumentation scope name -// is THIS exporter's Go import path — when the exporter moves under -// `module/` in PR-I, the scope name moves with it, matching OTel -// convention. +// (the Prometheus exporter renders the dots as underscores). Label +// shape is preserved (`component_id`) so multi-instance disambiguation +// in dashboards is unchanged from v0.1.x. The instrumentation scope +// name is THIS exporter's Go import path. // -// Mirrors `module/receiver/ncclfrreceiver/selftel.go` (PR-B1 / PR-B2; moved out of components/ in PR-I.1b). +// Mirrors `components/exporters/otlphttp/selftel.go`. Shared plumbing +// (the OTel counter, the noop fallback, the init-error fallback +// counter) lives in module/pkg/selftel. package stdoutexporter import ( "context" - "errors" - "fmt" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + + "github.com/tracecoreai/tracecore/module/pkg/selftel" ) // kind is a low-cardinality error-class identifier for exporter failures. -// Mirrors the internal/selftelemetry.Kind type so the migration is -// mechanical; exporter-local because the canonical-Kind enforcement that -// the internal package owned moves into RFC-0013 PR-I's submodule. +// Mirrors the v0.1.x internal/selftelemetry.Kind type; exporter-local +// so the wire-format strings stay owned by the package that emits them. type kind string const ( @@ -41,40 +40,29 @@ const ( kindIO kind = "io" ) -// reasonInstrumentRegister labels init_errors_total ticks when OTel -// instrument registration failed at construction time. -const reasonInstrumentRegister = "instrument_register" - // instrumentationScope pins the OTel scope name. Per OTel convention, -// the scope is the package's Go import path; PR-I changes this when -// the exporter moves into the module/ submodule. +// the scope is the package's Go import path. const instrumentationScope = "github.com/tracecoreai/tracecore/components/exporters/stdoutexporter" -// errNilMeterProvider mirrors selftelemetry.ErrNilMeterProvider — the -// factory is responsible for substituting the noop fallback + ticking -// init_errors_total. Returning a sentinel rather than a generic error -// lets the factory distinguish "wire-up bug" from "instrument register -// failure" if it ever needs to. -var errNilMeterProvider = errors.New("stdoutexporter: MeterProvider is nil") - -// selfExporter is the exporter-scoped self-health surface. Methods are -// non-blocking + safe for concurrent use; the noop impl discards. -// Mirrors the internal/selftelemetry.Exporter interface but trimmed to -// the exact surface stdoutexporter uses — no FailureRateReader (the -// per-exporter failure_rate aggregation contract is intentionally -// dropped by this port; see the package comment on SelfExporter -// removal below). -// -// Why drop FailureRateReader / ExporterCarrier: -// - stdoutexporter is the canonical debug / example exporter; it -// writes JSON lines to a configured io.Writer (stdout in -// production). Operators don't alert on its failure_rate. -// - `internal/selftelemetry` (which owned the ExporterCarrier -// interface) was deleted in RFC-0013 PR-F.1, so any code that -// referenced it had to be removed anyway. The -// `otelcol_exporter_stdoutexporter_calls_total` counter is the -// only surfaced signal — operators rate-derive failure rate via -// PromQL `rate(calls_total{result="error"}[5m])` instead. +// callsTotalName is the operator-facing metric name for this exporter's +// per-call counter. Kept here (not in module/pkg/selftel) so the +// shared package stays unaware of caller-specific name choices. +const callsTotalName = "otelcol.exporter.stdoutexporter.calls_total" + +// reasonInstrumentRegister is the wire-format label value for +// init_errors_total ticks when OTel instrument registration failed at +// construction time. Re-exported from the shared package so this +// package's factory + tests don't import selftel just for the const. +const reasonInstrumentRegister = selftel.ReasonInstrumentRegister + +// errNilMeterProvider is the sentinel returned by newSelfExporter when +// called with a nil MeterProvider. Aliased to the shared sentinel so +// the factory's errors.Is check survives the migration. +var errNilMeterProvider = selftel.ErrNilMeterProvider + +// selfExporter is the exporter-scoped self-health surface used by +// stdoutexporter hot paths. Mirrors selftel.Exporter but carries the +// package-local `kind` type so call sites stay type-checked. type selfExporter interface { IncCallSuccess() IncCallFailure(k kind) @@ -90,79 +78,33 @@ func (noopSelfExporter) IncCallFailure(kind) {} var _ selfExporter = noopSelfExporter{} -// newSelfExporter returns a real selfExporter backed by an OTel counter -// `otelcol.exporter.stdoutexporter.calls_total{result, kind, component_id}` acquired -// from mp. The component's id is attached as the `component_id` label -// on every emission. Metric name + label shape preserved from the -// v0.1.x internal selftelemetry package so dashboards / alerts don't -// regress. +// newSelfExporter returns a real selfExporter backed by the shared +// selftel.Exporter wired at this package's scope + calls_total name. +// Returns errNilMeterProvider (== selftel.ErrNilMeterProvider) when mp +// is nil; the factory is responsible for the noop fallback + the +// init_errors_total tick via recordInitError. func newSelfExporter(id component.ID, mp metric.MeterProvider) (selfExporter, error) { - if mp == nil { - return nil, errNilMeterProvider - } - meter := mp.Meter(instrumentationScope) - - calls, err := meter.Int64Counter( - "otelcol.exporter.stdoutexporter.calls_total", - metric.WithDescription("Exporter Consume* calls partitioned by result"), - ) + inner, err := selftel.NewExporter(id.String(), instrumentationScope, callsTotalName, mp) if err != nil { - return nil, fmt.Errorf("exporter.calls_total counter: %w", err) + return nil, err } - - return &selfExporterImpl{ - componentID: id.String(), - calls: calls, - }, nil + return &selfExporterImpl{inner: inner}, nil } -var _ selfExporter = (*selfExporterImpl)(nil) - +// selfExporterImpl casts the package-local `kind` to string at the +// shared-package seam. Zero-cost — the cast is a compile-time op. type selfExporterImpl struct { - componentID string - calls metric.Int64Counter + inner selftel.Exporter } -func (e *selfExporterImpl) IncCallSuccess() { - // Emit component_id + result in one WithAttributes call rather than - // merging two attribute sets — avoids relying on SDK merge semantics - // that vary across OTel versions. Mirrors the receiver sibling. - e.calls.Add(context.Background(), 1, metric.WithAttributes( - attribute.String("component_id", e.componentID), - attribute.String("result", "success"), - )) -} +var _ selfExporter = (*selfExporterImpl)(nil) -func (e *selfExporterImpl) IncCallFailure(k kind) { - e.calls.Add(context.Background(), 1, metric.WithAttributes( - attribute.String("component_id", e.componentID), - attribute.String("result", "failure"), - attribute.String("kind", string(k)), - )) -} +func (e *selfExporterImpl) IncCallSuccess() { e.inner.IncCallSuccess() } +func (e *selfExporterImpl) IncCallFailure(k kind) { e.inner.IncCallFailure(string(k)) } -// recordInitError ticks otelcol.selftelemetry.init_errors_total when -// exporter wiring falls back to noop telemetry. Operators alert on -// `> 0` to learn that self-telemetry isn't really plugged in. Panics -// from a broken MeterProvider are swallowed — recordInitError IS the -// degraded-path fallback; crashing here would turn a partial outage -// into a process kill. +// recordInitError forwards to the shared selftel.RecordInitError with +// this package's scope. Kept as a thin wrapper so the factory's call +// site stays identical to the pre-refactor shape. func recordInitError(ctx context.Context, mp metric.MeterProvider, kindLabel, componentID, reason string) { - defer func() { _ = recover() }() - if mp == nil { - return - } - meter := mp.Meter(instrumentationScope) - c, err := meter.Int64Counter( - "otelcol.selftelemetry.init_errors_total", - metric.WithDescription("Counter of self-telemetry construction failures that fell back to the noop implementation."), - ) - if err != nil { - return - } - c.Add(ctx, 1, metric.WithAttributes( - attribute.String("kind", kindLabel), - attribute.String("component_id", componentID), - attribute.String("reason", reason), - )) + selftel.RecordInitError(ctx, mp, instrumentationScope, kindLabel, componentID, reason) } diff --git a/components/exporters/stdoutexporter/selftel_test.go b/components/exporters/stdoutexporter/selftel_test.go index 437c3418..bf055cf1 100644 --- a/components/exporters/stdoutexporter/selftel_test.go +++ b/components/exporters/stdoutexporter/selftel_test.go @@ -6,82 +6,15 @@ import ( "bytes" "context" "errors" - "fmt" - "strings" "testing" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/embedded" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" -) - -// newTestMeterProvider builds an SDK MeterProvider backed by a ManualReader -// so tests can collect metricdata.ResourceMetrics deterministically without -// the Prometheus exporter or any internal/telemetry plumbing — the exporter -// package must stay decoupled from internal/* so PR-F can delete those -// packages without touching this test file. Mirrors the PR-B1 sibling -// pattern (`module/receiver/ncclfrreceiver/selftel_test.go`; moved out of components/ in PR-I.1b). -func newTestMeterProvider(t *testing.T) (*sdkmetric.MeterProvider, *sdkmetric.ManualReader) { - t.Helper() - rdr := sdkmetric.NewManualReader() - mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)) - t.Cleanup(func() { _ = mp.Shutdown(context.Background()) }) - return mp, rdr -} - -func collectRM(t *testing.T, rdr *sdkmetric.ManualReader) metricdata.ResourceMetrics { - t.Helper() - var rm metricdata.ResourceMetrics - if err := rdr.Collect(context.Background(), &rm); err != nil { - t.Fatalf("collect: %v", err) - } - return rm -} - -// findInstrument returns the first metricdata.Metrics whose Name matches the -// supplied OTel-dot name (e.g. "otelcol.exporter.stdoutexporter.calls_total"). Returns -// (nil, false) if absent. Scope-agnostic: walks all scope metrics. -func findInstrument(rm metricdata.ResourceMetrics, name string) (metricdata.Metrics, bool) { - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - if m.Name == name { - return m, true - } - } - } - return metricdata.Metrics{}, false -} - -// scopeOf returns the instrumentation scope name that emitted the supplied -// metric name. Used to pin the scope-name standard for PR-B1 (sibling -// exporter port). -func scopeOf(rm metricdata.ResourceMetrics, name string) (string, bool) { - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - if m.Name == name { - return sm.Scope.Name, true - } - } - } - return "", false -} -// kvMatch returns true if every want key's value matches the int64 -// datapoint's attribute set. -func kvMatch(dp metricdata.DataPoint[int64], want map[string]string) bool { - for k, v := range want { - got, ok := dp.Attributes.Value(attribute.Key(k)) - if !ok || got.AsString() != v { - return false - } - } - return true -} + selftelutil "github.com/tracecoreai/tracecore/module/pkg/testutil/selftel" +) // TestSelfTelemetry_NoopAlwaysSafe pins: newNoopSelfExporter returns a // value whose hot-path methods never panic and silently discard. Every @@ -119,7 +52,7 @@ func TestSelfTelemetry_NewExporter_NilProviderErrors(t *testing.T) { // component_id. A regression that drops the kind label, the component_id // label, the result label, or the metric-name prefix fails here. func TestSelfTelemetry_EmitsCallsTotal_WithResultKindAndComponentID(t *testing.T) { - mp, rdr := newTestMeterProvider(t) + mp, rdr := selftelutil.NewTestMeterProvider(t) se, err := newSelfExporter(testID(), mp) if err != nil { t.Fatalf("newSelfExporter: %v", err) @@ -129,10 +62,10 @@ func TestSelfTelemetry_EmitsCallsTotal_WithResultKindAndComponentID(t *testing.T se.IncCallFailure(kindMarshal) se.IncCallFailure(kindIO) - rm := collectRM(t, rdr) - m, ok := findInstrument(rm, "otelcol.exporter.stdoutexporter.calls_total") + rm := selftelutil.CollectRM(t, rdr) + m, ok := selftelutil.FindInstrument(rm, "otelcol.exporter.stdoutexporter.calls_total") if !ok { - t.Fatalf("metric otelcol.exporter.stdoutexporter.calls_total absent; have: %s", dumpNames(rm)) + t.Fatalf("metric otelcol.exporter.stdoutexporter.calls_total absent; have: %s", selftelutil.DumpNames(rm)) } sum, ok := m.Data.(metricdata.Sum[int64]) if !ok { @@ -144,7 +77,7 @@ func TestSelfTelemetry_EmitsCallsTotal_WithResultKindAndComponentID(t *testing.T // budget. component_id is asserted per-datapoint inside the loop. got := map[string]int{} for _, dp := range sum.DataPoints { - if !kvMatch(dp, map[string]string{"component_id": "stdoutexporter/test"}) { + if !selftelutil.KVMatch(dp, map[string]string{"component_id": "stdoutexporter/test"}) { t.Errorf("datapoint missing component_id=stdoutexporter/test: %v", dp.Attributes) continue } @@ -170,14 +103,14 @@ func TestSelfTelemetry_EmitsCallsTotal_WithResultKindAndComponentID(t *testing.T // the PR-B1-style decision (vs reusing the deleted internal/selftelemetry // scope) so a future drift back to the internal name fails here. func TestSelfTelemetry_ScopeNameIsExporterImportPath(t *testing.T) { - mp, rdr := newTestMeterProvider(t) + mp, rdr := selftelutil.NewTestMeterProvider(t) se, err := newSelfExporter(testID(), mp) if err != nil { t.Fatalf("newSelfExporter: %v", err) } se.IncCallSuccess() - rm := collectRM(t, rdr) - scope, ok := scopeOf(rm, "otelcol.exporter.stdoutexporter.calls_total") + rm := selftelutil.CollectRM(t, rdr) + scope, ok := selftelutil.ScopeOf(rm, "otelcol.exporter.stdoutexporter.calls_total") if !ok { t.Fatalf("calls_total absent") } @@ -194,13 +127,13 @@ func TestSelfTelemetry_ScopeNameIsExporterImportPath(t *testing.T) { // only signal that an exporter fell back to noop telemetry; dropping the // recordInitError call must fail this test. func TestRecordInitError_TicksInitErrorsCounter(t *testing.T) { - mp, rdr := newTestMeterProvider(t) + mp, rdr := selftelutil.NewTestMeterProvider(t) recordInitError(context.Background(), mp, "exporter", testID().String(), reasonInstrumentRegister) - rm := collectRM(t, rdr) - m, ok := findInstrument(rm, "otelcol.selftelemetry.init_errors_total") + rm := selftelutil.CollectRM(t, rdr) + m, ok := selftelutil.FindInstrument(rm, "otelcol.selftelemetry.init_errors_total") if !ok { - t.Fatalf("init_errors_total absent; have: %s", dumpNames(rm)) + t.Fatalf("init_errors_total absent; have: %s", selftelutil.DumpNames(rm)) } sum, ok := m.Data.(metricdata.Sum[int64]) if !ok { @@ -215,7 +148,7 @@ func TestRecordInitError_TicksInitErrorsCounter(t *testing.T) { "component_id": "stdoutexporter/test", "reason": reasonInstrumentRegister, } - if !kvMatch(dp, want) { + if !selftelutil.KVMatch(dp, want) { t.Errorf("init_errors attrs: got %v, want %v", dp.Attributes, want) } if dp.Value != 1 { @@ -243,8 +176,8 @@ func TestRecordInitError_NilProviderIsSafe(t *testing.T) { // tick otelcol.selftelemetry.init_errors_total via recordInitError. // Mirrors the nccl_fr sibling test seam. func TestFactory_FallsBackToNoopWhenMeterFails(t *testing.T) { - mp, rdr := newTestMeterProvider(t) - failing := &failingExporterMP{real: mp} + mp, rdr := selftelutil.NewTestMeterProvider(t) + failing := selftelutil.NewFailingMeterProvider(mp, "otelcol.exporter.stdoutexporter.") set := exportertest.NewNopSettings(componentType()) set.ID = component.NewIDWithName(componentType(), "test") @@ -265,15 +198,15 @@ func TestFactory_FallsBackToNoopWhenMeterFails(t *testing.T) { exp.telemetry.IncCallSuccess() exp.telemetry.IncCallFailure(kindIO) - rm := collectRM(t, rdr) - if m, ok := findInstrument(rm, "otelcol.exporter.stdoutexporter.calls_total"); ok { + rm := selftelutil.CollectRM(t, rdr) + if m, ok := selftelutil.FindInstrument(rm, "otelcol.exporter.stdoutexporter.calls_total"); ok { if sum, ok := m.Data.(metricdata.Sum[int64]); ok && len(sum.DataPoints) > 0 { t.Errorf("noop fallback leaked Inc* into calls_total datapoints: %v", sum.DataPoints) } } - m, ok := findInstrument(rm, "otelcol.selftelemetry.init_errors_total") + m, ok := selftelutil.FindInstrument(rm, "otelcol.selftelemetry.init_errors_total") if !ok { - t.Fatalf("init_errors_total absent after factory fallback; have: %s", dumpNames(rm)) + t.Fatalf("init_errors_total absent after factory fallback; have: %s", selftelutil.DumpNames(rm)) } sum, ok := m.Data.(metricdata.Sum[int64]) if !ok { @@ -355,48 +288,6 @@ func testID() component.ID { return component.NewIDWithName(componentType(), "test") } -func dumpNames(rm metricdata.ResourceMetrics) string { - var b strings.Builder - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - fmt.Fprintf(&b, " %s@%s", m.Name, sm.Scope.Name) - } - } - return b.String() -} - -// failingExporterMP wraps a real MeterProvider but fails every instrument -// registration whose name starts with "otelcol.exporter.stdoutexporter.". -// Mirrors the nccl_fr sibling test seam so a future refactor that reorders -// the newSelfExporter constructor doesn't silently bypass coverage. -type failingExporterMP struct { - embedded.MeterProvider - real metric.MeterProvider -} - -func (p *failingExporterMP) Meter(name string, opts ...metric.MeterOption) metric.Meter { - return &failingExporterMeter{Meter: p.real.Meter(name, opts...)} -} - -type failingExporterMeter struct { - metric.Meter -} - -const exporterInstrumentPrefix = "otelcol.exporter.stdoutexporter." - -var errSyntheticExporterFailure = errors.New("synthetic: exporter instrument registration failed") - -func (m *failingExporterMeter) Int64Counter(name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) { - if strings.HasPrefix(name, exporterInstrumentPrefix) { - return nil, errSyntheticExporterFailure - } - c, err := m.Meter.Int64Counter(name, opts...) - if err != nil { - return nil, fmt.Errorf("failingExporterMeter passthrough: %w", err) - } - return c, nil -} - // Compile-time assertion: NewFactory returns an exporter.Factory. // Pins the upstream-type contract that PR-B established — if a future // refactor regresses to internal/pipeline.ExporterFactory, this fails diff --git a/components/receivers/pyspy/selftel.go b/components/receivers/pyspy/selftel.go index 5350b770..9f1543ec 100644 --- a/components/receivers/pyspy/selftel.go +++ b/components/receivers/pyspy/selftel.go @@ -1,52 +1,66 @@ // SPDX-License-Identifier: Apache-2.0 -// Receiver-scoped self-telemetry surface. Replaces the v0.1.x -// dependency on `internal/selftelemetry`. Metric names follow the -// upstream OTel collector `otelcol___` -// convention per RFC-0013 §migration v0.1.0 namespace alignment: -// `otelcol.receiver.pyspy.errors_total{kind,component_id}` and -// siblings (Prometheus exporter renders the dots as underscores). -// Label shape is preserved (`component_id`) so multi-instance -// disambiguation in dashboards is unchanged from v0.1.x. The -// instrumentation scope name is THIS receiver's Go import path — -// when the receiver moves to `module/receiver/pyspyreceiver/` in -// PR-I.1, the scope name moves with it, matching OTel convention. +// Receiver-scoped self-telemetry surface. Thin wrapper over +// module/pkg/selftel that pins this receiver's scope-name + instrument +// names + the kind enum. Metric names follow the upstream OTel +// collector `otelcol___` convention per +// RFC-0013 §migration v0.1.0 namespace alignment: +// `otelcol.receiver.pyspy.errors_total{kind,component_id}` and siblings +// (Prometheus exporter renders the dots as underscores). Label shape +// is preserved (`component_id`) so multi-instance disambiguation in +// dashboards is unchanged from v0.1.x. The instrumentation scope name +// is THIS receiver's Go import path. +// +// Shared plumbing (the OTel counters/histogram/observables, the +// degraded-state atomic machinery, the noop fallback, the init-error +// fallback counter) lives in module/pkg/selftel; this file owns only +// the naming + the package-local interface shape. The `kind` enum +// lives in kinds.go (kept separate for RFC-0009 §Degraded-modes parity +// tests). package pyspy import ( "context" - "errors" - "fmt" - "sync/atomic" "time" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" -) -// reasonInstrumentRegister labels init_errors_total ticks when OTel -// instrument registration failed at construction time. -const reasonInstrumentRegister = "instrument_register" + "github.com/tracecoreai/tracecore/module/pkg/selftel" +) // instrumentationScope pins the OTel scope name. Per OTel convention, -// the scope is the package's Go import path; PR-I.1 changes this when -// the receiver moves to module/receiver/pyspyreceiver/. +// the scope is the package's Go import path. const instrumentationScope = "github.com/tracecoreai/tracecore/components/receivers/pyspy" -// errNilMeterProvider mirrors selftelemetry.ErrNilMeterProvider — the -// factory is responsible for substituting the noop fallback + ticking -// init_errors_total. Returning a sentinel rather than a generic error -// lets the factory distinguish "wire-up bug" from "instrument register -// failure" if it ever needs to. -var errNilMeterProvider = errors.New("pyspy: MeterProvider is nil") - -// selfTelemetry is the receiver-scoped self-health surface. Methods are -// non-blocking + safe for concurrent use; the noop impl discards. -// Mirrors the internal/selftelemetry.Receiver interface 1:1 so the -// migration is mechanical — pyspy hot paths already call IncError / -// IncEmissions / ObserveLatency / SetDegraded / MarkActivity. +// reasonInstrumentRegister is the wire-format label value for +// init_errors_total ticks when OTel instrument registration failed at +// construction time. Re-exported from the shared package so this +// package's factory + tests don't import selftel just for the const. +const reasonInstrumentRegister = selftel.ReasonInstrumentRegister + +// errNilMeterProvider is the sentinel returned by newSelfTelemetry +// when called with a nil MeterProvider. Aliased to the shared sentinel +// so the factory's errors.Is check survives the migration. +var errNilMeterProvider = selftel.ErrNilMeterProvider + +// receiverInstrumentNames pins the operator-facing instrument names +// for this receiver's five OTel instruments. Kept here (not in the +// shared package) so module/pkg/selftel stays unaware of caller-specific +// name choices. +var receiverInstrumentNames = selftel.ReceiverInstrumentNames{ + ErrorsTotal: "otelcol.receiver.pyspy.errors_total", + EmissionsTotal: "otelcol.receiver.pyspy.emissions_total", + CollectionLatencySeconds: "otelcol.receiver.pyspy.collection_latency_seconds", + DegradedSecondsTotal: "otelcol.receiver.pyspy.degraded_seconds_total", + LastActivityUnixSeconds: "otelcol.receiver.pyspy.last_activity_unix_seconds", +} + +// selfTelemetry is the receiver-scoped self-health surface used by +// pyspy hot paths. Mirrors selftel.Receiver but carries the +// package-local `kind` type (declared in kinds.go) so call sites stay +// type-checked. type selfTelemetry interface { IncError(k kind) IncEmissions(n int64) @@ -68,182 +82,36 @@ func (noopSelfTelemetry) MarkActivity() {} var _ selfTelemetry = noopSelfTelemetry{} -// newSelfTelemetry returns a real selfTelemetry backed by OTel metric -// instruments acquired from mp. The component's id is attached as the -// `component_id` label on every emission. Registers the same five -// instruments the v0.1.x internal selftelemetry package registered, so -// scraped metric names + label shape are unchanged. +// newSelfTelemetry returns a real selfTelemetry backed by the shared +// selftel.Receiver wired at this package's scope + instrument names. +// Returns errNilMeterProvider (== selftel.ErrNilMeterProvider) when mp +// is nil; the factory is responsible for the noop fallback + the +// init_errors_total tick via recordInitError. func newSelfTelemetry(id component.ID, mp metric.MeterProvider) (selfTelemetry, error) { - if mp == nil { - return nil, errNilMeterProvider - } - meter := mp.Meter(instrumentationScope) - attrSet := attribute.NewSet(attribute.String("component_id", id.String())) - - errsCtr, err := meter.Int64Counter( - "otelcol.receiver.pyspy.errors_total", - metric.WithDescription("Errors observed by a receiver, partitioned by kind"), - ) - if err != nil { - return nil, fmt.Errorf("errors_total counter: %w", err) - } - emissionsCtr, err := meter.Int64Counter( - "otelcol.receiver.pyspy.emissions_total", - metric.WithDescription("Data points / events emitted by a receiver"), - ) - if err != nil { - return nil, fmt.Errorf("emissions_total counter: %w", err) - } - latencyHist, err := meter.Float64Histogram( - "otelcol.receiver.pyspy.collection_latency_seconds", - metric.WithDescription("Receiver collection cycle latency in seconds"), - metric.WithUnit("s"), - // Bucket boundaries chosen for sub-millisecond dump-poll cycles - // up to 10s slow paths; mirrors the internal/selftelemetry - // shape so histograms remain comparable across the migration. - metric.WithExplicitBucketBoundaries( - 0.0001, 0.001, 0.005, 0.01, 0.05, - 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, - ), - ) + inner, err := selftel.NewReceiver(id.String(), instrumentationScope, receiverInstrumentNames, mp) if err != nil { - return nil, fmt.Errorf("collection_latency_seconds histogram: %w", err) - } - - st := &selfTelemetryImpl{ - componentID: id.String(), - attrs: attrSet, - errors: errsCtr, - emissions: emissionsCtr, - latency: latencyHist, + return nil, err } - // Seed last-activity to construction time so a `time() - last_activity - // > N` alert doesn't fire on the zero-valued gauge during boot. - st.activityUnix.Store(time.Now().Unix()) - - if _, err := meter.Float64ObservableCounter( - "otelcol.receiver.pyspy.degraded_seconds_total", - metric.WithDescription("Cumulative seconds the receiver has been in the degraded state"), - metric.WithUnit("s"), - metric.WithFloat64Callback(func(_ context.Context, obs metric.Float64Observer) error { - obs.Observe(st.degradedTotalSeconds(), metric.WithAttributeSet(attrSet)) - return nil - }), - ); err != nil { - return nil, fmt.Errorf("degraded_seconds_total observable: %w", err) - } - - if _, err := meter.Int64ObservableGauge( - "otelcol.receiver.pyspy.last_activity_unix_seconds", - metric.WithDescription("Unix-second timestamp of the receiver's last successful activity"), - metric.WithInt64Callback(func(_ context.Context, obs metric.Int64Observer) error { - obs.Observe(st.activityUnix.Load(), metric.WithAttributeSet(attrSet)) - return nil - }), - ); err != nil { - return nil, fmt.Errorf("last_activity_unix_seconds observable: %w", err) - } - - return st, nil + return &selfTelemetryImpl{inner: inner}, nil } -var _ selfTelemetry = (*selfTelemetryImpl)(nil) - +// selfTelemetryImpl casts the package-local `kind` to string at the +// shared-package seam. Zero-cost — the cast is a compile-time op. type selfTelemetryImpl struct { - componentID string - attrs attribute.Set - errors metric.Int64Counter - emissions metric.Int64Counter - latency metric.Float64Histogram - - // degradedAt holds the time of the most recent SetDegraded(true); - // nil pointer = not currently degraded. Atomic so SetDegraded is - // lock-free and the observable callback reads a stable snapshot. - degradedAt atomic.Pointer[time.Time] - - // accumulated holds nanoseconds spent degraded across completed - // degrade→recover cycles; degradedTotalSeconds adds the open - // interval at observation time. - accumulated atomic.Uint64 - - // activityUnix holds the Unix-second timestamp of the most recent - // MarkActivity (seeded to construction time). - activityUnix atomic.Int64 + inner selftel.Receiver } -func (s *selfTelemetryImpl) IncError(k kind) { - // Emit component_id + kind in one WithAttributes call rather than - // merging two attribute sets — avoids relying on SDK merge semantics - // that vary across OTel versions. - s.errors.Add(context.Background(), 1, metric.WithAttributes( - attribute.String("component_id", s.componentID), - attribute.String("kind", string(k)), - )) -} - -func (s *selfTelemetryImpl) IncEmissions(n int64) { - if n < 0 { - return - } - s.emissions.Add(context.Background(), n, metric.WithAttributeSet(s.attrs)) -} - -func (s *selfTelemetryImpl) ObserveLatency(d time.Duration) { - s.latency.Record(context.Background(), d.Seconds(), metric.WithAttributeSet(s.attrs)) -} - -// SetDegraded transitions degraded state. Lock-free: enter via -// CAS(nil → &now), exit via Swap → nil + accumulate the elapsed -// interval. Microsecond-scale under-count on concurrent transitions is -// tolerated; self-corrects on the next scrape. -func (s *selfTelemetryImpl) SetDegraded(degraded bool) { - if degraded { - now := time.Now() - s.degradedAt.CompareAndSwap(nil, &now) - return - } - if old := s.degradedAt.Swap(nil); old != nil { - elapsed := time.Since(*old) - if elapsed > 0 { - s.accumulated.Add(uint64(elapsed.Nanoseconds())) - } - } -} +var _ selfTelemetry = (*selfTelemetryImpl)(nil) -func (s *selfTelemetryImpl) MarkActivity() { - s.activityUnix.Store(time.Now().Unix()) -} +func (s *selfTelemetryImpl) IncError(k kind) { s.inner.IncError(string(k)) } +func (s *selfTelemetryImpl) IncEmissions(n int64) { s.inner.IncEmissions(n) } +func (s *selfTelemetryImpl) ObserveLatency(d time.Duration) { s.inner.ObserveLatency(d) } +func (s *selfTelemetryImpl) SetDegraded(degraded bool) { s.inner.SetDegraded(degraded) } +func (s *selfTelemetryImpl) MarkActivity() { s.inner.MarkActivity() } -func (s *selfTelemetryImpl) degradedTotalSeconds() float64 { - acc := time.Duration(s.accumulated.Load()) - if openStart := s.degradedAt.Load(); openStart != nil { - acc += time.Since(*openStart) - } - return acc.Seconds() -} - -// recordInitError ticks otelcol.selftelemetry.init_errors_total when -// receiver wiring falls back to noop telemetry. Operators alert on -// `> 0` to learn that self-telemetry isn't really plugged in. Panics -// from a broken MeterProvider are swallowed — recordInitError IS the -// degraded-path fallback; crashing here would turn a partial outage -// into a process kill. +// recordInitError forwards to the shared selftel.RecordInitError with +// this package's scope. Kept as a thin wrapper so the factory's call +// site stays identical to the pre-refactor shape. func recordInitError(ctx context.Context, mp metric.MeterProvider, kindLabel, componentID, reason string) { - defer func() { _ = recover() }() - if mp == nil { - return - } - meter := mp.Meter(instrumentationScope) - c, err := meter.Int64Counter( - "otelcol.selftelemetry.init_errors_total", - metric.WithDescription("Counter of self-telemetry construction failures that fell back to the noop implementation."), - ) - if err != nil { - return - } - c.Add(ctx, 1, metric.WithAttributes( - attribute.String("kind", kindLabel), - attribute.String("component_id", componentID), - attribute.String("reason", reason), - )) + selftel.RecordInitError(ctx, mp, instrumentationScope, kindLabel, componentID, reason) } diff --git a/components/receivers/pyspy/selftel_test.go b/components/receivers/pyspy/selftel_test.go index 818d79d1..ca41c3a5 100644 --- a/components/receivers/pyspy/selftel_test.go +++ b/components/receivers/pyspy/selftel_test.go @@ -5,79 +5,14 @@ package pyspy import ( "context" "errors" - "fmt" - "strings" "testing" "time" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/embedded" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" -) - -// newTestMeterProvider builds an SDK MeterProvider backed by a ManualReader -// so tests can collect metricdata.ResourceMetrics deterministically without -// the Prometheus exporter or any internal/telemetry plumbing — the receiver -// package must stay decoupled from internal/* so PR-F can delete those -// packages without touching this test file. -func newTestMeterProvider(t *testing.T) (*sdkmetric.MeterProvider, *sdkmetric.ManualReader) { - t.Helper() - rdr := sdkmetric.NewManualReader() - mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)) - t.Cleanup(func() { _ = mp.Shutdown(context.Background()) }) - return mp, rdr -} -func collectMetrics(t *testing.T, rdr *sdkmetric.ManualReader) metricdata.ResourceMetrics { - t.Helper() - var rm metricdata.ResourceMetrics - if err := rdr.Collect(context.Background(), &rm); err != nil { - t.Fatalf("collect: %v", err) - } - return rm -} - -// findInstrument returns the first metricdata.Metrics whose Name matches the -// supplied OTel-dot name (e.g. "otelcol.receiver.pyspy.errors_total"). Returns -// (nil, false) if absent. Scope-agnostic: walks all scope metrics. -func findInstrument(rm metricdata.ResourceMetrics, name string) (metricdata.Metrics, bool) { - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - if m.Name == name { - return m, true - } - } - } - return metricdata.Metrics{}, false -} - -// scopeOf returns the instrumentation scope name that emitted the supplied -// metric name. Used to pin the scope-name standard for PR-F sibling ports. -func scopeOf(rm metricdata.ResourceMetrics, name string) (string, bool) { - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - if m.Name == name { - return sm.Scope.Name, true - } - } - } - return "", false -} - -// kvMatch returns true if every want key's value matches the int64 -// datapoint's attribute set. -func kvMatch(dp metricdata.DataPoint[int64], want map[string]string) bool { - for k, v := range want { - got, ok := dp.Attributes.Value(attribute.Key(k)) - if !ok || got.AsString() != v { - return false - } - } - return true -} + selftelutil "github.com/tracecoreai/tracecore/module/pkg/testutil/selftel" +) // TestPyspy_NoopAlwaysSafe pins: newNoopSelfTelemetry returns a // value whose hot-path methods never panic and silently discard. Every @@ -120,7 +55,7 @@ func TestPyspy_NewReceiver_NilProviderErrors(t *testing.T) { // regression that drops the kind label, the component_id label, or the // metric-name prefix fails here. func TestPyspy_EmitsErrorsTotal_WithKindAndComponentID(t *testing.T) { - mp, rdr := newTestMeterProvider(t) + mp, rdr := selftelutil.NewTestMeterProvider(t) st, err := newSelfTelemetry(testID(), mp) if err != nil { t.Fatalf("newSelfTelemetry: %v", err) @@ -129,10 +64,10 @@ func TestPyspy_EmitsErrorsTotal_WithKindAndComponentID(t *testing.T) { st.IncError(kindTargetGone) st.IncError(kindParseError) - rm := collectMetrics(t, rdr) - m, ok := findInstrument(rm, "otelcol.receiver.pyspy.errors_total") + rm := selftelutil.CollectRM(t, rdr) + m, ok := selftelutil.FindInstrument(rm, "otelcol.receiver.pyspy.errors_total") if !ok { - t.Fatalf("metric otelcol.receiver.pyspy.errors_total absent; have: %s", dumpNames(rm)) + t.Fatalf("metric otelcol.receiver.pyspy.errors_total absent; have: %s", selftelutil.DumpNames(rm)) } sum, ok := m.Data.(metricdata.Sum[int64]) if !ok { @@ -141,7 +76,7 @@ func TestPyspy_EmitsErrorsTotal_WithKindAndComponentID(t *testing.T) { gotGone, foundGone := 0, false gotParse, foundParse := 0, false for _, dp := range sum.DataPoints { - if !kvMatch(dp, map[string]string{"component_id": "pyspy/test"}) { + if !selftelutil.KVMatch(dp, map[string]string{"component_id": "pyspy/test"}) { t.Errorf("datapoint missing component_id=pyspy/test: %v", dp.Attributes) continue } @@ -168,14 +103,14 @@ func TestPyspy_EmitsErrorsTotal_WithKindAndComponentID(t *testing.T) { // the PR-F sibling-port decision so a future drift back to the deleted // internal/selftelemetry scope fails here. func TestPyspy_ScopeNameIsReceiverImportPath(t *testing.T) { - mp, rdr := newTestMeterProvider(t) + mp, rdr := selftelutil.NewTestMeterProvider(t) st, err := newSelfTelemetry(testID(), mp) if err != nil { t.Fatalf("newSelfTelemetry: %v", err) } st.IncEmissions(1) - rm := collectMetrics(t, rdr) - scope, ok := scopeOf(rm, "otelcol.receiver.pyspy.emissions_total") + rm := selftelutil.CollectRM(t, rdr) + scope, ok := selftelutil.ScopeOf(rm, "otelcol.receiver.pyspy.emissions_total") if !ok { t.Fatalf("emissions_total absent") } @@ -192,13 +127,13 @@ func TestPyspy_ScopeNameIsReceiverImportPath(t *testing.T) { // only signal that a receiver fell back to noop telemetry; dropping the // recordInitError call must fail this test. func TestPyspy_RecordInitError_TicksInitErrorsCounter(t *testing.T) { - mp, rdr := newTestMeterProvider(t) + mp, rdr := selftelutil.NewTestMeterProvider(t) recordInitError(context.Background(), mp, "receiver", testID().String(), reasonInstrumentRegister) - rm := collectMetrics(t, rdr) - m, ok := findInstrument(rm, "otelcol.selftelemetry.init_errors_total") + rm := selftelutil.CollectRM(t, rdr) + m, ok := selftelutil.FindInstrument(rm, "otelcol.selftelemetry.init_errors_total") if !ok { - t.Fatalf("init_errors_total absent; have: %s", dumpNames(rm)) + t.Fatalf("init_errors_total absent; have: %s", selftelutil.DumpNames(rm)) } sum, ok := m.Data.(metricdata.Sum[int64]) if !ok { @@ -213,7 +148,7 @@ func TestPyspy_RecordInitError_TicksInitErrorsCounter(t *testing.T) { "component_id": "pyspy/test", "reason": reasonInstrumentRegister, } - if !kvMatch(dp, want) { + if !selftelutil.KVMatch(dp, want) { t.Errorf("init_errors attrs: got %v, want %v", dp.Attributes, want) } if dp.Value != 1 { @@ -240,8 +175,8 @@ func TestPyspy_RecordInitError_NilProviderIsSafe(t *testing.T) { // noop telemetry field (no nil, no panic on hot-path calls), AND (2) // tick otelcol.selftelemetry.init_errors_total via recordInitError. func TestPyspy_FallsBackToNoopWhenMeterFails(t *testing.T) { - mp, rdr := newTestMeterProvider(t) - failing := &failingReceiverMP{real: mp} + mp, rdr := selftelutil.NewTestMeterProvider(t) + failing := selftelutil.NewFailingMeterProvider(mp, "otelcol.receiver.pyspy.") set := testSettings() set.MeterProvider = failing @@ -261,15 +196,15 @@ func TestPyspy_FallsBackToNoopWhenMeterFails(t *testing.T) { // Hot-path call must not panic + must not surface (noop discards). recv.telemetry.IncError(kindTargetGone) - rm := collectMetrics(t, rdr) - if m, ok := findInstrument(rm, "otelcol.receiver.pyspy.errors_total"); ok { + rm := selftelutil.CollectRM(t, rdr) + if m, ok := selftelutil.FindInstrument(rm, "otelcol.receiver.pyspy.errors_total"); ok { if sum, ok := m.Data.(metricdata.Sum[int64]); ok && len(sum.DataPoints) > 0 { t.Errorf("noop fallback leaked IncError into errors_total datapoints: %v", sum.DataPoints) } } - m, ok := findInstrument(rm, "otelcol.selftelemetry.init_errors_total") + m, ok := selftelutil.FindInstrument(rm, "otelcol.selftelemetry.init_errors_total") if !ok { - t.Fatalf("init_errors_total absent after factory fallback; have: %s", dumpNames(rm)) + t.Fatalf("init_errors_total absent after factory fallback; have: %s", selftelutil.DumpNames(rm)) } sum, ok := m.Data.(metricdata.Sum[int64]) if !ok { @@ -284,81 +219,6 @@ func testID() component.ID { return component.NewIDWithName(componentType(), "test") } -func dumpNames(rm metricdata.ResourceMetrics) string { - var b strings.Builder - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - fmt.Fprintf(&b, " %s@%s", m.Name, sm.Scope.Name) - } - } - return b.String() -} - -// failingReceiverMP wraps a real MeterProvider but fails every instrument -// registration whose name starts with "otelcol.receiver.pyspy.". Mirrors -// the nccl_fr sibling test seam so a future refactor that reorders the -// newSelfTelemetry constructor doesn't silently bypass coverage. -type failingReceiverMP struct { - embedded.MeterProvider - real metric.MeterProvider -} - -func (p *failingReceiverMP) Meter(name string, opts ...metric.MeterOption) metric.Meter { - return &failingReceiverMeter{Meter: p.real.Meter(name, opts...)} -} - -type failingReceiverMeter struct { - metric.Meter -} - -const receiverInstrumentPrefix = "otelcol.receiver.pyspy." - -var errSyntheticReceiverFailure = errors.New("synthetic: receiver instrument registration failed") - -func (m *failingReceiverMeter) Int64Counter(name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) { - if strings.HasPrefix(name, receiverInstrumentPrefix) { - return nil, errSyntheticReceiverFailure - } - c, err := m.Meter.Int64Counter(name, opts...) - if err != nil { - return nil, fmt.Errorf("failingReceiverMeter passthrough: %w", err) - } - return c, nil -} - -func (m *failingReceiverMeter) Float64Histogram(name string, opts ...metric.Float64HistogramOption) (metric.Float64Histogram, error) { - if strings.HasPrefix(name, receiverInstrumentPrefix) { - return nil, errSyntheticReceiverFailure - } - h, err := m.Meter.Float64Histogram(name, opts...) - if err != nil { - return nil, fmt.Errorf("failingReceiverMeter passthrough: %w", err) - } - return h, nil -} - -func (m *failingReceiverMeter) Float64ObservableCounter(name string, opts ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) { - if strings.HasPrefix(name, receiverInstrumentPrefix) { - return nil, errSyntheticReceiverFailure - } - c, err := m.Meter.Float64ObservableCounter(name, opts...) - if err != nil { - return nil, fmt.Errorf("failingReceiverMeter passthrough: %w", err) - } - return c, nil -} - -func (m *failingReceiverMeter) Int64ObservableGauge(name string, opts ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) { - if strings.HasPrefix(name, receiverInstrumentPrefix) { - return nil, errSyntheticReceiverFailure - } - g, err := m.Meter.Int64ObservableGauge(name, opts...) - if err != nil { - return nil, fmt.Errorf("failingReceiverMeter passthrough: %w", err) - } - return g, nil -} - // asSelfTelemetry is a compile-time pin: it accepts the package-local // selfTelemetry interface only. If a future refactor moves the type // back into internal/selftelemetry (e.g. reintroduces a diff --git a/go.mod b/go.mod index 12e77e05..c80708f8 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( go.opentelemetry.io/collector/pipeline v1.59.0 go.opentelemetry.io/collector/receiver v1.59.0 go.opentelemetry.io/collector/receiver/receivertest v0.153.0 - go.opentelemetry.io/otel v1.43.0 + go.opentelemetry.io/otel v1.43.0 // indirect go.opentelemetry.io/otel/metric v1.43.0 go.opentelemetry.io/otel/sdk/metric v1.43.0 go.uber.org/zap v1.28.0 diff --git a/module/pkg/selftel/exporter.go b/module/pkg/selftel/exporter.go new file mode 100644 index 00000000..221842f4 --- /dev/null +++ b/module/pkg/selftel/exporter.go @@ -0,0 +1,92 @@ +// SPDX-License-Identifier: Apache-2.0 + +package selftel + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// Exporter is the exporter-scoped self-health surface shared by the +// in-repo exporters. Methods are non-blocking + safe for concurrent +// use; the noop impl discards. kind is the caller's wire-format string +// (e.g. "marshal", "io", "downstream") — each caller keeps a +// package-local `kind` newtype + named consts and casts at this seam. +// +// Why this is the minimal surface: the v0.1.x internal/selftelemetry +// Exporter interface also carried FailureRateReader / ExporterCarrier +// for the per-exporter failure_rate aggregation contract. That contract +// was intentionally dropped by RFC-0013 PR-A2 / PR-F.1; operators +// rate-derive failure rate via PromQL +// `rate(calls_total{result="error"}[5m])` instead. +type Exporter interface { + IncCallSuccess() + IncCallFailure(kind string) +} + +// Noop fallback is intentionally NOT exported from this package. Each +// caller (the per-component selftel.go wrappers) owns its own +// package-local noop type — they need the noop to satisfy the +// caller's package-local selfExporter interface (which carries the +// caller's `kind` newtype), and a shared `selftel.Exporter`-typed +// noop wouldn't fit. A shared noop here would be dead production +// surface. + +// NewExporter returns a real Exporter backed by an OTel counter +// acquired from mp at the supplied scope. The counter is registered +// under callsTotalName (caller chooses, typically +// `otelcol.exporter..calls_total`); the component's id is +// attached as the `component_id` label on every emission. Metric name +// + label shape preserved from the v0.1.x internal selftelemetry +// package so dashboards / alerts don't regress. +// +// Returns ErrNilMeterProvider when mp is nil; the factory is +// responsible for substituting the noop fallback + ticking +// init_errors_total via RecordInitError. +func NewExporter(componentID, scope, callsTotalName string, mp metric.MeterProvider) (Exporter, error) { + if mp == nil { + return nil, ErrNilMeterProvider + } + meter := mp.Meter(scope) + + calls, err := meter.Int64Counter( + callsTotalName, + metric.WithDescription("Exporter Consume* calls partitioned by result"), + ) + if err != nil { + return nil, fmt.Errorf("exporter.calls_total counter: %w", err) + } + + return &exporterImpl{ + componentID: componentID, + calls: calls, + }, nil +} + +type exporterImpl struct { + componentID string + calls metric.Int64Counter +} + +var _ Exporter = (*exporterImpl)(nil) + +func (e *exporterImpl) IncCallSuccess() { + // Emit component_id + result in one WithAttributes call rather than + // merging two attribute sets — avoids relying on SDK merge semantics + // that vary across OTel versions. + e.calls.Add(context.Background(), 1, metric.WithAttributes( + attribute.String("component_id", e.componentID), + attribute.String("result", "success"), + )) +} + +func (e *exporterImpl) IncCallFailure(kind string) { + e.calls.Add(context.Background(), 1, metric.WithAttributes( + attribute.String("component_id", e.componentID), + attribute.String("result", "failure"), + attribute.String("kind", kind), + )) +} diff --git a/module/pkg/selftel/receiver.go b/module/pkg/selftel/receiver.go new file mode 100644 index 00000000..8aa47cd9 --- /dev/null +++ b/module/pkg/selftel/receiver.go @@ -0,0 +1,210 @@ +// SPDX-License-Identifier: Apache-2.0 + +package selftel + +import ( + "context" + "fmt" + "sync/atomic" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// Receiver is the receiver-scoped self-health surface shared by the +// in-repo receivers. Methods are non-blocking + safe for concurrent +// use; the noop impl discards. kind is the caller's wire-format string +// (e.g. "enumerate", "target_gone") — each caller keeps a +// package-local `kind` newtype + named consts and casts at this seam. +// +// Mirrors the v0.1.x internal/selftelemetry.Receiver interface 1:1 so +// the migration was mechanical; trimmed to the exact surface in-repo +// receivers use (no Connect/Cardinality/Init kinds, no +// FailureRateReader — those belonged to the exporter surface). +type Receiver interface { + IncError(kind string) + IncEmissions(n int64) + ObserveLatency(d time.Duration) + SetDegraded(degraded bool) + MarkActivity() +} + +// ReceiverInstrumentNames pins the per-component instrument names the +// shared receiver wiring will register. Each caller chooses these to +// match `otelcol.receiver..` (operator-facing contract). +// Keeping them on a struct rather than five string params avoids +// argument-order bugs at call sites. +type ReceiverInstrumentNames struct { + ErrorsTotal string // counter; "errors_total" + EmissionsTotal string // counter; "emissions_total" + CollectionLatencySeconds string // histogram; "collection_latency_seconds" + DegradedSecondsTotal string // observable counter; "degraded_seconds_total" + LastActivityUnixSeconds string // observable gauge; "last_activity_unix_seconds" +} + +// Noop fallback is intentionally NOT exported from this package. Each +// caller (the per-component selftel.go wrappers) owns its own +// package-local noop type — they need the noop to satisfy the +// caller's package-local selfTelemetry interface (which carries the +// caller's `kind` newtype), and a shared `selftel.Receiver`-typed +// noop wouldn't fit. A shared noop here would be dead production +// surface. + +// latencyBucketBoundaries are the histogram bucket boundaries for the +// collection-latency histogram. Chosen for sub-millisecond dump-poll +// cycles up to 10s slow paths; mirrors the v0.1.x internal/selftelemetry +// shape so histograms remain comparable across the migration. +var latencyBucketBoundaries = []float64{ + 0.0001, 0.001, 0.005, 0.01, 0.05, + 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, +} + +// NewReceiver returns a real Receiver backed by OTel metric instruments +// acquired from mp at the supplied scope. The component's id is attached +// as the `component_id` label on every emission. Registers the five +// instruments named by `names`. Returns ErrNilMeterProvider when mp is +// nil; the factory is responsible for substituting the noop fallback + +// ticking init_errors_total via RecordInitError. +func NewReceiver(componentID, scope string, names ReceiverInstrumentNames, mp metric.MeterProvider) (Receiver, error) { + if mp == nil { + return nil, ErrNilMeterProvider + } + meter := mp.Meter(scope) + attrSet := attribute.NewSet(attribute.String("component_id", componentID)) + + errsCtr, err := meter.Int64Counter( + names.ErrorsTotal, + metric.WithDescription("Errors observed by a receiver, partitioned by kind"), + ) + if err != nil { + return nil, fmt.Errorf("errors_total counter: %w", err) + } + emissionsCtr, err := meter.Int64Counter( + names.EmissionsTotal, + metric.WithDescription("Data points / events emitted by a receiver"), + ) + if err != nil { + return nil, fmt.Errorf("emissions_total counter: %w", err) + } + latencyHist, err := meter.Float64Histogram( + names.CollectionLatencySeconds, + metric.WithDescription("Receiver collection cycle latency in seconds"), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(latencyBucketBoundaries...), + ) + if err != nil { + return nil, fmt.Errorf("collection_latency_seconds histogram: %w", err) + } + + r := &receiverImpl{ + componentID: componentID, + attrs: attrSet, + errors: errsCtr, + emissions: emissionsCtr, + latency: latencyHist, + } + // Seed last-activity to construction time so a `time() - last_activity + // > N` alert doesn't fire on the zero-valued gauge during boot. + r.activityUnix.Store(time.Now().Unix()) + + if _, err := meter.Float64ObservableCounter( + names.DegradedSecondsTotal, + metric.WithDescription("Cumulative seconds the receiver has been in the degraded state"), + metric.WithUnit("s"), + metric.WithFloat64Callback(func(_ context.Context, obs metric.Float64Observer) error { + obs.Observe(r.degradedTotalSeconds(), metric.WithAttributeSet(attrSet)) + return nil + }), + ); err != nil { + return nil, fmt.Errorf("degraded_seconds_total observable: %w", err) + } + + if _, err := meter.Int64ObservableGauge( + names.LastActivityUnixSeconds, + metric.WithDescription("Unix-second timestamp of the receiver's last successful activity"), + metric.WithInt64Callback(func(_ context.Context, obs metric.Int64Observer) error { + obs.Observe(r.activityUnix.Load(), metric.WithAttributeSet(attrSet)) + return nil + }), + ); err != nil { + return nil, fmt.Errorf("last_activity_unix_seconds observable: %w", err) + } + + return r, nil +} + +type receiverImpl struct { + componentID string + attrs attribute.Set + errors metric.Int64Counter + emissions metric.Int64Counter + latency metric.Float64Histogram + + // degradedAt holds the time of the most recent SetDegraded(true); + // nil pointer = not currently degraded. Atomic so SetDegraded is + // lock-free and the observable callback reads a stable snapshot. + degradedAt atomic.Pointer[time.Time] + + // accumulated holds nanoseconds spent degraded across completed + // degrade→recover cycles; degradedTotalSeconds adds the open + // interval at observation time. + accumulated atomic.Uint64 + + // activityUnix holds the Unix-second timestamp of the most recent + // MarkActivity (seeded to construction time). + activityUnix atomic.Int64 +} + +var _ Receiver = (*receiverImpl)(nil) + +func (r *receiverImpl) IncError(kind string) { + // Emit component_id + kind in one WithAttributes call rather than + // merging two attribute sets — avoids relying on SDK merge semantics + // that vary across OTel versions. + r.errors.Add(context.Background(), 1, metric.WithAttributes( + attribute.String("component_id", r.componentID), + attribute.String("kind", kind), + )) +} + +func (r *receiverImpl) IncEmissions(n int64) { + if n < 0 { + return + } + r.emissions.Add(context.Background(), n, metric.WithAttributeSet(r.attrs)) +} + +func (r *receiverImpl) ObserveLatency(d time.Duration) { + r.latency.Record(context.Background(), d.Seconds(), metric.WithAttributeSet(r.attrs)) +} + +// SetDegraded transitions degraded state. Lock-free: enter via +// CAS(nil → &now), exit via Swap → nil + accumulate the elapsed +// interval. Microsecond-scale under-count on concurrent transitions is +// tolerated; self-corrects on the next scrape. +func (r *receiverImpl) SetDegraded(degraded bool) { + if degraded { + now := time.Now() + r.degradedAt.CompareAndSwap(nil, &now) + return + } + if old := r.degradedAt.Swap(nil); old != nil { + elapsed := time.Since(*old) + if elapsed > 0 { + r.accumulated.Add(uint64(elapsed.Nanoseconds())) + } + } +} + +func (r *receiverImpl) MarkActivity() { + r.activityUnix.Store(time.Now().Unix()) +} + +func (r *receiverImpl) degradedTotalSeconds() float64 { + acc := time.Duration(r.accumulated.Load()) + if openStart := r.degradedAt.Load(); openStart != nil { + acc += time.Since(*openStart) + } + return acc.Seconds() +} diff --git a/module/pkg/selftel/selftel.go b/module/pkg/selftel/selftel.go new file mode 100644 index 00000000..5a8e6f5e --- /dev/null +++ b/module/pkg/selftel/selftel.go @@ -0,0 +1,82 @@ +// SPDX-License-Identifier: Apache-2.0 + +// Package selftel is the shared self-telemetry plumbing for in-repo +// receivers + exporters. It replaces four hand-rolled selftel.go +// copies (one each in components/exporters/otlphttp, +// components/exporters/stdoutexporter, components/receivers/pyspy, +// and module/receiver/ncclfrreceiver). The per-component +// instrument-name / scope-name choices stay with the caller (those are +// the operator-facing contracts pinned by upstream OTel collector +// `otelcol___` convention per RFC-0013 +// §migration v0.1.0 namespace alignment); only the plumbing — the +// OTel counter/histogram/observable wiring, the degraded-state atomic +// machinery, the noop fallback, the init-error counter — lives here. +// +// Scope choice: this package owns no scope of its own. Each caller +// passes its own Go import path (`github.com/tracecoreai/tracecore/...`) +// as the scope so multi-component scrape output stays partitioned per +// the OTel scope-name standard. +// +// kind is `string` at this seam. Each caller keeps a package-local +// `kind string` newtype + named consts (the wire-format strings are +// the stable operator contract); the shared API takes the cast value +// so this package stays unaware of caller-specific enums. +package selftel + +import ( + "context" + "errors" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// ErrNilMeterProvider is returned by NewExporter / NewReceiver when +// called with a nil MeterProvider. The factory is responsible for +// substituting the noop fallback + ticking init_errors_total via +// RecordInitError. Returning a sentinel rather than a generic error +// lets the factory distinguish "wire-up bug" from "instrument register +// failure" if it ever needs to. +var ErrNilMeterProvider = errors.New("selftel: MeterProvider is nil") + +// ReasonInstrumentRegister labels init_errors_total ticks when OTel +// instrument registration failed at construction time. Exported so +// callers (factory wiring) reuse the same wire-format string. +const ReasonInstrumentRegister = "instrument_register" + +// initErrorsTotalName is the canonical metric name for the self-telemetry +// fallback counter. Kept private — callers don't choose this name. +const initErrorsTotalName = "otelcol.selftelemetry.init_errors_total" + +// RecordInitError ticks otelcol.selftelemetry.init_errors_total when +// receiver/exporter wiring falls back to noop telemetry. Operators +// alert on `> 0` to learn that self-telemetry isn't really plugged in. +// Panics from a broken MeterProvider are swallowed — RecordInitError +// IS the degraded-path fallback; crashing here would turn a partial +// outage into a process kill. +// +// scope is the caller's Go import path (matches the scope passed to +// NewExporter / NewReceiver so init-error ticks land in the same +// scope-metrics block as the rest of the component's self-telemetry). +// kindLabel is the role string ("exporter" or "receiver"); componentID +// is the caller-supplied component id; reason is ReasonInstrumentRegister +// (or a caller-defined extension). +func RecordInitError(ctx context.Context, mp metric.MeterProvider, scope, kindLabel, componentID, reason string) { + defer func() { _ = recover() }() + if mp == nil { + return + } + meter := mp.Meter(scope) + c, err := meter.Int64Counter( + initErrorsTotalName, + metric.WithDescription("Counter of self-telemetry construction failures that fell back to the noop implementation."), + ) + if err != nil { + return + } + c.Add(ctx, 1, metric.WithAttributes( + attribute.String("kind", kindLabel), + attribute.String("component_id", componentID), + attribute.String("reason", reason), + )) +} diff --git a/module/pkg/selftel/selftel_test.go b/module/pkg/selftel/selftel_test.go new file mode 100644 index 00000000..7a81164d --- /dev/null +++ b/module/pkg/selftel/selftel_test.go @@ -0,0 +1,260 @@ +// SPDX-License-Identifier: Apache-2.0 + +package selftel_test + +import ( + "context" + "errors" + "testing" + "time" + + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/embedded" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + + "github.com/tracecoreai/tracecore/module/pkg/selftel" + selftelutil "github.com/tracecoreai/tracecore/module/pkg/testutil/selftel" +) + +const ( + testScope = "github.com/tracecoreai/tracecore/module/pkg/selftel/test" + testComponentID = "test/component" + testCallsTotalName = "otelcol.test.exporter.calls_total" + + receiverErrorsTotal = "otelcol.test.receiver.errors_total" + receiverEmissionsTotal = "otelcol.test.receiver.emissions_total" + receiverCollectionLatencySecs = "otelcol.test.receiver.collection_latency_seconds" + receiverDegradedSecondsTotal = "otelcol.test.receiver.degraded_seconds_total" + receiverLastActivityUnixSeconds = "otelcol.test.receiver.last_activity_unix_seconds" +) + +func receiverNames() selftel.ReceiverInstrumentNames { + return selftel.ReceiverInstrumentNames{ + ErrorsTotal: receiverErrorsTotal, + EmissionsTotal: receiverEmissionsTotal, + CollectionLatencySeconds: receiverCollectionLatencySecs, + DegradedSecondsTotal: receiverDegradedSecondsTotal, + LastActivityUnixSeconds: receiverLastActivityUnixSeconds, + } +} + +// Per-component noop coverage lives where the noop is consumed: +// each per-component selftel_test.go has its own TestX_NoopAlwaysSafe +// against its package-local noop type. The shared package has no +// production noop to test (see exporter.go / receiver.go "Noop +// fallback is intentionally NOT exported" comment). + +func TestNewExporter_NilProviderReturnsSentinel(t *testing.T) { + if _, err := selftel.NewExporter(testComponentID, testScope, testCallsTotalName, nil); !errors.Is(err, selftel.ErrNilMeterProvider) { + t.Fatalf("err = %v, want ErrNilMeterProvider", err) + } +} + +func TestNewReceiver_NilProviderReturnsSentinel(t *testing.T) { + if _, err := selftel.NewReceiver(testComponentID, testScope, receiverNames(), nil); !errors.Is(err, selftel.ErrNilMeterProvider) { + t.Fatalf("err = %v, want ErrNilMeterProvider", err) + } +} + +func TestNewExporter_EmitsCallsTotalShape(t *testing.T) { + mp, rdr := selftelutil.NewTestMeterProvider(t) + e, err := selftel.NewExporter(testComponentID, testScope, testCallsTotalName, mp) + if err != nil { + t.Fatalf("NewExporter: %v", err) + } + e.IncCallSuccess() + e.IncCallFailure("marshal") + + rm := selftelutil.CollectRM(t, rdr) + m, ok := selftelutil.FindInstrument(rm, testCallsTotalName) + if !ok { + t.Fatalf("calls_total missing; have: %s", selftelutil.DumpNames(rm)) + } + sum, ok := m.Data.(metricdata.Sum[int64]) + if !ok { + t.Fatalf("calls_total shape: got %T", m.Data) + } + gotKeys := map[string]int{} + for _, dp := range sum.DataPoints { + if !selftelutil.KVMatch(dp, map[string]string{"component_id": testComponentID}) { + t.Errorf("missing component_id on datapoint: %v", dp.Attributes) + continue + } + result, _ := dp.Attributes.Value("result") + kind, _ := dp.Attributes.Value("kind") + gotKeys[result.AsString()+"/"+kind.AsString()] += int(dp.Value) + } + want := map[string]int{"success/": 1, "failure/marshal": 1} + for k, v := range want { + if gotKeys[k] != v { + t.Errorf("calls_total[%q]: got %d, want %d", k, gotKeys[k], v) + } + } + + scope, ok := selftelutil.ScopeOf(rm, testCallsTotalName) + if !ok || scope != testScope { + t.Errorf("scope: got %q, want %q", scope, testScope) + } +} + +func TestNewReceiver_EmitsAllFiveInstruments(t *testing.T) { + mp, rdr := selftelutil.NewTestMeterProvider(t) + r, err := selftel.NewReceiver(testComponentID, testScope, receiverNames(), mp) + if err != nil { + t.Fatalf("NewReceiver: %v", err) + } + r.IncError("enumerate") + r.IncError("enumerate") + r.IncError("parse") + r.IncEmissions(5) + r.IncEmissions(-1) // discarded + r.ObserveLatency(20 * time.Millisecond) + r.SetDegraded(true) + r.SetDegraded(false) + r.MarkActivity() + + rm := selftelutil.CollectRM(t, rdr) + for _, name := range []string{ + receiverErrorsTotal, receiverEmissionsTotal, + receiverCollectionLatencySecs, receiverDegradedSecondsTotal, + receiverLastActivityUnixSeconds, + } { + if _, ok := selftelutil.FindInstrument(rm, name); !ok { + t.Errorf("instrument %q missing; have: %s", name, selftelutil.DumpNames(rm)) + } + } + + // emissions_total: -1 must be discarded → total 5. + em, _ := selftelutil.FindInstrument(rm, receiverEmissionsTotal) + sum := em.Data.(metricdata.Sum[int64]) + if len(sum.DataPoints) != 1 || sum.DataPoints[0].Value != 5 { + t.Errorf("emissions: got %v, want 1 datapoint value=5", sum.DataPoints) + } + + // errors_total: 2 enumerate + 1 parse. + errs, _ := selftelutil.FindInstrument(rm, receiverErrorsTotal) + errSum := errs.Data.(metricdata.Sum[int64]) + byKind := map[string]int64{} + for _, dp := range errSum.DataPoints { + kind, _ := dp.Attributes.Value("kind") + byKind[kind.AsString()] += dp.Value + } + if byKind["enumerate"] != 2 || byKind["parse"] != 1 { + t.Errorf("errors_total by kind: %v", byKind) + } +} + +func TestRecordInitError_TicksCounter(t *testing.T) { + mp, rdr := selftelutil.NewTestMeterProvider(t) + selftel.RecordInitError(context.Background(), mp, testScope, "exporter", testComponentID, selftel.ReasonInstrumentRegister) + + rm := selftelutil.CollectRM(t, rdr) + m, ok := selftelutil.FindInstrument(rm, "otelcol.selftelemetry.init_errors_total") + if !ok { + t.Fatalf("init_errors_total absent; have: %s", selftelutil.DumpNames(rm)) + } + sum := m.Data.(metricdata.Sum[int64]) + if len(sum.DataPoints) != 1 || sum.DataPoints[0].Value != 1 { + t.Fatalf("init_errors: want 1 datapoint value=1, got %v", sum.DataPoints) + } + dp := sum.DataPoints[0] + want := map[string]string{ + "kind": "exporter", + "component_id": testComponentID, + "reason": selftel.ReasonInstrumentRegister, + } + if !selftelutil.KVMatch(dp, want) { + t.Errorf("init_errors attrs: got %v, want %v", dp.Attributes, want) + } +} + +func TestRecordInitError_NilProviderIsSafe(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Fatalf("nil provider panicked: %v", r) + } + }() + selftel.RecordInitError(context.Background(), nil, testScope, "exporter", testComponentID, selftel.ReasonInstrumentRegister) +} + +// TestRecordInitError_FailingProviderIsSafe pins: RecordInitError +// swallows panics from a broken MeterProvider. Operators rely on this +// — RecordInitError IS the degraded-path fallback; crashing here would +// turn a partial outage into a process kill. +func TestRecordInitError_FailingProviderIsSafe(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Fatalf("panicking provider escaped: %v", r) + } + }() + selftel.RecordInitError(context.Background(), panickingMP{}, testScope, "exporter", testComponentID, selftel.ReasonInstrumentRegister) +} + +// panickingMP returns a Meter that panics on every instrument call. +type panickingMP struct{ embedded.MeterProvider } + +func (panickingMP) Meter(string, ...metric.MeterOption) metric.Meter { + return panickingMeter{} +} + +type panickingMeter struct{ embedded.Meter } + +func (panickingMeter) Int64Counter(string, ...metric.Int64CounterOption) (metric.Int64Counter, error) { + panic("synthetic counter panic") +} + +func (panickingMeter) Int64UpDownCounter(string, ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) { + return nil, nil +} + +func (panickingMeter) Int64Histogram(string, ...metric.Int64HistogramOption) (metric.Int64Histogram, error) { + return nil, nil +} + +func (panickingMeter) Int64Gauge(string, ...metric.Int64GaugeOption) (metric.Int64Gauge, error) { + return nil, nil +} + +func (panickingMeter) Int64ObservableCounter(string, ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) { + return nil, nil +} + +func (panickingMeter) Int64ObservableUpDownCounter(string, ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) { + return nil, nil +} + +func (panickingMeter) Int64ObservableGauge(string, ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) { + return nil, nil +} + +func (panickingMeter) Float64Counter(string, ...metric.Float64CounterOption) (metric.Float64Counter, error) { + return nil, nil +} + +func (panickingMeter) Float64UpDownCounter(string, ...metric.Float64UpDownCounterOption) (metric.Float64UpDownCounter, error) { + return nil, nil +} + +func (panickingMeter) Float64Histogram(string, ...metric.Float64HistogramOption) (metric.Float64Histogram, error) { + return nil, nil +} + +func (panickingMeter) Float64Gauge(string, ...metric.Float64GaugeOption) (metric.Float64Gauge, error) { + return nil, nil +} + +func (panickingMeter) Float64ObservableCounter(string, ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) { + return nil, nil +} + +func (panickingMeter) Float64ObservableUpDownCounter(string, ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) { + return nil, nil +} + +func (panickingMeter) Float64ObservableGauge(string, ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) { + return nil, nil +} + +func (panickingMeter) RegisterCallback(metric.Callback, ...metric.Observable) (metric.Registration, error) { + return nil, nil +} diff --git a/module/pkg/testutil/selftel/selftel.go b/module/pkg/testutil/selftel/selftel.go new file mode 100644 index 00000000..f2571e57 --- /dev/null +++ b/module/pkg/testutil/selftel/selftel.go @@ -0,0 +1,187 @@ +// SPDX-License-Identifier: Apache-2.0 + +// Package selftel provides test helpers for the shared +// module/pkg/selftel surface + the per-component selftel wrappers in +// receivers/exporters. Build-tag-free: imported only by `*_test.go` +// files so the production binary doesn't pull the SDK MeterProvider. +// +// Why this lives under module/pkg/testutil rather than next to +// module/pkg/selftel: keeping test helpers in a sibling-named package +// (selftel.NewExporter vs selfteltest.NewTestMeterProvider) keeps the +// production surface free of test-only types while still letting both +// caller pools (components/* + module/receiver/*) import a single +// helper module. +package selftel + +import ( + "context" + "errors" + "fmt" + "strings" + "testing" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/embedded" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +// NewTestMeterProvider builds an SDK MeterProvider backed by a +// ManualReader so tests can collect metricdata.ResourceMetrics +// deterministically without the Prometheus exporter or any +// internal/telemetry plumbing. Caller-side packages stay decoupled +// from internal/* so PR-F can delete those packages without touching +// per-component test files. +func NewTestMeterProvider(t *testing.T) (*sdkmetric.MeterProvider, *sdkmetric.ManualReader) { + t.Helper() + rdr := sdkmetric.NewManualReader() + mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)) + t.Cleanup(func() { _ = mp.Shutdown(context.Background()) }) + return mp, rdr +} + +// CollectRM collects a single ResourceMetrics snapshot from the +// supplied ManualReader; fatals on collection error. +func CollectRM(t *testing.T, rdr *sdkmetric.ManualReader) metricdata.ResourceMetrics { + t.Helper() + var rm metricdata.ResourceMetrics + if err := rdr.Collect(context.Background(), &rm); err != nil { + t.Fatalf("collect: %v", err) + } + return rm +} + +// FindInstrument returns the first metricdata.Metrics whose Name +// matches the supplied OTel-dot name. Returns (zero, false) if absent. +// Scope-agnostic: walks all scope metrics. +func FindInstrument(rm metricdata.ResourceMetrics, name string) (metricdata.Metrics, bool) { + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return m, true + } + } + } + return metricdata.Metrics{}, false +} + +// ScopeOf returns the instrumentation-scope name that emitted the +// supplied metric name; used to pin the per-component scope-name +// standard. +func ScopeOf(rm metricdata.ResourceMetrics, name string) (string, bool) { + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return sm.Scope.Name, true + } + } + } + return "", false +} + +// KVMatch returns true if every key/value in want matches the int64 +// datapoint's attribute set. +func KVMatch(dp metricdata.DataPoint[int64], want map[string]string) bool { + for k, v := range want { + got, ok := dp.Attributes.Value(attribute.Key(k)) + if !ok || got.AsString() != v { + return false + } + } + return true +} + +// DumpNames returns a space-separated `name@scope` listing of every +// metric in rm. Used by t.Fatalf messages when a lookup misses. +func DumpNames(rm metricdata.ResourceMetrics) string { + var b strings.Builder + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + fmt.Fprintf(&b, " %s@%s", m.Name, sm.Scope.Name) + } + } + return b.String() +} + +// FailingMeterProvider wraps a real MeterProvider but fails every +// instrument registration whose name starts with the configured +// prefix. Used to drive the factory's noop-fallback path. +// +// Why this is here rather than per-package: every consumer's +// `failing*MP` was a verbatim copy with only the instrument prefix +// changing — extracting cuts ~30 LOC per test file and centralizes +// the failure-injection contract so any new instrument type added to +// module/pkg/selftel needs exactly one method added here. +type FailingMeterProvider struct { + embedded.MeterProvider + real metric.MeterProvider + prefix string +} + +// NewFailingMeterProvider wraps real so any Int64Counter, +// Float64Histogram, Float64ObservableCounter, or Int64ObservableGauge +// registration whose name starts with prefix returns a synthetic +// error. +func NewFailingMeterProvider(real metric.MeterProvider, prefix string) *FailingMeterProvider { + return &FailingMeterProvider{real: real, prefix: prefix} +} + +// Meter returns a wrapped meter whose instrument registrations honor +// the failure prefix. +func (p *FailingMeterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter { + return &failingMeter{Meter: p.real.Meter(name, opts...), prefix: p.prefix} +} + +// errSyntheticFailure is the sentinel returned by failingMeter when an +// instrument name matches the configured prefix. +var errSyntheticFailure = errors.New("synthetic: instrument registration failed") + +type failingMeter struct { + metric.Meter + prefix string +} + +func (m *failingMeter) Int64Counter(name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) { + if strings.HasPrefix(name, m.prefix) { + return nil, errSyntheticFailure + } + c, err := m.Meter.Int64Counter(name, opts...) + if err != nil { + return nil, fmt.Errorf("failingMeter passthrough: %w", err) + } + return c, nil +} + +func (m *failingMeter) Float64Histogram(name string, opts ...metric.Float64HistogramOption) (metric.Float64Histogram, error) { + if strings.HasPrefix(name, m.prefix) { + return nil, errSyntheticFailure + } + h, err := m.Meter.Float64Histogram(name, opts...) + if err != nil { + return nil, fmt.Errorf("failingMeter passthrough: %w", err) + } + return h, nil +} + +func (m *failingMeter) Float64ObservableCounter(name string, opts ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) { + if strings.HasPrefix(name, m.prefix) { + return nil, errSyntheticFailure + } + c, err := m.Meter.Float64ObservableCounter(name, opts...) + if err != nil { + return nil, fmt.Errorf("failingMeter passthrough: %w", err) + } + return c, nil +} + +func (m *failingMeter) Int64ObservableGauge(name string, opts ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) { + if strings.HasPrefix(name, m.prefix) { + return nil, errSyntheticFailure + } + g, err := m.Meter.Int64ObservableGauge(name, opts...) + if err != nil { + return nil, fmt.Errorf("failingMeter passthrough: %w", err) + } + return g, nil +} diff --git a/module/receiver/ncclfrreceiver/selftel.go b/module/receiver/ncclfrreceiver/selftel.go index 63c9b36c..266cd658 100644 --- a/module/receiver/ncclfrreceiver/selftel.go +++ b/module/receiver/ncclfrreceiver/selftel.go @@ -1,39 +1,39 @@ // SPDX-License-Identifier: Apache-2.0 -// Receiver-scoped self-telemetry surface. Replaces the v0.1.x -// dependency on `internal/selftelemetry`, which was deleted in -// RFC-0013 PR-F.1. Metric names follow the upstream OTel collector -// `otelcol___` convention per RFC-0013 -// §migration v0.1.0 namespace alignment: instruments register as -// `otelcol.receiver.ncclfr.errors_total{kind,component_id}` (OTel-dot -// form; the Prometheus exporter renders this as +// Receiver-scoped self-telemetry surface. Thin wrapper over +// module/pkg/selftel that pins this receiver's scope-name + instrument +// names + the kind enum. Metric names follow the upstream OTel +// collector `otelcol___` convention per +// RFC-0013 §migration v0.1.0 namespace alignment: instruments register +// as `otelcol.receiver.ncclfr.errors_total{kind,component_id}` +// (OTel-dot form; the Prometheus exporter renders this as // `otelcol_receiver_ncclfr_errors_total`). Label shape is preserved // (`component_id` still partitions per-instance) so multi-instance // disambiguation in dashboards is unchanged from v0.1.x. The -// instrumentation scope name is THIS receiver's Go import path — when -// the receiver moves to `module/receiver/ncclfrreceiver/` in PR-I.1, -// the scope name moves with it, matching OTel convention. Operators -// migrating from v0.1.x dashboards rename `tracecore_receiver_*` → -// `otelcol_receiver_ncclfr_*` per docs/migration/v0.1-to-v0.2.md. +// instrumentation scope name is THIS receiver's Go import path. +// Operators migrating from v0.1.x dashboards rename +// `tracecore_receiver_*` → `otelcol_receiver_ncclfr_*` per +// docs/migration/v0.1-to-v0.2.md. +// +// Shared plumbing (the OTel counters/histogram/observables, the +// degraded-state atomic machinery, the noop fallback, the init-error +// fallback counter) lives in module/pkg/selftel; this file owns only +// the naming + the kind enum + the package-local interface shape. package ncclfrreceiver import ( "context" - "errors" - "fmt" - "sync/atomic" "time" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + + "github.com/tracecoreai/tracecore/module/pkg/selftel" ) -// kind is a low-cardinality error-class identifier. Mirrors the -// internal/selftelemetry.Kind type so the migration is mechanical; -// receiver-local because the canonical-Kind enforcement that the -// internal package owned moves into RFC-0013 PR-I's submodule. +// kind is a low-cardinality error-class identifier. Receiver-local +// so the wire-format strings stay owned by the package that emits them. type kind string const ( @@ -44,27 +44,36 @@ const ( kindPanic kind = "panic" // recovered panic in hot path ) -// reasonInstrumentRegister labels init_errors_total ticks when OTel -// instrument registration failed at construction time. -const reasonInstrumentRegister = "instrument_register" - // instrumentationScope pins the OTel scope name. Per OTel convention, -// the scope is the package's Go import path; PR-I.1b moved this to the -// Go submodule when the receiver landed at module/receiver/ncclfrreceiver/. +// the scope is the package's Go import path. const instrumentationScope = "github.com/tracecoreai/tracecore/module/receiver/ncclfrreceiver" -// errNilMeterProvider mirrors selftelemetry.ErrNilMeterProvider — the -// factory is responsible for substituting the noop fallback + ticking -// init_errors_total. Returning a sentinel rather than a generic error -// lets the factory distinguish "wire-up bug" from "instrument register -// failure" if it ever needs to. -var errNilMeterProvider = errors.New("nccl_fr: MeterProvider is nil") +// reasonInstrumentRegister is the wire-format label value for +// init_errors_total ticks when OTel instrument registration failed at +// construction time. Re-exported from the shared package so this +// package's factory + tests don't import selftel just for the const. +const reasonInstrumentRegister = selftel.ReasonInstrumentRegister + +// errNilMeterProvider is the sentinel returned by newSelfTelemetry +// when called with a nil MeterProvider. Aliased to the shared sentinel +// so the factory's errors.Is check survives the migration. +var errNilMeterProvider = selftel.ErrNilMeterProvider + +// receiverInstrumentNames pins the operator-facing instrument names +// for this receiver's five OTel instruments. Kept here (not in the +// shared package) so module/pkg/selftel stays unaware of caller-specific +// name choices. +var receiverInstrumentNames = selftel.ReceiverInstrumentNames{ + ErrorsTotal: "otelcol.receiver.ncclfr.errors_total", + EmissionsTotal: "otelcol.receiver.ncclfr.emissions_total", + CollectionLatencySeconds: "otelcol.receiver.ncclfr.collection_latency_seconds", + DegradedSecondsTotal: "otelcol.receiver.ncclfr.degraded_seconds_total", + LastActivityUnixSeconds: "otelcol.receiver.ncclfr.last_activity_unix_seconds", +} -// selfTelemetry is the receiver-scoped self-health surface. Methods are -// non-blocking + safe for concurrent use; the noop impl discards. -// Mirrors the internal/selftelemetry.Receiver interface but trimmed to -// the exact surface nccl_fr uses — no Connect/Cardinality/Init kinds, -// no FailureRateReader (those belonged to the exporter surface). +// selfTelemetry is the receiver-scoped self-health surface used by +// ncclfrreceiver hot paths. Mirrors selftel.Receiver but carries the +// package-local `kind` type so call sites stay type-checked. type selfTelemetry interface { IncError(k kind) IncEmissions(n int64) @@ -86,184 +95,36 @@ func (noopSelfTelemetry) MarkActivity() {} var _ selfTelemetry = noopSelfTelemetry{} -// newSelfTelemetry returns a real selfTelemetry backed by OTel metric -// instruments acquired from mp. The component's id is attached as the -// `component_id` label on every emission. Registers the same five -// instruments the v0.1.x internal selftelemetry package registered; -// the OTel-dot prefix changed from `tracecore.receiver.*` to -// `otelcol.receiver.ncclfr.*` per RFC-0013 namespace alignment, label -// shape is unchanged. +// newSelfTelemetry returns a real selfTelemetry backed by the shared +// selftel.Receiver wired at this package's scope + instrument names. +// Returns errNilMeterProvider (== selftel.ErrNilMeterProvider) when mp +// is nil; the factory is responsible for the noop fallback + the +// init_errors_total tick via recordInitError. func newSelfTelemetry(id component.ID, mp metric.MeterProvider) (selfTelemetry, error) { - if mp == nil { - return nil, errNilMeterProvider - } - meter := mp.Meter(instrumentationScope) - attrSet := attribute.NewSet(attribute.String("component_id", id.String())) - - errsCtr, err := meter.Int64Counter( - "otelcol.receiver.ncclfr.errors_total", - metric.WithDescription("Errors observed by a receiver, partitioned by kind"), - ) + inner, err := selftel.NewReceiver(id.String(), instrumentationScope, receiverInstrumentNames, mp) if err != nil { - return nil, fmt.Errorf("errors_total counter: %w", err) - } - emissionsCtr, err := meter.Int64Counter( - "otelcol.receiver.ncclfr.emissions_total", - metric.WithDescription("Data points / events emitted by a receiver"), - ) - if err != nil { - return nil, fmt.Errorf("emissions_total counter: %w", err) - } - latencyHist, err := meter.Float64Histogram( - "otelcol.receiver.ncclfr.collection_latency_seconds", - metric.WithDescription("Receiver collection cycle latency in seconds"), - metric.WithUnit("s"), - // Bucket boundaries chosen for sub-millisecond dump-poll cycles - // up to 10s slow paths; mirrors the internal/selftelemetry - // shape so histograms remain comparable across the migration. - metric.WithExplicitBucketBoundaries( - 0.0001, 0.001, 0.005, 0.01, 0.05, - 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, - ), - ) - if err != nil { - return nil, fmt.Errorf("collection_latency_seconds histogram: %w", err) - } - - st := &selfTelemetryImpl{ - componentID: id.String(), - attrs: attrSet, - errors: errsCtr, - emissions: emissionsCtr, - latency: latencyHist, - } - // Seed last-activity to construction time so a `time() - last_activity - // > N` alert doesn't fire on the zero-valued gauge during boot. - st.activityUnix.Store(time.Now().Unix()) - - if _, err := meter.Float64ObservableCounter( - "otelcol.receiver.ncclfr.degraded_seconds_total", - metric.WithDescription("Cumulative seconds the receiver has been in the degraded state"), - metric.WithUnit("s"), - metric.WithFloat64Callback(func(_ context.Context, obs metric.Float64Observer) error { - obs.Observe(st.degradedTotalSeconds(), metric.WithAttributeSet(attrSet)) - return nil - }), - ); err != nil { - return nil, fmt.Errorf("degraded_seconds_total observable: %w", err) - } - - if _, err := meter.Int64ObservableGauge( - "otelcol.receiver.ncclfr.last_activity_unix_seconds", - metric.WithDescription("Unix-second timestamp of the receiver's last successful activity"), - metric.WithInt64Callback(func(_ context.Context, obs metric.Int64Observer) error { - obs.Observe(st.activityUnix.Load(), metric.WithAttributeSet(attrSet)) - return nil - }), - ); err != nil { - return nil, fmt.Errorf("last_activity_unix_seconds observable: %w", err) + return nil, err } - - return st, nil + return &selfTelemetryImpl{inner: inner}, nil } -var _ selfTelemetry = (*selfTelemetryImpl)(nil) - +// selfTelemetryImpl casts the package-local `kind` to string at the +// shared-package seam. Zero-cost — the cast is a compile-time op. type selfTelemetryImpl struct { - componentID string - attrs attribute.Set - errors metric.Int64Counter - emissions metric.Int64Counter - latency metric.Float64Histogram - - // degradedAt holds the time of the most recent SetDegraded(true); - // nil pointer = not currently degraded. Atomic so SetDegraded is - // lock-free and the observable callback reads a stable snapshot. - degradedAt atomic.Pointer[time.Time] - - // accumulated holds nanoseconds spent degraded across completed - // degrade→recover cycles; degradedTotalSeconds adds the open - // interval at observation time. - accumulated atomic.Uint64 - - // activityUnix holds the Unix-second timestamp of the most recent - // MarkActivity (seeded to construction time). - activityUnix atomic.Int64 -} - -func (s *selfTelemetryImpl) IncError(k kind) { - // Emit component_id + kind in one WithAttributes call rather than - // merging two attribute sets — avoids relying on SDK merge semantics - // that vary across OTel versions. - s.errors.Add(context.Background(), 1, metric.WithAttributes( - attribute.String("component_id", s.componentID), - attribute.String("kind", string(k)), - )) -} - -func (s *selfTelemetryImpl) IncEmissions(n int64) { - if n < 0 { - return - } - s.emissions.Add(context.Background(), n, metric.WithAttributeSet(s.attrs)) -} - -func (s *selfTelemetryImpl) ObserveLatency(d time.Duration) { - s.latency.Record(context.Background(), d.Seconds(), metric.WithAttributeSet(s.attrs)) + inner selftel.Receiver } -// SetDegraded transitions degraded state. Lock-free: enter via -// CAS(nil → &now), exit via Swap → nil + accumulate the elapsed -// interval. Microsecond-scale under-count on concurrent transitions is -// tolerated; self-corrects on the next scrape. -func (s *selfTelemetryImpl) SetDegraded(degraded bool) { - if degraded { - now := time.Now() - s.degradedAt.CompareAndSwap(nil, &now) - return - } - if old := s.degradedAt.Swap(nil); old != nil { - elapsed := time.Since(*old) - if elapsed > 0 { - s.accumulated.Add(uint64(elapsed.Nanoseconds())) - } - } -} +var _ selfTelemetry = (*selfTelemetryImpl)(nil) -func (s *selfTelemetryImpl) MarkActivity() { - s.activityUnix.Store(time.Now().Unix()) -} +func (s *selfTelemetryImpl) IncError(k kind) { s.inner.IncError(string(k)) } +func (s *selfTelemetryImpl) IncEmissions(n int64) { s.inner.IncEmissions(n) } +func (s *selfTelemetryImpl) ObserveLatency(d time.Duration) { s.inner.ObserveLatency(d) } +func (s *selfTelemetryImpl) SetDegraded(degraded bool) { s.inner.SetDegraded(degraded) } +func (s *selfTelemetryImpl) MarkActivity() { s.inner.MarkActivity() } -func (s *selfTelemetryImpl) degradedTotalSeconds() float64 { - acc := time.Duration(s.accumulated.Load()) - if openStart := s.degradedAt.Load(); openStart != nil { - acc += time.Since(*openStart) - } - return acc.Seconds() -} - -// recordInitError ticks otelcol.selftelemetry.init_errors_total when -// receiver wiring falls back to noop telemetry. Operators alert on -// `> 0` to learn that self-telemetry isn't really plugged in. Panics -// from a broken MeterProvider are swallowed — recordInitError IS the -// degraded-path fallback; crashing here would turn a partial outage -// into a process kill. +// recordInitError forwards to the shared selftel.RecordInitError with +// this package's scope. Kept as a thin wrapper so the factory's call +// site stays identical to the pre-refactor shape. func recordInitError(ctx context.Context, mp metric.MeterProvider, kindLabel, componentID, reason string) { - defer func() { _ = recover() }() - if mp == nil { - return - } - meter := mp.Meter(instrumentationScope) - c, err := meter.Int64Counter( - "otelcol.selftelemetry.init_errors_total", - metric.WithDescription("Counter of self-telemetry construction failures that fell back to the noop implementation."), - ) - if err != nil { - return - } - c.Add(ctx, 1, metric.WithAttributes( - attribute.String("kind", kindLabel), - attribute.String("component_id", componentID), - attribute.String("reason", reason), - )) + selftel.RecordInitError(ctx, mp, instrumentationScope, kindLabel, componentID, reason) } diff --git a/module/receiver/ncclfrreceiver/selftel_test.go b/module/receiver/ncclfrreceiver/selftel_test.go index 0f80cc2b..df9f9504 100644 --- a/module/receiver/ncclfrreceiver/selftel_test.go +++ b/module/receiver/ncclfrreceiver/selftel_test.go @@ -5,78 +5,13 @@ package ncclfrreceiver import ( "context" "errors" - "fmt" - "strings" "testing" "time" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/embedded" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" -) - -// newTestMeterProvider builds an SDK MeterProvider backed by a ManualReader -// so tests can collect metricdata.ResourceMetrics deterministically without -// the Prometheus exporter or any internal/telemetry plumbing — the receiver -// package must stay decoupled from internal/* so PR-F can delete those -// packages without touching this test file. -func newTestMeterProvider(t *testing.T) (*sdkmetric.MeterProvider, *sdkmetric.ManualReader) { - t.Helper() - rdr := sdkmetric.NewManualReader() - mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)) - t.Cleanup(func() { _ = mp.Shutdown(context.Background()) }) - return mp, rdr -} -func collect(t *testing.T, rdr *sdkmetric.ManualReader) metricdata.ResourceMetrics { - t.Helper() - var rm metricdata.ResourceMetrics - if err := rdr.Collect(context.Background(), &rm); err != nil { - t.Fatalf("collect: %v", err) - } - return rm -} - -// findInstrument returns the first metricdata.Metrics whose Name matches the -// supplied OTel-dot name (e.g. "otelcol.receiver.ncclfr.errors_total"). -// Returns (nil, false) if absent. Scope-agnostic: walks all scope metrics. -func findInstrument(rm metricdata.ResourceMetrics, name string) (metricdata.Metrics, bool) { - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - if m.Name == name { - return m, true - } - } - } - return metricdata.Metrics{}, false -} - -// scopeOf returns the instrumentation scope name that emitted the supplied -// metric name. Used to pin the scope-name standard for PR-B1. -func scopeOf(rm metricdata.ResourceMetrics, name string) (string, bool) { - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - if m.Name == name { - return sm.Scope.Name, true - } - } - } - return "", false -} - -// kvMatch returns true if every want key's value matches the int64 -// datapoint's attribute set. -func kvMatch(dp metricdata.DataPoint[int64], want map[string]string) bool { - for k, v := range want { - got, ok := dp.Attributes.Value(attribute.Key(k)) - if !ok || got.AsString() != v { - return false - } - } - return true -} + selftelutil "github.com/tracecoreai/tracecore/module/pkg/testutil/selftel" +) // TestSelfTelemetry_NoopAlwaysSafe pins: newNoopSelfTelemetry returns a // value whose hot-path methods never panic and silently discard. Every @@ -120,7 +55,7 @@ func TestSelfTelemetry_NewReceiver_NilProviderErrors(t *testing.T) { // regression that drops the kind label, the component_id label, or the // metric-name prefix fails here. func TestSelfTelemetry_EmitsErrorsTotal_WithKindAndComponentID(t *testing.T) { - mp, rdr := newTestMeterProvider(t) + mp, rdr := selftelutil.NewTestMeterProvider(t) st, err := newSelfTelemetry(testSettings().ID, mp) if err != nil { t.Fatalf("newSelfTelemetry: %v", err) @@ -129,10 +64,10 @@ func TestSelfTelemetry_EmitsErrorsTotal_WithKindAndComponentID(t *testing.T) { st.IncError(kindEnumerate) st.IncError(kindParse) - rm := collect(t, rdr) - m, ok := findInstrument(rm, "otelcol.receiver.ncclfr.errors_total") + rm := selftelutil.CollectRM(t, rdr) + m, ok := selftelutil.FindInstrument(rm, "otelcol.receiver.ncclfr.errors_total") if !ok { - t.Fatalf("metric otelcol.receiver.ncclfr.errors_total absent; have: %s", dumpNames(rm)) + t.Fatalf("metric otelcol.receiver.ncclfr.errors_total absent; have: %s", selftelutil.DumpNames(rm)) } sum, ok := m.Data.(metricdata.Sum[int64]) if !ok { @@ -141,7 +76,7 @@ func TestSelfTelemetry_EmitsErrorsTotal_WithKindAndComponentID(t *testing.T) { gotEnum, foundEnum := 0, false gotParse, foundParse := 0, false for _, dp := range sum.DataPoints { - if !kvMatch(dp, map[string]string{"component_id": "nccl_fr/test"}) { + if !selftelutil.KVMatch(dp, map[string]string{"component_id": "nccl_fr/test"}) { t.Errorf("datapoint missing component_id=nccl_fr/test: %v", dp.Attributes) continue } @@ -168,7 +103,7 @@ func TestSelfTelemetry_EmitsErrorsTotal_WithKindAndComponentID(t *testing.T) { // interface contract (kept identical to internal/selftelemetry to avoid // regression in receivers that pass negative debug values). func TestSelfTelemetry_EmitsEmissionsTotal(t *testing.T) { - mp, rdr := newTestMeterProvider(t) + mp, rdr := selftelutil.NewTestMeterProvider(t) st, err := newSelfTelemetry(testSettings().ID, mp) if err != nil { t.Fatalf("newSelfTelemetry: %v", err) @@ -176,10 +111,10 @@ func TestSelfTelemetry_EmitsEmissionsTotal(t *testing.T) { st.IncEmissions(3) st.IncEmissions(5) st.IncEmissions(-1) - rm := collect(t, rdr) - m, ok := findInstrument(rm, "otelcol.receiver.ncclfr.emissions_total") + rm := selftelutil.CollectRM(t, rdr) + m, ok := selftelutil.FindInstrument(rm, "otelcol.receiver.ncclfr.emissions_total") if !ok { - t.Fatalf("metric otelcol.receiver.ncclfr.emissions_total absent; have: %s", dumpNames(rm)) + t.Fatalf("metric otelcol.receiver.ncclfr.emissions_total absent; have: %s", selftelutil.DumpNames(rm)) } sum, ok := m.Data.(metricdata.Sum[int64]) if !ok { @@ -198,14 +133,14 @@ func TestSelfTelemetry_EmitsEmissionsTotal(t *testing.T) { // the PR-B1 decision (vs reusing the deleted internal/selftelemetry scope) // so a future drift back to the internal name fails here. func TestSelfTelemetry_ScopeNameIsReceiverImportPath(t *testing.T) { - mp, rdr := newTestMeterProvider(t) + mp, rdr := selftelutil.NewTestMeterProvider(t) st, err := newSelfTelemetry(testSettings().ID, mp) if err != nil { t.Fatalf("newSelfTelemetry: %v", err) } st.IncEmissions(1) - rm := collect(t, rdr) - scope, ok := scopeOf(rm, "otelcol.receiver.ncclfr.emissions_total") + rm := selftelutil.CollectRM(t, rdr) + scope, ok := selftelutil.ScopeOf(rm, "otelcol.receiver.ncclfr.emissions_total") if !ok { t.Fatalf("emissions_total absent") } @@ -222,13 +157,13 @@ func TestSelfTelemetry_ScopeNameIsReceiverImportPath(t *testing.T) { // only signal that a receiver fell back to noop telemetry; dropping the // recordInitError call must fail this test. func TestRecordInitError_TicksInitErrorsCounter(t *testing.T) { - mp, rdr := newTestMeterProvider(t) + mp, rdr := selftelutil.NewTestMeterProvider(t) recordInitError(context.Background(), mp, "receiver", testSettings().ID.String(), reasonInstrumentRegister) - rm := collect(t, rdr) - m, ok := findInstrument(rm, "otelcol.selftelemetry.init_errors_total") + rm := selftelutil.CollectRM(t, rdr) + m, ok := selftelutil.FindInstrument(rm, "otelcol.selftelemetry.init_errors_total") if !ok { - t.Fatalf("init_errors_total absent; have: %s", dumpNames(rm)) + t.Fatalf("init_errors_total absent; have: %s", selftelutil.DumpNames(rm)) } sum, ok := m.Data.(metricdata.Sum[int64]) if !ok { @@ -243,7 +178,7 @@ func TestRecordInitError_TicksInitErrorsCounter(t *testing.T) { "component_id": "nccl_fr/test", "reason": reasonInstrumentRegister, } - if !kvMatch(dp, want) { + if !selftelutil.KVMatch(dp, want) { t.Errorf("init_errors attrs: got %v, want %v", dp.Attributes, want) } if dp.Value != 1 { @@ -272,8 +207,8 @@ func TestRecordInitError_NilProviderIsSafe(t *testing.T) { // This is the regression seam the dcgm sibling test pins for that // receiver; nccl_fr needs the same guarantee. func TestFactory_FallsBackToNoopWhenMeterFails(t *testing.T) { - mp, rdr := newTestMeterProvider(t) - failing := &failingReceiverMP{real: mp} + mp, rdr := selftelutil.NewTestMeterProvider(t) + failing := selftelutil.NewFailingMeterProvider(mp, "otelcol.receiver.ncclfr.") set := testSettings() set.MeterProvider = failing @@ -293,15 +228,15 @@ func TestFactory_FallsBackToNoopWhenMeterFails(t *testing.T) { // Hot-path call must not panic + must not surface (noop discards). recv.telemetry.IncError(kindEnumerate) - rm := collect(t, rdr) - if m, ok := findInstrument(rm, "otelcol.receiver.ncclfr.errors_total"); ok { + rm := selftelutil.CollectRM(t, rdr) + if m, ok := selftelutil.FindInstrument(rm, "otelcol.receiver.ncclfr.errors_total"); ok { if sum, ok := m.Data.(metricdata.Sum[int64]); ok && len(sum.DataPoints) > 0 { t.Errorf("noop fallback leaked IncError into errors_total datapoints: %v", sum.DataPoints) } } - m, ok := findInstrument(rm, "otelcol.selftelemetry.init_errors_total") + m, ok := selftelutil.FindInstrument(rm, "otelcol.selftelemetry.init_errors_total") if !ok { - t.Fatalf("init_errors_total absent after factory fallback; have: %s", dumpNames(rm)) + t.Fatalf("init_errors_total absent after factory fallback; have: %s", selftelutil.DumpNames(rm)) } sum, ok := m.Data.(metricdata.Sum[int64]) if !ok { @@ -311,78 +246,3 @@ func TestFactory_FallsBackToNoopWhenMeterFails(t *testing.T) { t.Errorf("init_errors_total: want 1 datapoint value=1, got %v", sum.DataPoints) } } - -func dumpNames(rm metricdata.ResourceMetrics) string { - var b strings.Builder - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - fmt.Fprintf(&b, " %s@%s", m.Name, sm.Scope.Name) - } - } - return b.String() -} - -// failingReceiverMP wraps a real MeterProvider but fails every instrument -// registration whose name starts with "otelcol.receiver.ncclfr.". Mirrors -// the dcgm sibling test seam so a future refactor that reorders the -// newSelfTelemetry constructor doesn't silently bypass coverage. -type failingReceiverMP struct { - embedded.MeterProvider - real metric.MeterProvider -} - -func (p *failingReceiverMP) Meter(name string, opts ...metric.MeterOption) metric.Meter { - return &failingReceiverMeter{Meter: p.real.Meter(name, opts...)} -} - -type failingReceiverMeter struct { - metric.Meter -} - -const receiverInstrumentPrefix = "otelcol.receiver.ncclfr." - -var errSyntheticReceiverFailure = errors.New("synthetic: receiver instrument registration failed") - -func (m *failingReceiverMeter) Int64Counter(name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) { - if strings.HasPrefix(name, receiverInstrumentPrefix) { - return nil, errSyntheticReceiverFailure - } - c, err := m.Meter.Int64Counter(name, opts...) - if err != nil { - return nil, fmt.Errorf("failingReceiverMeter passthrough: %w", err) - } - return c, nil -} - -func (m *failingReceiverMeter) Float64Histogram(name string, opts ...metric.Float64HistogramOption) (metric.Float64Histogram, error) { - if strings.HasPrefix(name, receiverInstrumentPrefix) { - return nil, errSyntheticReceiverFailure - } - h, err := m.Meter.Float64Histogram(name, opts...) - if err != nil { - return nil, fmt.Errorf("failingReceiverMeter passthrough: %w", err) - } - return h, nil -} - -func (m *failingReceiverMeter) Float64ObservableCounter(name string, opts ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) { - if strings.HasPrefix(name, receiverInstrumentPrefix) { - return nil, errSyntheticReceiverFailure - } - c, err := m.Meter.Float64ObservableCounter(name, opts...) - if err != nil { - return nil, fmt.Errorf("failingReceiverMeter passthrough: %w", err) - } - return c, nil -} - -func (m *failingReceiverMeter) Int64ObservableGauge(name string, opts ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) { - if strings.HasPrefix(name, receiverInstrumentPrefix) { - return nil, errSyntheticReceiverFailure - } - g, err := m.Meter.Int64ObservableGauge(name, opts...) - if err != nil { - return nil, fmt.Errorf("failingReceiverMeter passthrough: %w", err) - } - return g, nil -}