Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
e42289f
[ci] Bump integration test deadlines from 1.5s to 5s
trilamsr May 14, 2026
741aa2c
[telemetry] Extend CreateSettings + TelemetrySettings for M2
trilamsr May 14, 2026
0e4e172
[telemetry] M2 WI2: MeterProvider construction
trilamsr May 14, 2026
fc7b1b1
[telemetry] M2 WI5: real selftelemetry.Receiver impl
trilamsr May 14, 2026
d0147a5
[telemetry] M2 WI6+7+8: HTTP server + /metrics + /healthz + /readyz
trilamsr May 14, 2026
45dd77f
[telemetry] M2 WI9: telemetry: block in operator-facing config
trilamsr May 14, 2026
f15a58c
[telemetry] M2 WI12: migrate ReportStatus to componentstatus pkg
trilamsr May 14, 2026
59400cf
[telemetry] M2 WI10a: thread MeterProvider + BuildInfo
trilamsr May 14, 2026
53ab3c8
[telemetry] M2 WI11: clockreceiver self-telemetry wire-up
trilamsr May 14, 2026
c3e057f
[telemetry] M2 WI16+15: wire telemetry into cmd/tracecore
trilamsr May 14, 2026
438ae61
[telemetry] M2 docs: RFC-0006, README, FAILURE-MODES, STRATEGY
trilamsr May 14, 2026
c4bdf78
[telemetry] M2 docs follow-up: CHANGELOG + RECEIVER-PATTERNS
trilamsr May 14, 2026
c35010a
[telemetry] M2 WI14: O2 SLO observable gauges
trilamsr May 14, 2026
fab64f8
[telemetry] M2 docs: criterion 10 delivered
trilamsr May 14, 2026
b821bb5
[telemetry] M2: tighten integration test for criterion 8
trilamsr May 14, 2026
b6ddd39
[telemetry] M2 review: fix Server hang + Serve error swallow
trilamsr May 14, 2026
3edffdc
[telemetry] M2 review: windowed failure_rate (60s rolling)
trilamsr May 14, 2026
77e1e4e
[telemetry] M2 review: tests + small fixes
trilamsr May 14, 2026
1c6c117
[telemetry] M2 review: docs reflect windowed + bug-fix work
trilamsr May 14, 2026
6b4e703
[telemetry] M2 polish: dedup, build_info, drain policy, errors test
trilamsr May 14, 2026
c685f43
[telemetry] M2 Loop4 Pass1: blockers + strong findings
trilamsr May 14, 2026
0fa17a6
[telemetry] M2 Loop4 Pass2: security + doc fixes
trilamsr May 14, 2026
628c5ea
[telemetry] M2 Loop4 Pass3: M8/M9 author unblockers
trilamsr May 14, 2026
6b0af2d
[telemetry] M2 docs: document Loop 4 reviewer deferrals
trilamsr May 14, 2026
fbefc9a
[telemetry] M2 A+ pass: verifications + safety + perf evidence
trilamsr May 14, 2026
4fa276b
[telemetry] M2 self-review Phase 1: pkg distinction + glyph + lessons
trilamsr May 14, 2026
ca30681
[telemetry] M2 self-review Phase 2: safety + features
trilamsr May 14, 2026
569f79e
[telemetry] M2 self-review Phase 3: ServerConfig.Paths sub-struct
trilamsr May 14, 2026
9a9fa80
[telemetry] M2 self-review Phase 4: split selftelemetry/impl.go
trilamsr May 14, 2026
b975fdf
[telemetry] M2 self-review Phase 5: extract WindowedRate primitive
trilamsr May 14, 2026
1da39bc
[telemetry] M2 self-review Phase 6: hoist test fakes to pipelinetest
trilamsr May 14, 2026
8d7ade3
[telemetry] M2 review Phase 7: extract registerObs + runRuntime
trilamsr May 14, 2026
a9862cb
[telemetry] M2 review Phase 8: tracecore validate --explain
trilamsr May 14, 2026
91212f2
[telemetry] M2 review Phase 9: JSON probes + deprecation policy
trilamsr May 14, 2026
6605aa7
[telemetry] M2 review Phase 10: consolidate export_test files
trilamsr May 14, 2026
a9d6def
[telemetry] A+ batch 1: promote ExporterCarrier + drop mutex
trilamsr May 14, 2026
1b4346b
[telemetry] A+ batch 2: JSON cache + PromHandler cache + SLO helper
trilamsr May 14, 2026
a1ea906
[telemetry] A+ batch 3: path validation + WindowedRate rationale
trilamsr May 14, 2026
a89b7fa
[telemetry] A+ batch 4: fd-leak count + tick m2 success criteria
trilamsr May 14, 2026
9f269b2
[telemetry] Trim WHAT comments + document phased deferrals
trilamsr May 14, 2026
8e726ec
[telemetry] Ecosystem A+: alerts + CapturingReceiver + buckets
trilamsr May 14, 2026
789020d
[telemetry] A++ pass: diagram + bench docs + Grafana + SECURITY
trilamsr May 14, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ Pre-alpha. The CLI runs the M1 pipeline runtime end-to-end via factory-based ass
- **`internal/selftelemetry`** — producer-side `Receiver` interface (`IncError`, `IncEmissions`, `ObserveLatency`, `SetDegraded`, `MarkActivity`) that components write to when reporting their own health, plus a noop default. The `/metrics` endpoint that surfaces these to operators is owned by M2; this package lets M8+ receivers wire to self-telemetry from day one without waiting for M2.
- **`docs/agents/REVIEWER-CONTEXT.md`** — pre-digested standards bundle for review subagents launched from the parallel-agent ralph-loops (M8, M9, M10+). Consolidates the must-read entries from `STYLE.md`, `PRINCIPLES.md`, `STYLE-errors.md`, `NORTHSTARS.md`, and the current divergences table.
- **`docs/agents/RECEIVER-PATTERNS.md`** — cross-loop knowledge sink. Each parallel-agent receiver loop appends patterns future receiver authors should inherit (build-tag layout, streaming-source lifecycle, subprocess teardown, self-telemetry wiring, cardinality cap, degraded-mode re-arm, correlation-context propagation).
- **M2 — self-telemetry surface (`/metrics`, `/healthz`, `/readyz`) + MeterProvider + ReportStatus alignment.** [RFC-0006](docs/rfcs/RFC-0006-self-telemetry-surface.md):
- `internal/telemetry.NewMeterProvider()` constructs the OTel SDK MeterProvider backed by an OTel Prometheus exporter writing to a tracecore-owned `*prometheus.Registry`.
- `internal/telemetry.Server` mounts `/metrics`, `/healthz`, `/readyz` on a single listener. Default OFF; default `listen: "localhost:8888"` when enabled.
- `internal/selftelemetry.NewReceiver(id, mp)` real impl backing the 5-method interface. Receivers acquire it from `TelemetrySettings.MeterProvider` in one line.
- `clockreceiver` wired as the canonical example: ObserveLatency around every push, IncEmissions/MarkActivity on success, IncError("downstream") on failure.
- `cmd/tracecore` plumbs MeterProvider + BuildInfo + Server lifecycle. ReadyFn flips when Runtime.Start returns.
- **Closes three STRATEGY M2 divergence rows:** `Host.ReportStatus` → free fn `internal/componentstatus.ReportStatus(host, ev)`; `CreateSettings` gains `BuildInfo` + `_ struct{}` guard; `TelemetrySettings` gains `MeterProvider` + `_ struct{}` guard.
- End-to-end integration tests pin the operator-observable contract: scrape returns 200 + expected metric names; default-off does not bind a port.
- **O2 SLO observable gauges** (`tracecore.exporter.failure_rate`, `tracecore.queue.depth_ratio`, `tracecore.component.restart_count_per_hour`): `failure_rate` is a **rolling 60s window** rate driven by real exporter signal via `selftelemetry.Exporter` wired into `stdoutexporter` (the lifetime cumulative ratio would have pinned the gauge above 0 forever after a single failure — useless for SLO alerting); `queue.depth_ratio` and `restart_count_per_hour` report 0 today (carry-forward until tracecore has a queue mechanism + a runtime restart mechanism). Raw counter `tracecore.exporter.calls_total{result,kind}` also surfaces so operators can derive custom windows in PromQL.

