diff --git a/internal/raftengine/cold_start.go b/internal/raftengine/cold_start.go new file mode 100644 index 00000000..c6f99a88 --- /dev/null +++ b/internal/raftengine/cold_start.go @@ -0,0 +1,40 @@ +package raftengine + +// ColdStartObserver receives cold-start snapshot-restore lifecycle +// events from restoreSnapshotState. Implementations live in the +// monitoring package and wire to Prometheus counters/gauges; the +// engine receives a value through OpenConfig and treats nil as +// "no metrics emitted" (preserves the byte-for-byte cold-start +// behaviour for tests and callers that do not wire monitoring). +// +// Three outcomes match the design's strictly-additive policy +// (docs/design/2026_06_02_idempotent_snapshot_restore.md §9): +// +// - RestoreSkipped: the gate fired. `gap = haveAppliedIndex - +// snapshot.Metadata.Index` (how far ahead the live store was). +// This is the user-visible perf win. +// +// - RestoreExecuted: the gate did NOT fire because the live store +// was behind the skip threshold, max(snapshot.Metadata.Index, +// HardState.Commit); haveAppliedIndex may be on either side of +// snapshot.Metadata.Index, so the gap is |snapIndex - have|. +// +// - RestoreFallback: the strictly-additive fallback path — the +// FSM did not expose AppliedIndexReader, LastAppliedIndex +// reported the meta key missing, or it returned an error. The +// full restore runs but the skip was never even attempted. +// `reason` carries a stable short label so Prometheus can +// surface why the optimisation could not engage: +// +// not_reader — FSM does not implement AppliedIndexReader +// missing_meta — meta key absent (pre-upgrade fsm.db) +// read_err — LastAppliedIndex returned an error +// +// Implementations MUST NOT block; the engine calls these on the +// cold-start critical path. Treat all label/string arguments as +// untrusted enum values from the engine's enumeration above.
+type ColdStartObserver interface { + RestoreSkipped(snapIndex, haveAppliedIndex uint64) + RestoreExecuted(snapIndex, haveAppliedIndex uint64) + RestoreFallback(snapIndex uint64, reason string) +} diff --git a/internal/raftengine/etcd/cold_start_volatile_replay_test.go b/internal/raftengine/etcd/cold_start_volatile_replay_test.go new file mode 100644 index 00000000..c92ea874 --- /dev/null +++ b/internal/raftengine/etcd/cold_start_volatile_replay_test.go @@ -0,0 +1,199 @@ +package etcd + +import ( + "bytes" + "io" + "sync/atomic" + "testing" + + "github.com/bootjp/elastickv/internal/raftengine" + raftpb "go.etcd.io/raft/v3/raftpb" +) + +// volatileTagFakeFSM is a StateMachine that classifies payloads by +// their leading byte: 0x02 (kv.raftEncodeHLCLease) is volatile, every +// other tag is data-mutating. Mirrors the kvFSM contract closely +// enough that the engine's cold-start duplicate guard can be tested +// without pulling in the full kv package. +type volatileTagFakeFSM struct { + calls atomic.Int32 + lastPayload []byte +} + +func (f *volatileTagFakeFSM) Apply(data []byte) any { + f.calls.Add(1) + cp := make([]byte, len(data)) + copy(cp, data) + f.lastPayload = cp + return nil +} + +func (f *volatileTagFakeFSM) Snapshot() (Snapshot, error) { return nil, nil } +func (f *volatileTagFakeFSM) Restore(_ io.Reader) error { return nil } + +func (f *volatileTagFakeFSM) IsVolatileOnlyPayload(payload []byte) bool { + return len(payload) > 0 && payload[0] == 0x02 +} + +var _ raftengine.VolatileEntryClassifier = (*volatileTagFakeFSM)(nil) + +// TestApplyNormalCommitted_VolatileEntryReplayedOnDuplicate pins the +// codex P1 #934 round 7 fix: after the cold-start skip gate seeds +// e.applied at the WAL committed tail, entries delivered by Raft at +// indices <= e.applied that are volatile-only (HLC lease, tag 0x02) +// MUST still reach fsm.Apply so the post-snapshot ceiling raise is +// reconstructed. 
Data-mutating duplicates (any other tag) MUST NOT +// reach fsm.Apply (OCC re-validation; ceiling regression). A +// regression would either silently drop the lease (the bug) or +// silently re-execute KV writes (idempotency violation). +func TestApplyNormalCommitted_VolatileEntryReplayedOnDuplicate(t *testing.T) { + t.Parallel() + cases := []struct { + name string + tag byte + wantApply bool + }{ + {"volatile HLC lease (tag 0x02) replays past e.applied", 0x02, true}, + {"data-mutating single KV (tag 0x00) is dropped", 0x00, false}, + {"data-mutating batch KV (tag 0x01) is dropped", 0x01, false}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + t.Parallel() + fsm := &volatileTagFakeFSM{} + e := newTestEngine(fsm, nil, nil) + e.applied = 200 + + payload := []byte{c.tag, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x00} + entry := raftpb.Entry{ + Type: raftpb.EntryNormal, + Index: 150, // <= e.applied → duplicate path + Data: encodeProposalEnvelope(42, payload), + } + + if err := e.applyNormalCommitted(entry); err != nil { + t.Fatalf("applyNormalCommitted: %v", err) + } + + got := fsm.calls.Load() + switch { + case c.wantApply && got != 1: + t.Fatalf("volatile duplicate: fsm.Apply call count = %d, want 1 (lost in-memory effect)", got) + case !c.wantApply && got != 0: + t.Fatalf("data-mutating duplicate: fsm.Apply call count = %d, want 0 (re-applied; OCC will re-validate against post-tail state)", got) + } + + // In every case e.applied must NOT advance — the entry's + // index is below the gate-seeded value and resolveProposal + // is intentionally not called either. + if e.applied != 200 { + t.Fatalf("e.applied advanced to %d, want it pinned at 200 for duplicate-path entries", e.applied) + } + }) + } +} + +// TestApplyNormalCommitted_FreshEntryAlwaysAppliesAndAdvances pins the +// non-duplicate path: entries past e.applied always reach fsm.Apply +// regardless of the volatile/data classification, and e.applied +// advances. 
Locks in the asymmetry — the classifier ONLY gates the +// duplicate arm. +func TestApplyNormalCommitted_FreshEntryAlwaysAppliesAndAdvances(t *testing.T) { + t.Parallel() + for _, tag := range []byte{0x00, 0x01, 0x02} { + t.Run("tag=0x0"+string("0123"[tag])+"_fresh", func(t *testing.T) { + t.Parallel() + fsm := &volatileTagFakeFSM{} + e := newTestEngine(fsm, nil, nil) + e.applied = 100 + + entry := raftpb.Entry{ + Type: raftpb.EntryNormal, + Index: 150, + Data: encodeProposalEnvelope(7, []byte{tag, 0x99}), + } + + if err := e.applyNormalCommitted(entry); err != nil { + t.Fatalf("applyNormalCommitted: %v", err) + } + if got := fsm.calls.Load(); got != 1 { + t.Fatalf("fsm.Apply call count = %d, want 1 (fresh entry, all tags)", got) + } + if e.applied != 150 { + t.Fatalf("e.applied = %d, want 150 (fresh entry must advance)", e.applied) + } + }) + } +} + +// TestApplyNormalCommitted_VolatileDuplicate_PostCutoverEncrypted pins +// the post-Stage-8a cutover path: encrypted HLC lease entries past +// e.applied MUST be decrypted FIRST, then classified as volatile, then +// replayed for their in-memory effect. The wire-format reality is +// that a post-cutover HLC lease's `payload[0]` is encrypted bytes; +// only the cleartext (after WrapRaftPayload unwrap) carries the 0x02 +// tag, so the classifier must see the cleartext or the lease drops. +// Claude #934 round 7 finding R7-F2 — pre-cutover-only coverage was +// insufficient. +func TestApplyNormalCommitted_VolatileDuplicate_PostCutoverEncrypted(t *testing.T) { + t.Parallel() + c, kid := raftCipherFixture(t) + const cutover uint64 = 100 + fsm := &volatileTagFakeFSM{} + e := newTestEngine(fsm, c, func() uint64 { return cutover }) + e.applied = 200 + + // HLC lease cleartext: tag 0x02 + 8-byte big-endian ceiling. + plain := []byte{0x02, 0, 0, 0, 0, 0, 0, 0, 0x01} + // Index above cutover → triggers WrapRaftPayload path inside + // applyNormalEntry; index below e.applied → duplicate arm. 
+ entry := envelopeEntry(t, c, kid, 150, plain) + + if err := e.applyNormalCommitted(entry); err != nil { + t.Fatalf("applyNormalCommitted: %v", err) + } + if got := fsm.calls.Load(); got != 1 { + t.Fatalf("encrypted volatile duplicate: fsm.Apply call count = %d, want 1 — decryption must run before classification or the lease drops", got) + } + if !bytes.Equal(fsm.lastPayload, plain) { + t.Fatalf("FSM received %x, want cleartext %x — classifier must see post-decrypt bytes", fsm.lastPayload, plain) + } + if e.applied != 200 { + t.Fatalf("e.applied advanced to %d, want pinned at 200 for duplicate-arm replay", e.applied) + } +} + +// TestApplyNormalCommitted_DuplicateWithoutClassifier_DropsAll guards +// the FSM-doesn't-opt-in path: a StateMachine that does NOT implement +// VolatileEntryClassifier (existing FSMs, third-party engines) must +// keep the pre-PR behavior — every duplicate is dropped, including +// any that happen to be volatile. The strictly-additive opt-in keeps +// the engine compatible with FSMs that have not been updated. +func TestApplyNormalCommitted_DuplicateWithoutClassifier_DropsAll(t *testing.T) { + t.Parallel() + // fakeStateMachine (from encryption_test.go) does NOT implement + // VolatileEntryClassifier — that absence is the contract under + // test. 
+ fsm := &fakeStateMachine{} + if _, ok := any(fsm).(raftengine.VolatileEntryClassifier); ok { + t.Fatal("fakeStateMachine unexpectedly implements VolatileEntryClassifier; test contract broken") + } + e := newTestEngine(fsm, nil, nil) + e.applied = 200 + + entry := raftpb.Entry{ + Type: raftpb.EntryNormal, + Index: 150, + Data: encodeProposalEnvelope(13, []byte{0x02, 0xff}), // would-be volatile + } + + if err := e.applyNormalCommitted(entry); err != nil { + t.Fatalf("applyNormalCommitted: %v", err) + } + if got := fsm.calls.Load(); got != 0 { + t.Fatalf("FSM without classifier: fsm.Apply call count = %d, want 0 (drop-all on duplicate, no opt-in)", got) + } + if e.applied != 200 { + t.Fatalf("e.applied advanced unexpectedly: %d, want 200", e.applied) + } +} diff --git a/internal/raftengine/etcd/encryption_test.go b/internal/raftengine/etcd/encryption_test.go index 53052649..1edcef3e 100644 --- a/internal/raftengine/etcd/encryption_test.go +++ b/internal/raftengine/etcd/encryption_test.go @@ -133,7 +133,7 @@ func TestApplyNormalEntry_CutoverActive_NoCipher_FailsClosed(t *testing.T) { Index: cutover + 1, Data: encodeProposalEnvelope(99, []byte("would-be wrapped payload")), } - _, err := e.applyNormalEntry(entry) + _, err := e.applyNormalEntry(entry, false) if !errors.Is(err, ErrRaftUnwrapFailed) { t.Fatalf("expected ErrRaftUnwrapFailed for cutover-active+no-cipher misconfig, got %v", err) } @@ -151,7 +151,7 @@ func TestApplyNormalEntry_CutoverActive_NoCipher_FailsClosed(t *testing.T) { Index: cutover, Data: encodeProposalEnvelope(11, []byte("legacy cleartext")), } - if _, err := e.applyNormalEntry(belowCutoverEntry); err != nil { + if _, err := e.applyNormalEntry(belowCutoverEntry, false); err != nil { t.Fatalf("below-cutover should pass through, got %v", err) } if got := fsm.calls.Load(); got != 1 { @@ -169,7 +169,7 @@ func TestApplyNormalEntry_NoCipher_PassThrough(t *testing.T) { e := newTestEngine(fsm, nil, nil) plain := []byte("op=put key=k1 v=hello") entry := 
raftpb.Entry{Type: raftpb.EntryNormal, Data: encodeProposalEnvelope(42, plain)} - if _, err := e.applyNormalEntry(entry); err != nil { + if _, err := e.applyNormalEntry(entry, false); err != nil { t.Fatalf("applyNormalEntry: %v", err) } if got := fsm.calls.Load(); got != 1 { @@ -200,7 +200,7 @@ func TestApplyNormalEntry_BelowCutover_PassThrough(t *testing.T) { Index: idx, Data: encodeProposalEnvelope(11, cleartextPayload), } - if _, err := e.applyNormalEntry(entry); err != nil { + if _, err := e.applyNormalEntry(entry, false); err != nil { t.Fatalf("idx=%d: applyNormalEntry: %v", idx, err) } if got := fsm.calls.Load(); got != 1 { @@ -226,7 +226,7 @@ func TestApplyNormalEntry_AboveCutover_Unwraps(t *testing.T) { fsm.calls.Store(0) plaintext := []byte("op=put key=k1 v=secret") entry := envelopeEntry(t, c, kid, idx, plaintext) - if _, err := e.applyNormalEntry(entry); err != nil { + if _, err := e.applyNormalEntry(entry, false); err != nil { t.Fatalf("idx=%d: applyNormalEntry: %v", idx, err) } if got := fsm.calls.Load(); got != 1 { @@ -258,7 +258,7 @@ func TestApplyNormalEntry_UnwrapFailure_Halts(t *testing.T) { // last byte. entry.Data[len(entry.Data)-1] ^= 0xff - _, err := e.applyNormalEntry(entry) + _, err := e.applyNormalEntry(entry, false) if !errors.Is(err, ErrRaftUnwrapFailed) { t.Fatalf("expected ErrRaftUnwrapFailed, got %v", err) } @@ -326,7 +326,7 @@ func TestApplyNormalEntry_BoundaryCutover(t *testing.T) { Index: cutover, Data: encodeProposalEnvelope(13, cleartext), } - if _, err := e.applyNormalEntry(atCutover); err != nil { + if _, err := e.applyNormalEntry(atCutover, false); err != nil { t.Fatalf("at-cutover: %v", err) } if string(fsm.last) != string(cleartext) { @@ -335,7 +335,7 @@ func TestApplyNormalEntry_BoundaryCutover(t *testing.T) { // cutover+1: wrapped payload — MUST be unwrapped. 
above := envelopeEntry(t, c, kid, cutover+1, []byte("first encrypted")) - if _, err := e.applyNormalEntry(above); err != nil { + if _, err := e.applyNormalEntry(above, false); err != nil { t.Fatalf("above-cutover: %v", err) } if string(fsm.last) != "first encrypted" { @@ -365,7 +365,7 @@ func TestApplyNormalEntry_ProposalIDStillResolvable(t *testing.T) { } data := encodeProposalEnvelope(wantID, wrapped) entry := raftpb.Entry{Type: raftpb.EntryNormal, Index: cutover + 1, Data: data} - if _, err := e.applyNormalEntry(entry); err != nil { + if _, err := e.applyNormalEntry(entry, false); err != nil { t.Fatalf("applyNormalEntry: %v", err) } gotID, _, ok := decodeProposalEnvelope(entry.Data) @@ -396,7 +396,7 @@ func TestApplyNormalEntry_NoCutoverDefault(t *testing.T) { Index: idx, Data: encodeProposalEnvelope(7, cleartext), } - if _, err := e.applyNormalEntry(entry); err != nil { + if _, err := e.applyNormalEntry(entry, false); err != nil { t.Fatalf("idx=%d: %v", idx, err) } if string(fsm.last) != string(cleartext) { diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 1699abf2..227acf52 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -187,6 +187,12 @@ type OpenConfig struct { // has been observed yet, equivalent to "raft envelope hook // off". RaftCutoverIndex RaftCutoverIndex + // ColdStartObserver receives the cold-start snapshot-restore + // skip-gate lifecycle events (skipped / executed / fallback). + // nil disables metrics; the skip itself still runs. See + // docs/design/2026_06_02_idempotent_snapshot_restore.md §9 and + // internal/raftengine/cold_start.go for the contract. 
+ ColdStartObserver raftengine.ColdStartObserver } type Engine struct { @@ -549,7 +555,7 @@ func Open(ctx context.Context, cfg OpenConfig) (*Engine, error) { config: configurationFromConfState(peerMap, prepared.disk.LocalSnap.Metadata.ConfState), voterCount: len(prepared.disk.LocalSnap.Metadata.ConfState.Voters), isLearnerNode: learnerSetFromConfState(prepared.disk.LocalSnap.Metadata.ConfState), - applied: maxAppliedIndex(prepared.disk.LocalSnap), + applied: coldStartApplied(prepared.disk), dispatchCtx: dispatchCtx, dispatchCancel: dispatchCancel, pendingProposals: map[uint64]proposalRequest{}, @@ -564,7 +570,7 @@ func Open(ctx context.Context, cfg OpenConfig) (*Engine, error) { maxWALFiles: maxWALFilesFromEnv(), } engine.configIndex.Store(maxAppliedIndex(prepared.disk.LocalSnap)) - engine.appliedIndex.Store(maxAppliedIndex(prepared.disk.LocalSnap)) + engine.appliedIndex.Store(coldStartApplied(prepared.disk)) engine.initTransport(prepared.cfg) engine.initSnapshotWorker() engine.refreshStatus() @@ -2175,7 +2181,17 @@ func (e *Engine) applyCommitted(entries []raftpb.Entry) error { // lock-free atomic mirror in a single place. Called exclusively from // the Raft run loop, so no synchronization between the two writes is // required beyond the single-writer invariant. +// +// Advance-only: cold-start with EffectiveApplied > snapshot.Index +// seeds e.applied with `have`, after which raft still delivers conf- +// change entries from snapshot.Index+1..have whose applyConfChange* +// path calls setApplied(entry.Index) with index < have. Allowing the +// counter to walk backwards would break the e.appliedIndex.Load() +// contract every other watcher depends on. Codex P1 #934. 
func (e *Engine) setApplied(index uint64) { + if index <= e.applied { + return + } e.applied = index e.appliedIndex.Store(index) } @@ -2208,7 +2224,28 @@ func (e *Engine) setApplied(index uint64) { // (today's *fsmApplyResponse, returned by kv batch apply) do NOT // implement HaltApply and continue to advance setApplied. func (e *Engine) applyNormalCommitted(entry raftpb.Entry) error { - response, err := e.applyNormalEntry(entry) + // Cold-start idempotency: when the skip gate fires with the FSM + // past snapshot.Metadata.Index (have > tok.Index), e.applied is + // seeded with `have`. Raft still delivers entries + // snapshot.Metadata.Index+1..commit including the [snap.Index+1, + // have] tail, which the FSM has already applied. Re-calling + // applyNormalEntry on a KV/MVCC entry would re-execute the + // transaction (OCC conflicts; HLC ceiling inversion). Drop the + // duplicate without touching the FSM. setApplied is NOT called - + // e.applied is already >= entry.Index. Codex P1 #934. + // + // Codex P1 #934 round 7: volatile-only entries (HLC lease, tag + // 0x02) carry effects that live purely in process memory + // (HLC.SetPhysicalCeiling). Their post-snapshot effect must be + // re-applied on cold start; otherwise ApplySnapshotHeader restores + // only the older snapshot-time ceiling and the next leader-issued + // timestamp can collide with persisted commit_ts values stamped + // under the lost lease. applyNormalEntry routes those duplicates + // to fsm.Apply (which is monotonic and idempotent for HLC leases) + // and returns (nil, nil) for data-mutating duplicates so we fall + // through to the "no setApplied advance, no resolveProposal" arm. 
+ duplicate := entry.Index <= e.applied + response, err := e.applyNormalEntry(entry, duplicate) if err != nil { return err } @@ -2220,6 +2257,13 @@ func (e *Engine) applyNormalCommitted(entry raftpb.Entry) error { return errors.Wrap(herr, "raftengine/etcd: FSM-requested apply halt") } } + if duplicate { + // Volatile-only entries replayed for their in-memory effect + // (e.applied already past) or data-mutating duplicates + // dropped inside applyNormalEntry — either way the engine's + // applied pointer and pending-proposal map stay untouched. + return nil + } e.setApplied(entry.Index) e.resolveProposal(entry.Index, entry.Data, response) return nil @@ -2260,7 +2304,26 @@ func (e *Engine) applyConfChangeV2Committed(entry raftpb.Entry) error { return nil } -func (e *Engine) applyNormalEntry(entry raftpb.Entry) (any, error) { +// applyNormalEntry decodes the raft envelope, optionally unwraps the +// cipher payload, and dispatches to fsm.Apply. +// +// dropIfNonVolatile is the cold-start idempotency seam. When the skip +// gate fires the engine still receives WAL committed-tail entries past +// snapshot.Metadata.Index; data-mutating duplicates must NOT call +// fsm.Apply (OCC re-validation, ceiling regression, sidecar drift) but +// volatile-only entries (HLC lease) MUST still call fsm.Apply for +// their in-memory monotonic effect. The classifier inspects the +// cleartext payload AFTER raft envelope decode + decryption — the +// raftEncodeHLCLease tag (0x02) is only visible there in the +// post-cutover path, so the cleartext-side decision is the only +// correct one. Codex P1 #934 round 7. +// +// dropIfNonVolatile=false preserves the original semantics: every +// decodable entry is handed to fsm.Apply. dropIfNonVolatile=true +// gates the fsm.Apply call on VolatileEntryClassifier.IsVolatileOnlyPayload +// and returns (nil, nil) for data-mutating duplicates so the caller's +// "no setApplied advance" arm fires unchanged. 
+func (e *Engine) applyNormalEntry(entry raftpb.Entry, dropIfNonVolatile bool) (any, error) { if len(entry.Data) == 0 { return nil, nil } @@ -2326,12 +2389,44 @@ func (e *Engine) applyNormalEntry(entry raftpb.Entry) (any, error) { } payload = plain } - if aware, ok := e.fsm.(raftengine.ApplyIndexAware); ok { - aware.SetApplyIndex(entry.Index) + if dropIfNonVolatile && !e.isVolatilePayload(payload) { + return nil, nil + } + // SetApplyIndex is suppressed on the duplicate-replay path + // because pendingApplyIdx is documented as "the entry index the + // engine is about to apply". On a volatile replay entry.Index is + // BELOW e.applied; writing it would feed a stale index to any + // future durability sink (encryption sidecar.RaftAppliedIndex, + // ApplyMutationsRaftAt's metaAppliedIndex bundle). Today's only + // volatile entry (HLC lease) does not read pendingApplyIdx so + // this is a no-op, but it future-proofs the seam against a new + // volatile entry type that also persists. Claude #934 round 7 + // finding R7-F1. + if !dropIfNonVolatile { + if aware, ok := e.fsm.(raftengine.ApplyIndexAware); ok { + aware.SetApplyIndex(entry.Index) + } } return e.fsm.Apply(payload), nil } +// isVolatilePayload is the cold-start duplicate guard's cleartext +// classifier. Only volatile-only payloads (HLC lease, tag 0x02) may +// be re-applied past e.applied; data-mutating entries return false so +// the caller drops them. The check runs AFTER raft envelope +// decryption because post-cutover the cleartext payload's leading +// byte is the only reliable carrier of the entry-kind tag. FSMs that +// do not implement VolatileEntryClassifier default to false +// (drop-all), preserving the pre-PR semantics for engines that +// haven't opted in. Codex P1 #934 round 7. 
+func (e *Engine) isVolatilePayload(payload []byte) bool { + classifier, ok := e.fsm.(raftengine.VolatileEntryClassifier) + if !ok { + return false + } + return classifier.IsVolatileOnlyPayload(payload) +} + func (e *Engine) resolveProposal(commitIndex uint64, data []byte, response any) { id, _, ok := decodeProposalEnvelope(data) if !ok { diff --git a/internal/raftengine/etcd/factory.go b/internal/raftengine/etcd/factory.go index c5e425ea..08d37d53 100644 --- a/internal/raftengine/etcd/factory.go +++ b/internal/raftengine/etcd/factory.go @@ -16,6 +16,14 @@ type FactoryConfig struct { ElectionTick int MaxSizePerMsg uint64 MaxInflightMsg int + // ColdStartObserver receives cold-start snapshot-restore + // lifecycle events for the Branch 3 skip gate + // (raftengine.ColdStartObserver). The Factory threads it into + // every OpenConfig it builds; nil disables metrics emission + // (the skip itself still runs). Wire via main.go from + // monitoring.Registry.ColdStartObserver(). See PR #934 round-1 + // codex P2 for the plumbing rationale. + ColdStartObserver raftengine.ColdStartObserver } // Factory creates etcd raft engine instances. 
@@ -44,19 +52,20 @@ func (f *Factory) Create(cfg raftengine.FactoryConfig) (*raftengine.FactoryResul } engine, err := Open(context.Background(), OpenConfig{ - LocalID: cfg.LocalID, - LocalAddress: cfg.LocalAddress, - DataDir: cfg.DataDir, - Peers: peers, - Bootstrap: cfg.Bootstrap, - JoinAsLearner: cfg.JoinAsLearner, - Transport: transport, - StateMachine: cfg.StateMachine, - TickInterval: f.cfg.TickInterval, - HeartbeatTick: f.cfg.HeartbeatTick, - ElectionTick: f.cfg.ElectionTick, - MaxSizePerMsg: f.cfg.MaxSizePerMsg, - MaxInflightMsg: f.cfg.MaxInflightMsg, + LocalID: cfg.LocalID, + LocalAddress: cfg.LocalAddress, + DataDir: cfg.DataDir, + Peers: peers, + Bootstrap: cfg.Bootstrap, + JoinAsLearner: cfg.JoinAsLearner, + Transport: transport, + StateMachine: cfg.StateMachine, + TickInterval: f.cfg.TickInterval, + HeartbeatTick: f.cfg.HeartbeatTick, + ElectionTick: f.cfg.ElectionTick, + MaxSizePerMsg: f.cfg.MaxSizePerMsg, + MaxInflightMsg: f.cfg.MaxInflightMsg, + ColdStartObserver: f.cfg.ColdStartObserver, }) if err != nil { var closeErr error diff --git a/internal/raftengine/etcd/grpc_transport_test.go b/internal/raftengine/etcd/grpc_transport_test.go index 308035d8..cc262417 100644 --- a/internal/raftengine/etcd/grpc_transport_test.go +++ b/internal/raftengine/etcd/grpc_transport_test.go @@ -217,7 +217,8 @@ func TestReceiveSnapshotStream_StreamingTokenWhenFSMSnapDirSet(t *testing.T) { // materializing the payload as []byte. Verify the receiver FSM ends up // with exactly the entries the sender serialized. 
receiverFSM := &testStateMachine{} - require.NoError(t, restoreSnapshotState(receiverFSM, *msg.Snapshot, fsmSnapDir)) + _, restoreErr := restoreSnapshotState(receiverFSM, *msg.Snapshot, msg.Snapshot.Metadata.Index, fsmSnapDir, nil, nil) + require.NoError(t, restoreErr) require.Equal(t, senderFSM.Applied(), receiverFSM.Applied()) } diff --git a/internal/raftengine/etcd/wal_store.go b/internal/raftengine/etcd/wal_store.go index c5b1d860..e8b594c0 100644 --- a/internal/raftengine/etcd/wal_store.go +++ b/internal/raftengine/etcd/wal_store.go @@ -2,10 +2,12 @@ package etcd import ( "bytes" + "hash/crc32" "io" "os" "path/filepath" + "github.com/bootjp/elastickv/internal/raftengine" "github.com/cockroachdb/errors" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" etcdstorage "go.etcd.io/etcd/server/v3/storage" @@ -25,6 +27,14 @@ type diskState struct { Storage *etcdraft.MemoryStorage Persist etcdstorage.Storage LocalSnap raftpb.Snapshot + // EffectiveApplied is the FSM's durable applied index when the + // cold-start skip gate fires (i.e., max(snap.Metadata.Index, + // have)). Engine.Open uses this to seed e.applied and the atomic + // mirror so the apply loop does not re-deliver WAL entries + // already represented in the FSM. Zero when no override applies + // (legacy path / execute-restore path); the engine then falls + // back to maxAppliedIndex(LocalSnap). Codex P1 #934. 
+ EffectiveApplied uint64 } func openDiskState(cfg OpenConfig, peers []Peer) (*diskState, error) { @@ -38,7 +48,7 @@ func openDiskState(cfg OpenConfig, peers []Peer) (*diskState, error) { } if wal.Exist(walDir) { - return loadWalState(logger, walDir, snapDir, fsmSnapDir, cfg.StateMachine) + return loadWalState(logger, walDir, snapDir, fsmSnapDir, cfg.StateMachine, cfg.ColdStartObserver) } legacy, legacyErr := loadLegacyOrSplitState(cfg.DataDir) @@ -114,7 +124,7 @@ func bootstrapWalState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, f return persistBootState(logger, walDir, snapDir, fsmSnapDir, fsm, boot) } -func loadWalState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, fsm StateMachine) (*diskState, error) { +func loadWalState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, fsm StateMachine, obs raftengine.ColdStartObserver) (*diskState, error) { // Scope the repair retry tightly to WAL-only reads: both // loadPersistedSnapshot (scans WAL via wal.ValidSnapshotEntries) // and openAndReadWAL's ReadAll can surface io.ErrUnexpectedEOF @@ -130,14 +140,32 @@ func loadWalState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, fsm St if err != nil { return nil, err } - if err := restoreSnapshotState(fsm, snapshot, fsmSnapDir); err != nil { - return nil, err - } + // Codex P1 #934: open the WAL BEFORE the skip-gate decision so we + // know the post-snapshot entry tail. The skip path is only safe + // when the FSM is at least as advanced as the committed tail; if + // the FSM is past `tok.Index` but the WAL still carries entries + // `tok.Index+1 .. have` (the normal interval between snapshots, + // since metaAppliedIndex advances on each Apply), those entries + // would re-apply onto a Pebble store that already contains them, + // hitting OCC conflicts and leaving the HLC below timestamps + // already on disk. Compute the committed-tail threshold and gate + // the skip on `have >= lastCommittedIndex`.
w, hardState, entries, err := openAndReadWALWithRepair(logger, walDir, walSnapshotFor(snapshot)) if err != nil { return nil, err } + lastCommittedIndex := coldStartSkipThreshold(snapshot, hardState) + effectiveApplied, err := restoreSnapshotState(fsm, snapshot, lastCommittedIndex, fsmSnapDir, obs, logger) + if err != nil { + if closeErr := w.Close(); closeErr != nil { + logger.Warn("WAL close failed after restoreSnapshotState error", + zap.String("dir", walDir), + zap.Error(closeErr), + ) + } + return nil, err + } storage, err := newMemoryStorage(persistedState{ HardState: hardState, @@ -155,12 +183,84 @@ func loadWalState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, fsm St } return &diskState{ - Storage: storage, - Persist: etcdstorage.NewStorage(logger, w, snapshotter), - LocalSnap: snapshot, + Storage: storage, + Persist: etcdstorage.NewStorage(logger, w, snapshotter), + LocalSnap: snapshot, + EffectiveApplied: effectiveApplied, }, nil } +// reportColdStartExecute emits the executed-arm of the cold-start +// outcome (callback + structured log). Split out of reportColdStart +// so the parent stays under the cyclop limit; the absolute-value +// gap calculation pushes the inline arm over the threshold. +func reportColdStartExecute(obs raftengine.ColdStartObserver, logger *zap.Logger, snapIndex, target, have uint64) { + if obs != nil { + obs.RestoreExecuted(snapIndex, have) + } + if logger == nil { + return + } + // gap_to_snapshot uses absolute value because the gate now + // permits have > snapIndex (FSM ahead of snapshot but behind + // committed tail). gap_behind_committed is target-have; can be + // 0 when have==target. 
+ var gapToSnapshot uint64 + if have >= snapIndex { + gapToSnapshot = have - snapIndex + } else { + gapToSnapshot = snapIndex - have + } + logger.Info("restoreSnapshotState executed (FSM behind WAL committed tail)", + zap.Uint64("fsm_applied", have), + zap.Uint64("snapshot_index", snapIndex), + zap.Uint64("last_committed_index", target), + zap.Uint64("gap_to_snapshot", gapToSnapshot), + zap.Uint64("gap_behind_committed", target-have), + ) +} + +// coldStartSkipThreshold returns the maximum log index the cold- +// start replay can deliver via Ready.CommittedEntries on this +// node: max(snapshot.Metadata.Index, hardState.Commit). The skip +// gate compares the FSM's durable applied index against this +// value; skip is only safe when the FSM is at least this fresh. +// +// Followers can carry an UNCOMMITTED WAL suffix +// (entries[n-1].Index > hardState.Commit). Raft does NOT surface +// those entries in CommittedEntries until the leader confirms +// them. The previous gate used the WAL tail (entries[n-1].Index) +// which forced a multi-GiB restore on every restart of any +// follower with an uncommitted suffix, defeating the cold-start +// optimization. Codex P2 #934 round 3. +// +// The lower bound stays at the snapshot pointer because an empty +// WAL still requires the FSM to be at least at the snapshot +// index. Raft's invariant guarantees hardState.Commit >= 0; we do +// not need to bound from below explicitly beyond snap.Index. +func coldStartSkipThreshold(snapshot raftpb.Snapshot, hardState raftpb.HardState) uint64 { + threshold := snapshot.Metadata.Index + if hardState.Commit > threshold { + threshold = hardState.Commit + } + return threshold +} + +// coldStartApplied returns the engine's initial applied counter on +// Open: max(maxAppliedIndex(LocalSnap), EffectiveApplied). 
When the +// skip gate fires with the FSM at `have > snapshot.Metadata.Index`, +// EffectiveApplied carries `have`; without this seed the engine +// would deliver entries snapshot.Index+1..have to applyCommitted +// and re-apply them onto a Pebble store already containing them +// (codex P1 #934 root cause). +func coldStartApplied(disk *diskState) uint64 { + base := maxAppliedIndex(disk.LocalSnap) + if disk.EffectiveApplied > base { + return disk.EffectiveApplied + } + return base +} + // loadPersistedSnapshotWithRepair wraps loadPersistedSnapshot with one // wal.Repair attempt on io.ErrUnexpectedEOF. The caller passes in a // shared snapshotter so loadWalState does not instantiate snap.New @@ -243,19 +343,279 @@ func loadPersistedSnapshot(logger *zap.Logger, walDir string, snapshotter *snap. } } -func restoreSnapshotState(fsm StateMachine, snapshot raftpb.Snapshot, fsmSnapDir string) error { +// restoreSnapshotState restores or skip-gates the FSM cold start and +// returns the effective applied index that the engine MUST seed its +// apply counter with. Zero means "no override" (empty snapshot, +// empty snapshot data, or nil FSM); the engine falls back to +// maxAppliedIndex(LocalSnap) in those cases. +// +// The non-zero return is the gate's load-bearing escape hatch from +// double-apply (codex P1 #934): on the skip path the FSM is already +// at `have`, so the engine must NOT replay WAL entries with Index +// <= have or the Pebble store would observe them twice (OCC +// conflicts; HLC ceiling inversion). The execute and legacy paths return +// snapshot.Metadata.Index to leave engine behaviour unchanged.
+func restoreSnapshotState(fsm StateMachine, snapshot raftpb.Snapshot, lastWalIndex uint64, fsmSnapDir string, obs raftengine.ColdStartObserver, logger *zap.Logger) (uint64, error) { if etcdraft.IsEmptySnap(snapshot) || len(snapshot.Data) == 0 || fsm == nil { - return nil + return 0, nil } if isSnapshotToken(snapshot.Data) { - tok, err := decodeSnapshotToken(snapshot.Data) + return restoreSnapshotStateFromToken(fsm, snapshot, lastWalIndex, fsmSnapDir, obs, logger) + } + // Legacy format: full FSM payload embedded in snapshot.Data. + if err := fsm.Restore(bytes.NewReader(snapshot.Data)); err != nil { + return 0, errors.WithStack(err) + } + return snapshot.Metadata.Index, nil +} + +// restoreSnapshotStateFromToken handles the EKVT-token Phase-2 branch +// (PR #910). Split out of restoreSnapshotState so the parent stays +// under the nestif limit. Returns the effective applied index: +// - skip path: `have` (FSM is already past snapshot.Metadata.Index) +// - execute path: snapshot.Metadata.Index (restored from snapshot) +// +// The skip threshold is lastWalIndex (NOT tok.Index): the FSM must be +// at least as fresh as the last WAL entry the cold-start replay would +// deliver, otherwise entries between tok.Index and have would re-apply +// onto a Pebble store that already contains them. Codex P1 #934. +// +// Metrics + log fire AFTER the restore-side work succeeds (coderabbit +// Major #934): a header/CRC failure must not register a "successful" +// outcome in the soak metrics. 
+func restoreSnapshotStateFromToken(fsm StateMachine, snapshot raftpb.Snapshot, lastWalIndex uint64, fsmSnapDir string, obs raftengine.ColdStartObserver, logger *zap.Logger) (uint64, error) { + tok, err := decodeSnapshotToken(snapshot.Data) + if err != nil { + return 0, err + } + decision, have := decideSkipOutcome(fsm, lastWalIndex) + snapPath := fsmSnapPath(fsmSnapDir, tok.Index) + if decision == coldStartSkip { + if err := applyHeaderStateOnSkip(fsm, snapPath, tok.CRC32C); err != nil { + return 0, err + } + reportColdStart(obs, logger, decision, tok.Index, lastWalIndex, have) + return have, nil + } + if err := openAndRestoreFSMSnapshot(fsm, snapPath, tok.CRC32C); err != nil { + return 0, err + } + reportColdStart(obs, logger, decision, tok.Index, lastWalIndex, have) + return snapshot.Metadata.Index, nil +} + +// coldStartDecision enumerates the three outcomes the skip gate +// distinguishes. Used together with ColdStartObserver labels to +// keep the metrics + log emitter centralised. +type coldStartDecision int + +const ( + coldStartSkip coldStartDecision = iota + coldStartExecute + coldStartFallbackNotReader + coldStartFallbackMissingMeta + coldStartFallbackReadErr +) + +func (d coldStartDecision) fallbackReason() string { + switch d { //nolint:exhaustive // skip / execute return "" via default + case coldStartFallbackNotReader: + return "not_reader" + case coldStartFallbackMissingMeta: + return "missing_meta" + case coldStartFallbackReadErr: + return "read_err" + default: + return "" + } +} + +// decideSkipOutcome reads the FSM's durable applied index and +// classifies into one of the five outcomes. Returns (decision, +// haveIndex). haveIndex is meaningful only for skip / execute +// outcomes; the three fallback outcomes leave it at 0 because the +// store could not authoritatively report a value. 
+func decideSkipOutcome(fsm StateMachine, want uint64) (coldStartDecision, uint64) { + r, ok := fsm.(raftengine.AppliedIndexReader) + if !ok { + return coldStartFallbackNotReader, 0 + } + have, present, err := r.LastAppliedIndex() + switch { + case err != nil: + return coldStartFallbackReadErr, 0 + case !present: + return coldStartFallbackMissingMeta, 0 + case have < want: + return coldStartExecute, have + default: + return coldStartSkip, have + } +} + +// reportColdStart dispatches the outcome to the observer + the +// engine logger. nil observer / nil logger no-op individually. +func reportColdStart(obs raftengine.ColdStartObserver, logger *zap.Logger, d coldStartDecision, snapIndex, target, have uint64) { + switch d { //nolint:exhaustive // default groups the three fallback variants + case coldStartSkip: + // Observer contract (cold_start.go + monitoring/cold_start.go + // Prometheus impl): args are (snapshotIndex, haveAppliedIndex); + // gauges compute have-snapIndex. Codex P2 + coderabbit Major + // #934: do NOT pass target/lastWalIndex here or the exported + // gauge measures the wrong baseline. + if obs != nil { + obs.RestoreSkipped(snapIndex, have) + } + if logger != nil { + // Two named gap fields so an operator correlating the + // log against the Prometheus gauge sees consistent + // magnitudes (claude #934 round 5): + // - gap_ahead_snapshot mirrors monitoring.ColdStartObserver + // (have - snapIndex), the metric baseline. + // - gap_ahead_committed measures how far past the WAL + // committed tail (target) the FSM is. + logger.Info("restoreSnapshotState skipped", + zap.Uint64("fsm_applied", have), + zap.Uint64("snapshot_index", snapIndex), + zap.Uint64("last_committed_index", target), + zap.Uint64("gap_ahead_snapshot", have-snapIndex), + zap.Uint64("gap_ahead_committed", have-target), + ) + } + case coldStartExecute: + reportColdStartExecute(obs, logger, snapIndex, target, have) + default: + // Fallback variants: the strictly-additive policy. 
We could + // not even attempt the skip; the full restore runs. + reason := d.fallbackReason() + if obs != nil { + obs.RestoreFallback(snapIndex, reason) + } + if logger != nil { + logger.Info("restoreSnapshotState fallback to full restore", + zap.Uint64("snapshot_index", snapIndex), + zap.String("reason", reason), + ) + } + } +} + +// applyHeaderStateOnSkip mirrors openAndRestoreFSMSnapshot's safety +// contract (size + footer-vs-tokenCRC + full-body-CRC) but applies +// only the header side-effects (HLC ceiling + Stage 8a cutover) +// instead of running the body restore. The body bytes are read for +// CRC coverage but discarded -- fsm.db already holds equivalent +// state, which is precisely the reason we're skipping the restore. +// +// FSMs that do not implement raftengine.SnapshotHeaderApplier +// silently no-op the apply phase -- the FSM has no header state to +// carry forward, and the CRC verification still runs (with no +// observable side-effect on success). On any verification failure +// the typed error propagates and FSM state stays untouched. +// +// See PR #910 design §5 round-7 (two-phase seam) + round-6 +// (three-step CRC mirroring openAndRestoreFSMSnapshot). +func applyHeaderStateOnSkip(fsm StateMachine, snapPath string, tokenCRC uint32) error { + file, err := os.Open(snapPath) + if err != nil { + return statFSMFileError(err) + } + defer func() { _ = file.Close() }() + + info, err := file.Stat() + if err != nil { + return errors.WithStack(err) + } + footer, err := verifyFSMSnapshotPrefix(file, info.Size(), snapPath, tokenCRC) + if err != nil { + return err + } + + // Step 3: full-body CRC. Wrap the payload in a crc32 TeeReader + // and hand it to the FSM's ParseSnapshotHeader for header parse + // + drain. Every payload byte flows through h, matching + // restoreAndComputeCRC's boundary in openAndRestoreFSMSnapshot. 
+ // + // Error-ordering contract (claude #934 R1-F1): header parse + // errors surface BEFORE the body-CRC compare runs, so callers + // (the skip-gate fallback in restoreSnapshotState) may observe + // either an ErrSnapshotHeaderUnknownMagic / InvalidLength chain or + // an ErrFSMSnapshotFileCRC chain depending on which check fails + // first. This is the same ordering openAndRestoreFSMSnapshot has + // — both errors are equally fatal for the skip path (they signal + // snapshot file corruption) and both must propagate without ever + // calling ApplySnapshotHeader. The CRC check stays AFTER the + // header parse so the TeeReader has actually been drained before + // we read h.Sum32(); inverting the order would let a CRC mismatch + // surface on a truncated body even when the header was valid, + // muddying the operator-facing diagnostic. + if _, err := file.Seek(0, io.SeekStart); err != nil { + return errors.WithStack(err) + } + payloadSize := info.Size() - fsmFooterSize + h := crc32.New(crc32cTable) + tee := io.TeeReader(io.LimitReader(file, payloadSize), h) + + setter, hasSetter := fsm.(raftengine.SnapshotHeaderApplier) + ceiling, cutover, err := readSnapshotHeaderOrDrain(setter, hasSetter, tee) + if err != nil { + return err + } + + if h.Sum32() != footer { + return errors.Wrapf(ErrFSMSnapshotFileCRC, + "path=%s footer=%08x computed=%08x", snapPath, footer, h.Sum32()) + } + + // All three checks passed; apply side-effects (pure assignment + // in the FSM). Skipped silently when the FSM does not expose + // the seam. + if hasSetter { + setter.ApplySnapshotHeader(ceiling, cutover) + } + return nil +} + +// verifyFSMSnapshotPrefix runs the first two cheap checks of +// openAndRestoreFSMSnapshot's three-step contract: size and +// footer-vs-tokenCRC. Returns the on-disk footer value (caller +// reuses it for the step-3 full-body CRC compare). Typed errors +// surface unchanged. 
+func verifyFSMSnapshotPrefix(file *os.File, fileSize int64, snapPath string, tokenCRC uint32) (uint32, error) { + if fileSize < fsmMinFileSize { + return 0, errors.Wrapf(ErrFSMSnapshotTooSmall, + "file too small: %d bytes (minimum %d)", fileSize, fsmMinFileSize) + } + footer, err := readFSMFooter(file, fileSize) + if err != nil { + return 0, err + } + if footer != tokenCRC { + return 0, errors.Wrapf(ErrFSMSnapshotTokenCRC, + "path=%s footer=%08x token=%08x", snapPath, footer, tokenCRC) + } + return footer, nil +} + +// readSnapshotHeaderOrDrain branches on whether the FSM exposes the +// SnapshotHeaderApplier seam: when present, delegate to +// ParseSnapshotHeader (which parses the header AND drains the rest); +// otherwise drain the entire payload through the tee'd reader so the +// CRC pass covers every byte. The (ceiling, cutover) tuple is zero +// in the no-seam case -- the caller's ApplySnapshotHeader branch +// short-circuits on hasSetter, so the zero values are inert. +func readSnapshotHeaderOrDrain(setter raftengine.SnapshotHeaderApplier, hasSetter bool, tee io.Reader) (uint64, uint64, error) { + if hasSetter { + ceiling, cutover, err := setter.ParseSnapshotHeader(tee) if err != nil { - return err + return 0, 0, errors.WithStack(err) } - return openAndRestoreFSMSnapshot(fsm, fsmSnapPath(fsmSnapDir, tok.Index), tok.CRC32C) + return ceiling, cutover, nil } - // Legacy format: full FSM payload embedded in snapshot.Data. 
- return errors.WithStack(fsm.Restore(bytes.NewReader(snapshot.Data))) + if _, err := io.Copy(io.Discard, tee); err != nil { + return 0, 0, errors.WithStack(err) + } + return 0, 0, nil } func walSnapshotFor(snapshot raftpb.Snapshot) walpb.Snapshot { @@ -310,7 +670,12 @@ func persistBootState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, fs return nil, err } if wal.Exist(walDir) { - return loadWalState(logger, walDir, snapDir, fsmSnapDir, fsm) + // Recursive load after bootstrap-style setup: no observer + // needed because the engine has not handed one to us + // (bootstrap path runs before OpenConfig wiring reaches + // this point) and a fresh-bootstrap restore will be a no-op + // anyway (the WAL was just created). + return loadWalState(logger, walDir, snapDir, fsmSnapDir, fsm, nil) } w, err := wal.Create(logger, walDir, nil) diff --git a/internal/raftengine/etcd/wal_store_skip_gate_test.go b/internal/raftengine/etcd/wal_store_skip_gate_test.go new file mode 100644 index 00000000..8a94aef4 --- /dev/null +++ b/internal/raftengine/etcd/wal_store_skip_gate_test.go @@ -0,0 +1,466 @@ +package etcd + +import ( + "bytes" + "encoding/binary" + "io" + "os" + "sync" + "testing" + + "github.com/bootjp/elastickv/kv" + "github.com/bootjp/elastickv/store" + "github.com/stretchr/testify/require" + raftpb "go.etcd.io/raft/v3/raftpb" +) + +// --- test doubles --- + +// skipGateFSM is a state machine that satisfies StateMachine, +// AppliedIndexReader, and SnapshotHeaderApplier — the three +// interfaces the skip gate consults. The applied field controls +// the gate's decision; the header* fields capture observed +// ApplySnapshotHeader calls so assertions can verify the header +// preservation contract. 
+type skipGateFSM struct { + applied uint64 + appliedPresent bool + appliedErr error + parseErr error + restoredHeader bool + parsedCeiling uint64 + appliedCeiling uint64 + appliedCutover uint64 + bodyBytes []byte +} + +func (f *skipGateFSM) Apply(_ []byte) any { return nil } +func (f *skipGateFSM) Snapshot() (Snapshot, error) { return nil, io.EOF } +func (f *skipGateFSM) Restore(r io.Reader) error { + data, err := io.ReadAll(r) + if err != nil { + return err + } + f.bodyBytes = data + return nil +} + +func (f *skipGateFSM) LastAppliedIndex() (uint64, bool, error) { + return f.applied, f.appliedPresent, f.appliedErr +} + +func (f *skipGateFSM) ParseSnapshotHeader(r io.Reader) (uint64, uint64, error) { + if f.parseErr != nil { + return 0, 0, f.parseErr + } + // Mimic the real kvFSM contract: parse + drain. We don't actually + // parse a header here; the test fixtures embed magic+ceiling but + // for the gate-level tests we just drain so the CRC matches. + hdrLen := 16 + hdr := make([]byte, hdrLen) + if n, _ := io.ReadFull(r, hdr); n == hdrLen && bytes.HasPrefix(hdr, []byte("EKVTHLC1")) { + f.parsedCeiling = binary.BigEndian.Uint64(hdr[8:16]) + } + if _, err := io.Copy(io.Discard, r); err != nil { + return 0, 0, err + } + return f.parsedCeiling, 0, nil +} + +func (f *skipGateFSM) ApplySnapshotHeader(ceiling, cutover uint64) { + f.restoredHeader = true + f.appliedCeiling = ceiling + f.appliedCutover = cutover +} + +// recordingObs is a ColdStartObserver test double that records every +// callback for later assertion. 
+type recordingObs struct { + mu sync.Mutex + skipped []uint64 // gap values reported via RestoreSkipped + executed []uint64 // gap values reported via RestoreExecuted + fallbacks []string // reasons reported via RestoreFallback +} + +func (o *recordingObs) RestoreSkipped(snapIndex, have uint64) { + o.mu.Lock() + defer o.mu.Unlock() + o.skipped = append(o.skipped, have-snapIndex) +} + +func (o *recordingObs) RestoreExecuted(snapIndex, have uint64) { + o.mu.Lock() + defer o.mu.Unlock() + // Mirror the production monitoring.ColdStartObserver semantics: + // the executed gauge stores |snapIndex - have| because the gate + // shifted to comparing against the committed-tail (codex P2 #934 + // round 3) and the FSM-ahead-of-snapshot case must not underflow + // into ~2^64. Claude #934 round 5 caught the test double drift. + var gap uint64 + if have >= snapIndex { + gap = have - snapIndex + } else { + gap = snapIndex - have + } + o.executed = append(o.executed, gap) +} + +func (o *recordingObs) RestoreFallback(_ uint64, reason string) { + o.mu.Lock() + defer o.mu.Unlock() + o.fallbacks = append(o.fallbacks, reason) +} + +// --- skip-gate tests --- + +// TestSkipGate_SkipsWhenFSMFreshEnough verifies the happy path: +// FSM applied >= snap.Index → skip taken → openAndRestoreFSMSnapshot +// is NOT called (FSM body stays empty), but ApplySnapshotHeader IS +// called with the parsed ceiling. +func TestSkipGate_SkipsWhenFSMFreshEnough(t *testing.T) { + dir := t.TempDir() + const ( + ceilingMs uint64 = 1700_000_000_000 + snapIndex uint64 = 100 + appliedIdx uint64 = 200 + ) + // Write a fake .fsm with a v1 header (16 bytes: EKVTHLC1 + ceilingMs BE). 
+ payload := make([]byte, 16) + copy(payload[:8], "EKVTHLC1") + binary.BigEndian.PutUint64(payload[8:], ceilingMs) + crc, _ := writeFSMFileForTest(t, dir, snapIndex, payload) + + fsm := &skipGateFSM{applied: appliedIdx, appliedPresent: true} + tok := snapshotToken{Index: snapIndex, CRC32C: crc} + snap := raftpb.Snapshot{ + Data: encodeSnapshotToken(tok.Index, tok.CRC32C), + Metadata: raftpb.SnapshotMetadata{Index: snapIndex}, + } + obs := &recordingObs{} + _, gateErr := restoreSnapshotState(fsm, snap, snap.Metadata.Index, dir, obs, nil) + require.NoError(t, gateErr) + + require.Empty(t, fsm.bodyBytes, "skip path MUST NOT call fsm.Restore") + require.True(t, fsm.restoredHeader, "skip path MUST call ApplySnapshotHeader") + require.Equal(t, ceilingMs, fsm.appliedCeiling, "ceiling MUST survive the skip") + require.Equal(t, []uint64{appliedIdx - snapIndex}, obs.skipped, "observer MUST record one skip with the gap-ahead value") + require.Empty(t, obs.executed) + require.Empty(t, obs.fallbacks) +} + +// TestSkipGate_ExecutesWhenFSMStale verifies that when applied < +// snap.Index, the gate does NOT skip — full restore runs, observer +// records the executed outcome with gap-behind, header is restored +// via fsm.Restore (which the fake captures as bodyBytes). 
+func TestSkipGate_ExecutesWhenFSMStale(t *testing.T) { + dir := t.TempDir() + const ( + snapIndex uint64 = 200 + appliedIdx uint64 = 100 + ) + payload := []byte("fsm-body-bytes") + crc, _ := writeFSMFileForTest(t, dir, snapIndex, payload) + + fsm := &skipGateFSM{applied: appliedIdx, appliedPresent: true} + snap := raftpb.Snapshot{ + Data: encodeSnapshotToken(snapIndex, crc), + Metadata: raftpb.SnapshotMetadata{Index: snapIndex}, + } + obs := &recordingObs{} + _, gateErr := restoreSnapshotState(fsm, snap, snap.Metadata.Index, dir, obs, nil) + require.NoError(t, gateErr) + + require.Equal(t, payload, fsm.bodyBytes, "executed path MUST call fsm.Restore with full payload") + require.False(t, fsm.restoredHeader, "ApplySnapshotHeader MUST NOT fire on the executed path") + require.Empty(t, obs.skipped) + require.Equal(t, []uint64{snapIndex - appliedIdx}, obs.executed) + require.Empty(t, obs.fallbacks) +} + +// TestSkipGate_ReturnsEffectiveAppliedOnSkip pins codex P1 #934 +// round 2. The skip path MUST return `have` so Engine.Open can seed +// e.applied above snapshot.Index, preventing applyCommitted from +// re-delivering the snapshot..have tail. 
+func TestSkipGate_ReturnsEffectiveAppliedOnSkip(t *testing.T) { + dir := t.TempDir() + const ( + ceilingMs uint64 = 1700_000_000_000 + snapIndex uint64 = 100 + appliedIdx uint64 = 200 + ) + payload := make([]byte, 16) + copy(payload[:8], "EKVTHLC1") + binary.BigEndian.PutUint64(payload[8:], ceilingMs) + crc, _ := writeFSMFileForTest(t, dir, snapIndex, payload) + + fsm := &skipGateFSM{applied: appliedIdx, appliedPresent: true} + snap := raftpb.Snapshot{ + Data: encodeSnapshotToken(snapIndex, crc), + Metadata: raftpb.SnapshotMetadata{Index: snapIndex}, + } + effective, err := restoreSnapshotState(fsm, snap, snap.Metadata.Index, dir, nil, nil) + require.NoError(t, err) + require.Equal(t, appliedIdx, effective, + "skip path MUST return the FSM's durable applied index so Engine.Open seeds e.applied above snapshot.Index") +} + +// TestSkipGate_EmitsAfterSuccess pins coderabbit Major #934 (emit- +// after-success). reportColdStart must run AFTER the +// applyHeaderStateOnSkip / openAndRestoreFSMSnapshot path completes, +// not before — otherwise a CRC/header failure would still register +// a successful skip/execute outcome in the soak metrics. +func TestSkipGate_EmitsAfterSuccess(t *testing.T) { + dir := t.TempDir() + const snapIndex uint64 = 100 + // Inject a CRC that won't match the on-disk footer so + // applyHeaderStateOnSkip fails. 
+ crc, _ := writeFSMFileForTest(t, dir, snapIndex, []byte("body")) + wrongCRC := crc ^ 0xFFFFFFFF + + fsm := &skipGateFSM{applied: snapIndex + 50, appliedPresent: true} + snap := raftpb.Snapshot{ + Data: encodeSnapshotToken(snapIndex, wrongCRC), + Metadata: raftpb.SnapshotMetadata{Index: snapIndex}, + } + obs := &recordingObs{} + _, err := restoreSnapshotState(fsm, snap, snap.Metadata.Index, dir, obs, nil) + require.Error(t, err, "CRC mismatch must surface") + require.Empty(t, obs.skipped, "skip metric MUST NOT fire when applyHeaderStateOnSkip errors") + require.Empty(t, obs.executed, "execute metric MUST NOT fire either") + require.Empty(t, obs.fallbacks) +} + +// TestColdStartSkipThreshold pins codex P2 #934 round 3. The +// threshold caps at hardState.Commit so a follower carrying an +// uncommitted WAL suffix is NOT forced to run the full restore +// every restart (the original gate used the WAL tail, which can +// exceed Commit, and raft would not deliver those entries until +// the leader confirmed them). +func TestColdStartSkipThreshold(t *testing.T) { + t.Parallel() + mkSnap := func(idx uint64) raftpb.Snapshot { + return raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: idx}} + } + cases := []struct { + name string + snap raftpb.Snapshot + hs raftpb.HardState + expected uint64 + }{ + {"hardState.Commit equals snapshot (fresh follower)", mkSnap(100), raftpb.HardState{Commit: 100}, 100}, + {"hardState.Commit below snapshot (pre-replay)", mkSnap(100), raftpb.HardState{Commit: 50}, 100}, + {"hardState.Commit above snapshot (post-replay)", mkSnap(100), raftpb.HardState{Commit: 150}, 150}, + {"hardState.Commit zero", mkSnap(100), raftpb.HardState{Commit: 0}, 100}, + } + for _, c := range cases { + got := coldStartSkipThreshold(c.snap, c.hs) + if got != c.expected { + t.Errorf("%s: got %d, want %d", c.name, got, c.expected) + } + } +} + +// TestSkipGate_ExecutesWhenWALCarriesPostSnapshotEntries pins +// codex P1 #934. 
When the FSM is past tok.Index but the WAL still +// carries entries tok.Index+1 .. have (the normal interval between +// snapshots — metaAppliedIndex advances on each Apply), the skip +// path MUST NOT fire even though have > tok.Index. Those WAL +// entries would re-apply onto a Pebble store that already contains +// them, hitting OCC conflicts and leaving the HLC below timestamps +// already on disk. +// +// Fixture: snap.Index=100, fsm.applied=150, lastWalIndex=150 (the +// WAL has entries 101..150 mirroring the applied tail). Gate +// criterion is have >= lastWalIndex, which holds; that's the +// happy-skip case. To exercise the bug, set lastWalIndex=200 (the +// WAL still has entries 151..200 that have NOT been applied yet); +// have=150 < lastWalIndex=200 must trigger execute, not skip. +func TestSkipGate_ExecutesWhenWALCarriesPostSnapshotEntries(t *testing.T) { + dir := t.TempDir() + const ( + snapIndex uint64 = 100 + appliedIdx uint64 = 150 + lastWalIndex uint64 = 200 + ) + payload := []byte("body-bytes-for-execute") + crc, _ := writeFSMFileForTest(t, dir, snapIndex, payload) + + fsm := &skipGateFSM{applied: appliedIdx, appliedPresent: true} + snap := raftpb.Snapshot{ + Data: encodeSnapshotToken(snapIndex, crc), + Metadata: raftpb.SnapshotMetadata{Index: snapIndex}, + } + obs := &recordingObs{} + _, gateErr := restoreSnapshotState(fsm, snap, lastWalIndex, dir, obs, nil) + require.NoError(t, gateErr) + + require.Equal(t, payload, fsm.bodyBytes, + "have(150) < lastWalIndex(200) MUST execute full restore so the WAL replay does not duplicate-apply") + require.False(t, fsm.restoredHeader, "execute path MUST NOT use ApplySnapshotHeader") + require.Empty(t, obs.skipped) + // recordingObs now stores |snapIndex - have| (round-5 fix; mirrors + // monitoring.ColdStartObserver semantics so the FSM-ahead-of- + // snapshot case doesn't underflow). For have(150) > snapIndex(100) + // the absolute gap is 50. 
+ require.Equal(t, []uint64{appliedIdx - snapIndex}, obs.executed, + "observer MUST record the absolute snapshot-relative gap") + require.Empty(t, obs.fallbacks) +} + +// TestSkipGate_FallbackMissingMeta covers the strictly-additive +// fallback when the FSM reports the meta key missing. +func TestSkipGate_FallbackMissingMeta(t *testing.T) { + dir := t.TempDir() + payload := []byte("payload") + crc, _ := writeFSMFileForTest(t, dir, 50, payload) + fsm := &skipGateFSM{appliedPresent: false} // missing + snap := raftpb.Snapshot{ + Data: encodeSnapshotToken(50, crc), + Metadata: raftpb.SnapshotMetadata{Index: 50}, + } + obs := &recordingObs{} + _, gateErr := restoreSnapshotState(fsm, snap, snap.Metadata.Index, dir, obs, nil) + require.NoError(t, gateErr) + require.Equal(t, payload, fsm.bodyBytes, "missing meta MUST fall back to full restore") + require.Equal(t, []string{"missing_meta"}, obs.fallbacks) +} + +// TestSkipGate_FallbackReadErr covers the LastAppliedIndex-error +// path. Engine MUST NOT propagate the error (we collapse to false +// → fallback) — over-restoring is strictly safer. +func TestSkipGate_FallbackReadErr(t *testing.T) { + dir := t.TempDir() + payload := []byte("payload") + crc, _ := writeFSMFileForTest(t, dir, 50, payload) + fsm := &skipGateFSM{appliedErr: io.ErrUnexpectedEOF} + snap := raftpb.Snapshot{ + Data: encodeSnapshotToken(50, crc), + Metadata: raftpb.SnapshotMetadata{Index: 50}, + } + obs := &recordingObs{} + _, gateErr := restoreSnapshotState(fsm, snap, snap.Metadata.Index, dir, obs, nil) + require.NoError(t, gateErr) + require.Equal(t, payload, fsm.bodyBytes, "read_err MUST fall back to full restore") + require.Equal(t, []string{"read_err"}, obs.fallbacks) +} + +// TestSkipGate_FallbackNotReader covers the legacy FSM path: the +// FSM does not implement AppliedIndexReader, so the gate cannot +// even attempt a decision. 
+func TestSkipGate_FallbackNotReader(t *testing.T) { + dir := t.TempDir() + payload := []byte("payload") + crc, _ := writeFSMFileForTest(t, dir, 50, payload) + fsm := &dummyFSM{} // no LastAppliedIndex method + snap := raftpb.Snapshot{ + Data: encodeSnapshotToken(50, crc), + Metadata: raftpb.SnapshotMetadata{Index: 50}, + } + obs := &recordingObs{} + _, gateErr := restoreSnapshotState(fsm, snap, snap.Metadata.Index, dir, obs, nil) + require.NoError(t, gateErr) + require.NotEmpty(t, fsm.restored, "not_reader MUST fall back to full restore") + require.Equal(t, []string{"not_reader"}, obs.fallbacks) +} + +// --- applyHeaderStateOnSkip CRC failure modes --- + +// TestApplyHeaderStateOnSkip_TruncatedFile asserts the size check +// (step 1) catches an empty file and surfaces ErrFSMSnapshotTooSmall +// WITHOUT mutating FSM state. +func TestApplyHeaderStateOnSkip_TruncatedFile(t *testing.T) { + dir := t.TempDir() + path := fsmSnapPath(dir, 1) + require.NoError(t, os.WriteFile(path, []byte{}, 0o600)) //nolint:mnd + + fsm := &skipGateFSM{} + err := applyHeaderStateOnSkip(fsm, path, 0xDEADBEEF) + require.ErrorIs(t, err, ErrFSMSnapshotTooSmall) + require.False(t, fsm.restoredHeader, "FSM state MUST NOT mutate on verification failure") +} + +// TestApplyHeaderStateOnSkip_WrongTokenCRC asserts step 2 catches a +// footer-vs-token mismatch. +func TestApplyHeaderStateOnSkip_WrongTokenCRC(t *testing.T) { + dir := t.TempDir() + crc, path := writeFSMFileForTest(t, dir, 1, []byte("payload-bytes")) + _ = crc + + fsm := &skipGateFSM{} + err := applyHeaderStateOnSkip(fsm, path, 0xBADC0FFE) + require.ErrorIs(t, err, ErrFSMSnapshotTokenCRC) + require.False(t, fsm.restoredHeader, "FSM state MUST NOT mutate on verification failure") +} + +// TestApplyHeaderStateOnSkip_BodyCorruption asserts step 3 catches a +// flipped body byte (CRC mismatch). 
+func TestApplyHeaderStateOnSkip_BodyCorruption(t *testing.T) { + dir := t.TempDir() + crc, path := writeFSMFileForTest(t, dir, 1, []byte("payload-bytes")) + + // Flip the first byte of the body in-place. The footer still + // reads as `crc`, but the on-the-wire content no longer matches + // it. Step 2 (footer-vs-token) passes (we pass the same `crc` + // as tokenCRC), step 3 (full-body CRC) fails. + f, err := os.OpenFile(path, os.O_RDWR, 0) + require.NoError(t, err) + defer f.Close() + var b [1]byte + _, err = f.ReadAt(b[:], 0) + require.NoError(t, err) + b[0] ^= 0x01 + _, err = f.WriteAt(b[:], 0) + require.NoError(t, err) + + fsm := &skipGateFSM{} + err = applyHeaderStateOnSkip(fsm, path, crc) + require.ErrorIs(t, err, ErrFSMSnapshotFileCRC) + require.False(t, fsm.restoredHeader, "FSM state MUST NOT mutate on verification failure") +} + +// --- kvFSM header preservation contract --- + +// TestSkipGate_KVFSMHeaderRoundTrip verifies the production kvFSM +// satisfies the SnapshotHeaderApplier contract: ParseSnapshotHeader +// returns the v1 ceiling, ApplySnapshotHeader sets f.hlc and +// f.restoredCutover. The cold-start skip path threads them through. +func TestSkipGate_KVFSMHeaderRoundTrip(t *testing.T) { + const ceilingMs uint64 = 1700_000_000_000 + + // 16-byte v1 header followed by inner-store payload. + const v1HeaderLen = 16 + suffix := []byte("inner-store-bytes-here") + payload := make([]byte, v1HeaderLen, v1HeaderLen+len(suffix)) + copy(payload[:8], "EKVTHLC1") + binary.BigEndian.PutUint64(payload[8:], ceilingMs) + payload = append(payload, suffix...) + + dir := t.TempDir() + crc, path := writeFSMFileForTest(t, dir, 42, payload) + + // We build a real *kv.kvFSM via NewKvFSMWithHLC so the type- + // assert in applyHeaderStateOnSkip sees the production type. 
+ hlc := kv.NewHLC() + st := store.NewMVCCStore() + fsm := kv.NewKvFSMWithHLC(st, hlc) + sm, ok := fsm.(StateMachine) + require.True(t, ok, "kvFSM must satisfy StateMachine") + require.NoError(t, applyHeaderStateOnSkip(sm, path, crc)) + require.Equal(t, int64(ceilingMs), hlc.PhysicalCeiling(), + "applyHeaderStateOnSkip MUST set the HLC ceiling on the production kvFSM") + + // Sanity: the file path is intact (we didn't accidentally delete + // it via a wrong-cleanup); future test runs can re-open if needed. + _, statErr := os.Stat(path) + require.NoError(t, statErr) + + // Also confirm restoredCutover is 0 for v1 (no Stage 8a cutover + // in the header). RestoredCutover is the public accessor. + type cutoverer interface { + RestoredCutover() uint64 + } + cov, ok := fsm.(cutoverer) + require.True(t, ok, "kvFSM must expose RestoredCutover()") + require.Equal(t, uint64(0), cov.RestoredCutover(), + "v1 header MUST result in restoredCutover=0") +} diff --git a/internal/raftengine/etcd/wal_store_test.go b/internal/raftengine/etcd/wal_store_test.go index 01b6a209..71afc632 100644 --- a/internal/raftengine/etcd/wal_store_test.go +++ b/internal/raftengine/etcd/wal_store_test.go @@ -22,7 +22,7 @@ import ( func TestRestoreSnapshotStateEmptySnapshot(t *testing.T) { // An empty (zero-value) snapshot must be a no-op: FSM not touched. 
fsm := &dummyFSM{} - err := restoreSnapshotState(fsm, raftpb.Snapshot{}, "") + _, err := restoreSnapshotState(fsm, raftpb.Snapshot{}, 0, "", nil, nil) require.NoError(t, err) require.Nil(t, fsm.restored) } @@ -33,7 +33,7 @@ func TestRestoreSnapshotStateNilData(t *testing.T) { Metadata: raftpb.SnapshotMetadata{Index: 5, Term: 1}, } fsm := &dummyFSM{} - err := restoreSnapshotState(fsm, snap, "") + _, err := restoreSnapshotState(fsm, snap, snap.Metadata.Index, "", nil, nil) require.NoError(t, err) require.Nil(t, fsm.restored) } @@ -44,7 +44,7 @@ func TestRestoreSnapshotStateNilFSM(t *testing.T) { Data: []byte("some payload"), Metadata: raftpb.SnapshotMetadata{Index: 1, Term: 1}, } - err := restoreSnapshotState(nil, snap, "") + _, err := restoreSnapshotState(nil, snap, snap.Metadata.Index, "", nil, nil) require.NoError(t, err) } @@ -58,7 +58,7 @@ func TestRestoreSnapshotStateLegacyFormat(t *testing.T) { } fsm := &dummyFSM{} - err := restoreSnapshotState(fsm, snap, "") + _, err := restoreSnapshotState(fsm, snap, snap.Metadata.Index, "", nil, nil) require.NoError(t, err) require.Equal(t, payload, fsm.restored) } @@ -76,7 +76,7 @@ func TestRestoreSnapshotStateTokenFormat(t *testing.T) { } fsm := &dummyFSM{} - err := restoreSnapshotState(fsm, snap, dir) + _, err := restoreSnapshotState(fsm, snap, snap.Metadata.Index, dir, nil, nil) require.NoError(t, err) require.Equal(t, payload, fsm.restored) } @@ -93,7 +93,7 @@ func TestRestoreSnapshotStateTokenEmptyPayload(t *testing.T) { } fsm := &dummyFSM{} - err := restoreSnapshotState(fsm, snap, dir) + _, err := restoreSnapshotState(fsm, snap, snap.Metadata.Index, dir, nil, nil) require.NoError(t, err) require.Equal(t, []byte{}, fsm.restored) } @@ -112,7 +112,7 @@ func TestRestoreSnapshotStateTokenCRCMismatch(t *testing.T) { } fsm := &dummyFSM{} - err := restoreSnapshotState(fsm, snap, dir) + _, err := restoreSnapshotState(fsm, snap, snap.Metadata.Index, dir, nil, nil) require.ErrorIs(t, err, ErrFSMSnapshotTokenCRC) require.Nil(t, 
fsm.restored, "FSM must not be restored after CRC mismatch") } @@ -129,7 +129,7 @@ func TestRestoreSnapshotStateTokenFileNotFound(t *testing.T) { } fsm := &dummyFSM{} - err := restoreSnapshotState(fsm, snap, dir) + _, err := restoreSnapshotState(fsm, snap, snap.Metadata.Index, dir, nil, nil) require.ErrorIs(t, err, ErrFSMSnapshotNotFound) require.Nil(t, fsm.restored) } diff --git a/internal/raftengine/statemachine.go b/internal/raftengine/statemachine.go index 65b6b45f..626ae3aa 100644 --- a/internal/raftengine/statemachine.go +++ b/internal/raftengine/statemachine.go @@ -90,3 +90,66 @@ type AppliedIndexReader interface { type AppliedIndexWriter interface { SetDurableAppliedIndex(idx uint64) error } + +// SnapshotHeaderApplier is an OPTIONAL extension that lets the +// cold-start skip gate preserve the header state (HLC ceiling, +// Stage 8a cutover) the FSM's Restore would normally apply, without +// running the (multi-GiB) body restore. See +// docs/design/2026_06_02_idempotent_snapshot_restore.md §5. +// +// The interface is two-phase by design: +// +// - ParseSnapshotHeader reads the v1/v2 header from a caller- +// supplied io.Reader (wrapped in a crc32 TeeReader by the +// engine) and drains the remaining bytes so the wrapping CRC +// covers the full payload. It returns the parsed (ceiling, +// cutover) pair WITHOUT mutating FSM state. Errors propagate +// from the underlying header parser +// (ErrSnapshotHeaderUnknownMagic / InvalidLength) or from the +// drain pass (I/O errors); FSM state stays untouched on error. +// +// - ApplySnapshotHeader is pure assignment of the verified header +// state. The engine calls this only after ParseSnapshotHeader +// returned successfully AND the wrapping crc32 hash matched +// the file footer. 
+// +// Splitting parse from apply lets the CRC verifier stay co-located +// with its private helpers in internal/raftengine/etcd (matching +// the openAndRestoreFSMSnapshot safety contract) while the v1/v2 +// header parser stays inside the kv package where it already lives. +// Neither package imports the other in production. +type SnapshotHeaderApplier interface { + ParseSnapshotHeader(r io.Reader) (ceiling, cutover uint64, err error) + ApplySnapshotHeader(ceiling, cutover uint64) +} + +// VolatileEntryClassifier is an OPTIONAL extension that lets the +// cold-start skip path distinguish data-mutating entries (whose +// effects are durably carried by `metaAppliedIndex` and must NOT be +// re-applied) from volatile-only entries (whose effects exist purely +// in process memory and MUST be re-applied on every cold start to +// reconstruct the in-memory state). +// +// Concrete case: HLC lease entries (kv.raftEncodeHLCLease, tag 0x02) +// only call `HLC.SetPhysicalCeiling`, which is monotonic and lives in +// memory. After the skip gate fires the WAL committed tail still +// carries those leases; if the engine's idempotency guard drops them +// alongside KV/MVCC duplicates, the restarted node loses every +// post-snapshot ceiling raise and `ApplySnapshotHeader` only restores +// the older snapshot-time ceiling. The next leader-issued fenced +// timestamp can then collide with persisted commit_ts values that +// were stamped under the larger lease ceiling. Codex P1 #934 round 7. +// +// Implementations classify the cleartext FSM payload (after raft +// envelope decode + decryption, i.e. the same `data` Apply receives) +// — NOT the raw raft entry bytes. Returning true means "re-apply +// this duplicate entry purely for its in-memory effect"; the engine +// will NOT call setApplied or resolveProposal in that case. Returning +// false means "skip this duplicate" (current behavior). FSMs that do +// not implement this interface default to skip-all-duplicates. 
+// +// IsVolatileOnlyPayload MUST be a pure classification: the same bytes +// must always return the same answer, and no FSM state may change. +type VolatileEntryClassifier interface { + IsVolatileOnlyPayload(payload []byte) bool +} diff --git a/kv/fsm.go b/kv/fsm.go index ec7afc69..75731f57 100644 --- a/kv/fsm.go +++ b/kv/fsm.go @@ -583,6 +583,69 @@ func (f *kvFSM) RestoredCutover() uint64 { return f.restoredCutover } +// ParseSnapshotHeader implements raftengine.SnapshotHeaderApplier +// phase 1 — the cold-start skip path's parse-without-side-effect +// step. The engine has wrapped `r` in a crc32 TeeReader sized at +// the body payload (file size minus 4-byte footer), so every byte +// pulled from `r` flows through the engine's hash. We read the +// v1/v2 header via ReadSnapshotHeader, then drain the rest of the +// body so the wrapping hash covers every payload byte — matching +// restoreAndComputeCRC's behaviour in openAndRestoreFSMSnapshot. +// +// IMPORTANT: this method MUST NOT touch f.hlc or f.restoredCutover. +// The engine calls ApplySnapshotHeader separately, only after the +// wrapping CRC verification passes. Mutating FSM state here would +// defeat the "no side-effect on CRC failure" contract that the +// PR #910 design §5 round-7 split is designed to preserve. +func (f *kvFSM) ParseSnapshotHeader(r io.Reader) (uint64, uint64, error) { + br := bufio.NewReaderSize(r, 1<<20) //nolint:mnd // 1 MiB, local to kv + ceiling, cutover, err := ReadSnapshotHeader(br) + if err != nil { + return 0, 0, errors.WithStack(err) + } + // Drain the remainder so the engine's TeeReader-wrapped CRC + // covers every byte of the body (LimitReader exhaustion + // signals "full payload consumed" to the caller). + if _, err := io.Copy(io.Discard, br); err != nil { + return 0, 0, errors.WithStack(err) + } + return ceiling, cutover, nil +} + +// ApplySnapshotHeader implements raftengine.SnapshotHeaderApplier +// phase 2 — pure assignment of the verified header state. 
Called +// only after ParseSnapshotHeader returned successfully AND the +// engine's wrapping crc32 hash matched the file footer. Mirrors +// the two side-effects Restore would have applied for the header +// portion (HLC physical ceiling + restoredCutover). See PR #910 +// design §5 round-7. +func (f *kvFSM) ApplySnapshotHeader(ceiling, cutover uint64) { + if f.hlc != nil && ceiling > 0 { + f.hlc.SetPhysicalCeiling(int64(ceiling)) //nolint:gosec // ceiling is a Unix ms timestamp encoded as uint64 + } + f.restoredCutover = cutover +} + +// IsVolatileOnlyPayload satisfies raftengine.VolatileEntryClassifier. +// Returns true iff payload is an HLC lease entry (raftEncodeHLCLease +// tag, 0x02) — those entries only call HLC.SetPhysicalCeiling, which +// is monotonic and lives purely in memory. After the cold-start skip +// gate fires, the engine still delivers WAL committed-tail entries +// past snapshot.Metadata.Index; without this classifier those +// volatile entries get dropped along with KV/MVCC duplicates and the +// post-snapshot ceiling raise is lost. Codex P1 #934 round 7. +// +// Re-applying KV/MVCC entries would re-execute OCC validation against +// store state that has already moved past commit_ts, surfacing +// spurious conflicts. Returning false for any non-HLC payload tag +// preserves that idempotency. Encryption opcodes (0x03..0x07) MUST +// also return false — they persist DEK state in the encryption +// sidecar and re-applying would diverge the sidecar's +// RaftAppliedIndex from the engine's appliedIndex. 
+func (f *kvFSM) IsVolatileOnlyPayload(payload []byte) bool { + return len(payload) > 0 && payload[0] == raftEncodeHLCLease +} + func (f *kvFSM) handleTxnRequest(ctx context.Context, r *pb.Request, commitTS uint64) error { if err := f.verifyComposed1(r); err != nil { return err diff --git a/kv/fsm_applied_index_iface_check.go b/kv/fsm_applied_index_iface_check.go index 1447c919..14d522fe 100644 --- a/kv/fsm_applied_index_iface_check.go +++ b/kv/fsm_applied_index_iface_check.go @@ -7,3 +7,20 @@ import "github.com/bootjp/elastickv/internal/raftengine" // type-assertion (Branch 3) succeeds. var _ raftengine.AppliedIndexReader = (*kvFSM)(nil) var _ raftengine.AppliedIndexWriter = (*kvFSM)(nil) + +// Branch 3: kvFSM must directly implement +// raftengine.SnapshotHeaderApplier so applyHeaderStateOnSkip in +// wal_store.go can deliver the snapshot header state (HLC ceiling + +// Stage 8a cutover) to the FSM without running the multi-GiB body +// restore. A future rename or signature drift fails the build +// immediately rather than silently degrading the skip optimisation. +var _ raftengine.SnapshotHeaderApplier = (*kvFSM)(nil) + +// Codex P1 #934 round 7: kvFSM must directly implement +// raftengine.VolatileEntryClassifier so the engine's cold-start +// duplicate-entry guard can distinguish HLC lease entries (volatile, +// must replay) from KV/MVCC duplicates (idempotency-violating, must +// skip). A future rename of IsVolatileOnlyPayload or accidental +// removal of the raftEncodeHLCLease classification would otherwise +// silently re-introduce the post-snapshot lease replay loss. 
+var _ raftengine.VolatileEntryClassifier = (*kvFSM)(nil) diff --git a/main.go b/main.go index da9e79d2..ca6bbb70 100644 --- a/main.go +++ b/main.go @@ -50,15 +50,16 @@ const ( etcdMaxInflightMsg = 256 ) -func newRaftFactory(engineType raftEngineType) (raftengine.Factory, error) { +func newRaftFactory(engineType raftEngineType, coldStartObs raftengine.ColdStartObserver) (raftengine.Factory, error) { switch engineType { case raftEngineEtcd: return etcdraftengine.NewFactory(etcdraftengine.FactoryConfig{ - TickInterval: etcdTickInterval, - HeartbeatTick: durationToTicks(heartbeatTimeout, etcdTickInterval, etcdHeartbeatMinTicks), - ElectionTick: durationToTicks(electionTimeout, etcdTickInterval, etcdElectionMinTicks), - MaxSizePerMsg: etcdMaxSizePerMsg, - MaxInflightMsg: etcdMaxInflightMsg, + TickInterval: etcdTickInterval, + HeartbeatTick: durationToTicks(heartbeatTimeout, etcdTickInterval, etcdHeartbeatMinTicks), + ElectionTick: durationToTicks(electionTimeout, etcdTickInterval, etcdElectionMinTicks), + MaxSizePerMsg: etcdMaxSizePerMsg, + MaxInflightMsg: etcdMaxInflightMsg, + ColdStartObserver: coldStartObs, }), nil default: return nil, errors.Wrapf(ErrUnsupportedRaftEngine, "%q", engineType) @@ -317,15 +318,21 @@ func run() error { return err } - factory, err := newRaftFactory(engineType) - if err != nil { - return err - } - var lc net.ListenConfig metricsRegistry := monitoring.NewRegistry(*raftId, *myAddr) + // Factory needs the cold-start observer from the registry so the + // engine's restoreSnapshotState path can emit + // elastickv_fsm_cold_start_restore_total / _applied_index_gap + // (PR #934 round-1 codex P2 closed this plumbing gap — the + // observer was previously unused because Factory.Create did not + // carry it through to OpenConfig). 
+ factory, err := newRaftFactory(engineType, metricsRegistry.ColdStartObserver()) + if err != nil { + return err + } + // Create the shared HLC before building shard groups so every FSM can update // physicalCeiling when HLC lease entries are applied to the Raft log. clock := kv.NewHLC() diff --git a/main_bootstrap_e2e_test.go b/main_bootstrap_e2e_test.go index 80f5edc9..9fa37947 100644 --- a/main_bootstrap_e2e_test.go +++ b/main_bootstrap_e2e_test.go @@ -351,7 +351,7 @@ func startBootstrapE2ENode( } bootstrap = bootstrap || len(bootstrapServers) > 0 - factory, err := newRaftFactory(engineType) + factory, err := newRaftFactory(engineType, nil) if err != nil { return nil, err } diff --git a/main_bootstrap_test.go b/main_bootstrap_test.go index 3a364957..52a35c18 100644 --- a/main_bootstrap_test.go +++ b/main_bootstrap_test.go @@ -91,6 +91,6 @@ func TestDurationToTicks(t *testing.T) { func TestNewRaftFactory_UnsupportedEngine(t *testing.T) { t.Parallel() - _, err := newRaftFactory(raftEngineType("unknown")) + _, err := newRaftFactory(raftEngineType("unknown"), nil) require.ErrorIs(t, err, ErrUnsupportedRaftEngine) } diff --git a/monitoring/cold_start.go b/monitoring/cold_start.go new file mode 100644 index 00000000..06b6eb88 --- /dev/null +++ b/monitoring/cold_start.go @@ -0,0 +1,107 @@ +package monitoring + +import "github.com/prometheus/client_golang/prometheus" + +// ColdStartMetrics exposes the cold-start snapshot-restore skip +// gate's three outcomes as Prometheus series. The skip gate fires +// at most once per process lifetime (on cold start) so these are +// low-cardinality, low-rate signals — counters rather than +// histograms. Operators alert when the skip rate falls below a +// soak threshold (design target: ≥ 90% in steady state) or when +// fallback_reason rises unexpectedly. +// +// See docs/design/2026_06_02_idempotent_snapshot_restore.md §9. 
+type ColdStartMetrics struct { + // restoreTotal is the per-outcome counter for the three skip- + // gate outcomes. Labels: + // outcome=skipped — the gate fired (live store fresh enough) + // outcome=executed — gate did NOT fire (FSM genuinely stale) + // outcome=fallback — strictly-additive fallback (FSM doesn't + // opt in, meta key missing, or LastAppliedIndex errored) + // fallback_reason is empty for skipped/executed and one of + // not_reader / missing_meta / read_err for fallback. + restoreTotal *prometheus.CounterVec + // appliedIndexGap is the absolute distance between the + // snapshot pointer and the FSM's reported applied index at the + // moment of the decision. Emitted with the same outcome label + // so dashboards can plot "how far ahead were we when we + // skipped" alongside "how far behind were we when we + // restored". + appliedIndexGap *prometheus.GaugeVec +} + +func newColdStartMetrics(registerer prometheus.Registerer) *ColdStartMetrics { + m := &ColdStartMetrics{ + restoreTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "elastickv_fsm_cold_start_restore_total", + Help: "Cumulative count of cold-start snapshot-restore skip-gate outcomes. outcome=skipped means the gate fired (LastAppliedIndex >= snapshot.Index); outcome=executed means the FSM was genuinely stale; outcome=fallback means the strictly-additive fallback path ran (FSM did not opt in, meta key missing, or read error). fallback_reason is empty for skipped/executed.", + }, []string{"outcome", "fallback_reason"}), + appliedIndexGap: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "elastickv_fsm_cold_start_applied_index_gap", + Help: "Absolute distance between the snapshot pointer and the FSM's reported applied index at cold start, in Raft log entries. 
outcome=skipped reports how far ahead the live store was; outcome=executed reports how far behind it was.", + }, []string{"outcome"}), + } + if registerer != nil { + registerer.MustRegister(m.restoreTotal) + registerer.MustRegister(m.appliedIndexGap) + } + return m +} + +// ColdStartObserver is the monitoring-side implementation of +// raftengine.ColdStartObserver. Holds nothing but the metrics +// handle; safe to share across goroutines (Prometheus collectors +// are concurrency-safe). +type ColdStartObserver struct { + metrics *ColdStartMetrics +} + +func newColdStartObserver(metrics *ColdStartMetrics) *ColdStartObserver { + return &ColdStartObserver{metrics: metrics} +} + +// RestoreSkipped records a successful skip. gap = have - snapIndex +// (positive: how far ahead the live store was). +func (o *ColdStartObserver) RestoreSkipped(snapIndex, have uint64) { + if o == nil || o.metrics == nil { + return + } + o.metrics.restoreTotal.WithLabelValues("skipped", "").Inc() + o.metrics.appliedIndexGap.WithLabelValues("skipped").Set(float64(have - snapIndex)) +} + +// RestoreExecuted records a full restore that ran. gap is the +// absolute distance |snapIndex - have| in entry indices. +// +// The skip-gate threshold is no longer snap.Metadata.Index — it +// shifted to the WAL committed-tail (codex P1 #934). An executed +// restore therefore no longer implies have < snapIndex: a crashed +// follower can have the FSM ahead of the snapshot pointer but +// behind the WAL committed tail. Using snapIndex - have directly +// would underflow into ~2^64 in that case and corrupt the gauge. +// Codex P2 #934 round 3. 
+func (o *ColdStartObserver) RestoreExecuted(snapIndex, have uint64) { + if o == nil || o.metrics == nil { + return + } + var gap uint64 + if have >= snapIndex { + gap = have - snapIndex + } else { + gap = snapIndex - have + } + o.metrics.restoreTotal.WithLabelValues("executed", "").Inc() + o.metrics.appliedIndexGap.WithLabelValues("executed").Set(float64(gap)) +} + +// RestoreFallback records the strictly-additive fallback path. +// reason is a stable enum the engine supplies (not_reader / +// missing_meta / read_err). No gap is reported — the store could +// not authoritatively name a value. +func (o *ColdStartObserver) RestoreFallback(snapIndex uint64, reason string) { + _ = snapIndex + if o == nil || o.metrics == nil { + return + } + o.metrics.restoreTotal.WithLabelValues("fallback", reason).Inc() +} diff --git a/monitoring/cold_start_test.go b/monitoring/cold_start_test.go new file mode 100644 index 00000000..28930bfa --- /dev/null +++ b/monitoring/cold_start_test.go @@ -0,0 +1,102 @@ +package monitoring + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" +) + +// TestColdStartObserver_ExecutedAbsoluteGap pins codex P2 #934 +// round 3. The skip-gate threshold shifted to the WAL committed +// tail, so an executed restore can now fire when have >= snapIndex +// (FSM ahead of snapshot but behind the committed tail after a +// crash). The previous `snapIndex - have` subtraction underflowed +// into ~2^64 in that case. Verify the gauge stores the absolute +// distance. 
+func TestColdStartObserver_ExecutedAbsoluteGap(t *testing.T) { + t.Parallel() + cases := []struct { + name string + snapIndex uint64 + have uint64 + wantGap float64 + }{ + {"FSM behind snapshot (legacy executed case)", 200, 100, 100}, + {"FSM ahead of snapshot but behind WAL tail (new case)", 100, 150, 50}, + {"FSM equal to snapshot", 100, 100, 0}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + t.Parallel() + reg := prometheus.NewRegistry() + metrics := newColdStartMetrics(reg) + obs := newColdStartObserver(metrics) + + obs.RestoreExecuted(c.snapIndex, c.have) + + got := testutil.ToFloat64(metrics.appliedIndexGap.WithLabelValues("executed")) + if got != c.wantGap { + t.Errorf("appliedIndexGap[executed] = %v, want %v", got, c.wantGap) + } + // Counter must increment once. + if cnt := testutil.ToFloat64(metrics.restoreTotal.WithLabelValues("executed", "")); cnt != 1 { + t.Errorf("restoreTotal[executed,\"\"] = %v, want 1", cnt) + } + }) + } +} + +// TestColdStartObserver_SkippedGap pins the skipped-path gauge: +// gap = have - snapIndex (always >= 0 on the skip path; the gate +// guarantees have >= threshold >= snapIndex). +func TestColdStartObserver_SkippedGap(t *testing.T) { + t.Parallel() + reg := prometheus.NewRegistry() + metrics := newColdStartMetrics(reg) + obs := newColdStartObserver(metrics) + + obs.RestoreSkipped(100, 250) + + got := testutil.ToFloat64(metrics.appliedIndexGap.WithLabelValues("skipped")) + if got != 150 { + t.Errorf("appliedIndexGap[skipped] = %v, want 150", got) + } +} + +// TestColdStartObserver_NilSafe verifies that a nil observer is a +// no-op (the engine constructs nil observers when monitoring is +// disabled). 
+func TestColdStartObserver_NilSafe(t *testing.T) { + t.Parallel() + defer func() { + if r := recover(); r != nil { + t.Fatalf("nil observer must not panic, got: %v", r) + } + }() + var obs *ColdStartObserver + obs.RestoreSkipped(1, 2) + obs.RestoreExecuted(1, 2) + obs.RestoreFallback(1, "read_err") +} + +// TestColdStartObserver_FallbackReasonLabel pins the strictly- +// additive fallback path: the counter increments under the +// fallback outcome with the supplied reason label; no gauge value +// is reported. +func TestColdStartObserver_FallbackReasonLabel(t *testing.T) { + t.Parallel() + for _, reason := range []string{"not_reader", "missing_meta", "read_err"} { + t.Run(reason, func(t *testing.T) { + t.Parallel() + reg := prometheus.NewRegistry() + metrics := newColdStartMetrics(reg) + obs := newColdStartObserver(metrics) + + obs.RestoreFallback(100, reason) + if cnt := testutil.ToFloat64(metrics.restoreTotal.WithLabelValues("fallback", reason)); cnt != 1 { + t.Errorf("restoreTotal[fallback,%q] = %v, want 1", reason, cnt) + } + }) + } +} diff --git a/monitoring/registry.go b/monitoring/registry.go index dcef56a9..51f45735 100644 --- a/monitoring/registry.go +++ b/monitoring/registry.go @@ -25,6 +25,8 @@ type Registry struct { sqsObserver *SQSObserver hlc *HLCMetrics hlcObserver *HLCObserver + coldStart *ColdStartMetrics + coldStartObs *ColdStartObserver } // NewRegistry builds a registry with constant labels that identify the local node. @@ -51,9 +53,22 @@ func NewRegistry(nodeID string, nodeAddress string) *Registry { r.sqsObserver = newSQSObserver(r.sqs) r.hlc = newHLCMetrics(registerer) r.hlcObserver = newHLCObserver(r.hlc) + r.coldStart = newColdStartMetrics(registerer) + r.coldStartObs = newColdStartObserver(r.coldStart) return r } +// ColdStartObserver returns the cold-start snapshot-restore observer +// backed by this registry. 
The engine receives it through +// raftengine/etcd.OpenConfig.ColdStartObserver and calls it on each +// skip-gate outcome (PR #910 design §9). +func (r *Registry) ColdStartObserver() *ColdStartObserver { + if r == nil { + return nil + } + return r.coldStartObs +} + // Handler returns an HTTP handler that exposes the Prometheus scrape endpoint. func (r *Registry) Handler() http.Handler { if r == nil || r.gatherer == nil { diff --git a/multiraft_runtime_test.go b/multiraft_runtime_test.go index 87e07a45..f35f6e2f 100644 --- a/multiraft_runtime_test.go +++ b/multiraft_runtime_test.go @@ -51,7 +51,7 @@ func TestBuildShardGroupsWithEtcdEngineRoutesAcrossGroups(t *testing.T) { {id: 2, address: "127.0.0.1:15002"}, } - factory, err := newRaftFactory(raftEngineEtcd) + factory, err := newRaftFactory(raftEngineEtcd, nil) require.NoError(t, err) clock := kv.NewHLC() runtimes, shardGroups, err := buildShardGroups("n1", baseDir, groups, true, true, nil, factory, nil, clock, nil, nil, "", encryptionWriteWiring{}, nil) @@ -106,7 +106,7 @@ func TestBuildShardGroupsWithEtcdEngineRestartsAcrossGroups(t *testing.T) { sharedClock := kv.NewHLC() openShardStore := func(bootstrap bool) ([]*raftGroupRuntime, map[uint64]*kv.ShardGroup, *kv.ShardStore) { - factory, err := newRaftFactory(raftEngineEtcd) + factory, err := newRaftFactory(raftEngineEtcd, nil) require.NoError(t, err) runtimes, shardGroups, err := buildShardGroups("n1", baseDir, groups, true, bootstrap, nil, factory, nil, sharedClock, nil, nil, "", encryptionWriteWiring{}, nil) require.NoError(t, err)