test: test ReadRows logic with local gRPC server #1282

alexander-fenster merged 10 commits into main
Conversation
49cf82b to 318aeff (force-push)
leahecole left a comment:

Added some comments about comments! Thanks for such a helpful PR description. Also, double-check the GHA lint warnings.
Quoted code:

    it('should create read stream and read synchronously', done => {
      const keyFrom = 0;
      const keyTo = 1000;
leahecole: Is 1000 a particularly special number that triggers particular behavior? If so, consider a quick comment about why that value is used.

alexander-fenster: For the test that currently fails, 1000 is big enough to trigger the problem, and it does not happen with e.g. 10 or 100. I'll add comments.
Quoted code:

    it('should be able to stop reading from the read stream', done => {
      const keyFrom = 0;
      const keyTo = 1000;
      const stopAfter = 42;
leahecole: Consider adding a comment about why 42 is the chosen "stopAfter" value, if there's anything of note about it.

alexander-fenster: It's random (and also the answer to life, the universe, and everything). I'll note that!
Quoted code:

    import {GoogleError, Status} from 'google-gax';

    const valueSize = 1024 * 1024;
    const chunkSize = 1023 * 1024 - 1; // make it uneven
leahecole:

- Clarification: when you say "make it uneven", do you mean make it an odd number, make it offset from valueSize by one, or make it not a multiple of 1024?
- Double-checking that this should be 1023 * 1024: I think it should, because the number of chunks would logically be smaller than the number of values, but I just wanted to double-check 🙂
alexander-fenster: Just making it so that one row occupies multiple chunks (2 in this case), and that these chunks have different sizes.
Quoted code:

    if (chunkIdx === errorAfterChunkNo) {
      debugLog(`sending error after chunk #${chunkIdx}`);
      errorAfterChunkNo = undefined; // do not send error for the second time
      const error = new GoogleError('Uh oh');
leahecole: lol, love this test error message
Quoted code:

    let chunksSent = 0;
    const chunks = generateChunks(keyFrom, keyTo, stream);
    let lastScannedRowKey: string | undefined;
    let firstN: protos.google.bigtable.v2.ReadRowsResponse.ICellChunk[] = [];
leahecole: Should we rename this variable to downstreamChunks or something?

alexander-fenster: Yeah, I was struggling to find a good name, since chunks is already used :) downstreamChunks or responseChunks, maybe. I'll rename.

alexander-fenster: I renamed it to currentResponseChunks; does it make more sense now?
dfc75a4 to 3f2385b (force-push)
Quoted code:

    import {MockService} from '../src/util/mock-servers/mock-service';
    import {debugLog, readRowsImpl} from './utils/readRowsImpl';

    describe('Bigtable/Streams', () => {
leahecole: Please update this to Bigtable/ReadRows as well.
In this PR I'm adding some tests for `ReadRows` logic. This is a pretty straightforward but rather big PR, so please bear with me while I explain what's going on here.

I'm reusing @danieljbruce's awesome implementation of the local gRPC server (#1090) to present a reasonable mock for `ReadRows` in `test/utils/readRowsImpl.ts`. This new mock generates rows with incremental keys, which are just numbers converted to strings and padded with zeros; e.g. for `readRowsImpl(0, 3)` the generated keys will be `00000000`, `00000001`, `00000002`. The rows are unevenly split into chunks, and chunks are grouped into response messages that are sent over the server stream. There is some primitive support for range queries, which is important for stream retries, and it's also possible to cancel the stream and request an error to be emitted.
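The key scheme described above can be sketched as follows (a hypothetical snippet; the function names are illustrative, not the actual `readRowsImpl.ts` identifiers):

```typescript
// Hypothetical sketch of the mock's key generation.
function generateKey(n: number): string {
  // Keys are numbers zero-padded to 8 digits, so lexicographic order
  // matches numeric order.
  return n.toString().padStart(8, '0');
}

function generateKeys(keyFrom: number, keyTo: number): string[] {
  const keys: string[] = [];
  for (let key = keyFrom; key < keyTo; key++) {
    keys.push(generateKey(key));
  }
  return keys;
}

// generateKeys(0, 3) → ['00000000', '00000001', '00000002']
```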
The server implementation I suggest in this PR is backpressure-aware (it checks the return value of `stream.write(...)`) and, which is the most crucial part, asynchronous, imitating the behavior of a remote gRPC server; by that I mean that all `stream.write(...)` calls are sent after a zero `setTimeout` to move them to the next event loop iteration. This was the main missing piece to reliably reproduce the issue described in #607.

Now, the new test file I'm adding checks five basic use cases:
1. The first test case runs the very basic scenario where a table scan is requested and the result is consumed using the regular `.on('data', ...)` event handler. It works.
2. The second test case pipes the read stream to a `Transform` stream, which then pipes it to a `PassThrough`, but all components run synchronously. It works as well.
3. The third test case does the same, but the `Transform` stream uses `setTimeout` to delay processing of each row, triggering the issue described in #607. Since this test currently fails, it's skipped.
4. The fourth test case stops streaming from the user's code by calling `.end()` on the read stream; it works (with one caveat caused by grpc/grpc-node#2446, which required me to set a custom timeout for that server call).
5. The fifth test looks like a new issue to me. It tests the case where the server emits a retryable `error` event, and from what I see, this test case uncovers a problem in the `ChunkTransformer` implementation where it prematurely updates `lastRowKey` before the row is actually fully received and committed. We can discuss it offline and fix it if it's indeed a bug.

I suggest merging this PR, which will help us fix the code (and un-skip the two tests in this PR whenever they start passing). As I said, it's a lot of code, but mostly straightforward, so I will really appreciate some extra 👀 :)
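For illustration, the backpressure-aware, asynchronous write loop described above can be sketched roughly like this (a hypothetical helper, not the actual `readRowsImpl` code; the name `writeAllWithBackpressure` and the plain string payloads are illustrative):

```typescript
import {Writable} from 'stream';

// Hypothetical sketch: write `items` to `stream`, deferring every write
// with a zero setTimeout (imitating a remote async server) and pausing
// until 'drain' whenever write() returns false (backpressure).
function writeAllWithBackpressure(stream: Writable, items: string[]): void {
  let idx = 0;
  const writeNext = () => {
    if (idx >= items.length) {
      stream.end();
      return;
    }
    const canContinue = stream.write(items[idx++]);
    if (canContinue) {
      // Next write on the next event loop iteration.
      setTimeout(writeNext, 0);
    } else {
      // Respect backpressure: resume only after the stream drains.
      stream.once('drain', () => setTimeout(writeNext, 0));
    }
  };
  setTimeout(writeNext, 0);
}
```

Deferring each write with a zero `setTimeout` is what makes a mock behave like a remote server: data arrives across multiple event loop iterations instead of synchronously, which is the condition needed to reproduce issues like #607.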
Thanks folks!
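As a closing illustration, the delaying `Transform` used in the third test case could look roughly like this (a hypothetical sketch, not the exact code from the PR):

```typescript
import {Transform, TransformCallback} from 'stream';

// Hypothetical sketch: a Transform that defers each row with a zero
// setTimeout, making the pipeline asynchronous. Piping the read stream
// through a transform like this is what triggers the behavior in #607.
const delayingTransform = new Transform({
  objectMode: true,
  transform(row: unknown, _encoding: string, callback: TransformCallback) {
    setTimeout(() => callback(null, row), 0);
  },
});
```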