### Changed

Expand Down
29 changes: 21 additions & 8 deletions MILESTONES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ See [`NORTHSTARS.md`](NORTHSTARS.md) §O6 for the governing operating rules.
**Target hit-rate:** ≥80% of the milestones below shipped on the committed week.
**Scope:** in-repo work only. Standards-body engagement (NORTHSTARS.md O4) and external community work are recurring cadences, not quarterly milestones — they live in [`NORTHSTARS.md`](NORTHSTARS.md), not here.

**Status glyphs:** ☐ planned / not yet started · ☑ delivered · ☒ deliberately not done (policy choice or out-of-scope) · ⊟ carry-forward (intent valid, deferred for a documented reason).

## Status legend

- ☐ **planned** — committed at quarter start
Expand Down Expand Up @@ -81,17 +83,28 @@ Must reach `⧗` before Phase 2 broadly starts. Eight foundation tracks; most ca

### M2. Self-telemetry surface

- **Status:**
- **Status:** ☑ (delivered)
- **Owner:** observability lead
- **Files touched:** `internal/telemetry/*.go`, `cmd/tracecore/main.go` (wire-up)
- **Files touched:** `internal/telemetry/*.go`, `internal/selftelemetry/impl*.go`, `internal/componentstatus/*.go`, `internal/pipeline/{component,factory,runtime}.go`, `internal/pipelinebuilder/builder.go`, `internal/config/{config,load,telemetry_test}.go`, `components/receivers/clockreceiver/clockreceiver.go`, `cmd/tracecore/{collect,integration_telemetry_test}.go`, [RFC-0006](docs/rfcs/RFC-0006-self-telemetry-surface.md), `internal/telemetry/README.md`.
- **Depends on:** M1 alpha
- **Acceptance:**
- `/metrics` (Prometheus exposition), `/healthz`, `/readyz` endpoints on configurable port
- Optional `pprof` via `--debug.pprof` flag (off by default)
- Every component contributes ingested / dropped / queue-depth / exporter success-failure counters through this single surface (per [RFC 0001](docs/rfcs/0001-architecture-overview.md) §Self-telemetry)
- Self-telemetry SLO thresholds wired: exporter failure rate >0.1% sustained → `/readyz` reports degraded; component restart >1/hr → `/readyz` reports degraded
- Structured `slog` logging on stderr, JSON by default (per [`STYLE.md`](STYLE.md) §Logging)
- `internal/telemetry/README.md` documents the contract
- ☑ `/metrics` (Prometheus exposition), `/healthz`, `/readyz` endpoints on configurable port
- ☐ Optional `pprof` via `--debug.pprof` flag — **Carry-forward from M2:** security policy story > 5 LOC of plumbing.
- ☑ Receivers contribute self-metrics (errors/emissions/latency/degraded/last-activity) through `selftelemetry.Receiver` injected via `TelemetrySettings.MeterProvider`.
- ☑ Three O2 SLO observable gauges emitted with exact names: `tracecore.exporter.failure_rate` (driven by real exporter signal via `selftelemetry.Exporter` wired into `stdoutexporter`), `tracecore.queue.depth_ratio` (0 — **Carry-forward from M2** until queue mechanism lands), `tracecore.component.restart_count_per_hour` (0 — **Carry-forward from M2** until restart mechanism lands). Raw counter `tracecore.exporter.calls_total{result,kind}` also emitted so operators can derive richer rates.
- ☒ Self-telemetry SLO thresholds wired to `/readyz` — **Policy-declined**, not deferred. RFC-0006 explicitly chose degraded ≠ not-ready, so k8s doesn't evict pods on transient backend issues. Operators alert on the SLO gauges via Prometheus rules rather than via /readyz. (☒ glyph distinguishes a deliberate policy choice from a carry-forward.)
- ☑ Structured `slog` logging on stderr (inherited from M1).
- ☑ `internal/telemetry/README.md` documents the contract.
- ☑ Three STRATEGY M2 divergences closed (`Host.ReportStatus` → free fn, `CreateSettings` BuildInfo + guard, `TelemetrySettings` MeterProvider + guard).
- **Carry-forward from M2:**
- pprof endpoint (security policy work).
- Queue mechanism that drives `tracecore.queue.depth_ratio` (the gauge is registered + reports 0 today; needs a queue impl).
- Component restart mechanism that drives `tracecore.component.restart_count_per_hour` (gauge registered + reports 0 today; needs a runtime restart impl).
- OTLP push reader on the MeterProvider (operators on push-only backends).
- `MetricsLevel` knob (only when cardinality becomes a real problem).
- Histogram bucket tuning for `collection_latency_seconds`.
- Per-role `CreateSettings` split (`receiver.Settings` / `exporter.Settings` / `processor.Settings`).
- `TracerProvider` field on `TelemetrySettings` (tracing milestone).
- **Can run alongside:** M3, M4, M4b, M5, M5b, M6 in drafting; final integration after M1 alpha.
- **Effort:** ≈1 week

