Skip to content

Commit fcb841d

Browse files
kevin-dpclaude
andcommitted
Key child collections by composite routing key to fix shared correlation key collision
When multiple parents share the same correlation key but have different parent-referenced filter values, child collections were incorrectly shared. Fix by keying child collections by (correlationKey, parentFilterValues) composite, and using composite child keys in the D2 stream to prevent collisions. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent e2f3d56 commit fcb841d

File tree

2 files changed

+159
-70
lines changed

2 files changed

+159
-70
lines changed

packages/db/src/query/compiler/index.ts

Lines changed: 89 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ import type { QueryCache, QueryMapping, WindowOptions } from './types.js'
3838

3939
export type { WindowOptions } from './types.js'
4040

41+
/** Symbol used to tag parent $selected with routing metadata for includes */
42+
export const INCLUDES_ROUTING = Symbol(`includesRouting`)
43+
4144
/**
4245
* Result of compiling an includes subquery, including the child pipeline
4346
* and metadata needed to route child results to parent-scoped Collections.
@@ -55,6 +58,8 @@ export interface IncludesCompilationResult {
5558
hasOrderBy: boolean
5659
/** Full compilation result for the child query (for nested includes + alias tracking) */
5760
childCompilationResult: CompilationResult
61+
/** Parent-side projection refs for parent-referencing filters */
62+
parentProjection?: Array<PropRef>
5863
}
5964

6065
/**
@@ -213,7 +218,11 @@ export function compileQuery(
213218
if (parentSide != null) {
214219
tagged.__parentContext = parentSide
215220
}
216-
return [childKey, tagged]
221+
const effectiveKey =
222+
parentSide != null
223+
? `${String(childKey)}::${JSON.stringify(parentSide)}`
224+
: childKey
225+
return [effectiveKey, tagged]
217226
}),
218227
)
219228

@@ -231,6 +240,7 @@ export function compileQuery(
231240
const nsRow: Record<string, any> = { [mainSource]: cleanRow }
232241
if (__parentContext) {
233242
Object.assign(nsRow, __parentContext)
243+
;(nsRow as any).__parentContext = __parentContext
234244
}
235245
const ret = [key, nsRow] as [string, Record<string, typeof row>]
236246
return ret
@@ -291,6 +301,13 @@ export function compileQuery(
291301
// This must happen AFTER WHERE (so parent pipeline is filtered) but BEFORE processSelect
292302
// (so IncludesSubquery nodes are stripped before select compilation).
293303
const includesResults: Array<IncludesCompilationResult> = []
304+
const includesRoutingFns: Array<{
305+
fieldName: string
306+
getRouting: (nsRow: any) => {
307+
correlationKey: unknown
308+
parentContext: Record<string, any> | null
309+
}
310+
}> = []
294311
if (query.select) {
295312
const includesEntries = extractIncludesFromSelect(query.select)
296313
for (const { key, subquery } of includesEntries) {
@@ -374,30 +391,52 @@ export function compileQuery(
374391
subquery.query.orderBy && subquery.query.orderBy.length > 0
375392
),
376393
childCompilationResult: childResult,
394+
parentProjection: subquery.parentProjection,
377395
})
378396

397+
// Capture routing function for INCLUDES_ROUTING tagging
398+
if (subquery.parentProjection && subquery.parentProjection.length > 0) {
399+
const compiledProjs = subquery.parentProjection.map((ref) => ({
400+
alias: ref.path[0]!,
401+
field: ref.path.slice(1),
402+
compiled: compileExpression(ref),
403+
}))
404+
const compiledCorr = compiledCorrelation
405+
includesRoutingFns.push({
406+
fieldName: subquery.fieldName,
407+
getRouting: (nsRow: any) => {
408+
const parentContext: Record<string, Record<string, any>> = {}
409+
for (const proj of compiledProjs) {
410+
if (!parentContext[proj.alias]) {
411+
parentContext[proj.alias] = {}
412+
}
413+
const value = proj.compiled(nsRow)
414+
let target = parentContext[proj.alias]!
415+
for (let i = 0; i < proj.field.length - 1; i++) {
416+
if (!target[proj.field[i]!]) {
417+
target[proj.field[i]!] = {}
418+
}
419+
target = target[proj.field[i]!]
420+
}
421+
target[proj.field[proj.field.length - 1]!] = value
422+
}
423+
return { correlationKey: compiledCorr(nsRow), parentContext }
424+
},
425+
})
426+
} else {
427+
includesRoutingFns.push({
428+
fieldName: subquery.fieldName,
429+
getRouting: (nsRow: any) => ({
430+
correlationKey: compiledCorrelation(nsRow),
431+
parentContext: null,
432+
}),
433+
})
434+
}
435+
379436
// Replace includes entry in select with a null placeholder
380437
replaceIncludesInSelect(query.select, key)
381438
}
382439

383-
// Stamp correlation key values onto the namespaced row so they survive
384-
// select extraction. This allows flushIncludesState to read them directly
385-
// without requiring the correlation field to be in the user's select.
386-
if (includesEntries.length > 0) {
387-
const compiledCorrelations = includesEntries.map(({ subquery }) => ({
388-
fieldName: subquery.fieldName,
389-
compiled: compileExpression(subquery.correlationField),
390-
}))
391-
pipeline = pipeline.pipe(
392-
map(([key, nsRow]: any) => {
393-
const correlationKeys: Record<string, unknown> = {}
394-
for (const { fieldName: fn, compiled } of compiledCorrelations) {
395-
correlationKeys[fn] = compiled(nsRow)
396-
}
397-
return [key, { ...nsRow, __includesCorrelationKeys: correlationKeys }]
398-
}),
399-
)
400-
}
401440
}
402441

403442
if (query.distinct && !query.fnSelect && !query.select) {
@@ -442,6 +481,25 @@ export function compileQuery(
442481
)
443482
}
444483

484+
// Tag $selected with routing metadata for includes.
485+
// This lets collection-config-builder extract routing info (correlationKey + parentContext)
486+
// from parent results without depending on the user's select.
487+
if (includesRoutingFns.length > 0) {
488+
pipeline = pipeline.pipe(
489+
map(([key, namespacedRow]: any) => {
490+
const routing: Record<
491+
string,
492+
{ correlationKey: unknown; parentContext: Record<string, any> | null }
493+
> = {}
494+
for (const { fieldName, getRouting } of includesRoutingFns) {
495+
routing[fieldName] = getRouting(namespacedRow)
496+
}
497+
namespacedRow.$selected[INCLUDES_ROUTING] = routing
498+
return [key, namespacedRow]
499+
}),
500+
)
501+
}
502+
445503
// Process the GROUP BY clause if it exists
446504
if (query.groupBy && query.groupBy.length > 0) {
447505
pipeline = processGroupBy(
@@ -531,16 +589,14 @@ export function compileQuery(
531589
// Extract the final results from $selected and include orderBy index
532590
const raw = (row as any).$selected
533591
const finalResults = unwrapValue(raw)
534-
// Stamp includes correlation keys onto the result for child routing
535-
if ((row as any).__includesCorrelationKeys) {
536-
finalResults.__includesCorrelationKeys = (
537-
row as any
538-
).__includesCorrelationKeys
539-
}
540-
// When in includes mode, embed the correlation key as third element
592+
// When in includes mode, embed the correlation key and parentContext
541593
if (parentKeyStream) {
542594
const correlationKey = (row as any)[mainSource]?.__correlationKey
543-
return [key, [finalResults, orderByIndex, correlationKey]] as any
595+
const parentContext = (row as any).__parentContext ?? null
596+
return [
597+
key,
598+
[finalResults, orderByIndex, correlationKey, parentContext],
599+
] as any
544600
}
545601
return [key, [finalResults, orderByIndex]] as [unknown, [any, string]]
546602
}),
@@ -570,16 +626,14 @@ export function compileQuery(
570626
// Extract the final results from $selected and return [key, [results, undefined]]
571627
const raw = (row as any).$selected
572628
const finalResults = unwrapValue(raw)
573-
// Stamp includes correlation keys onto the result for child routing
574-
if ((row as any).__includesCorrelationKeys) {
575-
finalResults.__includesCorrelationKeys = (
576-
row as any
577-
).__includesCorrelationKeys
578-
}
579-
// When in includes mode, embed the correlation key as third element
629+
// When in includes mode, embed the correlation key and parentContext
580630
if (parentKeyStream) {
581631
const correlationKey = (row as any)[mainSource]?.__correlationKey
582-
return [key, [finalResults, undefined, correlationKey]] as any
632+
const parentContext = (row as any).__parentContext ?? null
633+
return [
634+
key,
635+
[finalResults, undefined, correlationKey, parentContext],
636+
] as any
583637
}
584638
return [key, [finalResults, undefined]] as [
585639
unknown,

packages/db/src/query/live/collection-config-builder.ts

Lines changed: 70 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { D2, output, serializeValue } from '@tanstack/db-ivm'
2-
import { compileQuery } from '../compiler/index.js'
2+
import { INCLUDES_ROUTING, compileQuery } from '../compiler/index.js'
33
import { createCollection } from '../../collection/index.js'
44
import { IncludesSubquery } from '../ir.js'
55
import { buildQuery, getQueryIR } from '../builder/index.js'
@@ -831,14 +831,21 @@ export class CollectionConfigBuilder<
831831
syncState.messagesCount += messages.length
832832

833833
for (const [[childKey, tupleData], multiplicity] of messages) {
834-
const [childResult, _orderByIndex, correlationKey] =
835-
tupleData as unknown as [any, string | undefined, unknown]
836-
837-
// Accumulate by [correlationKey, childKey]
838-
let byChild = state.pendingChildChanges.get(correlationKey)
834+
const [childResult, _orderByIndex, correlationKey, parentContext] =
835+
tupleData as unknown as [
836+
any,
837+
string | undefined,
838+
unknown,
839+
Record<string, any> | null,
840+
]
841+
842+
const routingKey = computeRoutingKey(correlationKey, parentContext)
843+
844+
// Accumulate by [routingKey, childKey]
845+
let byChild = state.pendingChildChanges.get(routingKey)
839846
if (!byChild) {
840847
byChild = new Map()
841-
state.pendingChildChanges.set(correlationKey, byChild)
848+
state.pendingChildChanges.set(routingKey, byChild)
842849
}
843850

844851
const existing = byChild.get(childKey) || {
@@ -1351,13 +1358,20 @@ function setupNestedPipelines(
13511358
syncState.messagesCount += messages.length
13521359

13531360
for (const [[childKey, tupleData], multiplicity] of messages) {
1354-
const [childResult, _orderByIndex, correlationKey] =
1355-
tupleData as unknown as [any, string | undefined, unknown]
1361+
const [childResult, _orderByIndex, correlationKey, parentContext] =
1362+
tupleData as unknown as [
1363+
any,
1364+
string | undefined,
1365+
unknown,
1366+
Record<string, any> | null,
1367+
]
1368+
1369+
const routingKey = computeRoutingKey(correlationKey, parentContext)
13561370

1357-
let byChild = buffer.get(correlationKey)
1371+
let byChild = buffer.get(routingKey)
13581372
if (!byChild) {
13591373
byChild = new Map()
1360-
buffer.set(correlationKey, byChild)
1374+
buffer.set(routingKey, byChild)
13611375
}
13621376

13631377
const existing = byChild.get(childKey) || {
@@ -1499,11 +1513,12 @@ function updateRoutingIndex(
14991513
for (const setup of state.nestedSetups) {
15001514
for (const [, change] of childChanges) {
15011515
if (change.inserts > 0) {
1502-
// Read the pre-computed nested correlation key from the compiler stamp
1503-
const nestedCorrelationKey =
1504-
change.value.__includesCorrelationKeys?.[
1516+
// Read the nested correlation key from the INCLUDES_ROUTING stamp
1517+
const nestedRouting =
1518+
change.value[INCLUDES_ROUTING]?.[
15051519
setup.compilationResult.fieldName
15061520
]
1521+
const nestedCorrelationKey = nestedRouting?.correlationKey
15071522

15081523
if (nestedCorrelationKey != null) {
15091524
state.nestedRoutingIndex!.set(nestedCorrelationKey, correlationKey)
@@ -1516,10 +1531,11 @@ function updateRoutingIndex(
15161531
}
15171532
} else if (change.deletes > 0 && change.inserts === 0) {
15181533
// Remove from routing index
1519-
const nestedCorrelationKey =
1520-
change.value.__includesCorrelationKeys?.[
1534+
const nestedRouting2 =
1535+
change.value[INCLUDES_ROUTING]?.[
15211536
setup.compilationResult.fieldName
15221537
]
1538+
const nestedCorrelationKey = nestedRouting2?.correlationKey
15231539

15241540
if (nestedCorrelationKey != null) {
15251541
state.nestedRoutingIndex!.delete(nestedCorrelationKey)
@@ -1568,6 +1584,19 @@ function hasNestedBufferChanges(setups: Array<NestedIncludesSetup>): boolean {
15681584
return false
15691585
}
15701586

1587+
/**
1588+
* Computes a composite routing key from correlation key and parent context.
1589+
* When parentContext is null (no parent filters), returns the raw correlationKey
1590+
* for zero behavioral change on existing queries.
1591+
*/
1592+
function computeRoutingKey(
1593+
correlationKey: unknown,
1594+
parentContext: Record<string, any> | null,
1595+
): unknown {
1596+
if (parentContext == null) return correlationKey
1597+
return JSON.stringify([correlationKey, parentContext])
1598+
}
1599+
15711600
/**
15721601
* Creates a child Collection entry for includes subqueries.
15731602
* The child Collection is a full-fledged Collection instance that starts syncing immediately.
@@ -1640,33 +1669,35 @@ function flushIncludesState(
16401669
for (const [parentKey, changes] of parentChanges) {
16411670
if (changes.inserts > 0) {
16421671
const parentResult = changes.value
1643-
// Read the pre-computed correlation key from the compiler stamp
1644-
const correlationKey =
1645-
parentResult.__includesCorrelationKeys?.[state.fieldName]
1672+
// Extract routing info from INCLUDES_ROUTING symbol (set by compiler)
1673+
const routing = parentResult[INCLUDES_ROUTING]?.[state.fieldName]
1674+
const correlationKey = routing?.correlationKey
1675+
const parentContext = routing?.parentContext ?? null
1676+
const routingKey = computeRoutingKey(correlationKey, parentContext)
16461677

16471678
if (correlationKey != null) {
1648-
// Ensure child Collection exists for this correlation key
1649-
if (!state.childRegistry.has(correlationKey)) {
1679+
// Ensure child Collection exists for this routing key
1680+
if (!state.childRegistry.has(routingKey)) {
16501681
const entry = createChildCollectionEntry(
16511682
parentId,
16521683
state.fieldName,
1653-
correlationKey,
1684+
routingKey,
16541685
state.hasOrderBy,
16551686
state.nestedSetups,
16561687
)
1657-
state.childRegistry.set(correlationKey, entry)
1688+
state.childRegistry.set(routingKey, entry)
16581689
}
1659-
// Update reverse index: correlation key → parent keys
1660-
let parentKeys = state.correlationToParentKeys.get(correlationKey)
1690+
// Update reverse index: routing key → parent keys
1691+
let parentKeys = state.correlationToParentKeys.get(routingKey)
16611692
if (!parentKeys) {
16621693
parentKeys = new Set()
1663-
state.correlationToParentKeys.set(correlationKey, parentKeys)
1694+
state.correlationToParentKeys.set(routingKey, parentKeys)
16641695
}
16651696
parentKeys.add(parentKey)
16661697

16671698
// Attach child Collection to the parent result
16681699
parentResult[state.fieldName] =
1669-
state.childRegistry.get(correlationKey)!.collection
1700+
state.childRegistry.get(routingKey)!.collection
16701701
}
16711702
}
16721703
}
@@ -1769,17 +1800,21 @@ function flushIncludesState(
17691800
if (parentChanges) {
17701801
for (const [parentKey, changes] of parentChanges) {
17711802
if (changes.deletes > 0 && changes.inserts === 0) {
1772-
const correlationKey =
1773-
changes.value.__includesCorrelationKeys?.[state.fieldName]
1803+
const routing =
1804+
changes.value[INCLUDES_ROUTING]?.[state.fieldName]
1805+
const correlationKey = routing?.correlationKey
1806+
const parentContext = routing?.parentContext ?? null
1807+
const routingKey = computeRoutingKey(correlationKey, parentContext)
17741808
if (correlationKey != null) {
1775-
cleanRoutingIndexOnDelete(state, correlationKey)
1776-
state.childRegistry.delete(correlationKey)
1809+
cleanRoutingIndexOnDelete(state, routingKey)
1810+
state.childRegistry.delete(routingKey)
17771811
// Clean up reverse index
1778-
const parentKeys = state.correlationToParentKeys.get(correlationKey)
1812+
const parentKeys =
1813+
state.correlationToParentKeys.get(routingKey)
17791814
if (parentKeys) {
17801815
parentKeys.delete(parentKey)
17811816
if (parentKeys.size === 0) {
1782-
state.correlationToParentKeys.delete(correlationKey)
1817+
state.correlationToParentKeys.delete(routingKey)
17831818
}
17841819
}
17851820
}
@@ -1788,10 +1823,10 @@ function flushIncludesState(
17881823
}
17891824
}
17901825

1791-
// Clean up the internal stamp from parent/child results so it doesn't leak to the user
1826+
// Clean up the internal routing stamp from parent/child results
17921827
if (parentChanges) {
17931828
for (const [, changes] of parentChanges) {
1794-
delete changes.value.__includesCorrelationKeys
1829+
delete changes.value[INCLUDES_ROUTING]
17951830
}
17961831
}
17971832
}

0 commit comments

Comments
 (0)