Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/jepsen-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,28 @@ jobs:
curl -L https://github.com/technomancy/leiningen/stable/bin/lein > ~/lein
chmod +x ~/lein
~/lein version
- name: Pre-fetch Go modules
- name: Pre-fetch Go modules and build binary
run: |
mkdir -p "$GOCACHE" /tmp/go-tmp
GOPATH=$(go env GOPATH)
export GOCACHE GOTMPDIR=/tmp/go-tmp
go mod download
go build -o /tmp/elastickv-bin .
- name: Run Jepsen unit tests
working-directory: jepsen
run: ~/lein test
- name: Launch etcd-backed cluster
run: |
set -euo pipefail
mkdir -p "$GOCACHE" /tmp/go-tmp /tmp/elastickv-ci
export GOTMPDIR=/tmp/go-tmp
mkdir -p /tmp/elastickv-ci
BOOTSTRAP_MEMBERS="n1=127.0.0.1:50051,n2=127.0.0.1:50052,n3=127.0.0.1:50053"
RAFT_REDIS_MAP="127.0.0.1:50051=127.0.0.1:63791,127.0.0.1:50052=127.0.0.1:63792,127.0.0.1:50053=127.0.0.1:63793"
RAFT_S3_MAP="127.0.0.1:50051=127.0.0.1:63901,127.0.0.1:50052=127.0.0.1:63902,127.0.0.1:50053=127.0.0.1:63903"
RAFT_DYNAMO_MAP="127.0.0.1:50051=127.0.0.1:63801,127.0.0.1:50052=127.0.0.1:63802,127.0.0.1:50053=127.0.0.1:63803"

: > /tmp/elastickv-demo.pid
for node in 1 2 3; do
nohup go run . \
nohup /tmp/elastickv-bin \
--address "127.0.0.1:5005${node}" \
--redisAddress "127.0.0.1:6379${node}" \
--dynamoAddress "127.0.0.1:6380${node}" \
Expand Down
56 changes: 0 additions & 56 deletions adapter/dynamodb_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/bootjp/elastickv/kv"
"github.com/bootjp/elastickv/store"
json "github.com/goccy/go-json"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -187,61 +186,6 @@ func TestDynamoDB_EnsureLegacyTableMigration_MigratesLegacyGeneration(t *testing
}, time.Second, 10*time.Millisecond)
}

func TestDynamoDB_EnsureLegacyTableMigration_NormalizesLegacyGSIJSONFormat(t *testing.T) {
t.Parallel()

legacySchema, server, st := newLegacyMigrationTestServer(t, true, "S")
writer := newDynamoFixtureWriter(t, st)

legacyBody, err := json.Marshal(map[string]any{
"table_name": legacySchema.TableName,
"attribute_definitions": legacySchema.AttributeDefinitions,
"primary_key": map[string]any{
"hash_key": legacySchema.PrimaryKey.HashKey,
"range_key": legacySchema.PrimaryKey.RangeKey,
},
"global_secondary_indexes": map[string]any{
"status-index": map[string]any{
"hash_key": "status",
"range_key": "sk",
},
},
"generation": legacySchema.Generation,
})
require.NoError(t, err)
writer.put(dynamoTableMetaKey(legacySchema.TableName), legacyBody)
writer.put(dynamoTableGenerationKey(legacySchema.TableName), fmt.Appendf(nil, "%d", legacySchema.Generation))

writer.writeItem(legacySchema, map[string]attributeValue{
"pk": newStringAttributeValue("tenant"),
"sk": newStringAttributeValue("2026-03-09T12:00:00Z"),
"status": newStringAttributeValue("open"),
"value": newStringAttributeValue("v1"),
})

ctx := context.Background()
require.NoError(t, server.ensureLegacyTableMigration(ctx, legacySchema.TableName))

schema, exists, err := server.loadTableSchema(ctx, legacySchema.TableName)
require.NoError(t, err)
require.True(t, exists)
require.Equal(t, "status", schema.GlobalSecondaryIndexes["status-index"].KeySchema.HashKey)
require.Equal(t, "sk", schema.GlobalSecondaryIndexes["status-index"].KeySchema.RangeKey)
require.Equal(t, "ALL", schema.GlobalSecondaryIndexes["status-index"].Projection.ProjectionType)

out, err := server.queryItems(ctx, queryInput{
TableName: legacySchema.TableName,
IndexName: "status-index",
KeyConditionExpression: "status = :status",
ExpressionAttributeValues: map[string]attributeValue{
":status": newStringAttributeValue("open"),
},
})
require.NoError(t, err)
require.Len(t, out.items, 1)
require.Equal(t, newStringAttributeValue("v1"), out.items[0]["value"])
}

