@@ -548,6 +548,15 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
548548 } ,
549549 } ) ;
550550
551+ const requireWatchClient = (
552+ watchClient : IMessageRpcClient | null | undefined ,
553+ ) : IMessageRpcClient => {
554+ if ( ! watchClient ) {
555+ throw new Error ( "imessage monitor client not initialized" ) ;
556+ }
557+ return watchClient ;
558+ } ;
559+
551560 for ( let attempt = 1 ; attempt <= WATCH_SUBSCRIBE_MAX_ATTEMPTS ; attempt ++ ) {
552561 if ( abort ?. aborted ) {
553562 return ;
@@ -556,7 +565,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
556565 let attemptDetachAbortHandler = ( ) => { } ;
557566 let keepAttemptClient = false ;
558567 try {
559- attemptClient = await createWatchClient ( ) ;
568+ attemptClient = requireWatchClient ( await createWatchClient ( ) ) ;
560569 let attemptSubscriptionId : number | null = null ;
561570 attemptDetachAbortHandler = attachIMessageMonitorAbortHandler ( {
562571 abortSignal : abort ,
@@ -590,6 +599,12 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
590599 `imessage: watch.subscribe startup failed (attempt ${ attempt } /${ WATCH_SUBSCRIBE_MAX_ATTEMPTS } ): ${ String ( err ) } ; retrying` ,
591600 ) ,
592601 ) ;
602+ // Tear down the failed client before waiting so a slow subscribe attempt
603+ // cannot keep emitting notifications into the next retry window.
604+ attemptDetachAbortHandler ( ) ;
605+ attemptDetachAbortHandler = ( ) => { } ;
606+ await attemptClient ?. stop ( ) ;
607+ attemptClient = undefined ;
593608 await waitForWatchSubscribeRetryDelay ( {
594609 ms : WATCH_SUBSCRIBE_RETRY_DELAY_MS ,
595610 abortSignal : abort ,
@@ -605,12 +620,13 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
605620 }
606621 }
607622
608- if ( ! client ) {
623+ const activeClient = client ;
624+ if ( ! activeClient ) {
609625 return ;
610626 }
611627
612628 try {
613- await client . waitForClose ( ) ;
629+ await activeClient . waitForClose ( ) ;
614630 } catch ( err ) {
615631 if ( abort ?. aborted ) {
616632 return ;
@@ -619,7 +635,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
619635 throw err ;
620636 } finally {
621637 detachAbortHandler ( ) ;
622- await client . stop ( ) ;
638+ await activeClient . stop ( ) ;
623639 }
624640}
625641
0 commit comments