Bc/bulk error debug#1330
Conversation
…e into bc/bulk-error
d144bfe to
c716b54
Compare
schmidt-sebastian
left a comment
There was a problem hiding this comment.
Thanks for the update. We are pretty close. Feel free to ignore feedback that is contradicting :)
| export function voidPromise(promise: Promise<unknown>): Promise<void> { | ||
| return promise.then( | ||
| () => {}, | ||
| () => {} | ||
| ); | ||
| } |
There was a problem hiding this comment.
The scary part of this is that it swallows all errors. This is what you want, but it should apparent from the name.
| * | ||
| * @private | ||
| */ | ||
| _addOperation( |
There was a problem hiding this comment.
Now unused.
| * | ||
| * @private | ||
| */ | ||
| get _lastOp(): PendingWriteOp { |
There was a problem hiding this comment.
Now unused.
| }); | ||
| } | ||
|
|
||
| private wrapRetry( |
There was a problem hiding this comment.
Please add a method comment.
| private wrapRetry( | ||
| promise: Promise<WriteResult>, | ||
| documentRef: firestore.DocumentReference, | ||
| retryCount: number, |
There was a problem hiding this comment.
Again, not sure what this means.
| } else { | ||
| throw bulkWriterError; | ||
| } | ||
| }); |
There was a problem hiding this comment.
Nit: This might be slightly prettier if we used try/catch with async/await here.
There was a problem hiding this comment.
good idea. Looks much cleaner and easier to understand.
| documentRef: firestore.DocumentReference, | ||
| retryCount: number, | ||
| operationType: 'create' | 'set' | 'update' | 'delete', | ||
| retryFn: () => Promise<WriteResult> |
There was a problem hiding this comment.
This is the original function isn't it? Could we just call it in this method ourselves, which would drop the promise argument.
d3cd6d0 to
6ca67f9
Compare
Codecov Report
@@ Coverage Diff @@
## master #1330 +/- ##
==========================================
+ Coverage 98.48% 98.51% +0.02%
==========================================
Files 32 32
Lines 19108 19282 +174
Branches 1377 1382 +5
==========================================
+ Hits 18819 18995 +176
+ Misses 286 284 -2
Partials 3 3
Continue to review full report at Codecov.
|
| // on end users. | ||
| // eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
| get(field: string | FieldPath): any { | ||
| console.error('current field', field); |
There was a problem hiding this comment.
Please remove.
| * This is primarily used to wait for a promise to complete when the result of | ||
| * the promise will be discarded. | ||
| */ | ||
| export function voidPromiseNoThrow(promise: Promise<unknown>): Promise<void> { |
There was a problem hiding this comment.
Optional suggestion: s/voidPromiseNoThrow/silence?
| * new operations can be enqueued, except for retry operations scheduled by | ||
| * the error handler. | ||
| */ | ||
| private _closed = false; |
There was a problem hiding this comment.
s/_closed/_closing based on your comment.
| * fails. | ||
| */ | ||
| private _errorFn: (error: BulkWriterError) => boolean = error => { | ||
| const retryCodes = getRetryCodes('batchWrite'); |
There was a problem hiding this comment.
Regarding logging since the comment thread doesn't show up:
I think we need to add logging, otherwise we have no insight into what happens and why writes get re-scheduled. I am not worried about this being noisy, but we should log to the log function and not to console.log.
Note that a user-provided error handler would also overwrite this.
| * bulkWriter | ||
| * .onWriteError((error) => { | ||
| * if ( | ||
| * shouldRetry(error.code) && |
There was a problem hiding this comment.
Can you just use some hardcoded error check (error.code === GrpcStatus.UNAVAILABLE) in this example?
|
|
||
| this.sendReadyBatches(); | ||
| if (batchQueue === this._retryBatchQueue) { | ||
| this._retryBatchQueue.forEach(batch => batch.markReadyToSend()); |
There was a problem hiding this comment.
Who is going to send these batches? Don't we need to schedule a commit here so that the retries work even without flush()? If we do so, we might not need to retry in _flush().
There was a problem hiding this comment.
Changed to power all retry batch scheduling to sendBatch(). Now, flush() schedules writes on the batchQueue, then kicks off the retryBatch if there are retries. Subsequent retries are now handled when the first retry batch comes back.
Also, I moved the markReadyToSend() mapping for retryBatches into sendReadyBatches(). There's two call points (flush and sendBatch) that would require a forEach() call, and I got bitten several times by forgetting to mark the batches as ready to send.
| documentRef: firestore.DocumentReference, | ||
| operationType: 'create' | 'set' | 'update' | 'delete', | ||
| operationFn: (bulkCommitBatch: BulkCommitBatch) => Promise<WriteResult> | ||
| ): Promise<WriteResult> { | ||
| this.verifyNotClosed(); | ||
|
|
||
| // A deferred promise that resolves when operationFn completes. | ||
| const operationCompletedDeferred = new Deferred<void>(); | ||
| this._pendingOps.add(operationCompletedDeferred.promise); | ||
|
|
||
| const op = this.runOperationWithRetries( | ||
| documentRef, | ||
| operationType, | ||
| operationFn, | ||
| /* failedAttempts= */ 0 | ||
| ); | ||
|
|
||
| this.sendReadyBatches(); | ||
| return op | ||
| .then(res => { | ||
| operationCompletedDeferred.resolve(); | ||
| this._pendingOps.delete(operationCompletedDeferred.promise); | ||
| return res; | ||
| }) | ||
| .catch(err => { | ||
| operationCompletedDeferred.resolve(); | ||
| this._pendingOps.delete(operationCompletedDeferred.promise); | ||
| throw err; | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Runs the provided operation and retries on failure until the error | ||
| * callback says not to. | ||
| */ | ||
| private async runOperationWithRetries( | ||
| documentRef: firestore.DocumentReference, | ||
| operationType: 'create' | 'set' | 'update' | 'delete', | ||
| operationFn: (bulkCommitBatch: BulkCommitBatch) => Promise<WriteResult>, | ||
| failedAttempts: number | ||
| ): Promise<WriteResult> { | ||
| try { | ||
| const batchQueue = | ||
| failedAttempts > 0 ? this._retryBatchQueue : this._batchQueue; | ||
| const bulkCommitBatch = this.getEligibleBatch(batchQueue); | ||
| const operationResult = await operationFn(bulkCommitBatch); | ||
|
|
||
| this._successFn(documentRef, operationResult); | ||
| return operationResult; | ||
| } catch (error) { | ||
| const bulkWriterError = new BulkWriterError( | ||
| error.code, | ||
| error.message, | ||
| documentRef, | ||
| operationType, | ||
| failedAttempts | ||
| ); | ||
|
|
||
| const shouldRetry = this._errorFn(bulkWriterError); | ||
| if (shouldRetry) { | ||
| return this.runOperationWithRetries( | ||
| documentRef, | ||
| operationType, | ||
| operationFn, | ||
| failedAttempts + 1 | ||
| ); | ||
| } else { | ||
| throw bulkWriterError; | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Simplification suggestion:
| documentRef: firestore.DocumentReference, | |
| operationType: 'create' | 'set' | 'update' | 'delete', | |
| operationFn: (bulkCommitBatch: BulkCommitBatch) => Promise<WriteResult> | |
| ): Promise<WriteResult> { | |
| this.verifyNotClosed(); | |
| // A deferred promise that resolves when operationFn completes. | |
| const operationCompletedDeferred = new Deferred<void>(); | |
| this._pendingOps.add(operationCompletedDeferred.promise); | |
| const op = this.runOperationWithRetries( | |
| documentRef, | |
| operationType, | |
| operationFn, | |
| /* failedAttempts= */ 0 | |
| ); | |
| this.sendReadyBatches(); | |
| return op | |
| .then(res => { | |
| operationCompletedDeferred.resolve(); | |
| this._pendingOps.delete(operationCompletedDeferred.promise); | |
| return res; | |
| }) | |
| .catch(err => { | |
| operationCompletedDeferred.resolve(); | |
| this._pendingOps.delete(operationCompletedDeferred.promise); | |
| throw err; | |
| }); | |
| } | |
| /** | |
| * Runs the provided operation and retries on failure until the error | |
| * callback says not to. | |
| */ | |
| private async runOperationWithRetries( | |
| documentRef: firestore.DocumentReference, | |
| operationType: 'create' | 'set' | 'update' | 'delete', | |
| operationFn: (bulkCommitBatch: BulkCommitBatch) => Promise<WriteResult>, | |
| failedAttempts: number | |
| ): Promise<WriteResult> { | |
| try { | |
| const batchQueue = | |
| failedAttempts > 0 ? this._retryBatchQueue : this._batchQueue; | |
| const bulkCommitBatch = this.getEligibleBatch(batchQueue); | |
| const operationResult = await operationFn(bulkCommitBatch); | |
| this._successFn(documentRef, operationResult); | |
| return operationResult; | |
| } catch (error) { | |
| const bulkWriterError = new BulkWriterError( | |
| error.code, | |
| error.message, | |
| documentRef, | |
| operationType, | |
| failedAttempts | |
| ); | |
| const shouldRetry = this._errorFn(bulkWriterError); | |
| if (shouldRetry) { | |
| return this.runOperationWithRetries( | |
| documentRef, | |
| operationType, | |
| operationFn, | |
| failedAttempts + 1 | |
| ); | |
| } else { | |
| throw bulkWriterError; | |
| } | |
| } | |
| } | |
| private async runOperation( | |
| documentRef: firestore.DocumentReference, | |
| operationType: 'create' | 'set' | 'update' | 'delete', | |
| operationFn: (bulkCommitBatch: BulkCommitBatch) => Promise<WriteResult> | |
| ): Promise<WriteResult> { | |
| this.verifyNotClosed(); | |
| this.sendReadyBatches(); | |
| for (let failedAttempts = 0; ; ++failedAttempts) { | |
| // A deferred promise that resolves when operationFn completes. | |
| const operationCompletedDeferred = new Deferred<void>(); | |
| this._pendingOps.add(operationCompletedDeferred.promise); | |
| const batchQueue = | |
| failedAttempts > 0 ? this._retryBatchQueue : this._batchQueue; | |
| const bulkCommitBatch = this.getEligibleBatch(batchQueue); | |
| try { | |
| const operationResult = await operationFn(bulkCommitBatch); | |
| this._successFn(documentRef, operationResult); | |
| return operationResult; | |
| } catch (error) { | |
| const bulkWriterError = new BulkWriterError( | |
| error.code, | |
| error.message, | |
| documentRef, | |
| operationType, | |
| failedAttempts | |
| ); | |
| const shouldRetry = this._errorFn(bulkWriterError); | |
| if (!shouldRetry) { | |
| throw bulkWriterError; | |
| } | |
| } finally { | |
| this._pendingOps.delete(operationCompletedDeferred.promise); | |
| } | |
| } | |
| } |
There was a problem hiding this comment.
Man... I remember staring at my code trying to cut it down, but you've done it!
Had to make a couple of changes
- move out operationCompletedDeferred to outside of the for-loop. The implementation here resolved the pendingOp after each retry loop.
- move out
verifyNotClosed()so an error is thrown instead of a rejected promise. - move
sendReadyBatches()into the for loop since it needs to wait forgetEligibleBatch()to create the new batch (for the first batch). - resolved the deferred promise at the end.
| * @param data The object to serialize as the document. | ||
| * @returns A promise that resolves with the result | ||
| * of the write. Throws an error if the write fails. | ||
| * @returns {Promise<WriteResult>} A promise that resolves with the result |
There was a problem hiding this comment.
No need to include the return type in TypeScript. You should keep the type of the rejected result, since that is not apparent here.
Co-authored-by: Sebastian Schmidt <[email protected]>
schmidt-sebastian
left a comment
There was a problem hiding this comment.
This is pretty good. I think we should verify some of the assumptions we made at API discussion, but we can probably take this back to the original PR
| error.code !== undefined && | ||
| retryCodes.includes(error.code) && | ||
| error.failedAttempts < MAX_RETRY_ATTEMPTS; | ||
| logger( |
There was a problem hiding this comment.
As proposed in API meeting, we should move this logging to the callsite. This will also allow to us to log user-customized retries.
| let batchQueue = this._batchQueue; | ||
| batchQueue.forEach(batch => batch.markReadyToSend()); | ||
|
|
||
| // Send all scheduled operations on the BatchQueue first. | ||
| this.sendReadyBatches(batchQueue); | ||
| await Promise.all(this._pendingBatches); | ||
|
|
||
| // Afterwards, send all accumulated retry operations. Wait until the | ||
| // retryBatchQueue is cleared. This way, operations scheduled after | ||
| // flush() will not be sent until the retries are completed. | ||
| batchQueue = this._retryBatchQueue; | ||
| if (batchQueue.length > 0) { | ||
| this.sendReadyBatches(batchQueue); | ||
| } |
There was a problem hiding this comment.
I wonder if this logic can be combined. The logic for the two different lists is essentially the same.
There was a problem hiding this comment.
I could combine it into a two-loop for-loop, but that doesn't feel as readable. Any suggestions?
| // Always mark retry batches as READY_TO_SEND. | ||
| if (batchQueue === this._retryBatchQueue) { | ||
| batchQueue.forEach(batch => batch.markReadyToSend()); | ||
| } |
There was a problem hiding this comment.
This seems out of place here. Can we do this at the callsite?
There was a problem hiding this comment.
There are two callsites, which is why i originally moved it into here. Moved it back out.
| } finally { | ||
| operationCompletedDeferred.resolve(); | ||
| this._pendingOps.delete(operationCompletedDeferred.promise); | ||
| } |
There was a problem hiding this comment.
I think the double try/catch block is a bit harder to parse then just calling these two lines twice.
There was a problem hiding this comment.
The operationCompletedDeferred must resolve after the error is thrown, or else the flush and write promises resolve out of order. Given that, I don't see any good way to do so other than moving the outer try block into a separate function that encapsulates this one, or having duplicate code to handle this in each operation.
| process.on('unhandledRejection', () => { | ||
| errorThrown = true; | ||
| unhandledDeferred.resolve(); | ||
| }); |
There was a problem hiding this comment.
Let's confirm with Yoshi team whether it would be ok to swallow these.
There was a problem hiding this comment.
SG, started a conversation with them in the node room.
…restore into bc/bulk-error-debug
|
Closing this as well are moving back to the original PR. |
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
Fixes #<issue_number_goes_here> 🦕