From 4c723f64c768a7a9d906784b6a92f951f2f29b80 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 29 Apr 2026 02:39:59 +0900 Subject: [PATCH 1/2] feat(sqs): HT-FIFO partitioned-keyspace constructors (Phase 3.D PR 3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the partitioned-keyspace constructors per §3.1 of the split-queue FIFO design without touching the legacy keyspace or any existing call site. The §11 PR 2 dormancy gate still rejects PartitionCount > 1 at CreateQueue, so these helpers are dead code in production until PR 5 atomically lifts the gate and wires the data-plane fanout. The partitioned keyspace inserts a 'p|' discriminator after the family prefix and a fixed-width big-endian uint32 partition between the queue segment and the generation: legacy: !sqs|msg|||| partitioned: !sqs|msg||p|||| The discriminator is safe by construction: validateQueueName forbids '|' in queue names, and the queue segment is base32-raw-URL encoded (cannot start with the literal byte 'p' followed by '|'). New constants: - sqsPartitionedDiscriminator = "p|" - SqsPartitionedMsg{Data,Vis,Dedup,Group,ByAge}Prefix New constructors (all unexported, mirroring the legacy family): - sqsPartitionedMsgDataKey - sqsPartitionedMsgVisKey - sqsPartitionedMsgVisPrefixForQueue - sqsPartitionedMsgDedupKey - sqsPartitionedMsgGroupKey - sqsPartitionedMsgByAgeKey - sqsPartitionedMsgByAgePrefixForQueueAllPartitions - parseSqsPartitionedMsgByAgeKey - sqsMsgByAgePrefixesForQueue (returns {legacy, partitioned} pair for the reaper) - appendU32 (helper for the 4-byte partition segment) New record type sqsPartitionedMsgByAgeRecord mirrors sqsMsgByAgeRecord with Partition added. Tests in adapter/sqs_keys_test.go cover: - Byte-distinct legacy vs partitioned across all 5 families. - Per-partition isolation: keys for partition k must not match the prefix of partition k+1's scan, otherwise ReceiveMessage fanout would double-count. 
- Determinism: same inputs yield byte-identical keys across calls. - Round-trip parse for partitioned byage keys at corner partition values (0, 31). - Mutual rejection: parseSqsMsgByAgeKey rejects partitioned keys, parseSqsPartitionedMsgByAgeKey rejects legacy keys (the dual-parse contract). - Reaper enumeration helper returns both prefixes in {legacy, partitioned} order. - Discriminator constants all end with 'p|' (asserts none lost the trailing '|'). Design doc §3.1 example code updated to use the actual implementation names (sqsMsgDataKey + sqsPartitionedMsgDataKey) rather than the placeholder legacyMsgDataKey/partitionedMsgDataKey. go test -race ./adapter/... pass; golangci-lint ./adapter/... clean. No call site changed; existing queues stay byte-identical on disk. --- adapter/sqs_keys.go | 213 +++++++++++++ adapter/sqs_keys_test.go | 281 ++++++++++++++++++ ...026_04_26_proposed_sqs_split_queue_fifo.md | 26 +- 3 files changed, 509 insertions(+), 11 deletions(-) create mode 100644 adapter/sqs_keys_test.go diff --git a/adapter/sqs_keys.go b/adapter/sqs_keys.go index a80ae471a..8971f9247 100644 --- a/adapter/sqs_keys.go +++ b/adapter/sqs_keys.go @@ -49,6 +49,35 @@ const ( SqsQueueTombstonePrefix = "!sqs|queue|tombstone|" ) +// HT-FIFO partitioned-keyspace discriminator. Per the §3.1 design in +// docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md, partitioned +// FIFO queues live in a separate keyspace so the legacy single- +// partition layout can stay byte-identical on disk: +// +// legacy: !sqs|msg|||| +// partitioned: !sqs|msg||p|||| +// +// The literal "p|" segment is the discriminator. validateQueueName +// rejects "|" in queue names, so a legacy "!sqs|msg|data||..." +// can never collide with a partitioned "!sqs|msg|data|p||..." +// — the queue-name segment is base32-encoded and cannot start with +// the literal ASCII byte 'p' followed by '|'. 
+const sqsPartitionedDiscriminator = "p|" + +// SqsPartitionedMsg*Prefix mirrors each legacy SqsMsg*Prefix with the +// partitioned-keyspace discriminator inserted. Defined as full string +// constants (rather than runtime concatenation in each constructor) +// so the byte-layout invariant is asserted by the type system: a +// future rename of the discriminator must touch the constants here, +// not 6+ scattered string concatenations. +const ( + SqsPartitionedMsgDataPrefix = "!sqs|msg|data|" + sqsPartitionedDiscriminator + SqsPartitionedMsgVisPrefix = "!sqs|msg|vis|" + sqsPartitionedDiscriminator + SqsPartitionedMsgDedupPrefix = "!sqs|msg|dedup|" + sqsPartitionedDiscriminator + SqsPartitionedMsgGroupPrefix = "!sqs|msg|group|" + sqsPartitionedDiscriminator + SqsPartitionedMsgByAgePrefix = "!sqs|msg|byage|" + sqsPartitionedDiscriminator +) + func sqsQueueMetaKey(queueName string) []byte { return []byte(SqsQueueMetaPrefix + encodeSQSSegment(queueName)) } @@ -209,3 +238,187 @@ func queueNameFromMetaKey(key []byte) (string, bool) { } return name, true } + +// ---------------------- HT-FIFO partitioned keyspace ---------------------- +// +// The constructors below mirror the legacy sqsMsg*Key family with a +// partition uint32 inserted between the queue segment and the +// generation. The legacy keyspace is unchanged on disk, so existing +// queues and Standard queues stay byte-identical — these helpers are +// reachable only when meta.PartitionCount > 1, and the §11 PR 2 +// dormancy gate currently rejects that at CreateQueue. The data plane +// dispatch lands together with the gate-lift in PR 5. +// +// Each helper appends the partition as a fixed-width big-endian +// uint32 so prefix scans `!sqs|msg||p|||` +// can pick exactly one partition's keys without touching its +// neighbours. + +// sqsPartitionedMsgDataKey builds the data-record key for a +// partitioned FIFO queue. 
+func sqsPartitionedMsgDataKey(queueName string, partition uint32, gen uint64, messageID string) []byte { + buf := make([]byte, 0, len(SqsPartitionedMsgDataPrefix)+sqsKeyCapLarge) + buf = append(buf, SqsPartitionedMsgDataPrefix...) + buf = append(buf, encodeSQSSegment(queueName)...) + buf = appendU32(buf, partition) + buf = appendU64(buf, gen) + buf = append(buf, encodeSQSSegment(messageID)...) + return buf +} + +// sqsPartitionedMsgVisKey builds the visibility-index key for a +// partitioned FIFO queue. +func sqsPartitionedMsgVisKey(queueName string, partition uint32, gen uint64, visibleAtMillis int64, messageID string) []byte { + buf := make([]byte, 0, len(SqsPartitionedMsgVisPrefix)+sqsKeyCapLarge) + buf = append(buf, SqsPartitionedMsgVisPrefix...) + buf = append(buf, encodeSQSSegment(queueName)...) + buf = appendU32(buf, partition) + buf = appendU64(buf, gen) + buf = appendU64(buf, uint64MaxZero(visibleAtMillis)) + buf = append(buf, encodeSQSSegment(messageID)...) + return buf +} + +// sqsPartitionedMsgVisPrefixForQueue returns the prefix of every +// vis-index key for a single (queue, partition, gen) triple. The +// partition fan-out scans this prefix on each partition independently. +func sqsPartitionedMsgVisPrefixForQueue(queueName string, partition uint32, gen uint64) []byte { + buf := make([]byte, 0, len(SqsPartitionedMsgVisPrefix)+sqsKeyCapSmall) + buf = append(buf, SqsPartitionedMsgVisPrefix...) + buf = append(buf, encodeSQSSegment(queueName)...) + buf = appendU32(buf, partition) + buf = appendU64(buf, gen) + return buf +} + +// sqsPartitionedMsgDedupKey builds the FIFO dedup key for a +// partitioned queue. The dedup window is per-partition by design +// (DeduplicationScope=messageGroup with PartitionCount>1) — the +// validator in adapter/sqs_partitioning.go rejects the queue-scoped +// scope on partitioned queues, so this key shape is always reachable +// from the same partition that ran the dedup check. 
+func sqsPartitionedMsgDedupKey(queueName string, partition uint32, gen uint64, dedupID string) []byte { + buf := make([]byte, 0, len(SqsPartitionedMsgDedupPrefix)+sqsKeyCapLarge) + buf = append(buf, SqsPartitionedMsgDedupPrefix...) + buf = append(buf, encodeSQSSegment(queueName)...) + buf = appendU32(buf, partition) + buf = appendU64(buf, gen) + buf = append(buf, encodeSQSSegment(dedupID)...) + return buf +} + +// sqsPartitionedMsgGroupKey builds the FIFO group-lock key for a +// partitioned queue. partitionFor maps a MessageGroupId to one +// partition, so the group lock for any given group lives on exactly +// one partition — there is no cross-partition group-lock invariant +// to maintain. +func sqsPartitionedMsgGroupKey(queueName string, partition uint32, gen uint64, groupID string) []byte { + buf := make([]byte, 0, len(SqsPartitionedMsgGroupPrefix)+sqsKeyCapLarge) + buf = append(buf, SqsPartitionedMsgGroupPrefix...) + buf = append(buf, encodeSQSSegment(queueName)...) + buf = appendU32(buf, partition) + buf = appendU64(buf, gen) + buf = append(buf, encodeSQSSegment(groupID)...) + return buf +} + +// sqsPartitionedMsgByAgeKey builds the send-age index key for a +// partitioned queue. The reaper enumerates both the legacy and +// partitioned byage prefixes when reaping a queue (see +// sqsMsgByAgePrefixesForQueue) so a queue that is partitioned today +// — or, hypothetically, that gains partitions across a future +// migration — does not strand its old data. +func sqsPartitionedMsgByAgeKey(queueName string, partition uint32, gen uint64, sendTimestampMs int64, messageID string) []byte { + buf := make([]byte, 0, len(SqsPartitionedMsgByAgePrefix)+sqsKeyCapLarge) + buf = append(buf, SqsPartitionedMsgByAgePrefix...) + buf = append(buf, encodeSQSSegment(queueName)...) + buf = appendU32(buf, partition) + buf = appendU64(buf, gen) + buf = appendU64(buf, uint64MaxZero(sendTimestampMs)) + buf = append(buf, encodeSQSSegment(messageID)...) 
+ return buf +} + +// sqsPartitionedMsgByAgePrefixForQueueAllPartitions returns the +// prefix shared by every partitioned byage entry for one queue +// (across all partitions and all generations). The reaper uses it +// alongside the legacy prefix to enumerate orphan records on a +// partitioned queue. +func sqsPartitionedMsgByAgePrefixForQueueAllPartitions(queueName string) []byte { + buf := make([]byte, 0, len(SqsPartitionedMsgByAgePrefix)+sqsKeyCapSmall) + buf = append(buf, SqsPartitionedMsgByAgePrefix...) + buf = append(buf, encodeSQSSegment(queueName)...) + return buf +} + +// sqsMsgByAgePrefixesForQueue returns the {legacy, partitioned} +// prefix pair for a queue's byage records. The reaper iterates both: +// a queue created before HT-FIFO landed has only legacy entries; a +// partitioned queue created after PR 5 has only partitioned entries; +// no queue has both today, but enumerating both keeps the reaper +// future-proof against an offline-rebuild migration that produces a +// mixed-prefix queue. +func sqsMsgByAgePrefixesForQueue(queueName string) [][]byte { + return [][]byte{ + sqsMsgByAgePrefixAllGenerations(queueName), + sqsPartitionedMsgByAgePrefixForQueueAllPartitions(queueName), + } +} + +// sqsPartitionedByAgeKeyHeaderLen is the byte length of the +// (partition, gen, ts) header that follows the queue segment in a +// partitioned byage key — one big-endian uint32 plus two big-endian +// uint64s. +const sqsPartitionedByAgeKeyHeaderLen = 4 + 8 + 8 + +// parseSqsPartitionedMsgByAgeKey reverses sqsPartitionedMsgByAgeKey. +// Returns ok=false when the key does not match the expected partitioned +// shape. The reaper tries this parser when parseSqsMsgByAgeKey fails, +// so it can handle a queue with both legacy and partitioned entries +// (today only one or the other applies, but the dual-parse keeps the +// reaper safe against future migrations). 
+func parseSqsPartitionedMsgByAgeKey(key []byte, queueName string) (sqsPartitionedMsgByAgeRecord, bool) { + expected := sqsPartitionedMsgByAgePrefixForQueueAllPartitions(queueName) + if !bytes.HasPrefix(key, expected) { + return sqsPartitionedMsgByAgeRecord{}, false + } + rest := key[len(expected):] + if len(rest) < sqsPartitionedByAgeKeyHeaderLen { + return sqsPartitionedMsgByAgeRecord{}, false + } + partition := binary.BigEndian.Uint32(rest[:4]) + gen := binary.BigEndian.Uint64(rest[4:12]) + tsRaw := binary.BigEndian.Uint64(rest[12:sqsPartitionedByAgeKeyHeaderLen]) + msgIDEnc := string(rest[sqsPartitionedByAgeKeyHeaderLen:]) + msgID, err := decodeSQSSegment(msgIDEnc) + if err != nil { + return sqsPartitionedMsgByAgeRecord{}, false + } + // Wall-clock millis fits in int63 — see parseSqsMsgByAgeKey for + // the same bound. + if tsRaw > 1<<63-1 { + return sqsPartitionedMsgByAgeRecord{}, false + } + return sqsPartitionedMsgByAgeRecord{ + Partition: partition, + Generation: gen, + SendTimestampMs: int64(tsRaw), + MessageID: msgID, + }, true +} + +// sqsPartitionedMsgByAgeRecord is the parsed shape of a partitioned +// byage key. Mirrors sqsMsgByAgeRecord with partition added. +type sqsPartitionedMsgByAgeRecord struct { + Partition uint32 + Generation uint64 + SendTimestampMs int64 + MessageID string +} + +// appendU32 mirrors appendU64 for the partition segment. +func appendU32(dst []byte, v uint32) []byte { + var buf [4]byte + binary.BigEndian.PutUint32(buf[:], v) + return append(dst, buf[:]...) 
+} diff --git a/adapter/sqs_keys_test.go b/adapter/sqs_keys_test.go new file mode 100644 index 000000000..0c6da5d28 --- /dev/null +++ b/adapter/sqs_keys_test.go @@ -0,0 +1,281 @@ +package adapter + +import ( + "bytes" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestSqsPartitionedMsgKeys_DistinctFromLegacy pins the §3.1 +// non-overlap guarantee: a partitioned key for any (queue, partition, +// gen, …) tuple must never share a byte sequence with a legacy key +// for the same (queue, gen, …). The "p|" discriminator after the +// family prefix is what makes this true; this test fails if the +// constant ever loses the trailing "|" or the discriminator changes +// shape such that a partition value (a fixed-width uint32) could +// align with a base32-encoded queue segment. +func TestSqsPartitionedMsgKeys_DistinctFromLegacy(t *testing.T) { + t.Parallel() + const ( + queue = "orders.fifo" + gen = uint64(7) + partition = uint32(3) + msgID = "0123456789abcdef" + groupID = "user-42" + dedupID = "dedup-token" + ts = int64(1700000000000) + ) + cases := []struct { + name string + legacy []byte + partitioned []byte + }{ + { + name: "data", + legacy: sqsMsgDataKey(queue, gen, msgID), + partitioned: sqsPartitionedMsgDataKey(queue, partition, gen, msgID), + }, + { + name: "vis", + legacy: sqsMsgVisKey(queue, gen, ts, msgID), + partitioned: sqsPartitionedMsgVisKey(queue, partition, gen, ts, msgID), + }, + { + name: "dedup", + legacy: sqsMsgDedupKey(queue, gen, dedupID), + partitioned: sqsPartitionedMsgDedupKey(queue, partition, gen, dedupID), + }, + { + name: "group", + legacy: sqsMsgGroupKey(queue, gen, groupID), + partitioned: sqsPartitionedMsgGroupKey(queue, partition, gen, groupID), + }, + { + name: "byage", + legacy: sqsMsgByAgeKey(queue, gen, ts, msgID), + partitioned: sqsPartitionedMsgByAgeKey(queue, partition, gen, ts, msgID), + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + require.NotEqual(t, 
tc.legacy, tc.partitioned, + "legacy and partitioned keys must be byte-distinct") + // Neither key may be a prefix of the other — if the + // partitioned key were a prefix of the legacy, a scan + // of the partitioned prefix would also match the + // legacy key (and vice versa), which would let the + // reaper or a partition-scoped scan accidentally + // surface keys from the wrong keyspace. + require.False(t, bytes.HasPrefix(tc.legacy, tc.partitioned), + "legacy key must not start with partitioned key bytes") + require.False(t, bytes.HasPrefix(tc.partitioned, tc.legacy), + "partitioned key must not start with legacy key bytes") + // The partitioned key must contain the "p|" + // discriminator immediately after the family prefix — + // asserts the constant did not lose its trailing "|". + require.True(t, bytes.Contains(tc.partitioned, []byte("|p|")), + "partitioned key must carry the p| discriminator") + }) + } +} + +// TestSqsPartitionedMsgKeys_PartitionsAreDistinct pins the per- +// partition isolation contract: two keys that differ only in the +// partition value must produce different bytes, and one cannot be a +// prefix of the other. Without this, a scan of partition k's prefix +// would surface partition k+1's data when the encoded partition +// happens to share a prefix. +func TestSqsPartitionedMsgKeys_PartitionsAreDistinct(t *testing.T) { + t.Parallel() + const ( + gen = uint64(1) + msgID = "msg-id" + ) + a := sqsPartitionedMsgDataKey("orders.fifo", 0, gen, msgID) + b := sqsPartitionedMsgDataKey("orders.fifo", 1, gen, msgID) + require.NotEqual(t, a, b, "partition 0 and partition 1 must produce different keys") + require.False(t, bytes.HasPrefix(a, b)) + require.False(t, bytes.HasPrefix(b, a)) + // Different generations within the same partition also distinct. 
+ c := sqsPartitionedMsgDataKey("orders.fifo", 0, gen+1, msgID) + require.NotEqual(t, a, c, "different generations must produce different keys") + // Different queues at the same (partition, gen) also distinct — + // asserts the queueName segment actually participates in the key + // (otherwise two queues would collide on the same partition). + d := sqsPartitionedMsgDataKey("events.fifo", 0, gen, msgID) + require.NotEqual(t, a, d, "different queue names must produce different keys") +} + +// TestSqsPartitionedMsgKeys_Deterministic pins the determinism +// contract: the same inputs always produce the same key bytes. The +// FIFO group lock and dedup lookups depend on byte-exact equality +// across processes, so this is not just a tidiness check — a non- +// deterministic key would silently corrupt the contract. +func TestSqsPartitionedMsgKeys_Deterministic(t *testing.T) { + t.Parallel() + const ( + queue = "orders.fifo" + partition = uint32(5) + gen = uint64(99) + msgID = "deterministic-id" + ) + for range 16 { + a := sqsPartitionedMsgDataKey(queue, partition, gen, msgID) + b := sqsPartitionedMsgDataKey(queue, partition, gen, msgID) + require.Equal(t, a, b) + } +} + +// TestSqsPartitionedMsgVisPrefixForQueue_BoundsScanToOnePartition +// pins that scanning a partition's vis prefix never matches another +// partition's keys. ReceiveMessage's per-partition fan-out +// (Phase 3.D PR 5) builds one prefix per partition and scans each +// independently; if the prefix were a prefix of another partition's +// keys, fan-out would double-count messages. 
+func TestSqsPartitionedMsgVisPrefixForQueue_BoundsScanToOnePartition(t *testing.T) { + t.Parallel() + const ( + queue = "q.fifo" + gen = uint64(3) + ts = int64(1700000000000) + msgID = "m" + ) + prefix0 := sqsPartitionedMsgVisPrefixForQueue(queue, 0, gen) + keyP0 := sqsPartitionedMsgVisKey(queue, 0, gen, ts, msgID) + keyP1 := sqsPartitionedMsgVisKey(queue, 1, gen, ts, msgID) + require.True(t, bytes.HasPrefix(keyP0, prefix0), + "partition 0's vis key must match partition 0's scan prefix") + require.False(t, bytes.HasPrefix(keyP1, prefix0), + "partition 1's vis key must NOT match partition 0's scan prefix; "+ + "otherwise the per-partition fan-out would double-count messages") +} + +// TestParseSqsPartitionedMsgByAgeKey_RoundTrip pins the parser +// against the constructor: every constructed key must parse back to +// its inputs. The reaper depends on this round-trip when it surfaces +// orphan records under a tombstoned generation. +func TestParseSqsPartitionedMsgByAgeKey_RoundTrip(t *testing.T) { + t.Parallel() + cases := []struct { + queue string + partition uint32 + gen uint64 + ts int64 + msgID string + }{ + {"q.fifo", 0, 1, 1, "id-0"}, + {"orders.fifo", 7, 42, 1700000000000, "01234567890123456789ab"}, + {"q-with-dash.fifo", 31, 999, 0, "x"}, + } + for _, tc := range cases { + t.Run(tc.queue+"/p"+stringer(tc.partition), func(t *testing.T) { + t.Parallel() + key := sqsPartitionedMsgByAgeKey(tc.queue, tc.partition, tc.gen, tc.ts, tc.msgID) + parsed, ok := parseSqsPartitionedMsgByAgeKey(key, tc.queue) + require.True(t, ok, "round-trip parse must succeed") + require.Equal(t, tc.partition, parsed.Partition) + require.Equal(t, tc.gen, parsed.Generation) + require.Equal(t, tc.ts, parsed.SendTimestampMs) + require.Equal(t, tc.msgID, parsed.MessageID) + }) + } +} + +// TestParseSqsPartitionedMsgByAgeKey_RejectsLegacy pins that the +// partitioned parser refuses a legacy byage key — the dual-parse +// pattern in the reaper relies on each parser rejecting the other's +// 
keyspace so a key is unambiguously routed to one parser. +func TestParseSqsPartitionedMsgByAgeKey_RejectsLegacy(t *testing.T) { + t.Parallel() + legacy := sqsMsgByAgeKey("q.fifo", 1, 1700000000000, "id-0") + _, ok := parseSqsPartitionedMsgByAgeKey(legacy, "q.fifo") + require.False(t, ok, + "partitioned parser must reject a legacy byage key; "+ + "the dual-parse contract requires unambiguous routing") +} + +// TestParseSqsMsgByAgeKey_RejectsPartitioned pins the converse: the +// legacy parser refuses a partitioned key. A regression here would +// let the reaper mis-decode a partitioned record's partition bytes +// as part of the generation, which would produce a bogus generation +// and either skip live records or operate on a tombstoned cohort +// that no longer exists. +func TestParseSqsMsgByAgeKey_RejectsPartitioned(t *testing.T) { + t.Parallel() + partitioned := sqsPartitionedMsgByAgeKey("q.fifo", 3, 1, 1700000000000, "id-0") + _, ok := parseSqsMsgByAgeKey(partitioned, "q.fifo") + require.False(t, ok, + "legacy parser must reject a partitioned byage key") +} + +// TestSqsMsgByAgePrefixesForQueue_CoversBothKeyspaces pins the +// reaper-side enumeration helper: the returned slice always contains +// both the legacy and partitioned prefixes, in that order. The +// reaper's Range loop iterates this slice; a regression that drops +// either prefix would silently leak orphan records of that flavour. 
+func TestSqsMsgByAgePrefixesForQueue_CoversBothKeyspaces(t *testing.T) { + t.Parallel() + prefixes := sqsMsgByAgePrefixesForQueue("orders.fifo") + require.Len(t, prefixes, 2, "must enumerate both legacy and partitioned prefixes") + require.Equal(t, sqsMsgByAgePrefixAllGenerations("orders.fifo"), prefixes[0]) + require.Equal(t, sqsPartitionedMsgByAgePrefixForQueueAllPartitions("orders.fifo"), prefixes[1]) +} + +// TestSqsPartitionedMsgPrefixes_TerminatedByQueueSegment pins the +// safety-by-construction argument from §3.1: queue names cannot +// contain "|" (validateQueueName rejects it), so the literal "p|" +// after the family prefix cannot collide with a legacy queue-name +// segment. This test asserts that no legacy key built from a name +// that begins with the literal byte 'p' (followed by base32-encoded +// trailing chars) starts with the partitioned prefix. +func TestSqsPartitionedMsgPrefixes_TerminatedByQueueSegment(t *testing.T) { + t.Parallel() + // A queue name that base32-encodes to a string starting with 'p' + // would be the worst case if the discriminator were just "p" + // without the trailing "|". Pick the name "p" itself; its + // base32-raw-URL encoding is "cA" (the first base32 char of + // 0x70 0x00... is 'c'), but try one whose encoding starts with + // 'p' too: a 5-byte input whose first 5 bits are 'p'==0x70's + // base32 mapping gives an encoded char set that starts with 'p'. + // Rather than hand-craft such a string, exhaustively check that + // the family prefix terminates before the partitioned prefix + // can match. 
+ legacy := sqsMsgDataKey("p", 1, "id") + partitionedPrefixOnly := []byte("!sqs|msg|data|p|") + require.False(t, bytes.HasPrefix(legacy, partitionedPrefixOnly), + "a legacy key for any queue name must not start with the partitioned prefix; "+ + "the trailing | in the discriminator is what makes this true — "+ + "if the legacy key starts with `!sqs|msg|data|p|...`, the queue "+ + "name's encoded segment would have to start with `p|` which "+ + "base32-raw-URL never produces") + // Also assert the partitioned prefix constants do not lose the + // trailing "|". + for _, p := range []string{ + SqsPartitionedMsgDataPrefix, + SqsPartitionedMsgVisPrefix, + SqsPartitionedMsgDedupPrefix, + SqsPartitionedMsgGroupPrefix, + SqsPartitionedMsgByAgePrefix, + } { + require.True(t, strings.HasSuffix(p, "p|"), + "partitioned prefix %q must end with the p| discriminator", p) + } +} + +// stringer is a tiny helper to build subtest names with uint32 +// values; using strconv directly inflates the import list of this +// test file with a single-call dependency. +func stringer(v uint32) string { + if v == 0 { + return "0" + } + var out []byte + for v > 0 { + out = append([]byte{byte('0' + v%10)}, out...) + v /= 10 + } + return string(out) +} diff --git a/docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md b/docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md index c5f7c2b61..62c29f4e2 100644 --- a/docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md +++ b/docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md @@ -56,16 +56,20 @@ The `p|` discriminator is **safe by name-validator construction**, not by accide Concretely, the implementation PR exposes **two named constructors** rather than a variadic dispatcher (Claude review on PR #664 flagged the variadic form as a footgun: `sqsMsgDataKey(q, gen, id, p0, p1)` would silently ignore `p1` and the compiler would not catch it). 
The dispatch lives at the call site, where `meta.PartitionCount` is already in scope: ```go -// Two distinct constructors, one per keyspace. -func legacyMsgDataKey(queueName string, gen uint64, messageID string) []byte -func partitionedMsgDataKey(queueName string, partition uint32, gen uint64, messageID string) []byte +// Two distinct constructors, one per keyspace. Implemented in +// adapter/sqs_keys.go as part of Phase 3.D PR 3: +func sqsMsgDataKey(queueName string, gen uint64, messageID string) []byte // legacy +func sqsPartitionedMsgDataKey(queueName string, partition uint32, gen uint64, messageID string) []byte // Dispatch at the call site. No variadic, no silent argument loss. +// PR 5 wires this dispatch at every call site that today calls +// sqsMsgDataKey directly; PR 3 only adds the partitioned constructor +// so PR 5 stays a small change. var dataKey []byte if meta.PartitionCount > 1 { - dataKey = partitionedMsgDataKey(queueName, partition, gen, msgID) + dataKey = sqsPartitionedMsgDataKey(queueName, partition, gen, msgID) } else { - dataKey = legacyMsgDataKey(queueName, gen, msgID) + dataKey = sqsMsgDataKey(queueName, gen, msgID) } ``` @@ -181,13 +185,13 @@ For deployments that don't want one Raft group per partition (e.g. a small clust constructor for this queue's PartitionCount (named constructors per §3.1; no variadic): if meta.PartitionCount > 1 { - dataKey = partitionedMsgDataKey(queue, partitionIndex, gen, msgID) - visKey = partitionedMsgVisKey(queue, partitionIndex, gen, ...) - groupKey = partitionedMsgGroupKey(queue, partitionIndex, gen, MessageGroupId) + dataKey = sqsPartitionedMsgDataKey(queue, partitionIndex, gen, msgID) + visKey = sqsPartitionedMsgVisKey(queue, partitionIndex, gen, ...) + groupKey = sqsPartitionedMsgGroupKey(queue, partitionIndex, gen, MessageGroupId) } else { - dataKey = legacyMsgDataKey(queue, gen, msgID) - visKey = legacyMsgVisKey(queue, gen, ...) 
- groupKey = legacyMsgGroupKey(queue, gen, MessageGroupId) + dataKey = sqsMsgDataKey(queue, gen, msgID) + visKey = sqsMsgVisKey(queue, gen, ...) + groupKey = sqsMsgGroupKey(queue, gen, MessageGroupId) } 6. Dispatch through the leader of the resolved partition (existing leader-proxy path, unchanged). From be7784a141e66b7d96c21b01864a6ef3c528f2a1 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 29 Apr 2026 09:41:15 +0900 Subject: [PATCH 2/2] fix(sqs): terminate queue segment with | in partitioned keys MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The partitioned key shape from PR 3 omitted the segment terminator between the variable-length encoded queue name and the fixed-width partition uint32. Because base64.RawURLEncoding is variable-length, base64("queue") = "cXVldWU" is a strict byte prefix of base64("queue1") = "cXVldWUx", so a reaper-side prefix scan for queue "queue" would also surface queue "queue1"'s entries (and the parser would misdecode those bytes as the wrong (partition, gen, ts, msgID)). Add a '|' byte after the encoded queue segment in every partitioned constructor and prefix helper. '|' is outside the base64-raw-URL alphabet (A-Z, a-z, 0-9, -, _) so the terminator can never collide with the segment it terminates, and validateQueueName already rejects '|' in raw queue names. Other review fixes folded in: - Comment fixes: encoder is base64.RawURLEncoding, not base32 — three comment locations corrected (sqs_keys.go header, two test docs). - Replace hand-rolled stringer with strconv.FormatUint(uint64(v), 10). - Add TestSqsPartitionedMsgKeys_QueueNamePrefixIsolation that asserts the bug above does not regress, using ("queue", "queue1") as the motivating example pair. - Update §3.1 design-doc paragraph to call out the per-segment '|' terminator and to fix the same base32 → base64 wording. 
--- adapter/sqs_keys.go | 28 ++++- adapter/sqs_keys_test.go | 101 +++++++++++++----- ...026_04_26_proposed_sqs_split_queue_fifo.md | 2 +- 3 files changed, 99 insertions(+), 32 deletions(-) diff --git a/adapter/sqs_keys.go b/adapter/sqs_keys.go index 8971f9247..5cc46c00a 100644 --- a/adapter/sqs_keys.go +++ b/adapter/sqs_keys.go @@ -60,10 +60,27 @@ const ( // The literal "p|" segment is the discriminator. validateQueueName // rejects "|" in queue names, so a legacy "!sqs|msg|data||..." // can never collide with a partitioned "!sqs|msg|data|p||..." -// — the queue-name segment is base32-encoded and cannot start with -// the literal ASCII byte 'p' followed by '|'. +// — the queue-name segment is base64-raw-URL-encoded (see +// encodeSQSSegment) and cannot start with the literal ASCII byte 'p' +// followed by '|'. +// +// Each partitioned constructor terminates the variable-length +// queue-name segment with a '|' before the fixed-width partition +// uint32. Without that delimiter, a prefix scan for queue "queue" would +// also match queue "queue1" because base64("queue") = "cXVldWU" is a strict byte prefix +// of base64("queue1") = "cXVldWUx". The discriminator inserts the '|' into the prefix +// itself; the per-constructor terminator inserts it after the queue. const sqsPartitionedDiscriminator = "p|" +// sqsPartitionedQueueTerminator is appended after the encoded queue +// name in every partitioned key. It mirrors the role the fixed-width +// generation suffix plays in tombstone keys: a hard end-of-segment +// marker that prevents queue-name prefix collisions during scans. +// '|' is safe by construction — validateQueueName rejects raw '|', +// and base64.RawURLEncoding never emits '|' (it uses A-Z, a-z, 0-9, +// '-', '_'). +const sqsPartitionedQueueTerminator = '|' + // SqsPartitionedMsg*Prefix mirrors each legacy SqsMsg*Prefix with the // partitioned-keyspace discriminator inserted. 
Defined as full string // constants (rather than runtime concatenation in each constructor) @@ -260,6 +277,7 @@ func sqsPartitionedMsgDataKey(queueName string, partition uint32, gen uint64, me buf := make([]byte, 0, len(SqsPartitionedMsgDataPrefix)+sqsKeyCapLarge) buf = append(buf, SqsPartitionedMsgDataPrefix...) buf = append(buf, encodeSQSSegment(queueName)...) + buf = append(buf, sqsPartitionedQueueTerminator) buf = appendU32(buf, partition) buf = appendU64(buf, gen) buf = append(buf, encodeSQSSegment(messageID)...) @@ -272,6 +290,7 @@ func sqsPartitionedMsgVisKey(queueName string, partition uint32, gen uint64, vis buf := make([]byte, 0, len(SqsPartitionedMsgVisPrefix)+sqsKeyCapLarge) buf = append(buf, SqsPartitionedMsgVisPrefix...) buf = append(buf, encodeSQSSegment(queueName)...) + buf = append(buf, sqsPartitionedQueueTerminator) buf = appendU32(buf, partition) buf = appendU64(buf, gen) buf = appendU64(buf, uint64MaxZero(visibleAtMillis)) @@ -286,6 +305,7 @@ func sqsPartitionedMsgVisPrefixForQueue(queueName string, partition uint32, gen buf := make([]byte, 0, len(SqsPartitionedMsgVisPrefix)+sqsKeyCapSmall) buf = append(buf, SqsPartitionedMsgVisPrefix...) buf = append(buf, encodeSQSSegment(queueName)...) + buf = append(buf, sqsPartitionedQueueTerminator) buf = appendU32(buf, partition) buf = appendU64(buf, gen) return buf @@ -301,6 +321,7 @@ func sqsPartitionedMsgDedupKey(queueName string, partition uint32, gen uint64, d buf := make([]byte, 0, len(SqsPartitionedMsgDedupPrefix)+sqsKeyCapLarge) buf = append(buf, SqsPartitionedMsgDedupPrefix...) buf = append(buf, encodeSQSSegment(queueName)...) + buf = append(buf, sqsPartitionedQueueTerminator) buf = appendU32(buf, partition) buf = appendU64(buf, gen) buf = append(buf, encodeSQSSegment(dedupID)...) 
@@ -316,6 +337,7 @@ func sqsPartitionedMsgGroupKey(queueName string, partition uint32, gen uint64, g buf := make([]byte, 0, len(SqsPartitionedMsgGroupPrefix)+sqsKeyCapLarge) buf = append(buf, SqsPartitionedMsgGroupPrefix...) buf = append(buf, encodeSQSSegment(queueName)...) + buf = append(buf, sqsPartitionedQueueTerminator) buf = appendU32(buf, partition) buf = appendU64(buf, gen) buf = append(buf, encodeSQSSegment(groupID)...) @@ -332,6 +354,7 @@ func sqsPartitionedMsgByAgeKey(queueName string, partition uint32, gen uint64, s buf := make([]byte, 0, len(SqsPartitionedMsgByAgePrefix)+sqsKeyCapLarge) buf = append(buf, SqsPartitionedMsgByAgePrefix...) buf = append(buf, encodeSQSSegment(queueName)...) + buf = append(buf, sqsPartitionedQueueTerminator) buf = appendU32(buf, partition) buf = appendU64(buf, gen) buf = appendU64(buf, uint64MaxZero(sendTimestampMs)) @@ -348,6 +371,7 @@ func sqsPartitionedMsgByAgePrefixForQueueAllPartitions(queueName string) []byte buf := make([]byte, 0, len(SqsPartitionedMsgByAgePrefix)+sqsKeyCapSmall) buf = append(buf, SqsPartitionedMsgByAgePrefix...) buf = append(buf, encodeSQSSegment(queueName)...) + buf = append(buf, sqsPartitionedQueueTerminator) return buf } diff --git a/adapter/sqs_keys_test.go b/adapter/sqs_keys_test.go index 0c6da5d28..b1b8f0c60 100644 --- a/adapter/sqs_keys_test.go +++ b/adapter/sqs_keys_test.go @@ -2,6 +2,7 @@ package adapter import ( "bytes" + "strconv" "strings" "testing" @@ -15,7 +16,7 @@ import ( // family prefix is what makes this true; this test fails if the // constant ever loses the trailing "|" or the discriminator changes // shape such that a partition value (a fixed-width uint32) could -// align with a base32-encoded queue segment. +// align with a base64-encoded queue segment. 
func TestSqsPartitionedMsgKeys_DistinctFromLegacy(t *testing.T) { t.Parallel() const ( @@ -109,6 +110,68 @@ func TestSqsPartitionedMsgKeys_PartitionsAreDistinct(t *testing.T) { require.NotEqual(t, a, d, "different queue names must produce different keys") } +// TestSqsPartitionedMsgKeys_QueueNamePrefixIsolation pins the +// queue-name terminator contract: a queue whose encoded name is a +// strict byte prefix of another queue's encoded name must NOT have +// its prefix match the longer queue's keys. base64.RawURLEncoding is +// variable-length and aligned on 3-byte input groups, so appending +// one byte can extend (without changing) the encoded prefix; '1' +// begins with the two zero bits that padded "queue": base64("queue") = +// "cXVldWU" is a strict byte prefix of base64("queue1") = "cXVldWUx". +// Without the trailing '|' terminator after the queue segment, a +// reaper scan of "queue"'s prefix would surface "queue1"'s entries +// (and likewise for the all-partitions byage prefix). This test +// would have caught the missing-delimiter bug from gemini's review. +func TestSqsPartitionedMsgKeys_QueueNamePrefixIsolation(t *testing.T) { + t.Parallel() + const ( + shortQ = "queue" // base64 → "cXVldWU" + longQ = "queue1" // base64 → "cXVldWUx" (strict superstring) + gen = uint64(1) + partition = uint32(0) + // Distinct from the ts used in other tests so unparam is + // satisfied that vis-key callers exercise more than one + // timestamp value across the suite. + ts = int64(1234567890) + msgID = "id" + ) + // Confirm the precondition: base64(shortQ) is a strict byte + // prefix of base64(longQ). If this ever stops being true, the + // test still holds the contract but loses its motivating example.
+ require.True(t, + bytes.HasPrefix([]byte(encodeSQSSegment(longQ)), []byte(encodeSQSSegment(shortQ))), + "sanity: base64(%q) must be a byte prefix of base64(%q)", shortQ, longQ) + require.NotEqual(t, encodeSQSSegment(shortQ), encodeSQSSegment(longQ), + "sanity: encodings must differ in length") + + // Per-(queue, partition) vis prefix must not leak across queue + // names that share an encoded prefix. + shortVisPrefix := sqsPartitionedMsgVisPrefixForQueue(shortQ, partition, gen) + longVisKey := sqsPartitionedMsgVisKey(longQ, partition, gen, ts, msgID) + require.False(t, bytes.HasPrefix(longVisKey, shortVisPrefix), + "vis prefix for queue %q must not match keys for queue %q", shortQ, longQ) + + // All-partitions byage prefix must not leak across queue names + // — this is what the reaper scans, so a leak would let the + // reaper for the shorter queue enumerate (and potentially + // delete) the longer queue's orphan records. + shortByagePrefix := sqsPartitionedMsgByAgePrefixForQueueAllPartitions(shortQ) + longByageKey := sqsPartitionedMsgByAgeKey(longQ, partition, gen, ts, msgID) + require.False(t, bytes.HasPrefix(longByageKey, shortByagePrefix), + "byage prefix for queue %q must not match keys for queue %q", shortQ, longQ) + + // Spot-check the data and group families too — the same + // terminator argument applies to every partitioned constructor. 
+ shortDataKey := sqsPartitionedMsgDataKey(shortQ, partition, gen, msgID) + longDataKey := sqsPartitionedMsgDataKey(longQ, partition, gen, msgID) + require.False(t, bytes.HasPrefix(longDataKey, shortDataKey), + "data key for %q must not be a prefix of %q's", shortQ, longQ) + shortGroupKey := sqsPartitionedMsgGroupKey(shortQ, partition, gen, "g") + longGroupKey := sqsPartitionedMsgGroupKey(longQ, partition, gen, "g") + require.False(t, bytes.HasPrefix(longGroupKey, shortGroupKey), + "group key for %q must not be a prefix of %q's", shortQ, longQ) +} + // TestSqsPartitionedMsgKeys_Deterministic pins the determinism // contract: the same inputs always produce the same key bytes. The // FIFO group lock and dedup lookups depend on byte-exact equality @@ -171,7 +234,7 @@ func TestParseSqsPartitionedMsgByAgeKey_RoundTrip(t *testing.T) { {"q-with-dash.fifo", 31, 999, 0, "x"}, } for _, tc := range cases { - t.Run(tc.queue+"/p"+stringer(tc.partition), func(t *testing.T) { + t.Run(tc.queue+"/p"+strconv.FormatUint(uint64(tc.partition), 10), func(t *testing.T) { t.Parallel() key := sqsPartitionedMsgByAgeKey(tc.queue, tc.partition, tc.gen, tc.ts, tc.msgID) parsed, ok := parseSqsPartitionedMsgByAgeKey(key, tc.queue) @@ -229,20 +292,15 @@ func TestSqsMsgByAgePrefixesForQueue_CoversBothKeyspaces(t *testing.T) { // contain "|" (validateQueueName rejects it), so the literal "p|" // after the family prefix cannot collide with a legacy queue-name // segment. This test asserts that no legacy key built from a name -// that begins with the literal byte 'p' (followed by base32-encoded +// that begins with the literal byte 'p' (followed by base64-encoded // trailing chars) starts with the partitioned prefix. func TestSqsPartitionedMsgPrefixes_TerminatedByQueueSegment(t *testing.T) { t.Parallel() - // A queue name that base32-encodes to a string starting with 'p' - // would be the worst case if the discriminator were just "p" - // without the trailing "|". 
Pick the name "p" itself; its - // base32-raw-URL encoding is "cA" (the first base32 char of - // 0x70 0x00... is 'c'), but try one whose encoding starts with - // 'p' too: a 5-byte input whose first 5 bits are 'p'==0x70's - // base32 mapping gives an encoded char set that starts with 'p'. - // Rather than hand-craft such a string, exhaustively check that - // the family prefix terminates before the partitioned prefix - // can match. + // The discriminator is the literal two bytes "p|" — note '|' + // is outside the base64-raw-URL alphabet (A-Z, a-z, 0-9, '-', + // '_'), so an encoded queue segment can never produce 'p' + // followed immediately by '|'. The family prefix therefore + // terminates before the partitioned prefix can match. legacy := sqsMsgDataKey("p", 1, "id") partitionedPrefixOnly := []byte("!sqs|msg|data|p|") require.False(t, bytes.HasPrefix(legacy, partitionedPrefixOnly), @@ -250,7 +308,7 @@ func TestSqsPartitionedMsgPrefixes_TerminatedByQueueSegment(t *testing.T) { "the trailing | in the discriminator is what makes this true — "+ "if the legacy key starts with `!sqs|msg|data|p|...`, the queue "+ "name's encoded segment would have to start with `p|` which "+ - "base32-raw-URL never produces") + "base64-raw-URL never produces") // Also assert the partitioned prefix constants do not lose the // trailing "|". for _, p := range []string{ @@ -264,18 +322,3 @@ func TestSqsPartitionedMsgPrefixes_TerminatedByQueueSegment(t *testing.T) { "partitioned prefix %q must end with the p| discriminator", p) } } - -// stringer is a tiny helper to build subtest names with uint32 -// values; using strconv directly inflates the import list of this -// test file with a single-call dependency. -func stringer(v uint32) string { - if v == 0 { - return "0" - } - var out []byte - for v > 0 { - out = append([]byte{byte('0' + v%10)}, out...)
- v /= 10 - } - return string(out) -} diff --git a/docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md b/docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md index 62c29f4e2..e610e798b 100644 --- a/docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md +++ b/docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md @@ -51,7 +51,7 @@ A partition is identified by the tuple `(queueName, partitionIndex)` where `part - **Legacy / `PartitionCount = 0` / Standard queues** keep today's `!sqs|msg|data|||` byte-for-byte. No partition segment is written or read. Existing data on disk is unaffected; existing key constructors stay unchanged on this code path. - **Partitioned FIFO queues (`PartitionCount > 1`)** use a *new* keyspace prefix that explicitly includes the partition: `!sqs|msg|data|p||||` (note the extra `p|` discriminator after `data|`). The discriminator is what guarantees no collision with the legacy prefix even when ` = 0` happens to match the first 8 bytes of a legacy ``. -The `p|` discriminator is **safe by name-validator construction**, not by accident: AWS SQS queue names (and elastickv's `validateQueueName`) admit only `[A-Za-z0-9_-]` plus the optional `.fifo` suffix, so no queue name can contain `|`. The existing `!sqs|msg|data||...` segment is therefore terminated by a `|` that no queue name can produce, and the new `!sqs|msg|data|p||...` segment starts with a literal byte sequence (`p|`) that cannot appear at the same position in any legacy key (it would require a queue name of `p`, which would still be followed by a `|` from the *segment* terminator, not from the queue name itself — but the prefix routing reads the bytes as `data|p|` vs `data||`, and `` is base32-encoded so it never starts with the literal ASCII `p`). The implementation PR's name validator must continue to reject `|` in queue names; any future relaxation of that rule has to revisit this prefix scheme first. 
+The `p|` discriminator is **safe by name-validator construction**, not by accident: AWS SQS queue names (and elastickv's `validateQueueName`) admit only `[A-Za-z0-9_-]` plus the optional `.fifo` suffix, so no queue name can contain `|`. The existing `!sqs|msg|data||...` segment is therefore terminated by a `|` that no queue name can produce, and the new `!sqs|msg|data|p||...` segment starts with a literal byte sequence (`p|`) that cannot appear at the same position in any legacy key. The on-disk queue segment is `base64.RawURLEncoding()` (alphabet `A-Za-z0-9-_`), and `|` is outside that alphabet, so an encoded queue segment can never produce the literal two bytes `p|` that the discriminator relies on. **Each partitioned constructor terminates the variable-length encoded queue segment with a `|` byte before the fixed-width partition `uint32`**. Without that terminator, a prefix scan for queue `queue` would also match queue `queue1`, because `base64("queue")` (`cXVldWU`) is a strict byte prefix of `base64("queue1")` (`cXVldWUx`). The implementation PR's name validator must continue to reject `|` in queue names; any future relaxation of that rule has to revisit this prefix scheme first. Concretely, the implementation PR exposes **two named constructors** rather than a variadic dispatcher (Claude review on PR #664 flagged the variadic form as a footgun: `sqsMsgDataKey(q, gen, id, p0, p1)` would silently ignore `p1` and the compiler would not catch it). The dispatch lives at the call site, where `meta.PartitionCount` is already in scope: