Conversation
alvarowolfx left a comment:
I think we need to introduce some extra layers of abstraction to support the different use cases of the Storage Write API (like creating a WriteStream manually vs. using the Default Stream), and also to reuse stream connections so rows can be sent to them multiple times.
return this._client_closed;
}

async initializeStreamConnection(clientOptions?: CallOptions): Promise<void> {
I think we need to split this method into two actions: creating a WriteStream (if applicable, since users can use the DefaultStream instead) and creating a "ManagedStream", which is the concept of a stream connection held open to append rows to it.
The ManagedStream idea came from the Go implementation; the Python implementation has a similar concept called StreamSession. That class holds a reference to a given streamId and a schema/proto descriptor, and keeps a connection open for appending rows. That connection can be reused.
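For illustration, here is a minimal sketch of what such a class could look like, following the Go/Python ideas above — every name in it (ManagedStream, PendingWrite, appendRows, finalize, close) is an assumption, not the final API of this PR:

import * as gax from 'google-gax';

// Stub for the pending-write concept sketched further below.
interface PendingWrite {
  getResult(): Promise<unknown>;
}

class ManagedStream {
  // Full resource name of the stream, or the special '_default' stream.
  private streamId: string;
  // Proto descriptor for the row messages, fixed for the stream's lifetime.
  private protoDescriptor: object;
  // Bidi connection, opened once and reused across appendRows calls.
  private connection?: gax.CancellableStream;

  constructor(streamId: string, protoDescriptor: object) {
    this.streamId = streamId;
    this.protoDescriptor = protoDescriptor;
  }

  // Writes an AppendRowsRequest to the shared connection and returns a
  // PendingWrite the caller can await; the connection stays open.
  appendRows(data: {serializedRows: Uint8Array[]}, offset?: number): PendingWrite {
    throw new Error('sketch only');
  }

  // PENDING streams only — the Default Stream is COMMITTED and does not
  // support finalizeWriteStream. Resolves with the finalized row count.
  async finalize(): Promise<number> {
    throw new Error('sketch only');
  }

  // Tears down the connection without finalizing or committing anything.
  close(): void {
    this.connection?.cancel();
  }
}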
}
}

async appendRowsToStream(
This method would be part of a ManagedStream class, which keeps the connection open until we finish sending rows. The issue with this function right now is that it barely reuses the connection, because it closes the stream as soon as we get the first response.
This method could behave much like a Promise: users call appendRows and get back a "pending write" promise, so they can call it multiple times and wait on multiple pending writes. Internally the ManagedStream reuses the same connection, handles the writes in sequence, and marks each pending write as complete. I'll add an example of that in the tests.
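To make the idea concrete, here is a rough sketch of the pending-write mechanics, assuming the ManagedStream keeps a FIFO queue of these and completes the oldest entry for each AppendRowsResponse that arrives on the bidi stream (responses come back in request order) — all names here are hypothetical:

type AppendRowsResponse = {
  appendResult?: {offset?: number | null};
  writeStream?: string;
};

class PendingWrite {
  private promise: Promise<AppendRowsResponse>;
  private resolveFn!: (response: AppendRowsResponse) => void;
  private rejectFn!: (err: Error) => void;

  constructor() {
    this.promise = new Promise<AppendRowsResponse>((resolve, reject) => {
      this.resolveFn = resolve;
      this.rejectFn = reject;
    });
  }

  // Called by the ManagedStream when the matching response arrives.
  complete(response: AppendRowsResponse): void {
    this.resolveFn(response);
  }

  // Called by the ManagedStream if the connection errors out.
  fail(err: Error): void {
    this.rejectFn(err);
  }

  // What callers await after appendRows().
  getResult(): Promise<AppendRowsResponse> {
    return this.promise;
  }
}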
return responses;
}

async closeStream(): Promise<void> {
Closing the connection/stream and committing/finalizing the stream should be separate actions, especially because the Default Stream is a stream of type COMMITTED that doesn't support calling finalizeWriteStream, yet users still want to close the connection to end the streaming. User-created WriteStreams of type PENDING, on the other hand, call finalize to commit the changes and then close the connection later.
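To show why the split matters, here is a condensed sketch of the two resulting flows — client, protoDescriptor, and rows are stand-ins, and the method names follow the test examples further below rather than a settled API:

declare const client: any; // hypothetical WriterClient instance
declare const protoDescriptor: object;
declare const rows: Uint8Array[];

// PENDING stream: finalize, then commit, then close the connection.
async function pendingStreamFlow() {
  const streamName = await client.createWriteStream(); // type PENDING
  const stream = await client.createManagedStream(streamName, protoDescriptor);
  await (await stream.appendRows({serializedRows: rows})).getResult();
  await stream.finalize(); // no further appends accepted
  await client.batchCommitWriteStream({
    parent: client.getParent(),
    writeStreams: [streamName],
  });
  stream.close(); // only tears down the connection
}

// Default Stream: type COMMITTED, nothing to finalize or commit — just
// close the connection when done.
async function defaultStreamFlow() {
  const stream = await client.createManagedStream('_default', protoDescriptor);
  await (await stream.appendRows({serializedRows: rows})).getResult();
  stream.close();
}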
})*/
});

describe('appendRowsToStream', () => {
We need two versions of this test: one using a user-created WriteStream, and another using the DefaultStream.
it('should invoke appendRowsToStream without errors', async () => {
  /*
    Client and Proto initialization, rows creation, etc.
    ...
  */
  const streamName = await client.createWriteStream();
  const appendRowsResponsesResult: AppendRowsResponse[] = [
    {
      appendResult: {
        offset: offset,
      },
      writeStream: streamName,
    },
  ];
  try {
    const managedStream = await client.createManagedStream(
      streamName,
      protoDescriptor
    );
    const pw = await managedStream.appendRows(
      {
        serializedRows: [serializedRow1Message, serializedRow2Message],
      },
      offset
    );
    const result = await pw.getResult();
    const responses: AppendRowsResponse[] = [
      {
        appendResult: result.appendResult,
        writeStream: result.writeStream,
      },
    ];
    assert.deepEqual(appendRowsResponsesResult, responses);
    const rowCount = await managedStream.finalize();
    managedStream.close();
    assert.equal(rowCount, 2);
    const commitResponse = await client.batchCommitWriteStream({
      parent: client.getParent(),
      writeStreams: [streamName],
    });
    assert.equal(commitResponse.streamErrors?.length, 0);
  } finally {
    client.close();
  }
});
For the Default Stream:
it('should invoke appendRows to default stream without errors', async () => {
  /*
    Client and Proto initialization, rows creation, etc.
    ...
  */
  const appendRowsResponsesResult: AppendRowsResponse[] = [
    {
      appendResult: {
        offset: null,
      },
      writeStream: parent + '/streams/_default',
    },
  ];
  try {
    const managedStream = await client.createManagedStream(
      bigquerywriterModule.managedwriter.DefaultStream,
      protoDescriptor
    );
    const pw = await managedStream.appendRows({
      serializedRows: [serializedRow1Message, serializedRow2Message],
    });
    const result = await pw.getResult();
    const responses: AppendRowsResponse[] = [
      {
        appendResult: result.appendResult,
        writeStream: result.writeStream,
      },
    ];
    assert.deepEqual(appendRowsResponsesResult, responses);
    managedStream.close();
  } finally {
    client.close();
  }
});
We can also test appending rows multiple times and waiting for the proper responses:
const appendRowsResponsesResult: AppendRowsResponse[] = [
  {
    appendResult: {
      offset: null,
    },
    writeStream: parent + '/streams/_default',
  },
  {
    appendResult: {
      offset: null,
    },
    writeStream: parent + '/streams/_default',
  },
];
try {
  const managedStream = await client.createManagedStream(
    bigquerywriterModule.managedwriter.DefaultStream,
    protoDescriptor
  );
  const pw1 = await managedStream.appendRows({
    serializedRows: [serializedRow1Message, serializedRow2Message],
  });
  const pw2 = await managedStream.appendRows({
    serializedRows: [serializedRow1Message, serializedRow2Message],
  });
  const results = await Promise.all([pw1.getResult(), pw2.getResult()]);
  const responses: AppendRowsResponse[] = results.map(result => ({
    appendResult: result.appendResult,
    writeStream: result.writeStream,
  }));
  assert.deepEqual(appendRowsResponsesResult, responses);
  managedStream.close();
} finally {
  client.close();
}
// eslint-disable-next-line @typescript-eslint/no-unused-vars

describe('managedwriter.WriterClient', () => {
I think for those tests it would be nice to exercise the backend behavior in different scenarios; I'm not sure it will be feasible to mock everything. So as part of the initialization process we need to set up a test dataset and table. For testing purposes I ended up creating them manually here, but for CI we might need to automate that, similarly to how we do for the samples tests.
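One possible way to automate that for CI is to create the dataset and table in a before() hook with @google-cloud/bigquery and drop them in after() — the ids and schema below are placeholders, not what this PR uses:

import {BigQuery} from '@google-cloud/bigquery';

const bigquery = new BigQuery();
const datasetId = `writer_test_${Date.now()}`;
const tableId = 'test_table';

before(async () => {
  // Create a throwaway dataset and a table matching the test proto schema.
  const [dataset] = await bigquery.createDataset(datasetId);
  await dataset.createTable(tableId, {
    schema: 'customer_name:string, row_num:integer',
  });
});

after(async () => {
  // force: true also deletes the tables inside the dataset.
  await bigquery.dataset(datasetId).delete({force: true});
});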
.batchCommitWriteStreams(batchCommitWriteStreamsReq)
.then(result => console.log(result));

this._client_closed = true;
nit: client_closed should be clientClosed
};
type streamConnectionsMap = Record<string, gax.CancellableStream>;
type StreamConnections = {
  connection_list: StreamConnection[];
nit: connection_list should be connectionList
private _writeStreamType: WriteStream['type'] = 'TYPE_UNSPECIFIED';
private _streamId: string;
private _client: BigQueryWriteClient;
private _connections: StreamConnections;
Instead of keeping the raw connection directly here, we can keep a reference to the ManagedStream, as I commented elsewhere in this review.
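For example, the holder type could look like this (a sketch only — field names mirror the diff above, with ManagedStream as sketched earlier in this review):

type StreamConnections = {
  connectionList: ManagedStream[];
  connections: Record<string, ManagedStream>;
};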
Closing in favor of #328