Skip to content

Commit b162556

Browse files
authored
fix stuck loading state after an electric must-refetch (#532)
1 parent 5d148fa commit b162556

4 files changed

Lines changed: 362 additions & 7 deletions

File tree

.changeset/wet-camels-brush.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@tanstack/electric-db-collection": patch
3+
"@tanstack/db": patch
4+
---
5+
6+
Fixed a bug where a live query could get stuck in "loading" state, or show incomplete data, when an electric "must-refetch" message arrived before the first "up-to-date".

packages/db/src/collection.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -395,13 +395,14 @@ export class CollectionImpl<
395395
const callbacks = [...this.onFirstReadyCallbacks]
396396
this.onFirstReadyCallbacks = []
397397
callbacks.forEach((callback) => callback())
398-
399-
// to notify subscribers (like LiveQueryCollection) that the collection is ready
400-
if (this.changeListeners.size > 0) {
401-
this.emitEmptyReadyEvent()
402-
}
403398
}
404399
}
400+
401+
// Always notify dependents when markReady is called, after status is set
402+
// This ensures live queries get notified when their dependencies become ready
403+
if (this.changeListeners.size > 0) {
404+
this.emitEmptyReadyEvent()
405+
}
405406
}
406407

407408
public id = ``
@@ -1270,6 +1271,13 @@ export class CollectionImpl<
12701271
this.syncedData.clear()
12711272
this.syncedMetadata.clear()
12721273
this.syncedKeys.clear()
1274+
1275+
// 3) Clear currentVisibleState for truncated keys to ensure subsequent operations
1276+
// are compared against the post-truncate state (undefined) rather than pre-truncate state
1277+
// This ensures that re-inserted keys are emitted as INSERT events, not UPDATE events
1278+
for (const key of changedKeys) {
1279+
currentVisibleState.delete(key)
1280+
}
12731281
}
12741282

