@@ -113,7 +113,7 @@ type AdapterWithPullSince = PersistenceAdapter<
113113
114114export type BrowserCollectionCoordinatorOptions = {
115115 dbName : string
116- adapter : AdapterWithPullSince
116+ adapter ? : AdapterWithPullSince
117117}
118118
119119// ---------------------------------------------------------------------------
@@ -123,7 +123,7 @@ export type BrowserCollectionCoordinatorOptions = {
123123export class BrowserCollectionCoordinator implements PersistedCollectionCoordinator {
124124 private readonly nodeId = crypto . randomUUID ( )
125125 private readonly dbName : string
126- private readonly adapter : AdapterWithPullSince
126+ private adapter : AdapterWithPullSince | null
127127 private readonly channel : BroadcastChannel
128128 private readonly collections = new Map < string , CollectionState > ( )
129129 private readonly pendingRPCs = new Map < string , PendingRPC > ( )
@@ -135,15 +135,33 @@ export class BrowserCollectionCoordinator implements PersistedCollectionCoordina
135135 return this . disposed
136136 }
137137
138+ private requireAdapter ( ) : AdapterWithPullSince {
139+ if ( ! this . adapter ) {
140+ throw new Error (
141+ `BrowserCollectionCoordinator: adapter not set. Call setAdapter() before using leader-side operations.` ,
142+ )
143+ }
144+ return this . adapter
145+ }
146+
138147 constructor ( options : BrowserCollectionCoordinatorOptions ) {
139148 this . dbName = options . dbName
140- this . adapter = options . adapter
149+ this . adapter = options . adapter ?? null
141150 this . channel = new BroadcastChannel ( `tsdb:coord:${ this . dbName } ` )
142151 this . channel . onmessage = ( event : MessageEvent ) => {
143152 this . onChannelMessage ( event . data )
144153 }
145154 }
146155
156+ /**
157+ * Set or replace the persistence adapter used for leader-side RPC handling.
158+ * Called by `createBrowserWASQLitePersistence` to wire the internally-created
159+ * adapter into the coordinator.
160+ */
161+ setAdapter ( adapter : AdapterWithPullSince ) : void {
162+ this . adapter = adapter
163+ }
164+
147165 // -----------------------------------------------------------------------
148166 // PersistedCollectionCoordinator interface
149167 // -----------------------------------------------------------------------
@@ -207,7 +225,7 @@ export class BrowserCollectionCoordinator implements PersistedCollectionCoordina
207225 spec : PersistedIndexSpec ,
208226 ) : Promise < void > {
209227 if ( this . isLeader ( collectionId ) ) {
210- await this . adapter . ensureIndex ( collectionId , signature , spec )
228+ await this . requireAdapter ( ) . ensureIndex ( collectionId , signature , spec )
211229 return
212230 }
213231
@@ -328,26 +346,27 @@ export class BrowserCollectionCoordinator implements PersistedCollectionCoordina
328346 lockName ,
329347 { signal : abortController . signal } ,
330348 async ( ) => {
331- if ( this . disposed ) return
332-
333- state . isLeader = true
334-
335- // Restore stream position from DB
336- if ( this . adapter . getStreamPosition ) {
337- const pos = await this . adapter . getStreamPosition ( collectionId )
338- state . latestTerm = pos . latestTerm
339- state . latestSeq = pos . latestSeq
340- state . latestRowVersion = pos . latestRowVersion
341- }
349+ if ( this . isDisposed ( ) ) return
342350
343- state . latestTerm ++
351+ try {
352+ // Restore stream position from DB before claiming leadership
353+ const adapter = this . requireAdapter ( )
354+ if ( adapter . getStreamPosition ) {
355+ const pos =
356+ await adapter . getStreamPosition ( collectionId )
357+ state . latestTerm = pos . latestTerm
358+ state . latestSeq = pos . latestSeq
359+ state . latestRowVersion = pos . latestRowVersion
360+ }
361+
362+ state . latestTerm ++
363+ state . isLeader = true
344364
345- this . emitHeartbeat ( collectionId , state )
346- state . heartbeatTimer = setInterval ( ( ) => {
347365 this . emitHeartbeat ( collectionId , state )
348- } , HEARTBEAT_INTERVAL_MS )
366+ state . heartbeatTimer = setInterval ( ( ) => {
367+ this . emitHeartbeat ( collectionId , state )
368+ } , HEARTBEAT_INTERVAL_MS )
349369
350- try {
351370 // Hold the lock until disposed or aborted
352371 await new Promise < void > ( ( resolve ) => {
353372 const onAbort = ( ) => {
@@ -362,8 +381,10 @@ export class BrowserCollectionCoordinator implements PersistedCollectionCoordina
362381 } )
363382 } finally {
364383 state . isLeader = false
365- clearInterval ( state . heartbeatTimer )
366- state . heartbeatTimer = null
384+ if ( state . heartbeatTimer ) {
385+ clearInterval ( state . heartbeatTimer )
386+ state . heartbeatTimer = null
387+ }
367388 }
368389 } ,
369390 )
@@ -592,7 +613,7 @@ export class BrowserCollectionCoordinator implements PersistedCollectionCoordina
592613 } ,
593614 ) : Promise < RPCResponse > {
594615 await this . withWriterLock ( ( ) =>
595- this . adapter . ensureIndex ( collectionId , request . signature , request . spec ) ,
616+ this . requireAdapter ( ) . ensureIndex ( collectionId , request . signature , request . spec ) ,
596617 )
597618 return {
598619 type : `rpc:ensurePersistedIndex:res` ,
@@ -654,7 +675,7 @@ export class BrowserCollectionCoordinator implements PersistedCollectionCoordina
654675 }
655676
656677 await this . withWriterLock ( ( ) =>
657- this . adapter . applyCommittedTx ( collectionId , tx ) ,
678+ this . requireAdapter ( ) . applyCommittedTx ( collectionId , tx ) ,
658679 )
659680
660681 // Track envelope for dedup
@@ -714,7 +735,8 @@ export class BrowserCollectionCoordinator implements PersistedCollectionCoordina
714735 ) : Promise < PullSinceResponse > {
715736 const state = this . collections . get ( collectionId )
716737
717- if ( ! this . adapter . pullSince ) {
738+ const adapter = this . requireAdapter ( )
739+ if ( ! adapter . pullSince ) {
718740 return {
719741 type : `rpc:pullSince:res` ,
720742 rpcId : request . rpcId ,
@@ -726,7 +748,7 @@ export class BrowserCollectionCoordinator implements PersistedCollectionCoordina
726748 }
727749 }
728750
729- const result = await this . adapter . pullSince (
751+ const result = await adapter . pullSince (
730752 collectionId ,
731753 request . fromRowVersion ,
732754 )
0 commit comments