Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 13 additions & 14 deletions .github/workflows/jepsen-test-scheduled-dedup.yml
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
# Jepsen Scheduled Stress Test — Option-2 Dedup Mode
#
# Daily run with ELASTICKV_REDIS_ONEPHASE_DEDUP=1 and
# ELASTICKV_DYNAMODB_ONEPHASE_DEDUP=1 so the demo cluster exercises the
# option-2 idempotency path on both adapters. The criterion for
# default-on (docs/design/2026_05_21 §M4 and
# docs/design/2026_06_03_partial_dynamodb_onephase_dedup.md §M2) is 7
# consecutive days of green runs with no :duplicate-elements /
# :G-single-item-realtime anomalies in each workload's analysis output.
# ELASTICKV_DYNAMODB_ONEPHASE_DEDUP=1 pinned so the demo cluster exercises the
# option-2 idempotency path on both adapters, independent of future default
# changes. This preserves the high-pressure dedup signal for
# :duplicate-elements / :G-single-item-realtime anomalies in each workload's
# analysis output.
#
# Scope: Redis + DynamoDB workloads. The dedup feature ships behind the
# Scope: Redis + DynamoDB workloads. The dedup feature is controlled by the
# Redis adapter's onePhaseTxnDedup flag (RPUSH/LPUSH, MULTI/EXEC,
# standalone SET) and the DynamoDB adapter's onePhaseTxnDedup flag
# (single-item UpdateItem/PutItem/DeleteItem via PR #920). S3 / SQS do
# (single-item UpdateItem/PutItem/DeleteItem). S3 / SQS do
# not yet route through the dedup loop, so re-running them here would add
# hours of CI for no signal on the new code path.
#
# Cadence: 03:17 UTC daily (off-peak; non-zero minute per ScheduleWakeup
# guidance). The general 6-hourly scheduled workflow continues to run
# without the dedup gate so the legacy path also stays covered.
# with explicit dedup opt-outs so the legacy path also stays covered.

on:
schedule:
Expand Down Expand Up @@ -72,8 +71,8 @@ jobs:
# state. Anomalies in :duplicate-elements / :G-single-item-realtime
# under this flag indicate a regression in option-2 plumbing.
ELASTICKV_REDIS_ONEPHASE_DEDUP: "1"
# Enable the DynamoDB adapter option-2 dedup gate (PR #920). demo.go
# wires it via adapter.NewDynamoDBServer, which reads this env var;
# Pin the DynamoDB adapter option-2 dedup path on. demo.go wires it
# via adapter.NewDynamoDBServer, which reads this env var;
# the single-item write path (UpdateItem/PutItem/DeleteItem) then
# routes through retryItemWriteWithGenerationDedup on the leader,
# exercising the same FSM exact-ts probe as the Redis path. The
Expand Down Expand Up @@ -156,7 +155,7 @@ jobs:
# The env vars are set at the JOB level above and inherited by
# all `run:` steps; nothing in demo.go can intercept or unset
# them before the adapters read os.Getenv. So if they are "1"
# here, the dedup gates ARE active in the cluster. We print
# here, the dedup paths ARE active in the cluster. We print
# them explicitly so a failed run's log makes the configuration
# unambiguous (vs the general 6-hourly workflow, whose runs pin both
# variables to "0" as the explicit dedup-OFF baseline).
Expand All @@ -165,7 +164,7 @@ jobs:
exit 2
fi
if [ "${ELASTICKV_DYNAMODB_ONEPHASE_DEDUP:-}" != "1" ]; then
echo "FATAL: ELASTICKV_DYNAMODB_ONEPHASE_DEDUP is not '1' — this workflow runs only with the dedup gate on"
echo "FATAL: ELASTICKV_DYNAMODB_ONEPHASE_DEDUP is not '1' — this workflow pins the dedup path on"
exit 2
fi

