feat: add onWriteError() and onWriteResult() handlers to BulkWriter #1315
thebrianchen merged 19 commits into master
Codecov Report
@@ Coverage Diff @@
## master #1315 +/- ##
==========================================
- Coverage 98.56% 98.55% -0.02%
==========================================
Files 32 32
Lines 19115 19274 +159
Branches 1373 1386 +13
==========================================
+ Hits 18841 18995 +154
- Misses 271 276 +5
Partials 3 3
schmidt-sebastian left a comment
This looks good. Before you address the feedback, we should see if we want to incorporate @bcoe's API suggestion.
| const pendingOp = bulkCommitBatch.getLastOp(); | ||
| resultPromise.catch(err => { | ||
| const operation = new BulkWriterOperation( | ||
| documentRef, | ||
| 'create', | ||
| pendingOp | ||
| ); | ||
| this.errorFn(err, operation); | ||
| }); |
Ideally, we would only attach our own Promise if errorFn is defined, but that would require users to register an error function before any other API call. I think that we could re-throw the original exception here though if the error callback is not defined. This can be a simple change if you change the default error function to just throw.
I don't fully follow along here. If we change the default function to throw, that would cause an UnhandledPromiseRejection to surface again. Isn't the point of attaching a .catch() to the existing promise to prevent the UnhandledPromiseRejection from being thrown in the first place?
Something like:
let defaultCatch = e => { throw e; };
set() {
  return this.op().catch(e => defaultCatch(e));
}
onError(errorFn) {
  defaultCatch = errorFn;
}
This way, the "unhandled Promise rejection" is suppressed only once onError has been called.
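A minimal runnable sketch of that idea, assuming a hypothetical `TinyWriter` class that stands in for BulkWriter (the class and its `set(op)` signature are made up for illustration). The default handler re-throws, so the returned promise still rejects until a handler is registered:

```typescript
type ErrorFn = (err: Error) => void;

// Hypothetical stand-in for BulkWriter; it only illustrates the handler swap.
class TinyWriter {
  // The default handler re-throws, so the returned promise still rejects.
  private errorFn: ErrorFn = err => {
    throw err;
  };

  onError(errorFn: ErrorFn): void {
    this.errorFn = errorFn;
  }

  // `op` is whatever asynchronous write the caller wants to run.
  set(op: () => Promise<void>): Promise<void> {
    // errorFn is looked up at rejection time, so late registration still works.
    return op().catch(err => this.errorFn(err));
  }
}
```

Because the handler is read when the rejection fires, a caller can register `onError` after issuing writes and still have it apply to writes that have not failed yet.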
…e into bc/bulk-error
| * @param {Timestamp=} precondition.lastUpdateTime If set, enforces that the | ||
| * document was last updated at lastUpdateTime. Fails the batch if the | ||
| * document doesn't exist or was last updated at a different time. | ||
| * document doesn't exist or was last updatedra at a different time. |
| if (this.batchQueue.length > 0) { | ||
| await this._flush(); | ||
| await this.closedDeferred.promise; | ||
| } |
I don't think I see how it's possible for this to be the case after awaiting the flush, above, because it's not possible to add retries after this.closeCalled = true, right?
If it is expected, does this need to be in a loop?
I now see that verifyNotClosed only throws if fully closed when called from retry. In which case, I think my second question still stands. Do we know that the await this._flush() won't cause more retries? Do we either need to do this in a loop or at least await another flush after this.closedDeferred.promise resolves?
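To make the "do this in a loop" option concrete, here is a small sketch using a hypothetical `DrainableQueue` (not part of this PR). It keeps awaiting until a pass finishes without any new work having been enqueued, and it assumes tasks handle their own errors:

```typescript
// Hypothetical queue of in-flight work; each task may enqueue retries.
class DrainableQueue {
  private pending = new Set<Promise<void>>();

  add(task: () => Promise<void>): void {
    // Each promise removes itself from the set once it settles.
    const p: Promise<void> = task().finally(() => this.pending.delete(p));
    this.pending.add(p);
  }

  // Loop until nothing is pending, including work added while waiting.
  async drain(): Promise<void> {
    while (this.pending.size > 0) {
      await Promise.all(Array.from(this.pending));
    }
  }
}
```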
I think we can use Promise chaining here as proposed in the diff above. Instead of requiring void as the return type of onWriteError, we can require the user to call retry and to pass the result of retry to us via the error handler. The signature would then be:
onWriteError((err:BulkWriterError) => Promise<WriteResult>)
Once we do that, we can chain the result of this error callback and block on the retry attempt in the flush call.
If we don't do this, we have to somehow ensure that all error callbacks have fired once we process the results. This would be a bit tricky and may not port to Java or other languages:
error.reject(...);
setImmediate(() => {
// This only works in Node, but due to the way that ticks are processed in the VM, we can use a `setImmediate`
// to ensure that our code only runs once all Promise chains are processed.
});
That would be elegant, I think. The ultimate result of the original write would become the result of the retry. It would just be important to document well so that callers don't forget to return the result of retry() from their error handler or the retry promise would end up unhandled and the original write would resolve with undefined.
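A rough sketch of the chaining described above, under the assumption that the error handler returns the retry promise. `writeWithHandler`, `makeFlakyWrite`, and the `WriteResult` shape here are hypothetical and only illustrate how the original write's result becomes the result of the retry:

```typescript
// Hypothetical shapes for illustration; not the library's real types.
interface WriteResult {
  writeTime: number;
}
type WriteErrorHandler = (err: Error) => Promise<WriteResult>;

// Chains the handler onto the write so callers observe the retry's outcome.
function writeWithHandler(
  write: () => Promise<WriteResult>,
  onWriteError: WriteErrorHandler
): Promise<WriteResult> {
  return write().catch(err => onWriteError(err));
}

// A write that fails once and then succeeds, standing in for a flaky backend.
function makeFlakyWrite(): () => Promise<WriteResult> {
  let attempts = 0;
  return async () => {
    attempts += 1;
    if (attempts === 1) {
      throw new Error('UNAVAILABLE');
    }
    return {writeTime: attempts};
  };
}

const flakyWrite = makeFlakyWrite();
// The handler returns the retry promise; dropping the return would leave it unhandled.
const retried = writeWithHandler(flakyWrite, () => flakyWrite());
```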
| validateOptional, | ||
| } from './validate'; | ||
| // TODO: How do I get rid of this? | ||
| // eslint-disable-next-line no-undef |
The idea here is that you can use GrpcStatus in the typings file and the Gax status in code. They are interchangeable.
I think this is a necessary evil unfortunately.
| readonly operation: BulkWriterOperation, | ||
| readonly code: GrpcStatus, | ||
| readonly message: string |
Do these show up in JSDoc? Should we add comments?
| * opened again. | ||
| */ | ||
| private closed = false; | ||
| private isClosed = false; |
We may want to align this with the status codes in BulkWriterBatch and use an enum (either a new enum or a specific enum for BulkWriter (open/closing/closed)).
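For illustration, the open/closing/closed idea could look roughly like this. `WriterState` and `StatefulWriter` are hypothetical names, and the rule that retries stay allowed while closing is an assumption taken from the discussion of verifyNotClosed:

```typescript
// Hypothetical lifecycle states following the open/closing/closed idea.
enum WriterState {
  Open,
  Closing,
  Closed,
}

class StatefulWriter {
  private state = WriterState.Open;

  // New writes are allowed only while the writer is open.
  canEnqueue(): boolean {
    return this.state === WriterState.Open;
  }

  // Retries remain allowed while pending work drains.
  canRetry(): boolean {
    return this.state !== WriterState.Closed;
  }

  beginClose(): void {
    if (this.state === WriterState.Open) {
      this.state = WriterState.Closing;
    }
  }

  finishClose(): void {
    this.state = WriterState.Closed;
  }
}
```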
| options?: firestore.BulkWriterOptions | ||
| ) { | ||
| this.firestore._incrementBulkWritersCount(); | ||
| this.errorFn = () => {}; |
By default, you probably want this error function to throw. You still want "Unhandled Promise Rejections" when there is no error handler and no other callback attached.
| .catch(err => { | ||
| this.errorFn(err); | ||
| }); |
I think you want to return the Promise that has the error handler attached. That way, the default error handler can re-throw and any user specified error handler can prevent Unhandled Promise Rejections.
| private async _flush(): Promise<void> { | ||
| const trackedBatches = this.batchQueue; | ||
| const writePromises = trackedBatches.map(batch => batch.awaitBulkCommit()); | ||
| this.sendReadyBatches(); |
Random thought: Should the existing built-in retry use the new public APIs? This would simplify some of the code a bit (https://gist.github.com/schmidt-sebastian/302c19baa2eda89b23435da41afb077f) and it would allow us to use consistent backoff for all retries.
| private verifyNotClosed(checkIsClosed = false): void { | ||
| if (!checkIsClosed && this.closeCalled) { | ||
| throw new Error('BulkWriter has already been closed.'); | ||
| } | ||
| | ||
| if (checkIsClosed && this.isClosed) { | ||
| throw new Error('BulkWriter has already been closed.'); | ||
| } | ||
| } |
Should these be two different functions?
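One possible split, sketched with a hypothetical `CloseGuard` class that only holds the two flags from the snippet above. The method names are made up; the error message is copied from the diff:

```typescript
// Hypothetical holder for the two flags used in the snippet above.
class CloseGuard {
  closeCalled = false;
  isClosed = false;

  // For new writes: reject as soon as close() has been requested.
  verifyCloseNotCalled(): void {
    if (this.closeCalled) {
      throw new Error('BulkWriter has already been closed.');
    }
  }

  // For retries: reject only once the writer has fully closed.
  verifyNotFullyClosed(): void {
    if (this.isClosed) {
      throw new Error('BulkWriter has already been closed.');
    }
  }
}
```

Two named functions would remove the boolean parameter whose meaning is hard to read at the call site.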
@tilgovi Quick update -- Still working on it in a separate branch #1330. At this point, I have the auto-retry functionality working.
Remove usage of BulkWriterOperation
9b4fd13 to 166952b
schmidt-sebastian left a comment
Whee. Very nice. Some nits left but it looks like we are basically good to go
| } from './validate'; | ||
| // TODO(chenbrian): Figure some way to get rid of this. | ||
| // eslint-disable-next-line no-undef | ||
| import GrpcStatus = FirebaseFirestore.GrpcStatus; |
Please move this line below all other imports. This will help us pretend that we care about import statement order.
| writeBatchIndex: this.opCount, | ||
| key: documentRef.path, | ||
| deferred: deferred, | ||
| }); |
Could pendingOps be an array of writeBatchIndex to deferred (and not a list of these elements)?
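One reading of this suggestion is a map keyed by writeBatchIndex rather than a list of records. The sketch below assumes that reading; `Deferred`, `opsByIndex`, `registerOp`, and `settleOp` are written out here for illustration and are not the PR's code. Note that the document path (`key`) is dropped in this form:

```typescript
// Minimal deferred helper, written out so the sketch is self-contained.
class Deferred<T> {
  promise: Promise<T>;
  resolve!: (value: T) => void;
  reject!: (reason: Error) => void;

  constructor() {
    this.promise = new Promise<T>((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }
}

// Each deferred is keyed by its index in the batch.
const opsByIndex = new Map<number, Deferred<string>>();

function registerOp(writeBatchIndex: number): Deferred<string> {
  const deferred = new Deferred<string>();
  opsByIndex.set(writeBatchIndex, deferred);
  return deferred;
}

// Resolves and forgets the operation stored at the given index, if any.
function settleOp(writeBatchIndex: number, result: string): void {
  const deferred = opsByIndex.get(writeBatchIndex);
  if (deferred) {
    deferred.resolve(result);
    opsByIndex.delete(writeBatchIndex);
  }
}
```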
| /** | ||
| * Schedules and runs the provided operation. | ||
| */ | ||
| private async _executeWrite( | ||
| documentRef: firestore.DocumentReference, | ||
| operationType: 'create' | 'set' | 'update' | 'delete', | ||
| operationFn: (bulkCommitBatch: BulkCommitBatch) => Promise<WriteResult> | ||
| ): Promise<WriteResult> { | ||
| // A deferred promise that resolves when operationFn completes. | ||
| const operationCompletedDeferred = new Deferred<void>(); | ||
| this._pendingOps.add(operationCompletedDeferred.promise); | ||
| try { | ||
| for (let failedAttempts = 0; ; ++failedAttempts) { | ||
| const batchQueue = | ||
| failedAttempts > 0 ? this._retryBatchQueue : this._batchQueue; | ||
| const bulkCommitBatch = this.getEligibleBatch(batchQueue); | ||
| | ||
| // Send ready batches if this is the first attempt. Subsequent retry | ||
| // batches are scheduled after the initial batch returns. | ||
| if (failedAttempts === 0) { | ||
| this.sendReadyBatches(batchQueue); | ||
| } | ||
| | ||
| try { | ||
| const operationResult = await operationFn(bulkCommitBatch); | ||
| this._successFn(documentRef, operationResult); | ||
| return operationResult; | ||
| } catch (error) { | ||
| const bulkWriterError = new BulkWriterError( | ||
| error.code, | ||
| error.message, | ||
| documentRef, | ||
| operationType, | ||
| failedAttempts | ||
| ); | ||
| const shouldRetry = this._errorFn(bulkWriterError); | ||
| logger( | ||
| 'BulkWriter.errorFn', | ||
| null, | ||
| 'Running error callback on error code:', | ||
| error.code, | ||
| ', shouldRetry:', | ||
| shouldRetry | ||
| ); | ||
| if (!shouldRetry) { | ||
| throw bulkWriterError; | ||
| } | ||
| } | ||
| } | ||
| } finally { | ||
| operationCompletedDeferred.resolve(); | ||
| this._pendingOps.delete(operationCompletedDeferred.promise); | ||
| } | ||
| } | ||
My last-effort cleanup attempt:
| /** | |
| * Schedules and runs the provided operation. | |
| */ | |
| private async _executeWrite( | |
| documentRef: firestore.DocumentReference, | |
| operationType: 'create' | 'set' | 'update' | 'delete', | |
| operationFn: (bulkCommitBatch: BulkCommitBatch) => Promise<WriteResult> | |
| ): Promise<WriteResult> { | |
| let result: Promise<WriteResult> | undefined; | |
| // A deferred promise that resolves when operationFn completes. | |
| const operationCompletedDeferred = new Deferred<void>(); | |
| this._pendingOps.add(operationCompletedDeferred.promise); | |
| let batchQueue = this._batchQueue; | |
| this.sendReadyBatches(batchQueue); | |
| for (let failedAttempts = 0; result === undefined; ++failedAttempts) { | |
| const bulkCommitBatch = this.getEligibleBatch(batchQueue); | |
| try { | |
| const writeResult = await operationFn(bulkCommitBatch); | |
| this._successFn(documentRef, writeResult); | |
| result = Promise.resolve(writeResult); | |
| } catch (error) { | |
| const bulkWriterError = new BulkWriterError( | |
| error.code, | |
| error.message, | |
| documentRef, | |
| operationType, | |
| failedAttempts | |
| ); | |
| const shouldRetry = this._errorFn(bulkWriterError); | |
| logger( | |
| 'BulkWriter.errorFn', | |
| null, | |
| 'Running error callback on error code:', | |
| error.code, | |
| ', shouldRetry:', | |
| shouldRetry | |
| ); | |
| if (shouldRetry) { | |
| batchQueue = this._retryBatchQueue; | |
| } else { | |
| result = Promise.reject(bulkWriterError); | |
| } | |
| } | |
| } | |
| result.then(() => { | |
| operationCompletedDeferred.resolve(); | |
| this._pendingOps.delete(operationCompletedDeferred.promise); | |
| }); | |
| return result!; | |
| } |
Let me know what you think.
Post-discussion: the main issue with this implementation is that if the user's error callback throws, the deferred operation will never be resolved. The double try/catch blocks, while ugly, ensure that the deferred operation is always removed.
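A stripped-down sketch of why the nested try blocks matter, using a hypothetical `runTracked` helper and an `inFlight` set in place of `_pendingOps`. The outer `finally` runs even when the retry callback itself throws, so the tracking entry is always removed:

```typescript
// Tracks in-flight operations; a hypothetical stand-in for _pendingOps.
const inFlight = new Set<symbol>();

async function runTracked<T>(
  operation: () => Promise<T>,
  shouldRetry: (err: Error, failedAttempts: number) => boolean
): Promise<T> {
  const token = Symbol('op');
  inFlight.add(token);
  try {
    for (let failedAttempts = 0; ; ++failedAttempts) {
      try {
        return await operation();
      } catch (err) {
        // If shouldRetry itself throws, the outer finally still runs.
        if (!shouldRetry(err as Error, failedAttempts)) {
          throw err;
        }
      }
    }
  } finally {
    // Cleanup happens on success, on failure, and on a throwing callback.
    inFlight.delete(token);
  }
}
```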
Co-authored-by: Sebastian Schmidt <[email protected]>
Fixes #1282.
- onWriteError() error and onWriteResult() success handlers to BulkWriter instances.
- BulkWriter.retry(operation: BulkWriterOperation) method to retry failed operations.
- BulkWriterError that contain an underlying BulkWriterOperation.