func TestDynamoDB_EnsureLegacyTableMigration_PrefersExistingTargetItems(t *testing.T) {
t.Parallel()

Expand Down
44 changes: 17 additions & 27 deletions adapter/dynamodb_storage_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

pb "github.com/bootjp/elastickv/proto"
"github.com/cockroachdb/errors"
json "github.com/goccy/go-json"
gproto "google.golang.org/protobuf/proto"
)

Expand All @@ -18,6 +17,7 @@ var (
storedDynamoMarshalOptions = gproto.MarshalOptions{Deterministic: true}

errStoredDynamoMessageTooLarge = errors.New("stored dynamo message too large")
errUnrecognizedStoredDynamoFormat = errors.New("unrecognized stored dynamo format")
errNilDynamoTableSchema = errors.New("nil dynamo table schema")
errInvalidDynamoKeyEncodingVersion = errors.New("invalid key encoding version")
errDynamoKeyEncodingVersionOverflow = errors.New("dynamo key encoding version overflows int")
Expand Down Expand Up @@ -58,20 +58,15 @@ func encodeStoredDynamoTableSchema(schema *dynamoTableSchema) ([]byte, error) {
}

func decodeStoredDynamoTableSchema(b []byte) (*dynamoTableSchema, error) {
if hasStoredDynamoPrefix(b, storedDynamoSchemaProtoPrefix) {
msg := &pb.DynamoTableSchema{}
if err := gproto.Unmarshal(b[len(storedDynamoSchemaProtoPrefix):], msg); err != nil {
return nil, errors.WithStack(err)
}
schema, err := dynamoTableSchemaFromProto(msg)
if err != nil {
return nil, errors.WithStack(err)
}
return schema, nil
if !hasStoredDynamoPrefix(b, storedDynamoSchemaProtoPrefix) {
return nil, errUnrecognizedStoredDynamoFormat
}

schema := &dynamoTableSchema{}
if err := json.Unmarshal(b, schema); err != nil {
msg := &pb.DynamoTableSchema{}
if err := gproto.Unmarshal(b[len(storedDynamoSchemaProtoPrefix):], msg); err != nil {
return nil, errors.WithStack(err)
}
schema, err := dynamoTableSchemaFromProto(msg)
if err != nil {
return nil, errors.WithStack(err)
}
return schema, nil
Comment on lines 60 to 72

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Search decoder call sites and error handling context:"
rg -n -C3 'decodeStoredDynamo(TableSchema|Item)\s*\(' --type go

echo
echo "Search remaining legacy JSON references in DynamoDB codec/tests:"
rg -n -C3 'LegacyJSON|JSONFallback|json\.Unmarshal|go-json|storedDynamo.*JSON|Dynamo.*JSON' --type go

Repository: bootjp/elastickv

Length of output: 28924


🏁 Script executed:

cat -n adapter/dynamodb_storage_codec.go | head -150

Repository: bootjp/elastickv

Length of output: 6936


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Check git history of codec file ==="
git log --oneline -20 adapter/dynamodb_storage_codec.go

echo -e "\n=== Search for migration, legacy, or rollout-related comments ==="
rg -i 'migration|legacy|rollout|break.*change|compat' adapter/dynamodb_storage_codec.go

echo -e "\n=== Check test file for legacy format tests ==="
cat -n adapter/dynamodb_storage_codec_test.go

Repository: bootjp/elastickv

Length of output: 250


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== View test file ==="
cat -n adapter/dynamodb_storage_codec_test.go

echo -e "\n=== Show git diff/changes in the commit ==="
git show 9720786 --stat | head -30

Repository: bootjp/elastickv

Length of output: 3800


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Show full commit message and changes ==="
git show 9720786 --no-patch --format=fuller

echo -e "\n=== Check if there are migration guides or rollout docs ==="
fd -type f \( -name "MIGRATION*" -o -name "ROLLOUT*" -o -name "CHANGELOG*" \) | head -20

echo -e "\n=== Search for migration-related files or scripts ==="
rg -l 'migrate|migration' --type go | grep -i migrate | head -10

Repository: bootjp/elastickv

Length of output: 1331


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Search for MIGRATION or rollout documentation ==="
find . -maxdepth 2 -type f \( -name "*MIGRATION*" -o -name "*ROLLOUT*" -o -name "*CHANGE*" \)

echo -e "\n=== Check PR/commit details for rollout or test evidence ==="
git log --all --oneline | grep -i "migrat\|rollout\|legacy" | head -20

echo -e "\n=== Search for test helpers or migration test evidence ==="
rg 'TestMigration|LegacyJSON|migration.*test' --type go | head -20

echo -e "\n=== Check if this PR includes Jepsen or comprehensive test results ==="
rg 'Jepsen|test.*evidence|test.*result' adapter/dynamodb_storage_codec.go adapter/dynamodb_storage_codec_test.go

Repository: bootjp/elastickv

Length of output: 2107


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Search for DynamoDB-specific migration/flush helpers ==="
rg -i 'flush.*dynamo|dynamo.*flush|dynamo.*migrat' --type go

echo -e "\n=== Check what was in the legacy JSON fallback (show git diff details) ==="
git show 9720786 -- 'adapter/dynamodb_storage_codec.go' | head -200

echo -e "\n=== Verify current tests reject unknown format ==="
rg -A5 'errUnrecognizedStoredDynamoFormat' --type go

Repository: bootjp/elastickv

Length of output: 11926


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Check git history of old codec file (what was replaced) ==="
git log --all --oneline --follow -- 'adapter/dynamodb_storage_codec.go' | head -20

echo -e "\n=== Search for tests that validate legacy format rejection ==="
rg -B5 -A10 'LegacyTableMigration.*Test|legacy.*schema.*codec' adapter/dynamodb_migration_test.go | head -100

echo -e "\n=== Verify if migration tests include codec-format migration ==="
rg 'encodeStoredDynamo|decodeStoredDynamo' adapter/dynamodb_migration_test.go

Repository: bootjp/elastickv

Length of output: 986


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Show that legacy JSON compatibility was previously supported ==="
git show 7540a41:adapter/dynamodb_storage_codec.go | head -150

echo -e "\n=== Check if migration tests validate codec format handling ==="
cat -n adapter/dynamodb_migration_test.go | head -150

Repository: bootjp/elastickv

Length of output: 10646


Rollout verification required: JSON format data will become unreadable after this change.

This PR removes JSON fallback decoders for DynamoDB schemas and items (previously supported in commit 7540a41). Any persisted legacy JSON payloads will now fail immediately with errUnrecognizedStoredDynamoFormat.

Before rollout, confirm:

  • All existing stored JSON schemas and items have been migrated to protobuf format or are safe to reject
  • Appropriate test coverage validates that legacy JSON format is explicitly rejected (not silently ignored)
  • Rollout/migration plan is documented, including any Jepsen testing if applicable

The migration code in dynamodb_migration_test.go handles key encoding migration (generation transitions), not format migration (JSON→proto), so separate verification of data format coverage is needed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/dynamodb_storage_codec.go` around lines 60 - 72,
decodeStoredDynamoTableSchema now rejects legacy JSON payloads (returning
errUnrecognizedStoredDynamoFormat); before rolling out, add an explicit
unit/integration test in dynamodb_migration_test.go that writes a legacy
JSON-encoded DynamoTableSchema and asserts decodeStoredDynamoTableSchema returns
errUnrecognizedStoredDynamoFormat, add a migration verification step or script
that scans persisted storage for any JSON-formatted schema/item records and
fails CI if any are found, and update the rollout/migration docs to describe the
required protobuf migration (including Jepsen or other validation if applicable)
so operators can confirm no legacy JSON remains.

Expand All @@ -86,20 +81,15 @@ func encodeStoredDynamoItem(item map[string]attributeValue) ([]byte, error) {
}

func decodeStoredDynamoItem(b []byte) (map[string]attributeValue, error) {
if hasStoredDynamoPrefix(b, storedDynamoItemProtoPrefix) {
msg := &pb.DynamoItem{}
if err := gproto.Unmarshal(b[len(storedDynamoItemProtoPrefix):], msg); err != nil {
return nil, errors.WithStack(err)
}
item, err := dynamoItemFromProto(msg)
if err != nil {
return nil, errors.WithStack(err)
}
return item, nil
if !hasStoredDynamoPrefix(b, storedDynamoItemProtoPrefix) {
return nil, errUnrecognizedStoredDynamoFormat
}

item := map[string]attributeValue{}
if err := json.Unmarshal(b, &item); err != nil {
msg := &pb.DynamoItem{}
if err := gproto.Unmarshal(b[len(storedDynamoItemProtoPrefix):], msg); err != nil {
return nil, errors.WithStack(err)
}
item, err := dynamoItemFromProto(msg)
if err != nil {
return nil, errors.WithStack(err)
}
return item, nil
Expand Down
49 changes: 0 additions & 49 deletions adapter/dynamodb_storage_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package adapter
import (
"testing"

json "github.com/goccy/go-json"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -43,23 +42,6 @@ func TestStoredDynamoItemCodec_RoundTripProto(t *testing.T) {
require.Equal(t, item, decoded)
}

func TestStoredDynamoItemCodec_LegacyJSONFallback(t *testing.T) {
t.Parallel()

boolTrue := true
legacy := map[string]attributeValue{
"pk": newStringAttributeValue("tenant#1"),
"active": {BOOL: &boolTrue},
}

body, err := json.Marshal(legacy)
require.NoError(t, err)

decoded, err := decodeStoredDynamoItem(body)
require.NoError(t, err)
require.Equal(t, legacy, decoded)
}

func TestStoredDynamoItemCodec_NormalizesNullTrue(t *testing.T) {
t.Parallel()

Expand All @@ -77,37 +59,6 @@ func TestStoredDynamoItemCodec_NormalizesNullTrue(t *testing.T) {
require.True(t, *decoded["gone"].NULL)
}

func TestStoredDynamoTableSchemaCodec_LegacyJSONFallback(t *testing.T) {
t.Parallel()

body, err := json.Marshal(map[string]any{
"table_name": "threads",
"attribute_definitions": map[string]string{
"threadId": "S",
"createdAt": "S",
"status": "S",
},
"primary_key": map[string]any{
"hash_key": "threadId",
},
"global_secondary_indexes": map[string]any{
"status-index": map[string]any{
"hash_key": "status",
"range_key": "createdAt",
},
},
"generation": 7,
})
require.NoError(t, err)

schema, err := decodeStoredDynamoTableSchema(body)
require.NoError(t, err)
require.Equal(t, "threads", schema.TableName)
require.Equal(t, "status", schema.GlobalSecondaryIndexes["status-index"].KeySchema.HashKey)
require.Equal(t, "createdAt", schema.GlobalSecondaryIndexes["status-index"].KeySchema.RangeKey)
require.Equal(t, "ALL", schema.GlobalSecondaryIndexes["status-index"].Projection.ProjectionType)
}

func stringPtr(v string) *string {
return &v
}
Loading
Loading