feat(data-connect): Migrate streaming to use SubscribeObserver and further harden errors / disconnects#9815
Conversation
|
There was a problem hiding this comment.
Code Review
This pull request refactors the subscription mechanism by replacing the SubscribeNotificationHook callback with a SubscribeObserver interface across the QueryManager and transport layers. This change improves the handling of data updates, disconnections, and errors. The PR also adds a WebSocket availability check for Node.js environments. Feedback includes suggestions to support and await asynchronous processing in the observer to prevent race conditions, refining error construction in the QueryManager, and addressing potential crashes due to WebSocket protocol limits for reason strings.
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request refactors the subscription mechanism in the Data Connect SDK, replacing the SubscribeNotificationHook with a more robust SubscribeObserver interface that includes explicit handlers for data, disconnections, and errors. It also improves WebSocket handling by ensuring close reasons are truncated to the protocol-defined limit and adds environment checks for WebSocket availability in Node.js. Feedback focuses on improving error propagation by extracting specific error codes from stream notifications, providing better default error messages for WebSocket closures, and preserving original error details during message parsing.
|
Firestore tests keep failing... I'm merging anyways. I believe they are just flaky |
Description
✨ This PR migrates the internal streaming transport notification system to the
SubscribeObserverpattern, simplifies error handling, and refactors REST routing fallback logic.Changes
SubscribeObserverinterface inQueryManagerand transport layers (no functionality change from user perspective - just changed communication scheme between query layer + transport layer).executeShouldUseStream()in catch blocks instead of checkingisUnableToConnectdirectly.isUnableToConnectgetter with a property, updating it on WebSocket open/error and adding checks inTransportManager.WebSocketDataConnectErrorand custom close codes, and refactoredrejectAllActiveRequeststo use standard codes.onStreamClose.WebSocketinitialization in Node and truncated close reasons to 123 bytes to comply with the spec.Testing
test/unit/streamTransport.test.tsandtest/unit/streaming.test.tsfor the observer pattern.test/unit/websocketTransport.test.tsandtest/unit/transportManager.test.tsfor error handling and routing.