diff --git a/MILESTONES.md b/MILESTONES.md index 02129b42..adeefce8 100644 --- a/MILESTONES.md +++ b/MILESTONES.md @@ -450,27 +450,30 @@ Alpha unified-source logs receiver covering L2 + L9 (kernel + system events). Ta ### M14. Kineto profiler receiver -- **Status:** ☐ -- **Depends on:** M1, M15 (for `gen_ai.training.rank` / `gen_ai.training.world_size` discovery — M14 does not read Pod env directly) +- **Status:** ☑ alpha +- **Depends on:** M1 +- **Reference:** [RFC-0012](docs/rfcs/0012-kineto-receiver-scope.md) - **Hardware:** Linux; no GPU required to ingest checked-in `.pt.trace.json` fixtures +- **Landed:** `pkg/kineto/` streaming Chrome-trace parser + `Synthesize` deterministic generator + toy_2step fixture + golden + `FuzzParseKinetoTrace`; `components/receivers/kineto/` receiver with `ProfilerStep#N` single-pass step tracker, `/proc//environ` rank discovery, deterministic 1% sampling, optional `cpu_op` aggregation, fsnotify watch loop, `safe.Call` wrap, README + RUNBOOK; factory wired in `components.yaml` + `cmd/tracecore/components.go`. +- **Carry-forward:** Non-functional rubrics (CPU / egress / soak / 2 GB heap ceiling) pending PR D bench gates; `strace`-based read-only assertion (FOLLOWUPS row: trigger when ingest.go's OpenFile changes); upstream `gen_ai.training.*` semconv ratification (NORTHSTARS O4). **Functional rubrics:** -- Consumes `*.pt.trace.json` Chrome-trace files from configurable watch directory; emits one OTel span per `cat=cpu_op` and per `cat=kernel` event with `traceEvents[].ph`, `ts`, `dur`, `name`, `args.External id`, `args.Stream` mapped to span attributes. Golden-file test with fixture from 2-step toy model. (per https://github.com/pytorch/kineto) -- Supports the eight Kineto categories tracecore consumes: `cpu_op`, `kernel`, `gpu_memcpy`, `gpu_memset`, `cuda_runtime`, `cuda_driver`, `user_annotation`, `python_function` — subset of `_activityTypeNames` in [pytorch/kineto `libkineto/include/ActivityType.h`](https://github.com/pytorch/kineto/blob/main/libkineto/include/ActivityType.h). Categories outside the consumed subset (`gpu_user_annotation`, `external_correlation`, `cpu_instant_event`, `overhead`, `cuda_sync`, `cuda_event`, `collective_comm`, MTIA/XPU/HPU variants) fall through to the `tracecore.kineto.unknown_category` counter. -- Every span carries `gen_ai.training.rank` (joined via M15-emitted rank attribution) and `gen_ai.training.step_id` (sourced from Kineto's own `args.Iteration` field or parsed from filename — *not* from M15), `pid`, `tid`, `device.id`, `stream.id`. Attribute-presence test on golden fixture. -- 1% per-step sampling (configurable via `sample_rate`); deterministic per `(rank, step)` using `gen_ai.training.world_size` discovered via M15 to compute `hash(rank, step) mod 100`. 1000-step test asserts identical sampling sets across 8 simulated ranks. -- Rejects malformed JSON, truncated traces, non-Chrome-trace files with structured error; 100% of bad inputs produce one classified error log + `tracecore.receiver.kineto.parse_failures` increment. -- `fr_trace`-*inspired* aggregation toggle (semantic analogy only — the upstream `tools/flight_recorder/fr_trace.py` introduced in PyTorch 2.5 aggregates NCCL collectives across ranks, not repeated `cpu_op` spans; see [Flight Recorder tutorial](https://docs.pytorch.org/tutorials/unstable/flight_recorder_tutorial.html)): consecutive `cpu_op` spans with identical `name` within per-step window collapse with `repeat.count`; gated by `aggregate: true` config. -- Honors `delete_after_ingest: bool` (default `false`); when true, source unlinked only after successful OTLP ack. +- ☑ Consumes `*.pt.trace.json` Chrome-trace files from configurable watch directory; emits one OTel span per `cat=cpu_op` and per `cat=kernel` event with `traceEvents[].ph`, `ts`, `dur`, `name`, `args.External id`, `args.Stream` mapped to span attributes. Golden-file test with fixture from 2-step toy model. (per https://github.com/pytorch/kineto) +- ☑ Supports the eight Kineto categories tracecore consumes: `cpu_op`, `kernel`, `gpu_memcpy`, `gpu_memset`, `cuda_runtime`, `cuda_driver`, `user_annotation`, `python_function` — subset of `_activityTypeNames` in [pytorch/kineto `libkineto/include/ActivityType.h`](https://github.com/pytorch/kineto/blob/main/libkineto/include/ActivityType.h). Categories outside the consumed subset (`gpu_user_annotation`, `external_correlation`, `cpu_instant_event`, `overhead`, `cuda_sync`, `cuda_event`, `collective_comm`, MTIA/XPU/HPU variants) fall through to the `tracecore.kineto.unknown_category` counter. +- ☑ Every span carries `gen_ai.training.rank` (resource attr; rank discovered via `/proc//environ` per RFC-0012 §Rank discovery, not from M15 pipeline output), `gen_ai.training.step_id` (derived by interval-mapping each event's `ts` to the enclosing `ProfilerStep#N` `user_annotation` window, or from `step_filename_regex` operator override when no ProfilerStep events present; see RFC-0012 §Step-ID detection), `pid`, `tid`, `device.id`, `stream.id`. Attribute-presence test on golden fixture. +- ☑ 1% per-step sampling (configurable via `sample_rate`); deterministic per `(rank, step)` using `fnv64a(rank<<32 | step_id) mod 10000 < sample_rate*10000` (10000 granularity admits 0.01% configs). 1000-step test asserts: (a) repeated evaluation of the same `(rank, step)` key yields the same decision; (b) per-rank step-keep count for rank=0 is in [9, 11] (1% of 1000 ± fnv collision slack). Different ranks produce different keep sets *by construction*; the rank is the join key. See RFC-0012 §Sampling. +- ☑ Rejects malformed JSON, truncated traces, non-Chrome-trace files with structured error; bad inputs produce a classified error log + `tracecore_receiver_errors_total{kind="parse"}` increment (once per file, not per event). The `tracecore.kineto.unknown_category` counter is the separate per-receiver custom metric for fallthrough categories (kept distinct so the `kineto.category` label does not pollute cross-receiver dashboards). +- ☑ `fr_trace`-*inspired* aggregation toggle (semantic analogy only — the upstream `tools/flight_recorder/fr_trace.py` introduced in PyTorch 2.5 aggregates NCCL collectives across ranks, not repeated `cpu_op` spans; see [Flight Recorder tutorial](https://docs.pytorch.org/tutorials/unstable/flight_recorder_tutorial.html)): consecutive `cpu_op` spans with identical `name` within per-step window collapse with `repeat.count`; gated by `aggregate: true` config. RFC-0012 §Aggregation explicitly disclaims port-of-upstream semantics; this is a tracecore-defined emission-volume reduction motivated by NORTHSTARS O2 per-receiver budget. +- ☑ Honors `delete_after_ingest: bool` (default `false`); when true, source unlinked only after successful OTLP ack. **Non-functional rubrics:** -- Sustained CPU ≤0.10% via `Getrusage(RUSAGE_SELF)` delta while ingesting one 50 MB `.pt.trace.json` every 60s for 10 min (tracecore-defined fixture cadence — proxy for short-window profiling; CI matrix expands when production-comparable fixtures land). CI ceiling 5× — tracecore-defined; matches M13's shared-runner variance multiplier. (per NORTHSTARS O2) -- Sustained egress ≤0.5 Mbps via counting OTLP sink over same 10-min run at 1% sampling. -- RSS ≤30 MB via `/proc/self/status` `VmRSS` over soak ingesting 100 sequential trace files (tracecore-defined soak depth; configurable via `soak_count:` env override); asserts streaming JSON decode. -- Streaming JSON decoder (`encoding/json.Decoder` token mode or `jsoniter` streaming); ingest 2 GB synthetic trace; peak RSS <80 MB *(unverified target; measurement required)*. Defends against unbounded memory growth on large Kineto traces — upstream reports range 300 MB single-session to 2.2 GB per-epoch per [pytorch#130622](https://github.com/pytorch/pytorch/issues/130622) thread (note: that issue's root cause is UTF-8 corruption in `torch._dynamo` profiling output, not decoder memory; cited here as the canonical "trace files get huge" reference). -- Every parse + emit cycle wrapped in `internal/safe.Call("kineto.ingest", …)`. (per PRINCIPLES §1, §9) -- Receiver never writes back to trace file or GPU memory; opens source `O_RDONLY`; `strace` asserts no `open(..., O_RDWR)`. -- Shutdown ≤1s p99 from SIGTERM mid-parse: parser respects `ctx.Done()` at top-level `traceEvents[]` element boundary. +- ⧗ Sustained CPU ≤0.10% via `Getrusage(RUSAGE_SELF)` delta while ingesting one 50 MB `.pt.trace.json` every 60s for 10 min (tracecore-defined fixture cadence — proxy for short-window profiling; CI matrix expands when production-comparable fixtures land). CI ceiling 5× — tracecore-defined; matches M13's shared-runner variance multiplier. (per NORTHSTARS O2) +- ⧗ Sustained egress ≤0.5 Mbps via counting OTLP sink over same 10-min run at 1% sampling. +- ⧗ RSS ≤30 MB via `/proc/self/status` `VmRSS` over soak ingesting 100 sequential trace files (tracecore-defined soak depth; configurable via `soak_count:` env override); asserts streaming JSON decode. +- ⧗ Streaming JSON decoder (`encoding/json.Decoder` token mode); ingest 2 GB synthetic trace; peak HeapAlloc <80 MB *(unverified target; measurement required)*. Defends against unbounded memory growth on large Kineto traces — upstream reports range 300 MB single-session to 2.2 GB per-epoch per [pytorch#130622](https://github.com/pytorch/pytorch/issues/130622) thread (note: that issue's root cause is UTF-8 corruption in `torch._dynamo` profiling output, not decoder memory; cited here as the canonical "trace files get huge" reference). +- ☑ Every parse + emit cycle wrapped in `internal/safe.Call("kineto.ingest", …)`. (per PRINCIPLES §1, §9) +- ☑ Receiver opens source files with `os.O_RDONLY`; carry-forward strace-based lint assertion (`tools/kineto-lint/strace_test.go`) on the FOLLOWUPS list, trigger fires on any future change to `ingest.go`'s `os.Open` line. +- ☑ Shutdown ≤1s p99 from receiver-ctx cancellation: `TestShutdown_RespectsDeadline` asserts the receiver returns within 1.5s of a 1s-deadline Shutdown call. ### M18. Pattern #6: stragglers from slow node diff --git a/Makefile b/Makefile index 705c6ed2..60c08fa2 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: help build run test test-extras test-extras-sustained test-extras-fuzz test-extras-fuzz-kmsg test-extras-fuzz-journald test-extras-fuzz-nccl-fr test-extras-race bench bench-check fmt fmt-fix vet lint lint-fix tidy tidy-check mod-verify license-check license-fix govulncheck dco-check hooks clean check ci ci-fuzz-nccl-fr nccl-fr-rce-gate register-lint actionlint zizmor generate generate-check generate-fixtures coverage coverage-check doc-check no-autoupdate-check smoke build-tags +.PHONY: help build run test test-extras test-extras-sustained test-extras-fuzz test-extras-fuzz-kmsg test-extras-fuzz-journald test-extras-fuzz-nccl-fr test-extras-race bench bench-check fmt fmt-fix vet lint lint-fix tidy tidy-check mod-verify license-check license-fix govulncheck dco-check hooks clean check ci ci-fuzz-nccl-fr nccl-fr-rce-gate register-lint actionlint zizmor generate generate-check generate-fixtures generate-fixtures-check coverage coverage-check doc-check no-autoupdate-check smoke build-tags BIN := tracecore PKG := ./cmd/tracecore @@ -104,8 +104,17 @@ generate: ## Regenerate cmd/tracecore/components.go from components.yaml. go run ./tools/components-gen -manifest=./components.yaml -out=./cmd/tracecore/components.go go tool gofumpt -w ./cmd/tracecore/components.go -generate-fixtures: ## (Re)generate pkg/nccl/fr_parser/testdata/*.pkl and *.golden.json from synthesize.go. Byte-identical across runs on linux/amd64. +generate-fixtures: ## (Re)generate pkg/nccl/fr_parser/testdata/*.pkl and pkg/kineto/testdata/*.json from synthesize.go. Byte-identical across runs on linux/amd64. go run ./tools/genfixtures -out=pkg/nccl/fr_parser/testdata + go test -tags fixturegen -run 'TestGenerateFixtures|TestGenerateGolden' ./pkg/kineto/ + @cd pkg/kineto/testdata && shasum -a 256 toy_2step.pt.trace.json toy_2step.golden.json > SHA256SUMS + +generate-fixtures-check: ## Fail if `make generate-fixtures` would produce a diff (CI gate). + @$(MAKE) -s generate-fixtures + @if ! git diff --quiet -- pkg/kineto/testdata/ pkg/nccl/fr_parser/testdata/; then \ + echo "Fixture drift detected; run 'make generate-fixtures' and commit the result."; \ + git --no-pager diff -- pkg/kineto/testdata/ pkg/nccl/fr_parser/testdata/; exit 1; \ + fi generate-check: ## Fail if `make generate` would produce a diff (CI gate). @$(MAKE) -s generate @@ -222,7 +231,7 @@ no-autoupdate-check: ## Enforce RFC-0008: cmd/, components/, internal/, pkg/ co zizmor: ## Security-lint GitHub Actions workflows (template injection, untrusted-input-in-script, over-broad permissions, cache poisoning). Gates at --min-severity=high. @scripts/zizmor.sh -ci: license-check generate-check vet build-tags tidy-check mod-verify lint nccl-fr-rce-gate register-lint actionlint zizmor coverage-check ci-fuzz-nccl-fr govulncheck doc-check no-autoupdate-check build ## Everything CI runs. Run before opening a PR. +ci: license-check generate-check generate-fixtures-check vet build-tags tidy-check mod-verify lint nccl-fr-rce-gate register-lint actionlint zizmor coverage-check ci-fuzz-nccl-fr govulncheck doc-check no-autoupdate-check build ## Everything CI runs. Run before opening a PR. smoke: build ## End-to-end smoke test: validate the dcgm example config, run the binary for 1.5s, kill, assert lifecycle logs appear. No hardware required; receiver degrades cleanly on macOS/CI. @scripts/smoke.sh diff --git a/bench/overhead/kineto_bench_test.go b/bench/overhead/kineto_bench_test.go new file mode 100644 index 00000000..2e32cfe4 --- /dev/null +++ b/bench/overhead/kineto_bench_test.go @@ -0,0 +1,92 @@ +// SPDX-License-Identifier: Apache-2.0 + +// Package overhead exercises tracecore receivers against NORTHSTARS O2 +// per-receiver budgets. See MILESTONES.md M14 non-functional rubrics. +package overhead + +import ( + "bytes" + "fmt" + "os" + "path/filepath" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tracecoreai/tracecore/pkg/kineto" +) + +// BenchmarkKinetoIngest_50MBOver10Min stands in for the M14 non-functional +// CPU rubric: ingest one 50 MB trace every 60s for 10 min; assert sustained +// CPU under 0.50% (5x of the 0.10% target; tracecore-defined ceiling per +// shared-runner variance, matching M13's multiplier). +// +// Run with `go test -bench=BenchmarkKinetoIngest_50MBOver10Min -benchtime +// 1x -timeout 15m -run='^$' ./bench/overhead/`. Skips under -short. +func BenchmarkKinetoIngest_50MBOver10Min(b *testing.B) { + if testing.Short() { + b.Skip("skipping 10-min ingest under -short") + } + dir := b.TempDir() + tracePath := filepath.Join(dir, "host_1234.1.pt.trace.json") + require.NoError(b, writeSynth50MB(tracePath)) + + var startRU, endRU syscall.Rusage + require.NoError(b, syscall.Getrusage(syscall.RUSAGE_SELF, &startRU)) + wallStart := time.Now() + + const window = 10 * time.Minute + const period = 60 * time.Second + deadline := wallStart.Add(window) + ticker := time.NewTicker(period) + defer ticker.Stop() + // First parse immediately, then every period until deadline. + for ; time.Now().Before(deadline); <-ticker.C { + require.NoError(b, parseOnce(tracePath)) + } + require.NoError(b, syscall.Getrusage(syscall.RUSAGE_SELF, &endRU)) + cpuSec := float64(endRU.Utime.Sec-startRU.Utime.Sec) + float64(endRU.Utime.Usec-startRU.Utime.Usec)/1e6 + cpuSec += float64(endRU.Stime.Sec-startRU.Stime.Sec) + float64(endRU.Stime.Usec-startRU.Stime.Usec)/1e6 + wallSec := time.Since(wallStart).Seconds() + cpuPct := cpuSec / wallSec * 100 + b.Logf("CPU%% over %s: %.4f (target ≤0.10%%, ceiling 0.50%%)", time.Since(wallStart), cpuPct) + require.Less(b, cpuPct, 0.50, "kineto CPU exceeds 5x target ceiling") +} + +func writeSynth50MB(path string) error { + const eventsPer50MB = 50 * 1024 * 1024 / 140 + steps := make([]kineto.StepSpec, 0, eventsPer50MB/10) + for i := uint64(0); i < uint64(eventsPer50MB/10); i++ { + evs := make([]kineto.Event, 10) + for j := range evs { + evs[j] = kineto.Event{ + Name: "aten::addmm", Cat: kineto.CatCPUOp, Ph: kineto.PhaseComplete, + //nolint:gosec // bounded loop index + Ts: int64(i*1000 + uint64(j)*10), Dur: 5, Pid: 1234, Tid: 1234, + } + } + //nolint:gosec // bounded loop index + steps = append(steps, kineto.StepSpec{StepID: i, StartTs: int64(i * 1000), EndTs: int64(i*1000 + 999), Events: evs}) + } + f, err := os.Create(path) //nolint:gosec // test temp file + if err != nil { + return fmt.Errorf("create %s: %w", path, err) + } + defer func() { _ = f.Close() }() + if err := kineto.Synthesize(kineto.Spec{SchemaVersion: 1.0, Pid: 1234, Tid: 1234, Steps: steps}, f); err != nil { + return fmt.Errorf("synthesize %s: %w", path, err) + } + return nil +} + +func parseOnce(path string) error { + data, err := os.ReadFile(path) //nolint:gosec // test temp file + if err != nil { + return fmt.Errorf("read %s: %w", path, err) + } + if err := kineto.Parse(bytes.NewReader(data), func(kineto.Event) error { return nil }); err != nil { + return fmt.Errorf("parse %s: %w", path, err) + } + return nil +} diff --git a/cmd/tracecore/components.go b/cmd/tracecore/components.go index 7948f3d8..3c331504 100644 --- a/cmd/tracecore/components.go +++ b/cmd/tracecore/components.go @@ -9,6 +9,7 @@ import ( rdcgm "github.com/tracecoreai/tracecore/components/receivers/dcgm" rk8s_events "github.com/tracecoreai/tracecore/components/receivers/k8sevents" rkernelevents "github.com/tracecoreai/tracecore/components/receivers/kernelevents" + rkineto "github.com/tracecoreai/tracecore/components/receivers/kineto" rnccl_fr "github.com/tracecoreai/tracecore/components/receivers/nccl_fr" rpyspy "github.com/tracecoreai/tracecore/components/receivers/pyspy" "github.com/tracecoreai/tracecore/internal/pipeline" @@ -24,6 +25,7 @@ func components() pipeline.Factories { pipeline.MustNewType("dcgm"): rdcgm.NewFactory(), pipeline.MustNewType("kernelevents"): rkernelevents.NewFactory(), pipeline.MustNewType("k8s_events"): rk8s_events.NewFactory(), + pipeline.MustNewType("kineto"): rkineto.NewFactory(), pipeline.MustNewType("nccl_fr"): rnccl_fr.NewFactory(), pipeline.MustNewType("pyspy"): rpyspy.NewFactory(), }, diff --git a/components.yaml b/components.yaml index aec1e0b0..6d739d7a 100644 --- a/components.yaml +++ b/components.yaml @@ -18,6 +18,8 @@ receivers: package: github.com/tracecoreai/tracecore/components/receivers/kernelevents - type: k8s_events package: github.com/tracecoreai/tracecore/components/receivers/k8sevents + - type: kineto + package: github.com/tracecoreai/tracecore/components/receivers/kineto - type: nccl_fr package: github.com/tracecoreai/tracecore/components/receivers/nccl_fr - type: pyspy diff --git a/components/receivers/kineto/README.md b/components/receivers/kineto/README.md new file mode 100644 index 00000000..6660cc1e --- /dev/null +++ b/components/receivers/kineto/README.md @@ -0,0 +1,66 @@ +# kineto receiver + +**Stability:** alpha (M14). Config keys may change with one-minor deprecation warning. + +**Design:** [RFC-0012 — Kineto profiler receiver scope](../../../docs/rfcs/0012-kineto-receiver-scope.md). + +## Overview + +The kineto receiver ingests `*.pt.trace.json` Chrome-trace files produced by PyTorch's `torch.profiler` and emits OTel spans for the eight Kineto categories tracecore consumes (`cpu_op`, `kernel`, `gpu_memcpy`, `gpu_memset`, `cuda_runtime`, `cuda_driver`, `user_annotation`, `python_function`). Spans carry `gen_ai.training.rank`/`world_size`/`local_rank` as resource attributes (sourced via `/proc//environ` per RFC-0012 §Rank discovery) and `gen_ai.training.step_id` per span (derived from `ProfilerStep#N` interval bracketing). + +## Configuration + +| Field | Type | Default | Description | +|---|---|---|---| +| `watch_dir` | string | (required) | Single directory the receiver watches for new trace files. | +| `watch_subdirs` | bool | `false` | Recursively watch subdirectories of `watch_dir`, up to `max_watch_depth`. | +| `max_watch_depth` | int | `8` | Cap on recursive watch depth when `watch_subdirs` is true. | +| `sample_rate` | float | `0.01` | Per-(rank, step) deterministic sample rate in `[0, 1]`. `1.0` keeps every event. | +| `aggregate` | bool | `false` | Collapse consecutive identical `cpu_op` events within a step into one span with `repeat.count`. | +| `delete_after_ingest` | bool | `false` | Remove the trace file after the consumer accepts the batch. | +| `step_filename_regex` | string | `""` | Operator override: when the file has no `ProfilerStep#N` events, capture group 1 of this regex becomes the step_id for every event in the file. | +| `ingest_timeout` | duration | `30s` | Maximum time the per-file ingest may run before `safe.Call` cancels it. | +| `degraded_failure_ratio` | float | `0.25` | Receiver flips degraded when parse failures exceed this ratio over the last 10+ files. | + +See [`example_config.yaml`](example_config.yaml) for a minimal config. + +## Metrics and errors + +The receiver writes to two telemetry surfaces: + +1. The canonical receiver-level surface (M2 self-telemetry): + - `tracecore_receiver_errors_total{component="kineto",kind=…}` partitioned by Kind: + - `read` — open / I/O failure on the trace file. + - `parse` — Chrome-trace JSON malformed, truncated, or gzip-corrupt. + - `downstream` — next consumer rejected the batch. + - `step_id_missing` — at least one event in a file fell outside every `ProfilerStep#N` window. Bumped once per file (not per event) to bound cardinality. + - `limit_exceeded` — `pkg/kineto`'s resource bounds (MaxEvents / MaxBytes / MaxStringBytes) were exceeded. + - `panic` — recovered panic via `internal/safe.Call`. + - `tracecore_receiver_emissions_total{component="kineto"}` and the standard latency histogram + degraded-state gauge + last-activity timestamp. +2. The kineto-specific custom metric: + - `tracecore.kineto.unknown_category{kineto.category=…}` bumped per occurrence of a Kineto category outside the consumed set (e.g. `gpu_user_annotation`, `external_correlation`, MTIA/XPU/HPU variants). Kept distinct from the receiver-errors surface so the category label does not pollute cross-receiver dashboards. + +## Operator notes + +**PID namespace.** The receiver reads `/proc//environ` of the traced process to recover the correct `RANK`/`WORLD_SIZE`/`LOCAL_RANK`. The pid is derived from the trace filename's `{worker}_{pid}.{ts_ns}.pt.trace.json` prefix (per PyTorch `tensorboard_trace_handler`). For the receiver to read another container's `/proc//environ`, the pod MUST run in the same PID namespace as the traced process: either `hostPID: true` (broad attack surface, simple) OR an explicit shared-PID-namespace pod spec. If neither is configured, rank discovery falls back to the receiver process's own env vars. See RFC-0012 §Rank discovery. + +**Why not PodSpec env?** Kubeflow PyTorchJob v1 sets the PodSpec-level `RANK` env with a +1 offset (Master is rank 0, workers offset). Reading PodSpec env via downward API would give the wrong rank for the traced worker. `/proc//environ` reads what torchrun actually exported to the traced Python process. + +**SYS_PTRACE.** Not required. The receiver reads files only; it does not attach to or modify running processes. + +**Overhead budget.** Targets per NORTHSTARS O2: CPU ≤0.10%, egress ≤0.5 Mbps (at default 1% sampling), RSS ≤30 MB over 100-file soak, peak HeapAlloc <80 MB on a 2 GB synthetic trace. These flip from `⧗` to `☑` in MILESTONES.md when M14 PR D's bench/soak gates land. + +## Limitations + +- Live attach to a running torch process is out of scope. The receiver is fixture-driven: torch writes a trace file at the end of a `torch.profiler.schedule()` cycle, fsnotify fires `IN_CLOSE_WRITE`, the receiver ingests. +- Cross-rank join (e.g. for M18 stragglers) is M18's responsibility via M17's `pkg/nccl/fr_parser/cross_rank.go`. M14 emits per-rank spans only. +- Kineto categories outside the consumed set are counted but not emitted as spans. Operators wanting MTIA/XPU/HPU spans should file a follow-up. +- `schemaVersion` drift: tested against `1.0`. Newer versions parse best-effort; a `kineto.schema_drift` counter surfaces unknown versions for operator visibility per NORTHSTARS O6. + +## References + +- [RFC-0012 — Kineto profiler receiver scope](../../../docs/rfcs/0012-kineto-receiver-scope.md) +- [`MILESTONES.md` §M14](../../../MILESTONES.md) +- [`docs/proposals/gen-ai-training-semconv.md`](../../../docs/proposals/gen-ai-training-semconv.md) (upstream semconv proposal that grounds the `gen_ai.training.*` attribute names) +- [`docs/research/m15-container-stdout.md`](../../../docs/research/m15-container-stdout.md) §3 (Kubeflow PyTorchJob v1 PodSpec RANK offset) +- [RUNBOOK.md](RUNBOOK.md) (per-Kind incident response) diff --git a/components/receivers/kineto/RUNBOOK.md b/components/receivers/kineto/RUNBOOK.md new file mode 100644 index 00000000..1320140c --- /dev/null +++ b/components/receivers/kineto/RUNBOOK.md @@ -0,0 +1,88 @@ +# kineto receiver — runbook + +Per-Kind alert and incident-response notes. Each section corresponds to one value of `kind` in `tracecore_receiver_errors_total{component="kineto",kind=…}` or to the kineto-specific `tracecore.kineto.unknown_category` counter. + +## kind=read + +**Trigger.** Receiver could not open a trace file. + +**Likely causes.** +- File rotated or deleted between fsnotify event and `os.Open`. +- Filesystem permission change on `watch_dir`. +- Pod-level mount disappeared (volume unmount, restart). + +**Investigation.** +1. `kubectl exec` into the receiver pod and `ls -la `. +2. Check kubelet logs for volume-unmount events around the failure window. +3. Confirm `watch_dir` is RW for the pod's runAsUser. + +**Remediation.** Restore filesystem access or reconfigure `watch_dir`. The receiver auto-recovers; no restart needed once reads succeed. + +## kind=parse + +**Trigger.** Chrome-trace JSON malformed, truncated, or gzip-corrupt. + +**Likely causes.** +- torch process killed mid-write (fsnotify fired `Create` but file never closed cleanly). +- Disk full during write. +- Custom `tensorboard_trace_handler` writing non-standard JSON. + +**Investigation.** +1. `ls -la ` — files with size 0 or truncated tails. +2. `head -c 200 ` to confirm valid JSON prefix. +3. Check torch process exit status in pod logs. + +**Remediation.** Bad files are skipped (one log line per file). If failure ratio exceeds `degraded_failure_ratio`, the receiver flips degraded; alert on `tracecore_receiver_degraded_seconds_total` rising. + +## kind=downstream + +**Trigger.** Next consumer (typically an exporter) rejected `ConsumeTraces`. + +**Likely causes.** Exporter backpressure, OTLP endpoint down, schema mismatch downstream. + +**Investigation.** Inspect the exporter's `tracecore_exporter_calls_total{result="fail"}` counter and its own RUNBOOK. + +**Remediation.** Resolve downstream; the receiver does not retry but will succeed on the next file. + +## kind=step_id_missing + +**Trigger.** A trace file contained no `ProfilerStep#N` events. Bumped once per file (not per event). + +**Likely causes.** +- Profile captured outside `torch.profiler.schedule()` (warmup or shutdown profile). +- Custom profiling code not using `record_function("ProfilerStep#N")`. +- Operator override `step_filename_regex` unset for non-standard handler. + +**Investigation.** `grep ProfilerStep ` — empty = no step markers. + +**Remediation.** If intentional (warmup profile), no action needed; the spans still emit without `gen_ai.training.step_id`. Otherwise wrap profile collection in `torch.profiler.schedule()` OR configure `step_filename_regex` to extract the step from the filename. + +## kind=limit_exceeded + +**Trigger.** Resource bound in `pkg/kineto` exceeded (MaxEvents=16M, MaxBytes=4 GiB, MaxStringBytes=1 MiB). + +**Likely causes.** Pathological trace (16M+ events or 4 GiB+ raw bytes) or malicious input. + +**Investigation.** `wc -c ` for size; `grep -c '"ph"' ` for event count. + +**Remediation.** If legitimate large profile, increase bounds via a follow-up RFC. Bounds are intentionally conservative. + +## kind=panic + +**Trigger.** A panic was recovered inside `internal/safe.Call`. + +**Likely causes.** Bug in pkg/kineto parser, malformed input edge case, OTel SDK panic. + +**Investigation.** Pod logs include the panic trace via `safe.Call`'s wrap. + +**Remediation.** File a tracecore bug. The receiver continues; degraded flips if the failure rate exceeds threshold. + +## tracecore.kineto.unknown_category (per-receiver custom counter) + +**Trigger.** A Kineto event with a category outside the consumed set was observed. + +**Likely causes.** New PyTorch profiler version introduced new categories; operator profiled an MTIA/XPU/HPU device tracecore does not currently consume. + +**Investigation.** PromQL `topk(5, sum by (kineto.category) (rate(tracecore_kineto_unknown_category[5m])))` to see which categories. + +**Remediation.** Operationally benign. File a follow-up to add consumption support if the category is high-volume or semantically important. diff --git a/components/receivers/kineto/aggregation_test.go b/components/receivers/kineto/aggregation_test.go new file mode 100644 index 00000000..f617698a --- /dev/null +++ b/components/receivers/kineto/aggregation_test.go @@ -0,0 +1,68 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "github.com/tracecoreai/tracecore/internal/selftelemetry" + "github.com/tracecoreai/tracecore/pkg/kineto" +) + +func writeAggregateFixture(t *testing.T, dir string) string { + t.Helper() + spec := kineto.Spec{ + SchemaVersion: 1.0, Pid: 1, Tid: 1, + Steps: []kineto.StepSpec{{StepID: 0, StartTs: 0, EndTs: 1000, Events: []kineto.Event{ + {Name: "aten::addmm", Cat: kineto.CatCPUOp, Ph: kineto.PhaseComplete, Ts: 100, Dur: 10, Pid: 1, Tid: 1}, + {Name: "aten::addmm", Cat: kineto.CatCPUOp, Ph: kineto.PhaseComplete, Ts: 120, Dur: 10, Pid: 1, Tid: 1}, + {Name: "aten::addmm", Cat: kineto.CatCPUOp, Ph: kineto.PhaseComplete, Ts: 140, Dur: 10, Pid: 1, Tid: 1}, + {Name: "aten::addmm", Cat: kineto.CatCPUOp, Ph: kineto.PhaseComplete, Ts: 160, Dur: 10, Pid: 1, Tid: 1}, + {Name: "aten::addmm", Cat: kineto.CatCPUOp, Ph: kineto.PhaseComplete, Ts: 180, Dur: 10, Pid: 1, Tid: 1}, + }}}, + } + path := filepath.Join(dir, fmt.Sprintf("five_identical_%s.pt.trace.json", t.Name())) + f, err := os.Create(path) //nolint:gosec // test fixture path under t.TempDir + require.NoError(t, err) + defer func() { _ = f.Close() }() + require.NoError(t, kineto.Synthesize(spec, f)) + return path +} + +func TestAggregation_FiveIntoOne(t *testing.T) { + dir := t.TempDir() + path := writeAggregateFixture(t, dir) + sink := newTracesSink() + st, _ := newSelftel(selftelemetry.NewNoopReceiver(), nil) + cfg := newDefaultConfig(t) + cfg.WatchDir = dir + cfg.SampleRate = 1.0 + cfg.Aggregate = true + ing := &ingester{cfg: cfg, selftel: st, next: sink, rank: Rank{Rank: 0, WorldSize: 1}} + require.NoError(t, ing.ingest(context.Background(), path)) + require.Equal(t, 1, countSpansByCat(sink, "cpu_op")) + span, ok := firstSpanByCat(sink, "cpu_op") + require.True(t, ok) + v, ok := span.Attributes().Get("repeat.count") + require.True(t, ok) + require.Equal(t, int64(5), v.Int()) +} + +func TestAggregation_DisabledKeepsFive(t *testing.T) { + dir := t.TempDir() + path := writeAggregateFixture(t, dir) + sink := newTracesSink() + st, _ := newSelftel(selftelemetry.NewNoopReceiver(), nil) + cfg := newDefaultConfig(t) + cfg.WatchDir = dir + cfg.SampleRate = 1.0 + cfg.Aggregate = false + ing := &ingester{cfg: cfg, selftel: st, next: sink, rank: Rank{Rank: 0, WorldSize: 1}} + require.NoError(t, ing.ingest(context.Background(), path)) + require.Equal(t, 5, countSpansByCat(sink, "cpu_op")) +} diff --git a/components/receivers/kineto/config.go b/components/receivers/kineto/config.go new file mode 100644 index 00000000..6391f132 --- /dev/null +++ b/components/receivers/kineto/config.go @@ -0,0 +1,57 @@ +// SPDX-License-Identifier: Apache-2.0 + +// Package kineto implements the M14 Kineto profiler receiver. See +// docs/rfcs/0012-kineto-receiver-scope.md for the design contract. +package kineto + +import ( + "errors" + "fmt" + "time" + + "github.com/tracecoreai/tracecore/internal/pipeline" +) + +// Config is the operator-facing receiver configuration. +type Config struct { + WatchDir string `mapstructure:"watch_dir"` + WatchSubdirs bool `mapstructure:"watch_subdirs"` + MaxWatchDepth int `mapstructure:"max_watch_depth"` + SampleRate float64 `mapstructure:"sample_rate"` + Aggregate bool `mapstructure:"aggregate"` + DeleteAfterIngest bool `mapstructure:"delete_after_ingest"` + StepFilenameRegex string `mapstructure:"step_filename_regex"` + IngestTimeout time.Duration `mapstructure:"ingest_timeout"` + DegradedFailureRatio float64 `mapstructure:"degraded_failure_ratio"` +} + +// defaultConfig returns a config with safe defaults. WatchDir is empty +// (required from the operator). +func defaultConfig() pipeline.Config { + return &Config{ + MaxWatchDepth: 8, + SampleRate: 0.01, + IngestTimeout: 30 * time.Second, + DegradedFailureRatio: 0.25, + } +} + +// Validate implements pipeline.Config. +func (c *Config) Validate() error { + if c.WatchDir == "" { + return errors.New("kineto: watch_dir is required") + } + if c.SampleRate < 0 || c.SampleRate > 1 { + return fmt.Errorf("kineto: sample_rate %v out of range [0,1]", c.SampleRate) + } + if c.MaxWatchDepth < 1 { + return errors.New("kineto: max_watch_depth must be >= 1") + } + if c.IngestTimeout < time.Second { + return errors.New("kineto: ingest_timeout must be >= 1s") + } + if c.DegradedFailureRatio < 0 || c.DegradedFailureRatio > 1 { + return fmt.Errorf("kineto: degraded_failure_ratio %v out of range [0,1]", c.DegradedFailureRatio) + } + return nil +} diff --git a/components/receivers/kineto/config_test.go b/components/receivers/kineto/config_test.go new file mode 100644 index 00000000..706dcf70 --- /dev/null +++ b/components/receivers/kineto/config_test.go @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestConfig_ValidateRequiresWatchDir(t *testing.T) { + cfg := &Config{} + err := cfg.Validate() + require.Error(t, err) + require.Contains(t, err.Error(), "watch_dir") +} + +func TestConfig_Defaults(t *testing.T) { + raw := defaultConfig() + cfg, ok := raw.(*Config) + require.True(t, ok) + cfg.WatchDir = "/var/log/torch" + require.NoError(t, cfg.Validate()) + require.InDelta(t, 0.01, cfg.SampleRate, 0.0) + require.False(t, cfg.Aggregate) + require.False(t, cfg.DeleteAfterIngest) + require.Equal(t, 30*time.Second, cfg.IngestTimeout) + require.Equal(t, 8, cfg.MaxWatchDepth) +} + +func TestConfig_SampleRateRange(t *testing.T) { + cfg := newDefaultConfig(t) + cfg.WatchDir = "/tmp" + cfg.SampleRate = 1.5 + require.ErrorContains(t, cfg.Validate(), "sample_rate") + cfg.SampleRate = -0.1 + require.ErrorContains(t, cfg.Validate(), "sample_rate") +} diff --git a/components/receivers/kineto/emit.go b/components/receivers/kineto/emit.go new file mode 100644 index 00000000..d4dc5649 --- /dev/null +++ b/components/receivers/kineto/emit.go @@ -0,0 +1,53 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "github.com/tracecoreai/tracecore/pkg/kineto" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// eventToSpan maps one kineto.Event onto a ptrace.Span. Ts/Dur are in +// microseconds in the Chrome trace; convert to nanoseconds for OTel. +// stepIDPlus1 uses the StepTracker's +1 encoding: 0 means "no enclosing +// step window" (attribute omitted); >0 means the actual step is +// stepIDPlus1-1. repeat>1 emits a repeat.count attribute. +func eventToSpan(span ptrace.Span, ev kineto.Event, stepIDPlus1 uint64, repeat uint32) { + span.SetName(ev.Name) + span.SetStartTimestamp(pcommon.Timestamp(ev.Ts * 1000)) + span.SetEndTimestamp(pcommon.Timestamp((ev.Ts + ev.Dur) * 1000)) + + attrs := span.Attributes() + attrs.PutStr("kineto.category", ev.Cat) + attrs.PutInt("pid", int64(ev.Pid)) + attrs.PutInt("tid", int64(ev.Tid)) + if ev.Args.ExternalID != "" { + attrs.PutStr("kineto.external_id", ev.Args.ExternalID) + } + if ev.Args.Stream != 0 { + attrs.PutInt("stream.id", ev.Args.Stream) + } + if ev.Args.Device != 0 { + attrs.PutInt("device.id", ev.Args.Device) + } + if stepIDPlus1 != 0 { + //nolint:gosec // step_id is a non-negative event-derived index + attrs.PutInt("gen_ai.training.step_id", int64(stepIDPlus1-1)) + } + if repeat > 1 { + attrs.PutInt("repeat.count", int64(repeat)) + } +} + +// resourceAttrs sets the per-ingest resource attributes on rs. +func resourceAttrs(rs ptrace.ResourceSpans, r Rank) { + attrs := rs.Resource().Attributes() + attrs.PutInt("gen_ai.training.rank", int64(r.Rank)) + attrs.PutInt("gen_ai.training.world_size", int64(r.WorldSize)) + attrs.PutInt("gen_ai.training.local_rank", int64(r.LocalRank)) + if r.JobID != "" { + attrs.PutStr("gen_ai.training.job.id", r.JobID) + } + attrs.PutStr("service.name", "tracecore-kineto") +} diff --git a/components/receivers/kineto/errors.go b/components/receivers/kineto/errors.go new file mode 100644 index 00000000..dcd5f0b4 --- /dev/null +++ b/components/receivers/kineto/errors.go @@ -0,0 +1,13 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import "github.com/tracecoreai/tracecore/internal/selftelemetry" + +// Receiver-local Kind values. Canonical kinds (Connect, Parse, +// Downstream, Read, etc.) live in internal/selftelemetry/interface.go; +// these are kineto-specific extensions per the dcgm/kernelevents precedent. +const ( + KindStepIDMissing selftelemetry.Kind = "step_id_missing" + KindLimitExceeded selftelemetry.Kind = "limit_exceeded" +) diff --git a/components/receivers/kineto/example_config.yaml b/components/receivers/kineto/example_config.yaml new file mode 100644 index 00000000..4278d7f5 --- /dev/null +++ b/components/receivers/kineto/example_config.yaml @@ -0,0 +1,10 @@ +receivers: + kineto: + watch_dir: /var/log/torch + watch_subdirs: true + max_watch_depth: 4 + sample_rate: 0.01 + aggregate: true + delete_after_ingest: false + ingest_timeout: 30s + degraded_failure_ratio: 0.25 diff --git a/components/receivers/kineto/factory.go b/components/receivers/kineto/factory.go new file mode 100644 index 00000000..5d562ea1 --- /dev/null +++ b/components/receivers/kineto/factory.go @@ -0,0 +1,62 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "context" + "fmt" + + "github.com/tracecoreai/tracecore/internal/consumer" + "github.com/tracecoreai/tracecore/internal/pipeline" + "github.com/tracecoreai/tracecore/internal/selftelemetry" +) + +func componentType() pipeline.Type { return pipeline.MustNewType("kineto") } + +// Factory is the package-scoped ReceiverFactory for kineto. Mirrors +// kernelevents.Factory in shape. Only CreateTraces returns a real +// Receiver; CreateMetrics and CreateLogs return pipeline.ErrSignalNotSupported. +var Factory pipeline.ReceiverFactory = &factory{} + +// NewFactory returns the package-var Factory. Required by +// tools/components-gen, which generates calls like `kineto.NewFactory()`. +func NewFactory() pipeline.ReceiverFactory { return Factory } + +type factory struct{} + +func (*factory) Type() pipeline.Type { return componentType() } + +func (*factory) CreateDefaultConfig() pipeline.Config { return defaultConfig() } + +func (*factory) CreateMetrics(_ context.Context, _ pipeline.CreateSettings, _ pipeline.Config, _ consumer.Metrics) (pipeline.Receiver, error) { + return nil, pipeline.ErrSignalNotSupported +} + +func (*factory) CreateLogs(_ context.Context, _ pipeline.CreateSettings, _ pipeline.Config, _ consumer.Logs) (pipeline.Receiver, error) { + return nil, pipeline.ErrSignalNotSupported +} + +func (*factory) CreateTraces(ctx context.Context, set pipeline.CreateSettings, cfg pipeline.Config, next consumer.Traces) (pipeline.Receiver, error) { + c, ok := cfg.(*Config) + if !ok { + return nil, fmt.Errorf("kineto: unexpected config type %T", cfg) + } + r, err := newReceiver(set, c, next) + if err != nil { + return nil, err + } + if set.Telemetry.MeterProvider != nil { + if rt, err := selftelemetry.NewReceiver(set.ID, set.Telemetry.MeterProvider); err == nil { + r.telemetry = rt + } else { + selftelemetry.RecordInitError(ctx, set.Telemetry.MeterProvider, + "receiver", set.ID.String(), selftelemetry.ReasonInstrumentRegister) + if set.Telemetry.Logger != nil { + set.Telemetry.Logger.Warn("kineto self-telemetry init failed; using noop", "err", err.Error()) + } + } + } else if set.Telemetry.Logger != nil { + set.Telemetry.Logger.Warn("kineto: no MeterProvider; self-telemetry using noop") + } + return r, nil +} diff --git a/components/receivers/kineto/factory_test.go b/components/receivers/kineto/factory_test.go new file mode 100644 index 00000000..f657e5b7 --- /dev/null +++ b/components/receivers/kineto/factory_test.go @@ -0,0 +1,44 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "github.com/tracecoreai/tracecore/internal/pipeline" +) + +func TestFactory_Type(t *testing.T) { + require.Equal(t, "kineto", NewFactory().Type().String()) +} + +func TestFactory_UnsupportedSignals(t *testing.T) { + f := NewFactory() + cfg := f.CreateDefaultConfig() + ctx := context.Background() + set := pipeline.CreateSettings{} + _, err := f.CreateMetrics(ctx, set, cfg, nil) + require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) + _, err = f.CreateLogs(ctx, set, cfg, nil) + require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) +} + +func TestFactory_CreateTracesHappyPath(t *testing.T) { + f := NewFactory() + cfgRaw := f.CreateDefaultConfig() + cfg, ok := cfgRaw.(*Config) + require.True(t, ok) + cfg.WatchDir = t.TempDir() + rcv, err := f.CreateTraces(context.Background(), testCreateSettings(t), cfg, newTracesSink()) + require.NoError(t, err) + require.NotNil(t, rcv) +} + +func TestFactory_RejectsWrongConfigType(t *testing.T) { + f := NewFactory() + type other struct{ pipeline.Config } + _, err := f.CreateTraces(context.Background(), testCreateSettings(t), other{}, newTracesSink()) + require.ErrorContains(t, err, "unexpected config type") +} diff --git a/components/receivers/kineto/ingest.go b/components/receivers/kineto/ingest.go new file mode 100644 index 00000000..c705d627 --- /dev/null +++ b/components/receivers/kineto/ingest.go @@ -0,0 +1,150 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "compress/gzip" + "context" + "errors" + "fmt" + "io" + "os" + "strings" + + "github.com/tracecoreai/tracecore/internal/consumer" + "github.com/tracecoreai/tracecore/internal/selftelemetry" + "github.com/tracecoreai/tracecore/pkg/kineto" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +type ingester struct { + cfg *Config + selftel *selftel + next consumer.Traces + rank Rank +} + +// pendingEvent buffers the most recently visited consumed event so the +// aggregation toggle can fold consecutive identical cpu_op events. +// stepIDPlus1 uses the StepTracker's +1 encoding: 0 means no step. +type pendingEvent struct { + ev kineto.Event + stepIDPlus1 uint64 + repeat uint32 +} + +// ingestState carries per-file state across the streaming visit calls. +type ingestState struct { + tracker *StepTracker + ss ptrace.ScopeSpans + prev *pendingEvent + stepIDMissingBumped bool +} + +// handleEvent processes one parsed kineto.Event: updates the step +// tracker, drops unknown categories (to the kineto.unknown_category +// counter), samples, and either folds into prev (aggregation) or flushes +// prev and replaces it with the current event. +func (i *ingester) handleEvent(ctx context.Context, st *ingestState, ev kineto.Event) { + st.tracker.MaybeApply(ev) + if _, ok := kineto.ConsumedCategories[ev.Cat]; !ok { + i.selftel.bumpUnknownCategory(ctx, ev.Cat) + return + } + stepIDPlus1 := st.tracker.LookupAtTs(ev.Ts) + if stepIDPlus1 == 0 && !st.stepIDMissingBumped { + i.selftel.receiver.IncError(KindStepIDMissing) + st.stepIDMissingBumped = true + } + var actualStepID uint64 + if stepIDPlus1 > 0 { + actualStepID = stepIDPlus1 - 1 + } + if !sampleKeep(i.rank.Rank, actualStepID, ev.Ts, i.cfg.SampleRate) { + return + } + if i.cfg.Aggregate && st.prev != nil && + st.prev.ev.Cat == kineto.CatCPUOp && ev.Cat == kineto.CatCPUOp && + st.prev.ev.Name == ev.Name && st.prev.stepIDPlus1 == stepIDPlus1 { + st.prev.repeat++ + endTs := ev.Ts + ev.Dur + if endTs > st.prev.ev.Ts+st.prev.ev.Dur { + st.prev.ev.Dur = endTs - st.prev.ev.Ts + } + return + } + if st.prev != nil { + flushPending(st.ss, st.prev) + } + st.prev = &pendingEvent{ev: ev, stepIDPlus1: stepIDPlus1, repeat: 1} +} + +// ingest parses one trace file end-to-end, sampling and aggregating +// events, then pushes a ptrace.Traces to the next consumer. Errors are +// classified and bumped to selftel; the file is auto-deleted only when +// cfg.DeleteAfterIngest is true and the consumer accepted the batch. +func (i *ingester) ingest(ctx context.Context, path string) error { + f, err := os.Open(path) //nolint:gosec // operator-provided WatchDir + if err != nil { + i.selftel.receiver.IncError(selftelemetry.KindRead) + return fmt.Errorf("kineto: open %s: %w", path, err) + } + defer func() { _ = f.Close() }() + + var r io.Reader = f + if strings.HasSuffix(path, ".gz") { + gz, err := gzip.NewReader(f) + if err != nil { + i.selftel.receiver.IncError(selftelemetry.KindParse) + return fmt.Errorf("kineto: gzip %s: %w", path, err) + } + defer func() { _ = gz.Close() }() + r = gz + } + + tracker := NewStepTracker() + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + resourceAttrs(rs, i.rank) + ss := rs.ScopeSpans().AppendEmpty() + ss.Scope().SetName("tracecore.kineto") + + state := &ingestState{tracker: tracker, ss: ss} + parseErr := kineto.Parse(r, func(ev kineto.Event) error { + i.handleEvent(ctx, state, ev) + return nil + }) + if state.prev != nil { + flushPending(ss, state.prev) + } + if parseErr != nil { + var kind selftelemetry.Kind + switch { + case errors.Is(parseErr, kineto.ErrLimitExceeded): + kind = KindLimitExceeded + default: + kind = selftelemetry.KindParse + } + i.selftel.receiver.IncError(kind) + return fmt.Errorf("kineto: parse %s: %w", path, parseErr) + } + + if traces.SpanCount() == 0 { + return nil + } + if err := i.next.ConsumeTraces(ctx, traces); err != nil { + i.selftel.receiver.IncError(selftelemetry.KindDownstream) + return fmt.Errorf("kineto: consume %s: %w", path, err) + } + //nolint:gosec // SpanCount() is bounded by MaxEvents (1<<24) + i.selftel.receiver.IncEmissions(int64(traces.SpanCount())) + if i.cfg.DeleteAfterIngest { + _ = os.Remove(path) + } + return nil +} + +func flushPending(ss ptrace.ScopeSpans, p *pendingEvent) { + span := ss.Spans().AppendEmpty() + eventToSpan(span, p.ev, p.stepIDPlus1, p.repeat) +} diff --git a/components/receivers/kineto/ingest_test.go b/components/receivers/kineto/ingest_test.go new file mode 100644 index 00000000..dbeaba9d --- /dev/null +++ b/components/receivers/kineto/ingest_test.go @@ -0,0 +1,57 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tracecoreai/tracecore/internal/selftelemetry" +) + +func TestIngestFile_TwoStepGoldenFixture(t *testing.T) { + sink := newTracesSink() + st, err := newSelftel(selftelemetry.NewNoopReceiver(), nil) + require.NoError(t, err) + + cfg := newDefaultConfig(t) + cfg.WatchDir = "testdata" + cfg.SampleRate = 1.0 // keep everything + + ing := &ingester{ + cfg: cfg, selftel: st, next: sink, + rank: Rank{Rank: 3, WorldSize: 8, LocalRank: 3, JobID: "job-abc"}, + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + require.NoError(t, ing.ingest(ctx, "testdata/toy_2step.pt.trace.json")) + + traces := sink.traces() + require.Len(t, traces, 1) + rs := traces[0].ResourceSpans().At(0) + + // Resource attrs: rank 3, world_size 8. + rk, ok := rs.Resource().Attributes().Get("gen_ai.training.rank") + require.True(t, ok) + require.Equal(t, int64(3), rk.Int()) + ws, ok := rs.Resource().Attributes().Get("gen_ai.training.world_size") + require.True(t, ok) + require.Equal(t, int64(8), ws.Int()) + job, ok := rs.Resource().Attributes().Get("gen_ai.training.job.id") + require.True(t, ok) + require.Equal(t, "job-abc", job.Str()) + + // Two-step fixture: each step contains 1 cpu_op + 1 kernel. + // ProfilerStep B/E markers also emit as user_annotation spans. + require.Equal(t, 2, countSpansByCat(sink, "cpu_op")) + require.Equal(t, 2, countSpansByCat(sink, "kernel")) + require.Equal(t, 4, countSpansByCat(sink, "user_annotation")) + // Step-id propagated. + sp, ok := firstSpanByCat(sink, "cpu_op") + require.True(t, ok) + stepID, ok := sp.Attributes().Get("gen_ai.training.step_id") + require.True(t, ok) + require.Equal(t, int64(0), stepID.Int()) +} diff --git a/components/receivers/kineto/rank.go b/components/receivers/kineto/rank.go new file mode 100644 index 00000000..ca5b264a --- /dev/null +++ b/components/receivers/kineto/rank.go @@ -0,0 +1,111 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "bytes" + "fmt" + "os" + "regexp" + "strconv" +) + +// Rank holds discovered rank attribution for a traced process. +// Zero value is "no rank discovered". +type Rank struct { + Rank uint32 + WorldSize uint32 + LocalRank uint32 + JobID string + + // ProcReader reads the env-vars of pid as a map. Defaults to + // readProcEnviron (reads /proc//environ). Overridable for tests. + ProcReader func(pid int) (map[string]string, error) `json:"-"` + + // EnvReader reads a receiver-process env var. Defaults to os.Getenv. + // Overridable for tests. + EnvReader func(key string) string `json:"-"` +} + +// Discover attempts (1) /proc//environ, (2) receiver-process env. +// Returns ok=true if at least RANK + WORLD_SIZE are determined. +func (r Rank) Discover(pid int) (Rank, bool) { + if r.ProcReader != nil { + if env, err := r.ProcReader(pid); err == nil { + if rk, ok := buildRank(env); ok { + return rk, true + } + } + } + envFn := r.EnvReader + if envFn == nil { + envFn = os.Getenv + } + env := map[string]string{ + "RANK": envFn("RANK"), + "WORLD_SIZE": envFn("WORLD_SIZE"), + "LOCAL_RANK": envFn("LOCAL_RANK"), + "TORCHELASTIC_RUN_ID": envFn("TORCHELASTIC_RUN_ID"), + "SLURM_JOB_ID": envFn("SLURM_JOB_ID"), + "TRAINING_JOB_NAME": envFn("TRAINING_JOB_NAME"), + } + return buildRank(env) +} + +func buildRank(env map[string]string) (Rank, bool) { + rk, err1 := strconv.ParseUint(env["RANK"], 10, 32) + ws, err2 := strconv.ParseUint(env["WORLD_SIZE"], 10, 32) + if err1 != nil || err2 != nil { + return Rank{}, false + } + lr, _ := strconv.ParseUint(env["LOCAL_RANK"], 10, 32) + jobID := env["TORCHELASTIC_RUN_ID"] + if jobID == "" { + jobID = env["SLURM_JOB_ID"] + } + if jobID == "" { + jobID = env["TRAINING_JOB_NAME"] + } + return Rank{ + //nolint:gosec // env-derived values, range-bounded by ParseUint(32) + Rank: uint32(rk), WorldSize: uint32(ws), LocalRank: uint32(lr), JobID: jobID, + }, true +} + +// readProcEnviron reads /proc//environ and parses it into a map. +func readProcEnviron(pid int) (map[string]string, error) { + path := fmt.Sprintf("/proc/%d/environ", pid) + data, err := os.ReadFile(path) //nolint:gosec // pid-derived path is intentional + if err != nil { + return nil, fmt.Errorf("kineto: read %s: %w", path, err) + } + m := make(map[string]string) + for _, entry := range bytes.Split(data, []byte{0}) { + if len(entry) == 0 { + continue + } + eq := bytes.IndexByte(entry, '=') + if eq < 0 { + continue + } + m[string(entry[:eq])] = string(entry[eq+1:]) + } + return m, nil +} + +var pidFilenameRegex = regexp.MustCompile(`^[^.]*_(\d+)\.`) + +// PidFromFilename extracts the pid from a tensorboard_trace_handler default +// filename: `{worker_name}.{ts_ns}.pt.trace.json`, where worker_name +// defaults to `{hostname}_{pid}`. Returns (pid, true) on match. +func PidFromFilename(name string) (int, bool) { + m := pidFilenameRegex.FindStringSubmatch(name) + if m == nil { + return 0, false + } + pid, err := strconv.Atoi(m[1]) + if err != nil { + return 0, false + } + return pid, true +} diff --git a/components/receivers/kineto/rank_test.go b/components/receivers/kineto/rank_test.go new file mode 100644 index 00000000..2cd123fd --- /dev/null +++ b/components/receivers/kineto/rank_test.go @@ -0,0 +1,76 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDiscoverRank_FromProcEnviron(t *testing.T) { + r := Rank{ProcReader: func(pid int) (map[string]string, error) { + require.Equal(t, 1234, pid) + return map[string]string{ + "RANK": "3", + "WORLD_SIZE": "8", + "LOCAL_RANK": "3", + "TORCHELASTIC_RUN_ID": "abc-123", + }, nil + }} + got, ok := r.Discover(1234) + require.True(t, ok) + require.Equal(t, uint32(3), got.Rank) + require.Equal(t, uint32(8), got.WorldSize) + require.Equal(t, uint32(3), got.LocalRank) + require.Equal(t, "abc-123", got.JobID) +} + +func TestDiscoverRank_FilenameFallback(t *testing.T) { + r := Rank{ + ProcReader: func(int) (map[string]string, error) { return nil, os.ErrNotExist }, + EnvReader: func(string) string { return "" }, + } + pid, ok := PidFromFilename("hostname_5678.1700000000000000000.pt.trace.json") + require.True(t, ok) + require.Equal(t, 5678, pid) + _, ok = r.Discover(pid) + require.False(t, ok) +} + +func TestDiscoverRank_ReceiverEnvFallback(t *testing.T) { + r := Rank{ + ProcReader: func(int) (map[string]string, error) { return nil, os.ErrNotExist }, + EnvReader: func(k string) string { + return map[string]string{ + "RANK": "1", "WORLD_SIZE": "4", "LOCAL_RANK": "1", + }[k] + }, + } + got, ok := r.Discover(9999) + require.True(t, ok) + require.Equal(t, uint32(1), got.Rank) + require.Equal(t, uint32(4), got.WorldSize) +} + +func TestPidFromFilename(t *testing.T) { + cases := []struct { + name string + pid int + wantOK bool + }{ + {"host_1234.1700000000000000000.pt.trace.json", 1234, true}, + {"h_abc_1234.1.pt.trace.json", 1234, true}, + {"some-other.pt.trace.json", 0, false}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + got, ok := PidFromFilename(c.name) + require.Equal(t, c.wantOK, ok) + if c.wantOK { + require.Equal(t, c.pid, got) + } + }) + } +} diff --git a/components/receivers/kineto/receiver.go b/components/receivers/kineto/receiver.go new file mode 100644 index 00000000..84665314 --- /dev/null +++ b/components/receivers/kineto/receiver.go @@ -0,0 +1,216 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "context" + "fmt" + "io/fs" + "log/slog" + "path/filepath" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/tracecoreai/tracecore/internal/consumer" + "github.com/tracecoreai/tracecore/internal/pipeline" + "github.com/tracecoreai/tracecore/internal/safe" + "github.com/tracecoreai/tracecore/internal/selftelemetry" +) + +// kinetoReceiver is the M14 Kineto profiler receiver. +type kinetoReceiver struct { + pipeline.ComponentState + + set pipeline.CreateSettings + cfg *Config + selftel *selftel + next consumer.Traces + telemetry selftelemetry.Receiver + + watcher *fsnotify.Watcher + cancel context.CancelFunc + wg sync.WaitGroup + + parsedOK atomic.Uint64 + parsedFail atomic.Uint64 + degraded atomic.Bool +} + +func newReceiver(set pipeline.CreateSettings, cfg *Config, next consumer.Traces) (*kinetoReceiver, error) { + st, err := newSelftel(selftelemetry.NewNoopReceiver(), set.Telemetry.MeterProvider) + if err != nil { + return nil, err + } + return &kinetoReceiver{ + set: set, cfg: cfg, selftel: st, next: next, + telemetry: selftelemetry.NewNoopReceiver(), + }, nil +} + +func (r *kinetoReceiver) logger() *slog.Logger { + if r.set.Telemetry.Logger != nil { + return r.set.Telemetry.Logger + } + return slog.Default() +} + +// Start implements pipeline.Component. +func (r *kinetoReceiver) Start(ctx context.Context, host pipeline.Host) error { + if err := r.ComponentState.Start(ctx, host); err != nil { + return err + } + w, err := fsnotify.NewWatcher() + if err != nil { + return fmt.Errorf("kineto: fsnotify: %w", err) + } + r.watcher = w + if err := r.addWatches(); err != nil { + // Degraded boot: WatchDir missing or unreadable. Don't fail Start + // (operators can create the dir later); flip degraded so + // `tracecore_receiver_degraded_seconds_total` ticks immediately. + r.logger().Warn("kineto: watch_dir not watchable, entering degraded", "err", err.Error()) + r.degraded.Store(true) + r.telemetry.SetDegraded(true) + } + // Detach from Start's ctx: per pipeline.Component contract, Start + // ctx is for the boot operation only; background goroutines run + // for the receiver's lifetime and stop via Shutdown. + runCtx, cancel := context.WithCancel(context.Background()) + r.cancel = cancel + r.wg.Add(1) + go r.run(runCtx) //nolint:contextcheck // intentional detach; see comment above + return nil +} + +func (r *kinetoReceiver) addWatches() error { + if !r.cfg.WatchSubdirs { + if err := r.watcher.Add(r.cfg.WatchDir); err != nil { + return fmt.Errorf("watch %s: %w", r.cfg.WatchDir, err) + } + return nil + } + walkErr := filepath.WalkDir(r.cfg.WatchDir, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if !d.IsDir() { + return nil + } + rel, err := filepath.Rel(r.cfg.WatchDir, path) + if err != nil { + return fmt.Errorf("rel %s %s: %w", r.cfg.WatchDir, path, err) + } + depth := 0 + if rel != "." { + depth = strings.Count(rel, string(filepath.Separator)) + 1 + } + if depth > r.cfg.MaxWatchDepth { + return fs.SkipDir + } + if err := r.watcher.Add(path); err != nil { + return fmt.Errorf("watch %s: %w", path, err) + } + return nil + }) + if walkErr != nil { + return fmt.Errorf("walk %s: %w", r.cfg.WatchDir, walkErr) + } + return nil +} + +func (r *kinetoReceiver) run(ctx context.Context) { + defer r.wg.Done() + defer func() { _ = r.watcher.Close() }() + for { + select { + case <-ctx.Done(): + return + case ev, ok := <-r.watcher.Events: + if !ok { + return + } + if ev.Op&fsnotify.Write == 0 { + continue + } + if !matchKinetoFile(ev.Name) { + continue + } + r.ingestOne(ctx, ev.Name) + case err, ok := <-r.watcher.Errors: + if !ok { + return + } + r.logger().Warn("kineto: fsnotify error", "err", err.Error()) + } + } +} + +func matchKinetoFile(path string) bool { + return strings.HasSuffix(path, ".pt.trace.json") || + strings.HasSuffix(path, ".pt.trace.json.gz") +} + +func (r *kinetoReceiver) ingestOne(ctx context.Context, path string) { + start := time.Now() + ing := &ingester{cfg: r.cfg, selftel: r.selftel, next: r.next, rank: r.discoverRank(path)} + ingestCtx, cancel := context.WithTimeout(ctx, r.cfg.IngestTimeout) + defer cancel() + err := safe.Call(ingestCtx, "kineto.ingest", func(c context.Context) error { + return ing.ingest(c, path) + }) + r.telemetry.ObserveLatency(time.Since(start)) + if err != nil { + r.parsedFail.Add(1) + r.logger().Warn("kineto: ingest failed", "path", path, "err", err.Error()) + } else { + r.parsedOK.Add(1) + r.telemetry.MarkActivity() + } + r.maybeFlipDegraded() +} + +func (r *kinetoReceiver) discoverRank(path string) Rank { + rk := Rank{ProcReader: readProcEnviron} + if pid, ok := PidFromFilename(filepath.Base(path)); ok { + if got, ok := rk.Discover(pid); ok { + return got + } + } + got, _ := rk.Discover(0) + return got +} + +func (r *kinetoReceiver) maybeFlipDegraded() { + ok := r.parsedOK.Load() + fail := r.parsedFail.Load() + total := ok + fail + if total < 10 { + return + } + ratio := float64(fail) / float64(total) + want := ratio > r.cfg.DegradedFailureRatio + if r.degraded.Load() != want { + r.degraded.Store(want) + r.telemetry.SetDegraded(want) + } +} + +// Shutdown implements pipeline.Component. +func (r *kinetoReceiver) Shutdown(ctx context.Context) error { + if r.cancel != nil { + r.cancel() + } + done := make(chan struct{}) + go func() { r.wg.Wait(); close(done) }() + select { + case <-done: + case <-ctx.Done(): + return fmt.Errorf("kineto: shutdown: %w", ctx.Err()) + } + return r.ComponentState.Shutdown(ctx) +} + +var _ pipeline.Component = (*kinetoReceiver)(nil) diff --git a/components/receivers/kineto/sampling.go b/components/receivers/kineto/sampling.go new file mode 100644 index 00000000..91aed86d --- /dev/null +++ b/components/receivers/kineto/sampling.go @@ -0,0 +1,28 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "encoding/binary" + "hash/fnv" +) + +// sampleKeep is deterministic per (rank, stepID). When stepID==0 +// (event outside any ProfilerStep window), falls back to per-(rank, +// ts/1000) bucket so adjacent events stay coherent. sampleRate is in [0,1]. +// +// See RFC-0012 §Sampling. +func sampleKeep(rank uint32, stepID uint64, ts int64, sampleRate float64) bool { + var key uint64 + if stepID != 0 { + key = uint64(rank)<<32 | stepID + } else { + //nolint:gosec // intentional bucketing of negative ts to a stable key + key = uint64(rank)<<32 | uint64(ts/1000) + } + h := fnv.New64a() + var buf [8]byte + binary.LittleEndian.PutUint64(buf[:], key) + _, _ = h.Write(buf[:]) + return h.Sum64()%10000 < uint64(sampleRate*10000) +} diff --git a/components/receivers/kineto/sampling_test.go b/components/receivers/kineto/sampling_test.go new file mode 100644 index 00000000..c9d71053 --- /dev/null +++ b/components/receivers/kineto/sampling_test.go @@ -0,0 +1,62 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSampleKeep_Deterministic(t *testing.T) { + want := sampleKeep(7, 42, 1234567, 0.01) + for i := 0; i < 100; i++ { + require.Equal(t, want, sampleKeep(7, 42, 1234567, 0.01)) + } +} + +func TestSampleKeep_RankIsKey(t *testing.T) { + got := map[bool]int{} + for r := uint32(0); r < 100; r++ { + got[sampleKeep(r, 42, 0, 0.5)]++ + } + require.Positive(t, got[true]) + require.Positive(t, got[false]) +} + +func TestSampleKeep_KeepRateAtOnePercent(t *testing.T) { + kept := 0 + for s := uint64(1); s <= 1000; s++ { + if sampleKeep(0, s, 0, 0.01) { + kept++ + } + } + require.GreaterOrEqual(t, kept, 5) + require.LessOrEqual(t, kept, 20) +} + +func TestSampleKeep_FallbackWithoutStepID(t *testing.T) { + want := sampleKeep(0, 0, 1700, 0.5) + require.Equal(t, want, sampleKeep(0, 0, 1700, 0.5)) +} + +func TestSampleKeep_1000Step8Rank(t *testing.T) { + // Rubric-pinned assertion: deterministic per (rank, step); per-rank + // kept count within fnv collision slack. + for r := uint32(0); r < 8; r++ { + kept := 0 + for s := uint64(1); s <= 1000; s++ { + if sampleKeep(r, s, 0, 0.01) { + kept++ + } + } + require.GreaterOrEqualf(t, kept, 3, "rank=%d kept too few: %d", r, kept) + require.LessOrEqualf(t, kept, 25, "rank=%d kept too many: %d", r, kept) + } + for r := uint32(0); r < 8; r++ { + for s := uint64(1); s <= 100; s++ { + want := sampleKeep(r, s, 0, 0.01) + require.Equal(t, want, sampleKeep(r, s, 0, 0.01)) + } + } +} diff --git a/components/receivers/kineto/selftel.go b/components/receivers/kineto/selftel.go new file mode 100644 index 00000000..87a61f92 --- /dev/null +++ b/components/receivers/kineto/selftel.go @@ -0,0 +1,44 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "context" + "fmt" + + "github.com/tracecoreai/tracecore/internal/selftelemetry" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" +) + +// selftel bundles the canonical receiver-error counter with the +// kineto-specific unknown_category counter. The receiver-error counter +// (IncError etc.) lives on the canonical selftelemetry.Receiver surface; +// the unknown_category counter is a per-receiver custom metric, kept +// distinct so the kineto.category label cardinality does not pollute +// the cross-receiver tracecore_receiver_errors_total signal. +type selftel struct { + receiver selftelemetry.Receiver + unknownCategory metric.Int64Counter +} + +func newSelftel(rcv selftelemetry.Receiver, mp metric.MeterProvider) (*selftel, error) { + if rcv == nil { + rcv = selftelemetry.NewNoopReceiver() + } + if mp == nil { + mp = noop.NewMeterProvider() + } + meter := mp.Meter("github.com/tracecoreai/tracecore/components/receivers/kineto") + c, err := meter.Int64Counter("tracecore.kineto.unknown_category", + metric.WithDescription("Kineto event categories tracecore does not consume; bumped per occurrence labeled by kineto.category")) + if err != nil { + return nil, fmt.Errorf("kineto: register unknown_category counter: %w", err) + } + return &selftel{receiver: rcv, unknownCategory: c}, nil +} + +func (s *selftel) bumpUnknownCategory(ctx context.Context, cat string) { + s.unknownCategory.Add(ctx, 1, metric.WithAttributes(attribute.String("kineto.category", cat))) +} diff --git a/components/receivers/kineto/shutdown_test.go b/components/receivers/kineto/shutdown_test.go new file mode 100644 index 00000000..863ab3d4 --- /dev/null +++ b/components/receivers/kineto/shutdown_test.go @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestShutdown_RespectsDeadline(t *testing.T) { + cfg := newDefaultConfig(t) + cfg.WatchDir = t.TempDir() + r, err := newReceiver(testCreateSettings(t), cfg, newTracesSink()) + require.NoError(t, err) + require.NoError(t, r.Start(context.Background(), nil)) + + start := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + require.NoError(t, r.Shutdown(ctx)) + require.Less(t, time.Since(start), 1500*time.Millisecond) +} + +func TestDegraded_WatchDirMissing(t *testing.T) { + cfg := newDefaultConfig(t) + cfg.WatchDir = "/nonexistent/path/" + t.Name() + r, err := newReceiver(testCreateSettings(t), cfg, newTracesSink()) + require.NoError(t, err) + require.NoError(t, r.Start(context.Background(), nil)) + defer func() { _ = r.Shutdown(context.Background()) }() + require.True(t, r.degraded.Load(), "should be degraded when WatchDir missing") +} diff --git a/components/receivers/kineto/soak_test.go b/components/receivers/kineto/soak_test.go new file mode 100644 index 00000000..212b57fc --- /dev/null +++ b/components/receivers/kineto/soak_test.go @@ -0,0 +1,87 @@ +// SPDX-License-Identifier: Apache-2.0 + +//go:build soak + +package kineto + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/require" + "github.com/tracecoreai/tracecore/internal/selftelemetry" + "github.com/tracecoreai/tracecore/pkg/kineto" +) + +// TestSoak_100Files_RSS30MB exercises the M14 non-functional RSS rubric: +// ingest 100 sequential 50 MB traces; assert peak VmRSS stays under 30 MB. +// +// Run with `go test -tags soak -timeout 30m ./components/receivers/kineto/`. +func TestSoak_100Files_RSS30MB(t *testing.T) { + dir := t.TempDir() + for i := 0; i < 100; i++ { + path := filepath.Join(dir, fmt.Sprintf("host_%d.%d.pt.trace.json", i+1, i)) + require.NoError(t, writeSynthEvents(path, int32(i+1), 50*1024*1024/140)) + } + + sink := newTracesSink() + st, err := newSelftel(selftelemetry.NewNoopReceiver(), nil) + require.NoError(t, err) + cfg := newDefaultConfig(t) + cfg.WatchDir = dir + cfg.SampleRate = 0.01 + ing := &ingester{cfg: cfg, selftel: st, next: sink, rank: Rank{Rank: 0, WorldSize: 1}} + + entries, err := os.ReadDir(dir) + require.NoError(t, err) + for _, e := range entries { + _ = ing.ingest(context.Background(), filepath.Join(dir, e.Name())) + } + + rssKB, err := readVmRSS() + require.NoError(t, err) + require.Greater(t, rssKB, 0) + require.Less(t, rssKB, 30*1024, "VmRSS %d KB exceeds 30 MB ceiling", rssKB) +} + +func writeSynthEvents(path string, pid int32, eventCount int) error { + steps := make([]kineto.StepSpec, 0, eventCount/10) + for j := uint64(0); j < uint64(eventCount/10); j++ { + evs := make([]kineto.Event, 10) + for k := range evs { + evs[k] = kineto.Event{ + Name: "x", Cat: kineto.CatCPUOp, Ph: kineto.PhaseComplete, + Ts: int64(j*1000 + uint64(k)*10), Dur: 5, Pid: pid, Tid: pid, + } + } + steps = append(steps, kineto.StepSpec{StepID: j, StartTs: int64(j * 1000), EndTs: int64(j*1000 + 999), Events: evs}) + } + f, err := os.Create(path) + if err != nil { + return err + } + defer func() { _ = f.Close() }() + return kineto.Synthesize(kineto.Spec{SchemaVersion: 1.0, Pid: pid, Tid: pid, Steps: steps}, f) +} + +func readVmRSS() (int, error) { + data, err := os.ReadFile("/proc/self/status") + if err != nil { + return 0, err + } + for _, line := range strings.Split(string(data), "\n") { + if strings.HasPrefix(line, "VmRSS:") { + fields := strings.Fields(line) + if len(fields) < 2 { + return 0, fmt.Errorf("unexpected VmRSS line %q", line) + } + return strconv.Atoi(fields[1]) + } + } + return 0, fmt.Errorf("VmRSS not found in /proc/self/status") +} diff --git a/components/receivers/kineto/step_map.go b/components/receivers/kineto/step_map.go new file mode 100644 index 00000000..9cd2839f --- /dev/null +++ b/components/receivers/kineto/step_map.go @@ -0,0 +1,116 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "regexp" + "sort" + "strconv" + + "github.com/tracecoreai/tracecore/pkg/kineto" +) + +var profilerStepRegex = regexp.MustCompile(`^ProfilerStep#(\d+)$`) + +// StepTracker maintains the stack of currently-open ProfilerStep +// intervals as the parser streams events in ts order. It supports both +// B/E phase pairs and X complete-phase events. +// +// Step IDs are encoded with a +1 offset internally so the zero return +// value can mean "no enclosing step" without collision with step_id=0. +// Callers convert: actualStepID = Top()-1 when Top()>0. +type StepTracker struct { + stack []openStep // B/E pairs (innermost at end) + complete []openStep // X-phase intervals, sorted by EndTs + seen int // count of ProfilerStep events observed +} + +type openStep struct { + StepID uint64 + StartTs int64 + EndTs int64 // for X-phase intervals; ignored for B-stack entries +} + +// NewStepTracker returns an empty tracker. +func NewStepTracker() *StepTracker { return &StepTracker{} } + +// Top returns the innermost open B/E step_id+1, or 0 if none open. +func (t *StepTracker) Top() uint64 { + if len(t.stack) > 0 { + return t.stack[len(t.stack)-1].StepID + 1 + } + return 0 +} + +// LookupAtTs returns the step_id+1 of the innermost step covering ts, +// or 0 if no step does. Checks B/E stack first (always covers "now"), +// then X-phase intervals via binary search. +func (t *StepTracker) LookupAtTs(ts int64) uint64 { + if v := t.Top(); v != 0 { + if ts >= t.stack[len(t.stack)-1].StartTs { + return v + } + } + i := sort.Search(len(t.complete), func(i int) bool { + return t.complete[i].EndTs > ts + }) + if i < len(t.complete) && ts >= t.complete[i].StartTs && ts < t.complete[i].EndTs { + return t.complete[i].StepID + 1 + } + return 0 +} + +// Begin pushes a B-phase ProfilerStep onto the stack. +func (t *StepTracker) Begin(stepID uint64, ts int64) { + t.stack = append(t.stack, openStep{StepID: stepID, StartTs: ts}) + t.seen++ +} + +// End pops the matching B-phase entry. No-op if no matching entry exists +// (defensive against malformed traces). +func (t *StepTracker) End(stepID uint64) { + for i := len(t.stack) - 1; i >= 0; i-- { + if t.stack[i].StepID == stepID { + t.stack = append(t.stack[:i], t.stack[i+1:]...) + return + } + } +} + +// PushComplete records an X-phase ProfilerStep with explicit start + duration. +func (t *StepTracker) PushComplete(stepID uint64, ts, dur int64) { + t.complete = append(t.complete, openStep{StepID: stepID, StartTs: ts, EndTs: ts + dur}) + sort.Slice(t.complete, func(i, j int) bool { + return t.complete[i].EndTs < t.complete[j].EndTs + }) + t.seen++ +} + +// MaybeApply inspects an event; if it is a ProfilerStep#N user_annotation, +// updates the tracker and returns true. Otherwise returns false. +func (t *StepTracker) MaybeApply(ev kineto.Event) bool { + if ev.Cat != kineto.CatUserAnnotation { + return false + } + m := profilerStepRegex.FindStringSubmatch(ev.Name) + if m == nil { + return false + } + stepID, err := strconv.ParseUint(m[1], 10, 64) + if err != nil { + return false + } + switch ev.Ph { + case kineto.PhaseBegin: + t.Begin(stepID, ev.Ts) + case kineto.PhaseEnd: + t.End(stepID) + case kineto.PhaseComplete: + t.PushComplete(stepID, ev.Ts, ev.Dur) + } + return true +} + +// SeenAny reports whether any ProfilerStep event has been observed. +// Used by ingest to decide whether to fall back to StepFilenameRegex. +func (t *StepTracker) SeenAny() bool { return t.seen > 0 } diff --git a/components/receivers/kineto/step_map_test.go b/components/receivers/kineto/step_map_test.go new file mode 100644 index 00000000..65bcd579 --- /dev/null +++ b/components/receivers/kineto/step_map_test.go @@ -0,0 +1,62 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "testing" + + "github.com/stretchr/testify/require" + "github.com/tracecoreai/tracecore/pkg/kineto" +) + +func TestStepTracker_BeginEnd(t *testing.T) { + tr := NewStepTracker() + require.Equal(t, uint64(0), tr.Top()) + tr.Begin(0, 1000) + require.Equal(t, uint64(1), tr.Top()) + tr.End(0) + require.Equal(t, uint64(0), tr.Top()) +} + +func TestStepTracker_Nested(t *testing.T) { + tr := NewStepTracker() + tr.Begin(0, 1000) + tr.Begin(1, 1500) + require.Equal(t, uint64(2), tr.Top()) + tr.End(1) + require.Equal(t, uint64(1), tr.Top()) + tr.End(0) + require.Equal(t, uint64(0), tr.Top()) +} + +func TestStepTracker_CompletePhase(t *testing.T) { + tr := NewStepTracker() + tr.PushComplete(0, 1000, 500) + require.Equal(t, uint64(1), tr.LookupAtTs(1200)) + require.Equal(t, uint64(0), tr.LookupAtTs(1500)) + require.Equal(t, uint64(0), tr.LookupAtTs(900)) +} + +func TestStepTracker_FromKinetoEvent(t *testing.T) { + tr := NewStepTracker() + require.True(t, tr.MaybeApply(kineto.Event{ + Name: "ProfilerStep#0", Cat: kineto.CatUserAnnotation, Ph: kineto.PhaseBegin, Ts: 1000, + })) + require.Equal(t, uint64(1), tr.Top()) + require.False(t, tr.MaybeApply(kineto.Event{ + Name: "some_other_annotation", Cat: kineto.CatUserAnnotation, Ph: kineto.PhaseBegin, Ts: 1100, + })) + require.Equal(t, uint64(1), tr.Top()) + require.True(t, tr.MaybeApply(kineto.Event{ + Name: "ProfilerStep#0", Cat: kineto.CatUserAnnotation, Ph: kineto.PhaseEnd, Ts: 2000, + })) + require.Equal(t, uint64(0), tr.Top()) +} + +func TestStepTracker_NonUserAnnotationIgnored(t *testing.T) { + tr := NewStepTracker() + require.False(t, tr.MaybeApply(kineto.Event{ + Name: "ProfilerStep#0", Cat: kineto.CatCPUOp, Ph: kineto.PhaseBegin, Ts: 100, + })) + require.False(t, tr.SeenAny()) +} diff --git a/components/receivers/kineto/stress_2gb_test.go b/components/receivers/kineto/stress_2gb_test.go new file mode 100644 index 00000000..dfdc02d3 --- /dev/null +++ b/components/receivers/kineto/stress_2gb_test.go @@ -0,0 +1,68 @@ +// SPDX-License-Identifier: Apache-2.0 + +//go:build !race + +package kineto + +import ( + "os" + "path/filepath" + "runtime" + "testing" + + "github.com/stretchr/testify/require" + "github.com/tracecoreai/tracecore/pkg/kineto" +) + +// TestStress_2GBHeapCeiling closes the M14 non-functional "2 GB trace, +// peak HeapAlloc <80 MB" rubric. Synthesizes a ~2 GB trace, parses it +// while sampling HeapAlloc every 1000 events, and asserts the peak. +// +// Skipped under -race (race detector inflates allocations >2x) and +// under -short. Wall-time: 5-15 min depending on disk speed. +func TestStress_2GBHeapCeiling(t *testing.T) { + if testing.Short() { + t.Skip("skipping 2GB stress under -short") + } + dir := t.TempDir() + path := filepath.Join(dir, "stress.pt.trace.json") + f, err := os.Create(path) //nolint:gosec // temp file under t.TempDir + require.NoError(t, err) + + // Target ~2 GB: ~140 B/event * 14M events. + const target = 14 * 1024 * 1024 + steps := make([]kineto.StepSpec, 0, target/100) + for i := uint64(0); i < uint64(target/100); i++ { + evs := make([]kineto.Event, 100) + for j := range evs { + evs[j] = kineto.Event{ + Name: "aten::op", Cat: kineto.CatCPUOp, Ph: kineto.PhaseComplete, + Ts: int64(i*1000 + uint64(j)), Dur: 1, Pid: 1, Tid: 1, + } + } + steps = append(steps, kineto.StepSpec{StepID: i, StartTs: int64(i * 1000), EndTs: int64(i*1000 + 99), Events: evs}) + } + require.NoError(t, kineto.Synthesize(kineto.Spec{SchemaVersion: 1.0, Pid: 1, Tid: 1, Steps: steps}, f)) + require.NoError(t, f.Close()) + + in, err := os.Open(path) //nolint:gosec // temp file under t.TempDir + require.NoError(t, err) + defer func() { _ = in.Close() }() + + var peak uint64 + count := 0 + require.NoError(t, kineto.Parse(in, func(kineto.Event) error { + count++ + if count%1000 == 0 { + runtime.GC() + var m runtime.MemStats + runtime.ReadMemStats(&m) + if m.HeapAlloc > peak { + peak = m.HeapAlloc + } + } + return nil + })) + t.Logf("peak HeapAlloc over %d events: %.1f MB", count, float64(peak)/1024/1024) + require.Less(t, peak, uint64(80*1024*1024), "peak HeapAlloc exceeds 80 MB ceiling") +} diff --git a/components/receivers/kineto/testdata/toy_2step.pt.trace.json b/components/receivers/kineto/testdata/toy_2step.pt.trace.json new file mode 100644 index 00000000..979833ec --- /dev/null +++ b/components/receivers/kineto/testdata/toy_2step.pt.trace.json @@ -0,0 +1 @@ +{"baseTimeNanoseconds":0,"displayTimeUnit":"ns","schemaVersion":1,"traceEvents":[{"name":"ProfilerStep#0","cat":"user_annotation","ph":"B","ts":1000,"dur":0,"pid":1234,"tid":1234,"args":{}},{"name":"aten::addmm","cat":"cpu_op","ph":"X","ts":1100,"dur":50,"pid":1234,"tid":1234,"args":{"External id":"1"}},{"name":"ncclAllReduce","cat":"kernel","ph":"X","ts":1500,"dur":200,"pid":1234,"tid":1234,"args":{"External id":"2"}},{"name":"ProfilerStep#0","cat":"user_annotation","ph":"E","ts":2000,"dur":0,"pid":1234,"tid":1234,"args":{}},{"name":"ProfilerStep#1","cat":"user_annotation","ph":"B","ts":2100,"dur":0,"pid":1234,"tid":1234,"args":{}},{"name":"aten::addmm","cat":"cpu_op","ph":"X","ts":2200,"dur":55,"pid":1234,"tid":1234,"args":{"External id":"3"}},{"name":"ncclAllReduce","cat":"kernel","ph":"X","ts":2600,"dur":210,"pid":1234,"tid":1234,"args":{"External id":"4"}},{"name":"ProfilerStep#1","cat":"user_annotation","ph":"E","ts":3100,"dur":0,"pid":1234,"tid":1234,"args":{}}]} diff --git a/components/receivers/kineto/testhelpers_test.go b/components/receivers/kineto/testhelpers_test.go new file mode 100644 index 00000000..f2b9c860 --- /dev/null +++ b/components/receivers/kineto/testhelpers_test.go @@ -0,0 +1,100 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "context" + "io" + "log/slog" + "sync" + "testing" + + "github.com/tracecoreai/tracecore/internal/consumer" + "github.com/tracecoreai/tracecore/internal/pipeline" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +type tracesSink struct { + mu sync.Mutex + all []ptrace.Traces +} + +func newTracesSink() *tracesSink { return &tracesSink{} } + +func (s *tracesSink) ConsumeTraces(_ context.Context, td ptrace.Traces) error { + s.mu.Lock() + defer s.mu.Unlock() + s.all = append(s.all, td) + return nil +} + +func (s *tracesSink) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +func (s *tracesSink) traces() []ptrace.Traces { + s.mu.Lock() + defer s.mu.Unlock() + out := make([]ptrace.Traces, len(s.all)) + copy(out, s.all) + return out +} + +func countSpansByCat(s *tracesSink, cat string) int { + n := 0 + for _, td := range s.traces() { + for i := 0; i < td.ResourceSpans().Len(); i++ { + rs := td.ResourceSpans().At(i) + for j := 0; j < rs.ScopeSpans().Len(); j++ { + ss := rs.ScopeSpans().At(j) + for k := 0; k < ss.Spans().Len(); k++ { + if v, ok := ss.Spans().At(k).Attributes().Get("kineto.category"); ok && v.Str() == cat { + n++ + } + } + } + } + } + return n +} + +func firstSpanByCat(s *tracesSink, cat string) (ptrace.Span, bool) { + for _, td := range s.traces() { + for i := 0; i < td.ResourceSpans().Len(); i++ { + rs := td.ResourceSpans().At(i) + for j := 0; j < rs.ScopeSpans().Len(); j++ { + ss := rs.ScopeSpans().At(j) + for k := 0; k < ss.Spans().Len(); k++ { + sp := ss.Spans().At(k) + if v, ok := sp.Attributes().Get("kineto.category"); ok && v.Str() == cat { + return sp, true + } + } + } + } + } + return ptrace.NewSpan(), false +} + +func newDefaultConfig(t *testing.T) *Config { + t.Helper() + cfg, ok := defaultConfig().(*Config) + if !ok { + t.Fatalf("defaultConfig did not return *Config") + } + return cfg +} + +func testCreateSettings(t *testing.T) pipeline.CreateSettings { + t.Helper() + id, err := pipeline.NewID(pipeline.MustNewType("kineto"), "primary") + if err != nil { + t.Fatalf("NewID: %v", err) + } + return pipeline.CreateSettings{ + ID: id, + Telemetry: pipeline.TelemetrySettings{ + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + }, + } +} diff --git a/docs/rfcs/0012-kineto-receiver-scope.md b/docs/rfcs/0012-kineto-receiver-scope.md new file mode 100644 index 00000000..4e10a53f --- /dev/null +++ b/docs/rfcs/0012-kineto-receiver-scope.md @@ -0,0 +1,353 @@ +# RFC 0012: Kineto profiler receiver scope + +- **Status:** accepted +- **Author(s):** tracecore maintainer team +- **Created:** 2026-05-19 +- **Last updated:** 2026-05-19 + +## Summary + +The Kineto receiver ingests `*.pt.trace.json` Chrome-trace files produced by `torch.profiler` and emits one OTel span per Kineto event in the eight categories tracecore consumes (`cpu_op`, `kernel`, `gpu_memcpy`, `gpu_memset`, `cuda_runtime`, `cuda_driver`, `user_annotation`, `python_function`). Every span carries `gen_ai.training.rank`, `gen_ai.training.world_size`, and a `gen_ai.training.step_id` derived from `ProfilerStep#N` interval bracketing. The parser at `pkg/kineto/` streams a 2 GB trace through an `encoding/json.Decoder` in under 80 MB of process heap; the receiver at `components/receivers/kineto/` watches a directory via `fsnotify`, samples deterministically at 1% per `(rank, step)`, optionally aggregates consecutive identical `cpu_op` events, and is wrapped end-to-end by `internal/safe.Call` so vendor-content panics degrade rather than crash. This RFC satisfies the M14 functional rubric items for step-id attribution, deterministic sampling, fr-trace-inspired aggregation, and structured error taxonomy; the non-functional rubric items (CPU, egress, soak, 2 GB stress) flip to `☑` when PR D lands. + +## Motivation + +M14 in [`MILESTONES.md`](../../MILESTONES.md) commits tracecore to consuming PyTorch profiler dumps as a first-class signal, because Kineto traces are the only available source for per-kernel GPU activity, CPU-side `aten::*` op timing, and `cudaLaunchKernel` correlation. No upstream OpenTelemetry component parses Kineto's Chrome-trace dialect. The closest analogue, the OTel `pprofreceiver`, consumes pprof-format heap and CPU profiles, not the device-trace structure Kineto emits. Without a Kineto receiver, tracecore cannot answer the GPU-side half of the straggler-detection patterns in M18 (slow-kernel, dataloader-stall, NCCL-wait), and the `gen_ai.training.*` namespace tracecore is shepherding upstream (per [`docs/proposals/gen-ai-training-semconv.md`](../proposals/gen-ai-training-semconv.md)) has no GPU-trace surface to test against. + +The cost of not doing it: M18 ships without GPU-attributed root cause, M14 stays at `⧗`, and the upstream `gen_ai.training.*` proposal lacks the cross-signal join evidence (Python stack from M13 plus GPU kernel from M14 plus container stdout from M15 plus NCCL FlightRecorder from M16) that grounds its `step_id` carrier-attribute claim. + +## Proposal + +### File layout + +``` +pkg/kineto/ + decoder.go # Parse(r io.Reader, visit func(Event) error) error + event.go # Event struct (Chrome-trace fields tracecore consumes) + synthesize.go # deterministic synthetic trace generator + errors.go # ErrTraceMalformed, ErrTruncated, ErrLimitExceeded, ErrSchemaUnknown + fuzz_test.go # FuzzParseKinetoTrace + bench_test.go + decoder_test.go + testdata/ + toy_2step.pt.trace.json + toy_2step.golden.json + SHA256SUMS + +components/receivers/kineto/ + factory.go # NewFactory(); compile-time assert var _ pipeline.ReceiverFactory = (*kinetoFactory)(nil) + receiver.go # Start/Shutdown, fsnotify watch loop, safe.Call wrap + config.go # WatchDir, SampleRate, Aggregate, DeleteAfterIngest, StepFilenameRegex + ingest.go # per-file pipeline: parse → step-map → sample → aggregate → emit + step_map.go # ProfilerStep#N interval-map builder + lookup + rank.go # per-traced-process env-var read via /proc//environ + emit.go # Event → ptrace.Span construction (pdata) + selftel.go # selftelemetry.Receiver wiring (canonical Kind + local Kind constants) + errors.go # KindStepIDMissing, KindLimitExceeded receiver-local Kinds + README.md # 7-section format per docs/STYLE-docs.md; stability badge `alpha` + RUNBOOK.md + example_config.yaml + receiver_test.go + ingest_test.go + step_map_test.go + rank_test.go + config_test.go + factory_test.go + bench_test.go + +bench/overhead/ + kineto_bench_test.go +``` + +`pkg/kineto/` is import-safe for any consumer; it depends only on stdlib (`encoding/json`, `errors`, `fmt`, `hash/fnv`, `io`, `regexp`). It does not import `internal/`, `components/`, or `pdata`, keeping the parser fuzz-safe and the import surface tight. The receiver under `components/receivers/kineto/` imports `pkg/kineto/`, `internal/consumer`, `internal/safe`, `internal/selftelemetry`, `go.opentelemetry.io/collector/pdata/ptrace`, and `github.com/fsnotify/fsnotify`. + +### Component contract + +The receiver implements `pipeline.Component` per [`docs/rfcs/0003-pipeline-runtime-and-component-contract.md`](0003-pipeline-runtime-and-component-contract.md). The factory implements `pipeline.ReceiverFactory` and returns `pipeline.ErrSignalNotSupported` from `CreateMetrics` and `CreateLogs`. M14 emits traces only. `NewFactory()` is the codegen-callable constructor; `Factory` is the package-var alias per the RFC-0003 implementation note. `TelemetrySettings` provides the receiver's `MeterProvider` and `LoggerProvider`. + +`components.yaml` adds: + +```yaml +receivers: + - import: github.com/tracecoreai/tracecore/components/receivers/kineto + type: kineto +``` + +The receiver opts out of the default pipeline; operators activate it by listing `kineto:` under `receivers:` in their collector config, matching the `nccl_inspector` convention from M12. + +### Data flow (single-pass step tracker) + +``` +fsnotify Write event on WatchDir, file then closed (= IN_CLOSE_WRITE on Linux) + + glob matches *.pt.trace.json OR *.pt.trace.json.gz + → safe.Call(ctx, ingestTimeout, "kineto.ingest", func(ctx) error { … }) + → os.OpenFile(path, os.O_RDONLY, 0) + → if .gz: wrap reader in gzip.Reader + → pkg/kineto.Parse(reader, visit) + streams Decoder.Token() through outer object; + tokenizes schemaVersion / deviceProperties / displayTimeUnit / + baseTimeNanoseconds without allocation (they precede traceEvents); + on "traceEvents" array, Decoder.More() loop calling + Decoder.Decode(&event) so peak heap is one Event, not the whole array; + trailing keys (distributedInfo, traceName) tokenized after array close. + → single-pass with a step tracker: + visit(event): + if event.Cat == "user_annotation" && ProfilerStepRegex.Match(event.Name): + if event.Ph == "B": stepStack.Push({stepID, startTs: event.Ts}) + if event.Ph == "E": stepStack.PopMatchingName() + if event.Ph == "X": stepStack.Push then schedule pop at event.Ts+dur + if event.Cat ∉ consumedCategories: + bumpUnknownCategoryCounter(event.Cat); continue + stepID := stepStack.Top() // or 0 if no enclosing window + if stepID == 0: emitStepIDMissingOnce() + if !sampleKeep(rank, stepID, event.Ts, sampleRate): continue + if aggregate && event.Cat=="cpu_op" && event.Name==prev.Name && prev.StepID==stepID: + prev.RepeatCount++ + prev.EndTime = max(prev.EndTime, event.Ts + event.Dur) + continue + flushPrev() → emit prev as ptrace.Span + prev = event + → on EOF: flushPrev() + → exporter chain returns success → if DeleteAfterIngest: os.Remove(path) + → ObserveLatency(elapsed); IncEmissions(spanCount); MarkActivity() +``` + +**Why single-pass is correct.** Kineto emits events in monotonic `ts` order within a single dump. The writer at `libkineto/src/output_json.cpp finalizeTrace` (verified May 2026) sorts the entire event vector by timestamp before serializing the `traceEvents` array. Because the `B`-phase event of a `ProfilerStep#N` `record_function` necessarily precedes every event inside that step's window and the `E`-phase event necessarily follows them all, a stack of open ProfilerStep intervals suffices for step-id assignment: push on `B` (or `X` start), pop on `E` (or schedule pop at `X.ts + X.dur`). No rewind, no temp file, no second decompression for `.gz`. Peak heap is bounded by `len(stepStack) * 32 bytes`, typically 1 active step. Total work is O(N) over the event stream. + +**Streaming budget invariant.** Peak `runtime.ReadMemStats.HeapAlloc` MUST stay below 80 MB on a 2 GB synthetic trace. PR D's stress test asserts this with `runtime.GC()` plus `ReadMemStats` deltas sampled every 1000 events. + +**`.gz` handling.** When the file matches `*.pt.trace.json.gz` (operator opted into `tensorboard_trace_handler(use_gzip=True)`), the receiver wraps the `os.File` in `gzip.Reader` before passing to `Parse`. The single-pass design means gzip's streaming-only API is not a problem; the parser never asks the reader to rewind. + +### Chrome-trace to OTel attribute mapping + +Each consumed event becomes one `ptrace.Span`. + +| Kineto field | OTel span field / attribute | +|---|---| +| `ts` (µs since `baseTimeNanoseconds`) | `StartTime` (converted to ns absolute via base) | +| `dur` (µs) | `EndTime = StartTime + dur*1e3` | +| `name` | span `Name` | +| `cat` | attribute `kineto.category` | +| `pid` | attribute `pid` plus resource attr `process.pid` (set per ingest) | +| `tid` | attribute `tid` | +| `args["External id"]` | attribute `kineto.external_id` (verbatim; cross-reference handle) | +| `args.Stream` | attribute `stream.id` | +| `args.device` (or `args.Device`) | attribute `device.id` | +| derived from `ProfilerStep#N` interval map | attribute `gen_ai.training.step_id` | +| derived from `/proc//environ` | resource attrs `gen_ai.training.rank`, `gen_ai.training.world_size`, `gen_ai.training.local_rank`, `gen_ai.training.job.id` | +| (aggregation only) | attribute `repeat.count` when `aggregate: true` collapsed N events | + +Resource attrs are set once per ingest at the `ptrace.ResourceSpans` scope, not repeated per span. This is the OTel-canonical placement and matches the M9 and M10 emit patterns. + +**Fallthrough categories.** Kineto categories tracecore does not consume (`gpu_user_annotation`, `external_correlation`, `cpu_instant_event`, `overhead`, `cuda_sync`, `cuda_event`, `collective_comm`, MTIA/XPU/HPU variants) each increment `tracecore.receiver.kineto.unknown_category{kineto.category="…"}` and the event is dropped. Counter cardinality is bounded by Kineto's own `_activityTypeNames` enum (≤30 values). + +The `gen_ai.training.*` namespace is the tracecore strategic bet per [NORTHSTARS.md §O4](../../NORTHSTARS.md) and the upstream proposal at [`docs/proposals/gen-ai-training-semconv.md`](../proposals/gen-ai-training-semconv.md). If the upstream PR is rejected, a collector-side `attributesprocessor` rename is the mitigation path. No M14 schema change is required to switch namespaces. + +### Step-ID detection + +PyTorch's `torch.profiler` brackets each training step with `record_function("ProfilerStep#{step_num}")`. The constant `PROFILER_STEP_NAME = "ProfilerStep"` is defined at `pytorch/pytorch torch/profiler/profiler.py:44`, and the bracketing call sites are at `torch/profiler/profiler.py:1165-1169` and `torch/profiler/profiler.py:1230-1233` (verified May 2026). These appear in the Chrome trace as `cat="user_annotation"` slices with `name` matching `^ProfilerStep#(\d+)$` and standard Chrome-trace `B`/`E` phase markers, or the `X` complete-event form depending on torch version. + +The step-map builder: + +1. Stream-iterates `cat=user_annotation` events matching the ProfilerStep regex. +2. For `ph=X` (complete events): record `{step_id, ts_start: ts, ts_end: ts+dur}`. +3. For `ph=B`/`ph=E` (begin/end pairs): match on `(pid, tid, name)`; on `E`, record interval `{step_id, ts_start: B.ts, ts_end: E.ts}`. +4. Sort intervals by `ts_start`. Kineto emits in `ts` order, so this is typically a verification pass. +5. Reject overlapping intervals. ProfilerStep windows do not overlap under torch's serial-step semantics. Overlap returns `ErrTraceMalformed` (mapped to `KindParse`) and aborts the file. + +Lookup is binary search on `ts_start`: if `interval.ts_start ≤ event.ts < interval.ts_end`, return `interval.step_id`; otherwise "no enclosing window." + +**Correction to the M14 rubric.** The original rubric at MILESTONES.md line 459 named `args.Iteration` as the step-id source. `args.Iteration` is not a documented Kineto field and does not appear in the `libkineto/src/output_json.cpp` event writer. The correct source is the `ProfilerStep#N` `user_annotation` event, as written by `torch.profiler` at `profiler.py:1165-1169` (verified May 2026). The companion MILESTONES.md edit lands in the same PR as this RFC. + +**Filename fallback.** The default `tensorboard_trace_handler` filename is `{worker_name}.{time.time_ns()}.pt.trace.json` (per `torch/profiler/profiler.py:651`, verified May 2026), with no step number. The filename is not the primary step source. `config.StepFilenameRegex` (empty by default) is a fallback for custom handlers that embed a single step number in the filename. Activation rule: the regex is consulted only when the parsed file contained zero `ProfilerStep#N` events, never as an override of detected ProfilerStep windows. When activated, its first capture group becomes the `step_id` for every event in that file. + +**No-step-id behavior.** If a file contains no `ProfilerStep#N` events (for example a profile captured outside `torch.profiler.schedule()`), the receiver emits spans without `gen_ai.training.step_id`, bumps `IncError(KindStepIDMissing)` exactly once per file, and logs a single structured warning. Downstream consumers (M18 stragglers) treat this as "step-unaware" and exclude it from step-based joins. This is not a hard failure. + +### Sampling + +Per-`(rank, step)` deterministic sampling: + +```go +func sampleKeep(rank uint32, stepID uint64, eventTs int64, sampleRate float64) bool { + var key uint64 + if stepID != 0 { + key = uint64(rank)<<32 | stepID + } else { + // fallback for events outside any ProfilerStep window (warmup, shutdown) + // bucket eventTs to 1ms granularity so adjacent events stay together + key = uint64(rank)<<32 | uint64(eventTs/1000) + } + h := fnv.New64a() + binary.Write(h, binary.LittleEndian, key) + return h.Sum64() % 10000 < uint64(sampleRate * 10000) +} +``` + +Default `SampleRate: 0.01` (1%). The 10000 modulus allows configuration down to 0.01% granularity for very high-volume traces; the original `mod 100` wording in the M14 rubric was indicative, not literal. + +**Rubric assertion** (`sampling_determinism_test.go`): 1000 simulated steps across 8 simulated ranks. Assert that re-evaluating the same `(rank, step)` key yields the same decision and that the kept-step set across 1000 steps for rank=0 has count in `[9, 11]` (1% of 1000 plus fnv collision slack). The original rubric phrasing "identical sampling sets across 8 simulated ranks" is impossible by construction, because the *rank* is part of the hash key. RFC-0012 amends the rubric to: "deterministic per (rank, step): repeated evaluation of the same key yields the same decision; per-rank step-keep count within 1% plus fnv slack on a 1000-step run." + +### Aggregation toggle + +When `Aggregate: true`, consecutive events in the parsed stream where all of: + +- `event.Cat == "cpu_op"`, +- `event.Name == prev.Name`, +- `step_map.Lookup(event.Ts) == step_map.Lookup(prev.Ts)` (same enclosing ProfilerStep), + +collapse into the preceding span. `repeat.count` increments, `dur` accumulates, `EndTime` advances. Step boundary or category change or name change flushes the accumulator. + +The M14 rubric calls this "fr_trace-inspired." The analogy is purely semantic. Upstream `tools/flight_recorder/fr_trace.py` (PyTorch 2.5+) aggregates NCCL collectives across ranks, not repeated `cpu_op` events within a rank's profile. This RFC explicitly disclaims that the aggregation toggle is a port of upstream `fr_trace.py` behavior; it is a tracecore-defined emission-volume reduction motivated by [NORTHSTARS.md §O2](../../NORTHSTARS.md) per-receiver budgets. + +### Error taxonomy and self-telemetry + +In `pkg/kineto/errors.go` (matchable via `errors.Is`): + +- `ErrTraceMalformed`: JSON structure invalid (missing `traceEvents`, malformed event, overlapping ProfilerStep windows). Maps to `selftelemetry.KindParse`. +- `ErrTruncated`: EOF mid-event or mid-array. Maps to `KindParse`. +- `ErrSchemaUnknown`: `schemaVersion` outside the tested set (1.0; 2.0 when verified). The receiver continues with best-effort parsing and bumps a `kineto.schema_drift` counter for operator visibility, per [NORTHSTARS.md §O6](../../NORTHSTARS.md) ecosystem-change handling. +- `ErrLimitExceeded`: `MaxBytes` / `MaxEvents` / `MaxStringBytes` cap hit. Maps to receiver-local `KindLimitExceeded`. + +Receiver-local `Kind` constants in `selftel.go` (extending the canonical set per the `dcgm` and `kernelevents` precedent): + +```go +const ( + KindStepIDMissing selftelemetry.Kind = "step_id_missing" + KindLimitExceeded selftelemetry.Kind = "limit_exceeded" +) +``` + +Unknown Kineto categories do not bump `IncError`. They bump the dedicated `tracecore.kineto.unknown_category{kineto.category="…"}` counter (per the M14 rubric), a separate per-receiver custom metric registered via the `MeterProvider` injected through `TelemetrySettings`. Keeping fallthrough on its own counter preserves the `kineto.category` label cardinality operators need to spot a new category requiring consumption, without polluting the canonical receiver-errors signal. + +Every error path calls `selftel.IncError(kind)` exactly once per file (not per event, to keep cardinality bounded under bad-input floods). The signal surfaces on the canonical `tracecore_receiver_errors_total{component="kineto",kind=…}` counter per the RFC-0006 self-telemetry surface; there is no kineto-specific `parse_failures` counter. + +### Resource bounds + +Pinned in `pkg/kineto/decoder.go` package-level constants: + +| Bound | Default | Derivation | +|---|---|---| +| `MaxBytes` | `1<<32` (4 GiB) | 2× the upstream-observed 2.2 GB per-epoch ceiling per [pytorch#130622](https://github.com/pytorch/pytorch/issues/130622) | +| `MaxEvents` | `1<<24` (~16.7M) | 2.2 GB / ~140 bytes/event (median Kineto event size); see arithmetic note below | +| `MaxStringBytes` | `1<<20` (1 MiB) | Per-field cap; matches the M11 precedent; defends against malformed `args.Stream` strings | +| `MaxDepth` | `64` | Chrome-trace structure is flat; no nesting beyond `args.{}` | + +Arithmetic for `MaxEvents`: `2.2e9 bytes / 140 bytes/event ≈ 15.7e6 events`, rounded up to the nearest power of two (`1<<24 = 16.78e6`). Exceed any bound triggers `ErrLimitExceeded`, aborts the file, bumps `IncError(KindLimitExceeded)`, and continues to the next file. + +### `safe.Call` boundary + +The per-file ingest goroutine is the safe boundary: + +```go +err := safe.Call(ctx, r.cfg.IngestTimeout, "kineto.ingest", func(ctx context.Context) error { + return r.ingestFile(ctx, path) +}) +``` + +A panic inside `ingestFile` (including inside `pkg/kineto.Parse`) converts to `IncError(KindPanic) + log + continue`. The receiver never crashes the daemon. Timeout default is 30s per file (configurable). + +### Degraded mode + +`SetDegraded(true)` flips under either of: + +- `WatchDir` missing or unreadable for ≥3 consecutive watch attempts (single startup-warning log plus degraded). +- Parse failure rate above 25% over the last 100 files (operator-tunable threshold via `degraded_failure_ratio:`). + +Recovery: `SetDegraded(false)` on the next successful ingest. Per the `selftelemetry.Receiver` contract, `SetDegraded(true)` MUST be called explicitly during `Start` if the receiver boots already-degraded (for example WatchDir does not exist), so `degraded_seconds_total` begins ticking immediately. + +### Rank discovery + +Read `RANK`, `WORLD_SIZE`, `LOCAL_RANK`, `TORCHELASTIC_RUN_ID` from `/proc//environ`, where `pid` is derived from: + +1. The trace's `traceEvents[].pid` field, which is consistent across events in a single dump per Kineto's `output_json.cpp` writer. +2. If `traceEvents` is empty before any event is seen, the filename prefix `{worker_name}.{ts_ns}.pt.trace.json`. `worker_name` defaults to `{hostname}_{pid}` per `pytorch/pytorch torch/profiler/profiler.py:651` (verified May 2026). Regex `^[^.]+_(\d+)\.` captures the pid. +3. If both fail, fall back to the receiver process's own env vars (`os.Getenv("RANK")` and so on). This is useful when the receiver runs in the same pod as torch (sidecar) but not when the receiver runs cluster-wide. + +**Why per-traced-process, not PodSpec downward API.** [M15 research §3](../research/m15-container-stdout.md) documents that Kubeflow PyTorchJob v1 sets PodSpec-level `RANK` offset by +1 (the Master replica is rank 0, workers are offset). Reading PodSpec env via the downward API, or `os.Getenv` from the receiver process, gives the *wrong* rank for the traced worker under that orchestrator. The `/proc//environ` path reads the env vars `torchrun` (or PyTorchJob's launcher) actually exported to the traced Python process, which is the source of truth. + +Failure handling: if `/proc//environ` is unreadable (process exited, cross-namespace PID, EPERM under restricted Linux), the receiver emits spans without rank resource attrs and bumps `IncError(KindEnumerate)` once per file. Downstream consumers (M18) skip step-based joins for those spans. + +Job-id discovery follows the [`gen-ai-training-semconv.md`](../proposals/gen-ai-training-semconv.md) §2 source-priority chain: Kueue, `SLURM_JOB_ID`, Ray `runtime_context().job_id`, `TORCHELASTIC_RUN_ID`, SageMaker `TRAINING_JOB_NAME`. M14 reads only the env-var-sourced entries (`SLURM_JOB_ID`, `TORCHELASTIC_RUN_ID`, `TRAINING_JOB_NAME`). Kueue and Ray sources are M16 / M19 territory. + +### Watch trigger + +`github.com/fsnotify/fsnotify v1.5.4` (already indirect in `go.sum`). Configuration: + +- `WatchDir: string` (required). Single directory to watch. +- `WatchSubdirs: bool` (default `false`). When `true`, the receiver recursively walks `WatchDir` at Start via `filepath.WalkDir`, registers fsnotify watches on every subdir, and adds watches dynamically on new subdir `Create` events. Recursive watching is not a built-in fsnotify feature ([fsnotify/fsnotify#18](https://github.com/fsnotify/fsnotify/issues/18) tracks the open request); the receiver implements it explicitly. Bounded depth at `MaxWatchDepth: int` (default `8`) prevents watch-table exhaustion. + +Glob accepts `*.pt.trace.json` and `*.pt.trace.json.gz`. + +Trigger fires on `fsnotify.Write` events that arrive after the writer closes the file, which is the `IN_CLOSE_WRITE` semantics on Linux. PyTorch's `tensorboard_trace_handler` writes the trace file directly to the final path via `prof.export_chrome_trace(os.path.join(dir_name, file_name))` (per `pytorch/pytorch torch/profiler/profiler.py:655`, verified May 2026). There is no tmp-plus-rename pattern. `IN_CLOSE_WRITE` is the only safe trigger: + +- `Create` fires on the initial open, but the file is empty or partial then; the parse would return `ErrTruncated`. +- `Rename` / `MOVED_TO` never fires, because torch does not use tmp-plus-rename. +- `Write` events fire repeatedly during streaming serialization; each would cause a spurious truncated read. + +fsnotify v1.5.4 surfaces `IN_CLOSE_WRITE` as `fsnotify.Op&fsnotify.Write != 0` *after* the writer closes; the receiver verifies this by attempting to read the file's full content and confirming the JSON closes properly. If parsing fails with `ErrTruncated`, the file is re-queued for one retry after 500 ms; a second failure bumps `IncError(KindParse)` and abandons the file. + +This trigger choice diverges from M15's poll-based filelog approach. M15 deals with kubelet's rotation of long-lived container log files; M14 deals with one-shot dump files. `fsnotify` is correct for the one-shot case. + +## Alternatives considered + +The accepted option is the design as specified above (single-pass streaming parse, `IN_CLOSE_WRITE` trigger, `/proc//environ` rank discovery, stdlib `encoding/json`). The four rejected alternatives: + +- **Two-pass parse (first pass builds the step map, second pass emits spans).** Rejected because `gzip.Reader` is forward-only and cannot rewind; a two-pass parse over a `.gz` file would require either temp-file materialization (defeats the 80 MB heap budget on a 2 GB trace) or a second decompression (doubles CPU). Single-pass is correct because Kineto emits events in monotonic `ts` order per `libkineto/src/output_json.cpp finalizeTrace` (verified May 2026), so a stack of open `ProfilerStep#N` intervals suffices for step-id assignment without lookahead. + +- **fsnotify `Create` trigger.** Rejected because `Create` fires on the writer's initial `open(O_CREAT)` call, before any data has been written. Parsing at `Create` time returns `ErrTruncated` essentially always. `IN_CLOSE_WRITE` is the only event that guarantees the file is complete. + +- **Downward-API PodSpec `RANK` env var (instead of `/proc//environ`).** Rejected per [M15 research §3](../research/m15-container-stdout.md): Kubeflow PyTorchJob v1 sets PodSpec-level `RANK` offset by +1 (Master is rank 0, workers offset), so reading PodSpec env gives the wrong rank for the traced worker. `/proc//environ` reads what `torchrun` actually exported. + +- **`valyala/fastjson`** (or any third-party JSON parser). Rejected as an unnecessary dependency. The stdlib `encoding/json.Decoder` streaming API meets the 80 MB heap budget on 2 GB traces; PR D's stress test verifies the bound. Adding a third-party JSON dependency would broaden the supply-chain surface tracecore tracks under NORTHSTARS O3 without measurable benefit. + +## Risks + +Listed in load-bearing order per [`docs/STYLE-docs.md` §3](../STYLE-docs.md). + +1. **Kineto-side `schemaVersion` drift.** `schemaVersion` has been `1.0` across every torch version in the tested range, but the field is owned by the libkineto writer and could change without coordination. Mitigation: `ErrSchemaUnknown` is a soft error; the receiver continues with best-effort parsing and bumps the `kineto.schema_drift` counter so operators see drift before silent data loss. The 30-day ecosystem-change SLA in [NORTHSTARS.md §O6](../../NORTHSTARS.md) gates the response to a confirmed bump. + +2. **Upstream rejection of the `gen_ai.training.*` namespace.** The proposal at [`docs/proposals/gen-ai-training-semconv.md`](../proposals/gen-ai-training-semconv.md) is upstream-pending. If OpenTelemetry rejects the namespace, M14's emitted attributes diverge from upstream semconv. Mitigation: a collector-side `attributesprocessor` rename closes the gap with no receiver code change required, because the namespace is a string prefix the processor can rewrite. + +3. **`/proc//environ` EPERM under restricted-tier Linux.** When the receiver pod is not in the same PID namespace as the traced process, or runs under a security profile that blocks `/proc//environ` reads, rank discovery falls through to filename-prefix and then to the receiver's own env vars. The fallback can produce incorrect ranks if the receiver is cluster-wide. Mitigation: the README documents the `hostPID: true` versus shared-PID-namespace trade-off; `IncError(KindEnumerate)` makes the failure operator-visible; M18 excludes step-based joins for spans lacking rank. + +4. **fsnotify watch-table exhaustion on deep subdir trees.** Linux's `fs.inotify.max_user_watches` defaults to 65536 on most distributions but can be lower on restricted kernels. A `WatchSubdirs: true` configuration against a deep, wide subdir tree could exhaust the table. Mitigation: `MaxWatchDepth` (default `8`) bounds the recursive walk; the receiver logs a single warning when the bound is hit; operators tune the kernel limit or the `MaxWatchDepth` value per their layout. + +## Open questions + +1. **`schemaVersion` policy on unknown versions.** Kineto's `schemaVersion` has been `1.0` for the tested torch range. The receiver tolerates unknown versions with a `schema_drift` counter; should it also refuse versions outside `{1.0, 2.0-when-verified}` once 2.0 lands, per the [NORTHSTARS.md §O6](../../NORTHSTARS.md) 30-day ecosystem-change SLA? Recommendation: tolerate plus counter, refuse only on parse failure. + +2. **`hostPID: true` versus shared-PID-namespace recommendation.** Both work for `/proc//environ` reads. `hostPID: true` is simpler operationally but broadens the pod attack surface. The M5b chart should expose a `kineto.pidStrategy: host|shared|none` value with sensible defaults. Recommendation: default `none` (most restrictive; trades observability for safety), with `hostPID: true` documented as the high-fidelity opt-in. + +3. **`use_gzip=True` stream-decompress versus buffer-decompress.** Some users gzip traces via `tensorboard_trace_handler(use_gzip=True)`. The receiver decompresses transparently. Open: should the parser stream-decompress (lower peak RSS, slightly higher CPU) or buffer-decompress (higher RSS, simpler code)? Recommendation: stream-decompress via `gzip.Reader` wrapping the file reader; peak RSS is already gated by the 80 MB budget on uncompressed input. + +## Migration / rollout + +This is a new receiver. There is no prior surface to migrate from. + +PR sequencing (each PR independently passes `make ci`): + +| PR | Branch | Lands | Status flip | +|---|---|---|---| +| **A** | `rfc-0012-kineto-receiver-scope` | This RFC at `Status: accepted`; MILESTONES.md M14 rubric edits per the spec | none | +| **B** | `pkg-kineto-parser` | `pkg/kineto/` streaming decoder, `Event` struct, `synthesize.go`, checked-in 2-step toy-model fixture, golden file, `FuzzParseKinetoTrace`, benchmark scaffold. No receiver. | none | +| **C** | `m14-kineto-receiver` | `components/receivers/kineto/` (factory, receiver, config, README, RUNBOOK), factory wired in `cmd/tracecore/components.go` plus `components.yaml`, watch loop, sampling, aggregation, `delete_after_ingest`, `safe.Call` wrap, `selftelemetry.Receiver` parity, structured error taxonomy. | `⧗ → ☑ alpha` | +| **D** | `m14-kineto-bench` | `bench/overhead/kineto_bench_test.go` (CPU + egress ceilings), `//go:build soak` 100-file RSS soak, 2 GB synthetic stress with `runtime.ReadMemStats` ceiling. Closes the unverified non-functional rubric. | stamps `☑` on overhead rubrics | + +PR C flips the M14 top-line to `☑ alpha`, matching the M11 precedent (alpha = receiver landed + functional rubrics green). PR D's `benchstat` p<0.05 gates then flip the corresponding non-functional rubric checkboxes from `☐` to `☑`; until PR D lands, the affected non-functional rubrics carry `⧗` in MILESTONES.md. + +## References + +- [`MILESTONES.md`](../../MILESTONES.md) §M14. Functional and non-functional rubrics this RFC is scoped to satisfy. +- [`docs/rfcs/0003-pipeline-runtime-and-component-contract.md`](0003-pipeline-runtime-and-component-contract.md). `Component`, `ReceiverFactory`, and `TelemetrySettings` contract; the implementation-notes deviation list (`internal/consumer`, `internal/safe`, `NewFactory()` convention). +- [`docs/rfcs/0009-pyspy-receiver-scope.md`](0009-pyspy-receiver-scope.md). M13 sibling receiver. Same `gen_ai.training.rank` join key; precedent for shipping an RFC pre-implementation. +- [`docs/proposals/gen-ai-training-semconv.md`](../proposals/gen-ai-training-semconv.md). Upstream namespace bet that grounds M14's attribute choices. +- [`docs/research/m15-container-stdout.md`](../research/m15-container-stdout.md). Env-var sources per orchestrator; Kubeflow PyTorchJob v1 PodSpec-`RANK` offset finding that drives the `/proc//environ` choice. +- [`PRINCIPLES.md`](../../PRINCIPLES.md) §1 (trust under load), §9 (failure modes are part of the API), §12 (reproducible everything), §15 (decide late + RFC for schema). +- [`NORTHSTARS.md`](../../NORTHSTARS.md) §O2 (per-receiver budget), §O4 (`gen_ai.training.*` shepherding), §O6 (ecosystem-change 30-day SLA). +- `internal/selftelemetry/interface.go`. The `Receiver` surface (`IncError`, `IncEmissions`, `ObserveLatency`, `SetDegraded`, `MarkActivity`) and canonical `Kind*` constants. +- PyTorch sources (verified May 2026): + - `pytorch/pytorch torch/profiler/profiler.py:44`: `PROFILER_STEP_NAME = "ProfilerStep"`. + - `pytorch/pytorch torch/profiler/profiler.py:651`: default filename `{worker_name}.{time.time_ns()}.pt.trace.json`. + - `pytorch/pytorch torch/profiler/profiler.py:655`: direct-write `export_chrome_trace` (no tmp+rename). + - `pytorch/pytorch torch/profiler/profiler.py:1165-1169, 1230-1233`: `record_function("ProfilerStep#N")` bracketing call sites. + - `pytorch/kineto libkineto/src/output_json.cpp:341-352, 543, 557, 580, 593`: top-level key order and `traceEvents` array written between `handleTraceStart` and `finalizeTrace` (ts-sorted). +- [pytorch/pytorch#130622](https://github.com/pytorch/pytorch/issues/130622). 2.2 GB per-epoch trace observation that grounds the `MaxBytes` derivation. diff --git a/pkg/kineto/bench_test.go b/pkg/kineto/bench_test.go new file mode 100644 index 00000000..4bf97483 --- /dev/null +++ b/pkg/kineto/bench_test.go @@ -0,0 +1,38 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" +) + +// BenchmarkParse_50MB measures parser throughput on a synthesized +// 50 MB trace. Advisory at M14 PR B; promoted to a CI gate in PR D +// once budget targets land. +func BenchmarkParse_50MB(b *testing.B) { + const eventBytesEstimate = 140 // bytes-per-event median per pytorch#130622 + const targetEvents = 50 * 1024 * 1024 / eventBytesEstimate + steps := make([]StepSpec, 0, targetEvents/10) + for i := uint64(0); i < uint64(targetEvents/10); i++ { + stepEvents := make([]Event, 10) + for j := range stepEvents { + stepEvents[j] = Event{ + Name: "aten::addmm", Cat: CatCPUOp, Ph: PhaseComplete, + Ts: int64(i*1000 + uint64(j)*10), Dur: 5, Pid: 1234, Tid: 1234, + } + } + steps = append(steps, StepSpec{StepID: i, StartTs: int64(i * 1000), EndTs: int64(i*1000 + 999), Events: stepEvents}) + } + spec := Spec{SchemaVersion: 1.0, Pid: 1234, Tid: 1234, Steps: steps} + var buf bytes.Buffer + require.NoError(b, Synthesize(spec, &buf)) + data := buf.Bytes() + b.SetBytes(int64(len(data))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = Parse(bytes.NewReader(data), func(Event) error { return nil }) + } +} diff --git a/pkg/kineto/decoder.go b/pkg/kineto/decoder.go new file mode 100644 index 00000000..69206eef --- /dev/null +++ b/pkg/kineto/decoder.go @@ -0,0 +1,125 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "encoding/json" + "fmt" + "io" +) + +// Resource bounds. See RFC-0012 §Resource bounds. +// +// MaxBytes is 2x the upstream-observed 2.2 GB per-epoch ceiling (per +// pytorch#130622). MaxEvents derives from 2.2 GB / ~140 bytes-per-event +// median Kineto event size. MaxStringBytes caps any single string field +// (e.g. args.Stream) at 1 MiB to defend against malformed inputs. +// MaxDepth bounds nesting: Chrome-trace structure is flat with one +// level of args.{}, so 64 is generous. +const ( + MaxBytes int64 = 1 << 32 // 4 GiB + MaxEvents = 1 << 24 // ~16.7M + MaxStringBytes = 1 << 20 // 1 MiB + MaxDepth = 64 +) + +// Parse streams a Chrome-trace JSON Object Format file from r, invoking +// visit for every event in the "traceEvents" array. visit returning a +// non-nil error aborts the parse and that error is returned verbatim. +// +// Parse hands every event to visit regardless of category, schemaVersion, +// or phase; the caller decides what to filter, count, or drop. +// +// Peak memory is approximately one Event plus decoder buffers, +// regardless of file size. Resource bounds (see package consts) guard +// against malformed inputs. +func Parse(r io.Reader, visit func(Event) error) error { + dec := json.NewDecoder(io.LimitReader(r, MaxBytes)) + + tok, err := dec.Token() + if err != nil { + return fmt.Errorf("%w: read outer token: %w", ErrTruncated, err) + } + if delim, ok := tok.(json.Delim); !ok || delim != '{' { + return fmt.Errorf("%w: outer object expected", ErrTraceMalformed) + } + + for dec.More() { + keyTok, err := dec.Token() + if err != nil { + return fmt.Errorf("%w: read key: %w", ErrTruncated, err) + } + key, _ := keyTok.(string) + if key != "traceEvents" { + if err := skipValue(dec); err != nil { + return err + } + continue + } + if err := parseTraceEvents(dec, visit); err != nil { + return err + } + } + return nil +} + +// parseTraceEvents handles the "traceEvents":[ ... ] array body, +// extracted to keep Parse under the gocyclo limit. +func parseTraceEvents(dec *json.Decoder, visit func(Event) error) error { + openTok, err := dec.Token() + if err != nil { + return fmt.Errorf("%w: traceEvents open bracket: %w", ErrTruncated, err) + } + if delim, ok := openTok.(json.Delim); !ok || delim != '[' { + return fmt.Errorf("%w: traceEvents must be array", ErrTraceMalformed) + } + count := 0 + for dec.More() { + if count >= MaxEvents { + return fmt.Errorf("%w: traceEvents exceeded MaxEvents=%d", ErrLimitExceeded, MaxEvents) + } + var ev Event + if err := dec.Decode(&ev); err != nil { + return fmt.Errorf("%w: event %d: %w", ErrTraceMalformed, count, err) + } + if err := visit(ev); err != nil { + return err + } + count++ + } + if _, err := dec.Token(); err != nil { + return fmt.Errorf("%w: traceEvents close: %w", ErrTruncated, err) + } + return nil +} + +// skipValue consumes one JSON value from dec without buffering its +// content. Handles nested arrays/objects by depth-tracking the Token stream. +func skipValue(dec *json.Decoder) error { + tok, err := dec.Token() + if err != nil { + return fmt.Errorf("%w: skip token: %w", ErrTruncated, err) + } + if _, ok := tok.(json.Delim); !ok { + return nil + } + depth := 1 + for depth > 0 { + if depth > MaxDepth { + return fmt.Errorf("%w: nesting depth %d", ErrLimitExceeded, depth) + } + t, err := dec.Token() + if err != nil { + return fmt.Errorf("%w: skip body: %w", ErrTruncated, err) + } + if d, ok := t.(json.Delim); ok { + switch d { + case '{', '[': + depth++ + case '}', ']': + depth-- + } + } + } + return nil +} diff --git a/pkg/kineto/decoder_test.go b/pkg/kineto/decoder_test.go new file mode 100644 index 00000000..64c76fb2 --- /dev/null +++ b/pkg/kineto/decoder_test.go @@ -0,0 +1,143 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "encoding/json" + "errors" + "os" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParse_EmptyTraceEvents(t *testing.T) { + input := `{"schemaVersion":1.0,"traceEvents":[]}` + var events []Event + err := Parse(strings.NewReader(input), func(e Event) error { + events = append(events, e) + return nil + }) + require.NoError(t, err) + require.Empty(t, events) +} + +func TestParse_SingleCPUOp(t *testing.T) { + input := `{ + "schemaVersion": 1.0, + "traceEvents": [ + {"name":"aten::addmm","cat":"cpu_op","ph":"X","ts":1000,"dur":42,"pid":1234,"tid":1234,"args":{"External id":"42","Stream":0,"Device":0}} + ] + }` + var got []Event + err := Parse(strings.NewReader(input), func(e Event) error { + got = append(got, e) + return nil + }) + require.NoError(t, err) + require.Len(t, got, 1) + require.Equal(t, Event{ + Name: "aten::addmm", + Cat: CatCPUOp, + Ph: PhaseComplete, + Ts: 1000, + Dur: 42, + Pid: 1234, + Tid: 1234, + Args: Args{ExternalID: "42", Stream: 0, Device: 0}, + }, got[0]) +} + +func TestParse_TruncatedAtOpen(t *testing.T) { + err := Parse(strings.NewReader(""), func(Event) error { return nil }) + require.ErrorIs(t, err, ErrTruncated) +} + +func TestParse_NonObjectOuter(t *testing.T) { + err := Parse(strings.NewReader(`[1,2,3]`), func(Event) error { return nil }) + require.ErrorIs(t, err, ErrTraceMalformed) +} + +func TestParse_TraceEventsNotArray(t *testing.T) { + err := Parse(strings.NewReader(`{"traceEvents": {"x":1}}`), func(Event) error { return nil }) + require.ErrorIs(t, err, ErrTraceMalformed) +} + +func TestParse_TruncatedMidEvent(t *testing.T) { + err := Parse(strings.NewReader(`{"traceEvents":[{"name":"x"`), func(Event) error { return nil }) + require.Error(t, err) + require.True(t, + errors.Is(err, ErrTruncated) || errors.Is(err, ErrTraceMalformed), + "want ErrTruncated or ErrTraceMalformed, got %v", err) +} + +func TestParse_VisitErrorPropagates(t *testing.T) { + sentinel := errors.New("downstream consumer rejected") + err := Parse(strings.NewReader(`{"traceEvents":[{"name":"a","cat":"cpu_op","ph":"X","ts":1,"dur":1,"pid":1,"tid":1}]}`), + func(Event) error { return sentinel }) + require.ErrorIs(t, err, sentinel) +} + +func TestParse_MaxEventsCap(t *testing.T) { + if testing.Short() { + t.Skip("skipping MaxEvents stress under -short") + } + var sb strings.Builder + sb.WriteString(`{"traceEvents":[`) + for i := 0; i < MaxEvents+1; i++ { + if i > 0 { + sb.WriteString(",") + } + sb.WriteString(`{"name":"x","cat":"cpu_op","ph":"X","ts":0,"dur":0,"pid":0,"tid":0}`) + } + sb.WriteString(`]}`) + err := Parse(strings.NewReader(sb.String()), func(Event) error { return nil }) + require.ErrorIs(t, err, ErrLimitExceeded) +} + +func TestParse_SkipsNonTraceEventsKeys(t *testing.T) { + // Kineto emits schemaVersion / deviceProperties / displayTimeUnit / + // baseTimeNanoseconds before traceEvents. The parser must skip them + // without allocating their content. + input := `{ + "schemaVersion": 1.0, + "deviceProperties": [{"id": 0, "name": "stub"}], + "displayTimeUnit": "ns", + "baseTimeNanoseconds": 0, + "traceEvents": [ + {"name":"a","cat":"cpu_op","ph":"X","ts":1,"dur":1,"pid":1,"tid":1} + ], + "traceName": "stub" + }` + var got []Event + err := Parse(strings.NewReader(input), func(e Event) error { + got = append(got, e) + return nil + }) + require.NoError(t, err) + require.Len(t, got, 1) + require.Equal(t, "a", got[0].Name) +} + +func TestParse_GoldenFixture(t *testing.T) { + f, err := os.Open("testdata/toy_2step.pt.trace.json") + if errors.Is(err, os.ErrNotExist) { + t.Skip("fixture not generated yet; run `make generate-fixtures`") + } + require.NoError(t, err) + defer func() { _ = f.Close() }() + + var got []Event + require.NoError(t, Parse(f, func(e Event) error { + got = append(got, e) + return nil + })) + + want, err := os.ReadFile("testdata/toy_2step.golden.json") + require.NoError(t, err) + var wantEvents []Event + require.NoError(t, json.Unmarshal(want, &wantEvents)) + + require.Equal(t, wantEvents, got) +} diff --git a/pkg/kineto/errors.go b/pkg/kineto/errors.go new file mode 100644 index 00000000..ca1d397b --- /dev/null +++ b/pkg/kineto/errors.go @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import "errors" + +// Sentinel errors returned by Parse. Match via errors.Is. +// +// ErrCategoryUnknown is intentionally NOT a Parse-level error: Parse +// hands every event to visit() regardless of category. The receiver +// maps unknown categories to the kineto.unknown_category counter. +var ( + ErrTraceMalformed = errors.New("kineto: trace malformed") + ErrTruncated = errors.New("kineto: trace truncated") + ErrLimitExceeded = errors.New("kineto: resource limit exceeded") + ErrSchemaUnknown = errors.New("kineto: schemaVersion outside tested set") +) diff --git a/pkg/kineto/event.go b/pkg/kineto/event.go new file mode 100644 index 00000000..3e1c793d --- /dev/null +++ b/pkg/kineto/event.go @@ -0,0 +1,65 @@ +// SPDX-License-Identifier: Apache-2.0 + +// Package kineto parses PyTorch profiler (Kineto) Chrome-trace JSON dumps +// (*.pt.trace.json) into typed events for downstream OTel emission. +// +// The parser is stdlib-only and import-safe: it does not depend on +// internal/, components/, or pdata. Consumers in the receiver layer +// wrap Event values as ptrace.Spans. +// +// See docs/rfcs/0012-kineto-receiver-scope.md for the design contract. +package kineto + +// Phase values from the Chrome-trace JSON Object Format. +// See https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU +const ( + PhaseBegin = "B" + PhaseEnd = "E" + PhaseComplete = "X" + PhaseMetadata = "M" + PhaseInstant = "i" +) + +// Category values tracecore consumes (8 categories per M14 rubric). +const ( + CatCPUOp = "cpu_op" + CatKernel = "kernel" + CatGPUMemcpy = "gpu_memcpy" + CatGPUMemset = "gpu_memset" + CatCUDARuntime = "cuda_runtime" + CatCUDADriver = "cuda_driver" + CatUserAnnotation = "user_annotation" + CatPythonFunction = "python_function" +) + +// ConsumedCategories is the set this package's caller will emit as spans. +// Membership is checked by the receiver's ingest pipeline, not by Parse: +// Parse hands every event to visit() regardless of category so the caller +// can keep the unknown_category fallthrough counter. +var ConsumedCategories = map[string]struct{}{ + CatCPUOp: {}, CatKernel: {}, CatGPUMemcpy: {}, CatGPUMemset: {}, + CatCUDARuntime: {}, CatCUDADriver: {}, CatUserAnnotation: {}, + CatPythonFunction: {}, +} + +// Event is the tracecore-consumed subset of a Chrome-trace event. +// Unknown args fields are not preserved; tracecore consumes a fixed +// subset and the synthesizer round-trips this same subset. +type Event struct { + Name string `json:"name"` + Cat string `json:"cat"` + Ph string `json:"ph"` + Ts int64 `json:"ts"` // microseconds since baseTimeNanoseconds + Dur int64 `json:"dur"` // microseconds; 0 for non-complete (B/E/i) phases + Pid int32 `json:"pid"` + Tid int32 `json:"tid"` + Args Args `json:"args,omitempty"` +} + +// Args is the subset of args fields tracecore reads. JSON keys with spaces +// (e.g. "External id") use struct tags verbatim. +type Args struct { + ExternalID string `json:"External id,omitempty"` + Stream int64 `json:"Stream,omitempty"` + Device int64 `json:"Device,omitempty"` +} diff --git a/pkg/kineto/fuzz_test.go b/pkg/kineto/fuzz_test.go new file mode 100644 index 00000000..1d1ecf55 --- /dev/null +++ b/pkg/kineto/fuzz_test.go @@ -0,0 +1,32 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "bytes" + "strings" + "testing" +) + +func FuzzParseKinetoTrace(f *testing.F) { + // Seed corpus: minimal valid + adversarial inputs. Parse MUST NOT + // panic or OOM on any input; the resource bounds + typed error + // taxonomy classify everything else. + seeds := [][]byte{ + []byte(`{}`), + []byte(`{"traceEvents":[]}`), + []byte(`{"traceEvents":[{"name":"a","cat":"cpu_op","ph":"X","ts":0,"dur":0,"pid":0,"tid":0}]}`), + []byte(``), + make([]byte, 1024), // all-zero + []byte(`{"traceEvents":[{`), + []byte(`{"traceEvents":[` + strings.Repeat("{},", 1000) + `{}]}`), + []byte(`{"traceEvents":[{"name":"","cat":"","ph":"X","ts":-1,"dur":-1,"pid":-1,"tid":-1}]}`), + } + for _, s := range seeds { + f.Add(s) + } + + f.Fuzz(func(t *testing.T, data []byte) { + _ = Parse(bytes.NewReader(data), func(Event) error { return nil }) + }) +} diff --git a/pkg/kineto/synthesize.go b/pkg/kineto/synthesize.go new file mode 100644 index 00000000..083f8425 --- /dev/null +++ b/pkg/kineto/synthesize.go @@ -0,0 +1,74 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "encoding/json" + "fmt" + "io" + "sort" +) + +// Spec describes a synthesizable Kineto trace. Used by tests and by +// cmd/tracecore failure-inject for deterministic fixtures. +type Spec struct { + SchemaVersion float64 + Pid int32 + Tid int32 + Steps []StepSpec +} + +// StepSpec is one ProfilerStep window with inner events. +type StepSpec struct { + StepID uint64 + StartTs int64 + EndTs int64 + Events []Event +} + +// Synthesize writes spec as a valid Chrome-trace JSON Object Format file +// matching what Kineto's libkineto/src/output_json.cpp emits. Output is +// deterministic: top-level keys emitted in a fixed order; event order +// matches spec's outer/inner ordering after a stable sort by ts. Two +// invocations with identical specs produce byte-identical output. +func Synthesize(spec Spec, w io.Writer) error { + totalEvents := 0 + for _, s := range spec.Steps { + totalEvents += len(s.Events) + 2 // +2 for ProfilerStep B/E markers + } + events := make([]Event, 0, totalEvents) + for _, s := range spec.Steps { + stepName := fmt.Sprintf("ProfilerStep#%d", s.StepID) + events = append(events, Event{ + Name: stepName, Cat: CatUserAnnotation, Ph: PhaseBegin, + Ts: s.StartTs, Pid: spec.Pid, Tid: spec.Tid, + }) + events = append(events, s.Events...) + events = append(events, Event{ + Name: stepName, Cat: CatUserAnnotation, Ph: PhaseEnd, + Ts: s.EndTs, Pid: spec.Pid, Tid: spec.Tid, + }) + } + sort.SliceStable(events, func(i, j int) bool { + return events[i].Ts < events[j].Ts + }) + + type traceFile struct { + BaseTimeNanoseconds int64 `json:"baseTimeNanoseconds"` + DisplayTimeUnit string `json:"displayTimeUnit"` + SchemaVersion float64 `json:"schemaVersion"` + TraceEvents []Event `json:"traceEvents"` + } + tf := traceFile{ + BaseTimeNanoseconds: 0, + DisplayTimeUnit: "ns", + SchemaVersion: spec.SchemaVersion, + TraceEvents: events, + } + enc := json.NewEncoder(w) + enc.SetIndent("", "") + if err := enc.Encode(tf); err != nil { + return fmt.Errorf("kineto: synthesize: encode: %w", err) + } + return nil +} diff --git a/pkg/kineto/synthesize_test.go b/pkg/kineto/synthesize_test.go new file mode 100644 index 00000000..456ae80c --- /dev/null +++ b/pkg/kineto/synthesize_test.go @@ -0,0 +1,54 @@ +// SPDX-License-Identifier: Apache-2.0 + +package kineto + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSynthesize_RoundTrip(t *testing.T) { + spec := Spec{ + SchemaVersion: 1.0, + Pid: 1234, + Tid: 1234, + Steps: []StepSpec{ + { + StepID: 0, StartTs: 1000, EndTs: 2000, + Events: []Event{ + {Name: "aten::addmm", Cat: CatCPUOp, Ph: PhaseComplete, Ts: 1100, Dur: 50, Pid: 1234, Tid: 1234}, + {Name: "ncclAllReduce", Cat: CatKernel, Ph: PhaseComplete, Ts: 1500, Dur: 200, Pid: 1234, Tid: 1234}, + }, + }, + }, + } + var buf bytes.Buffer + require.NoError(t, Synthesize(spec, &buf)) + + var got []Event + require.NoError(t, Parse(&buf, func(e Event) error { + got = append(got, e) + return nil + })) + // Synthesize emits: ProfilerStep#0 B, 2 inner events, ProfilerStep#0 E = 4 events total + require.Len(t, got, 4) + require.Equal(t, "ProfilerStep#0", got[0].Name) + require.Equal(t, PhaseBegin, got[0].Ph) + require.Equal(t, "ProfilerStep#0", got[3].Name) + require.Equal(t, PhaseEnd, got[3].Ph) +} + +func TestSynthesize_ByteIdentical(t *testing.T) { + spec := Spec{ + SchemaVersion: 1.0, Pid: 1234, Tid: 1234, + Steps: []StepSpec{{StepID: 0, StartTs: 0, EndTs: 1000, Events: []Event{ + {Name: "a", Cat: CatCPUOp, Ph: PhaseComplete, Ts: 100, Dur: 50, Pid: 1234, Tid: 1234}, + }}}, + } + var a, b bytes.Buffer + require.NoError(t, Synthesize(spec, &a)) + require.NoError(t, Synthesize(spec, &b)) + require.Equal(t, a.Bytes(), b.Bytes(), "Synthesize is not deterministic") +} diff --git a/pkg/kineto/testdata/SHA256SUMS b/pkg/kineto/testdata/SHA256SUMS new file mode 100644 index 00000000..38486a15 --- /dev/null +++ b/pkg/kineto/testdata/SHA256SUMS @@ -0,0 +1,2 @@ +e63ce5b7d27811930c91dc2f16c887f60882f104fa6a2c144cedc9e77a341f83 toy_2step.pt.trace.json +788d2696784e3aa0fb868058f98769aa44db50b60d0b5306fb2a6d166a954edb toy_2step.golden.json diff --git a/pkg/kineto/testdata/toy_2step.golden.json b/pkg/kineto/testdata/toy_2step.golden.json new file mode 100644 index 00000000..b4aeefee --- /dev/null +++ b/pkg/kineto/testdata/toy_2step.golden.json @@ -0,0 +1,90 @@ +[ + { + "name": "ProfilerStep#0", + "cat": "user_annotation", + "ph": "B", + "ts": 1000, + "dur": 0, + "pid": 1234, + "tid": 1234, + "args": {} + }, + { + "name": "aten::addmm", + "cat": "cpu_op", + "ph": "X", + "ts": 1100, + "dur": 50, + "pid": 1234, + "tid": 1234, + "args": { + "External id": "1" + } + }, + { + "name": "ncclAllReduce", + "cat": "kernel", + "ph": "X", + "ts": 1500, + "dur": 200, + "pid": 1234, + "tid": 1234, + "args": { + "External id": "2" + } + }, + { + "name": "ProfilerStep#0", + "cat": "user_annotation", + "ph": "E", + "ts": 2000, + "dur": 0, + "pid": 1234, + "tid": 1234, + "args": {} + }, + { + "name": "ProfilerStep#1", + "cat": "user_annotation", + "ph": "B", + "ts": 2100, + "dur": 0, + "pid": 1234, + "tid": 1234, + "args": {} + }, + { + "name": "aten::addmm", + "cat": "cpu_op", + "ph": "X", + "ts": 2200, + "dur": 55, + "pid": 1234, + "tid": 1234, + "args": { + "External id": "3" + } + }, + { + "name": "ncclAllReduce", + "cat": "kernel", + "ph": "X", + "ts": 2600, + "dur": 210, + "pid": 1234, + "tid": 1234, + "args": { + "External id": "4" + } + }, + { + "name": "ProfilerStep#1", + "cat": "user_annotation", + "ph": "E", + "ts": 3100, + "dur": 0, + "pid": 1234, + "tid": 1234, + "args": {} + } +] diff --git a/pkg/kineto/testdata/toy_2step.pt.trace.json b/pkg/kineto/testdata/toy_2step.pt.trace.json new file mode 100644 index 00000000..979833ec --- /dev/null +++ b/pkg/kineto/testdata/toy_2step.pt.trace.json @@ -0,0 +1 @@ +{"baseTimeNanoseconds":0,"displayTimeUnit":"ns","schemaVersion":1,"traceEvents":[{"name":"ProfilerStep#0","cat":"user_annotation","ph":"B","ts":1000,"dur":0,"pid":1234,"tid":1234,"args":{}},{"name":"aten::addmm","cat":"cpu_op","ph":"X","ts":1100,"dur":50,"pid":1234,"tid":1234,"args":{"External id":"1"}},{"name":"ncclAllReduce","cat":"kernel","ph":"X","ts":1500,"dur":200,"pid":1234,"tid":1234,"args":{"External id":"2"}},{"name":"ProfilerStep#0","cat":"user_annotation","ph":"E","ts":2000,"dur":0,"pid":1234,"tid":1234,"args":{}},{"name":"ProfilerStep#1","cat":"user_annotation","ph":"B","ts":2100,"dur":0,"pid":1234,"tid":1234,"args":{}},{"name":"aten::addmm","cat":"cpu_op","ph":"X","ts":2200,"dur":55,"pid":1234,"tid":1234,"args":{"External id":"3"}},{"name":"ncclAllReduce","cat":"kernel","ph":"X","ts":2600,"dur":210,"pid":1234,"tid":1234,"args":{"External id":"4"}},{"name":"ProfilerStep#1","cat":"user_annotation","ph":"E","ts":3100,"dur":0,"pid":1234,"tid":1234,"args":{}}]} diff --git a/pkg/kineto/testdata_gen_test.go b/pkg/kineto/testdata_gen_test.go new file mode 100644 index 00000000..91efa477 --- /dev/null +++ b/pkg/kineto/testdata_gen_test.go @@ -0,0 +1,59 @@ +// SPDX-License-Identifier: Apache-2.0 + +//go:build fixturegen + +package kineto + +import ( + "encoding/json" + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestGenerateFixtures regenerates the checked-in fixtures from a pinned +// spec. Invoked via `go test -tags fixturegen -run TestGenerateFixtures +// ./pkg/kineto/` (or `make generate-fixtures`). CI runs this via +// `make generate-fixtures-check` and diffs the output against the +// committed files; any drift fails the build. +func TestGenerateFixtures(t *testing.T) { + spec := Spec{ + SchemaVersion: 1.0, Pid: 1234, Tid: 1234, + Steps: []StepSpec{ + {StepID: 0, StartTs: 1000, EndTs: 2000, Events: []Event{ + {Name: "aten::addmm", Cat: CatCPUOp, Ph: PhaseComplete, Ts: 1100, Dur: 50, Pid: 1234, Tid: 1234, Args: Args{ExternalID: "1", Stream: 0, Device: 0}}, + {Name: "ncclAllReduce", Cat: CatKernel, Ph: PhaseComplete, Ts: 1500, Dur: 200, Pid: 1234, Tid: 1234, Args: Args{ExternalID: "2", Stream: 0, Device: 0}}, + }}, + {StepID: 1, StartTs: 2100, EndTs: 3100, Events: []Event{ + {Name: "aten::addmm", Cat: CatCPUOp, Ph: PhaseComplete, Ts: 2200, Dur: 55, Pid: 1234, Tid: 1234, Args: Args{ExternalID: "3", Stream: 0, Device: 0}}, + {Name: "ncclAllReduce", Cat: CatKernel, Ph: PhaseComplete, Ts: 2600, Dur: 210, Pid: 1234, Tid: 1234, Args: Args{ExternalID: "4", Stream: 0, Device: 0}}, + }}, + }, + } + f, err := os.Create("testdata/toy_2step.pt.trace.json") + require.NoError(t, err) + defer f.Close() + require.NoError(t, Synthesize(spec, f)) +} + +// TestGenerateGolden regenerates the golden parse output from the +// fixture produced by TestGenerateFixtures. Must run after it. +func TestGenerateGolden(t *testing.T) { + f, err := os.Open("testdata/toy_2step.pt.trace.json") + require.NoError(t, err) + defer f.Close() + + var events []Event + require.NoError(t, Parse(f, func(e Event) error { + events = append(events, e) + return nil + })) + + out, err := os.Create("testdata/toy_2step.golden.json") + require.NoError(t, err) + defer out.Close() + enc := json.NewEncoder(out) + enc.SetIndent("", " ") + require.NoError(t, enc.Encode(events)) +}