Skip to content
108 changes: 85 additions & 23 deletions components/receivers/kernelevents/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package kernelevents_test

import (
"context"
"fmt"
"io"
"log/slog"
"runtime"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/tracecoreai/tracecore/internal/consumer"
"github.com/tracecoreai/tracecore/internal/pipeline"
"github.com/tracecoreai/tracecore/internal/pipeline/pipelinetest"
"github.com/tracecoreai/tracecore/internal/sli"
)

// BenchmarkReceiver_KmsgOnly drives a synthetic stream of kmsg
Expand Down Expand Up @@ -112,28 +114,27 @@ func TestReceiver_SLIBudget(t *testing.T) {
if testing.Short() {
t.Skip("SLI-budget verification skipped in -short mode")
}
t.Parallel()

const targetEvents = 5000
// p99 budget: 5ms in non-race mode, 500ms under `go test -race`.
// The race ceiling is intentionally generous because the
// race-stability workflow job runs `-count=10`, which produces
// 10x parallel-test contention on the same goroutine pool. CI
// history under that load:
// - 2026-04: ~92ms p99 → bumped ceiling 50ms → 250ms
// - 2026-05: observed 314ms p99 on the merge SHA after the
// review-G/H/I/J batch added several new tests that share
// the same -count=10 goroutine pool → bumped to 500ms
// The non-race 5ms target remains the operator-facing SLO; the
// race ceiling at 100× headroom still flags ≥10× regressions
// without absorbing every CI runner noise spike.
// NOT t.Parallel(): measuring latency under cross-test contention
// measures contention, not the receiver.

const (
warmupEvents = 500 // discarded so cold-start variance doesn't dominate p99
targetEvents = 5000
totalEvents = warmupEvents + targetEvents
)
// Two-tier p99 signal: alertP99Ms is the regression bound (emits
// ALERT to the job summary, does NOT fail). targetP99Ms is the
// hard flake guard. The non-race target is the operator-facing
// SLO; the race target is loose to absorb runner contention.
alertP99Ms := 0.5
targetP99Ms := 5.0
if isRaceBuild() {
alertP99Ms = 100.0
targetP99Ms = 500.0
}
const targetHeapMiB = 50.0 // generous; receiver under churn shouldn't exceed
const targetHeapMiB = 50.0

sink := &countingSink{latencies: make([]time.Duration, 0, targetEvents)}
sink := &countingSink{latencies: make([]time.Duration, 0, totalEvents)}
cfg, _ := kernelevents.Factory.CreateDefaultConfig().(*kernelevents.Config)
cfg.Journald.Enabled = false
cfg.MinSeverity = severityDebug
Expand All @@ -142,7 +143,7 @@ func TestReceiver_SLIBudget(t *testing.T) {
rcv, err := kernelevents.Factory.CreateLogs(t.Context(), fx.CreateSettings, cfg, sink)
require.NoError(t, err)

fixture := strings.Repeat("3,1,1,-;NVRM: Xid (PCI:0000:65:00.0): 79, pid=0\n", targetEvents)
fixture := strings.Repeat("3,1,1,-;NVRM: Xid (PCI:0000:65:00.0): 79, pid=0\n", totalEvents)
kernelevents.SetKmsgOpenForTest(rcv, func() (io.ReadCloser, error) {
return io.NopCloser(strings.NewReader(fixture)), nil
})
Expand All @@ -152,10 +153,19 @@ func TestReceiver_SLIBudget(t *testing.T) {

startWall := time.Now()
require.Eventually(t, func() bool {
return sink.count.Load() >= int64(targetEvents)
}, 30*time.Second, 10*time.Millisecond, "receiver should drain %d events", targetEvents)
return sink.count.Load() >= int64(totalEvents)
}, 30*time.Second, 10*time.Millisecond, "receiver should drain %d events", totalEvents)
wallSec := time.Since(startWall).Seconds()
throughput := float64(targetEvents) / wallSec
throughput := float64(totalEvents) / wallSec

// Discard the cold-start window. Guard: sink.count and len(latencies)
// can diverge if a future receiver emits records without
// ObservedTimestamp; fall back to no discard rather than panic.
sink.mu.Lock()
if len(sink.latencies) > warmupEvents {
sink.latencies = sink.latencies[warmupEvents:]
}
sink.mu.Unlock()

p99 := sink.percentileLatency(99)

Expand All @@ -164,8 +174,19 @@ func TestReceiver_SLIBudget(t *testing.T) {
runtime.ReadMemStats(&ms)
heapMiB := float64(ms.HeapAlloc) / (1 << 20)

t.Logf("SLI: throughput=%.0f events/sec, p99-latency=%v (target <%.0fms, race=%v), heap=%.1f MiB (target <%.0f MiB)",
throughput, p99, targetP99Ms, raceEnabled, heapMiB, targetHeapMiB)
t.Logf("SLI: throughput=%.0f events/sec, p99-latency=%v (alert=%.0fms target=%.0fms race=%v), heap=%.1f MiB (target <%.0f MiB)",
throughput, p99, alertP99Ms, targetP99Ms, raceEnabled, heapMiB, targetHeapMiB)

// `t.Logf` is hidden in CI without `-v`; surface p99 in the job
// summary so empirical history is visible per-PR.
p99Ms := float64(p99) / float64(time.Millisecond)
prefix := "SLI"
if p99Ms > alertP99Ms {
prefix = "SLI ALERT"
}
sli.PublishObservation(fmt.Sprintf(
"- %s `kernelevents`: p99=%v alert=%.0fms target=%.0fms throughput=%.0f/s heap=%.1fMiB race=%v",
prefix, p99, alertP99Ms, targetP99Ms, throughput, heapMiB, raceEnabled))

p99Budget := time.Duration(targetP99Ms * float64(time.Millisecond))
require.Less(t, p99, p99Budget, "p99 emit latency budget")
Expand All @@ -178,6 +199,47 @@ func TestReceiver_SLIBudget(t *testing.T) {
// /proc/self/stat or syscall.Rusage on Linux CI).
}

// TestSLIBudget_WarmupDiscardIsLoadBearing falsifies the trim used
// by TestReceiver_SLIBudget. Injects cold-start spikes into a
// fabricated latency stream and asserts the trim collapses p99 by
// ≥10×. If a future refactor drops the discard, this test fails.
func TestSLIBudget_WarmupDiscardIsLoadBearing(t *testing.T) {
t.Parallel()

const (
warmupEvents = 500
targetEvents = 5000
)

sink := &countingSink{
latencies: make([]time.Duration, 0, warmupEvents+targetEvents),
}
// Order matters: latencies are recorded in arrival order; the
// trim discards the leading warmupEvents entries.
for range warmupEvents {
sink.latencies = append(sink.latencies, time.Second)
}
for range targetEvents {
sink.latencies = append(sink.latencies, 10*time.Millisecond)
}

// percentileLatency clones internally, so this doesn't consume
// the data we then mutate below.
p99Untrimmed := sink.percentileLatency(99)
require.GreaterOrEqual(t, p99Untrimmed, time.Second,
"sanity: untrimmed p99 lands in the spike region")

sink.mu.Lock()
sink.latencies = sink.latencies[warmupEvents:]
sink.mu.Unlock()

p99Trimmed := sink.percentileLatency(99)
require.Less(t, p99Trimmed, 100*time.Millisecond,
"after trim, p99 reflects steady-state latencies")
require.GreaterOrEqual(t, p99Untrimmed, p99Trimmed*10,
"trim must collapse p99 by ≥10×")
}

// --- bench helpers ---

type countingSink struct {
Expand Down
37 changes: 37 additions & 0 deletions docs/FOLLOWUPS.md
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,43 @@ sites (internal/telemetry/{build_info,slo}.go) that match the
but premature pre-v0. *Trigger:* v0 cut — integrate or write
a one-line rationale for skipping.

### CI flake hygiene

- [ ] **Audit timing-sensitive test assertions for the same flake
shape.** Two flakes of identical structure surfaced in one
session: `TestReceiver_SLIBudget` (regex bounded `0\.0[0-9]+`
→ 0.126s observed) and `TestReceiver_SetDegraded` (same
pattern, same fix shape). Both were timing-sensitive
assertions calibrated to fast-runner expectations. Grep for
similar shapes: `require\.Regexp.*0\\\.0\[0-9\]` across `*_test.go`,
`time\.Sleep` followed by a regex assertion, fixed-millisecond
assertions in test bodies. *Trigger:* next time a similar
flake appears, or as part of test-hygiene sweep before v0 cut.
- *Closed: SLI p99 now emitted to `GITHUB_STEP_SUMMARY` from
`TestReceiver_SLIBudget`; visible in the PR job-summary tab on every
CI run, accumulates across PRs for empirical history.*
- [ ] **Tighten the SLI race-mode budget from 500ms once CI history
accumulates.** After ~10 PR runs of CI `GITHUB_STEP_SUMMARY`
observations, drop the 500ms ceiling to `worst_observed × 2`
(likely 100–150ms). The current 100× SLO headroom catches only
gross regressions; 20–30× is a meaningful regression signal.
*Trigger:* after 10+ PR runs with the new visibility shipped
and the observed p99 sustained well under 250ms.
- [ ] **Switch the SLI assertion from absolute budget to
baseline-relative.** Replace `require.Less(p99, 500ms)` with
`require.Less(p99, baseline.p99 × 1.5)` where `baseline` is a
checked-in `.sli-baseline.json`. Catches actual SLO regressions
(the receiver got slower) rather than absolute breaches (the
runner had a bad day). *Trigger:* once a stable baseline exists
from accumulated CI history.
- [ ] **Repo-wide flake-pattern lint.** Lint gate that grep's for
timing-sensitive regex assertions with implicit upper bounds
(e.g., `require\.Regexp.*0\\\.0\[0-9\]`, `time\.Sleep` followed
by a regex assertion). Catches the class of issue at PR time
instead of in production CI. *Trigger:* when the flake-pattern
audit above (the broader sweep) lands the first concrete
heuristic, or as part of test-hygiene work before v0 cut.

### CI wall-time (post-verify-split)

- [ ] **Shard `coverage-check` across multiple jobs to close the
Expand Down
31 changes: 31 additions & 0 deletions internal/sli/emit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// SPDX-License-Identifier: Apache-2.0

// Package sli publishes SLI / perf-budget observations to the GitHub
// Actions job-summary surface. `t.Logf` is suppressed in CI without
// `-v`, so empirical history is otherwise invisible; PublishObservation
// fills the gap. No-op outside GitHub Actions. Errors are dropped so
// publishing never fails a test.
package sli

import (
"fmt"
"os"
)

// PublishObservation appends a line to GITHUB_STEP_SUMMARY when the
// env var is set. Best-effort.
func PublishObservation(line string) {
summaryPath := os.Getenv("GITHUB_STEP_SUMMARY")
if summaryPath == "" {
return
}
// #nosec G304 -- GITHUB_STEP_SUMMARY is a writable temp path set
// by the GitHub Actions runner and is the documented API for
// appending to a workflow's job-summary surface.
f, err := os.OpenFile(summaryPath, os.O_APPEND|os.O_WRONLY, 0)
if err != nil {
return
}
defer func() { _ = f.Close() }()
_, _ = fmt.Fprintln(f, line)
}
37 changes: 37 additions & 0 deletions internal/sli/emit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// SPDX-License-Identifier: Apache-2.0

package sli_test

import (
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"

"github.com/tracecoreai/tracecore/internal/sli"
)

func TestPublishObservation_NoOpWhenEnvUnset(t *testing.T) {
t.Setenv("GITHUB_STEP_SUMMARY", "")
sli.PublishObservation("- ignored line")
}

func TestPublishObservation_AppendsWhenEnvSet(t *testing.T) {
summaryPath := filepath.Join(t.TempDir(), "summary.md")
require.NoError(t, os.WriteFile(summaryPath, []byte("# prior\n"), 0o600))
t.Setenv("GITHUB_STEP_SUMMARY", summaryPath)

sli.PublishObservation("- first line")
sli.PublishObservation("- second line")

content, err := os.ReadFile(summaryPath) // #nosec G304 -- test-controlled path under t.TempDir()
require.NoError(t, err)
require.Equal(t, "# prior\n- first line\n- second line\n", string(content))
}

// Best-effort: an unwritable path must not panic or fail the test.
func TestPublishObservation_SilentOnUnwritablePath(t *testing.T) {
t.Setenv("GITHUB_STEP_SUMMARY", filepath.Join(t.TempDir(), "does-not-exist", "summary.md"))
sli.PublishObservation("- ignored line")
}
Loading