Expand Down
16 changes: 15 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: help build run test fmt fmt-fix vet lint lint-fix tidy tidy-check mod-verify license-check license-fix govulncheck dco-check ai-review hooks clean check ci generate generate-check coverage coverage-check doc-check
.PHONY: help build run test bench bench-check fmt fmt-fix vet lint lint-fix tidy tidy-check mod-verify license-check license-fix govulncheck dco-check ai-review hooks clean check ci generate generate-check coverage coverage-check doc-check

BIN := tracecore
PKG := ./cmd/tracecore
Expand Down Expand Up @@ -34,6 +34,20 @@ run: ## Run tracecore from source (no version embedding).
test: ## Run unit tests with the race detector.
go test -race ./...

bench: ## Run benchmarks across the repo with -benchmem, count=5.
go test -bench=Benchmark -benchmem -benchtime=500ms -count=5 -run='^$$' ./...

bench-check: ## Compare current bench against internal/telemetry/testdata/bench-baseline.txt. Fails if any row regresses >10% on geomean.
@out=$$(mktemp); \
go test -bench=Benchmark -benchmem -benchtime=500ms -count=5 -run='^$$' ./internal/telemetry/ > $$out 2>&1; \
if ! command -v benchstat >/dev/null 2>&1; then \
echo "benchstat not installed; install with: go install golang.org/x/perf/cmd/benchstat@latest"; \
exit 2; \
fi; \
echo "Comparing $$out against internal/telemetry/testdata/bench-baseline.txt..."; \
benchstat internal/telemetry/testdata/bench-baseline.txt $$out


fmt: ## Check formatting; fails if any file is not gofumpt-clean.
@diff=$$(go tool gofumpt -l .); \
if [ -n "$$diff" ]; then echo "gofumpt needs to be run on:"; echo "$$diff"; exit 1; fi
Expand Down
210 changes: 207 additions & 3 deletions cmd/tracecore/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,57 @@ package main

import (
"context"
"fmt"
"log/slog"
"sync/atomic"
"time"

"github.com/tracecoreai/tracecore/internal/config"
"github.com/tracecoreai/tracecore/internal/pipeline"
"github.com/tracecoreai/tracecore/internal/pipelinebuilder"
"github.com/tracecoreai/tracecore/internal/selftelemetry"
"github.com/tracecoreai/tracecore/internal/telemetry"
"github.com/tracecoreai/tracecore/internal/version"
)

// exporterRegistry implements telemetry.ExporterRegistry over the
// exporters built by pipelinebuilder. Captured at build time;
// readers see a stable snapshot of registered FailureRateReaders.
type exporterRegistry struct {
readers []selftelemetry.FailureRateReader
}

func (r exporterRegistry) RegisteredExporters() []selftelemetry.FailureRateReader {
return r.readers
}

func collectFailureRateReaders(pipelines []pipeline.Pipeline) exporterRegistry {
// Dedup by FailureRateReader identity (pointer equality — the
// FailureRateReader docstring pins the pointer-type requirement).
// Same exporter referenced from multiple pipelines must contribute
// once, not N times, or the failure_rate aggregate is multiplied.
seen := map[selftelemetry.FailureRateReader]struct{}{}
var readers []selftelemetry.FailureRateReader
for _, p := range pipelines {
for _, exp := range p.Exporters {
carrier, ok := exp.(selftelemetry.ExporterCarrier)
if !ok {
continue
}
frr, ok := carrier.SelfExporter().(selftelemetry.FailureRateReader)
if !ok {
continue
}
if _, dup := seen[frr]; dup {
continue
}
seen[frr] = struct{}{}
readers = append(readers, frr)
}
}
return exporterRegistry{readers: readers}
}

