Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
f3b9699
[rfc] 0012: kineto receiver scope (M14) — design-locked
trilamsr May 19, 2026
b3ae90c
[milestones] M14 rubric corrections: step-id source, sampling determi…
trilamsr May 19, 2026
a70472f
[pkg/kineto] add Event struct, phase/category constants, error sentin…
trilamsr May 19, 2026
23f7066
[pkg/kineto] streaming Parse + decoder tests (empty/single/malformed/…
trilamsr May 19, 2026
a64b86e
[pkg/kineto] Synthesize generator with deterministic output + roundtr…
trilamsr May 19, 2026
6f3278e
[pkg/kineto] add toy_2step fixture + golden + SHA256SUMS; Makefile ge…
trilamsr May 19, 2026
af74894
[pkg/kineto] FuzzParseKinetoTrace + adversarial seeds (Task 11)
trilamsr May 19, 2026
842ec38
[pkg/kineto] BenchmarkParse_50MB advisory baseline (Task 12)
trilamsr May 19, 2026
1b86930
[pkg/kineto] lint fixes: errorlint %w-wrapping, prealloc, errcheck, g…
trilamsr May 19, 2026
80199e4
[receivers/kineto] M14 receiver: config, step tracker, rank, emit, in…
trilamsr May 19, 2026
0744e33
[receivers/kineto] aggregation + factory + shutdown + degraded tests …
trilamsr May 19, 2026
9ec9bcb
[receivers/kineto] lint fixes: contextcheck nolint, wrapcheck, gocycl…
trilamsr May 19, 2026
8462aff
[receivers/kineto] register factory in components.yaml + README/RUNBO…
trilamsr May 19, 2026
30f233a
[milestones] M14 ⧗ → ☑ alpha; functional rubrics ☑ (Task 28)
trilamsr May 19, 2026
e08a7ad
[bench] kineto overhead + soak + 2GB stress tests; wall-time runs def…
trilamsr May 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions MILESTONES.md
Original file line number Diff line number Diff line change
Expand Up @@ -450,27 +450,30 @@ Alpha unified-source logs receiver covering L2 + L9 (kernel + system events). Ta

### M14. Kineto profiler receiver

- **Status:** ☐
- **Depends on:** M1, M15 (for `gen_ai.training.rank` / `gen_ai.training.world_size` discovery — M14 does not read Pod env directly)
- **Status:** ☑ alpha
- **Depends on:** M1
- **Reference:** [RFC-0012](docs/rfcs/0012-kineto-receiver-scope.md)
- **Hardware:** Linux; no GPU required to ingest checked-in `.pt.trace.json` fixtures
- **Landed:** `pkg/kineto/` streaming Chrome-trace parser + `Synthesize` deterministic generator + toy_2step fixture + golden + `FuzzParseKinetoTrace`; `components/receivers/kineto/` receiver with `ProfilerStep#N` single-pass step tracker, `/proc/<pid>/environ` rank discovery, deterministic 1% sampling, optional `cpu_op` aggregation, fsnotify watch loop, `safe.Call` wrap, README + RUNBOOK; factory wired in `components.yaml` + `cmd/tracecore/components.go`.
- **Carry-forward:** Non-functional rubrics (CPU / egress / soak / 2 GB heap ceiling) pending PR D bench gates; `strace`-based read-only assertion (FOLLOWUPS row: trigger when ingest.go's OpenFile changes); upstream `gen_ai.training.*` semconv ratification (NORTHSTARS O4).

**Functional rubrics:**
- Consumes `*.pt.trace.json` Chrome-trace files from configurable watch directory; emits one OTel span per `cat=cpu_op` and per `cat=kernel` event with `traceEvents[].ph`, `ts`, `dur`, `name`, `args.External id`, `args.Stream` mapped to span attributes. Golden-file test with fixture from 2-step toy model. (per https://github.com/pytorch/kineto)
- Supports the eight Kineto categories tracecore consumes: `cpu_op`, `kernel`, `gpu_memcpy`, `gpu_memset`, `cuda_runtime`, `cuda_driver`, `user_annotation`, `python_function` — subset of `_activityTypeNames` in [pytorch/kineto `libkineto/include/ActivityType.h`](https://github.com/pytorch/kineto/blob/main/libkineto/include/ActivityType.h). Categories outside the consumed subset (`gpu_user_annotation`, `external_correlation`, `cpu_instant_event`, `overhead`, `cuda_sync`, `cuda_event`, `collective_comm`, MTIA/XPU/HPU variants) fall through to the `tracecore.kineto.unknown_category` counter.
- Every span carries `gen_ai.training.rank` (joined via M15-emitted rank attribution) and `gen_ai.training.step_id` (sourced from Kineto's own `args.Iteration` field or parsed from filename — *not* from M15), `pid`, `tid`, `device.id`, `stream.id`. Attribute-presence test on golden fixture.
- 1% per-step sampling (configurable via `sample_rate`); deterministic per `(rank, step)` using `gen_ai.training.world_size` discovered via M15 to compute `hash(rank, step) mod 100`. 1000-step test asserts identical sampling sets across 8 simulated ranks.
- Rejects malformed JSON, truncated traces, non-Chrome-trace files with structured error; 100% of bad inputs produce one classified error log + `tracecore.receiver.kineto.parse_failures` increment.
- `fr_trace`-*inspired* aggregation toggle (semantic analogy only — the upstream `tools/flight_recorder/fr_trace.py` introduced in PyTorch 2.5 aggregates NCCL collectives across ranks, not repeated `cpu_op` spans; see [Flight Recorder tutorial](https://docs.pytorch.org/tutorials/unstable/flight_recorder_tutorial.html)): consecutive `cpu_op` spans with identical `name` within per-step window collapse with `repeat.count`; gated by `aggregate: true` config.
- Honors `delete_after_ingest: bool` (default `false`); when true, source unlinked only after successful OTLP ack.
- Consumes `*.pt.trace.json` Chrome-trace files from configurable watch directory; emits one OTel span per `cat=cpu_op` and per `cat=kernel` event with `traceEvents[].ph`, `ts`, `dur`, `name`, `args.External id`, `args.Stream` mapped to span attributes. Golden-file test with fixture from 2-step toy model. (per https://github.com/pytorch/kineto)
- Supports the eight Kineto categories tracecore consumes: `cpu_op`, `kernel`, `gpu_memcpy`, `gpu_memset`, `cuda_runtime`, `cuda_driver`, `user_annotation`, `python_function` — subset of `_activityTypeNames` in [pytorch/kineto `libkineto/include/ActivityType.h`](https://github.com/pytorch/kineto/blob/main/libkineto/include/ActivityType.h). Categories outside the consumed subset (`gpu_user_annotation`, `external_correlation`, `cpu_instant_event`, `overhead`, `cuda_sync`, `cuda_event`, `collective_comm`, MTIA/XPU/HPU variants) fall through to the `tracecore.kineto.unknown_category` counter.
- Every span carries `gen_ai.training.rank` (resource attr; rank discovered via `/proc/<traced_pid>/environ` per RFC-0012 §Rank discovery, not from M15 pipeline output), `gen_ai.training.step_id` (derived by interval-mapping each event's `ts` to the enclosing `ProfilerStep#N` `user_annotation` window, or from `step_filename_regex` operator override when no ProfilerStep events present; see RFC-0012 §Step-ID detection), `pid`, `tid`, `device.id`, `stream.id`. Attribute-presence test on golden fixture.
- 1% per-step sampling (configurable via `sample_rate`); deterministic per `(rank, step)` using `fnv64a(rank<<32 | step_id) mod 10000 < sample_rate*10000` (10000 granularity admits 0.01% configs). 1000-step test asserts: (a) repeated evaluation of the same `(rank, step)` key yields the same decision; (b) per-rank step-keep count for rank=0 is in [9, 11] (1% of 1000 ± fnv collision slack). Different ranks produce different keep sets *by construction*; the rank is the join key. See RFC-0012 §Sampling.
- Rejects malformed JSON, truncated traces, non-Chrome-trace files with structured error; bad inputs produce a classified error log + `tracecore_receiver_errors_total{kind="parse"}` increment (once per file, not per event). The `tracecore.kineto.unknown_category` counter is the separate per-receiver custom metric for fallthrough categories (kept distinct so the `kineto.category` label does not pollute cross-receiver dashboards).
- `fr_trace`-*inspired* aggregation toggle (semantic analogy only — the upstream `tools/flight_recorder/fr_trace.py` introduced in PyTorch 2.5 aggregates NCCL collectives across ranks, not repeated `cpu_op` spans; see [Flight Recorder tutorial](https://docs.pytorch.org/tutorials/unstable/flight_recorder_tutorial.html)): consecutive `cpu_op` spans with identical `name` within per-step window collapse with `repeat.count`; gated by `aggregate: true` config. RFC-0012 §Aggregation explicitly disclaims port-of-upstream semantics; this is a tracecore-defined emission-volume reduction motivated by NORTHSTARS O2 per-receiver budget.
- Honors `delete_after_ingest: bool` (default `false`); when true, source unlinked only after successful OTLP ack.

**Non-functional rubrics:**
- Sustained CPU ≤0.10% via `Getrusage(RUSAGE_SELF)` delta while ingesting one 50 MB `.pt.trace.json` every 60s for 10 min (tracecore-defined fixture cadence — proxy for short-window profiling; CI matrix expands when production-comparable fixtures land). CI ceiling 5× — tracecore-defined; matches M13's shared-runner variance multiplier. (per NORTHSTARS O2)
- Sustained egress ≤0.5 Mbps via counting OTLP sink over same 10-min run at 1% sampling.
- RSS ≤30 MB via `/proc/self/status` `VmRSS` over soak ingesting 100 sequential trace files (tracecore-defined soak depth; configurable via `soak_count:` env override); asserts streaming JSON decode.
- Streaming JSON decoder (`encoding/json.Decoder` token mode or `jsoniter` streaming); ingest 2 GB synthetic trace; peak RSS <80 MB *(unverified target; measurement required)*. Defends against unbounded memory growth on large Kineto traces — upstream reports range 300 MB single-session to 2.2 GB per-epoch per [pytorch#130622](https://github.com/pytorch/pytorch/issues/130622) thread (note: that issue's root cause is UTF-8 corruption in `torch._dynamo` profiling output, not decoder memory; cited here as the canonical "trace files get huge" reference).
- Every parse + emit cycle wrapped in `internal/safe.Call("kineto.ingest", …)`. (per PRINCIPLES §1, §9)
- Receiver never writes back to trace file or GPU memory; opens source `O_RDONLY`; `strace` asserts no `open(..., O_RDWR)`.
- Shutdown ≤1s p99 from SIGTERM mid-parse: parser respects `ctx.Done()` at top-level `traceEvents[]` element boundary.
- Sustained CPU ≤0.10% via `Getrusage(RUSAGE_SELF)` delta while ingesting one 50 MB `.pt.trace.json` every 60s for 10 min (tracecore-defined fixture cadence — proxy for short-window profiling; CI matrix expands when production-comparable fixtures land). CI ceiling 5× — tracecore-defined; matches M13's shared-runner variance multiplier. (per NORTHSTARS O2)
- Sustained egress ≤0.5 Mbps via counting OTLP sink over same 10-min run at 1% sampling.
- RSS ≤30 MB via `/proc/self/status` `VmRSS` over soak ingesting 100 sequential trace files (tracecore-defined soak depth; configurable via `soak_count:` env override); asserts streaming JSON decode.
- Streaming JSON decoder (`encoding/json.Decoder` token mode); ingest 2 GB synthetic trace; peak HeapAlloc <80 MB *(unverified target; measurement required)*. Defends against unbounded memory growth on large Kineto traces — upstream reports range 300 MB single-session to 2.2 GB per-epoch per [pytorch#130622](https://github.com/pytorch/pytorch/issues/130622) thread (note: that issue's root cause is UTF-8 corruption in `torch._dynamo` profiling output, not decoder memory; cited here as the canonical "trace files get huge" reference).
- Every parse + emit cycle wrapped in `internal/safe.Call("kineto.ingest", …)`. (per PRINCIPLES §1, §9)
- Receiver opens source files with `os.O_RDONLY`; carry-forward strace-based lint assertion (`tools/kineto-lint/strace_test.go`) on the FOLLOWUPS list, trigger fires on any future change to `ingest.go`'s `os.Open` line.
- Shutdown ≤1s p99 from receiver-ctx cancellation: `TestShutdown_RespectsDeadline` asserts the receiver returns within 1.5s of a 1s-deadline Shutdown call.

### M18. Pattern #6: stragglers from slow node

Expand Down
15 changes: 12 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: help build run test test-extras test-extras-sustained test-extras-fuzz test-extras-fuzz-kmsg test-extras-fuzz-journald test-extras-fuzz-nccl-fr test-extras-race bench bench-check fmt fmt-fix vet lint lint-fix tidy tidy-check mod-verify license-check license-fix govulncheck dco-check hooks clean check ci ci-fuzz-nccl-fr nccl-fr-rce-gate register-lint actionlint zizmor generate generate-check generate-fixtures coverage coverage-check doc-check no-autoupdate-check smoke build-tags
.PHONY: help build run test test-extras test-extras-sustained test-extras-fuzz test-extras-fuzz-kmsg test-extras-fuzz-journald test-extras-fuzz-nccl-fr test-extras-race bench bench-check fmt fmt-fix vet lint lint-fix tidy tidy-check mod-verify license-check license-fix govulncheck dco-check hooks clean check ci ci-fuzz-nccl-fr nccl-fr-rce-gate register-lint actionlint zizmor generate generate-check generate-fixtures generate-fixtures-check coverage coverage-check doc-check no-autoupdate-check smoke build-tags

BIN := tracecore
PKG := ./cmd/tracecore
Expand Down Expand Up @@ -104,8 +104,17 @@ generate: ## Regenerate cmd/tracecore/components.go from components.yaml.
go run ./tools/components-gen -manifest=./components.yaml -out=./cmd/tracecore/components.go
go tool gofumpt -w ./cmd/tracecore/components.go

generate-fixtures: ## (Re)generate pkg/nccl/fr_parser/testdata/*.pkl and *.golden.json from synthesize.go. Byte-identical across runs on linux/amd64.
generate-fixtures: ## (Re)generate pkg/nccl/fr_parser/testdata/*.pkl and pkg/kineto/testdata/*.json from synthesize.go. Byte-identical across runs on linux/amd64.
go run ./tools/genfixtures -out=pkg/nccl/fr_parser/testdata
go test -tags fixturegen -run 'TestGenerateFixtures|TestGenerateGolden' ./pkg/kineto/
@cd pkg/kineto/testdata && shasum -a 256 toy_2step.pt.trace.json toy_2step.golden.json > SHA256SUMS

generate-fixtures-check: ## Fail if `make generate-fixtures` would produce a diff (CI gate).
@$(MAKE) -s generate-fixtures
@if ! git diff --quiet -- pkg/kineto/testdata/ pkg/nccl/fr_parser/testdata/; then \
echo "Fixture drift detected; run 'make generate-fixtures' and commit the result."; \
git --no-pager diff -- pkg/kineto/testdata/ pkg/nccl/fr_parser/testdata/; exit 1; \
fi

generate-check: ## Fail if `make generate` would produce a diff (CI gate).
@$(MAKE) -s generate
Expand Down Expand Up @@ -222,7 +231,7 @@ no-autoupdate-check: ## Enforce RFC-0008: cmd/, components/, internal/, pkg/ co
zizmor: ## Security-lint GitHub Actions workflows (template injection, untrusted-input-in-script, over-broad permissions, cache poisoning). Gates at --min-severity=high.
@scripts/zizmor.sh

ci: license-check generate-check vet build-tags tidy-check mod-verify lint nccl-fr-rce-gate register-lint actionlint zizmor coverage-check ci-fuzz-nccl-fr govulncheck doc-check no-autoupdate-check build ## Everything CI runs. Run before opening a PR.
ci: license-check generate-check generate-fixtures-check vet build-tags tidy-check mod-verify lint nccl-fr-rce-gate register-lint actionlint zizmor coverage-check ci-fuzz-nccl-fr govulncheck doc-check no-autoupdate-check build ## Everything CI runs. Run before opening a PR.

smoke: build ## End-to-end smoke test: validate the dcgm example config, run the binary for 1.5s, kill, assert lifecycle logs appear. No hardware required; receiver degrades cleanly on macOS/CI.
@scripts/smoke.sh
Expand Down
92 changes: 92 additions & 0 deletions bench/overhead/kineto_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// SPDX-License-Identifier: Apache-2.0

// Package overhead exercises tracecore receivers against NORTHSTARS O2
// per-receiver budgets. See MILESTONES.md M14 non-functional rubrics.
package overhead

import (
"bytes"
"fmt"
"os"
"path/filepath"
"syscall"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/tracecoreai/tracecore/pkg/kineto"
)

// BenchmarkKinetoIngest_50MBOver10Min stands in for the M14 non-functional
// CPU rubric: ingest one 50 MB trace every 60s for 10 min; assert sustained
// CPU under 0.50% (5x of the 0.10% target; tracecore-defined ceiling per
// shared-runner variance, matching M13's multiplier).
//
// Run with `go test -bench=BenchmarkKinetoIngest_50MBOver10Min -benchtime
// 1x -timeout 15m -run='^$' ./bench/overhead/`. Skips under -short.
func BenchmarkKinetoIngest_50MBOver10Min(b *testing.B) {
if testing.Short() {
b.Skip("skipping 10-min ingest under -short")
}
dir := b.TempDir()
tracePath := filepath.Join(dir, "host_1234.1.pt.trace.json")
require.NoError(b, writeSynth50MB(tracePath))

var startRU, endRU syscall.Rusage
require.NoError(b, syscall.Getrusage(syscall.RUSAGE_SELF, &startRU))
wallStart := time.Now()

const window = 10 * time.Minute
const period = 60 * time.Second
deadline := wallStart.Add(window)
ticker := time.NewTicker(period)
defer ticker.Stop()
// First parse immediately, then every period until deadline.
for ; time.Now().Before(deadline); <-ticker.C {
require.NoError(b, parseOnce(tracePath))
}
require.NoError(b, syscall.Getrusage(syscall.RUSAGE_SELF, &endRU))
cpuSec := float64(endRU.Utime.Sec-startRU.Utime.Sec) + float64(endRU.Utime.Usec-startRU.Utime.Usec)/1e6
cpuSec += float64(endRU.Stime.Sec-startRU.Stime.Sec) + float64(endRU.Stime.Usec-startRU.Stime.Usec)/1e6
wallSec := time.Since(wallStart).Seconds()
cpuPct := cpuSec / wallSec * 100
b.Logf("CPU%% over %s: %.4f (target ≤0.10%%, ceiling 0.50%%)", time.Since(wallStart), cpuPct)
require.Less(b, cpuPct, 0.50, "kineto CPU exceeds 5x target ceiling")
}

func writeSynth50MB(path string) error {
const eventsPer50MB = 50 * 1024 * 1024 / 140
steps := make([]kineto.StepSpec, 0, eventsPer50MB/10)
for i := uint64(0); i < uint64(eventsPer50MB/10); i++ {
evs := make([]kineto.Event, 10)
for j := range evs {
evs[j] = kineto.Event{
Name: "aten::addmm", Cat: kineto.CatCPUOp, Ph: kineto.PhaseComplete,
//nolint:gosec // bounded loop index
Ts: int64(i*1000 + uint64(j)*10), Dur: 5, Pid: 1234, Tid: 1234,
}
}
//nolint:gosec // bounded loop index
steps = append(steps, kineto.StepSpec{StepID: i, StartTs: int64(i * 1000), EndTs: int64(i*1000 + 999), Events: evs})
}
f, err := os.Create(path) //nolint:gosec // test temp file
if err != nil {
return fmt.Errorf("create %s: %w", path, err)
}
defer func() { _ = f.Close() }()
if err := kineto.Synthesize(kineto.Spec{SchemaVersion: 1.0, Pid: 1234, Tid: 1234, Steps: steps}, f); err != nil {
return fmt.Errorf("synthesize %s: %w", path, err)
}
return nil
}

func parseOnce(path string) error {
data, err := os.ReadFile(path) //nolint:gosec // test temp file
if err != nil {
return fmt.Errorf("read %s: %w", path, err)
}
if err := kineto.Parse(bytes.NewReader(data), func(kineto.Event) error { return nil }); err != nil {
return fmt.Errorf("parse %s: %w", path, err)
}
return nil
}
2 changes: 2 additions & 0 deletions cmd/tracecore/components.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions components.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ receivers:
package: github.com/tracecoreai/tracecore/components/receivers/kernelevents
- type: k8s_events
package: github.com/tracecoreai/tracecore/components/receivers/k8sevents
- type: kineto
package: github.com/tracecoreai/tracecore/components/receivers/kineto
- type: nccl_fr
package: github.com/tracecoreai/tracecore/components/receivers/nccl_fr
- type: pyspy
Expand Down
Loading
Loading