This repository was archived by the owner on Mar 5, 2026. It is now read-only.

feat: writer client mvp #300

Closed
loferris wants to merge 30 commits into main from writer_veneer_sandbox

Conversation

Contributor

@loferris loferris commented Oct 1, 2022

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

  • Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • Ensure the tests and linter pass
  • Code coverage does not decrease (if any source code was changed)
  • Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕

@product-auto-label product-auto-label Bot added size: xl Pull request size is extra large. api: bigquerystorage Issues related to the googleapis/nodejs-bigquery-storage API. labels Oct 1, 2022

snippet-bot Bot commented Oct 1, 2022

No region tags are edited in this PR.

This comment is generated by snippet-bot.
If you find problems with this result, please file an issue at:
https://github.com/googleapis/repo-automation-bots/issues.

@product-auto-label product-auto-label Bot added size: l Pull request size is large. size: m Pull request size is medium. and removed size: xl Pull request size is extra large. size: l Pull request size is large. labels Oct 1, 2022
@loferris loferris force-pushed the writer_veneer_sandbox branch from 1f1bb09 to 738357f Compare October 4, 2022 20:39
@product-auto-label product-auto-label Bot added size: xl Pull request size is extra large. and removed size: m Pull request size is medium. labels Oct 4, 2022
@product-auto-label product-auto-label Bot added size: m Pull request size is medium. and removed size: xl Pull request size is extra large. labels Oct 4, 2022
@product-auto-label product-auto-label Bot added size: l Pull request size is large. and removed size: m Pull request size is medium. labels Nov 2, 2022
@product-auto-label product-auto-label Bot added size: xl Pull request size is extra large. and removed size: l Pull request size is large. labels Jan 13, 2023
Contributor

@alvarowolfx alvarowolfx left a comment

I think we need to introduce some extra layers of abstraction to support different use cases of the Storage Write API (like creating a WriteStream manually vs. using the Default Stream), and also to reuse stream connections so rows can be sent to them multiple times.

return this._client_closed;
}

async initializeStreamConnection(clientOptions?: CallOptions): Promise<void> {
Contributor

I think we need to split this method into two actions: creating a WriteStream (if applicable, since users can use the DefaultStream) and creating a "ManagedStream", which is the concept of a stream connection held open so rows can be appended to it.

Contributor

The ManagedStream idea came from the Go implementation; the Python implementation has a similar concept called StreamSession. That class holds a reference to a given streamId and schema/proto descriptor, and keeps a connection open for appending rows. That connection can be reused.
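A minimal sketch of that shape in plain TypeScript (all type names here are illustrative stand-ins; the real class would wrap a BigQueryWriteClient AppendRows connection):

```typescript
// Stand-in types for illustration only.
type ProtoDescriptor = {name: string};
type Connection = {write(chunk: unknown): void; end(): void};

class ManagedStream {
  private streamId: string; // e.g. a created stream id or ".../streams/_default"
  private descriptor: ProtoDescriptor; // schema/proto descriptor used to serialize rows
  private connection: Connection | null = null; // opened once, reused for every append

  constructor(streamId: string, descriptor: ProtoDescriptor) {
    this.streamId = streamId;
    this.descriptor = descriptor;
  }

  // Open the connection on first use and hand back the same one afterwards.
  getConnection(open: () => Connection): Connection {
    if (this.connection === null) {
      this.connection = open();
    }
    return this.connection;
  }

  getStreamId(): string {
    return this.streamId;
  }

  getDescriptor(): ProtoDescriptor {
    return this.descriptor;
  }
}
```

The point of the sketch is only that streamId, descriptor, and the open connection live together on one object, so repeated appends reuse the connection instead of reopening it.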

}
}

async appendRowsToStream(
Contributor

This method would be part of a ManagedStream class that keeps the connection open until we finish sending rows. The issue right now with this function is that it barely reuses the connection, because it closes the stream as soon as we get the first response.

Contributor

This method could behave similarly to a Promise: users call appendRows and get back a "pending write" promise. That way users can call it multiple times and wait on multiple pending writes. Internally the ManagedStream will reuse the same connection, handle the requests in sequence, and mark each pending write as complete. I'll add an example of that in the tests.
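The queuing idea can be sketched in plain TypeScript like this (PendingWrite and QueueingStream are hypothetical names, and the "transport" is a setTimeout stand-in for the real gRPC AppendRows stream):

```typescript
type AppendResult = {offset: number | null};

// A promise-like handle for one appendRows request.
class PendingWrite {
  private promise: Promise<AppendResult>;
  private resolver!: (r: AppendResult) => void;

  constructor() {
    this.promise = new Promise<AppendResult>(res => {
      this.resolver = res;
    });
  }

  complete(result: AppendResult): void {
    this.resolver(result);
  }

  getResult(): Promise<AppendResult> {
    return this.promise;
  }
}

class QueueingStream {
  private pending: PendingWrite[] = []; // completed in FIFO order, one per request

  appendRows(serializedRows: Uint8Array[]): PendingWrite {
    const pw = new PendingWrite();
    this.pending.push(pw);
    this.send(serializedRows); // every call goes over the same shared connection
    return pw;
  }

  // Fake transport: a real implementation would write to the open AppendRows
  // stream and complete the oldest pending write on each response message.
  private send(serializedRows: Uint8Array[]): void {
    setTimeout(() => {
      const pw = this.pending.shift();
      pw?.complete({offset: serializedRows.length});
    }, 0);
  }
}
```

Callers can then fire several appendRows calls and `Promise.all` the results, while the stream resolves them in order over a single connection.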

Comment thread src/managedwriter/writer_client.ts Outdated
return responses;
}

async closeStream(): Promise<void> {
Contributor

Closing the connection/stream and committing/finalizing the stream should be separate actions, especially because the Default Stream is a stream of type COMMITTED that doesn't support calling finalizeWriteStream, yet users will still want to close the connection to end the streaming. For user-created WriteStreams of type PENDING, users will call finalize to commit the changes and then close the connection later.
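The two teardown paths can be sketched like so (StreamHandle is a hypothetical stub; the COMMITTED/PENDING values mirror the write stream types in the API):

```typescript
type StreamType = 'COMMITTED' | 'PENDING';

class StreamHandle {
  closed = false;
  finalized = false;

  constructor(readonly type: StreamType) {}

  // Ending the connection is always allowed, for any stream type.
  close(): void {
    this.closed = true;
  }

  // Only user-created PENDING streams can be finalized; the default stream
  // is COMMITTED, so appended rows are visible immediately.
  finalize(): void {
    if (this.type !== 'PENDING') {
      throw new Error('finalizeWriteStream is not supported for this stream type');
    }
    this.finalized = true;
  }
}
```

So the default-stream flow is just `close()`, while the PENDING flow is `finalize()` (then a batch commit on the client) followed by `close()`.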

})*/
});

describe('appendRowsToStream', () => {
Contributor

We need two versions of this test: one using a user-created WriteStream, and another using the DefaultStream.

it('should invoke appendRowsToStream without errors', async () => {
     /* 
       Client and Proto initialization, rows creation, etc 
     ....
    */
     
      const streamName = await client.createWriteStream();
      const appendRowsResponsesResult: AppendRowsResponse[] = [
        {
          appendResult: {
            offset: offset,
          },
          writeStream: streamName,
        },
      ];
      try {
        const managedStream = await client.createManagedStream(
          streamName,
          protoDescriptor
        );
        const pw = await managedStream.appendRows(
          {
            serializedRows: [serializedRow1Message, serializedRow2Message],
          },
          offset
        );
        const result = await pw.getResult();
        const responses: AppendRowsResponse[] = [
          {
            appendResult: result.appendResult,
            writeStream: result.writeStream,
          },
        ];

        assert.deepEqual(appendRowsResponsesResult, responses);

        const rowCount = await managedStream.finalize();
        managedStream.close();
        assert.equal(rowCount, 2);

        const commitResponse = await client.batchCommitWriteStream({
          parent: client.getParent(),
          writeStreams: [streamName],
        });
        assert.equal(commitResponse.streamErrors?.length, 0);
      } finally {
        client.close();
      }

      return Promise.resolve();
    });

For the Default Stream:

it('should invoke appendRows to default stream without errors', async () => {
        /* 
       Client and Proto initialization, rows creation, etc 
      ...
    */

      const appendRowsResponsesResult: AppendRowsResponse[] = [
        {
          appendResult: {
            offset: null,
          },
          writeStream: parent + '/streams/_default',
        },
      ];
      try {
        const managedStream = await client.createManagedStream(
          bigquerywriterModule.managedwriter.DefaultStream,
          protoDescriptor
        );
        const pw = await managedStream.appendRows({
          serializedRows: [serializedRow1Message, serializedRow2Message],
        });
        const result = await pw.getResult();
        const responses: AppendRowsResponse[] = [
          {
            appendResult: result.appendResult,
            writeStream: result.writeStream,
          },
        ];

        assert.deepEqual(appendRowsResponsesResult, responses);

        managedStream.close();
      } finally {
        client.close();
      }

      return Promise.resolve();
    });  

Contributor

We can also test appending rows multiple times and waiting for the proper responses:

const appendRowsResponsesResult: AppendRowsResponse[] = [
        {
          appendResult: {
            offset: null,
          },
          writeStream: parent + '/streams/_default',
        },
        {
          appendResult: {
            offset: null,
          },
          writeStream: parent + '/streams/_default',
        },
      ];
      try {
        const managedStream = await client.createManagedStream(
          bigquerywriterModule.managedwriter.DefaultStream,
          protoDescriptor
        );
        const pw1 = await managedStream.appendRows({
          serializedRows: [serializedRow1Message, serializedRow2Message],
        });
        const pw2 = await managedStream.appendRows({
          serializedRows: [serializedRow1Message, serializedRow2Message],
        });
        const results = await Promise.all([
          pw1.getResult(),
          pw2.getResult(),
        ]);
        const responses: AppendRowsResponse[] = results.map(result => (
          {
            appendResult: result.appendResult,
            writeStream: result.writeStream,
          }
        ));

        assert.deepEqual(appendRowsResponsesResult, responses);

        managedStream.close();
      } finally {
        client.close();
      }

      return Promise.resolve();


// eslint-disable-next-line @typescript-eslint/no-unused-vars

describe('managedwriter.WriterClient', () => {
Contributor

I think for those tests it would be nice to test the backend behavior in different scenarios; I'm not sure it will be feasible to mock everything. So as part of the initialization process we need to set up a test dataset and table. For testing purposes I ended up creating them manually here, but for CI we might need to automate that, similar to how we do it for Samples testing.

.batchCommitWriteStreams(batchCommitWriteStreamsReq)
.then(result => console.log(result));

this._client_closed = true;
Contributor

nit: client_closed should be clientClosed

};
type streamConnectionsMap = Record<string, gax.CancellableStream>;
type StreamConnections = {
connection_list: StreamConnection[];
Contributor

nit: connection_list should be connectionList

private _writeStreamType: WriteStream['type'] = 'TYPE_UNSPECIFIED';
private _streamId: string;
private _client: BigQueryWriteClient;
private _connections: StreamConnections;
Contributor

Instead of keeping the raw connection directly here, we can keep a reference to the ManagedStream, as I comment on in more detail elsewhere in this review.
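The suggested shape of that registry, as a hypothetical sketch (ManagedStreamRef stands in for the real ManagedStream class):

```typescript
type ManagedStreamRef = {streamId: string};

type StreamConnections = {
  connectionList: ManagedStreamRef[]; // was: raw gax CancellableStream[]
  connectionMap: Record<string, ManagedStreamRef>; // lookup by stream id
};

// Track the ManagedStream in both the list and the per-id map.
function registerStream(conns: StreamConnections, stream: ManagedStreamRef): void {
  conns.connectionList.push(stream);
  conns.connectionMap[stream.streamId] = stream;
}
```

Keeping the higher-level object here lets the client delegate connection lifecycle (open, reuse, close) to each ManagedStream instead of juggling raw streams itself.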

@meredithslota
Contributor

Closing in favor of #328


Labels

api: bigquerystorage Issues related to the googleapis/nodejs-bigquery-storage API. size: xl Pull request size is extra large.

3 participants