// runCollect loads config, builds the pipeline runtime, runs until the
// context is cancelled (SIGINT/SIGTERM), then performs two-phase
// shutdown. Returns the process exit code.
Expand All @@ -32,12 +74,99 @@ func runCollect(ctx context.Context, logger *slog.Logger, configPath string, dra
return exitDataErr
}

pipelines, err := pipelinebuilder.BuildPipelines(ctx, logger, cfg, components())
buildInfo := pipeline.BuildInfo{
Command: "tracecore",
Description: "tracecore observability collector for GPU clusters",
Version: info.Version,
}

// Construct the production MeterProvider only when the operator
// opted in. Otherwise BuildPipelines's default noop is fine and
// the HTTP server stays off.
var (
meterProvider *telemetry.MeterProvider
telemetrySrv *telemetry.Server
ready atomic.Bool
)
if cfg.Telemetry.Enabled {
mp, srv, err := initTelemetryStack(ctx, logger, cfg.Telemetry, ready.Load)
if err != nil {
logger.Error("init telemetry stack", "err", err)
return exitFailure
}
meterProvider = mp
telemetrySrv = srv
}

buildOpts := []pipelinebuilder.BuildOption{pipelinebuilder.WithBuildInfo(buildInfo)}
if meterProvider != nil {
buildOpts = append(buildOpts, pipelinebuilder.WithMeterProvider(meterProvider.Provider))
}

pipelines, err := pipelinebuilder.BuildPipelines(ctx, logger, cfg, components(), buildOpts...)
if err != nil {
logger.Error("build pipelines", "err", err)
shutdownTelemetry(ctx, logger, telemetrySrv, meterProvider)
return exitDataErr
}

if err := registerObservability(meterProvider, pipelines, buildInfo, info.Revision); err != nil {
logger.Error("register observability metrics", "err", err)
shutdownTelemetry(ctx, logger, telemetrySrv, meterProvider)
return exitFailure
}

return runRuntime(ctx, logger, pipelines, drainBudget, &ready, telemetrySrv, meterProvider)
}

// registerObservability wires the SLO observable gauges and the
// build_info join-target onto the operator-facing MeterProvider once
// pipelines are built and exporters are concrete. No-op when
// telemetry is disabled (meterProvider == nil).
func registerObservability(mp *telemetry.MeterProvider, pipelines []pipeline.Pipeline, buildInfo pipeline.BuildInfo, revision string) error {
if mp == nil {
return nil
}
src := telemetry.NewAggregateSLOSource(
collectFailureRateReaders(pipelines),
telemetry.DefaultSLOWindow,
)
if err := telemetry.RegisterSLOMetrics(mp.Provider, src); err != nil {
return fmt.Errorf("SLO metrics: %w", err)
}
// Surface build identity as a Prometheus join-target so
// operators see version metadata next to any tracecore metric.
// Mirrors the otelcol_build_info / prometheus_build_info
// convention.
if err := telemetry.RegisterBuildInfo(mp.Provider, map[string]string{
"command": buildInfo.Command,
"version": buildInfo.Version,
"revision": revision,
}); err != nil {
return fmt.Errorf("build_info metric: %w", err)
}
return nil
}

