@@ -61,15 +61,13 @@ public interface WriteErrorCallback {
6161 * @param error The error object from the failed BulkWriter operation attempt.
6262 * @return Whether to retry the operation or not.
6363 */
64- boolean shouldRetryError (BulkWriterError error );
64+ boolean onError (BulkWriterError error );
6565 }
6666
6767 private interface BulkWriterOperationCallback {
6868 ApiFuture <WriteResult > apply (BulkCommitBatch batch );
6969 }
7070
71- private final Executor userCallbackExecutor ;
72-
7371 enum OperationType {
7472 CREATE ,
7573 SET ,
@@ -155,7 +153,7 @@ public void onResult(DocumentReference documentReference, WriteResult result) {}
155153
156154 private WriteErrorCallback errorListener =
157155 new WriteErrorCallback () {
158- public boolean shouldRetryError (BulkWriterError error ) {
156+ public boolean onError (BulkWriterError error ) {
159157 if (error .getFailedAttempts () == MAX_RETRY_ATTEMPTS ) {
160158 return false ;
161159 }
@@ -169,14 +167,17 @@ public boolean shouldRetryError(BulkWriterError error) {
169167 }
170168 };
171169
172- private final FirestoreImpl firestore ;
170+ private Executor successExecutor ;
171+ private Executor errorExecutor ;
173172
173+ private final FirestoreImpl firestore ;
174174 private final ScheduledExecutorService firestoreExecutor ;
175175
176176 BulkWriter (FirestoreImpl firestore , BulkWriterOptions options ) {
177177 this .firestore = firestore ;
178178 this .firestoreExecutor = firestore .getClient ().getExecutor ();
179- this .userCallbackExecutor = Context .currentContextExecutor (this .firestoreExecutor );
179+ this .successExecutor = Context .currentContextExecutor (this .firestoreExecutor );
180+ this .errorExecutor = Context .currentContextExecutor (this .firestoreExecutor );
180181
181182 if (!options .getThrottlingEnabled ()) {
182183 this .rateLimiter =
@@ -609,12 +610,15 @@ private ApiFuture<WriteResult> executeWrite(
609610 final SettableApiFuture <Void > operationCompletedFuture = SettableApiFuture .create ();
610611 pendingOperations .add (operationCompletedFuture );
611612
613+ Executor currentExecutor = MoreExecutors .directExecutor ();
614+
612615 ApiFuture <WriteResult > writeResultApiFuture =
613616 ApiFutures .transformAsync (
614617 executeWriteHelper (documentReference , operationType , operationCallback , 0 ),
615618 new ApiAsyncFunction <WriteResult , WriteResult >() {
616- public ApiFuture <WriteResult > apply (WriteResult result )
619+ public ApiFuture <WriteResult > apply (final WriteResult result )
617620 throws ExecutionException , InterruptedException {
621+
618622 invokeUserSuccessCallback (documentReference , result ).get ();
619623 return ApiFutures .immediateFuture (result );
620624 }
@@ -680,12 +684,12 @@ public ApiFuture<WriteResult> apply(FirestoreException exception)
680684 /** Invokes the user error callback on the user callback executor and returns the result. */
681685 private SettableApiFuture <Boolean > invokeUserErrorCallback (final BulkWriterError error ) {
682686 final SettableApiFuture <Boolean > callbackResult = SettableApiFuture .create ();
683- userCallbackExecutor .execute (
687+ errorExecutor .execute (
684688 new Runnable () {
685689 @ Override
686690 public void run () {
687691 try {
688- boolean shouldRetry = errorListener .shouldRetryError (error );
692+ boolean shouldRetry = errorListener .onError (error );
689693 callbackResult .set (shouldRetry );
690694 } catch (Exception e ) {
691695 callbackResult .setException (e );
@@ -699,7 +703,7 @@ public void run() {
699703 private ApiFuture <Void > invokeUserSuccessCallback (
700704 final DocumentReference documentReference , final WriteResult result ) {
701705 final SettableApiFuture <Void > callbackResult = SettableApiFuture .create ();
702- userCallbackExecutor .execute (
706+ successExecutor .execute (
703707 new Runnable () {
704708 @ Override
705709 public void run () {
@@ -806,14 +810,14 @@ private void verifyNotClosed() {
806810 * <p>For example, see the sample code: <code>
807811 * BulkWriter bulkWriter = firestore.bulkWriter();
808812 * bulkWriter.addWriteResultListener(
809- * (DocumentReference documentReference, WriteResult result -> {
813+ * (DocumentReference documentReference, WriteResult result) -> {
810814 * System.out.println(
811815 * "Successfully executed write on document: "
812816 * + documentReference
813817 * + " at: "
814818 * + result.getUpdateTime());
815819 * }
816- * } );
820+ * );
817821 * </code>
818822 *
819823 * @param writeResultCallback A callback to be called every time a BulkWriter operation
@@ -823,6 +827,12 @@ public void addWriteResultListener(WriteResultCallback writeResultCallback) {
823827 successListener = writeResultCallback ;
824828 }
825829
830+ public void addWriteResultListener (
831+ @ Nonnull Executor executor , WriteResultCallback writeResultCallback ) {
832+ successListener = writeResultCallback ;
833+ successExecutor = executor ;
834+ }
835+
826836 /**
827837 * Attaches an error handler listener that is run every time a BulkWriter operation fails.
828838 *
@@ -842,14 +852,19 @@ public void addWriteResultListener(WriteResultCallback writeResultCallback) {
842852 * return false;
843853 * }
844854 * }
845- * } );
855+ * );
846856 * </code>
847857 *
848- * @param shouldRetryCallback A callback to be called every time a BulkWriter operation fails.
849- * Returning `true` will retry the operation. Returning `false` will stop the retry loop.
858+ * @param onError A callback to be called every time a BulkWriter operation fails. Returning
859+ * `true` will retry the operation. Returning `false` will stop the retry loop.
850860 */
851- public void addWriteErrorListener (WriteErrorCallback shouldRetryCallback ) {
852- errorListener = shouldRetryCallback ;
861+ public void addWriteErrorListener (WriteErrorCallback onError ) {
862+ errorListener = onError ;
863+ }
864+
865+ public void addWriteErrorListener (@ Nonnull Executor executor , WriteErrorCallback onError ) {
866+ errorListener = onError ;
867+ errorExecutor = executor ;
853868 }
854869
855870 /**
0 commit comments