Skip to content

Commit 2133403

Browse files
author
Brian Chen
committed
change naming to onError and expose callback executors
1 parent 2b5f04a commit 2133403

2 files changed

Lines changed: 39 additions & 26 deletions

File tree

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,13 @@ public interface WriteErrorCallback {
6161
* @param error The error object from the failed BulkWriter operation attempt.
6262
* @return Whether to retry the operation or not.
6363
*/
64-
boolean shouldRetryError(BulkWriterError error);
64+
boolean onError(BulkWriterError error);
6565
}
6666

6767
private interface BulkWriterOperationCallback {
6868
ApiFuture<WriteResult> apply(BulkCommitBatch batch);
6969
}
7070

71-
private final Executor userCallbackExecutor;
72-
7371
enum OperationType {
7472
CREATE,
7573
SET,
@@ -155,7 +153,7 @@ public void onResult(DocumentReference documentReference, WriteResult result) {}
155153

156154
private WriteErrorCallback errorListener =
157155
new WriteErrorCallback() {
158-
public boolean shouldRetryError(BulkWriterError error) {
156+
public boolean onError(BulkWriterError error) {
159157
if (error.getFailedAttempts() == MAX_RETRY_ATTEMPTS) {
160158
return false;
161159
}
@@ -169,14 +167,17 @@ public boolean shouldRetryError(BulkWriterError error) {
169167
}
170168
};
171169

172-
private final FirestoreImpl firestore;
170+
private Executor successExecutor;
171+
private Executor errorExecutor;
173172

173+
private final FirestoreImpl firestore;
174174
private final ScheduledExecutorService firestoreExecutor;
175175

176176
BulkWriter(FirestoreImpl firestore, BulkWriterOptions options) {
177177
this.firestore = firestore;
178178
this.firestoreExecutor = firestore.getClient().getExecutor();
179-
this.userCallbackExecutor = Context.currentContextExecutor(this.firestoreExecutor);
179+
this.successExecutor = Context.currentContextExecutor(this.firestoreExecutor);
180+
this.errorExecutor = Context.currentContextExecutor(this.firestoreExecutor);
180181

181182
if (!options.getThrottlingEnabled()) {
182183
this.rateLimiter =
@@ -609,12 +610,15 @@ private ApiFuture<WriteResult> executeWrite(
609610
final SettableApiFuture<Void> operationCompletedFuture = SettableApiFuture.create();
610611
pendingOperations.add(operationCompletedFuture);
611612

613+
Executor currentExecutor = MoreExecutors.directExecutor();
614+
612615
ApiFuture<WriteResult> writeResultApiFuture =
613616
ApiFutures.transformAsync(
614617
executeWriteHelper(documentReference, operationType, operationCallback, 0),
615618
new ApiAsyncFunction<WriteResult, WriteResult>() {
616-
public ApiFuture<WriteResult> apply(WriteResult result)
619+
public ApiFuture<WriteResult> apply(final WriteResult result)
617620
throws ExecutionException, InterruptedException {
621+
618622
invokeUserSuccessCallback(documentReference, result).get();
619623
return ApiFutures.immediateFuture(result);
620624
}
@@ -680,12 +684,12 @@ public ApiFuture<WriteResult> apply(FirestoreException exception)
680684
/** Invokes the user error callback on the user callback executor and returns the result. */
681685
private SettableApiFuture<Boolean> invokeUserErrorCallback(final BulkWriterError error) {
682686
final SettableApiFuture<Boolean> callbackResult = SettableApiFuture.create();
683-
userCallbackExecutor.execute(
687+
errorExecutor.execute(
684688
new Runnable() {
685689
@Override
686690
public void run() {
687691
try {
688-
boolean shouldRetry = errorListener.shouldRetryError(error);
692+
boolean shouldRetry = errorListener.onError(error);
689693
callbackResult.set(shouldRetry);
690694
} catch (Exception e) {
691695
callbackResult.setException(e);
@@ -699,7 +703,7 @@ public void run() {
699703
private ApiFuture<Void> invokeUserSuccessCallback(
700704
final DocumentReference documentReference, final WriteResult result) {
701705
final SettableApiFuture<Void> callbackResult = SettableApiFuture.create();
702-
userCallbackExecutor.execute(
706+
successExecutor.execute(
703707
new Runnable() {
704708
@Override
705709
public void run() {
@@ -806,14 +810,14 @@ private void verifyNotClosed() {
806810
* <p>For example, see the sample code: <code>
807811
* BulkWriter bulkWriter = firestore.bulkWriter();
808812
* bulkWriter.addWriteResultListener(
809-
* (DocumentReference documentReference, WriteResult result -> {
813+
* (DocumentReference documentReference, WriteResult result) -> {
810814
* System.out.println(
811815
* "Successfully executed write on document: "
812816
* + documentReference
813817
* + " at: "
814818
* + result.getUpdateTime());
815819
* }
816-
* });
820+
* );
817821
* </code>
818822
*
819823
* @param writeResultCallback A callback to be called every time a BulkWriter operation
@@ -823,6 +827,12 @@ public void addWriteResultListener(WriteResultCallback writeResultCallback) {
823827
successListener = writeResultCallback;
824828
}
825829

830+
public void addWriteResultListener(
831+
@Nonnull Executor executor, WriteResultCallback writeResultCallback) {
832+
successListener = writeResultCallback;
833+
successExecutor = executor;
834+
}
835+
826836
/**
827837
* Attaches an error handler listener that is run every time a BulkWriter operation fails.
828838
*
@@ -842,14 +852,19 @@ public void addWriteResultListener(WriteResultCallback writeResultCallback) {
842852
* return false;
843853
* }
844854
* }
845-
* });
855+
* );
846856
* </code>
847857
*
848-
* @param shouldRetryCallback A callback to be called every time a BulkWriter operation fails.
849-
* Returning `true` will retry the operation. Returning `false` will stop the retry loop.
858+
* @param onError A callback to be called every time a BulkWriter operation fails. Returning
859+
* `true` will retry the operation. Returning `false` will stop the retry loop.
850860
*/
851-
public void addWriteErrorListener(WriteErrorCallback shouldRetryCallback) {
852-
errorListener = shouldRetryCallback;
861+
public void addWriteErrorListener(WriteErrorCallback onError) {
862+
errorListener = onError;
863+
}
864+
865+
public void addWriteErrorListener(@Nonnull Executor executor, WriteErrorCallback onError) {
866+
errorListener = onError;
867+
errorExecutor = executor;
853868
}
854869

855870
/**

google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,7 @@
5252
import javax.annotation.Nonnull;
5353
import org.junit.Assert;
5454
import org.junit.Before;
55-
import org.junit.Rule;
5655
import org.junit.Test;
57-
import org.junit.rules.Timeout;
5856
import org.junit.runner.RunWith;
5957
import org.mockito.ArgumentCaptor;
6058
import org.mockito.Captor;
@@ -68,7 +66,7 @@
6866
@RunWith(MockitoJUnitRunner.class)
6967
public class BulkWriterTest {
7068

71-
@Rule public Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
69+
// @Rule public Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
7270

7371
@Spy private final FirestoreRpc firestoreRpc = Mockito.mock(FirestoreRpc.class);
7472

@@ -425,7 +423,7 @@ public void retriesFailedOperationsWithGlobalErrorCallback() throws Exception {
425423
DocumentReference doc4 = firestoreMock.document("coll/doc4");
426424
bulkWriter.addWriteErrorListener(
427425
new WriteErrorCallback() {
428-
public boolean shouldRetryError(BulkWriterError error) {
426+
public boolean onError(BulkWriterError error) {
429427
operations.add(error.getOperationType().name());
430428
return true;
431429
}
@@ -463,7 +461,7 @@ public void errorSurfacedEvenWithRetryFunction() throws Exception {
463461
final boolean[] errorListenerCalled = {false};
464462
bulkWriter.addWriteErrorListener(
465463
new WriteErrorCallback() {
466-
public boolean shouldRetryError(BulkWriterError error) {
464+
public boolean onError(BulkWriterError error) {
467465
errorListenerCalled[0] = true;
468466
assertEquals(Status.INTERNAL, error.getStatus());
469467
return false;
@@ -495,7 +493,7 @@ public void surfacesExceptionsThrownByUserProvidedErrorListener() throws Excepti
495493

496494
bulkWriter.addWriteErrorListener(
497495
new WriteErrorCallback() {
498-
public boolean shouldRetryError(BulkWriterError error) {
496+
public boolean onError(BulkWriterError error) {
499497
throw new UnsupportedOperationException(
500498
"Test code threw UnsupportedOperationException");
501499
}
@@ -564,7 +562,7 @@ public void retriesMultipleTimes() throws Exception {
564562

565563
bulkWriter.addWriteErrorListener(
566564
new WriteErrorCallback() {
567-
public boolean shouldRetryError(BulkWriterError error) {
565+
public boolean onError(BulkWriterError error) {
568566
return true;
569567
}
570568
});
@@ -600,7 +598,7 @@ public void retriesMaintainCorrectWriteResolutionOrdering() throws Exception {
600598
final List<String> operations = new ArrayList<>();
601599
bulkWriter.addWriteErrorListener(
602600
new WriteErrorCallback() {
603-
public boolean shouldRetryError(BulkWriterError error) {
601+
public boolean onError(BulkWriterError error) {
604602
return true;
605603
}
606604
});
@@ -663,7 +661,7 @@ public void returnsTheErrorIfNoRetrySpecified() throws Exception {
663661

664662
bulkWriter.addWriteErrorListener(
665663
new WriteErrorCallback() {
666-
public boolean shouldRetryError(BulkWriterError error) {
664+
public boolean onError(BulkWriterError error) {
667665
return error.getFailedAttempts() < 3;
668666
}
669667
});

0 commit comments

Comments
 (0)