// runRuntime starts the pipeline runtime, flips /readyz to ready,
// blocks until the context is cancelled (SIGINT/SIGTERM), then
// performs the ordered shutdown:
//
// 1. flip /readyz to 503 so scrapers stop new traffic during drain.
// 2. shut down the runtime (drains receivers + exporters).
// 3. shut down telemetry server + MeterProvider in that order so
// a final scrape doesn't fail mid-export.
//
// Returns the process exit code.
func runRuntime(
ctx context.Context,
logger *slog.Logger,
pipelines []pipeline.Pipeline,
drainBudget time.Duration,
ready *atomic.Bool,
telemetrySrv *telemetry.Server,
meterProvider *telemetry.MeterProvider,
) int {
rt := pipeline.NewRuntime(pipelines,
pipeline.WithLogger(logger),
pipeline.WithDrainBudget(drainBudget),
Expand All @@ -52,22 +181,97 @@ func runCollect(ctx context.Context, logger *slog.Logger, configPath string, dra
if shutdownErr := rt.Shutdown(ctx); shutdownErr != nil {
logger.Warn("shutdown after failed start", "err", shutdownErr)
}
shutdownTelemetry(ctx, logger, telemetrySrv, meterProvider)
return exitFailure
}
ready.Store(true)

logger.Info("tracecore running; waiting for signal")
<-ctx.Done()
logger.Info("shutdown signal received", "drain_budget", drainBudget)

// Flip /readyz to 503 before the pipeline begins shutting down so
// scrapers stop sending traffic our way.
ready.Store(false)

// WithoutCancel preserves ctx values but drops the already-fired
// cancellation so Shutdown's own per-phase timeouts can apply
// against a fresh deadline.
shutdownCtx := context.WithoutCancel(ctx)
if err := rt.Shutdown(shutdownCtx); err != nil {
logger.Error("shutdown", "err", err)
rtErr := rt.Shutdown(shutdownCtx)
if rtErr != nil {
logger.Error("shutdown", "err", rtErr)
}

shutdownTelemetry(shutdownCtx, logger, telemetrySrv, meterProvider)

if rtErr != nil {
return exitFailure
}

logger.Info("tracecore stopped cleanly")
return exitOK
}

// initTelemetryStack constructs the MeterProvider + Server pair when
// the operator enabled the self-telemetry surface. On any error it
// drains whatever has already been built and returns the error,
// keeping runCollect's complexity bounded.
func initTelemetryStack(ctx context.Context, logger *slog.Logger, cfg config.Telemetry, readyFn func() bool) (*telemetry.MeterProvider, *telemetry.Server, error) {
mp, err := telemetry.NewMeterProvider()
if err != nil {
return nil, nil, fmt.Errorf("meter provider: %w", err)
}

srv, err := telemetry.NewServer(telemetry.ServerConfig{
Listen: cfg.Listen,
MeterProvider: mp,
Paths: telemetry.Paths{
Metrics: cfg.Paths.Metrics,
Healthz: cfg.Paths.Healthz,
Readyz: cfg.Paths.Readyz,
},
ReadyFn: readyFn,
Logger: logger,
})
if err != nil {
if sErr := mp.Shutdown(ctx); sErr != nil {
logger.Warn("shutdown telemetry meter provider after init failure", "err", sErr)
}
return nil, nil, fmt.Errorf("telemetry server: %w", err)
}

// Start the telemetry server FIRST so /healthz and /readyz are
// observable from the moment the binary commits to running.
// /readyz still reports 503 until the pipeline has Started.
if err := srv.Start(ctx); err != nil {
if sErr := mp.Shutdown(ctx); sErr != nil {
logger.Warn("shutdown telemetry meter provider after start failure", "err", sErr)
}
return nil, nil, fmt.Errorf("start telemetry server: %w", err)
}

logger.Info("telemetry surface listening",
"listen", cfg.Listen,
"metrics_path", cfg.Paths.Metrics)

return mp, srv, nil
}

// shutdownTelemetry drains the telemetry server + MeterProvider in
// that order; the server's connections must close before metric
// exports drain so a final scrape doesn't fail mid-export. Both calls
// are idempotent; nil receivers are accepted (telemetry surface was
// disabled).
func shutdownTelemetry(ctx context.Context, logger *slog.Logger, srv *telemetry.Server, mp *telemetry.MeterProvider) {
if srv != nil {
if err := srv.Shutdown(ctx); err != nil {
logger.Warn("shutdown telemetry server", "err", err)
}
}
if mp != nil {
if err := mp.Shutdown(ctx); err != nil {
logger.Warn("shutdown telemetry meter provider", "err", err)
}
}
}
Loading
Loading