Expand Down Expand Up @@ -199,7 +198,7 @@ jobs:
working-directory: jepsen
timeout-minutes: 10
# --local connects to the already-running demo cluster's dynamo
# ports (gate ON via ELASTICKV_DYNAMODB_ONEPHASE_DEDUP), so the
# ports (dedup ON via ELASTICKV_DYNAMODB_ONEPHASE_DEDUP), so the
# single-item list_append writes exercise the option-2 reuse +
# exact-ts probe path. A :duplicate-elements anomaly here is the
# regression this workflow is meant to catch.
Expand Down
21 changes: 9 additions & 12 deletions .github/workflows/jepsen-test-scheduled.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,16 @@ jobs:
runs-on: ubuntu-latest
env:
GOCACHE: /tmp/go-build
# Explicit dedup-OFF control baseline. The Redis adapter's
# onePhaseTxnDedup flipped to default-on in
# docs/design/2026_06_10_proposed_redis_onephase_dedup_default_on.md;
# this workflow is preserved as the legacy-path coverage so anomalies
# the dedup gate prevents (`:duplicate-elements`, `:future-read`,
# `:G-single-item-realtime`) continue to be measured against an
# unprotected build. Pair with the dedup-ON workflow
# (.github/workflows/jepsen-test-scheduled-dedup.yml) which sets
# ELASTICKV_REDIS_ONEPHASE_DEDUP=1 explicitly. Retirement of this
# workflow is a follow-up after 30 days of post-flip data; until
# then, do NOT remove this env var — without it the two workflows
# would exercise the same path under the new default.
# Explicit dedup-OFF control baseline. The Redis and DynamoDB adapter
# onePhaseTxnDedup flags are default-on, so this workflow is preserved
# as legacy-path coverage. Pair with the dedup-ON workflow
# (.github/workflows/jepsen-test-scheduled-dedup.yml) which pins both
# env vars to 1. Retirement of this workflow is a follow-up after 30
# days of post-flip data; until then, do NOT remove these env vars —
# without them the two workflows would exercise the same path under
# the new defaults.
ELASTICKV_REDIS_ONEPHASE_DEDUP: "0"
ELASTICKV_DYNAMODB_ONEPHASE_DEDUP: "0"
steps:
- uses: actions/checkout@v6
with:
Expand Down
17 changes: 9 additions & 8 deletions adapter/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,9 @@ type DynamoDBServer struct {
// retryable write error, the retry REUSES the failed attempt's write set
// under a fresh commit_ts and carries prev_commit_ts so the FSM no-ops a
// commit that already landed under leadership churn (the :duplicate-elements
// anomaly). It MUST stay off until every node runs a probe-aware binary —
// see R5 in docs/design/2026_06_03_partial_dynamodb_onephase_dedup.md.
// Default off; enabled via WithDynamoOnePhaseTxnDedup or the
// ELASTICKV_DYNAMODB_ONEPHASE_DEDUP env var.
// anomaly). The FSM probe now ships on every node, so this defaults on.
// Set ELASTICKV_DYNAMODB_ONEPHASE_DEDUP=0 or pass
// WithDynamoOnePhaseTxnDedup(false) for an operator rollback.
onePhaseTxnDedup bool
}

Expand Down Expand Up @@ -163,9 +162,11 @@ func WithDynamoDBLeaderMap(m map[string]string) DynamoDBServerOption {
}
}