12751283
for (const operation of transaction.operations) {

packages/db/tests/collection-subscribe-changes.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1169,8 +1169,8 @@ describe(`Collection.subscribeChanges`, () => {
11691169
f.write({ type: `insert`, value: { id: 1, value: `server-after` } })
11701170
f.commit()
11711171

1172-
// Expect delete then insert with optimistic value
1173-
expect(changeEvents.length).toBe(2)
1172+
// Expect delete, insert with optimistic value, and an empty event from markReady
1173+
expect(changeEvents.length).toBe(3)
11741174
expect(changeEvents[0]).toEqual({
11751175
type: `delete`,
11761176
key: 1,
Lines changed: 341 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,341 @@
1+
import { beforeEach, describe, expect, it, vi } from "vitest"
2+
import {
3+
createCollection,
4+
createLiveQueryCollection,
5+
eq,
6+
gt,
7+
} from "@tanstack/db"
8+
import { electricCollectionOptions } from "../src/electric"
9+
import type { ElectricCollectionUtils } from "../src/electric"
10+
import type { Collection } from "@tanstack/db"
11+
import type { Message } from "@electric-sql/client"
12+
import type { StandardSchemaV1 } from "@standard-schema/spec"
13+
14+
// Sample user type for tests
15+
type User = {
16+
id: number
17+
name: string
18+
age: number
19+
email: string
20+
active: boolean
21+
}
22+
23+
// Sample data for tests
24+
const sampleUsers: Array<User> = [
25+
{
26+
id: 1,
27+
name: `Alice`,
28+
age: 25,
29+
30+
active: true,
31+
},
32+
{
33+
id: 2,
34+
name: `Bob`,
35+
age: 19,
36+
37+
active: true,
38+
},
39+
{
40+
id: 3,
41+
name: `Charlie`,
42+
age: 30,
43+
44+
active: false,
45+
},
46+
{
47+
id: 4,
48+
name: `Dave`,
49+
age: 22,
50+
51+
active: true,
52+
},
53+
]
54+
55+
// Mock the ShapeStream module
56+
const mockSubscribe = vi.fn()
57+
const mockStream = {
58+
subscribe: mockSubscribe,
59+
}
60+
61+
vi.mock(`@electric-sql/client`, async () => {
62+
const actual = await vi.importActual(`@electric-sql/client`)
63+
return {
64+
...actual,
65+
ShapeStream: vi.fn(() => mockStream),
66+
}
67+
})
68+
69+
describe.each([
70+
[`autoIndex enabled (default)`, `eager` as const],
71+
[`autoIndex disabled`, `off` as const],
72+
])(`Electric Collection with Live Query - %s`, (description, autoIndex) => {
73+
let electricCollection: Collection<
74+
User,
75+
string | number,
76+
ElectricCollectionUtils,
77+
StandardSchemaV1<unknown, unknown>,
78+
User
79+
>
80+
let subscriber: (messages: Array<Message<User>>) => void
81+
82+
function createElectricUsersCollection() {
83+
vi.clearAllMocks()
84+
85+
// Reset mock subscriber
86+
mockSubscribe.mockImplementation((callback) => {
87+
subscriber = callback
88+
return () => {}
89+
})
90+
91+
// Create Electric collection with specified autoIndex
92+
const config = {
93+
id: `electric-users`,
94+
shapeOptions: {
95+
url: `http://test-url`,
96+
params: {
97+
table: `users`,
98+
},
99+
},
100+
getKey: (user: User) => user.id,
101+
autoIndex,
102+
}
103+
104+
const options = electricCollectionOptions(config)
105+
return createCollection({
106+
...options,
107+
startSync: true,
108+
})
109+
}
110+
111+
function simulateInitialSync(users: Array<User> = sampleUsers) {
112+
const messages: Array<Message<User>> = users.map((user) => ({
113+
key: user.id.toString(),
114+
value: user,
115+
headers: { operation: `insert` },
116+
}))
117+
118+
messages.push({
119+
headers: { control: `up-to-date` },
120+
})
121+
122+
subscriber(messages)
123+
}
124+
125+
function simulateMustRefetch() {
126+
subscriber([
127+
{
128+
headers: { control: `must-refetch` },
129+
},
130+
])
131+
}
132+
133+
function simulateResync(users: Array<User>) {
134+
const messages: Array<Message<User>> = users.map((user) => ({
135+
key: user.id.toString(),
136+
value: user,
137+
headers: { operation: `insert` },
138+
}))
139+
140+
messages.push({
141+
headers: { control: `up-to-date` },
142+
})
143+
144+
subscriber(messages)
145+
}
146+
147+
beforeEach(() => {
148+
electricCollection = createElectricUsersCollection()
149+
})
150+
151+
it(`should handle basic must-refetch with filtered live query`, () => {
152+
// Create a live query with WHERE clause
153+
const activeLiveQuery = createLiveQueryCollection({
154+
id: `active-users-live-query`,
155+
startSync: true,
156+
query: (q) =>
157+
q
158+
.from({ user: electricCollection })
159+
.where(({ user }) => eq(user.active, true))
160+
.select(({ user }) => ({
161+
id: user.id,
162+
name: user.name,
163+
active: user.active,
164+
})),
165+
})
166+
167+
// Initial sync
168+
simulateInitialSync()
169+
expect(electricCollection.status).toBe(`ready`)
170+
expect(electricCollection.size).toBe(4)
171+
expect(activeLiveQuery.status).toBe(`ready`)
172+
expect(activeLiveQuery.size).toBe(3) // Only active users
173+
174+
// Must-refetch and resync with updated data
175+
simulateMustRefetch()
176+
const updatedUsers = [
177+
{
178+
id: 1,
179+
name: `Alice Updated`,
180+
age: 26,
181+
182+
active: true,
183+
},
184+
{ id: 5, name: `Eve`, age: 24, email: `[email protected]`, active: true },
185+
{
186+
id: 6,
187+
name: `Frank`,
188+
age: 35,
189+
190+
active: false,
191+
},
192+
]
193+
simulateResync(updatedUsers)
194+
195+
// BUG: Live query should have 2 active users but only shows 1
196+
expect(electricCollection.status).toBe(`ready`)
197+
expect(electricCollection.size).toBe(3)
198+
expect(activeLiveQuery.status).toBe(`ready`)
199+
expect(activeLiveQuery.size).toBe(2) // Only active users (Alice Updated and Eve)
200+
})
201+
202+
it(`should handle must-refetch with complex projections`, () => {
203+
const complexLiveQuery = createLiveQueryCollection({
204+
startSync: true,
205+
query: (q) =>
206+
q
207+
.from({ user: electricCollection })
208+
.where(({ user }) => gt(user.age, 18))
209+
.select(({ user }) => ({
210+
userId: user.id,
211+
displayName: user.name,
212+
isAdult: user.age,
213+
})),
214+
})
215+
216+
// Initial sync and must-refetch
217+
simulateInitialSync()
218+
simulateMustRefetch()
219+
220+
const newUsers = [
221+
{
222+
id: 9,
223+
name: `Iris`,
224+
age: 30,
225+
226+
active: false,
227+
},
228+
{
229+
id: 10,
230+
name: `Jack`,
231+
age: 17,
232+
233+
active: true,
234+
}, // Under 18, filtered
235+
]
236+
simulateResync(newUsers)
237+
238+
expect(complexLiveQuery.status).toBe(`ready`)
239+
expect(complexLiveQuery.size).toBe(1) // Only Iris (Jack filtered by age)
240+
expect(complexLiveQuery.get(9)).toMatchObject({
241+
userId: 9,
242+
displayName: `Iris`,
243+
isAdult: 30,
244+
})
245+
})
246+
247+
it(`should handle rapid must-refetch sequences`, () => {
248+
const liveQuery = createLiveQueryCollection({
249+
startSync: true,
250+
query: (q) => q.from({ user: electricCollection }),
251+
})
252+
253+
// Initial sync
254+
simulateInitialSync()
255+
expect(liveQuery.size).toBe(4)
256+
257+
// Multiple rapid must-refetch messages
258+
simulateMustRefetch()
259+
simulateMustRefetch()
260+
simulateMustRefetch()
261+
262+
// Final resync
263+
const newUsers = [
264+
{
265+
id: 10,
266+
name: `New User`,
267+
age: 20,
268+
269+
active: true,
270+
},
271+
]
272+
simulateResync(newUsers)
273+
274+
expect(electricCollection.status).toBe(`ready`)
275+
expect(liveQuery.status).toBe(`ready`)
276+
expect(liveQuery.size).toBe(1)
277+
})
278+
279+
it(`should handle live query becoming ready after must-refetch during initial sync`, () => {
280+
// Test that live queries properly transition to ready state when must-refetch
281+
// occurs during the initial sync of the source Electric collection
282+
283+
let testSubscriber: (messages: Array<Message<User>>) => void = () => {}
284+
vi.clearAllMocks()
285+
mockSubscribe.mockImplementation((callback) => {
286+
testSubscriber = callback
287+
return () => {}
288+
})
289+
290+
// Create Electric collection
291+
const testElectricCollection = createCollection({
292+
...electricCollectionOptions({
293+
id: `initial-sync-collection`,
294+
shapeOptions: {
295+
url: `http://test-url`,
296+
params: {
297+
table: `users`,
298+
},
299+
},
300+
getKey: (user: User) => user.id,
301+
}),
302+
autoIndex,
303+
startSync: true,
304+
})
305+
306+
// Send initial data but don't complete sync (no up-to-date)
307+
testSubscriber([
308+
{
309+
key: `1`,
310+
value: {
311+
id: 1,
312+
name: `Alice`,
313+
age: 25,
314+
315+
active: true,
316+
},
317+
headers: { operation: `insert` },
318+
},
319+
])
320+
321+
expect(testElectricCollection.status).toBe(`loading`)
322+
323+
// Create live query while Electric collection is still loading
324+
const liveQuery = createLiveQueryCollection({
325+
startSync: true,
326+
query: (q) => q.from({ user: testElectricCollection }),
327+
})
328+
329+
expect(liveQuery.status).toBe(`loading`)
330+
331+
// Send must-refetch while collection is in loading state
332+
testSubscriber([{ headers: { control: `must-refetch` } }])
333+
334+
// Complete the sync
335+
testSubscriber([{ headers: { control: `up-to-date` } }])
336+
337+
// Both Electric collection and live query should be ready
338+
expect(testElectricCollection.status).toBe(`ready`)
339+
expect(liveQuery.status).toBe(`ready`)
340+
})
341+
})

0 commit comments

Comments
 (0)