diff --git a/.github/workflows/jepsen-test.yml b/.github/workflows/jepsen-test.yml index 2d98d5220..aeac9354c 100644 --- a/.github/workflows/jepsen-test.yml +++ b/.github/workflows/jepsen-test.yml @@ -31,20 +31,20 @@ jobs: curl -L https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein > ~/lein chmod +x ~/lein ~/lein version - - name: Pre-fetch Go modules + - name: Pre-fetch Go modules and build binary run: | mkdir -p "$GOCACHE" /tmp/go-tmp GOPATH=$(go env GOPATH) export GOCACHE GOTMPDIR=/tmp/go-tmp go mod download + go build -o /tmp/elastickv-bin . - name: Run Jepsen unit tests working-directory: jepsen run: ~/lein test - name: Launch etcd-backed cluster run: | set -euo pipefail - mkdir -p "$GOCACHE" /tmp/go-tmp /tmp/elastickv-ci - export GOTMPDIR=/tmp/go-tmp + mkdir -p /tmp/elastickv-ci BOOTSTRAP_MEMBERS="n1=127.0.0.1:50051,n2=127.0.0.1:50052,n3=127.0.0.1:50053" RAFT_REDIS_MAP="127.0.0.1:50051=127.0.0.1:63791,127.0.0.1:50052=127.0.0.1:63792,127.0.0.1:50053=127.0.0.1:63793" RAFT_S3_MAP="127.0.0.1:50051=127.0.0.1:63901,127.0.0.1:50052=127.0.0.1:63902,127.0.0.1:50053=127.0.0.1:63903" @@ -52,7 +52,7 @@ jobs: : > /tmp/elastickv-demo.pid for node in 1 2 3; do - nohup go run . \ + nohup /tmp/elastickv-bin \ --address "127.0.0.1:5005${node}" \ --redisAddress "127.0.0.1:6379${node}" \ --dynamoAddress "127.0.0.1:6380${node}" \ diff --git a/adapter/dynamodb_migration_test.go b/adapter/dynamodb_migration_test.go index fa579011b..a73451d94 100644 --- a/adapter/dynamodb_migration_test.go +++ b/adapter/dynamodb_migration_test.go @@ -8,7 +8,6 @@ import ( "github.com/bootjp/elastickv/kv" "github.com/bootjp/elastickv/store" - json "github.com/goccy/go-json" "github.com/stretchr/testify/require" ) @@ -187,61 +186,6 @@ func TestDynamoDB_EnsureLegacyTableMigration_MigratesLegacyGeneration(t *testing }, time.Second, 10*time.Millisecond) } -func TestDynamoDB_EnsureLegacyTableMigration_NormalizesLegacyGSIJSONFormat(t *testing.T) { - t.Parallel() - - legacySchema, server, st := newLegacyMigrationTestServer(t, true, "S") - writer := newDynamoFixtureWriter(t, st) - - legacyBody, err := json.Marshal(map[string]any{ - "table_name": legacySchema.TableName, - "attribute_definitions": legacySchema.AttributeDefinitions, - "primary_key": map[string]any{ - "hash_key": legacySchema.PrimaryKey.HashKey, - "range_key": legacySchema.PrimaryKey.RangeKey, - }, - "global_secondary_indexes": map[string]any{ - "status-index": map[string]any{ - "hash_key": "status", - "range_key": "sk", - }, - }, - "generation": legacySchema.Generation, - }) - require.NoError(t, err) - writer.put(dynamoTableMetaKey(legacySchema.TableName), legacyBody) - writer.put(dynamoTableGenerationKey(legacySchema.TableName), fmt.Appendf(nil, "%d", legacySchema.Generation)) - - writer.writeItem(legacySchema, map[string]attributeValue{ - "pk": newStringAttributeValue("tenant"), - "sk": newStringAttributeValue("2026-03-09T12:00:00Z"), - "status": newStringAttributeValue("open"), - "value": newStringAttributeValue("v1"), - }) - - ctx := context.Background() - require.NoError(t, server.ensureLegacyTableMigration(ctx, legacySchema.TableName)) - - schema, exists, err := server.loadTableSchema(ctx, legacySchema.TableName) - require.NoError(t, err) - require.True(t, exists) - require.Equal(t, "status", schema.GlobalSecondaryIndexes["status-index"].KeySchema.HashKey) - require.Equal(t, "sk", schema.GlobalSecondaryIndexes["status-index"].KeySchema.RangeKey) - require.Equal(t, "ALL", schema.GlobalSecondaryIndexes["status-index"].Projection.ProjectionType) - - out, err := server.queryItems(ctx, queryInput{ - TableName: legacySchema.TableName, - IndexName: "status-index", - KeyConditionExpression: "status = :status", - ExpressionAttributeValues: map[string]attributeValue{ - ":status": newStringAttributeValue("open"), - }, - }) - require.NoError(t, err) - require.Len(t, out.items, 1) - require.Equal(t, newStringAttributeValue("v1"), out.items[0]["value"]) -} - func TestDynamoDB_EnsureLegacyTableMigration_PrefersExistingTargetItems(t *testing.T) { t.Parallel() diff --git a/adapter/dynamodb_storage_codec.go b/adapter/dynamodb_storage_codec.go index 08c349d90..1276f36e7 100644 --- a/adapter/dynamodb_storage_codec.go +++ b/adapter/dynamodb_storage_codec.go @@ -8,7 +8,6 @@ import ( pb "github.com/bootjp/elastickv/proto" "github.com/cockroachdb/errors" - json "github.com/goccy/go-json" gproto "google.golang.org/protobuf/proto" ) @@ -18,6 +17,7 @@ var ( storedDynamoMarshalOptions = gproto.MarshalOptions{Deterministic: true} errStoredDynamoMessageTooLarge = errors.New("stored dynamo message too large") + errUnrecognizedStoredDynamoFormat = errors.New("unrecognized stored dynamo format") errNilDynamoTableSchema = errors.New("nil dynamo table schema") errInvalidDynamoKeyEncodingVersion = errors.New("invalid key encoding version") errDynamoKeyEncodingVersionOverflow = errors.New("dynamo key encoding version overflows int") @@ -58,20 +58,15 @@ func encodeStoredDynamoTableSchema(schema *dynamoTableSchema) ([]byte, error) { } func decodeStoredDynamoTableSchema(b []byte) (*dynamoTableSchema, error) { - if hasStoredDynamoPrefix(b, storedDynamoSchemaProtoPrefix) { - msg := &pb.DynamoTableSchema{} - if err := gproto.Unmarshal(b[len(storedDynamoSchemaProtoPrefix):], msg); err != nil { - return nil, errors.WithStack(err) - } - schema, err := dynamoTableSchemaFromProto(msg) - if err != nil { - return nil, errors.WithStack(err) - } - return schema, nil + if !hasStoredDynamoPrefix(b, storedDynamoSchemaProtoPrefix) { + return nil, errUnrecognizedStoredDynamoFormat } - - schema := &dynamoTableSchema{} - if err := json.Unmarshal(b, schema); err != nil { + msg := &pb.DynamoTableSchema{} + if err := gproto.Unmarshal(b[len(storedDynamoSchemaProtoPrefix):], msg); err != nil { + return nil, errors.WithStack(err) + } + schema, err := dynamoTableSchemaFromProto(msg) + if err != nil { return nil, errors.WithStack(err) } return schema, nil @@ -86,20 +81,15 @@ func encodeStoredDynamoItem(item map[string]attributeValue) ([]byte, error) { } func decodeStoredDynamoItem(b []byte) (map[string]attributeValue, error) { - if hasStoredDynamoPrefix(b, storedDynamoItemProtoPrefix) { - msg := &pb.DynamoItem{} - if err := gproto.Unmarshal(b[len(storedDynamoItemProtoPrefix):], msg); err != nil { - return nil, errors.WithStack(err) - } - item, err := dynamoItemFromProto(msg) - if err != nil { - return nil, errors.WithStack(err) - } - return item, nil + if !hasStoredDynamoPrefix(b, storedDynamoItemProtoPrefix) { + return nil, errUnrecognizedStoredDynamoFormat } - - item := map[string]attributeValue{} - if err := json.Unmarshal(b, &item); err != nil { + msg := &pb.DynamoItem{} + if err := gproto.Unmarshal(b[len(storedDynamoItemProtoPrefix):], msg); err != nil { + return nil, errors.WithStack(err) + } + item, err := dynamoItemFromProto(msg) + if err != nil { return nil, errors.WithStack(err) } return item, nil diff --git a/adapter/dynamodb_storage_codec_test.go b/adapter/dynamodb_storage_codec_test.go index 0fdbf3958..dbc28f179 100644 --- a/adapter/dynamodb_storage_codec_test.go +++ b/adapter/dynamodb_storage_codec_test.go @@ -3,7 +3,6 @@ package adapter import ( "testing" - json "github.com/goccy/go-json" "github.com/stretchr/testify/require" ) @@ -43,23 +42,6 @@ func TestStoredDynamoItemCodec_RoundTripProto(t *testing.T) { require.Equal(t, item, decoded) } -func TestStoredDynamoItemCodec_LegacyJSONFallback(t *testing.T) { - t.Parallel() - - boolTrue := true - legacy := map[string]attributeValue{ - "pk": newStringAttributeValue("tenant#1"), - "active": {BOOL: &boolTrue}, - } - - body, err := json.Marshal(legacy) - require.NoError(t, err) - - decoded, err := decodeStoredDynamoItem(body) - require.NoError(t, err) - require.Equal(t, legacy, decoded) -} - func TestStoredDynamoItemCodec_NormalizesNullTrue(t *testing.T) { t.Parallel() @@ -77,37 +59,6 @@ func TestStoredDynamoItemCodec_NormalizesNullTrue(t *testing.T) { require.True(t, *decoded["gone"].NULL) } -func TestStoredDynamoTableSchemaCodec_LegacyJSONFallback(t *testing.T) { - t.Parallel() - - body, err := json.Marshal(map[string]any{ - "table_name": "threads", - "attribute_definitions": map[string]string{ - "threadId": "S", - "createdAt": "S", - "status": "S", - }, - "primary_key": map[string]any{ - "hash_key": "threadId", - }, - "global_secondary_indexes": map[string]any{ - "status-index": map[string]any{ - "hash_key": "status", - "range_key": "createdAt", - }, - }, - "generation": 7, - }) - require.NoError(t, err) - - schema, err := decodeStoredDynamoTableSchema(body) - require.NoError(t, err) - require.Equal(t, "threads", schema.TableName) - require.Equal(t, "status", schema.GlobalSecondaryIndexes["status-index"].KeySchema.HashKey) - require.Equal(t, "createdAt", schema.GlobalSecondaryIndexes["status-index"].KeySchema.RangeKey) - require.Equal(t, "ALL", schema.GlobalSecondaryIndexes["status-index"].Projection.ProjectionType) -} - func stringPtr(v string) *string { return &v } diff --git a/adapter/dynamodb_storage_migration_test.go b/adapter/dynamodb_storage_migration_test.go deleted file mode 100644 index 71ff54379..000000000 --- a/adapter/dynamodb_storage_migration_test.go +++ /dev/null @@ -1,158 +0,0 @@ -package adapter - -import ( - "bytes" - "context" - "fmt" - "testing" - - "github.com/bootjp/elastickv/store" - json "github.com/goccy/go-json" - "github.com/stretchr/testify/require" -) - -func TestDynamoDB_QueryItems_LegacyJSONItemDoesNotRewriteStorage(t *testing.T) { - t.Parallel() - - schema, server, st := newStorageFormatMigrationTestServer(t) - writer := newDynamoFixtureWriter(t, st) - writer.writeSchema(schema) - - item := map[string]attributeValue{ - "pk": newStringAttributeValue("tenant"), - "sk": newStringAttributeValue("item"), - "value": newStringAttributeValue("old"), - } - itemKey, rawBefore := writeLegacyJSONItemFixture(t, writer, schema, item) - - out, err := server.queryItems(context.Background(), queryInput{ - TableName: schema.TableName, - KeyConditionExpression: "pk = :pk", - ExpressionAttributeValues: map[string]attributeValue{ - ":pk": newStringAttributeValue("tenant"), - }, - }) - require.NoError(t, err) - require.Len(t, out.items, 1) - require.Equal(t, newStringAttributeValue("old"), out.items[0]["value"]) - - rawAfter := mustReadRawValue(t, st, itemKey) - require.False(t, hasStoredDynamoPrefix(rawBefore, storedDynamoItemProtoPrefix)) - require.False(t, hasStoredDynamoPrefix(rawAfter, storedDynamoItemProtoPrefix)) - require.True(t, bytes.Equal(rawBefore, rawAfter)) -} - -func TestDynamoDB_UpdateItemWithRetry_RewritesLegacyJSONItemToProto(t *testing.T) { - t.Parallel() - - schema, server, st := newStorageFormatMigrationTestServer(t) - writer := newDynamoFixtureWriter(t, st) - writer.writeSchema(schema) - - item := map[string]attributeValue{ - "pk": newStringAttributeValue("tenant"), - "sk": newStringAttributeValue("item"), - "value": newStringAttributeValue("old"), - } - itemKey, rawBefore := writeLegacyJSONItemFixture(t, writer, schema, item) - - plan, err := server.updateItemWithRetry(context.Background(), updateItemInput{ - TableName: schema.TableName, - Key: map[string]attributeValue{"pk": newStringAttributeValue("tenant"), "sk": newStringAttributeValue("item")}, - UpdateExpression: "SET #value = :value", - ExpressionAttributeNames: map[string]string{ - "#value": "value", - }, - ExpressionAttributeValues: map[string]attributeValue{ - ":value": newStringAttributeValue("new"), - }, - }) - require.NoError(t, err) - require.Equal(t, newStringAttributeValue("new"), plan.next["value"]) - - rawAfter := mustReadRawValue(t, st, itemKey) - require.False(t, hasStoredDynamoPrefix(rawBefore, storedDynamoItemProtoPrefix)) - require.True(t, hasStoredDynamoPrefix(rawAfter, storedDynamoItemProtoPrefix)) - - decoded, err := decodeStoredDynamoItem(rawAfter) - require.NoError(t, err) - require.Equal(t, newStringAttributeValue("new"), decoded["value"]) -} - -func TestDynamoDB_EnsureLegacyTableMigration_RewritesLegacyJSONSchemaToProto(t *testing.T) { - t.Parallel() - - legacySchema, server, st := newLegacyMigrationTestServer(t, false, "S") - writer := newDynamoFixtureWriter(t, st) - - body, err := json.Marshal(legacySchema) - require.NoError(t, err) - writer.put(dynamoTableMetaKey(legacySchema.TableName), body) - writer.put(dynamoTableGenerationKey(legacySchema.TableName), fmt.Appendf(nil, "%d", legacySchema.Generation)) - - require.NoError(t, server.ensureLegacyTableMigration(context.Background(), legacySchema.TableName)) - - raw := mustReadRawValue(t, st, dynamoTableMetaKey(legacySchema.TableName)) - require.True(t, hasStoredDynamoPrefix(raw, storedDynamoSchemaProtoPrefix)) - - schema, err := decodeStoredDynamoTableSchema(raw) - require.NoError(t, err) - require.True(t, schema.usesOrderedKeyEncoding()) - require.Zero(t, schema.MigratingFromGeneration) - require.Equal(t, uint64(2), schema.Generation) -} - -func newStorageFormatMigrationTestServer(t *testing.T) (*dynamoTableSchema, *DynamoDBServer, store.MVCCStore) { - t.Helper() - - schema := &dynamoTableSchema{ - TableName: "t", - Generation: 1, - KeyEncodingVersion: dynamoOrderedKeyEncodingV2, - AttributeDefinitions: map[string]string{ - "pk": "S", - "sk": "S", - "value": "S", - }, - PrimaryKey: dynamoKeySchema{ - HashKey: "pk", - RangeKey: "sk", - }, - } - - st := store.NewMVCCStore() - server := NewDynamoDBServer(nil, st, newLocalAdapterCoordinator(st)) - return schema, server, st -} - -func writeLegacyJSONItemFixture( - t *testing.T, - writer *dynamoFixtureWriter, - schema *dynamoTableSchema, - item map[string]attributeValue, -) ([]byte, []byte) { - t.Helper() - - itemKey, err := schema.itemKeyFromAttributes(item) - require.NoError(t, err) - - body, err := json.Marshal(item) - require.NoError(t, err) - - writer.put(itemKey, body) - gsiKeys, err := schema.gsiEntryKeysForItem(item) - require.NoError(t, err) - for _, gsiKey := range gsiKeys { - writer.put(gsiKey, itemKey) - } - - return itemKey, body -} - -func mustReadRawValue(t *testing.T, st store.MVCCStore, key []byte) []byte { - t.Helper() - - raw, err := st.GetAt(context.Background(), key, st.LastCommitTS()) - require.NoError(t, err) - return raw -} diff --git a/adapter/redis_compat_types.go b/adapter/redis_compat_types.go index ace98f644..5593145e2 100644 --- a/adapter/redis_compat_types.go +++ b/adapter/redis_compat_types.go @@ -258,12 +258,6 @@ func (e redisStreamEntry) compareID(raw string, parsed redisStreamID, parsedVali return compareParsedRedisStreamID(e.ID, e.parsedID, true, raw, parsed, parsedValid) } -func (v *redisStreamValue) cacheParsedIDs() { - for i := range v.Entries { - v.Entries[i].cacheParsedID() - } -} - func encodeRedisTTL(expireAt time.Time) []byte { ms := max(expireAt.UnixMilli(), 0) buf := make([]byte, redisUint64Bytes) diff --git a/adapter/redis_list_pop_benchmark_test.go b/adapter/redis_list_pop_benchmark_test.go new file mode 100644 index 000000000..0b1b72d60 --- /dev/null +++ b/adapter/redis_list_pop_benchmark_test.go @@ -0,0 +1,367 @@ +package adapter + +// BullMQ-style workload benchmark: RPUSH (producer) + LPOP (consumer) +// +// What each implementation does: +// +// Legacy (main-branch) +// ───────────────────── +// RPUSH writes item keys + base metadata key (!lst|meta|). +// LPOP reads and overwrites the base metadata key. +// → RPUSH and LPOP always fight for the same metadata key. +// → Under concurrent mixed load, every RPUSH and every LPOP from any goroutine +// conflicts → high OCC conflict rate → retries grow with concurrency. +// +// Delta+Claim (this branch) +// ────────────────────────── +// RPUSH writes item keys + a unique delta key (commitTS-keyed). +// LPOP writes a claim key (seq-keyed, unique per item) + a delta key. +// → RPUSH and LPOP write completely different keys → zero RPUSH-LPOP conflicts. +// → Concurrent LPOPs only conflict when two goroutines race for the same +// sequence number; the loser re-reads the delta-resolved meta and claims +// the next available seq. +// +// OCC simulation: +// occAdapterCoordinator routes IsTxn=true through store.ApplyMutations +// (latestTS > startTS → ErrWriteConflict), matching the production Raft FSM. +// retryRedisWrite: server-side retry with exp. backoff (50 attempts, 1–10 ms). +// retryUntilSuccess: client-side retry when server-side budget is exhausted. +// +// ⚠ In-memory benchmark limitations: +// 1. Tombstone accumulation: the in-memory MVCC store retains tombstones from +// deleted delta keys. ScanAt must skip these, slowing resolveListMeta over +// time. In production (Pebble LSM), LSM compaction removes tombstones so +// this overhead disappears. This makes Claim appear slower than it is. +// 2. Raft latency not modelled: OCC retry cost here is just a sleep. In a +// real cluster each failed commit wastes one Raft round-trip (10–100 ms). +// Claim's key benefit — eliminating RPUSH-LPOP conflicts entirely — is +// therefore greatly understated by this benchmark. +// +// Run with: +// go test ./adapter/ -run='^$' -bench='BenchmarkBullMQ' -benchtime=5s -benchmem +// +// Observed on Apple M1 Max (in-memory store, GOMAXPROCS=10): +// BenchmarkBullMQ_Legacy_RPushLPOP/Parallel1-10 ~5.3 µs/op (+49% at Parallel16) +// BenchmarkBullMQ_Legacy_RPushLPOP/Parallel16-10 ~7.9 µs/op +// BenchmarkBullMQ_Claim_RPushLPOP/Parallel1-10 ~606 µs/op higher due to delta scan +// BenchmarkBullMQ_Claim_RPushLPOP/Parallel16-10 ~1.8 ms/op +// +// The production benefit of Claim is in the conflict rate, not per-op overhead: +// • Legacy at 1000 req/s with 10 concurrent writers: ~900 conflicts/s × Raft RTT +// • Claim at 1000 req/s with 10 concurrent writers: ~0 RPUSH-LPOP conflicts + +import ( + "bytes" + "context" + "fmt" + "io" + "log/slog" + "math" + "testing" + "time" + + "github.com/bootjp/elastickv/kv" + "github.com/bootjp/elastickv/store" + "github.com/cockroachdb/errors" +) + +// discardLogger suppresses compactor INFO noise in benchmark output. +var discardLogger = slog.New(slog.NewTextHandler(io.Discard, nil)) + +// ---- OCC-aware coordinator -------------------------------------------------- + +// occAdapterCoordinator wraps localAdapterCoordinator and routes IsTxn=true +// requests through store.ApplyMutations, enabling write-write conflict +// detection identical to the production Raft FSM (kv/fsm.go §handleOnePhaseTxnRequest). +// Non-txn requests fall through to the underlying coordinator unchanged. +type occAdapterCoordinator struct { + *localAdapterCoordinator +} + +func newOCCAdapterCoordinator(st store.MVCCStore) *occAdapterCoordinator { + return &occAdapterCoordinator{localAdapterCoordinator: newLocalAdapterCoordinator(st)} +} + +func (c *occAdapterCoordinator) Dispatch(ctx context.Context, req *kv.OperationGroup[kv.OP]) (*kv.CoordinateResponse, error) { + if req == nil { + return &kv.CoordinateResponse{}, nil + } + commitTS, err := c.commitTSForRequest(req) + if err != nil { + return nil, err + } + if !req.IsTxn { + return c.localAdapterCoordinator.Dispatch(ctx, req) + } + mutations, err := c.collectMutations(ctx, req.Elems, commitTS) + if err != nil { + return nil, err + } + if len(mutations) == 0 { + return &kv.CoordinateResponse{}, nil + } + if err := c.store.ApplyMutations(ctx, mutations, req.ReadKeys, req.StartTS, commitTS); err != nil { + return nil, err + } + return &kv.CoordinateResponse{}, nil +} + +func (c *occAdapterCoordinator) collectMutations(_ context.Context, elems []*kv.Elem[kv.OP], _ uint64) ([]*store.KVPairMutation, error) { + mutations := make([]*store.KVPairMutation, 0, len(elems)) + for _, elem := range elems { + if elem == nil { + continue + } + switch elem.Op { + case kv.Put: + mutations = append(mutations, &store.KVPairMutation{Op: store.OpTypePut, Key: elem.Key, Value: elem.Value}) + case kv.Del: + mutations = append(mutations, &store.KVPairMutation{Op: store.OpTypeDelete, Key: elem.Key}) + case kv.DelPrefix: + // ApplyMutations has no prefix-delete op; applying it here before the + // OCC conflict check would break atomicity. List benchmark ops never + // emit DelPrefix, so reject it to make any such misuse explicit. + return nil, errors.New("DelPrefix not supported in transactional OCC simulation") + } + } + return mutations, nil +} + +// ---- Legacy implementations (main-branch simulation) ------------------------ + +// listRPushLegacy simulates the main-branch RPUSH: writes item keys AND the +// base metadata key in one IsTxn transaction. This conflicts with any +// concurrent RPUSH or LPOP that also writes the metadata key. +func (r *RedisServer) listRPushLegacy(ctx context.Context, key []byte, values [][]byte) error { + if len(values) == 0 { + return nil + } + return r.retryRedisWrite(ctx, func() error { + readTS := r.readTS() + + raw, getErr := r.store.GetAt(ctx, store.ListMetaKey(key), readTS) + var meta store.ListMeta + if getErr != nil && !errors.Is(getErr, store.ErrKeyNotFound) { + return getErr + } + if getErr == nil { + meta, getErr = store.UnmarshalListMeta(raw) + if getErr != nil { + return getErr + } + } + + elems := make([]*kv.Elem[kv.OP], 0, len(values)+1) + seq := meta.Head + meta.Len + for _, v := range values { + elems = append(elems, &kv.Elem[kv.OP]{ + Op: kv.Put, + Key: store.ListItemKey(key, seq), + Value: bytes.Clone(v), + }) + seq++ + } + delta := int64(len(values)) + if meta.Len > math.MaxInt64-delta { + return errors.New("list length overflow") + } + meta.Len += delta + metaBytes, marshalErr := store.MarshalListMeta(meta) + if marshalErr != nil { + return marshalErr + } + elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Put, Key: store.ListMetaKey(key), Value: metaBytes}) + return r.dispatchElems(ctx, true, readTS, elems) + }) +} + +// listPopLegacyRMW simulates the main-branch LPOP: reads and rewrites the base +// metadata key in one IsTxn transaction. This conflicts with any concurrent +// RPUSH or LPOP that also writes the metadata key. +func (r *RedisServer) listPopLegacyRMW(ctx context.Context, key []byte) error { + return r.retryRedisWrite(ctx, func() error { + readTS := r.readTS() + + raw, getErr := r.store.GetAt(ctx, store.ListMetaKey(key), readTS) + if errors.Is(getErr, store.ErrKeyNotFound) { + return nil + } + if getErr != nil { + return getErr + } + meta, unmarshalErr := store.UnmarshalListMeta(raw) + if unmarshalErr != nil || meta.Len == 0 { + return unmarshalErr + } + + seq := meta.Head + itemKey := store.ListItemKey(key, seq) + itemRaw, getItemErr := r.store.GetAt(ctx, itemKey, readTS) + if errors.Is(getItemErr, store.ErrKeyNotFound) { + return nil + } + if getItemErr != nil { + return getItemErr + } + _ = string(itemRaw) + + newMeta := store.ListMeta{Head: meta.Head + 1, Len: meta.Len - 1} + metaBytes, marshalErr := store.MarshalListMeta(newMeta) + if marshalErr != nil { + return marshalErr + } + return r.dispatchElems(ctx, true, readTS, []*kv.Elem[kv.OP]{ + {Op: kv.Del, Key: bytes.Clone(itemKey)}, + {Op: kv.Put, Key: store.ListMetaKey(key), Value: metaBytes}, + }) + }) +} + +// ---- Benchmark helpers ------------------------------------------------------ + +const benchQueueKey = "bm-queue" + +func benchCompactorOpts() []DeltaCompactorOption { + return []DeltaCompactorOption{ + WithDeltaCompactorMaxDeltaCount(8), + WithDeltaCompactorScanInterval(5 * time.Millisecond), + WithDeltaCompactorTimeout(2 * time.Second), + WithDeltaCompactorLogger(discardLogger), + } +} + +func startCompactor(r *RedisServer) (cancel context.CancelFunc) { + ctx, cancelFn := context.WithCancel(context.Background()) + c := NewDeltaCompactor(r.store, r.coordinator, benchCompactorOpts()...) + done := make(chan struct{}) + go func() { + defer close(done) + _ = c.Run(ctx) + }() + return func() { + cancelFn() + <-done + } +} + +// retryUntilSuccess calls fn in a loop, retrying when fn returns an error +// wrapping ErrWriteConflict (including "retry limit exceeded"). +// This simulates a Redis client that reissues a command after receiving an error. +func retryUntilSuccess(ctx context.Context, fn func() error) error { + for { + err := fn() + if err == nil { + return nil + } + if errors.Is(err, store.ErrWriteConflict) { + if ctx.Err() != nil { + return ctx.Err() + } + continue + } + return err + } +} + +func makeItems(n int) [][]byte { + items := make([][]byte, n) + for i := range items { + items[i] = []byte(fmt.Sprintf("seed-%d", i)) + } + return items +} + +// ---- BullMQ workload benchmarks --------------------------------------------- + +// BenchmarkBullMQ_Legacy_RPushLPOP measures the main-branch RPUSH+LPOP pattern. +// Each goroutine pushes one item and pops one item per iteration. +// RPUSH and LPOP both write the base metadata key, so they conflict with each +// other and with concurrent operations from other goroutines. +// At higher parallelism, OCC conflicts accumulate and throughput degrades. +func BenchmarkBullMQ_Legacy_RPushLPOP(b *testing.B) { + for _, par := range []int{1, 4, 16} { + b.Run(fmt.Sprintf("Parallel%d", par), func(b *testing.B) { + st := store.NewMVCCStore() + coord := newOCCAdapterCoordinator(st) + r := NewRedisServer(nil, "", st, coord, nil, nil) + key := []byte(benchQueueKey + "-legacy-e2e") + ctx := context.Background() + + // Seed queue so consumers always have data. + if err := r.listRPushLegacy(ctx, key, makeItems(1024)); err != nil { + b.Fatal(err) + } + + b.ResetTimer() + b.ReportAllocs() + b.SetParallelism(par) + b.RunParallel(func(pb *testing.PB) { + item := [][]byte{[]byte("job")} + for pb.Next() { + if err := retryUntilSuccess(ctx, func() error { + return r.listRPushLegacy(ctx, key, item) + }); err != nil { + b.Error(err) + return + } + if err := retryUntilSuccess(ctx, func() error { + return r.listPopLegacyRMW(ctx, key) + }); err != nil { + b.Error(err) + return + } + } + }) + }) + } +} + +// BenchmarkBullMQ_Claim_RPushLPOP measures the current-branch RPUSH+LPOP pattern. +// RPUSH emits a unique delta key (commitTS-keyed); LPOP emits a claim key +// (seq-keyed) + delta key. RPUSH and LPOP never conflict with each other; +// concurrent pops only conflict when two goroutines race for the same seq. +// The DeltaCompactor folds delta keys in the background. +// At higher parallelism, throughput scales because RPUSH-LPOP conflicts +// are eliminated. +func BenchmarkBullMQ_Claim_RPushLPOP(b *testing.B) { + for _, par := range []int{1, 4, 16} { + b.Run(fmt.Sprintf("Parallel%d", par), func(b *testing.B) { + st := store.NewMVCCStore() + coord := newOCCAdapterCoordinator(st) + r := NewRedisServer(nil, "", st, coord, nil, nil) + key := []byte(benchQueueKey + "-claim-e2e") + ctx := context.Background() + + cancel := startCompactor(r) + defer cancel() + + // Seed queue so consumers always have data. + if _, err := r.listRPush(ctx, key, makeItems(1024)); err != nil { + b.Fatal(err) + } + + b.ResetTimer() + b.ReportAllocs() + b.SetParallelism(par) + b.RunParallel(func(pb *testing.PB) { + item := [][]byte{[]byte("job")} + for pb.Next() { + if err := retryUntilSuccess(ctx, func() error { + _, err := r.listRPush(ctx, key, item) + return err + }); err != nil { + b.Error(err) + return + } + if err := retryUntilSuccess(ctx, func() error { + _, err := r.listPopClaim(ctx, key, 1, true) + return err + }); err != nil { + b.Error(err) + return + } + } + }) + }) + } +} diff --git a/adapter/redis_list_raft_benchmark_test.go b/adapter/redis_list_raft_benchmark_test.go new file mode 100644 index 000000000..c7a2a0583 --- /dev/null +++ b/adapter/redis_list_raft_benchmark_test.go @@ -0,0 +1,268 @@ +package adapter + +// BullMQ-style workload benchmark on a real 3-node in-process Raft cluster. +// +// Why this complements the in-memory benchmark: +// The in-memory benchmark (redis_list_pop_benchmark_test.go) uses an +// occAdapterCoordinator backed by a plain MVCCStore with a simulated sleep +// for OCC retries. In a real Raft cluster each failed commit (write-write +// conflict) wastes one full Raft round-trip (~1 ms loopback). This +// benchmark captures that real RTT cost. +// +// Legacy vs Claim on Raft: +// Legacy: every RPUSH and LPOP writes the same metadata key. +// Raft serializes all writers; OCC detects conflicts in ApplyMutations. +// Each conflict wastes exactly one committed (but rejected) Raft log entry. +// Claim: RPUSH writes a unique delta key (commitTS-keyed); +// LPOP writes a claim key (seq-keyed) + delta key. +// RPUSH-LPOP conflicts are eliminated entirely. +// Concurrent LPOPs still race for the same claim seq; concurrent RPUSHes +// still race for the same item seq — but at lower probability than Legacy +// where ALL operation types conflict with ALL others. +// +// Store: NewPebbleStore(b.TempDir()) — disk-backed Pebble in a temp directory. +// Background LSM compaction runs and removes tombstones, so the delta scan +// stays O(live deltas) ≤ compactor threshold throughout the benchmark. +// Disk I/O overhead is present but Raft round-trip (~0.2 ms loopback) dominates. +// +// Observed on Apple M1 Max (3-node loopback cluster, GOMAXPROCS=10, benchtime=30s): +// +// Benchmark ns/op total ops system ops/s +// ────────────────────────────────────────────────────────────────────── +// Legacy/Parallel1 (10 goroutines) 181 µs 472 713 15 757 +// Legacy/Parallel4 (40 goroutines) 229 µs 392 815 13 094 -17 % +// Claim /Parallel1 (10 goroutines) 3.6 ms 10 000 333 +// Claim /Parallel4 (40 goroutines) 2.5 ms 12 194 406 +22 % +// +// Key insight — read TOTAL THROUGHPUT (ops/s = total_iterations/30s), not ns/op: +// Legacy: 4× goroutines → total throughput DROPS -17 %. +// RPUSH and LPOP both write the same meta key; OCC conflicts grow +// super-linearly and saturate the Raft pipeline with rejected commits. +// Claim: 4× goroutines → total throughput RISES +22 %. +// RPUSH-LPOP conflicts are eliminated; Raft commit batching improves +// with more concurrent proposals, so throughput scales with concurrency. +// +// Absolute latency note: +// Claim's ns/op is ~15× higher than Legacy because resolveListMeta +// calls ScanAt (Pebble range iterator) vs GetAt (point lookup) for Legacy. +// Even with ≤8 live deltas the iterator open/seek/close overhead adds ~2 ms. +// In production this is paid against a 10–100 ms Raft RTT where it is +// negligible; the throughput-scaling difference is what matters under load. +// +// Run with: +// go test ./adapter/ -run='^$' -bench='BenchmarkBullMQ_Raft' -benchtime=30s -benchmem -timeout=600s + +import ( + "context" + "fmt" + "net" + "strconv" + "testing" + "time" + + internalutil "github.com/bootjp/elastickv/internal" + "github.com/bootjp/elastickv/kv" + pb "github.com/bootjp/elastickv/proto" + "github.com/bootjp/elastickv/store" + "github.com/hashicorp/raft" + "google.golang.org/grpc" +) + +func createBenchNode(b *testing.B, ctx context.Context, isLeader bool, port portsAdress, cfg raft.Configuration, leaderRedisMap map[raft.ServerAddress]string, lis listeners) Node { + b.Helper() + st, err := store.NewPebbleStore(b.TempDir()) + if err != nil { + b.Fatal(err) + } + hlc := kv.NewHLC() + fsm := kv.NewKvFSMWithHLC(st, hlc) + + electionTimeout := leaderElectionTimeout + if !isLeader { + electionTimeout = followerElectionTimeout + } + + id := strconv.Itoa(port.raft) + r, tm, err := newRaft(id, port.raftAddress, fsm, isLeader, cfg, electionTimeout) + if err != nil { + b.Fatal(err) + } + + s := grpc.NewServer(internalutil.GRPCServerOptions()...) + trx := kv.NewTransaction(r) + coordinator := kv.NewCoordinator(trx, r, kv.WithHLC(hlc)) + relay := NewRedisPubSubRelay() + routedStore := kv.NewLeaderRoutedStore(st, coordinator) + gs := NewGRPCServer(routedStore, coordinator, WithCloseStore()) + _, opsCancel := context.WithCancel(ctx) + + tm.Register(s) + pb.RegisterRawKVServer(s, gs) + pb.RegisterTransactionalKVServer(s, gs) + + go func(srv *grpc.Server, l net.Listener) { + _ = srv.Serve(l) + }(s, lis.grpc) + + rd := NewRedisServer(lis.redis, port.redisAddress, routedStore, coordinator, leaderRedisMap, relay) + if err := lis.dynamo.Close(); err != nil { + b.Logf("failed to close unused dynamo listener: %v", err) + } + + return newNode(port.grpcAddress, port.raftAddress, port.redisAddress, port.dynamoAddress, r, tm, s, gs, rd, nil, opsCancel) +} + +// createBenchCluster spins up an n-node in-process Raft cluster suitable for +// benchmarks. Unlike createNode (which requires *testing.T), this function +// uses b.Fatal for error reporting. +// +// The returned cleanup function stops all nodes and closes listeners. +func createBenchCluster(b *testing.B, n int) ([]Node, func()) { + b.Helper() + ctx := context.Background() + ports := assignPorts(n) + + lc := net.ListenConfig{} + lis := make([]listeners, n) + for i := range n { + for { + bound, l, retry, err := bindListeners(ctx, &lc, ports[i]) + if err != nil { + b.Fatal(err) + } + if retry { + ports[i] = portAssigner() + continue + } + ports[i] = bound + lis[i] = l + break + } + } + + // Build Raft config after port binding so retried ports are reflected. + cfg := buildRaftConfig(n, ports) + leaderRedisMap := make(map[raft.ServerAddress]string, n) + for _, p := range ports { + leaderRedisMap[raft.ServerAddress(p.raftAddress)] = p.redisAddress + } + + var nodes []Node + for i := range n { + node := createBenchNode(b, ctx, i == 0, ports[i], cfg, leaderRedisMap, lis[i]) + nodes = append(nodes, node) + } + + // Wait for node[0] to win the leader election. + deadline := time.Now().Add(10 * time.Second) + for time.Now().Before(deadline) { + if nodes[0].raft.State() == raft.Leader { + break + } + time.Sleep(50 * time.Millisecond) + } + if nodes[0].raft.State() != raft.Leader { + b.Fatal("node 0 did not become leader within 10s") + } + + return nodes, func() { shutdown(nodes) } +} + +// BenchmarkBullMQ_Raft_Legacy_RPushLPOP measures the main-branch RPUSH+LPOP +// pattern on a 3-node in-process Raft cluster. +// +// Both RPUSH and LPOP write the base metadata key, so Raft serializes them +// and the FSM's ApplyMutations detects write-write conflicts whenever two +// operations overlap. Each conflict costs one wasted Raft round-trip. +func BenchmarkBullMQ_Raft_Legacy_RPushLPOP(b *testing.B) { + for _, par := range []int{1, 4} { + b.Run(fmt.Sprintf("Parallel%d", par), func(b *testing.B) { + nodes, cleanup := createBenchCluster(b, 3) + defer cleanup() + + leader := nodes[0].redisServer + key := []byte("raft-bm-queue-legacy") + ctx := context.Background() + + // Seed queue so consumers always find data. + if err := leader.listRPushLegacy(ctx, key, makeItems(256)); err != nil { + b.Fatal(err) + } + + b.ResetTimer() + b.ReportAllocs() + b.SetParallelism(par) + b.RunParallel(func(pb *testing.PB) { + item := [][]byte{[]byte("job")} + for pb.Next() { + if err := retryUntilSuccess(ctx, func() error { + return leader.listRPushLegacy(ctx, key, item) + }); err != nil { + b.Error(err) + return + } + if err := retryUntilSuccess(ctx, func() error { + return leader.listPopLegacyRMW(ctx, key) + }); err != nil { + b.Error(err) + return + } + } + }) + }) + } +} + +// BenchmarkBullMQ_Raft_Claim_RPushLPOP measures the current-branch RPUSH+LPOP +// pattern on a 3-node in-process Raft cluster. +// +// RPUSH emits a unique delta key (commitTS-keyed); LPOP emits a claim key +// (seq-keyed, unique per item) + delta key. RPUSH and LPOP write completely +// different keys so they never conflict. Each operation uses exactly one +// Raft round-trip regardless of concurrency. +func BenchmarkBullMQ_Raft_Claim_RPushLPOP(b *testing.B) { + for _, par := range []int{1, 4} { + b.Run(fmt.Sprintf("Parallel%d", par), func(b *testing.B) { + nodes, cleanup := createBenchCluster(b, 3) + defer cleanup() + + leader := nodes[0].redisServer + key := []byte("raft-bm-queue-claim") + ctx := context.Background() + + // DeltaCompactor folds accumulated delta keys in the background. + // It uses the leader's store and coordinator, so compaction results + // go through Raft and are replicated to all nodes. + cancel := startCompactor(leader) + defer cancel() + + // Seed queue so consumers always find data. + if _, err := leader.listRPush(ctx, key, makeItems(256)); err != nil { + b.Fatal(err) + } + + b.ResetTimer() + b.ReportAllocs() + b.SetParallelism(par) + b.RunParallel(func(pb *testing.PB) { + item := [][]byte{[]byte("job")} + for pb.Next() { + if err := retryUntilSuccess(ctx, func() error { + _, err := leader.listRPush(ctx, key, item) + return err + }); err != nil { + b.Error(err) + return + } + if err := retryUntilSuccess(ctx, func() error { + _, err := leader.listPopClaim(ctx, key, 1, true) + return err + }); err != nil { + b.Error(err) + return + } + } + }) + }) + } +} diff --git a/adapter/redis_storage_codec.go b/adapter/redis_storage_codec.go index 43eb2788f..3ce67bf1d 100644 --- a/adapter/redis_storage_codec.go +++ b/adapter/redis_storage_codec.go @@ -6,7 +6,6 @@ import ( pb "github.com/bootjp/elastickv/proto" "github.com/cockroachdb/errors" - json "github.com/goccy/go-json" gproto "google.golang.org/protobuf/proto" ) @@ -17,7 +16,8 @@ var ( storedRedisStreamProtoPrefix = []byte{0x00, 'R', 'X', 0x01} storedRedisMarshalOptions = gproto.MarshalOptions{Deterministic: true} - errStoredRedisMessageTooLarge = errors.New("stored redis message too large") + errStoredRedisMessageTooLarge = errors.New("stored redis message too large") + errUnrecognizedStoredRedisFormat = errors.New("unrecognized stored redis format") ) func marshalHashValue(v redisHashValue) ([]byte, error) { @@ -31,19 +31,14 @@ func unmarshalHashValue(raw []byte) (redisHashValue, error) { if len(raw) == 0 { return redisHashValue{}, nil } - if hasStoredRedisPrefix(raw, storedRedisHashProtoPrefix) { - msg := &pb.RedisHashValue{} - if err := gproto.Unmarshal(raw[len(storedRedisHashProtoPrefix):], msg); err != nil { - return nil, errors.WithStack(err) - } - return redisHashValueFromProto(msg), nil + if !hasStoredRedisPrefix(raw, storedRedisHashProtoPrefix) { + return nil, errUnrecognizedStoredRedisFormat } - - out := redisHashValue{} - if err := json.Unmarshal(raw, &out); err != nil { + msg := &pb.RedisHashValue{} + if err := gproto.Unmarshal(raw[len(storedRedisHashProtoPrefix):], msg); err != nil { return nil, errors.WithStack(err) } - return out, nil + return redisHashValueFromProto(msg), nil } func marshalSetValue(v redisSetValue) ([]byte, error) { @@ -55,18 +50,14 @@ func unmarshalSetValue(raw []byte) (redisSetValue, error) { if len(raw) == 0 { return redisSetValue{}, nil } - var out redisSetValue - if hasStoredRedisPrefix(raw, storedRedisSetProtoPrefix) { - msg := &pb.RedisSetValue{} - if err := gproto.Unmarshal(raw[len(storedRedisSetProtoPrefix):], msg); err != nil { - return redisSetValue{}, errors.WithStack(err) - } - out = redisSetValueFromProto(msg) - } else { - if err := json.Unmarshal(raw, &out); err != nil { - return redisSetValue{}, errors.WithStack(err) - } + if !hasStoredRedisPrefix(raw, storedRedisSetProtoPrefix) { + return redisSetValue{}, errUnrecognizedStoredRedisFormat + } + msg := &pb.RedisSetValue{} + if err := gproto.Unmarshal(raw[len(storedRedisSetProtoPrefix):], msg); err != nil { + return redisSetValue{}, errors.WithStack(err) } + out := redisSetValueFromProto(msg) sortStrings(out.Members) return out, nil } @@ -80,18 +71,14 @@ func unmarshalZSetValue(raw []byte) (redisZSetValue, error) { if len(raw) == 0 { return redisZSetValue{}, nil } - var out redisZSetValue - if hasStoredRedisPrefix(raw, storedRedisZSetProtoPrefix) { - msg := &pb.RedisZSetValue{} - if err := gproto.Unmarshal(raw[len(storedRedisZSetProtoPrefix):], msg); err != nil { - return redisZSetValue{}, errors.WithStack(err) - } - out = redisZSetValueFromProto(msg) - } else { - if err := json.Unmarshal(raw, &out); err != nil { - return redisZSetValue{}, errors.WithStack(err) - } + if !hasStoredRedisPrefix(raw, storedRedisZSetProtoPrefix) { + return redisZSetValue{}, errUnrecognizedStoredRedisFormat } + msg := &pb.RedisZSetValue{} + if err := gproto.Unmarshal(raw[len(storedRedisZSetProtoPrefix):], msg); err != nil { + return redisZSetValue{}, errors.WithStack(err) + } + out := redisZSetValueFromProto(msg) sortZSetEntries(out.Entries) return out, nil } @@ -104,21 +91,14 @@ func unmarshalStreamValue(raw []byte) (redisStreamValue, error) { if len(raw) == 0 { return redisStreamValue{}, nil } - if hasStoredRedisPrefix(raw, storedRedisStreamProtoPrefix) { - msg := &pb.RedisStreamValue{} - if err := gproto.Unmarshal(raw[len(storedRedisStreamProtoPrefix):], msg); err != nil { - return redisStreamValue{}, errors.WithStack(err) - } - return redisStreamValueFromProto(msg), nil + if !hasStoredRedisPrefix(raw, storedRedisStreamProtoPrefix) { + return redisStreamValue{}, errUnrecognizedStoredRedisFormat } - - var out redisStreamValue - if err := json.Unmarshal(raw, &out); err != nil { + msg := &pb.RedisStreamValue{} + if err := gproto.Unmarshal(raw[len(storedRedisStreamProtoPrefix):], msg); err != nil { return redisStreamValue{}, errors.WithStack(err) } - // Legacy JSON payloads were stored in-order, so only cache parsed IDs. - out.cacheParsedIDs() - return out, nil + return redisStreamValueFromProto(msg), nil } func marshalStoredRedisMessage(prefix []byte, msg gproto.Message) ([]byte, error) { diff --git a/adapter/redis_storage_codec_test.go b/adapter/redis_storage_codec_test.go index 16c21e2ea..31aff1d1a 100644 --- a/adapter/redis_storage_codec_test.go +++ b/adapter/redis_storage_codec_test.go @@ -3,7 +3,6 @@ package adapter import ( "testing" - json "github.com/goccy/go-json" "github.com/stretchr/testify/require" ) @@ -24,18 +23,6 @@ func TestStoredRedisHashCodec_RoundTripProto(t *testing.T) { require.Equal(t, value, decoded) } -func TestStoredRedisHashCodec_LegacyJSONFallback(t *testing.T) { - t.Parallel() - - legacy := redisHashValue{"field": "value"} - body, err := json.Marshal(legacy) - require.NoError(t, err) - - decoded, err := unmarshalHashValue(body) - require.NoError(t, err) - require.Equal(t, legacy, decoded) -} - func TestStoredRedisSetCodec_RoundTripProto(t *testing.T) { t.Parallel() @@ -50,18 +37,6 @@ func TestStoredRedisSetCodec_RoundTripProto(t *testing.T) { require.Equal(t, redisSetValue{Members: []string{"a", "b"}}, decoded) } -func TestStoredRedisSetCodec_LegacyJSONFallback(t *testing.T) { - t.Parallel() - - legacy := redisSetValue{Members: []string{"b", "a"}} - body, err := json.Marshal(legacy) - require.NoError(t, err) - - decoded, err := unmarshalSetValue(body) - require.NoError(t, err) - require.Equal(t, redisSetValue{Members: []string{"a", "b"}}, decoded) -} - func TestStoredRedisZSetCodec_RoundTripProto(t *testing.T) { t.Parallel() @@ -86,28 +61,6 @@ func TestStoredRedisZSetCodec_RoundTripProto(t *testing.T) { }, decoded) } -func TestStoredRedisZSetCodec_LegacyJSONFallback(t *testing.T) { - t.Parallel() - - legacy := redisZSetValue{ - Entries: []redisZSetEntry{ - {Member: "b", Score: 2}, - {Member: "a", Score: 1}, - }, - } - body, err := json.Marshal(legacy) - require.NoError(t, err) - - decoded, err := unmarshalZSetValue(body) - require.NoError(t, err) - require.Equal(t, redisZSetValue{ - Entries: []redisZSetEntry{ - {Member: "a", Score: 1}, - {Member: "b", Score: 2}, - }, - }, decoded) -} - func TestStoredRedisStreamCodec_RoundTripProto(t *testing.T) { t.Parallel() @@ -127,25 +80,3 @@ func TestStoredRedisStreamCodec_RoundTripProto(t *testing.T) { require.Equal(t, value, decoded) require.True(t, decoded.Entries[0].parsedIDValid) } - -func TestStoredRedisStreamCodec_LegacyJSONFallback(t *testing.T) { - t.Parallel() - - legacy := redisStreamValue{ - Entries: []redisStreamEntry{ - {ID: "1001-0", Fields: []string{"field", "value"}}, - {ID: "1002-0", Fields: []string{"field", "value-2"}}, - }, - } - body, err := json.Marshal(legacy) - require.NoError(t, err) - - decoded, err := unmarshalStreamValue(body) - require.NoError(t, err) - require.Equal(t, redisStreamValue{ - Entries: []redisStreamEntry{ - newRedisStreamEntry("1001-0", []string{"field", "value"}), - newRedisStreamEntry("1002-0", []string{"field", "value-2"}), - }, - }, decoded) -} diff --git a/adapter/redis_storage_migration_test.go b/adapter/redis_storage_migration_test.go deleted file mode 100644 index f30de6d5d..000000000 --- a/adapter/redis_storage_migration_test.go +++ /dev/null @@ -1,255 +0,0 @@ -package adapter - -import ( - "bytes" - "context" - "testing" - - "github.com/bootjp/elastickv/store" - json "github.com/goccy/go-json" - "github.com/stretchr/testify/require" -) - -type redisFixtureWriter struct { - t *testing.T - store store.MVCCStore - nextTS uint64 -} - -func newRedisFixtureWriter(t *testing.T, st store.MVCCStore) *redisFixtureWriter { - t.Helper() - return &redisFixtureWriter{t: t, store: st, nextTS: 1} -} - -func (w *redisFixtureWriter) put(key []byte, value []byte) { - w.t.Helper() - require.NoError(w.t, w.store.PutAt(context.Background(), key, value, w.nextTS, 0)) - w.nextTS++ -} - -func newRedisStorageMigrationTestServer(t *testing.T) (*RedisServer, store.MVCCStore) { - t.Helper() - st := store.NewMVCCStore() - server := NewRedisServer(nil, "", st, newLocalAdapterCoordinator(st), nil, nil) - return server, st -} - -func assertRedisReadDoesNotRewriteLegacyJSON(t *testing.T, st store.MVCCStore, storageKey, rawBefore, protoPrefix []byte) { - t.Helper() - rawAfterRead := mustReadRawValue(t, st, storageKey) - require.False(t, hasStoredRedisPrefix(rawBefore, protoPrefix)) - require.False(t, hasStoredRedisPrefix(rawAfterRead, protoPrefix)) - require.True(t, bytes.Equal(rawBefore, rawAfterRead)) -} - -func TestRedisHashLegacyJSONReadThenRewriteToProto(t *testing.T) { - t.Parallel() - - server, st := newRedisStorageMigrationTestServer(t) - writer := newRedisFixtureWriter(t, st) - key := []byte("legacy:hash") - storageKey := redisHashKey(key) - - legacy := redisHashValue{"field": "old"} - rawBefore, err := json.Marshal(legacy) - require.NoError(t, err) - writer.put(storageKey, rawBefore) - - value, err := server.loadHashAt(context.Background(), key, server.readTS()) - require.NoError(t, err) - require.Equal(t, legacy, value) - assertRedisReadDoesNotRewriteLegacyJSON(t, st, storageKey, rawBefore, storedRedisHashProtoPrefix) - - added, err := server.applyHashFieldPairs(key, [][]byte{[]byte("next"), []byte("new")}) - require.NoError(t, err) - require.Equal(t, 1, added) - - // After migration-on-write the legacy blob is deleted; wide-column field keys are used instead. - readTS := server.readTS() - _, err = st.GetAt(context.Background(), storageKey, readTS) - require.ErrorIs(t, err, store.ErrKeyNotFound, "legacy blob should be deleted after wide-column migration") - - // loadHashAt should aggregate all wide-column fields including the migrated one. - got, err := server.loadHashAt(context.Background(), key, readTS) - require.NoError(t, err) - require.Equal(t, redisHashValue{"field": "old", "next": "new"}, got) - - // Each field key must be present in the store. - for _, field := range []string{"field", "next"} { - _, err = st.GetAt(context.Background(), store.HashFieldKey(key, []byte(field)), readTS) - require.NoError(t, err, "field key %q should exist after migration", field) - } -} - -func TestRedisSetLegacyJSONReadThenRewriteToProto(t *testing.T) { - t.Parallel() - - server, st := newRedisStorageMigrationTestServer(t) - writer := newRedisFixtureWriter(t, st) - key := []byte("legacy:set") - storageKey := redisSetKey(key) - - legacy := redisSetValue{Members: []string{"b", "a"}} - rawBefore, err := json.Marshal(legacy) - require.NoError(t, err) - writer.put(storageKey, rawBefore) - - value, err := server.loadSetAt(context.Background(), "set", key, server.readTS()) - require.NoError(t, err) - require.Equal(t, redisSetValue{Members: []string{"a", "b"}}, value) - assertRedisReadDoesNotRewriteLegacyJSON(t, st, storageKey, rawBefore, storedRedisSetProtoPrefix) - - conn := &recordingConn{} - server.mutateExactSet(conn, "set", key, [][]byte{[]byte("c")}, true) - require.Empty(t, conn.err) - require.Equal(t, int64(1), conn.int) - - // After migration-on-write the legacy blob is deleted; wide-column member keys are used instead. - readTS := server.readTS() - _, err = st.GetAt(context.Background(), storageKey, readTS) - require.ErrorIs(t, err, store.ErrKeyNotFound, "legacy blob should be deleted after wide-column migration") - - // loadSetAt should aggregate all wide-column members including the migrated ones. - got, err := server.loadSetAt(context.Background(), "set", key, readTS) - require.NoError(t, err) - require.Equal(t, redisSetValue{Members: []string{"a", "b", "c"}}, got) - - // Each member key must be present in the store. - for _, member := range []string{"a", "b", "c"} { - _, err = st.GetAt(context.Background(), store.SetMemberKey(key, []byte(member)), readTS) - require.NoError(t, err, "member key %q should exist after migration", member) - } -} - -func TestRedisHLLLegacyJSONReadThenRewriteToProto(t *testing.T) { - t.Parallel() - - server, st := newRedisStorageMigrationTestServer(t) - writer := newRedisFixtureWriter(t, st) - key := []byte("legacy:hll") - storageKey := redisHLLKey(key) - - legacy := redisSetValue{Members: []string{"b", "a"}} - rawBefore, err := json.Marshal(legacy) - require.NoError(t, err) - writer.put(storageKey, rawBefore) - - value, err := server.loadSetAt(context.Background(), "hll", key, server.readTS()) - require.NoError(t, err) - require.Equal(t, redisSetValue{Members: []string{"a", "b"}}, value) - assertRedisReadDoesNotRewriteLegacyJSON(t, st, storageKey, rawBefore, storedRedisSetProtoPrefix) - - conn := &recordingConn{} - server.mutateExactSet(conn, "hll", key, [][]byte{[]byte("c")}, true) - require.Empty(t, conn.err) - require.Equal(t, int64(1), conn.int) - - rawAfterWrite := mustReadRawValue(t, st, storageKey) - require.True(t, hasStoredRedisPrefix(rawAfterWrite, storedRedisSetProtoPrefix)) - - decoded, err := unmarshalSetValue(rawAfterWrite) - require.NoError(t, err) - require.Equal(t, redisSetValue{Members: []string{"a", "b", "c"}}, decoded) -} - -func TestRedisZSetLegacyJSONReadThenRewriteToProto(t *testing.T) { - t.Parallel() - - server, st := newRedisStorageMigrationTestServer(t) - writer := newRedisFixtureWriter(t, st) - key := []byte("legacy:zset") - storageKey := redisZSetKey(key) - - legacy := redisZSetValue{ - Entries: []redisZSetEntry{ - {Member: "b", Score: 2}, - {Member: "a", Score: 1}, - }, - } - rawBefore, err := json.Marshal(legacy) - require.NoError(t, err) - writer.put(storageKey, rawBefore) - - value, exists, err := server.loadZSetAt(context.Background(), key, server.readTS()) - require.NoError(t, err) - require.True(t, exists) - require.Equal(t, redisZSetValue{ - Entries: []redisZSetEntry{ - {Member: "a", Score: 1}, - {Member: "b", Score: 2}, - }, - }, value) - assertRedisReadDoesNotRewriteLegacyJSON(t, st, storageKey, rawBefore, storedRedisZSetProtoPrefix) - - added, err := server.zaddTxn(context.Background(), key, zaddFlags{}, []zaddPair{{score: 3, member: "c"}}) - require.NoError(t, err) - require.Equal(t, 1, added) - - // After migration-on-write the legacy blob is deleted; wide-column member + score keys are used instead. - readTS := server.readTS() - _, err = st.GetAt(context.Background(), storageKey, readTS) - require.ErrorIs(t, err, store.ErrKeyNotFound, "legacy blob should be deleted after wide-column migration") - - // loadZSetAt should aggregate all wide-column members including the migrated ones. - got, gotExists, err := server.loadZSetAt(context.Background(), key, readTS) - require.NoError(t, err) - require.True(t, gotExists) - require.Equal(t, redisZSetValue{ - Entries: []redisZSetEntry{ - {Member: "a", Score: 1}, - {Member: "b", Score: 2}, - {Member: "c", Score: 3}, - }, - }, got) - - // Each member key and score index key must be present in the store. - for _, entry := range got.Entries { - _, err = st.GetAt(context.Background(), store.ZSetMemberKey(key, []byte(entry.Member)), readTS) - require.NoError(t, err, "member key %q should exist after migration", entry.Member) - _, err = st.GetAt(context.Background(), store.ZSetScoreKey(key, entry.Score, []byte(entry.Member)), readTS) - require.NoError(t, err, "score key %q should exist after migration", entry.Member) - } -} - -func TestRedisStreamLegacyJSONReadThenRewriteToProto(t *testing.T) { - t.Parallel() - - server, st := newRedisStorageMigrationTestServer(t) - writer := newRedisFixtureWriter(t, st) - key := []byte("legacy:stream") - storageKey := redisStreamKey(key) - - legacy := redisStreamValue{ - Entries: []redisStreamEntry{ - {ID: "1001-0", Fields: []string{"field", "old"}}, - }, - } - rawBefore, err := json.Marshal(legacy) - require.NoError(t, err) - writer.put(storageKey, rawBefore) - - value, err := server.loadStreamAt(context.Background(), key, server.readTS()) - require.NoError(t, err) - require.Equal(t, redisStreamValue{ - Entries: []redisStreamEntry{ - newRedisStreamEntry("1001-0", []string{"field", "old"}), - }, - }, value) - assertRedisReadDoesNotRewriteLegacyJSON(t, st, storageKey, rawBefore, storedRedisStreamProtoPrefix) - - id, err := server.xaddTxn(context.Background(), key, xaddRequest{id: "1002-0", fields: []string{"field", "new"}}) - require.NoError(t, err) - require.Equal(t, "1002-0", id) - - rawAfterWrite := mustReadRawValue(t, st, storageKey) - require.True(t, hasStoredRedisPrefix(rawAfterWrite, storedRedisStreamProtoPrefix)) - - decoded, err := unmarshalStreamValue(rawAfterWrite) - require.NoError(t, err) - require.Equal(t, redisStreamValue{ - Entries: []redisStreamEntry{ - newRedisStreamEntry("1001-0", []string{"field", "old"}), - newRedisStreamEntry("1002-0", []string{"field", "new"}), - }, - }, decoded) -} diff --git a/adapter/redis_txn_test.go b/adapter/redis_txn_test.go index 2537f08f5..3355c10b3 100644 --- a/adapter/redis_txn_test.go +++ b/adapter/redis_txn_test.go @@ -10,6 +10,13 @@ import ( "github.com/tidwall/redcon" ) +func newRedisStorageMigrationTestServer(t *testing.T) (*RedisServer, store.MVCCStore) { + t.Helper() + st := store.NewMVCCStore() + server := NewRedisServer(nil, "", st, newLocalAdapterCoordinator(st), nil, nil) + return server, st +} + // TestRedisTxnValidateReadSet_ConcurrentRPushTriggersConflict verifies that a // concurrent RPUSH to a list triggers an OCC read-write conflict for a MULTI // transaction that read the list via LRANGE. Without the boundary key tracking diff --git a/cmd/raft-migrate/main.go b/cmd/raft-migrate/main.go deleted file mode 100644 index 05c5b89b0..000000000 --- a/cmd/raft-migrate/main.go +++ /dev/null @@ -1,39 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "log" - "path/filepath" - - "github.com/bootjp/elastickv/internal/raftstore" -) - -func main() { - var ( - dir = flag.String("dir", "", "Directory containing legacy logs.dat and stable.dat") - out = flag.String("out", "", "Destination Pebble raft.db directory (default: /raft.db)") - ) - flag.Parse() - - if *dir == "" { - log.Fatal("--dir is required") - } - - dest := *out - if dest == "" { - dest = filepath.Join(*dir, "raft.db") - } - - stats, err := raftstore.MigrateLegacyBoltDB( - filepath.Join(*dir, "logs.dat"), - filepath.Join(*dir, "stable.dat"), - dest, - ) - if err != nil { - log.Fatalf("migration failed: %v", err) - } - - fmt.Printf("migrated legacy raft storage to %s (logs=%d stable_keys=%d)\n", dest, stats.Logs, stats.StableKeys) - fmt.Println("next: archive or remove logs.dat and stable.dat before starting elastickv") -} diff --git a/go.mod b/go.mod index c2b9275fe..da434394e 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,6 @@ require ( github.com/getsentry/sentry-go v0.44.1 github.com/goccy/go-json v0.10.6 github.com/hashicorp/go-hclog v1.6.3 - github.com/hashicorp/go-msgpack/v2 v2.1.5 github.com/hashicorp/raft v1.7.3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.23.2 @@ -28,7 +27,6 @@ require ( github.com/tidwall/redcon v1.6.2 github.com/vmihailenco/msgpack/v5 v5.4.1 github.com/yuin/gopher-lua v1.1.2 - go.etcd.io/bbolt v1.4.3 go.etcd.io/etcd/server/v3 v3.6.10 go.etcd.io/raft/v3 v3.6.0 go.uber.org/zap v1.27.1 @@ -77,6 +75,7 @@ require ( github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/go-metrics v0.5.4 // indirect + github.com/hashicorp/go-msgpack/v2 v2.1.5 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/jonboulle/clockwork v0.5.0 // indirect @@ -96,6 +95,7 @@ require ( github.com/tidwall/btree v1.1.0 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + go.etcd.io/bbolt v1.4.3 // indirect go.etcd.io/etcd/api/v3 v3.6.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.6.10 // indirect go.etcd.io/etcd/client/v3 v3.6.10 // indirect diff --git a/internal/raftstore/migrate.go b/internal/raftstore/migrate.go deleted file mode 100644 index 0f816babb..000000000 --- a/internal/raftstore/migrate.go +++ /dev/null @@ -1,233 +0,0 @@ -package raftstore - -import ( - "bytes" - "os" - "path/filepath" - - "github.com/cockroachdb/errors" - "github.com/hashicorp/go-msgpack/v2/codec" - "github.com/hashicorp/raft" - "go.etcd.io/bbolt" -) - -const ( - legacyLogsBucket = "logs" - legacyStableBucket = "conf" - legacyBatchSize = 1024 - legacyBoltFileMode = 0o600 - legacyMigrationSuffix = ".migrating" -) - -type MigrationStats struct { - Logs uint64 - StableKeys uint64 -} - -func MigrateLegacyBoltDB(logsPath, stablePath, destDir string) (*MigrationStats, error) { - tempDir, err := prepareMigrationPaths(logsPath, stablePath, destDir) - if err != nil { - return nil, err - } - - logsDB, stableDB, closeSources, err := openLegacySourceDBs(logsPath, stablePath) - if err != nil { - return nil, err - } - defer closeSources() - - stats, err := migrateLegacyBoltToTempDir(logsDB, stableDB, tempDir) - if err != nil { - return nil, err - } - if err := finalizeMigratedStore(tempDir, destDir); err != nil { - return nil, err - } - return stats, nil -} - -func prepareMigrationPaths(logsPath, stablePath, destDir string) (string, error) { - if logsPath == "" { - return "", errors.New("logs path is required") - } - if stablePath == "" { - return "", errors.New("stable path is required") - } - if destDir == "" { - return "", errors.New("destination dir is required") - } - - destDir = filepath.Clean(destDir) - - if err := requireExistingFile(logsPath); err != nil { - return "", err - } - if err := requireExistingFile(stablePath); err != nil { - return "", err - } - if err := requireDestinationAbsent(destDir); err != nil { - return "", err - } - - tempDir := destDir + legacyMigrationSuffix - if err := requireDestinationAbsent(tempDir); err != nil { - return "", err - } - return tempDir, nil -} - -func openLegacySourceDBs(logsPath, stablePath string) (logsDB *bbolt.DB, stableDB *bbolt.DB, closeFn func(), err error) { - logsDB, err = openLegacyBoltReadOnly(logsPath) - if err != nil { - return nil, nil, nil, err - } - - stableDB, err = openLegacyBoltReadOnly(stablePath) - if err != nil { - _ = logsDB.Close() - return nil, nil, nil, err - } - - closeFn = func() { - _ = stableDB.Close() - _ = logsDB.Close() - } - return logsDB, stableDB, closeFn, nil -} - -func migrateLegacyBoltToTempDir(logsDB, stableDB *bbolt.DB, tempDir string) (*MigrationStats, error) { - store, err := NewPebbleStore(tempDir) - if err != nil { - return nil, err - } - - cleanupTemp := func() { - _ = store.Close() - _ = os.RemoveAll(tempDir) - } - - stats, err := migrateLegacyBoltData(logsDB, stableDB, store) - if err != nil { - cleanupTemp() - return nil, err - } - if err := store.Close(); err != nil { - _ = os.RemoveAll(tempDir) - return nil, err - } - return stats, nil -} - -func finalizeMigratedStore(tempDir, destDir string) error { - if err := os.MkdirAll(filepath.Dir(destDir), pebbleDirPerm); err != nil { - _ = os.RemoveAll(tempDir) - return errors.WithStack(err) - } - if err := os.Rename(tempDir, destDir); err != nil { - _ = os.RemoveAll(tempDir) - return errors.WithStack(err) - } - return nil -} - -func migrateLegacyBoltData(logsDB, stableDB *bbolt.DB, dest *PebbleStore) (*MigrationStats, error) { - stats := &MigrationStats{} - - if err := copyLegacyStable(stableDB, dest, stats); err != nil { - return nil, err - } - if err := copyLegacyLogs(logsDB, dest, stats); err != nil { - return nil, err - } - - return stats, nil -} - -func copyLegacyStable(stableDB *bbolt.DB, dest *PebbleStore, stats *MigrationStats) error { - return errors.WithStack(stableDB.View(func(tx *bbolt.Tx) error { - bucket := tx.Bucket([]byte(legacyStableBucket)) - if bucket == nil { - return errors.Newf("legacy stable bucket %q not found", legacyStableBucket) - } - return bucket.ForEach(func(k, v []byte) error { - if err := dest.Set(k, append([]byte(nil), v...)); err != nil { - return err - } - stats.StableKeys++ - return nil - }) - })) -} - -func copyLegacyLogs(logsDB *bbolt.DB, dest *PebbleStore, stats *MigrationStats) error { - batch := make([]*raft.Log, 0, legacyBatchSize) - - flush := func() error { - if len(batch) == 0 { - return nil - } - if err := dest.StoreLogs(batch); err != nil { - return err - } - stats.Logs += uint64(len(batch)) - batch = batch[:0] - return nil - } - - err := logsDB.View(func(tx *bbolt.Tx) error { - bucket := tx.Bucket([]byte(legacyLogsBucket)) - if bucket == nil { - return errors.Newf("legacy logs bucket %q not found", legacyLogsBucket) - } - return bucket.ForEach(func(_, v []byte) error { - var entry raft.Log - if err := decodeLegacyLog(v, &entry); err != nil { - return err - } - batch = append(batch, &entry) - if len(batch) < legacyBatchSize { - return nil - } - return flush() - }) - }) - if err != nil { - return errors.WithStack(err) - } - - return flush() -} - -func openLegacyBoltReadOnly(path string) (*bbolt.DB, error) { - db, err := bbolt.Open(path, legacyBoltFileMode, &bbolt.Options{ReadOnly: true}) - if err != nil { - return nil, errors.WithStack(err) - } - return db, nil -} - -func requireExistingFile(path string) error { - info, err := os.Stat(path) - if err != nil { - return errors.WithStack(err) - } - if info.IsDir() { - return errors.WithStack(errors.Newf("%s is a directory, expected file", path)) - } - return nil -} - -func requireDestinationAbsent(path string) error { - if _, err := os.Stat(path); err == nil { - return errors.WithStack(errors.Newf("destination already exists: %s", path)) - } else if !os.IsNotExist(err) { - return errors.WithStack(err) - } - return nil -} - -func decodeLegacyLog(payload []byte, out *raft.Log) error { - handle := codec.MsgpackHandle{} - decoder := codec.NewDecoder(bytes.NewReader(payload), &handle) - return errors.WithStack(decoder.Decode(out)) -} diff --git a/internal/raftstore/migrate_test.go b/internal/raftstore/migrate_test.go deleted file mode 100644 index a436b1455..000000000 --- a/internal/raftstore/migrate_test.go +++ /dev/null @@ -1,144 +0,0 @@ -package raftstore - -import ( - "bytes" - "encoding/binary" - "path/filepath" - "testing" - "time" - - "github.com/hashicorp/go-msgpack/v2/codec" - "github.com/hashicorp/raft" - "github.com/stretchr/testify/require" - "go.etcd.io/bbolt" -) - -func TestMigrateLegacyBoltDB(t *testing.T) { - baseDir := t.TempDir() - logsPath := filepath.Join(baseDir, "logs.dat") - stablePath := filepath.Join(baseDir, "stable.dat") - destDir := filepath.Join(baseDir, "raft.db") - - require.NoError(t, writeLegacyLogsDB(logsPath, []*raft.Log{ - { - Index: 1, - Term: 2, - Type: raft.LogCommand, - Data: []byte("set alpha"), - Extensions: []byte("ext-a"), - AppendedAt: time.Unix(100, 0).UTC(), - }, - { - Index: 2, - Term: 2, - Type: raft.LogNoop, - Data: []byte("noop"), - Extensions: []byte("ext-b"), - AppendedAt: time.Unix(200, 0).UTC(), - }, - })) - require.NoError(t, writeLegacyStableDB(stablePath, map[string][]byte{ - "CurrentTerm": encodeUint64(5), - "LastVote": []byte("n1"), - })) - - stats, err := MigrateLegacyBoltDB(logsPath, stablePath, destDir) - require.NoError(t, err) - require.Equal(t, &MigrationStats{Logs: 2, StableKeys: 2}, stats) - - store, err := NewPebbleStore(destDir) - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, store.Close()) - }) - - var first raft.Log - require.NoError(t, store.GetLog(1, &first)) - require.Equal(t, uint64(1), first.Index) - require.Equal(t, uint64(2), first.Term) - require.Equal(t, raft.LogCommand, first.Type) - require.Equal(t, []byte("set alpha"), first.Data) - require.Equal(t, []byte("ext-a"), first.Extensions) - require.True(t, first.AppendedAt.Equal(time.Unix(100, 0).UTC())) - - var second raft.Log - require.NoError(t, store.GetLog(2, &second)) - require.Equal(t, raft.LogNoop, second.Type) - require.True(t, second.AppendedAt.Equal(time.Unix(200, 0).UTC())) - - currentTerm, err := store.Get([]byte("CurrentTerm")) - require.NoError(t, err) - require.Equal(t, encodeUint64(5), currentTerm) - - lastVote, err := store.Get([]byte("LastVote")) - require.NoError(t, err) - require.Equal(t, []byte("n1"), lastVote) -} - -func writeLegacyLogsDB(path string, logs []*raft.Log) error { - db, err := bbolt.Open(path, legacyBoltFileMode, nil) - if err != nil { - return err - } - defer func() { _ = db.Close() }() - - return db.Update(func(tx *bbolt.Tx) error { - logsBucket, err := tx.CreateBucketIfNotExists([]byte(legacyLogsBucket)) - if err != nil { - return err - } - if _, err := tx.CreateBucketIfNotExists([]byte(legacyStableBucket)); err != nil { - return err - } - for _, entry := range logs { - payload, err := encodeLegacyLog(entry) - if err != nil { - return err - } - if err := logsBucket.Put(encodeUint64(entry.Index), payload); err != nil { - return err - } - } - return nil - }) -} - -func writeLegacyStableDB(path string, values map[string][]byte) error { - db, err := bbolt.Open(path, legacyBoltFileMode, nil) - if err != nil { - return err - } - defer func() { _ = db.Close() }() - - return db.Update(func(tx *bbolt.Tx) error { - if _, err := tx.CreateBucketIfNotExists([]byte(legacyLogsBucket)); err != nil { - return err - } - stableBucket, err := tx.CreateBucketIfNotExists([]byte(legacyStableBucket)) - if err != nil { - return err - } - for key, value := range values { - if err := stableBucket.Put([]byte(key), value); err != nil { - return err - } - } - return nil - }) -} - -func encodeLegacyLog(entry *raft.Log) ([]byte, error) { - var buf bytes.Buffer - handle := codec.MsgpackHandle{} - encoder := codec.NewEncoder(&buf, &handle) - if err := encoder.Encode(entry); err != nil { - return nil, err - } - return buf.Bytes(), nil -} - -func encodeUint64(v uint64) []byte { - out := make([]byte, 8) - binary.BigEndian.PutUint64(out, v) - return out -} diff --git a/proxy/pubsub.go b/proxy/pubsub.go index b20de21df..1ecd9e284 100644 --- a/proxy/pubsub.go +++ b/proxy/pubsub.go @@ -31,6 +31,8 @@ const ( cmdDiscard = "DISCARD" cmdPing = "PING" cmdQuit = "QUIT" + cmdAuth = "AUTH" + cmdSelect = "SELECT" // cleanupFwdTimeout bounds the wait for forwardMessages to exit during cleanup. // If the client socket is stuck, we don't want to block indefinitely. @@ -325,13 +327,13 @@ func (s *pubsubSession) handleTxnInSession(name string, args [][]byte) bool { // (not forwarded to a backend). Returns true if the command was handled. func (s *pubsubSession) handleProxySpecialCommand(name string, args [][]byte) bool { // AUTH is handled at the connection-pool level; accept silently. - if name == "AUTH" { + if name == cmdAuth { s.writeString("OK") return true } // Mirror ProxyServer's SELECT handling: accept only the configured DB to // avoid per-connection DB state with pooled connections. - if name != "SELECT" { + if name != cmdSelect { return false } // Enforce Redis arity: SELECT requires exactly one DB index argument. diff --git a/store/lsm_store.go b/store/lsm_store.go index 1372a6a20..9f7ebcf79 100644 --- a/store/lsm_store.go +++ b/store/lsm_store.go @@ -5,7 +5,6 @@ import ( "bytes" "context" "encoding/binary" - "encoding/gob" "hash" "hash/crc32" "io" @@ -201,7 +200,7 @@ func decodeKeyView(k []byte) ([]byte, uint64) { return k[:keyLen], ^invTs } -// Value encoding: We use gob to encode VersionedValue structure minus the key/ts which are in the key. +// Value encoding: fixed binary header [Tombstone(1)][ExpireAt(8)] followed by raw value bytes; key and timestamp are encoded in the SST key. type storedValue struct { Value []byte Tombstone bool @@ -1464,14 +1463,7 @@ func (s *pebbleStore) Restore(r io.Reader) error { // verification succeeds. return s.restoreFromStreamingMVCC(br) default: - // Legacy gob format: restoreFromLegacyGob performs an atomic - // temp-dir swap similarly. - // NOTE: Streams without a recognised magic header are assumed to be - // in the legacy gob format. Snapshots produced by older Pebble - // implementations that did not include a magic prefix are therefore - // not supported by this dispatcher and will typically surface as a - // gob decode error rather than being restored successfully. - return s.restoreFromLegacyGob(br) + return errors.WithStack(errors.Newf("unrecognized snapshot format: unknown magic header")) } } @@ -1783,155 +1775,6 @@ func (s *pebbleStore) swapInTempDB(tmpDir string) error { return nil } -// restoreLegacyGobToTempDB writes the decoded gob snapshot into a temporary -// Pebble directory sibling to s.dir and atomically swaps it into place. -func (s *pebbleStore) restoreLegacyGobToTempDB(entries []mvccSnapshotEntry, lastCommitTS uint64, minRetainedTS uint64) error { - tmpDir := filepath.Clean(s.dir) + ".legacy-tmp" - if err := os.RemoveAll(tmpDir); err != nil { - return errors.WithStack(err) - } - tmpDB, err := pebble.Open(tmpDir, defaultPebbleOptions()) - if err != nil { - return errors.WithStack(err) - } - if err := writeGobEntriesToDB(entries, tmpDB); err != nil { - _ = tmpDB.Close() - _ = os.RemoveAll(tmpDir) - return err - } - - if err := writeTempDBMetadata(tmpDB, lastCommitTS, minRetainedTS); err != nil { - _ = tmpDB.Close() - _ = os.RemoveAll(tmpDir) - return err - } - - if err := tmpDB.Close(); err != nil { - _ = os.RemoveAll(tmpDir) - return errors.WithStack(err) - } - return s.swapInTempDB(tmpDir) -} - -// restoreFromLegacyGob restores from the legacy gob-encoded MVCCStore -// snapshot format (gob payload + CRC32 trailer). -// -// The CRC32 is computed in a single pass while spooling the gob payload to a -// temporary file co-located with s.dir. The CRC32 trailer is NOT written to -// the spool file — only the pure gob payload is stored, so the decoder can -// read the spool file directly without needing a LimitReader. Entries are -// written into a temporary Pebble directory and only swapped into place after -// decoding succeeds, preserving the existing store on failure. -func (s *pebbleStore) restoreFromLegacyGob(r io.Reader) error { - snapshot, err := spoolAndDecodeGobSnapshot(r, s.dir) - if err != nil { - return err - } - if err := s.restoreLegacyGobToTempDB(snapshot.Entries, snapshot.LastCommitTS, snapshot.MinRetainedTS); err != nil { - return err - } - return nil -} - -// spoolAndDecodeGobSnapshot streams r into a spool file co-located with dir, -// verifies the CRC32 trailer, then gob-decodes the payload and returns the -// snapshot. The spool file is always cleaned up before returning. -func spoolAndDecodeGobSnapshot(r io.Reader, dir string) (mvccSnapshot, error) { - tmpFile, err := os.CreateTemp(dir, "ekv-legacy-gob-*.tmp") - if err != nil { - return mvccSnapshot{}, errors.WithStack(err) - } - tmpPath := tmpFile.Name() - closeTmp := func() { - _ = tmpFile.Close() - _ = os.Remove(tmpPath) - } - - if err := spoolGobPayload(r, tmpFile); err != nil { - closeTmp() - return mvccSnapshot{}, err - } - - if _, err := tmpFile.Seek(0, io.SeekStart); err != nil { - closeTmp() - return mvccSnapshot{}, errors.WithStack(err) - } - var snapshot mvccSnapshot - if err := gob.NewDecoder(tmpFile).Decode(&snapshot); err != nil { - closeTmp() - return mvccSnapshot{}, errors.WithStack(err) - } - // Close and remove the spool file before swapInTempDB removes dir. - closeTmp() - return snapshot, nil -} - -// spoolGobPayload streams r into dst, stripping the final checksumSize-byte -// CRC32 trailer. The payload bytes are written to dst and hashed; the trailer -// is verified but not written. Returns ErrInvalidChecksum on mismatch or a -// truncated stream. -func spoolGobPayload(r io.Reader, dst io.Writer) error { - hasher := crc32.NewIEEE() - buf := make([]byte, spoolBufSize) - var tail []byte - for { - n, readErr := r.Read(buf) - if n > 0 { - tail = append(tail, buf[:n]...) - if len(tail) > checksumSize { - toProcessLen := len(tail) - checksumSize - toProcess := tail[:toProcessLen] - if _, err := dst.Write(toProcess); err != nil { - return errors.WithStack(err) - } - _, _ = hasher.Write(toProcess) - tail = append([]byte(nil), tail[toProcessLen:]...) - } - } - if readErr == io.EOF { - break - } - if readErr != nil { - return errors.WithStack(readErr) - } - } - if len(tail) != checksumSize { - return errors.WithStack(ErrInvalidChecksum) - } - if hasher.Sum32() != binary.LittleEndian.Uint32(tail) { - return errors.WithStack(ErrInvalidChecksum) - } - return nil -} - -// writeGobEntriesToDB writes the decoded gob snapshot entries into db using -// batched commits. NOTE: this function mutates entries in-place by clearing -// each version's Value and then nilling the entry's Key and Versions slice -// after encoding to reduce peak memory usage. Callers must not reuse entries -// after this call. -func writeGobEntriesToDB(entries []mvccSnapshotEntry, db *pebble.DB) error { - batch := db.NewBatch() - for i := range entries { - entry := &entries[i] - for j := range entry.Versions { - version := entry.Versions[j] - if err := setEncodedVersionInBatch(batch, entry.Key, version); err != nil { - _ = batch.Close() - return err - } - entry.Versions[j].Value = nil - if snapshotBatchShouldFlush(batch) { - if err := flushSnapshotBatch(db, &batch, pebble.NoSync); err != nil { - return err - } - } - } - entry.Key = nil - entry.Versions = nil - } - return commitSnapshotBatch(batch, pebble.Sync) -} - func (s *pebbleStore) Close() error { s.maintenanceMu.Lock() defer s.maintenanceMu.Unlock() diff --git a/store/lsm_store_test.go b/store/lsm_store_test.go index a5411da34..42a1b5c98 100644 --- a/store/lsm_store_test.go +++ b/store/lsm_store_test.go @@ -648,71 +648,6 @@ func TestPebbleStore_RestoreFromStreamingMVCCPreservesMinRetainedTS(t *testing.T require.ErrorIs(t, err, ErrReadTSCompacted) } -// TestPebbleStore_RestoreFromLegacyGob verifies that a pebbleStore can -// restore from the oldest gob-based mvccStore snapshot format. -func TestPebbleStore_RestoreFromLegacyGob(t *testing.T) { - ctx := context.Background() - - // Build a legacy gob snapshot manually. - src := NewMVCCStore() - require.NoError(t, src.PutAt(ctx, []byte("a"), []byte("1"), 5, 0)) - require.NoError(t, src.PutAt(ctx, []byte("b"), []byte("2"), 10, 0)) - - srcImpl, ok := src.(*mvccStore) - require.True(t, ok, "expected *mvccStore") - var buf bytes.Buffer - require.NoError(t, srcImpl.writeLegacyGobSnapshot(&buf)) - - // Restore into pebbleStore. - dir, err := os.MkdirTemp("", "pebble-migrate-gob-*") - require.NoError(t, err) - defer os.RemoveAll(dir) - - dst, err := NewPebbleStore(dir) - require.NoError(t, err) - defer dst.Close() - - require.NoError(t, dst.Restore(bytes.NewReader(buf.Bytes()))) - - val, err := dst.GetAt(ctx, []byte("a"), 5) - require.NoError(t, err) - assert.Equal(t, []byte("1"), val) - - val, err = dst.GetAt(ctx, []byte("b"), 10) - require.NoError(t, err) - assert.Equal(t, []byte("2"), val) - - assert.Equal(t, src.LastCommitTS(), dst.LastCommitTS()) -} - -func TestPebbleStore_RestoreFromLegacyGobPreservesMinRetainedTS(t *testing.T) { - ctx := context.Background() - - src := NewMVCCStore() - require.NoError(t, src.PutAt(ctx, []byte("k"), []byte("v10"), 10, 0)) - require.NoError(t, src.PutAt(ctx, []byte("k"), []byte("v20"), 20, 0)) - require.NoError(t, src.Compact(ctx, 20)) - - srcImpl, ok := src.(*mvccStore) - require.True(t, ok) - var buf bytes.Buffer - require.NoError(t, srcImpl.writeLegacyGobSnapshot(&buf)) - - dir, err := os.MkdirTemp("", "pebble-migrate-gob-retained-*") - require.NoError(t, err) - defer os.RemoveAll(dir) - - dst, err := NewPebbleStore(dir) - require.NoError(t, err) - defer dst.Close() - - require.NoError(t, dst.Restore(bytes.NewReader(buf.Bytes()))) - require.Equal(t, uint64(20), requirePebbleRetentionController(t, dst).MinRetainedTS()) - - _, err = dst.GetAt(ctx, []byte("k"), 10) - require.ErrorIs(t, err, ErrReadTSCompacted) -} - // TestPebbleStore_Restore_EmptySnapshot verifies that restoring from an // empty reader clears the DB and resets lastCommitTS to zero. func TestPebbleStore_Restore_EmptySnapshot(t *testing.T) { @@ -758,38 +693,6 @@ func TestPebbleStore_Restore_TruncatedHeader(t *testing.T) { assert.Contains(t, err.Error(), "truncated snapshot") } -// TestPebbleStore_Restore_LegacyGobCRCFailure verifies that a gob-encoded -// snapshot with an invalid CRC32 trailer is rejected. -func TestPebbleStore_Restore_LegacyGobCRCFailure(t *testing.T) { - ctx := context.Background() - - dir, err := os.MkdirTemp("", "pebble-restore-crc-*") - require.NoError(t, err) - defer os.RemoveAll(dir) - - s, err := NewPebbleStore(dir) - require.NoError(t, err) - defer s.Close() - - // Build a valid legacy gob snapshot, then corrupt the CRC trailer. - src := NewMVCCStore() - require.NoError(t, src.PutAt(ctx, []byte("k"), []byte("v"), 5, 0)) - srcImpl, ok := src.(*mvccStore) - require.True(t, ok) - var buf bytes.Buffer - require.NoError(t, srcImpl.writeLegacyGobSnapshot(&buf)) - - // Flip the last 4 bytes (the CRC) to corrupt the checksum. - data := buf.Bytes() - for i := len(data) - 4; i < len(data); i++ { - data[i] ^= 0xFF - } - - err = s.Restore(bytes.NewReader(data)) - require.Error(t, err) - assert.ErrorIs(t, err, ErrInvalidChecksum) -} - // TestPebbleStore_Restore_PebbleMagicMismatch verifies that a stream // dispatched as native Pebble but containing a wrong magic header is rejected. func TestPebbleStore_Restore_PebbleMagicMismatch(t *testing.T) { diff --git a/store/mvcc_legacy_snapshot_test.go b/store/mvcc_legacy_snapshot_test.go deleted file mode 100644 index 4a3b8a491..000000000 --- a/store/mvcc_legacy_snapshot_test.go +++ /dev/null @@ -1,42 +0,0 @@ -package store - -import ( - "bytes" - "encoding/binary" - "encoding/gob" - "hash/crc32" - "io" - - "github.com/cockroachdb/errors" -) - -// writeLegacyGobSnapshot writes the old gob-encoded snapshot format used only -// in migration tests. The format is: gob(mvccSnapshot) + CRC32(LE). -func (s *mvccStore) writeLegacyGobSnapshot(w io.Writer) error { - s.mtx.RLock() - snap := mvccSnapshot{ - LastCommitTS: s.lastCommitTS, - MinRetainedTS: s.minRetainedTS, - } - iter := s.tree.Iterator() - for iter.Next() { - key, _ := iter.Key().([]byte) - versions, _ := iter.Value().([]VersionedValue) - snap.Entries = append(snap.Entries, mvccSnapshotEntry{ - Key: bytes.Clone(key), - Versions: append([]VersionedValue(nil), versions...), - }) - } - s.mtx.RUnlock() - - var buf bytes.Buffer - if err := gob.NewEncoder(&buf).Encode(&snap); err != nil { - return errors.WithStack(err) - } - payload := buf.Bytes() - checksum := crc32.ChecksumIEEE(payload) - if _, err := w.Write(payload); err != nil { - return errors.WithStack(err) - } - return errors.WithStack(binary.Write(w, binary.LittleEndian, checksum)) -} diff --git a/store/mvcc_store.go b/store/mvcc_store.go index 18a9638f7..1de133c7b 100644 --- a/store/mvcc_store.go +++ b/store/mvcc_store.go @@ -5,7 +5,6 @@ import ( "bytes" "context" "encoding/binary" - "encoding/gob" "hash/crc32" "io" "log/slog" @@ -26,7 +25,6 @@ type VersionedValue struct { } const ( - checksumSize = 4 mvccSnapshotVersion = uint32(1) maxSnapshotKeySize = 1 << 20 // 1 MiB per key maxSnapshotVersionCount = 1 << 20 // 1M versions per key @@ -34,27 +32,12 @@ const ( // maxSnapshotValueSize caps the allowed size of a single value during streaming // snapshot restore and write paths to prevent OOM from malformed or adversarial -// snapshots. It does not apply to legacy gob-backed snapshot restore. -// Declared as a var so tests can temporarily lower the limit without allocating -// hundreds of MiB. +// snapshots. Declared as a var so tests can temporarily lower the limit without +// allocating hundreds of MiB. var maxSnapshotValueSize = 256 << 20 // 256 MiB var mvccSnapshotMagic = [8]byte{'E', 'K', 'V', 'M', 'V', 'C', 'C', '2'} -// mvccSnapshot is retained for backward compatibility with older gob-backed -// snapshots that were materialized fully in memory. -type mvccSnapshot struct { - LastCommitTS uint64 - MinRetainedTS uint64 - Entries []mvccSnapshotEntry -} - -// mvccSnapshotEntry is used solely for gob snapshot serialization. -type mvccSnapshotEntry struct { - Key []byte - Versions []VersionedValue -} - type compactEntry struct { key []byte } @@ -615,12 +598,14 @@ func (s *mvccStore) Snapshot() (Snapshot, error) { func (s *mvccStore) Restore(r io.Reader) error { br := bufio.NewReader(r) - if streaming, err := isStreamingMVCCSnapshot(br); err != nil { + streaming, err := isStreamingMVCCSnapshot(br) + if err != nil { return errors.WithStack(err) - } else if streaming { - return s.restoreStreamingSnapshot(br) } - return s.restoreLegacySnapshot(br) + if !streaming { + return errors.New("unrecognized snapshot format: unknown magic header") + } + return s.restoreStreamingSnapshot(br) } func isStreamingMVCCSnapshot(r *bufio.Reader) (bool, error) { @@ -634,39 +619,6 @@ func isStreamingMVCCSnapshot(r *bufio.Reader) (bool, error) { return bytes.Equal(header, mvccSnapshotMagic[:]), nil } -func (s *mvccStore) restoreLegacySnapshot(r io.Reader) error { - data, err := io.ReadAll(r) - if err != nil { - return errors.WithStack(err) - } - if len(data) < checksumSize { - return errors.WithStack(ErrInvalidChecksum) - } - payload := data[:len(data)-checksumSize] - expected := binary.LittleEndian.Uint32(data[len(data)-checksumSize:]) - if crc32.ChecksumIEEE(payload) != expected { - return errors.WithStack(ErrInvalidChecksum) - } - - var snapshot mvccSnapshot - if err := gob.NewDecoder(bytes.NewReader(payload)).Decode(&snapshot); err != nil { - return errors.WithStack(err) - } - - s.mtx.Lock() - defer s.mtx.Unlock() - - s.tree.Clear() - s.lastCommitTS = snapshot.LastCommitTS - s.minRetainedTS = snapshot.MinRetainedTS - for _, entry := range snapshot.Entries { - versions := append([]VersionedValue(nil), entry.Versions...) - s.tree.Put(bytes.Clone(entry.Key), versions) - } - - return nil -} - func (s *mvccStore) writeSnapshotFile(f *os.File) error { checksumOffset, err := writeMVCCSnapshotHeader(f) if err != nil { diff --git a/store/mvcc_store_snapshot_test.go b/store/mvcc_store_snapshot_test.go index 75ce1489d..3ebdbcac8 100644 --- a/store/mvcc_store_snapshot_test.go +++ b/store/mvcc_store_snapshot_test.go @@ -3,9 +3,6 @@ package store import ( "bytes" "context" - "encoding/binary" - "encoding/gob" - "hash/crc32" "testing" "github.com/stretchr/testify/require" @@ -84,43 +81,6 @@ func TestMVCCStore_ApplyMutations_UnknownOp(t *testing.T) { require.ErrorIs(t, err, ErrUnknownOp) } -func TestMVCCStore_RestoreLegacySnapshot(t *testing.T) { - t.Parallel() - - ctx := context.Background() - - // Build a legacy gob+crc32 snapshot payload. - legacy := mvccSnapshot{ - LastCommitTS: 42, - MinRetainedTS: 10, - Entries: []mvccSnapshotEntry{ - { - Key: []byte("legacy-key"), - Versions: []VersionedValue{ - {TS: 42, Value: []byte("legacy-value"), Tombstone: false, ExpireAt: 0}, - }, - }, - }, - } - - var payload bytes.Buffer - require.NoError(t, gob.NewEncoder(&payload).Encode(legacy)) - - checksum := crc32.ChecksumIEEE(payload.Bytes()) - var cs [4]byte - binary.LittleEndian.PutUint32(cs[:], checksum) - full := append(payload.Bytes(), cs[:]...) - - dst := newTestMVCCStore(t) - require.NoError(t, dst.Restore(bytes.NewReader(full))) - - require.Equal(t, uint64(42), dst.LastCommitTS()) - - v, err := dst.GetAt(ctx, []byte("legacy-key"), 100) - require.NoError(t, err) - require.Equal(t, []byte("legacy-value"), v) -} - func snapshotBytes(t *testing.T, snap Snapshot) []byte { t.Helper()