Skip to content

Commit 48d0889

Browse files
authored
fix race condition in loading joined collections into live query (#451)
1 parent 79c95a3 commit 48d0889

4 files changed

Lines changed: 202 additions & 9 deletions

File tree

.changeset/new-swans-heal.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/db": patch
3+
---
4+
5+
fix a race condition that could result in the initial state of a joined collection being sent to the live query pipeline twice, this would result in incorrect join results.

packages/db/src/query/live/collection-subscriber.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ export class CollectionSubscriber<
113113
// filter out deletes for keys that have not been sent
114114
continue
115115
}
116+
this.sentKeys.add(change.key)
116117
}
117118
newChanges.push(newChange)
118119
}
@@ -153,12 +154,29 @@ export class CollectionSubscriber<
153154
private subscribeToMatchingChanges(
154155
whereExpression: BasicExpression<boolean> | undefined
155156
) {
157+
// Flag to indicate we have send to whole initial state of the collection
158+
// to the pipeline, this is set when there are no indexes that can be used
159+
// to filter the changes and so the whole state was requested from the collection
156160
let loadedInitialState = false
157161

162+
// Flag to indicate that we have started sending changes to the pipeline.
163+
// This is set to true by either the first call to `loadKeys` or when the
164+
// query requests the whole initial state in `loadInitialState`.
165+
// Until that point we filter out all changes from subscription to the collection.
166+
let sendChanges = false
167+
158168
const sendVisibleChanges = (
159169
changes: Array<ChangeMessage<any, string | number>>
160170
) => {
161-
this.sendVisibleChangesToPipeline(changes, loadedInitialState)
171+
// We filter out changes when sendChanges is false to ensure that we don't send
172+
// any changes from the live subscription until the join operator requests either
173+
// the initial state or its first key. This is needed otherwise it could receive
174+
// changes which are then later subsumed by the initial state (and that would
175+
// lead to weird bugs due to the data being received twice).
176+
this.sendVisibleChangesToPipeline(
177+
sendChanges ? changes : [],
178+
loadedInitialState
179+
)
162180
}
163181

164182
const unsubscribe = this.collection.subscribeChanges(sendVisibleChanges, {
@@ -171,6 +189,7 @@ export class CollectionSubscriber<
171189
? createFilterFunctionFromExpression(whereExpression)
172190
: () => true
173191
const loadKs = (keys: Set<string | number>) => {
192+
sendChanges = true
174193
return this.loadKeys(keys, filterFn)
175194
}
176195

@@ -183,6 +202,7 @@ export class CollectionSubscriber<
183202
// Make sure we only load the initial state once
184203
if (loadedInitialState) return
185204
loadedInitialState = true
205+
sendChanges = true
186206

187207
const changes = this.collection.currentStateAsChanges({
188208
whereExpression,

packages/db/tests/query/live-query-collection.test.ts

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ import { beforeEach, describe, expect, it } from "vitest"
22
import { createCollection } from "../../src/collection.js"
33
import { createLiveQueryCollection, eq } from "../../src/query/index.js"
44
import { Query } from "../../src/query/builder/index.js"
5-
import { mockSyncCollectionOptions } from "../utls.js"
5+
import {
6+
mockSyncCollectionOptions,
7+
mockSyncCollectionOptionsNoInitialState,
8+
} from "../utls.js"
69
import type { ChangeMessage } from "../../src/types.js"
710

811
// Sample user type for tests
@@ -313,4 +316,92 @@ describe(`createLiveQueryCollection`, () => {
313316
// Resubscribe should not throw (would throw "Graph already finalized" without the fix)
314317
expect(() => liveQuery.subscribeChanges(() => {})).not.toThrow()
315318
})
319+
320+
for (const autoIndex of [`eager`, `off`] as const) {
321+
it(`should not send the initial state twice on joins with autoIndex: ${autoIndex}`, async () => {
322+
type Player = { id: number; name: string }
323+
type Challenge = { id: number; value: number }
324+
325+
const playerCollection = createCollection(
326+
mockSyncCollectionOptionsNoInitialState<Player>({
327+
id: `player`,
328+
getKey: (post) => post.id,
329+
autoIndex,
330+
})
331+
)
332+
333+
const challenge1Collection = createCollection(
334+
mockSyncCollectionOptionsNoInitialState<Challenge>({
335+
id: `challenge1`,
336+
getKey: (post) => post.id,
337+
autoIndex,
338+
})
339+
)
340+
341+
const challenge2Collection = createCollection(
342+
mockSyncCollectionOptionsNoInitialState<Challenge>({
343+
id: `challenge2`,
344+
getKey: (post) => post.id,
345+
autoIndex,
346+
})
347+
)
348+
349+
const liveQuery = createLiveQueryCollection((q) =>
350+
q
351+
.from({ player: playerCollection })
352+
.leftJoin(
353+
{ challenge1: challenge1Collection },
354+
({ player, challenge1 }) => eq(player.id, challenge1.id)
355+
)
356+
.leftJoin(
357+
{ challenge2: challenge2Collection },
358+
({ player, challenge2 }) => eq(player.id, challenge2.id)
359+
)
360+
)
361+
362+
// Start the query, but don't wait it, we are doing to write the data to the
363+
// source collections while the query is loading the initial state
364+
const preloadPromise = liveQuery.preload()
365+
366+
// Write player
367+
playerCollection.utils.begin()
368+
playerCollection.utils.write({
369+
type: `insert`,
370+
value: { id: 1, name: `Alice` },
371+
})
372+
playerCollection.utils.commit()
373+
playerCollection.utils.markReady()
374+
375+
// Write challenge1
376+
challenge1Collection.utils.begin()
377+
challenge1Collection.utils.write({
378+
type: `insert`,
379+
value: { id: 1, value: 100 },
380+
})
381+
challenge1Collection.utils.commit()
382+
challenge1Collection.utils.markReady()
383+
384+
// Write challenge2
385+
challenge2Collection.utils.begin()
386+
challenge2Collection.utils.write({
387+
type: `insert`,
388+
value: { id: 1, value: 200 },
389+
})
390+
challenge2Collection.utils.commit()
391+
challenge2Collection.utils.markReady()
392+
393+
await preloadPromise
394+
395+
// With a failed test the results show more than 1 item
396+
// It returns both an unjoined player with no joined challenges, and a joined
397+
// player with the challenges
398+
const results = liveQuery.toArray
399+
expect(results.length).toBe(1)
400+
401+
const result = results[0]!
402+
expect(result.player.name).toBe(`Alice`)
403+
expect(result.challenge1?.value).toBe(100)
404+
expect(result.challenge2?.value).toBe(200)
405+
})
406+
}
316407
})

packages/db/tests/utls.ts

Lines changed: 84 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,6 @@ import type {
55
SyncConfig,
66
} from "../src/index.js"
77

8-
type MockSyncCollectionConfig<T> = {
9-
id: string
10-
initialData: Array<T>
11-
getKey: (item: T) => string | number
12-
autoIndex?: `off` | `eager`
13-
}
14-
158
// Index usage tracking utilities
169
export interface IndexUsageStats {
1710
rangeQueryCalls: number
@@ -179,6 +172,13 @@ export function withIndexTracking(
179172
}
180173
}
181174

175+
type MockSyncCollectionConfig<T> = {
176+
id: string
177+
initialData: Array<T>
178+
getKey: (item: T) => string | number
179+
autoIndex?: `off` | `eager`
180+
}
181+
182182
export function mockSyncCollectionOptions<
183183
T extends object = Record<string, unknown>,
184184
>(config: MockSyncCollectionConfig<T>) {
@@ -257,3 +257,80 @@ export function mockSyncCollectionOptions<
257257

258258
return options
259259
}
260+
261+
type MockSyncCollectionConfigNoInitialState<T> = {
262+
id: string
263+
getKey: (item: T) => string | number
264+
autoIndex?: `off` | `eager`
265+
}
266+
267+
export function mockSyncCollectionOptionsNoInitialState<
268+
T extends object = Record<string, unknown>,
269+
>(config: MockSyncCollectionConfigNoInitialState<T>) {
270+
let begin: () => void
271+
let write: Parameters<SyncConfig<T>[`sync`]>[0][`write`]
272+
let commit: () => void
273+
let markReady: () => void
274+
275+
let syncPendingPromise: Promise<void> | undefined
276+
let syncPendingResolve: (() => void) | undefined
277+
let syncPendingReject: ((error: Error) => void) | undefined
278+
279+
const awaitSync = async () => {
280+
if (syncPendingPromise) {
281+
return syncPendingPromise
282+
}
283+
syncPendingPromise = new Promise((resolve, reject) => {
284+
syncPendingResolve = resolve
285+
syncPendingReject = reject
286+
})
287+
syncPendingPromise.then(() => {
288+
syncPendingPromise = undefined
289+
syncPendingResolve = undefined
290+
syncPendingReject = undefined
291+
})
292+
return syncPendingPromise
293+
}
294+
295+
const utils = {
296+
begin: () => begin!(),
297+
write: ((value) => write!(value)) as typeof write,
298+
commit: () => commit!(),
299+
markReady: () => markReady!(),
300+
resolveSync: () => {
301+
syncPendingResolve!()
302+
},
303+
rejectSync: (error: Error) => {
304+
syncPendingReject!(error)
305+
},
306+
}
307+
308+
const options: CollectionConfig<T> & { utils: typeof utils } = {
309+
sync: {
310+
sync: (params: Parameters<SyncConfig<T>[`sync`]>[0]) => {
311+
begin = params.begin
312+
write = params.write
313+
commit = params.commit
314+
markReady = params.markReady
315+
},
316+
},
317+
startSync: false,
318+
onInsert: async (_params: MutationFnParams<T>) => {
319+
// TODO
320+
await awaitSync()
321+
},
322+
onUpdate: async (_params: MutationFnParams<T>) => {
323+
// TODO
324+
await awaitSync()
325+
},
326+
onDelete: async (_params: MutationFnParams<T>) => {
327+
// TODO
328+
await awaitSync()
329+
},
330+
utils,
331+
...config,
332+
autoIndex: config.autoIndex,
333+
}
334+
335+
return options
336+
}

0 commit comments

Comments
 (0)