diff --git a/kv/coordinator.go b/kv/coordinator.go index 06a276dd..df4f8c87 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -120,6 +120,11 @@ func NewCoordinatorWithEngine(txm Transactional, engine raftengine.Engine, opts for _, opt := range opts { opt(c) } + // Resolve the optional LeaseProvider capability once here so the + // LeaseRead / refreshLeaseAfterDispatch hot paths test a cached + // field instead of repeating the interface type assertion per call. + // engine is never reassigned after construction, so the cached value + // stays valid for the Coordinate's lifetime. // Register a leader-loss hook so the lease is invalidated the instant // the engine notices a state transition out of the leader role, // rather than waiting for wall-clock expiry of the current lease. @@ -128,6 +133,7 @@ func NewCoordinatorWithEngine(txm Transactional, engine raftengine.Engine, opts // one-shot tools) MUST call Close() to avoid leaking a closure // pointing into this Coordinate. if lp, ok := engine.(raftengine.LeaseProvider); ok { + c.lp = lp c.deregisterLeaseCb = lp.RegisterLeaderLossCallback(c.lease.invalidate) } return c @@ -169,10 +175,18 @@ type CoordinateResponse struct { type Coordinate struct { transactionManager Transactional engine raftengine.Engine - clock *HLC - connCache GRPCConnCache - log *slog.Logger - lease leaseState + // lp caches the engine's optional LeaseProvider capability so the + // LeaseRead hot path (and refreshLeaseAfterDispatch) test a single + // field for nil instead of performing an interface type assertion on + // every call. It is set once in NewCoordinatorWithEngine and is nil + // when the engine does not implement raftengine.LeaseProvider. The + // engine field is never reassigned after construction, so this stays + // in sync without a lock. + lp raftengine.LeaseProvider + clock *HLC + connCache GRPCConnCache + log *slog.Logger + lease leaseState // deregisterLeaseCb removes the leader-loss callback registered // against engine at construction. Long-lived Coordinates don't // need to call it (the engine will be closed after them), but @@ -599,11 +613,10 @@ func (c *Coordinate) refreshLeaseAfterDispatch(resp *CoordinateResponse, err err if resp == nil || resp.CommitIndex == 0 { return } - lp, ok := c.engine.(raftengine.LeaseProvider) - if !ok { + if c.lp == nil { return } - c.lease.extend(dispatchStart.Add(lp.LeaseDuration()), expectedGen) + c.lease.extend(dispatchStart.Add(c.lp.LeaseDuration()), expectedGen) } func (c *Coordinate) IsLeader() bool { @@ -716,8 +729,8 @@ func (c *Coordinate) LinearizableReadForKey(ctx context.Context, _ []byte) (uint // Callers that resolve timestamps via store.LastCommitTS may discard // the value. func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error) { - lp, ok := c.engine.(raftengine.LeaseProvider) - if !ok { + lp := c.lp + if lp == nil { return c.LinearizableRead(ctx) } leaseDur := lp.LeaseDuration() diff --git a/kv/lease_read_benchmark_test.go b/kv/lease_read_benchmark_test.go new file mode 100644 index 00000000..d9069d5e --- /dev/null +++ b/kv/lease_read_benchmark_test.go @@ -0,0 +1,70 @@ +package kv + +import ( + "context" + "testing" + "time" + + "github.com/bootjp/elastickv/distribution" + "github.com/bootjp/elastickv/internal/monoclock" +) + +// BenchmarkLeaseRead measures the Coordinate.LeaseRead fast path, where +// the engine-driven lease anchor (LastQuorumAck + State==Leader) is +// fresh so the read is served from the cached AppliedIndex without a +// LinearizableRead round-trip. The LeaseProvider capability is resolved +// once at construction (NewCoordinatorWithEngine) and cached on +// Coordinate.lp, so this hot loop performs a single nil check rather +// than an interface type assertion per call. +func BenchmarkLeaseRead(b *testing.B) { + eng := &fakeLeaseEngine{applied: 4242, leaseDur: time.Hour} + eng.setQuorumAck(monoclock.Now()) + c := NewCoordinatorWithEngine(nil, eng) + ctx := context.Background() + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, err := c.LeaseRead(ctx); err != nil { + b.Fatal(err) + } + } + b.StopTimer() + + // Guard the benchmark against silently exercising the slow path: a + // single LinearizableRead would invalidate the "assertion is off the + // hot path" claim because the slow path dominates the measurement. + if got := eng.linearizableCalls.Load(); got != 0 { + b.Fatalf("expected the lease fast path on every iteration, but LinearizableRead ran %d times", got) + } +} + +// BenchmarkGroupLeaseRead measures the sharded groupLeaseRead fast path +// (via ShardedCoordinator.LeaseRead on the default group). The +// LeaseProvider capability is resolved once per group in +// NewShardedCoordinator and cached on ShardGroup.lp, so the hot loop is +// a single nil check rather than a per-call interface type assertion. +func BenchmarkGroupLeaseRead(b *testing.B) { + eng := newShardedLeaseEngine(7777) + eng.setQuorumAck(monoclock.Now()) + + distEngine := distribution.NewEngine() + distEngine.UpdateRoute([]byte("a"), nil, 1) + coord := NewShardedCoordinator(distEngine, map[uint64]*ShardGroup{ + 1: {Engine: eng, Txn: &recordingTransactional{}}, + }, 1, NewHLC(), nil) + ctx := context.Background() + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, err := coord.LeaseRead(ctx); err != nil { + b.Fatal(err) + } + } + b.StopTimer() + + if got := eng.linearizableCalls.Load(); got != 0 { + b.Fatalf("expected the lease fast path on every iteration, but LinearizableRead ran %d times", got) + } +} diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index df4dc327..f50b6e0e 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -24,6 +24,14 @@ type ShardGroup struct { Store store.MVCCStore Txn Transactional lease leaseState + // lp caches the Engine's optional LeaseProvider capability so the + // groupLeaseRead / maybeRefresh hot paths test a single field for + // nil instead of performing an interface type assertion per call. + // NewShardedCoordinator resolves it once for every group it owns; it + // is nil when the Engine does not implement raftengine.LeaseProvider. + // Engine is not reassigned after the coordinator is constructed, so + // the cached value stays valid. + lp raftengine.LeaseProvider // raftPayloadWrap is the Stage 6E-2c hot-swap point for the raft // envelope wrap closure. A nil load means the wrap is inactive // and proposals pass through cleartext (the Stage 3 default). @@ -283,11 +291,10 @@ func (t *leaseRefreshingTxn) maybeRefresh(resp *TransactionResponse, start monoc if resp == nil || resp.CommitIndex == 0 { return } - lp, ok := t.g.Engine.(raftengine.LeaseProvider) - if !ok { + if t.g.lp == nil { return } - t.g.lease.extend(start.Add(lp.LeaseDuration()), expectedGen) + t.g.lease.extend(start.Add(t.g.lp.LeaseDuration()), expectedGen) } // Close forwards to the wrapped Transactional if it implements @@ -577,10 +584,14 @@ func NewShardedCoordinator(engine *distribution.Engine, groups map[uint64]*Shard } } router.Register(gid, g.Txn, g.Store) + // Resolve the optional LeaseProvider capability once so + // groupLeaseRead / maybeRefresh test g.lp for nil instead of + // re-asserting the interface per call. // Per-shard leader-loss hook: when this group's engine notices // a state transition out of leader, drop the lease so the next // LeaseReadForKey on that shard takes the slow path. if lp, ok := g.Engine.(raftengine.LeaseProvider); ok { + g.lp = lp deregisters = append(deregisters, lp.RegisterLeaderLossCallback(g.lease.invalidate)) } } @@ -1547,10 +1558,15 @@ func observeLeaseRead(observer LeaseReadObserver, hit bool) { func groupLeaseRead(ctx context.Context, g *ShardGroup, observer LeaseReadObserver) (uint64, error) { engine := engineForGroup(g) - lp, ok := engine.(raftengine.LeaseProvider) - if !ok { + // g.lp caches the LeaseProvider assertion done once at construction + // (NewShardedCoordinator); a nil group or an engine without the + // capability falls through to the linearizable slow path. The nil-g + // guard preserves engineForGroup's nil-safety since g.lp would panic + // on a nil receiver. + if g == nil || g.lp == nil { return linearizableReadEngineCtx(ctx, engine) } + lp := g.lp leaseDur := lp.LeaseDuration() if leaseDur <= 0 { return linearizableReadEngineCtx(ctx, engine)