Skip to content

Commit 29033b8

Browse files
KyleAMathewsclaudeautofix-ci[bot]github-actions[bot]kevin-dp
authored
fix(db): batch D2 output callbacks to prevent duplicate key errors in joins (#1114)
* fix(db): batch D2 output callbacks to prevent duplicate key errors in joins When D2's incremental join processes data, it can produce multiple outputs during a single graph.run() iteration - first a partial join result, then the full result plus a delete of the partial. Previously, each output callback had its own begin()/commit() cycle, causing the second insert for the same key to fail with DuplicateKeySyncError. This fix: - Accumulates all changes from output callbacks into a pendingChanges Map - Flushes accumulated changes in a single transaction after each graph.run() - Adds subscribedToAllCollections check to updateLiveQueryStatus() to ensure markReady() is only called after the graph has processed data - Properly types flushPendingChanges on SyncState to avoid type assertions Co-Authored-By: Claude Opus 4.5 <[email protected]> * ci: apply automated fixes * chore: add changeset for live query join fix Co-Authored-By: Claude Opus 4.5 <[email protected]> * chore: update changeset to mention isReady fix Co-Authored-By: Claude Opus 4.5 <[email protected]> * refactor: restore accumulateChanges helper function Restore the accumulateChanges helper function instead of inlining its logic. This improves code readability and maintainability by keeping the accumulation logic in a separate, testable function. Co-authored-by: Kevin <[email protected]> --------- Co-authored-by: Claude Opus 4.5 <[email protected]> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com> Co-authored-by: Kevin <[email protected]>
1 parent 2456adb commit 29033b8

5 files changed

Lines changed: 68 additions & 24 deletions

File tree

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@tanstack/db': patch
3+
---
4+
5+
Fix `isReady()` returning `true` while `toArray()` returns empty results. The status now correctly waits until data has been processed through the graph before marking ready.
6+
7+
Also fix duplicate key errors when live queries use joins with custom `getKey` functions. D2's incremental join can produce multiple outputs for the same key during a single graph run; this change batches all outputs into a single transaction to prevent conflicts.

packages/db/src/collection/sync.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,10 @@ export class CollectionSyncManager<
134134
!isTruncateTransaction
135135
) {
136136
const existingValue = this.state.syncedData.get(key)
137-
if (
137+
const valuesEqual =
138138
existingValue !== undefined &&
139139
deepEquals(existingValue, messageWithOptionalKey.value)
140-
) {
140+
if (valuesEqual) {
141141
// The "insert" is an echo of a value we already have locally.
142142
// Treat it as an update so we preserve optimistic intent without
143143
// throwing a duplicate-key error during reconciliation.

packages/db/src/query/live/collection-config-builder.ts

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,10 @@ export class CollectionConfigBuilder<
337337
if (syncState.subscribedToAllCollections) {
338338
while (syncState.graph.pendingWork()) {
339339
syncState.graph.run()
340+
// Flush accumulated changes after each graph step to commit them as one transaction.
341+
// This ensures intermediate join states (like null on one side) don't cause
342+
// duplicate key errors when the full join result arrives in the same step.
343+
syncState.flushPendingChanges?.()
340344
callback?.()
341345
}
342346

@@ -345,10 +349,14 @@ export class CollectionConfigBuilder<
345349
if (syncState.messagesCount === 0) {
346350
begin()
347351
commit()
348-
// After initial commit, check if we should mark ready
349-
// (in case all sources were already ready before we subscribed)
350-
this.updateLiveQueryStatus(this.currentSyncConfig)
351352
}
353+
354+
// After graph processing completes, check if we should mark ready.
355+
// This is the canonical place to transition to ready state because:
356+
// 1. All data has been processed through the graph
357+
// 2. All source collections have had a chance to send their initial data
358+
// This prevents marking ready before data is processed (fixes isReady=true with empty data)
359+
this.updateLiveQueryStatus(this.currentSyncConfig)
352360
}
353361
} finally {
354362
this.isGraphRunning = false
@@ -687,22 +695,35 @@ export class CollectionConfigBuilder<
687695
const { begin, commit } = config
688696
const { graph, inputs, pipeline } = this.maybeCompileBasePipeline()
689697

698+
// Accumulator for changes across all output callbacks within a single graph run.
699+
// This allows us to batch all changes from intermediate join states into a single
700+
// transaction, avoiding duplicate key errors when joins produce multiple outputs
701+
// for the same key (e.g., first output with null, then output with joined data).
702+
let pendingChanges: Map<unknown, Changes<TResult>> = new Map()
703+
690704
pipeline.pipe(
691705
output((data) => {
692706
const messages = data.getInner()
693707
syncState.messagesCount += messages.length
694708

695-
begin()
696-
messages
697-
.reduce(
698-
accumulateChanges<TResult>,
699-
new Map<unknown, Changes<TResult>>(),
700-
)
701-
.forEach(this.applyChanges.bind(this, config))
702-
commit()
709+
// Accumulate changes from this output callback into the pending changes map.
710+
// Changes for the same key are merged (inserts/deletes are added together).
711+
messages.reduce(accumulateChanges<TResult>, pendingChanges)
703712
}),
704713
)
705714

715+
// Flush pending changes and reset the accumulator.
716+
// Called at the end of each graph run to commit all accumulated changes.
717+
syncState.flushPendingChanges = () => {
718+
if (pendingChanges.size === 0) {
719+
return
720+
}
721+
begin()
722+
pendingChanges.forEach(this.applyChanges.bind(this, config))
723+
commit()
724+
pendingChanges = new Map()
725+
}
726+
706727
graph.finalize()
707728

708729
// Extend the sync state with the graph, inputs, and pipeline
@@ -808,11 +829,14 @@ export class CollectionConfigBuilder<
808829
return
809830
}
810831

811-
// Mark ready when all source collections are ready AND
812-
// the live query collection is not loading subset data.
813-
// This prevents marking the live query ready before its data is loaded
832+
// Mark ready when:
833+
// 1. All subscriptions are set up (subscribedToAllCollections)
834+
// 2. All source collections are ready
835+
// 3. The live query collection is not loading subset data
836+
// This prevents marking the live query ready before its data is processed
814837
// (fixes issue where useLiveQuery returns isReady=true with empty data)
815838
if (
839+
this.currentSyncState?.subscribedToAllCollections &&
816840
this.allCollectionsReady() &&
817841
!this.liveQueryCollection?.isLoadingSubset
818842
) {
@@ -913,8 +937,10 @@ export class CollectionConfigBuilder<
913937
// (graph only runs when all collections are subscribed)
914938
syncState.subscribedToAllCollections = true
915939

916-
// Initial status check after all subscriptions are set up
917-
this.updateLiveQueryStatus(config)
940+
// Note: We intentionally don't call updateLiveQueryStatus() here.
941+
// The graph hasn't run yet, so marking ready would be premature.
942+
// The canonical place to mark ready is after the graph processes data
943+
// in maybeRunGraph(), which ensures data has been processed first.
918944

919945
return loadSubsetDataCallbacks
920946
}
@@ -1096,8 +1122,11 @@ function accumulateChanges<T>(
10961122
changes.deletes += Math.abs(multiplicity)
10971123
} else if (multiplicity > 0) {
10981124
changes.inserts += multiplicity
1125+
// Update value to the latest version for this key
10991126
changes.value = value
1100-
changes.orderByIndex = orderByIndex
1127+
if (orderByIndex !== undefined) {
1128+
changes.orderByIndex = orderByIndex
1129+
}
11011130
}
11021131
acc.set(key, changes)
11031132
return acc

packages/db/src/query/live/types.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ export type SyncState = {
2222
graph?: D2
2323
inputs?: Record<string, RootStreamBuilder<unknown>>
2424
pipeline?: ResultStream
25+
flushPendingChanges?: () => void
2526
}
2627

27-
export type FullSyncState = Required<SyncState>
28+
export type FullSyncState = Required<Omit<SyncState, `flushPendingChanges`>> &
29+
Pick<SyncState, `flushPendingChanges`>
2830

2931
/**
3032
* Configuration interface for live query collection options

packages/db/tests/query/live-query-collection.test.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2293,10 +2293,11 @@ describe(`createLiveQueryCollection`, () => {
22932293
.limit(10),
22942294
)
22952295

2296-
// Trigger sync which will call loadSubset
2297-
await liveQueryCollection.preload()
2296+
// Start preload (don't await yet - it won't resolve until loadSubset completes)
2297+
const preloadPromise = liveQueryCollection.preload()
22982298
await flushPromises()
22992299

2300+
// Verify loadSubset was called with the correct options
23002301
expect(capturedOptions.length).toBeGreaterThan(0)
23012302

23022303
// Find the call that has orderBy (the limited snapshot request)
@@ -2308,8 +2309,10 @@ describe(`createLiveQueryCollection`, () => {
23082309
expect(callWithOrderBy?.orderBy?.[0]?.expression.type).toBe(`ref`)
23092310
expect(callWithOrderBy?.limit).toBe(10)
23102311

2312+
// Resolve the loadSubset promise so preload can complete
23112313
resolveLoadSubset!()
23122314
await flushPromises()
2315+
await preloadPromise
23132316
})
23142317

23152318
it(`passes multiple orderBy columns to loadSubset when using limit`, async () => {
@@ -2350,10 +2353,11 @@ describe(`createLiveQueryCollection`, () => {
23502353
.limit(10),
23512354
)
23522355

2353-
// Trigger sync which will call loadSubset
2354-
await liveQueryCollection.preload()
2356+
// Start preload (don't await yet - it won't resolve until loadSubset completes)
2357+
const preloadPromise = liveQueryCollection.preload()
23552358
await flushPromises()
23562359

2360+
// Verify loadSubset was called with the correct options
23572361
expect(capturedOptions.length).toBeGreaterThan(0)
23582362

23592363
// Find the call that has orderBy with multiple columns
@@ -2369,8 +2373,10 @@ describe(`createLiveQueryCollection`, () => {
23692373
expect(callWithMultiOrderBy?.orderBy?.[1]?.expression.type).toBe(`ref`)
23702374
expect(callWithMultiOrderBy?.limit).toBe(10)
23712375

2376+
// Resolve the loadSubset promise so preload can complete
23722377
resolveLoadSubset!()
23732378
await flushPromises()
2379+
await preloadPromise
23742380
})
23752381
})
23762382
})

0 commit comments

Comments
 (0)