Skip to content

Commit 1623fb0

Browse files
Merge main into release
2 parents 6e7226e + 87d5cc1 commit 1623fb0

29 files changed

Lines changed: 5004 additions & 415 deletions

.changeset/thick-roses-lick.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'firebase': minor
3+
'@firebase/data-connect': minor
4+
---
5+
6+
Add streaming support for Firebase Data Connect.

packages/data-connect/src/api/DataConnect.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@ import {
3737
import { QueryManager } from '../core/query/QueryManager';
3838
import { logDebug, logError } from '../logger';
3939
import {
40+
DataConnectTransportInterface,
41+
TransportClass,
4042
CallerSdkType,
41-
CallerSdkTypeEnum,
42-
DataConnectTransport,
43-
TransportClass
43+
CallerSdkTypeEnum
4444
} from '../network';
45-
import { RESTTransport } from '../network/transport/rest';
45+
import { DataConnectTransportManager } from '../network/manager';
4646
import { PROD_HOST } from '../util/url';
4747

4848
import { MutationManager } from './Mutation';
@@ -96,7 +96,7 @@ export class DataConnect {
9696
_mutationManager!: MutationManager;
9797
isEmulator = false;
9898
_initialized = false;
99-
private _transport!: DataConnectTransport;
99+
private _transport!: DataConnectTransportInterface;
100100
private _transportClass: TransportClass | undefined;
101101
private _transportOptions?: TransportOptions;
102102
private _authTokenProvider?: AuthTokenProvider;
@@ -172,8 +172,10 @@ export class DataConnect {
172172
return;
173173
}
174174
if (this._transportClass === undefined) {
175-
logDebug('transportClass not provided. Defaulting to RESTTransport.');
176-
this._transportClass = RESTTransport;
175+
logDebug(
176+
'transportClass not provided. Defaulting to DataConnectTransportManager.'
177+
);
178+
this._transportClass = DataConnectTransportManager;
177179
}
178180

179181
this._authTokenProvider = new FirebaseAuthProvider(

packages/data-connect/src/api/Mutation.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
import { DataConnectTransport } from '../network/transport';
18+
import { DataConnectTransportInterface } from '../network';
1919

2020
import { DataConnect } from './DataConnect';
2121
import {
@@ -77,7 +77,7 @@ export function mutationRef<Data, Variables>(
7777
*/
7878
export class MutationManager {
7979
private _inflight: Array<Promise<unknown>> = [];
80-
constructor(private _transport: DataConnectTransport) {}
80+
constructor(private _transport: DataConnectTransportInterface) {}
8181
executeMutation<Data, Variables>(
8282
mutationRef: MutationRef<Data, Variables>
8383
): MutationPromise<Data, Variables> {

packages/data-connect/src/api/query.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import {
2020
ExecuteQueryOptions,
2121
QueryFetchPolicy
2222
} from '../core/query/queryOptions';
23-
import { DataConnectExtensionWithMaxAge } from '../network/transport';
23+
import { DataConnectExtensionWithMaxAge } from '../network';
2424

2525
import { DataConnect, getDataConnect } from './DataConnect';
2626
import {

packages/data-connect/src/core/query/QueryManager.ts

Lines changed: 144 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,21 @@ import { DataConnectSubscription } from '../../api.browser';
2828
import { DataConnectCache, ServerValues } from '../../cache/Cache';
2929
import { parseEntityIds } from '../../cache/cacheUtils';
3030
import { EncodingMode } from '../../cache/EntityNode';
31-
import { DataConnectTransport, Extensions } from '../../network';
3231
import {
32+
DataConnectTransportInterface,
33+
Extensions,
3334
DataConnectExtensionWithMaxAge,
34-
ExtensionsWithMaxAge
35-
} from '../../network/transport';
35+
ExtensionsWithMaxAge,
36+
SubscribeObserver,
37+
DataConnectResponse
38+
} from '../../network';
3639
import { decoderImpl, encoderImpl } from '../../util/encoder';
37-
import { Code, DataConnectError } from '../error';
40+
import {
41+
Code,
42+
DataConnectError,
43+
DataConnectOperationError,
44+
DataConnectOperationFailureResponse
45+
} from '../error';
3846

3947
import {
4048
OnCompleteSubscription,
@@ -85,9 +93,13 @@ export class QueryManager {
8593
string,
8694
Array<DataConnectSubscription<unknown, unknown>>
8795
>();
96+
/**
97+
* Map of serialized query keys to most recent Query Result. Used as a simple fallback cache
98+
* for subsciptions if caching is not enabled.
99+
*/
88100
private subscriptionCache = new Map<string, QueryResult<unknown, unknown>>();
89101
constructor(
90-
private transport: DataConnectTransport,
102+
private transport: DataConnectTransportInterface,
91103
private dc: DataConnect,
92104
private cache?: DataConnectCache
93105
) {}
@@ -154,10 +166,15 @@ export class QueryManager {
154166
const unsubscribe = (): void => {
155167
if (this.callbacks.has(key)) {
156168
const callbackList = this.callbacks.get(key)!;
157-
this.callbacks.set(
158-
key,
159-
callbackList.filter(callback => callback !== subscription)
169+
const newList = callbackList.filter(
170+
callback => callback !== subscription
160171
);
172+
this.callbacks.set(key, newList);
173+
174+
if (newList.length === 0) {
175+
this.callbacks.delete(key);
176+
this.transport.invokeUnsubscribe(queryRef.name, queryRef.variables);
177+
}
161178
onCompleteCallback?.();
162179
}
163180
};
@@ -175,12 +192,22 @@ export class QueryManager {
175192
// We want to ignore the error and let subscriptions handle it
176193
promise.then(undefined, err => {});
177194

178-
if (!this.callbacks.has(key)) {
179-
this.callbacks.set(key, []);
195+
if (this.callbacks.has(key)) {
196+
this.callbacks
197+
.get(key)!
198+
.push(subscription as DataConnectSubscription<unknown, unknown>);
199+
} else {
200+
this.callbacks.set(key, [
201+
subscription as DataConnectSubscription<unknown, unknown>
202+
]);
203+
204+
// only invoke subscription if we don't already have an active subscription
205+
this.transport.invokeSubscribe<Data, Variables>(
206+
this.makeSubscribeObserver(queryRef),
207+
queryRef.name,
208+
queryRef.variables
209+
);
180210
}
181-
this.callbacks
182-
.get(key)!
183-
.push(subscription as DataConnectSubscription<unknown, unknown>);
184211

185212
return unsubscribe;
186213
}
@@ -215,8 +242,7 @@ export class QueryManager {
215242
fetchTime
216243
)
217244
};
218-
let updatedKeys: string[] = [];
219-
updatedKeys = await this.updateCache(
245+
const updatedKeys = await this.updateCache(
220246
queryResult,
221247
originalExtensions?.dataConnect
222248
);
@@ -342,6 +368,7 @@ export class QueryManager {
342368
return result as QueryResult<Data, Variables>;
343369
}
344370

371+
/** Call the registered onNext callbacks for the given key */
345372
publishDataToSubscribers(
346373
key: string,
347374
queryResult: QueryResult<unknown, unknown>
@@ -391,6 +418,108 @@ export class QueryManager {
391418
enableEmulator(host: string, port: number): void {
392419
this.transport.useEmulator(host, port);
393420
}
421+
422+
/**
423+
* Create a new {@link SubscribeObserver} for the given QueryRef. This will be passed to
424+
* {@link DataConnectTransportInterface.invokeSubscribe | invokeSubscribe()} to notify the query
425+
* layer of data update notifications or if the stream disconnected.
426+
*/
427+
private makeSubscribeObserver<Data, Variables>(
428+
queryRef: QueryRef<Data, Variables>
429+
): SubscribeObserver<Data> {
430+
const key = encoderImpl({
431+
name: queryRef.name,
432+
variables: queryRef.variables,
433+
refType: QUERY_STR
434+
});
435+
return {
436+
onData: async response => {
437+
await this.handleStreamNotification(key, response, queryRef);
438+
},
439+
onDisconnect: (code, reason) => {
440+
this.handleStreamDisconnect(key, code, reason);
441+
},
442+
onError: error => {
443+
this.publishErrorToSubscribers(key, error);
444+
}
445+
};
446+
}
447+
448+
/**
449+
* Handle a data update notification from the stream. Notify subscribers of results/errors, and
450+
* update the cache.
451+
*/
452+
private async handleStreamNotification<Data, Variables>(
453+
key: string,
454+
response: DataConnectResponse<Data>,
455+
queryRef: QueryRef<Data, Variables>
456+
): Promise<void> {
457+
if (response.errors && response.errors.length > 0) {
458+
const stringified = JSON.stringify(
459+
response.errors.map(e => {
460+
if (e && typeof e === 'object') {
461+
return {
462+
message: (e as unknown as { message: string }).message,
463+
code: (e as unknown as { code?: unknown }).code
464+
};
465+
}
466+
return e;
467+
})
468+
);
469+
const failureResponse: DataConnectOperationFailureResponse = {
470+
errors: response.errors as [],
471+
data: response.data as Record<string, unknown>
472+
};
473+
const error = new DataConnectOperationError(
474+
'DataConnect error received from subscribe notification: ' +
475+
stringified,
476+
failureResponse
477+
);
478+
this.publishErrorToSubscribers(key, error);
479+
return;
480+
}
481+
482+
const fetchTime = Date.now().toString();
483+
const queryResult: QueryResult<Data, Variables> = {
484+
ref: queryRef,
485+
source: SOURCE_SERVER,
486+
fetchTime,
487+
data: response.data,
488+
extensions: getDataConnectExtensionsWithoutMaxAge(response.extensions),
489+
toJSON: getRefSerializer(
490+
queryRef,
491+
response.data,
492+
SOURCE_SERVER,
493+
fetchTime
494+
)
495+
};
496+
const updatedKeys = await this.updateCache(
497+
queryResult,
498+
response.extensions?.dataConnect
499+
);
500+
this.publishDataToSubscribers(key, queryResult);
501+
if (this.cache) {
502+
await this.publishCacheResultsToSubscribers(updatedKeys, fetchTime);
503+
}
504+
}
505+
506+
/**
507+
* Handle a disconnect from the stream. Unsubscribe all callbacks for the given key.
508+
*/
509+
private handleStreamDisconnect(
510+
key: string,
511+
code: string,
512+
reason: string
513+
): void {
514+
const error = new DataConnectError(code as Code, reason);
515+
this.publishErrorToSubscribers(key, error);
516+
517+
const callbacks = this.callbacks.get(key);
518+
if (callbacks) {
519+
[...callbacks].forEach(cb => cb.unsubscribe());
520+
}
521+
return;
522+
}
394523
}
395524

396525
export function getMaxAgeFromExtensions(

packages/data-connect/src/index.node.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,21 @@
1515
* limitations under the License.
1616
*/
1717

18-
import { initializeFetch } from './network/fetch';
18+
import { initializeFetch } from './network/rest';
19+
import { initializeWebSocket } from './network/stream/websocket';
1920
import { registerDataConnect } from './register';
2021

2122
export * from './api';
2223
export * from './api.node';
24+
2325
initializeFetch(fetch);
2426

27+
if (typeof WebSocket !== 'undefined') {
28+
initializeWebSocket(WebSocket);
29+
} else {
30+
console.warn(
31+
'WebSocket is not available in this environment. Use a polyfill or upgrade your Node version to one that supports WebSockets.'
32+
);
33+
}
34+
2535
registerDataConnect('node');

packages/data-connect/src/network/index.ts

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,4 @@
1515
* limitations under the License.
1616
*/
1717

18-
export {
19-
CallerSdkType,
20-
CallerSdkTypeEnum,
21-
DataConnectTransport,
22-
DataConnectEntityArray,
23-
DataConnectSingleEntity,
24-
DataConnectExtension,
25-
Extensions,
26-
TransportClass
27-
} from './transport';
18+
export * from './transport';

0 commit comments

Comments
 (0)