// WithDynamoOnePhaseTxnDedup enables the option-2 one-phase idempotency dedup on
// the single-item write retry path (see DynamoDBServer.onePhaseTxnDedup). Off by
// default; enable only after the whole cluster runs a probe-aware binary.
// WithDynamoOnePhaseTxnDedup enables or disables the option-2 one-phase
// idempotency dedup on the single-item write retry path (see
// DynamoDBServer.onePhaseTxnDedup). It defaults on; pass false to opt out from
// code, or set ELASTICKV_DYNAMODB_ONEPHASE_DEDUP=0 to opt out from the
// environment. The constructor option trumps the env var.
func WithDynamoOnePhaseTxnDedup(enabled bool) DynamoDBServerOption {
return func(server *DynamoDBServer) {
server.onePhaseTxnDedup = enabled
Expand Down Expand Up @@ -259,7 +260,7 @@ func NewDynamoDBServer(listen net.Listener, st store.MVCCStore, coordinate kv.Co
listen: listen,
store: st,
coordinator: coordinate,
onePhaseTxnDedup: os.Getenv("ELASTICKV_DYNAMODB_ONEPHASE_DEDUP") == "1",
onePhaseTxnDedup: os.Getenv("ELASTICKV_DYNAMODB_ONEPHASE_DEDUP") != "0",
}
d.targetHandlers = map[string]func(http.ResponseWriter, *http.Request){
createTableTarget: d.createTable,
Expand Down
13 changes: 7 additions & 6 deletions adapter/dynamodb_item_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,12 @@ func (d *DynamoDBServer) retryItemWriteWithGeneration(
exhaustedMessage string,
prepare func(readTS uint64) (*itemWritePlan, error),
) (*itemWritePlan, error) {
// Option-2 one-phase dedup (gated, default off): on a retryable write error,
// reuse the failed attempt's write set under a fresh commit_ts + prev_commit_ts
// so the FSM no-ops a commit that already landed under leadership churn,
// instead of re-reading and re-appending (the :duplicate-elements anomaly).
// See docs/design/2026_06_03_partial_dynamodb_onephase_dedup.md.
// Option-2 one-phase dedup (default on, with an explicit rollback switch):
// on a retryable write error, reuse the failed attempt's write set under a
// fresh commit_ts + prev_commit_ts so the FSM no-ops a commit that already
// landed under leadership churn, instead of re-reading and re-appending (the
// :duplicate-elements anomaly). See
// docs/design/2026_06_03_partial_dynamodb_onephase_dedup.md.
//
// Leader-only (codex P1, PR #920): the dedup path allocates commit_ts from
// the LOCAL HLC and carries it as prev_commit_ts, so that timestamp MUST be
Expand All @@ -143,7 +144,7 @@ func (d *DynamoDBServer) retryItemWriteWithGeneration(

// retryItemWriteWithGenerationLegacy is the pre-dedup retry loop: it recomputes
// the write set from a fresh read on every retryable error. It is the active
// path whenever the dedup gate is off or this node is not the leader, so it
// path whenever dedup is explicitly off or this node is not the leader, so it
// stays byte-identical to the pre-feature behavior.
func (d *DynamoDBServer) retryItemWriteWithGenerationLegacy(
ctx context.Context,
Expand Down
32 changes: 26 additions & 6 deletions adapter/dynamodb_onephase_dedup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,7 @@ func readListValues(t *testing.T, server *DynamoDBServer, schema *dynamoTableSch
}

func newDedupItemWriteServer(st store.MVCCStore, coord kv.Coordinator, dedup bool) (*dynamoTableSchema, *DynamoDBServer) {
var opts []DynamoDBServerOption
if dedup {
opts = append(opts, WithDynamoOnePhaseTxnDedup(true))
}
server := NewDynamoDBServer(nil, st, coord, opts...)
server := NewDynamoDBServer(nil, st, coord, WithDynamoOnePhaseTxnDedup(dedup))
return dedupItemTable(), server
}

Expand Down Expand Up @@ -251,8 +247,32 @@ func TestItemWriteDedup_NonLeaderFallsBackToLegacy(t *testing.T) {
"non-leader falls back to the legacy recompute path (leader allocates commit_ts via redirect)")
}

// TestItemWriteDedup_DefaultOn verifies that a server constructed without an
// explicit dedup option has onePhaseTxnDedup set to true when
// ELASTICKV_DYNAMODB_ONEPHASE_DEDUP is empty.
func TestItemWriteDedup_DefaultOn(t *testing.T) {
// An empty value stands in for "not configured". t.Setenv restores the
// previous value at test end and cannot be combined with t.Parallel.
t.Setenv("ELASTICKV_DYNAMODB_ONEPHASE_DEDUP", "")
// No WithDynamoOnePhaseTxnDedup option is passed, so only the constructor's
// own default decides the field value.
server := NewDynamoDBServer(nil, store.NewMVCCStore(), newDedupTestCoordinator(store.NewMVCCStore(), 0, false))
require.True(t, server.onePhaseTxnDedup)
}

// TestItemWriteDedup_EnvOptOut verifies that setting
// ELASTICKV_DYNAMODB_ONEPHASE_DEDUP to "0" leaves onePhaseTxnDedup false when
// no constructor option is supplied.
func TestItemWriteDedup_EnvOptOut(t *testing.T) {
// "0" is the opt-out value under test.
t.Setenv("ELASTICKV_DYNAMODB_ONEPHASE_DEDUP", "0")
server := NewDynamoDBServer(nil, store.NewMVCCStore(), newDedupTestCoordinator(store.NewMVCCStore(), 0, false))
require.False(t, server.onePhaseTxnDedup)
}

// TestItemWriteDedup_OptionOverridesEnv verifies precedence in the enabling
// direction: with the env var set to "0", passing
// WithDynamoOnePhaseTxnDedup(true) still yields onePhaseTxnDedup == true.
func TestItemWriteDedup_OptionOverridesEnv(t *testing.T) {
// The environment asks for opt-out; the option below must win.
t.Setenv("ELASTICKV_DYNAMODB_ONEPHASE_DEDUP", "0")
server := NewDynamoDBServer(nil, store.NewMVCCStore(), newDedupTestCoordinator(store.NewMVCCStore(), 0, false), WithDynamoOnePhaseTxnDedup(true))
require.True(t, server.onePhaseTxnDedup)
}

// TestItemWriteDedup_OptionOverridesEnvToDisable verifies precedence in the
// disabling direction: with the env var set to "1", passing
// WithDynamoOnePhaseTxnDedup(false) still yields onePhaseTxnDedup == false.
func TestItemWriteDedup_OptionOverridesEnvToDisable(t *testing.T) {
// The environment asks for dedup on; the option below must win.
t.Setenv("ELASTICKV_DYNAMODB_ONEPHASE_DEDUP", "1")
server := NewDynamoDBServer(nil, store.NewMVCCStore(), newDedupTestCoordinator(store.NewMVCCStore(), 0, false), WithDynamoOnePhaseTxnDedup(false))
require.False(t, server.onePhaseTxnDedup)
}

// TestItemWriteDedup_DisabledKeepsLegacyPath pins that the gate is load-bearing:
// with onePhaseTxnDedup OFF (the default), the legacy retry RE-READS and
// with onePhaseTxnDedup explicitly OFF, the legacy retry RE-READS and
// recomputes, so a landed-then-ambiguous attempt 1 is double-applied — the
// :duplicate-elements bug. This characterizes the pre-fix behavior the gate
// closes; flipping the gate on (the headline test) eliminates the duplicate.
Expand Down
33 changes: 30 additions & 3 deletions adapter/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,16 @@ func (c *redisMetricsConn) WriteError(msg string) {
// kv.isLeadershipLossError so any sentinel those classifiers already
// recognize as transient also flips a Redis reply to NOTLEADER.
func writeRedisError(conn redcon.Conn, err error) {
msg := err.Error()
if isTransientLeaderRedisError(err) {
conn.WriteError("NOTLEADER " + err.Error())
if strings.HasPrefix(strings.ToUpper(msg), "NOTLEADER ") {
conn.WriteError(msg)
return
}
conn.WriteError("NOTLEADER " + msg)
return
}
conn.WriteError(err.Error())
conn.WriteError(msg)
}

// isTransientLeaderRedisError reports whether err is a transient
Expand Down Expand Up @@ -1983,6 +1988,9 @@ func (r *RedisServer) proxyTransactionToLeader(conn redcon.Conn, queue []redcon.
if handleProxyTxnError(conn, err) {
return
}
if handleProxyTxnCommandError(conn, cmds) {
return
}
writeProxyCmdsResult(conn, cmds)
}

Expand Down Expand Up @@ -2033,7 +2041,8 @@ func handleProxyTxnError(conn redcon.Conn, err error) bool {
// Fatal transport / context error: per-command results are unreliable.
if err != nil {
var netErr net.Error
if errors.Is(err, context.DeadlineExceeded) ||
if isTransientLeaderRedisError(err) ||
errors.Is(err, context.DeadlineExceeded) ||
errors.Is(err, context.Canceled) ||
errors.Is(err, io.EOF) ||
errors.Is(err, io.ErrUnexpectedEOF) ||
Expand All @@ -2045,6 +2054,24 @@ func handleProxyTxnError(conn redcon.Conn, err error) bool {
return false
}

// handleProxyTxnCommandError promotes transient leadership failures returned
// by the proxied EXEC target to a top-level EXEC error. go-redis can surface a
// target's EXEC-level error on the queued command handles; writing those as
// EXEC array elements makes clients treat leadership churn like command data.
func handleProxyTxnCommandError(conn redcon.Conn, cmds []*redis.Cmd) bool {
for _, cmd := range cmds {
err := cmd.Err()
Comment on lines +2062 to +2063

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

According to the repository's general rules, we should avoid adding dead defensive code, such as unreachable nil guards, if upstream checks or callers already guarantee that the variables are non-nil. Since execTxPipeline always populates cmds with non-nil *redis.Cmd instances, the cmd == nil check is unreachable and can be safely removed.

	for _, cmd := range cmds {
		err := cmd.Err()
References
  1. Avoid adding dead defensive code, such as unreachable nil guards, if upstream checks or callers already guarantee that the variables are non-nil.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

対応しました。handleProxyTxnCommandError から到達不能な cmd == nil guard を削除しました。

Validation:

  • go test ./adapter -run 'Test(IsTransientLeaderRedisError|WriteRedisError|HandleProxyTxnError|HandleProxyTxnCommandError)$'
  • git diff --check

if err == nil || errors.Is(err, redis.Nil) {
continue
}
if isTransientLeaderRedisError(err) {
writeRedisError(conn, err)
return true
}
}
return false
}

// writeProxyCmdsResult writes an EXEC-style array reply for the given pipeline
// command handles. For any other non-nil per-command errors, each cmd carries
// its own result, which is the correct Redis EXEC semantics.
Expand Down
79 changes: 76 additions & 3 deletions adapter/redis_error_prefix_test.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,31 @@
package adapter

import (
"context"
"fmt"
"io"
"testing"

"github.com/bootjp/elastickv/internal/raftengine"
"github.com/bootjp/elastickv/kv"
"github.com/cockroachdb/errors"
"github.com/redis/go-redis/v9"
"github.com/tidwall/redcon"
)

// captureConn is the minimal redcon.Conn surface writeRedisError uses;
// only WriteError needs a real implementation.
// captureConn is the minimal redcon.Conn surface these tests inspect.
type captureConn struct {
redcon.Conn
lastErr string
lastErr string
lastArray int
wroteArray bool
}

// WriteError records the most recent error reply in lastErr so a test can
// assert on it; nothing is written to a real connection.
func (c *captureConn) WriteError(msg string) { c.lastErr = msg }
// WriteArray records the array header count in lastArray and sets wroteArray,
// which lets a test tell "no array written" apart from an array whose count
// happens to equal the zero value.
func (c *captureConn) WriteArray(count int) {
c.lastArray = count
c.wroteArray = true
}

func TestIsTransientLeaderRedisError(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -105,6 +112,9 @@ func TestWriteRedisError(t *testing.T) {
{"grpc-wrapped leader-not-found gains NOTLEADER prefix",
errors.New("rpc error: code = Unknown desc = leader not found"),
"NOTLEADER rpc error: code = Unknown desc = leader not found"},
{"already NOTLEADER-prefixed error is not double-prefixed",
errors.New("NOTLEADER leader not found"),
"NOTLEADER leader not found"},
// Regression: address-mapping gap errors (raft leader known
// but raft→redis address missing in r.leaderRedis) must be
// ERR-prefixed at the source so Carmine maps to :prefix :err
Expand All @@ -131,6 +141,69 @@ func TestWriteRedisError(t *testing.T) {
}
}

// TestHandleProxyTxnError exercises handleProxyTxnError against a captureConn
// and checks which kind of reply each error produces.
//
// Both subtests expect the handler to return true (the error was consumed):
//   - a "leader not found" rpc error must produce a NOTLEADER-prefixed error
//     reply and no array reply;
//   - redis.TxFailedErr must produce an array reply with count -1 and no
//     error reply.
func TestHandleProxyTxnError(t *testing.T) {
t.Parallel()
t.Run("transient leader error is top-level NOTLEADER", func(t *testing.T) {
t.Parallel()
c := &captureConn{}
handled := handleProxyTxnError(c, errors.New("rpc error: code = Unknown desc = leader not found"))
if !handled {
t.Fatal("handleProxyTxnError returned false")
}
// The original message is kept verbatim after the NOTLEADER prefix.
if c.lastErr != "NOTLEADER rpc error: code = Unknown desc = leader not found" {
t.Fatalf("last error = %q", c.lastErr)
}
// A leadership error must not also be rendered as an array reply.
if c.wroteArray {
t.Fatalf("unexpected array reply %d", c.lastArray)
}
})

t.Run("redis transaction abort remains null array", func(t *testing.T) {
t.Parallel()
c := &captureConn{}
handled := handleProxyTxnError(c, redis.TxFailedErr)
if !handled {
t.Fatal("handleProxyTxnError returned false")
}
// A count of -1 is what the test expects for an aborted transaction.
if !c.wroteArray || c.lastArray != -1 {
t.Fatalf("array reply = (%v, %d), want (-1)", c.wroteArray, c.lastArray)
}
if c.lastErr != "" {
t.Fatalf("unexpected error reply %q", c.lastErr)
}
})
}

// TestHandleProxyTxnCommandError exercises handleProxyTxnCommandError with a
// single queued command whose error is set by hand.
//
// A "not leader" rpc error on the command must be handled (return true) and
// written as a NOTLEADER-prefixed error reply. An ordinary command error
// (WRONGTYPE) must not be handled and must leave no error reply on the
// connection.
func TestHandleProxyTxnCommandError(t *testing.T) {
t.Parallel()
t.Run("transient command error is promoted to top-level NOTLEADER", func(t *testing.T) {
t.Parallel()
// The command is never sent anywhere; SetErr injects the failure directly.
cmd := redis.NewCmd(context.Background(), "LRANGE", "k", 0, -1)
cmd.SetErr(errors.New("rpc error: code = FailedPrecondition desc = raft engine: not leader"))
c := &captureConn{}
handled := handleProxyTxnCommandError(c, []*redis.Cmd{cmd})
if !handled {
t.Fatal("handleProxyTxnCommandError returned false")
}
if c.lastErr != "NOTLEADER rpc error: code = FailedPrecondition desc = raft engine: not leader" {
t.Fatalf("last error = %q", c.lastErr)
}
})

t.Run("ordinary command error stays in EXEC result array", func(t *testing.T) {
t.Parallel()
cmd := redis.NewCmd(context.Background(), "LRANGE", "k", 0, -1)
cmd.SetErr(errors.New("WRONGTYPE Operation against a key holding the wrong kind of value"))
c := &captureConn{}
// Returning false leaves the reply to the caller; this test only checks
// that the handler itself wrote nothing.
if handleProxyTxnCommandError(c, []*redis.Cmd{cmd}) {
t.Fatal("ordinary command error was handled as terminal")
}
if c.lastErr != "" {
t.Fatalf("unexpected error reply %q", c.lastErr)
}
})
}

// TestHasTransientLeaderSuffix_PinsSentinels closes the gap noted
// at kv/coordinator.go:529 ("A symmetric pin lives in the adapter
// test package"): the adapter's redisLeaderErrorPhrases set must
Expand Down
Loading
Loading