Skip to content

Commit 533be70

Browse files
kevin-dpclaude
andcommitted
fix(persistence): always route mutations through coordinator to prevent seq collisions
The leader tab had two mutation paths: a "direct" path (write to SQLite and broadcast) and an RPC path (through the coordinator). Previously, only follower tabs used the RPC path — the leader bypassed the coordinator and wrote directly. This caused a seq collision: the leader's direct writes incremented the runtime's `localSeq` but left the coordinator's `state.latestSeq` at 0. When a follower later sent an RPC, the coordinator assigned seq starting from 1 again, producing duplicate seq numbers. The leader then skipped these "already-seen" tx:committed messages, causing follower mutations to silently disappear. Fix: Always route through `requestApplyLocalMutations` when available, regardless of leader/follower status. This keeps the coordinator's seq counter in sync with all writes. Also removes `requestApplyLocalMutations` from `SingleProcessCoordinator` — it was a stub that returned success without persisting, which would break now that the leader uses this path. Single-process mode correctly falls back to the direct path since it has no multi-tab coordination. Co-Authored-By: Claude Opus 4.6 <[email protected]>
1 parent 5163087 commit 533be70

1 file changed

Lines changed: 9 additions & 19 deletions

File tree

  • packages/db-sqlite-persisted-collection-core/src

packages/db-sqlite-persisted-collection-core/src/persisted.ts

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -406,21 +406,6 @@ export class SingleProcessCoordinator implements PersistedCollectionCoordinator
406406

407407
public async requestEnsurePersistedIndex(): Promise<void> {}
408408

409-
public requestApplyLocalMutations(
410-
_collectionId: string,
411-
mutations: Array<PersistedMutationEnvelope>,
412-
): Promise<ApplyLocalMutationsResponse> {
413-
return Promise.resolve({
414-
type: `rpc:applyLocalMutations:res`,
415-
rpcId: crypto.randomUUID(),
416-
ok: true,
417-
term: 1,
418-
seq: mutations.length,
419-
latestRowVersion: mutations.length,
420-
acceptedMutationIds: mutations.map((mutation) => mutation.mutationId),
421-
})
422-
}
423-
424409
public pullSince(): Promise<PullSinceResponse> {
425410
return Promise.resolve({
426411
type: `rpc:pullSince:res`,
@@ -1321,10 +1306,13 @@ class PersistedCollectionRuntime<
13211306
private async persistCollectionMutationsUnsafe(
13221307
mutations: Array<PendingMutation<T>>,
13231308
): Promise<Array<string>> {
1324-
if (
1325-
this.persistence.coordinator.requestApplyLocalMutations &&
1326-
!this.persistence.coordinator.isLeader(this.collectionId)
1327-
) {
1309+
// When a coordinator with requestApplyLocalMutations is available, always
1310+
// route through it — even on the leader tab. This ensures the coordinator's
1311+
// seq/rowVersion counters stay in sync with actual writes. Without this,
1312+
// the leader's direct-path writes would increment the runtime's localSeq
1313+
// but leave the coordinator's state.latestSeq stale, causing seq collisions
1314+
// when follower RPCs later arrive.
1315+
if (this.persistence.coordinator.requestApplyLocalMutations) {
13281316
const envelopeMutations = mutations.map((mutation) =>
13291317
toPersistedMutationEnvelope(
13301318
mutation as unknown as PendingMutation<Record<string, unknown>>,
@@ -1368,6 +1356,8 @@ class PersistedCollectionRuntime<
13681356
return uniqueAcceptedMutationIds
13691357
}
13701358

1359+
// Fallback: no coordinator with requestApplyLocalMutations (e.g.
1360+
// SingleProcessCoordinator). Apply directly and broadcast.
13711361
const streamPosition = this.nextLocalStreamPosition()
13721362
const tx = this.createPersistedTxFromMutations(mutations, streamPosition)
13731363
await this.persistence.adapter.applyCommittedTx(this.collectionId, tx)

0 commit comments

Comments
 (0)