Skip to content

Commit 5163087

Browse files
kevin-dpclaude
andcommitted
fix(browser): prevent stuck leadership state on acquireLeadership errors
Previously, `state.isLeader = true` was set before the setup code that calls `getStreamPosition()`. If `getStreamPosition` threw (e.g. due to a UNIQUE constraint violation from React StrictMode double-mounting), `isLeader` remained permanently stuck at `true` because the `finally` block that resets it was inside an inner try/finally that was never entered. Fix: Wrap the entire lock callback body in a single try/finally. Set `state.isLeader = true` only after successful setup (stream position restore and term increment). The finally block always runs and resets `isLeader = false` + cleans up the heartbeat timer. Also refactors the coordinator to support lazy adapter wiring via `setAdapter()`, allowing `createBrowserWASQLitePersistence` to inject the adapter after construction. This enables the demo to construct the coordinator without requiring the adapter upfront. Co-Authored-By: Claude Opus 4.6 <[email protected]>
1 parent 6809715 commit 5163087

2 files changed

Lines changed: 56 additions & 26 deletions

File tree

packages/db-browser-wa-sqlite-persisted-collection/src/browser-coordinator.ts

Lines changed: 48 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ type AdapterWithPullSince = PersistenceAdapter<
113113

114114
export type BrowserCollectionCoordinatorOptions = {
115115
dbName: string
116-
adapter: AdapterWithPullSince
116+
adapter?: AdapterWithPullSince
117117
}
118118

119119
// ---------------------------------------------------------------------------
@@ -123,7 +123,7 @@ export type BrowserCollectionCoordinatorOptions = {
123123
export class BrowserCollectionCoordinator implements PersistedCollectionCoordinator {
124124
private readonly nodeId = crypto.randomUUID()
125125
private readonly dbName: string
126-
private readonly adapter: AdapterWithPullSince
126+
private adapter: AdapterWithPullSince | null
127127
private readonly channel: BroadcastChannel
128128
private readonly collections = new Map<string, CollectionState>()
129129
private readonly pendingRPCs = new Map<string, PendingRPC>()
@@ -135,15 +135,33 @@ export class BrowserCollectionCoordinator implements PersistedCollectionCoordina
135135
return this.disposed
136136
}
137137

138+
private requireAdapter(): AdapterWithPullSince {
139+
if (!this.adapter) {
140+
throw new Error(
141+
`BrowserCollectionCoordinator: adapter not set. Call setAdapter() before using leader-side operations.`,
142+
)
143+
}
144+
return this.adapter
145+
}
146+
138147
constructor(options: BrowserCollectionCoordinatorOptions) {
139148
this.dbName = options.dbName
140-
this.adapter = options.adapter
149+
this.adapter = options.adapter ?? null
141150
this.channel = new BroadcastChannel(`tsdb:coord:${this.dbName}`)
142151
this.channel.onmessage = (event: MessageEvent) => {
143152
this.onChannelMessage(event.data)
144153
}
145154
}
146155

156+
/**
157+
* Set or replace the persistence adapter used for leader-side RPC handling.
158+
* Called by `createBrowserWASQLitePersistence` to wire the internally-created
159+
* adapter into the coordinator.
160+
*/
161+
setAdapter(adapter: AdapterWithPullSince): void {
162+
this.adapter = adapter
163+
}
164+
147165
// -----------------------------------------------------------------------
148166
// PersistedCollectionCoordinator interface
149167
// -----------------------------------------------------------------------
@@ -207,7 +225,7 @@ export class BrowserCollectionCoordinator implements PersistedCollectionCoordina
207225
spec: PersistedIndexSpec,
208226
): Promise<void> {
209227
if (this.isLeader(collectionId)) {
210-
await this.adapter.ensureIndex(collectionId, signature, spec)
228+
await this.requireAdapter().ensureIndex(collectionId, signature, spec)
211229
return
212230
}
213231

@@ -328,26 +346,27 @@ export class BrowserCollectionCoordinator implements PersistedCollectionCoordina
328346
lockName,
329347
{ signal: abortController.signal },
330348
async () => {
331-
if (this.disposed) return
332-
333-
state.isLeader = true
334-
335-
// Restore stream position from DB
336-
if (this.adapter.getStreamPosition) {
337-
const pos = await this.adapter.getStreamPosition(collectionId)
338-
state.latestTerm = pos.latestTerm
339-
state.latestSeq = pos.latestSeq
340-
state.latestRowVersion = pos.latestRowVersion
341-
}
349+
if (this.isDisposed()) return
342350

343-
state.latestTerm++
351+
try {
352+
// Restore stream position from DB before claiming leadership
353+
const adapter = this.requireAdapter()
354+
if (adapter.getStreamPosition) {
355+
const pos =
356+
await adapter.getStreamPosition(collectionId)
357+
state.latestTerm = pos.latestTerm
358+
state.latestSeq = pos.latestSeq
359+
state.latestRowVersion = pos.latestRowVersion
360+
}
361+
362+
state.latestTerm++
363+
state.isLeader = true
344364

345-
this.emitHeartbeat(collectionId, state)
346-
state.heartbeatTimer = setInterval(() => {
347365
this.emitHeartbeat(collectionId, state)
348-
}, HEARTBEAT_INTERVAL_MS)
366+
state.heartbeatTimer = setInterval(() => {
367+
this.emitHeartbeat(collectionId, state)
368+
}, HEARTBEAT_INTERVAL_MS)
349369

350-
try {
351370
// Hold the lock until disposed or aborted
352371
await new Promise<void>((resolve) => {
353372
const onAbort = () => {
@@ -362,8 +381,10 @@ export class BrowserCollectionCoordinator implements PersistedCollectionCoordina
362381
})
363382
} finally {
364383
state.isLeader = false
365-
clearInterval(state.heartbeatTimer)
366-
state.heartbeatTimer = null
384+
if (state.heartbeatTimer) {
385+
clearInterval(state.heartbeatTimer)
386+
state.heartbeatTimer = null
387+
}
367388
}
368389
},
369390
)
@@ -592,7 +613,7 @@ export class BrowserCollectionCoordinator implements PersistedCollectionCoordina
592613
},
593614
): Promise<RPCResponse> {
594615
await this.withWriterLock(() =>
595-
this.adapter.ensureIndex(collectionId, request.signature, request.spec),
616+
this.requireAdapter().ensureIndex(collectionId, request.signature, request.spec),
596617
)
597618
return {
598619
type: `rpc:ensurePersistedIndex:res`,
@@ -654,7 +675,7 @@ export class BrowserCollectionCoordinator implements PersistedCollectionCoordina
654675
}
655676

656677
await this.withWriterLock(() =>
657-
this.adapter.applyCommittedTx(collectionId, tx),
678+
this.requireAdapter().applyCommittedTx(collectionId, tx),
658679
)
659680

660681
// Track envelope for dedup
@@ -714,7 +735,8 @@ export class BrowserCollectionCoordinator implements PersistedCollectionCoordina
714735
): Promise<PullSinceResponse> {
715736
const state = this.collections.get(collectionId)
716737

717-
if (!this.adapter.pullSince) {
738+
const adapter = this.requireAdapter()
739+
if (!adapter.pullSince) {
718740
return {
719741
type: `rpc:pullSince:res`,
720742
rpcId: request.rpcId,
@@ -726,7 +748,7 @@ export class BrowserCollectionCoordinator implements PersistedCollectionCoordina
726748
}
727749
}
728750

729-
const result = await this.adapter.pullSince(
751+
const result = await adapter.pullSince(
730752
collectionId,
731753
request.fromRowVersion,
732754
)

packages/db-browser-wa-sqlite-persisted-collection/src/browser-persistence.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {
22
SingleProcessCoordinator,
33
createSQLiteCorePersistenceAdapter,
44
} from '@tanstack/db-sqlite-persisted-collection-core'
5+
import { BrowserCollectionCoordinator } from './browser-coordinator'
56
import { BrowserWASQLiteDriver } from './wa-sqlite-driver'
67
import type {
78
PersistedCollectionCoordinator,
@@ -135,6 +136,13 @@ export function createBrowserWASQLitePersistence<
135136
...(schemaVersion === undefined ? {} : { schemaVersion }),
136137
})
137138
adapterCache.set(cacheKey, adapter)
139+
140+
// Wire the adapter into the multi-tab coordinator so it can handle
141+
// leader-side RPCs (applyCommittedTx, pullSince, ensureIndex, etc.)
142+
if (resolvedCoordinator instanceof BrowserCollectionCoordinator) {
143+
resolvedCoordinator.setAdapter(adapter)
144+
}
145+
138146
return adapter
139147
}
140148

0 commit comments

Comments
 (0)