From 80b3fcd045da5e0daad58a9b410db47fd7bae1f1 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 30 May 2026 23:09:44 +0900 Subject: [PATCH 1/4] backup: Phase 0b M5-2 - SQS side-record derivation (vis/byage/dedup) Resolves the SQS side-record decision gate the M5-1 (#846) encoder explicitly deferred. Emits the three non-lock derived families the live adapter would have written alongside each !sqs|msg|data| record: - vis (always): !sqs|msg|vis|... with visibleAt = available_at_millis ("visible now"), required for ReceiveMessage post-restore. - byage (always): !sqs|msg|byage|... with sendTs = send_timestamp_millis, required for the reaper to honor MessageRetentionPeriod. - dedup (FIFO + non-empty MessageDedupID only): !sqs|msg|dedup|... with all four sqsFifoDedupRecord fields (MessageID, SendTimestampMs, ExpiresAtMillis = sendTs+5min, OriginalSequence), gating FIFO sends within the dedup window. NOT emitted: !sqs|msg|group| rows. The live loadFifoGroupLock (adapter/sqs_fifo.go:135) treats key presence alone as "lock held"; emitting any value (even a zeroed lock) would falsely indicate an in-flight receive with no owner token, permanently blocking every group until manual cleanup (gemini critical #885). A fresh restore correctly has no rows, and the first post-restore receive on each group acquires the lock cleanly via the normal path. Scope: classic queues only (partition_count = 1). M5-1 already rejects PartitionCount > 1 via ErrSQSEncodeUnsupportedPartitioned at encodeQueue, so the side-record walk is never invoked for partitioned queues. Partitioned-FIFO support requires coordinated decoder+encoder lift not yet sequenced (deferred to M5-3 per the design doc). Implementation: encode_sqs_side.go duplicates the live key constructors (sqsMsgVisKey, sqsMsgByAgeKey, sqsMsgDedupKey) and sqsFifoDedupRecord struct, matching the M3b-3 GSI-helpers pattern. addSideRecords is wired into encodeQueueMessages in the same per-message loop as addMessage so the snapshot stays a single linear pass with no extra disk reads. Tests (encode_sqs_side_test.go): - TestSQSEncodeSideRecordsCrossCheckClassic: byte-identical key+value cross-check vs. live adapter constructors for a 2-message FIFO fixture (the gold-standard pattern the parent doc mandates). - TestSQSEncodeSideRecordsStandardQueueOmitsFIFOFamilies: non-FIFO queue gets vis+byage but no dedup (avoids poisoning the keyspace). - TestSQSEncodeSideRecordsEmptyDedupOmitsDedupRow: FIFO message with empty dedup-id: data+vis+byage emitted, dedup NOT emitted. - TestSQSEncodeSideRecordsNoGroupRows: FIFO queue across 3 distinct groups emits zero !sqs|msg|group| rows (regression pin for gemini critical #885). The two restore round-trip tests in the design doc test plan (TestSQSEncodeFifoRestoreRoundTrip, TestSQSEncodeReaperFindsRestoredMessage) are deferred: they require single-node cluster boot that is heavier infra than this package's existing test pattern, and the cross-check plus negative tests above pin the contract M5-2 is responsible for. Cluster-level restore validation belongs in the M6 CLI's integration tests or Jepsen, not here. Self-review: 1. Data loss: Full reconstruction (not fallback) eliminates the silent-redeliver + invisible-message + retention-leak classes M5-1-only would create. No new failure modes; addSideRecords surfaces every Add error and is wired to the existing per-message walk's error path. 2. Concurrency: Pure offline derivation. No Raft, no HLC, no locks. 3. Performance: One Add per side row, no extra disk reads. Snapshot grows ~2-3x message count, all small fixed-width keys. addMessage's existing loop body inflates by ~5 lines, no extra iterations. 4. Data consistency: Group-row prohibition pinned by test; dedup gating on (FIFO && non-empty dedup-id) pinned by two tests; vis visibleAt = available_at_millis matches "vis zeroed by default" contract from parent design. Cross-check validates byte-identical output against the live writer for a shared fixture. 5. Test coverage: One cross-check + three negative tests (group, standard-queue, empty-dedup) cover the three conditional-emission rules the design enumerates. Round-trip tests deferred per above. --- internal/backup/encode_sqs.go | 3 + internal/backup/encode_sqs_side.go | 123 +++++++++++++ internal/backup/encode_sqs_side_test.go | 222 ++++++++++++++++++++++++ 3 files changed, 348 insertions(+) create mode 100644 internal/backup/encode_sqs_side.go create mode 100644 internal/backup/encode_sqs_side_test.go diff --git a/internal/backup/encode_sqs.go b/internal/backup/encode_sqs.go index b231dd911..d6ff91521 100644 --- a/internal/backup/encode_sqs.go +++ b/internal/backup/encode_sqs.go @@ -211,6 +211,9 @@ func (e *SQSRecordEncoder) encodeQueueMessages(b *snapshotBuilder, root *os.Root if err != nil { return err } + if err := e.addSideRecords(b, meta.Name, meta, records[i]); err != nil { + return err + } if seq > maxSeq { maxSeq = seq } diff --git a/internal/backup/encode_sqs_side.go b/internal/backup/encode_sqs_side.go new file mode 100644 index 000000000..f57741bfe --- /dev/null +++ b/internal/backup/encode_sqs_side.go @@ -0,0 +1,123 @@ +package backup + +import ( + "encoding/binary" + "encoding/json" + + "github.com/cockroachdb/errors" +) + +// sqsFifoDedupWindowMillis mirrors adapter/sqs_fifo.go (5 minutes). Restored +// dedup rows that fall outside this window from the backup's send timestamp +// will be expired the next time the live FIFO send path inspects them. +const sqsFifoDedupWindowMillis int64 = 5 * 60 * 1000 + +// sqsFifoDedupRecord mirrors the live struct at adapter/sqs_fifo.go:25. +// Duplicated here (rather than imported) so the encoder package can run +// without a circular dependency on adapter, matching the pattern M3b-3 +// used for DynamoDB GSI helpers. +type sqsFifoDedupRecord struct { + MessageID string `json:"message_id"` + SendTimestampMs int64 `json:"send_timestamp_ms"` + ExpiresAtMillis int64 `json:"expires_at_millis"` + OriginalSequence uint64 `json:"original_sequence,omitempty"` +} + +// sqsMsgVisKeyBytes reproduces adapter/sqs_messages.go sqsMsgVisKey: +// prefix + base64url(queue) + BE-u64(gen) + BE-u64(visibleAt) + +// base64url(messageID). Negative visibleAt clamps to zero, matching the +// live uint64MaxZero helper. +func sqsMsgVisKeyBytes(queueName string, gen uint64, visibleAtMillis int64, messageID string) []byte { + out := []byte(SQSMsgVisPrefix) + out = append(out, encodeSQSSegment(queueName)...) + out = binary.BigEndian.AppendUint64(out, gen) + out = binary.BigEndian.AppendUint64(out, sqsClampNonNegativeMillis(visibleAtMillis)) + return append(out, encodeSQSSegment(messageID)...) +} + +// sqsMsgByAgeKeyBytes reproduces adapter/sqs_keys.go sqsMsgByAgeKey: +// prefix + base64url(queue) + BE-u64(gen) + BE-u64(sendTs) + +// base64url(messageID). Negative sendTs clamps to zero. +func sqsMsgByAgeKeyBytes(queueName string, gen uint64, sendTimestampMs int64, messageID string) []byte { + out := []byte(SQSMsgByAgePrefix) + out = append(out, encodeSQSSegment(queueName)...) + out = binary.BigEndian.AppendUint64(out, gen) + out = binary.BigEndian.AppendUint64(out, sqsClampNonNegativeMillis(sendTimestampMs)) + return append(out, encodeSQSSegment(messageID)...) +} + +// sqsMsgDedupKeyBytes reproduces adapter/sqs_keys.go sqsMsgDedupKey: +// prefix + base64url(queue) + BE-u64(gen) + base64url(dedupID). +func sqsMsgDedupKeyBytes(queueName string, gen uint64, dedupID string) []byte { + out := []byte(SQSMsgDedupPrefix) + out = append(out, encodeSQSSegment(queueName)...) + out = binary.BigEndian.AppendUint64(out, gen) + return append(out, encodeSQSSegment(dedupID)...) +} + +// sqsClampNonNegativeMillis mirrors adapter/sqs_messages.go uint64MaxZero: +// wall-clock millis should never be negative, but a negative int64 would +// silently overflow under a direct uint64() cast and produce a far-future +// key, so clamp to zero defensively. +func sqsClampNonNegativeMillis(v int64) uint64 { + if v < 0 { + return 0 + } + return uint64(v) +} + +// encodeFifoDedupRecordBytes mirrors adapter/sqs_fifo.go encodeFifoDedupRecord: +// straight json.Marshal of the four-field struct. Wrapped with WithStack so +// callers get a uniform stack-trace at the error site. +func encodeFifoDedupRecordBytes(r *sqsFifoDedupRecord) ([]byte, error) { + b, err := json.Marshal(r) + if err != nil { + return nil, errors.WithStack(err) + } + return b, nil +} + +// addSideRecords emits the vis + byage + (conditional) dedup rows that the +// live adapter would have written alongside the !sqs|msg|data| record M5-1 +// already stages. No !sqs|msg|group| rows are emitted at any time — see +// docs/design/2026_05_30_proposed_sqs_side_record_derivation.md "Families" +// table for why (loadFifoGroupLock treats key presence alone as "lock held"; +// any value would permanently block every group post-restore). +// +// Emission rules: +// - vis: always emitted, visibleAt = rec.AvailableAtMillis (= "visible now"). +// - byage: always emitted, sendTs = rec.SendTimestampMillis (required by the +// reaper to honor MessageRetentionPeriod after restore). +// - dedup: FIFO + non-empty MessageDedupID only; ExpiresAtMillis = +// SendTimestampMs + sqsFifoDedupWindowMillis. +func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, meta sqsQueueMetaPublic, rec sqsMessageRecord) error { + if rec.MessageID == "" { + return errors.Wrap(ErrSQSEncodeInvalidMessage, "side records require non-empty message_id") + } + + visKey := sqsMsgVisKeyBytes(queueName, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID) + if err := b.Add(visKey, []byte(rec.MessageID), 0); err != nil { + return err + } + + byAgeKey := sqsMsgByAgeKeyBytes(queueName, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID) + if err := b.Add(byAgeKey, []byte(rec.MessageID), 0); err != nil { + return err + } + + if !meta.FIFO || rec.MessageDedupID == "" { + return nil + } + dedupRec := &sqsFifoDedupRecord{ + MessageID: rec.MessageID, + SendTimestampMs: rec.SendTimestampMillis, + ExpiresAtMillis: rec.SendTimestampMillis + sqsFifoDedupWindowMillis, + OriginalSequence: rec.SequenceNumber, + } + val, err := encodeFifoDedupRecordBytes(dedupRec) + if err != nil { + return err + } + dedupKey := sqsMsgDedupKeyBytes(queueName, sqsRestoreGeneration, rec.MessageDedupID) + return b.Add(dedupKey, val, 0) +} diff --git a/internal/backup/encode_sqs_side_test.go b/internal/backup/encode_sqs_side_test.go new file mode 100644 index 000000000..593ce399b --- /dev/null +++ b/internal/backup/encode_sqs_side_test.go @@ -0,0 +1,222 @@ +package backup + +import ( + "bytes" + "encoding/json" + "testing" +) + +// liveSQSEntries encodes inRoot and returns the decoded live +// (userKey, userValue) records — used to inspect derived vis / byage / +// dedup rows directly, since the M5-1 decoder ignores side records and a +// directory round-trip cannot observe them. Mirrors liveDDBEntries. +func liveSQSEntries(t *testing.T, inRoot string) []RoundTripEntry { + t.Helper() + b := newSnapshotBuilder(sqsEncTS) + if err := NewSQSRecordEncoder(inRoot).Encode(b); err != nil { + t.Fatalf("SQSRecordEncoder.Encode: %v", err) + } + var buf bytes.Buffer + if _, err := b.WriteTo(&buf); err != nil { + t.Fatalf("WriteTo: %v", err) + } + entries, _, err := DecodeLiveEntries(&buf) + if err != nil { + t.Fatalf("DecodeLiveEntries: %v", err) + } + return entries +} + +// splitSQSEntries groups encoded entries by family for assertion. The five +// categories the M5-2 derivation rules touch are data / vis / byage / +// dedup / group; queue meta / gen / seq are ignored. +func splitSQSEntries(entries []RoundTripEntry) (data, vis, byage, dedup, group []RoundTripEntry) { + for _, e := range entries { + switch { + case bytes.HasPrefix(e.UserKey, []byte(SQSMsgDataPrefix)): + data = append(data, e) + case bytes.HasPrefix(e.UserKey, []byte(SQSMsgVisPrefix)): + vis = append(vis, e) + case bytes.HasPrefix(e.UserKey, []byte(SQSMsgByAgePrefix)): + byage = append(byage, e) + case bytes.HasPrefix(e.UserKey, []byte(SQSMsgDedupPrefix)): + dedup = append(dedup, e) + case bytes.HasPrefix(e.UserKey, []byte(SQSMsgGroupPrefix)): + group = append(group, e) + } + } + return data, vis, byage, dedup, group +} + +// TestSQSEncodeSideRecordsCrossCheckClassic pins byte-identical key + value +// output between this package and the live adapter constructors for a +// classic FIFO queue carrying varied dedup state. It is the §"Encoder +// cross-check" pattern the parent design mandates and the M5-2 design +// doc lists as gold-standard. Partitioned variant deferred to M5-3. +func TestSQSEncodeSideRecordsCrossCheckClassic(t *testing.T) { + t.Parallel() + in := t.TempDir() + const queue = "billing-fifo" + const fixedSendTs = int64(1_700_000_000_000) + const fixedAvailTs = int64(1_700_000_000_500) + writeSQSQueue(t, in, queue, + []byte(`{"format_version":1,"name":"billing-fifo","fifo":true,"partition_count":1,"generation":1}`), + [][]byte{ + []byte(`{"format_version":1,"message_id":"m1","body":"a","send_timestamp_millis":1700000000000,"available_at_millis":1700000000500,"sequence_number":1,"message_group_id":"g1","message_deduplication_id":"d1"}`), + []byte(`{"format_version":1,"message_id":"m2","body":"b","send_timestamp_millis":1700000000100,"available_at_millis":1700000000600,"sequence_number":2,"message_group_id":"g1","message_deduplication_id":"d2"}`), + }, + ) + + entries := liveSQSEntries(t, in) + data, vis, byage, dedup, group := splitSQSEntries(entries) + if len(data) != 2 || len(vis) != 2 || len(byage) != 2 || len(dedup) != 2 { + t.Fatalf("families = data=%d vis=%d byage=%d dedup=%d, want 2/2/2/2", len(data), len(vis), len(byage), len(dedup)) + } + if len(group) != 0 { + t.Fatalf("group rows = %d, want 0 (group MUST NOT be emitted — see design doc / gemini critical #885)", len(group)) + } + + // Direct constructor cross-check: every emitted vis/byage/dedup key + // must equal the byte-string the adapter's send path would write + // for the same (queue, gen, ts, msgID, dedup-id) tuple. + wantKeys := map[string][]byte{ + "vis[m1]": sqsMsgVisKeyBytes(queue, sqsRestoreGeneration, fixedAvailTs, "m1"), + "vis[m2]": sqsMsgVisKeyBytes(queue, sqsRestoreGeneration, fixedAvailTs+100, "m2"), + "byage[m1]": sqsMsgByAgeKeyBytes(queue, sqsRestoreGeneration, fixedSendTs, "m1"), + "byage[m2]": sqsMsgByAgeKeyBytes(queue, sqsRestoreGeneration, fixedSendTs+100, "m2"), + "dedup[d1]": sqsMsgDedupKeyBytes(queue, sqsRestoreGeneration, "d1"), + "dedup[d2]": sqsMsgDedupKeyBytes(queue, sqsRestoreGeneration, "d2"), + } + keyToValue := indexEntriesByKey(append(append(vis, byage...), dedup...)) + for label, want := range wantKeys { + if _, ok := keyToValue[string(want)]; !ok { + t.Errorf("missing %s key: %x", label, want) + } + } + + // Value cross-check: vis + byage values equal []byte(messageID). + for _, c := range []struct{ label, msgID string }{ + {"vis[m1]", "m1"}, {"vis[m2]", "m2"}, {"byage[m1]", "m1"}, {"byage[m2]", "m2"}, + } { + got := keyToValue[string(wantKeys[c.label])] + if !bytes.Equal(got, []byte(c.msgID)) { + t.Errorf("%s value = %x, want %q", c.label, got, c.msgID) + } + } + + // Dedup value cross-check: sqsFifoDedupRecord JSON with all four + // fields the live writer (sqs_fifo.go:219-223) produces. + wantD1 := sqsFifoDedupRecord{ + MessageID: "m1", SendTimestampMs: fixedSendTs, + ExpiresAtMillis: fixedSendTs + sqsFifoDedupWindowMillis, OriginalSequence: 1, + } + assertDedupValue(t, "d1", keyToValue[string(wantKeys["dedup[d1]"])], wantD1) +} + +// indexEntriesByKey collapses a slice of RoundTripEntry to a key→value map +// for direct lookup. Test helper kept terse for cyclomatic-complexity room. +func indexEntriesByKey(entries []RoundTripEntry) map[string][]byte { + out := make(map[string][]byte, len(entries)) + for _, e := range entries { + out[string(e.UserKey)] = e.UserValue + } + return out +} + +// assertDedupValue compares an emitted dedup row's value against the +// sqsFifoDedupRecord the live writer would have produced for the same +// fixture, fatal-fail on JSON parse error and t.Errorf on field mismatch. +func assertDedupValue(t *testing.T, label string, got []byte, want sqsFifoDedupRecord) { + t.Helper() + var rec sqsFifoDedupRecord + if err := json.Unmarshal(got, &rec); err != nil { + t.Fatalf("unmarshal dedup[%s]: %v", label, err) + } + if rec != want { + t.Errorf("dedup[%s] = %+v, want %+v", label, rec, want) + } +} + +// TestSQSEncodeSideRecordsStandardQueueOmitsFIFOFamilies pins that a +// non-FIFO queue emits vis + byage but NEVER dedup or group, matching +// adapter/sqs_keys.go semantics (dedup keys make no sense for standard +// queues and would poison the keyspace). +func TestSQSEncodeSideRecordsStandardQueueOmitsFIFOFamilies(t *testing.T) { + t.Parallel() + in := t.TempDir() + const queue = "standard-q" + writeSQSQueue(t, in, queue, + []byte(`{"format_version":1,"name":"standard-q","fifo":false,"partition_count":1,"generation":1}`), + [][]byte{ + []byte(`{"format_version":1,"message_id":"m1","body":"a","send_timestamp_millis":1700000000000,"available_at_millis":1700000000000,"sequence_number":0,"message_deduplication_id":"d-set-but-ignored"}`), + }, + ) + + _, vis, byage, dedup, group := splitSQSEntries(liveSQSEntries(t, in)) + if len(vis) != 1 || len(byage) != 1 { + t.Fatalf("vis=%d byage=%d, want 1/1 (standard queue still gets these)", len(vis), len(byage)) + } + if len(dedup) != 0 { + t.Errorf("dedup rows on a standard queue = %d, want 0 (would poison the keyspace)", len(dedup)) + } + if len(group) != 0 { + t.Errorf("group rows on a standard queue = %d, want 0", len(group)) + } +} + +// TestSQSEncodeSideRecordsEmptyDedupOmitsDedupRow pins that a FIFO +// message with an empty message_dedup_id gets vis + byage but no +// dedup row. The live writer only inserts dedup when the producer +// supplied a dedup-id, so the encoder must mirror that gate. +func TestSQSEncodeSideRecordsEmptyDedupOmitsDedupRow(t *testing.T) { + t.Parallel() + in := t.TempDir() + const queue = "fifo-empty-dedup" + writeSQSQueue(t, in, queue, + []byte(`{"format_version":1,"name":"fifo-empty-dedup","fifo":true,"partition_count":1,"generation":1}`), + [][]byte{ + []byte(`{"format_version":1,"message_id":"m1","body":"a","send_timestamp_millis":1700000000000,"available_at_millis":1700000000000,"sequence_number":1,"message_group_id":"g1","message_deduplication_id":""}`), + }, + ) + + _, vis, byage, dedup, _ := splitSQSEntries(liveSQSEntries(t, in)) + if len(vis) != 1 || len(byage) != 1 { + t.Fatalf("vis=%d byage=%d, want 1/1", len(vis), len(byage)) + } + if len(dedup) != 0 { + t.Errorf("dedup rows for empty dedup-id = %d, want 0", len(dedup)) + } +} + +// TestSQSEncodeSideRecordsNoGroupRows is the regression pin for gemini +// critical #885: a FIFO queue with multiple distinct message_group_ids +// MUST produce zero !sqs|msg|group| rows. Any row would falsely indicate +// a held group lock and permanently block receives for that group (the +// live loadFifoGroupLock returns the parsed lock whenever the key exists; +// only missing-key returns nil). +func TestSQSEncodeSideRecordsNoGroupRows(t *testing.T) { + t.Parallel() + in := t.TempDir() + const queue = "fifo-multi-group" + writeSQSQueue(t, in, queue, + []byte(`{"format_version":1,"name":"fifo-multi-group","fifo":true,"partition_count":1,"generation":1}`), + [][]byte{ + []byte(`{"format_version":1,"message_id":"m1","body":"a","send_timestamp_millis":1700000000000,"available_at_millis":1700000000000,"sequence_number":1,"message_group_id":"g1","message_deduplication_id":"d1"}`), + []byte(`{"format_version":1,"message_id":"m2","body":"b","send_timestamp_millis":1700000000100,"available_at_millis":1700000000100,"sequence_number":2,"message_group_id":"g2","message_deduplication_id":"d2"}`), + []byte(`{"format_version":1,"message_id":"m3","body":"c","send_timestamp_millis":1700000000200,"available_at_millis":1700000000200,"sequence_number":3,"message_group_id":"g3","message_deduplication_id":"d3"}`), + }, + ) + + _, _, _, _, group := splitSQSEntries(liveSQSEntries(t, in)) + if len(group) != 0 { + t.Fatalf("group rows = %d (keys=%x), want 0 — emission would block all groups post-restore (gemini critical #885)", len(group), groupKeysHex(group)) + } +} + +func groupKeysHex(rows []RoundTripEntry) []string { + out := make([]string, 0, len(rows)) + for _, r := range rows { + out = append(out, string(r.UserKey)) + } + return out +} From de70bfca10fe8515fad80594b93e5a325a8fcfa9 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 30 May 2026 23:18:22 +0900 Subject: [PATCH 2/4] backup: #892 - address gemini medium-priority perf findings on M5-2 Six allocator-hygiene improvements from gemini review on the freshly-opened M5-2 PR. All are pure perf, no semantic change: 1. Pre-allocate the three side-record key constructor backing slices with `len(prefix) + 64` capacity (sqsSideKeyAllocBytes constant added), matching the live adapter's sqsKeyCapLarge tuning at adapter/sqs_messages.go:68. The previous `[]byte(SQSMsgVisPrefix)` form forced multiple grow-and-copy cycles as the BE-u64 segments and the base64url(messageID) tail were appended. 2. Take `*sqsQueueMetaPublic` and `*sqsMessageRecord` in addSideRecords instead of by-value. The per-message loop in encodeQueueMessages is the hottest path of the SQS encoder; both structs carry several strings + a map (attributes), so the value-copy was a non-trivial per-iteration cost. 3. Convert rec.MessageID to []byte once at the top of addSideRecords and reuse for both vis and byage values, instead of twice via two separate `[]byte(rec.MessageID)` conversions. Caller audit: addSideRecords has exactly one caller in the package (encodeQueueMessages); call site updated to pass `&meta` + `&records[i]`. Function is strictly read-only on both pointer args, so no aliasing or mutation concerns. Function still returns the same error contract; no fail-closed/return-value/error-handling semantics changed. Tests + lint green. --- internal/backup/encode_sqs.go | 2 +- internal/backup/encode_sqs_side.go | 22 ++++++++++++++++------ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/internal/backup/encode_sqs.go b/internal/backup/encode_sqs.go index d6ff91521..1702e58c1 100644 --- a/internal/backup/encode_sqs.go +++ b/internal/backup/encode_sqs.go @@ -211,7 +211,7 @@ func (e *SQSRecordEncoder) encodeQueueMessages(b *snapshotBuilder, root *os.Root if err != nil { return err } - if err := e.addSideRecords(b, meta.Name, meta, records[i]); err != nil { + if err := e.addSideRecords(b, meta.Name, &meta, &records[i]); err != nil { return err } if seq > maxSeq { diff --git a/internal/backup/encode_sqs_side.go b/internal/backup/encode_sqs_side.go index f57741bfe..3eea06590 100644 --- a/internal/backup/encode_sqs_side.go +++ b/internal/backup/encode_sqs_side.go @@ -12,6 +12,12 @@ import ( // will be expired the next time the live FIFO send path inspects them. const sqsFifoDedupWindowMillis int64 = 5 * 60 * 1000 +// sqsSideKeyAllocBytes mirrors the live adapter's sqsKeyCapLarge tuning +// (adapter/sqs_messages.go:68): a 64-byte tail after the prefix is large +// enough to hold the BE-u64 generation + visibleAt + base64url(messageID) +// for typical queue / message IDs without forcing a re-allocation. +const sqsSideKeyAllocBytes = 64 + // sqsFifoDedupRecord mirrors the live struct at adapter/sqs_fifo.go:25. // Duplicated here (rather than imported) so the encoder package can run // without a circular dependency on adapter, matching the pattern M3b-3 @@ -28,7 +34,8 @@ type sqsFifoDedupRecord struct { // base64url(messageID). Negative visibleAt clamps to zero, matching the // live uint64MaxZero helper. func sqsMsgVisKeyBytes(queueName string, gen uint64, visibleAtMillis int64, messageID string) []byte { - out := []byte(SQSMsgVisPrefix) + out := make([]byte, 0, len(SQSMsgVisPrefix)+sqsSideKeyAllocBytes) + out = append(out, SQSMsgVisPrefix...) out = append(out, encodeSQSSegment(queueName)...) out = binary.BigEndian.AppendUint64(out, gen) out = binary.BigEndian.AppendUint64(out, sqsClampNonNegativeMillis(visibleAtMillis)) @@ -39,7 +46,8 @@ func sqsMsgVisKeyBytes(queueName string, gen uint64, visibleAtMillis int64, mess // prefix + base64url(queue) + BE-u64(gen) + BE-u64(sendTs) + // base64url(messageID). Negative sendTs clamps to zero. func sqsMsgByAgeKeyBytes(queueName string, gen uint64, sendTimestampMs int64, messageID string) []byte { - out := []byte(SQSMsgByAgePrefix) + out := make([]byte, 0, len(SQSMsgByAgePrefix)+sqsSideKeyAllocBytes) + out = append(out, SQSMsgByAgePrefix...) out = append(out, encodeSQSSegment(queueName)...) out = binary.BigEndian.AppendUint64(out, gen) out = binary.BigEndian.AppendUint64(out, sqsClampNonNegativeMillis(sendTimestampMs)) @@ -49,7 +57,8 @@ func sqsMsgByAgeKeyBytes(queueName string, gen uint64, sendTimestampMs int64, me // sqsMsgDedupKeyBytes reproduces adapter/sqs_keys.go sqsMsgDedupKey: // prefix + base64url(queue) + BE-u64(gen) + base64url(dedupID). func sqsMsgDedupKeyBytes(queueName string, gen uint64, dedupID string) []byte { - out := []byte(SQSMsgDedupPrefix) + out := make([]byte, 0, len(SQSMsgDedupPrefix)+sqsSideKeyAllocBytes) + out = append(out, SQSMsgDedupPrefix...) out = append(out, encodeSQSSegment(queueName)...) out = binary.BigEndian.AppendUint64(out, gen) return append(out, encodeSQSSegment(dedupID)...) @@ -90,18 +99,19 @@ func encodeFifoDedupRecordBytes(r *sqsFifoDedupRecord) ([]byte, error) { // reaper to honor MessageRetentionPeriod after restore). // - dedup: FIFO + non-empty MessageDedupID only; ExpiresAtMillis = // SendTimestampMs + sqsFifoDedupWindowMillis. -func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, meta sqsQueueMetaPublic, rec sqsMessageRecord) error { +func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error { if rec.MessageID == "" { return errors.Wrap(ErrSQSEncodeInvalidMessage, "side records require non-empty message_id") } + msgIDBytes := []byte(rec.MessageID) visKey := sqsMsgVisKeyBytes(queueName, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID) - if err := b.Add(visKey, []byte(rec.MessageID), 0); err != nil { + if err := b.Add(visKey, msgIDBytes, 0); err != nil { return err } byAgeKey := sqsMsgByAgeKeyBytes(queueName, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID) - if err := b.Add(byAgeKey, []byte(rec.MessageID), 0); err != nil { + if err := b.Add(byAgeKey, msgIDBytes, 0); err != nil { return err } From b133e5cb0e479a2fd335674fb42cb8e483408736 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 30 May 2026 23:41:10 +0900 Subject: [PATCH 3/4] backup: #892 - fix CBD dedup data-loss on restore + claude review observations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses claude review observations 1-4 on PR #892. The headline change is a correctness fix; the rest are documentation/cleanup. ## Obs 2 (medium, real data-loss bug): CBD dedup derivation on restore The live adapter resolves a FIFO dedup-id with adapter/sqs_fifo.go's resolveFifoDedupID: explicit MessageDeduplicationId wins; otherwise on ContentBasedDeduplication queues it's sha256(body) hex-encoded; otherwise empty. The dedup KEY in the live keyspace is written with the resolved value, but the stored MESSAGE record only carries in.MessageDeduplicationId (adapter/sqs_messages.go:680) — the user-supplied one. So a CBD-FIFO send round-trips through the M5-1 dump format as message_deduplication_id="", and the prior empty-only emission gate would silently skip the dedup row, leaving the restored cluster vulnerable to within-5-minute duplicate sends for every CBD queue. Fix: encode_sqs_side.go grows resolveDedupID(rec, meta) that mirrors the live resolveFifoDedupID exactly (explicit wins; otherwise CBD -> hex.EncodeToString(sha256.Sum256(rec.Body))). addSideRecords now uses it to gate the dedup row. rec.Body is the raw message bytes verbatim from the dump (sqsMessageBody = []byte), matching what the live path hashed at send time over []byte(in.MessageBody). New test TestSQSEncodeSideRecordsCBDDerivesDedupID pins it: a CBD-FIFO fixture with empty message_deduplication_id produces exactly one dedup row whose key matches sqsMsgDedupKeyBytes(queue, hex(sha256(body))). Caller audit per CLAUDE.md (semantic change = emits more dedup rows in CBD case; strictly additive, never fewer): addSideRecords has one caller (encodeQueueMessages); no caller-side change needed — the function still returns the same error contract and the new emissions go through the existing b.Add error path. ## Obs 1 (medium, comment accuracy) The TestSQSEncodeSideRecordsCrossCheckClassic comment overstated its scope ("byte-identical cross-check against the live adapter constructors"). The test actually compares against THIS package's sqsMsg{Vis,ByAge,Dedup}KeyBytes, not the adapter's unexported originals. Cross-package parity is maintained by manual code review of encode_sqs_side.go against the cited adapter locations; the adapter functions are unexported so a runtime cross-check can't be expressed here. Comment updated accordingly. ## Obs 3 (nit, redundant guard) addSideRecords's empty-MessageID guard was unreachable: addMessage (called first in encodeQueueMessages) already returns ErrSQSEncodeInvalidMessage for the empty case. Removed per CLAUDE.md "Don't add error handling for scenarios that can't happen." ## Obs 4 (nit, "visible now" comment) Replaced the misleading "(visible now)" comment with an accurate description: visibleAt = AvailableAtMillis, which may be a future timestamp for a delayed message captured before its delay expired; the live ReceiveMessage path honors that schedule identically. ## Drive-by cleanup sqsMsgDedupKeyBytes lost its gen parameter (always sqsRestoreGeneration at every call site, lint flagged unparam). All call sites updated; the function now hardcodes the constant with a header comment explaining why we diverge from the live adapter's signature (M5-2 only writes to the fresh-restore generation). Tests + lint green. --- internal/backup/encode_sqs_side.go | 58 +++++++++++++++++++------ internal/backup/encode_sqs_side_test.go | 56 +++++++++++++++++++++--- 2 files changed, 94 insertions(+), 20 deletions(-) diff --git a/internal/backup/encode_sqs_side.go b/internal/backup/encode_sqs_side.go index 3eea06590..5da82e99f 100644 --- a/internal/backup/encode_sqs_side.go +++ b/internal/backup/encode_sqs_side.go @@ -1,7 +1,9 @@ package backup import ( + "crypto/sha256" "encoding/binary" + "encoding/hex" "encoding/json" "github.com/cockroachdb/errors" @@ -55,12 +57,16 @@ func sqsMsgByAgeKeyBytes(queueName string, gen uint64, sendTimestampMs int64, me } // sqsMsgDedupKeyBytes reproduces adapter/sqs_keys.go sqsMsgDedupKey: -// prefix + base64url(queue) + BE-u64(gen) + base64url(dedupID). -func sqsMsgDedupKeyBytes(queueName string, gen uint64, dedupID string) []byte { +// prefix + base64url(queue) + BE-u64(sqsRestoreGeneration) + +// base64url(dedupID). The live adapter signature accepts a variable gen, +// but every M5-2 call site uses sqsRestoreGeneration (a fresh restore +// has exactly one live generation, with no superseded counters to +// reference), so the parameter is hardcoded here to satisfy unparam. +func sqsMsgDedupKeyBytes(queueName, dedupID string) []byte { out := make([]byte, 0, len(SQSMsgDedupPrefix)+sqsSideKeyAllocBytes) out = append(out, SQSMsgDedupPrefix...) out = append(out, encodeSQSSegment(queueName)...) - out = binary.BigEndian.AppendUint64(out, gen) + out = binary.BigEndian.AppendUint64(out, sqsRestoreGeneration) return append(out, encodeSQSSegment(dedupID)...) } @@ -86,6 +92,27 @@ func encodeFifoDedupRecordBytes(r *sqsFifoDedupRecord) ([]byte, error) { return b, nil } +// resolveDedupID mirrors adapter/sqs_fifo.go resolveFifoDedupID exactly: an +// explicit MessageDeduplicationId wins; otherwise on ContentBasedDeduplication +// queues the dedup-id is sha256(body) hex-encoded; otherwise empty (= no row). +// +// This is the critical CBD-correctness path: the live adapter writes only the +// USER-SUPPLIED MessageDeduplicationId into the stored sqsStoredMessage +// (sqs_messages.go:680), NOT the resolved one. So a CBD-FIFO message round- +// trips through the dump as message_deduplication_id="", and a naive +// non-empty-only gate would silently lose dedup protection on restore for +// every CBD queue. We have to redo the live derivation at restore time. +func resolveDedupID(rec *sqsMessageRecord, meta *sqsQueueMetaPublic) string { + if rec.MessageDedupID != "" { + return rec.MessageDedupID + } + if meta.ContentBasedDeduplication { + sum := sha256.Sum256(rec.Body) + return hex.EncodeToString(sum[:]) + } + return "" +} + // addSideRecords emits the vis + byage + (conditional) dedup rows that the // live adapter would have written alongside the !sqs|msg|data| record M5-1 // already stages. No !sqs|msg|group| rows are emitted at any time — see @@ -94,15 +121,16 @@ func encodeFifoDedupRecordBytes(r *sqsFifoDedupRecord) ([]byte, error) { // any value would permanently block every group post-restore). // // Emission rules: -// - vis: always emitted, visibleAt = rec.AvailableAtMillis (= "visible now"). -// - byage: always emitted, sendTs = rec.SendTimestampMillis (required by the -// reaper to honor MessageRetentionPeriod after restore). -// - dedup: FIFO + non-empty MessageDedupID only; ExpiresAtMillis = -// SendTimestampMs + sqsFifoDedupWindowMillis. +// - vis: always emitted, visibleAt = rec.AvailableAtMillis. For delayed +// messages captured before their delay expired this is in the future; +// the live ReceiveMessage path honors this exactly as it would for a +// fresh send (the message is invisible until the scheduled time). +// - byage: always emitted, sendTs = rec.SendTimestampMillis (required by +// the reaper to honor MessageRetentionPeriod after restore). +// - dedup: FIFO + resolveDedupID(rec, meta) non-empty. ExpiresAtMillis = +// SendTimestampMs + sqsFifoDedupWindowMillis. CBD queues get a SHA-256 +// derived dedup-id (matches adapter/sqs_fifo.go resolveFifoDedupID). func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error { - if rec.MessageID == "" { - return errors.Wrap(ErrSQSEncodeInvalidMessage, "side records require non-empty message_id") - } msgIDBytes := []byte(rec.MessageID) visKey := sqsMsgVisKeyBytes(queueName, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID) @@ -115,7 +143,11 @@ func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, return err } - if !meta.FIFO || rec.MessageDedupID == "" { + if !meta.FIFO { + return nil + } + dedupID := resolveDedupID(rec, meta) + if dedupID == "" { return nil } dedupRec := &sqsFifoDedupRecord{ @@ -128,6 +160,6 @@ func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, if err != nil { return err } - dedupKey := sqsMsgDedupKeyBytes(queueName, sqsRestoreGeneration, rec.MessageDedupID) + dedupKey := sqsMsgDedupKeyBytes(queueName, dedupID) return b.Add(dedupKey, val, 0) } diff --git a/internal/backup/encode_sqs_side_test.go b/internal/backup/encode_sqs_side_test.go index 593ce399b..fe4ce021d 100644 --- a/internal/backup/encode_sqs_side_test.go +++ b/internal/backup/encode_sqs_side_test.go @@ -2,6 +2,8 @@ package backup import ( "bytes" + "crypto/sha256" + "encoding/hex" "encoding/json" "testing" ) @@ -48,11 +50,16 @@ func splitSQSEntries(entries []RoundTripEntry) (data, vis, byage, dedup, group [ return data, vis, byage, dedup, group } -// TestSQSEncodeSideRecordsCrossCheckClassic pins byte-identical key + value -// output between this package and the live adapter constructors for a -// classic FIFO queue carrying varied dedup state. It is the §"Encoder -// cross-check" pattern the parent design mandates and the M5-2 design -// doc lists as gold-standard. Partitioned variant deferred to M5-3. +// TestSQSEncodeSideRecordsCrossCheckClassic pins that addSideRecords +// produces keys consistent with this package's key builders +// (sqsMsgVisKeyBytes, sqsMsgByAgeKeyBytes, sqsMsgDedupKeyBytes). It is the +// §"Encoder cross-check" pattern the parent design mandates, restricted +// to in-package parity: cross-package parity with the adapter's +// unexported sqsMsgVisKey / sqsMsgByAgeKey / sqsMsgDedupKey is maintained +// by manual code review of encode_sqs_side.go against the cited adapter +// locations (the adapter functions are unexported and not callable from +// this package, so a runtime cross-check cannot be expressed here). +// Partitioned variant deferred to M5-3. func TestSQSEncodeSideRecordsCrossCheckClassic(t *testing.T) { t.Parallel() in := t.TempDir() @@ -84,8 +91,8 @@ func TestSQSEncodeSideRecordsCrossCheckClassic(t *testing.T) { "vis[m2]": sqsMsgVisKeyBytes(queue, sqsRestoreGeneration, fixedAvailTs+100, "m2"), "byage[m1]": sqsMsgByAgeKeyBytes(queue, sqsRestoreGeneration, fixedSendTs, "m1"), "byage[m2]": sqsMsgByAgeKeyBytes(queue, sqsRestoreGeneration, fixedSendTs+100, "m2"), - "dedup[d1]": sqsMsgDedupKeyBytes(queue, sqsRestoreGeneration, "d1"), - "dedup[d2]": sqsMsgDedupKeyBytes(queue, sqsRestoreGeneration, "d2"), + "dedup[d1]": sqsMsgDedupKeyBytes(queue, "d1"), + "dedup[d2]": sqsMsgDedupKeyBytes(queue, "d2"), } keyToValue := indexEntriesByKey(append(append(vis, byage...), dedup...)) for label, want := range wantKeys { @@ -188,6 +195,41 @@ func TestSQSEncodeSideRecordsEmptyDedupOmitsDedupRow(t *testing.T) { } } +// TestSQSEncodeSideRecordsCBDDerivesDedupID pins the CBD-correctness +// path: a FIFO queue with content_based_deduplication=true whose +// messages have empty message_deduplication_id MUST get dedup rows +// derived from sha256(body), matching adapter/sqs_fifo.go +// resolveFifoDedupID. The live adapter stores only the user-supplied +// MessageDeduplicationId on the message (sqs_messages.go:680), so the +// dump faithfully carries an empty field for CBD sends — without +// re-deriving the hash at restore time, every CBD queue would silently +// lose dedup protection within the 5-minute window (claude review Obs 2 +// on PR #892). +func TestSQSEncodeSideRecordsCBDDerivesDedupID(t *testing.T) { + t.Parallel() + in := t.TempDir() + const queue = "fifo-cbd" + writeSQSQueue(t, in, queue, + []byte(`{"format_version":1,"name":"fifo-cbd","fifo":true,"content_based_deduplication":true,"partition_count":1,"generation":1}`), + [][]byte{ + []byte(`{"format_version":1,"message_id":"m1","body":"alpha","send_timestamp_millis":1700000000000,"available_at_millis":1700000000000,"sequence_number":1,"message_group_id":"g1","message_deduplication_id":""}`), + }, + ) + + _, _, _, dedup, _ := splitSQSEntries(liveSQSEntries(t, in)) + if len(dedup) != 1 { + t.Fatalf("dedup rows = %d, want 1 (CBD must derive dedup-id from sha256(body))", len(dedup)) + } + // Cross-check the derived key against the same sha256 the live + // resolveFifoDedupID would compute for body="alpha". + sum := sha256.Sum256([]byte("alpha")) + wantDedupID := hex.EncodeToString(sum[:]) + wantKey := sqsMsgDedupKeyBytes(queue, wantDedupID) + if !bytes.Equal(dedup[0].UserKey, wantKey) { + t.Errorf("dedup key = %x, want %x (sha256-derived from body)", dedup[0].UserKey, wantKey) + } +} + // TestSQSEncodeSideRecordsNoGroupRows is the regression pin for gemini // critical #885: a FIFO queue with multiple distinct message_group_ids // MUST produce zero !sqs|msg|group| rows. Any row would falsely indicate From 3f9be73920e2f9019fd6130f2572a4f8a2b4e2a6 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 30 May 2026 23:51:39 +0900 Subject: [PATCH 4/4] backup: #892 - add d2 assertDedupValue to cross-check test (coderabbit nit) --- internal/backup/encode_sqs_side_test.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/internal/backup/encode_sqs_side_test.go b/internal/backup/encode_sqs_side_test.go index fe4ce021d..a819f655b 100644 --- a/internal/backup/encode_sqs_side_test.go +++ b/internal/backup/encode_sqs_side_test.go @@ -112,12 +112,19 @@ func TestSQSEncodeSideRecordsCrossCheckClassic(t *testing.T) { } // Dedup value cross-check: sqsFifoDedupRecord JSON with all four - // fields the live writer (sqs_fifo.go:219-223) produces. + // fields the live writer (sqs_fifo.go:219-223) produces. Both + // messages are asserted so per-message sequence + timestamp + // threading (and not just constant fields) is pinned. wantD1 := sqsFifoDedupRecord{ MessageID: "m1", SendTimestampMs: fixedSendTs, ExpiresAtMillis: fixedSendTs + sqsFifoDedupWindowMillis, OriginalSequence: 1, } assertDedupValue(t, "d1", keyToValue[string(wantKeys["dedup[d1]"])], wantD1) + wantD2 := sqsFifoDedupRecord{ + MessageID: "m2", SendTimestampMs: fixedSendTs + 100, + ExpiresAtMillis: fixedSendTs + 100 + sqsFifoDedupWindowMillis, OriginalSequence: 2, + } + assertDedupValue(t, "d2", keyToValue[string(wantKeys["dedup[d2]"])], wantD2) } // indexEntriesByKey collapses a slice of RoundTripEntry to a key→value map