Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions kv/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ func NewCoordinatorWithEngine(txm Transactional, engine raftengine.Engine, opts
for _, opt := range opts {
opt(c)
}
// Resolve the optional LeaseProvider capability once here so the
// LeaseRead / refreshLeaseAfterDispatch hot paths test a cached
// field instead of repeating the interface type assertion per call.
// engine is never reassigned after construction, so the cached value
// stays valid for the Coordinate's lifetime.
// Register a leader-loss hook so the lease is invalidated the instant
// the engine notices a state transition out of the leader role,
// rather than waiting for wall-clock expiry of the current lease.
Expand All @@ -128,6 +133,7 @@ func NewCoordinatorWithEngine(txm Transactional, engine raftengine.Engine, opts
// one-shot tools) MUST call Close() to avoid leaking a closure
// pointing into this Coordinate.
if lp, ok := engine.(raftengine.LeaseProvider); ok {
c.lp = lp
c.deregisterLeaseCb = lp.RegisterLeaderLossCallback(c.lease.invalidate)
}
return c
Expand Down Expand Up @@ -169,10 +175,18 @@ type CoordinateResponse struct {
type Coordinate struct {
transactionManager Transactional
engine raftengine.Engine
clock *HLC
connCache GRPCConnCache
log *slog.Logger
lease leaseState
// lp caches the engine's optional LeaseProvider capability so the
// LeaseRead hot path (and refreshLeaseAfterDispatch) test a single
// field for nil instead of performing an interface type assertion on
// every call. It is set once in NewCoordinatorWithEngine and is nil
// when the engine does not implement raftengine.LeaseProvider. The
// engine field is never reassigned after construction, so this stays
// in sync without a lock.
lp raftengine.LeaseProvider
clock *HLC
connCache GRPCConnCache
log *slog.Logger
lease leaseState
// deregisterLeaseCb removes the leader-loss callback registered
// against engine at construction. Long-lived Coordinates don't
// need to call it (the engine will be closed after them), but
Expand Down Expand Up @@ -599,11 +613,10 @@ func (c *Coordinate) refreshLeaseAfterDispatch(resp *CoordinateResponse, err err
if resp == nil || resp.CommitIndex == 0 {
return
}
lp, ok := c.engine.(raftengine.LeaseProvider)
if !ok {
if c.lp == nil {
return
}
c.lease.extend(dispatchStart.Add(lp.LeaseDuration()), expectedGen)
c.lease.extend(dispatchStart.Add(c.lp.LeaseDuration()), expectedGen)
}

func (c *Coordinate) IsLeader() bool {
Expand Down Expand Up @@ -716,8 +729,8 @@ func (c *Coordinate) LinearizableReadForKey(ctx context.Context, _ []byte) (uint
// Callers that resolve timestamps via store.LastCommitTS may discard
// the value.
func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error) {
lp, ok := c.engine.(raftengine.LeaseProvider)
if !ok {
lp := c.lp
if lp == nil {
return c.LinearizableRead(ctx)
}
leaseDur := lp.LeaseDuration()
Expand Down
70 changes: 70 additions & 0 deletions kv/lease_read_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package kv

import (
"context"
"testing"
"time"

"github.com/bootjp/elastickv/distribution"
"github.com/bootjp/elastickv/internal/monoclock"
)

// BenchmarkLeaseRead measures the Coordinate.LeaseRead fast path, where
// the engine-driven lease anchor (LastQuorumAck + State==Leader) is
// fresh so the read is served from the cached AppliedIndex without a
// LinearizableRead round-trip. The LeaseProvider capability is resolved
// once at construction (NewCoordinatorWithEngine) and cached on
// Coordinate.lp, so this hot loop performs a single nil check rather
// than an interface type assertion per call.
func BenchmarkLeaseRead(b *testing.B) {
eng := &fakeLeaseEngine{applied: 4242, leaseDur: time.Hour}
eng.setQuorumAck(monoclock.Now())
c := NewCoordinatorWithEngine(nil, eng)
ctx := context.Background()

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
if _, err := c.LeaseRead(ctx); err != nil {
b.Fatal(err)
}
}
b.StopTimer()

// Guard the benchmark against silently exercising the slow path: a
// single LinearizableRead would invalidate the "assertion is off the
// hot path" claim because the slow path dominates the measurement.
if got := eng.linearizableCalls.Load(); got != 0 {
b.Fatalf("expected the lease fast path on every iteration, but LinearizableRead ran %d times", got)
}
}

// BenchmarkGroupLeaseRead measures the sharded groupLeaseRead fast path
// (via ShardedCoordinator.LeaseRead on the default group). The
// LeaseProvider capability is resolved once per group in
// NewShardedCoordinator and cached on ShardGroup.lp, so the hot loop is
// a single nil check rather than a per-call interface type assertion.
func BenchmarkGroupLeaseRead(b *testing.B) {
eng := newShardedLeaseEngine(7777)
eng.setQuorumAck(monoclock.Now())

distEngine := distribution.NewEngine()
distEngine.UpdateRoute([]byte("a"), nil, 1)
coord := NewShardedCoordinator(distEngine, map[uint64]*ShardGroup{
1: {Engine: eng, Txn: &recordingTransactional{}},
}, 1, NewHLC(), nil)
ctx := context.Background()

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
if _, err := coord.LeaseRead(ctx); err != nil {
b.Fatal(err)
}
}
b.StopTimer()

if got := eng.linearizableCalls.Load(); got != 0 {
b.Fatalf("expected the lease fast path on every iteration, but LinearizableRead ran %d times", got)
}
}
26 changes: 21 additions & 5 deletions kv/sharded_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ type ShardGroup struct {
Store store.MVCCStore
Txn Transactional
lease leaseState
// lp caches the Engine's optional LeaseProvider capability so the
// groupLeaseRead / maybeRefresh hot paths test a single field for
// nil instead of performing an interface type assertion per call.
// NewShardedCoordinator resolves it once for every group it owns; it
// is nil when the Engine does not implement raftengine.LeaseProvider.
// Engine is not reassigned after the coordinator is constructed, so
// the cached value stays valid.
lp raftengine.LeaseProvider
// raftPayloadWrap is the Stage 6E-2c hot-swap point for the raft
// envelope wrap closure. A nil load means the wrap is inactive
// and proposals pass through cleartext (the Stage 3 default).
Expand Down Expand Up @@ -283,11 +291,10 @@ func (t *leaseRefreshingTxn) maybeRefresh(resp *TransactionResponse, start monoc
if resp == nil || resp.CommitIndex == 0 {
return
}
lp, ok := t.g.Engine.(raftengine.LeaseProvider)
if !ok {
if t.g.lp == nil {
return
}
t.g.lease.extend(start.Add(lp.LeaseDuration()), expectedGen)
t.g.lease.extend(start.Add(t.g.lp.LeaseDuration()), expectedGen)
}

// Close forwards to the wrapped Transactional if it implements
Expand Down Expand Up @@ -577,10 +584,14 @@ func NewShardedCoordinator(engine *distribution.Engine, groups map[uint64]*Shard
}
}
router.Register(gid, g.Txn, g.Store)
// Resolve the optional LeaseProvider capability once so
// groupLeaseRead / maybeRefresh test g.lp for nil instead of
// re-asserting the interface per call.
// Per-shard leader-loss hook: when this group's engine notices
// a state transition out of leader, drop the lease so the next
// LeaseReadForKey on that shard takes the slow path.
if lp, ok := g.Engine.(raftengine.LeaseProvider); ok {
g.lp = lp
deregisters = append(deregisters, lp.RegisterLeaderLossCallback(g.lease.invalidate))
}
}
Expand Down Expand Up @@ -1547,10 +1558,15 @@ func observeLeaseRead(observer LeaseReadObserver, hit bool) {

func groupLeaseRead(ctx context.Context, g *ShardGroup, observer LeaseReadObserver) (uint64, error) {
engine := engineForGroup(g)
lp, ok := engine.(raftengine.LeaseProvider)
if !ok {
// g.lp caches the LeaseProvider assertion done once at construction
// (NewShardedCoordinator); a nil group or an engine without the
// capability falls through to the linearizable slow path. The nil-g
// guard preserves engineForGroup's nil-safety since g.lp would panic
// on a nil receiver.
if g == nil || g.lp == nil {
return linearizableReadEngineCtx(ctx, engine)
}
lp := g.lp
leaseDur := lp.LeaseDuration()
if leaseDur <= 0 {
return linearizableReadEngineCtx(ctx, engine)
Expand Down
Loading