@@ -28,13 +28,21 @@ import { DataConnectSubscription } from '../../api.browser';
2828import { DataConnectCache , ServerValues } from '../../cache/Cache' ;
2929import { parseEntityIds } from '../../cache/cacheUtils' ;
3030import { EncodingMode } from '../../cache/EntityNode' ;
31- import { DataConnectTransport , Extensions } from '../../network' ;
3231import {
32+ DataConnectTransportInterface ,
33+ Extensions ,
3334 DataConnectExtensionWithMaxAge ,
34- ExtensionsWithMaxAge
35- } from '../../network/transport' ;
35+ ExtensionsWithMaxAge ,
36+ SubscribeObserver ,
37+ DataConnectResponse
38+ } from '../../network' ;
3639import { decoderImpl , encoderImpl } from '../../util/encoder' ;
37- import { Code , DataConnectError } from '../error' ;
40+ import {
41+ Code ,
42+ DataConnectError ,
43+ DataConnectOperationError ,
44+ DataConnectOperationFailureResponse
45+ } from '../error' ;
3846
3947import {
4048 OnCompleteSubscription ,
@@ -85,9 +93,13 @@ export class QueryManager {
8593 string ,
8694 Array < DataConnectSubscription < unknown , unknown > >
8795 > ( ) ;
96+ /**
97+ * Map of serialized query keys to most recent Query Result. Used as a simple fallback cache
98+ * for subsciptions if caching is not enabled.
99+ */
88100 private subscriptionCache = new Map < string , QueryResult < unknown , unknown > > ( ) ;
89101 constructor (
90- private transport : DataConnectTransport ,
102+ private transport : DataConnectTransportInterface ,
91103 private dc : DataConnect ,
92104 private cache ?: DataConnectCache
93105 ) { }
@@ -154,10 +166,15 @@ export class QueryManager {
154166 const unsubscribe = ( ) : void => {
155167 if ( this . callbacks . has ( key ) ) {
156168 const callbackList = this . callbacks . get ( key ) ! ;
157- this . callbacks . set (
158- key ,
159- callbackList . filter ( callback => callback !== subscription )
169+ const newList = callbackList . filter (
170+ callback => callback !== subscription
160171 ) ;
172+ this . callbacks . set ( key , newList ) ;
173+
174+ if ( newList . length === 0 ) {
175+ this . callbacks . delete ( key ) ;
176+ this . transport . invokeUnsubscribe ( queryRef . name , queryRef . variables ) ;
177+ }
161178 onCompleteCallback ?.( ) ;
162179 }
163180 } ;
@@ -175,12 +192,22 @@ export class QueryManager {
175192 // We want to ignore the error and let subscriptions handle it
176193 promise . then ( undefined , err => { } ) ;
177194
178- if ( ! this . callbacks . has ( key ) ) {
179- this . callbacks . set ( key , [ ] ) ;
195+ if ( this . callbacks . has ( key ) ) {
196+ this . callbacks
197+ . get ( key ) !
198+ . push ( subscription as DataConnectSubscription < unknown , unknown > ) ;
199+ } else {
200+ this . callbacks . set ( key , [
201+ subscription as DataConnectSubscription < unknown , unknown >
202+ ] ) ;
203+
204+ // only invoke subscription if we don't already have an active subscription
205+ this . transport . invokeSubscribe < Data , Variables > (
206+ this . makeSubscribeObserver ( queryRef ) ,
207+ queryRef . name ,
208+ queryRef . variables
209+ ) ;
180210 }
181- this . callbacks
182- . get ( key ) !
183- . push ( subscription as DataConnectSubscription < unknown , unknown > ) ;
184211
185212 return unsubscribe ;
186213 }
@@ -215,8 +242,7 @@ export class QueryManager {
215242 fetchTime
216243 )
217244 } ;
218- let updatedKeys : string [ ] = [ ] ;
219- updatedKeys = await this . updateCache (
245+ const updatedKeys = await this . updateCache (
220246 queryResult ,
221247 originalExtensions ?. dataConnect
222248 ) ;
@@ -342,6 +368,7 @@ export class QueryManager {
342368 return result as QueryResult < Data , Variables > ;
343369 }
344370
371+ /** Call the registered onNext callbacks for the given key */
345372 publishDataToSubscribers (
346373 key : string ,
347374 queryResult : QueryResult < unknown , unknown >
@@ -391,6 +418,108 @@ export class QueryManager {
391418 enableEmulator ( host : string , port : number ) : void {
392419 this . transport . useEmulator ( host , port ) ;
393420 }
421+
422+ /**
423+ * Create a new {@link SubscribeObserver} for the given QueryRef. This will be passed to
424+ * {@link DataConnectTransportInterface.invokeSubscribe | invokeSubscribe()} to notify the query
425+ * layer of data update notifications or if the stream disconnected.
426+ */
427+ private makeSubscribeObserver < Data , Variables > (
428+ queryRef : QueryRef < Data , Variables >
429+ ) : SubscribeObserver < Data > {
430+ const key = encoderImpl ( {
431+ name : queryRef . name ,
432+ variables : queryRef . variables ,
433+ refType : QUERY_STR
434+ } ) ;
435+ return {
436+ onData : async response => {
437+ await this . handleStreamNotification ( key , response , queryRef ) ;
438+ } ,
439+ onDisconnect : ( code , reason ) => {
440+ this . handleStreamDisconnect ( key , code , reason ) ;
441+ } ,
442+ onError : error => {
443+ this . publishErrorToSubscribers ( key , error ) ;
444+ }
445+ } ;
446+ }
447+
448+ /**
449+ * Handle a data update notification from the stream. Notify subscribers of results/errors, and
450+ * update the cache.
451+ */
452+ private async handleStreamNotification < Data , Variables > (
453+ key : string ,
454+ response : DataConnectResponse < Data > ,
455+ queryRef : QueryRef < Data , Variables >
456+ ) : Promise < void > {
457+ if ( response . errors && response . errors . length > 0 ) {
458+ const stringified = JSON . stringify (
459+ response . errors . map ( e => {
460+ if ( e && typeof e === 'object' ) {
461+ return {
462+ message : ( e as unknown as { message : string } ) . message ,
463+ code : ( e as unknown as { code ?: unknown } ) . code
464+ } ;
465+ }
466+ return e ;
467+ } )
468+ ) ;
469+ const failureResponse : DataConnectOperationFailureResponse = {
470+ errors : response . errors as [ ] ,
471+ data : response . data as Record < string , unknown >
472+ } ;
473+ const error = new DataConnectOperationError (
474+ 'DataConnect error received from subscribe notification: ' +
475+ stringified ,
476+ failureResponse
477+ ) ;
478+ this . publishErrorToSubscribers ( key , error ) ;
479+ return ;
480+ }
481+
482+ const fetchTime = Date . now ( ) . toString ( ) ;
483+ const queryResult : QueryResult < Data , Variables > = {
484+ ref : queryRef ,
485+ source : SOURCE_SERVER ,
486+ fetchTime,
487+ data : response . data ,
488+ extensions : getDataConnectExtensionsWithoutMaxAge ( response . extensions ) ,
489+ toJSON : getRefSerializer (
490+ queryRef ,
491+ response . data ,
492+ SOURCE_SERVER ,
493+ fetchTime
494+ )
495+ } ;
496+ const updatedKeys = await this . updateCache (
497+ queryResult ,
498+ response . extensions ?. dataConnect
499+ ) ;
500+ this . publishDataToSubscribers ( key , queryResult ) ;
501+ if ( this . cache ) {
502+ await this . publishCacheResultsToSubscribers ( updatedKeys , fetchTime ) ;
503+ }
504+ }
505+
506+ /**
507+ * Handle a disconnect from the stream. Unsubscribe all callbacks for the given key.
508+ */
509+ private handleStreamDisconnect (
510+ key : string ,
511+ code : string ,
512+ reason : string
513+ ) : void {
514+ const error = new DataConnectError ( code as Code , reason ) ;
515+ this . publishErrorToSubscribers ( key , error ) ;
516+
517+ const callbacks = this . callbacks . get ( key ) ;
518+ if ( callbacks ) {
519+ [ ...callbacks ] . forEach ( cb => cb . unsubscribe ( ) ) ;
520+ }
521+ return ;
522+ }
394523}
395524
396525export function getMaxAgeFromExtensions (
0 commit comments