Skip to content

feat(data-connect): Add baseline streaming functionality (sending, tracking, routing requests)#9718

Merged
stephenarosaj merged 25 commits intopasta/mainfrom
pasta/lasagna
Mar 12, 2026
Merged

feat(data-connect): Add baseline streaming functionality (sending, tracking, routing requests)#9718
stephenarosaj merged 25 commits intopasta/mainfrom
pasta/lasagna

Conversation

@stephenarosaj
Copy link
Copy Markdown
Contributor

Description

✨ This PR establishes the baseline streaming transport functionality - making and tracking requests, and routing responses to requests. It does not include optimizations, and has only unit tests for request tracking/routing logic (since there is no concrete streaming transport implementation yet).

Changes

  • Message sending: Added tracking data structures to AbstractDataConnectStreamTransport and implemented invoke* methods to send and manage logical stream requests.
  • Response Handling: Introduced handleMessage to focus on routing response data to tracked promises or subscription notification hooks based on incoming server request IDs. Throws explicit unhandled requestId errors.

Testing

New unit tests were added in test/unit/streamTransport.test.ts to test the request management:

  • Verified unique sequential requestId generation behavior for each new message (should have been in last PR).
  • Made sure x-firebase-gmpid header is included in messages (should have been in last PR).
  • Ensured requests are properly tracked when invoke* methods are called.
  • Ensured data is routed properly responses come in.
  • Ensured tracked requests are cleaned up when responses come in.
  • Verified unrecognized request IDs accurately map to a thrown DataConnectError.

@changeset-bot
Copy link
Copy Markdown

changeset-bot Bot commented Mar 12, 2026

⚠️ No Changeset found

Latest commit: 9d4ab85

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request lays the foundational framework for streaming transport within the Data Connect client. It introduces the necessary mechanisms to initiate, track, and manage the lifecycle of streaming requests (queries, mutations, and subscriptions) and to correctly route their corresponding responses. By centralizing resource path definitions and implementing robust request/response handling, this change provides a solid, test-covered baseline for future streaming capabilities without including specific optimizations.

Highlights

  • Streaming Transport Baseline: Established the core functionality for streaming transport, including making and tracking requests, and routing responses to their respective handlers.
  • Request Management: Introduced data structures within AbstractDataConnectStreamTransport to manage active queries, mutations, and subscriptions, ensuring proper lifecycle management for each request.
  • Response Routing: Implemented a _handleMessage method to efficiently route incoming server responses to either promise resolvers for one-off requests or notification hooks for subscriptions, with robust error handling for unrecognized request IDs.
  • Centralized Resource Path: Refactored the _connectorResourcePath to be defined and managed in the base AbstractDataConnectTransport class, ensuring consistency across different transport implementations.
  • Comprehensive Unit Testing: Added extensive unit tests to validate request ID generation, header inclusion, request tracking, response routing, and cleanup mechanisms within the streaming transport.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • packages/data-connect/src/network/manager.ts
    • Imported Code, DataConnectError, and AbstractDataConnectStreamTransport.
    • Added an optional streamTransport private member to DataConnectTransportManager.
  • packages/data-connect/src/network/rest/restTransport.ts
    • Updated invokeQuery and invokeMutation to utilize the _connectorResourcePath property from the base class.
  • packages/data-connect/src/network/stream/streamTransport.ts
    • Removed DataConnectResponseWithMaxAge import.
    • Imported specific stream request types: CancelStreamRequest, ExecuteStreamRequest, ResumeStreamRequest, SubscribeStreamRequest.
    • Added private maps (_activeQueryExecuteRequests, _activeMutationExecuteRequests, _activeSubscribeRequests, _executeRequestPromises, _subscribeNotificationHooks) for tracking request states.
    • Removed the connectorResourcePath getter, as it is now defined in AbstractDataConnectTransport.
    • Introduced private helper methods (_sendExecuteMessage, _sendSubscribeMessage, _sendCancelMessage, _sendResumeMessage) for sending various stream message types.
    • Implemented _makeExecutePromise to create and manage promises for execute requests.
    • Added _getMapKey and _sortObjectKeys for consistent map key generation.
    • Implemented invokeQuery, invokeMutation, invokeSubscribe, and invokeUnsubscribe methods to handle stream request initiation and tracking.
    • Implemented _handleMessage to route incoming responses to the appropriate promise resolvers or subscription notification hooks, including error handling for unrecognized request IDs.
  • packages/data-connect/src/network/transport.ts
    • Added _connectorResourcePath as a protected member.
    • Initialized _connectorResourcePath in the constructor based on project, location, service, and connector details.
  • packages/data-connect/test/unit/streamTransport.test.ts
    • Updated chai and sinon imports and usage, adding chai-as-promised.
    • Imported specific stream request types for comprehensive testing.
    • Extended TestStreamTransport with invokeHandleMessage to expose the internal response handling for testing.
    • Modified TransportWithInternals interface to expose new private members and methods for testing request tracking.
    • Added tests for x-firebase-gmpid header inclusion in prepared messages.
    • Added tests to verify connection state reset upon onConnectionReady().
    • Introduced extensive unit tests for request tracking, including _getMapKey consistency, unique request ID generation, and the functionality of invokeQuery, invokeMutation, invokeSubscribe, and invokeUnsubscribe.
    • Added tests for incoming response handling, verifying correct routing to promises and subscription hooks, and error handling for unknown request IDs.
Activity
  • No human activity (comments, reviews) has been recorded for this pull request yet.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces the foundational components for streaming transport, including request tracking and response routing. The changes are well-structured, moving common logic to base classes and adding comprehensive unit tests for the new functionality. I have a few suggestions to improve error handling in the response routing logic and to make the new tests more robust against unhandled promise rejections.

Comment on lines +384 to +386
const { resolve } = this._executeRequestPromises.get(requestId)!;
resolve(response);
this._executeRequestPromises.delete(requestId);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The reject function for the promise is not being used in _handleMessage. If a response for an execute request contains errors, the corresponding promise should be rejected rather than resolved. This provides a more idiomatic error handling mechanism for consumers of invokeQuery and invokeMutation.

Suggested change
const { resolve } = this._executeRequestPromises.get(requestId)!;
resolve(response);
this._executeRequestPromises.delete(requestId);
const { resolve, reject } = this._executeRequestPromises.get(requestId)!;
if (response.errors?.length) {
reject(response.errors);
} else {
resolve(response);
}
this._executeRequestPromises.delete(requestId);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In RESTTransport, errors are thrown by dcFetch when there's a fetch request error or when there's errors in the response - this happens when the response initially comes in, not in the invoke* functions. We should use the same error system for streaming.

Comment thread packages/data-connect/test/unit/streamTransport.test.ts Outdated
Comment thread packages/data-connect/test/unit/streamTransport.test.ts Outdated
@stephenarosaj stephenarosaj marked this pull request as ready for review March 12, 2026 01:44
@stephenarosaj stephenarosaj merged commit a8e9031 into pasta/main Mar 12, 2026
15 checks passed
@stephenarosaj stephenarosaj deleted the pasta/lasagna branch March 12, 2026 17:36
@stephenarosaj stephenarosaj changed the title Pasta/lasagna feat(data-connect): Add baseline streaming functionality (sending, tracking, routing requests) Mar 16, 2026
@firebase firebase locked and limited conversation to collaborators Apr 11, 2026
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants