Skip to content

Commit 259ece8

Browse files
author
Brian Chen
authored
fix: bulkWriter: writing to the same doc doesn't create a new batch (#394)
* fix: bulkWriter: writing to the same doc doesn't create a new batch * fix clirr rules * resolve comments * remove usage of BatchWriteResult.documentKey * clirr rule update
1 parent 5e8c154 commit 259ece8

6 files changed

Lines changed: 129 additions & 198 deletions

File tree

google-cloud-firestore/clirr-ignored-differences.xml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@
161161
<method>com.google.cloud.firestore.Query collectionGroup(java.lang.String)</method>
162162
<to>com.google.cloud.firestore.CollectionGroup</to>
163163
</difference>
164-
164+
165165
<!--
166166
BulkWriter
167167
-->
@@ -199,6 +199,17 @@
199199
<className>com/google/cloud/firestore/spi/v1/FirestoreRpc</className>
200200
<method>com.google.api.gax.rpc.UnaryCallable batchWriteCallable()</method>
201201
</difference>
202+
<difference>
203+
<differenceType>1001</differenceType>
204+
<className>com/google/cloud/firestore/BatchWriteResult</className>
205+
</difference>
206+
<difference>
207+
<differenceType>6004</differenceType>
208+
<className>com/google/cloud/firestore/UpdateBuilder</className>
209+
<field>pendingOperations</field>
210+
<from>java.util.Map</from>
211+
<to>java.util.List</to>
212+
</difference>
202213

203214
<!--
204215
FakeCredentials Refactor

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BatchWriteResult.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,15 @@
2525
* BatchWriteRequests.
2626
*/
2727
@InternalApi
28-
public final class BatchWriteResult {
29-
private final DocumentReference documentReference;
28+
final class BatchWriteResult {
3029
@Nullable private final Timestamp writeTime;
3130
@Nullable private final Exception exception;
3231

33-
BatchWriteResult(
34-
DocumentReference documentReference,
35-
@Nullable Timestamp timestamp,
36-
@Nullable Exception exception) {
37-
this.documentReference = documentReference;
32+
BatchWriteResult(@Nullable Timestamp timestamp, @Nullable Exception exception) {
3833
this.writeTime = timestamp;
3934
this.exception = exception;
4035
}
4136

42-
public DocumentReference getDocumentReference() {
43-
return documentReference;
44-
}
45-
4637
@Nullable
4738
public Timestamp getWriteTime() {
4839
return writeTime;

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@
1818

1919
import com.google.api.core.ApiFuture;
2020
import com.google.common.base.Preconditions;
21-
import com.google.common.base.Predicate;
22-
import com.google.common.collect.FluentIterable;
23-
import java.util.Set;
2421

2522
/** Used to represent a batch on the BatchQueue. */
2623
class BulkCommitBatch extends UpdateBuilder<ApiFuture<WriteResult>> {
@@ -29,21 +26,13 @@ class BulkCommitBatch extends UpdateBuilder<ApiFuture<WriteResult>> {
2926
super(firestore, maxBatchSize);
3027
}
3128

32-
BulkCommitBatch(
33-
FirestoreImpl firestore,
34-
BulkCommitBatch retryBatch,
35-
final Set<DocumentReference> docsToRetry) {
29+
BulkCommitBatch(FirestoreImpl firestore, BulkCommitBatch retryBatch) {
3630
super(firestore);
37-
this.writes.addAll(
38-
FluentIterable.from(retryBatch.writes)
39-
.filter(
40-
new Predicate<WriteOperation>() {
41-
@Override
42-
public boolean apply(WriteOperation writeOperation) {
43-
return docsToRetry.contains(writeOperation.documentReference);
44-
}
45-
})
46-
.toList());
31+
32+
// Create a new BulkCommitBatch containing only the indexes from the provided indexes to retry.
33+
for (int index : retryBatch.getPendingIndexes()) {
34+
this.writes.add(retryBatch.writes.get(index));
35+
}
4736

4837
Preconditions.checkState(
4938
retryBatch.state == BatchState.SENT,
@@ -55,9 +44,4 @@ public boolean apply(WriteOperation writeOperation) {
5544
ApiFuture<WriteResult> wrapResult(ApiFuture<WriteResult> result) {
5645
return result;
5746
}
58-
59-
@Override
60-
boolean allowDuplicateDocs() {
61-
return false;
62-
}
6347
}

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java

Lines changed: 24 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.util.ArrayList;
3333
import java.util.List;
3434
import java.util.Map;
35-
import java.util.Set;
3635
import java.util.concurrent.CopyOnWriteArrayList;
3736
import java.util.concurrent.ExecutionException;
3837
import java.util.concurrent.ScheduledExecutorService;
@@ -129,7 +128,7 @@ final class BulkWriter implements AutoCloseable {
129128
public ApiFuture<WriteResult> create(
130129
@Nonnull DocumentReference documentReference, @Nonnull Map<String, Object> fields) {
131130
verifyNotClosed();
132-
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
131+
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
133132
ApiFuture<WriteResult> future = bulkCommitBatch.create(documentReference, fields);
134133
sendReadyBatches();
135134
return future;
@@ -147,7 +146,7 @@ public ApiFuture<WriteResult> create(
147146
public ApiFuture<WriteResult> create(
148147
@Nonnull DocumentReference documentReference, @Nonnull Object pojo) {
149148
verifyNotClosed();
150-
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
149+
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
151150
ApiFuture<WriteResult> future = bulkCommitBatch.create(documentReference, pojo);
152151
sendReadyBatches();
153152
return future;
@@ -163,7 +162,7 @@ public ApiFuture<WriteResult> create(
163162
@Nonnull
164163
public ApiFuture<WriteResult> delete(@Nonnull DocumentReference documentReference) {
165164
verifyNotClosed();
166-
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
165+
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
167166
ApiFuture<WriteResult> future = bulkCommitBatch.delete(documentReference);
168167
sendReadyBatches();
169168
return future;
@@ -181,7 +180,7 @@ public ApiFuture<WriteResult> delete(@Nonnull DocumentReference documentReferenc
181180
public ApiFuture<WriteResult> delete(
182181
@Nonnull DocumentReference documentReference, @Nonnull Precondition precondition) {
183182
verifyNotClosed();
184-
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
183+
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
185184
ApiFuture<WriteResult> future = bulkCommitBatch.delete(documentReference, precondition);
186185
sendReadyBatches();
187186
return future;
@@ -200,7 +199,7 @@ public ApiFuture<WriteResult> delete(
200199
public ApiFuture<WriteResult> set(
201200
@Nonnull DocumentReference documentReference, @Nonnull Map<String, Object> fields) {
202201
verifyNotClosed();
203-
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
202+
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
204203
ApiFuture<WriteResult> future = bulkCommitBatch.set(documentReference, fields);
205204
sendReadyBatches();
206205
return future;
@@ -222,7 +221,7 @@ public ApiFuture<WriteResult> set(
222221
@Nonnull Map<String, Object> fields,
223222
@Nonnull SetOptions options) {
224223
verifyNotClosed();
225-
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
224+
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
226225
ApiFuture<WriteResult> future = bulkCommitBatch.set(documentReference, fields, options);
227226
sendReadyBatches();
228227
return future;
@@ -242,7 +241,7 @@ public ApiFuture<WriteResult> set(
242241
public ApiFuture<WriteResult> set(
243242
@Nonnull DocumentReference documentReference, Object pojo, @Nonnull SetOptions options) {
244243
verifyNotClosed();
245-
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
244+
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
246245
ApiFuture<WriteResult> future = bulkCommitBatch.set(documentReference, pojo, options);
247246
sendReadyBatches();
248247
return future;
@@ -259,7 +258,7 @@ public ApiFuture<WriteResult> set(
259258
@Nonnull
260259
public ApiFuture<WriteResult> set(@Nonnull DocumentReference documentReference, Object pojo) {
261260
verifyNotClosed();
262-
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
261+
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
263262
ApiFuture<WriteResult> future = bulkCommitBatch.set(documentReference, pojo);
264263
sendReadyBatches();
265264
return future;
@@ -282,7 +281,7 @@ public ApiFuture<WriteResult> set(@Nonnull DocumentReference documentReference,
282281
public ApiFuture<WriteResult> update(
283282
@Nonnull DocumentReference documentReference, @Nonnull Map<String, Object> fields) {
284283
verifyNotClosed();
285-
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
284+
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
286285
ApiFuture<WriteResult> future = bulkCommitBatch.update(documentReference, fields);
287286
sendReadyBatches();
288287
return future;
@@ -308,7 +307,7 @@ public ApiFuture<WriteResult> update(
308307
@Nonnull Map<String, Object> fields,
309308
Precondition precondition) {
310309
verifyNotClosed();
311-
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
310+
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
312311
ApiFuture<WriteResult> future = bulkCommitBatch.update(documentReference, fields, precondition);
313312
sendReadyBatches();
314313
return future;
@@ -336,7 +335,7 @@ public ApiFuture<WriteResult> update(
336335
@Nullable Object value,
337336
Object... moreFieldsAndValues) {
338337
verifyNotClosed();
339-
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
338+
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
340339
ApiFuture<WriteResult> future =
341340
bulkCommitBatch.update(documentReference, field, value, moreFieldsAndValues);
342341
sendReadyBatches();
@@ -365,7 +364,7 @@ public ApiFuture<WriteResult> update(
365364
@Nullable Object value,
366365
Object... moreFieldsAndValues) {
367366
verifyNotClosed();
368-
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
367+
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
369368
ApiFuture<WriteResult> future =
370369
bulkCommitBatch.update(documentReference, fieldPath, value, moreFieldsAndValues);
371370
sendReadyBatches();
@@ -395,7 +394,7 @@ public ApiFuture<WriteResult> update(
395394
@Nullable Object value,
396395
Object... moreFieldsAndValues) {
397396
verifyNotClosed();
398-
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
397+
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
399398
ApiFuture<WriteResult> future =
400399
bulkCommitBatch.update(documentReference, precondition, field, value, moreFieldsAndValues);
401400
sendReadyBatches();
@@ -426,7 +425,7 @@ public ApiFuture<WriteResult> update(
426425
@Nullable Object value,
427426
Object... moreFieldsAndValues) {
428427
verifyNotClosed();
429-
BulkCommitBatch bulkCommitBatch = getEligibleBatch(documentReference);
428+
BulkCommitBatch bulkCommitBatch = getEligibleBatch();
430429
ApiFuture<WriteResult> future =
431430
bulkCommitBatch.update(
432431
documentReference, precondition, fieldPath, value, moreFieldsAndValues);
@@ -493,11 +492,10 @@ private void verifyNotClosed() {
493492
* Return the first eligible batch that can hold a write to the provided reference, or creates one
494493
* if no eligible batches are found.
495494
*/
496-
private BulkCommitBatch getEligibleBatch(DocumentReference documentReference) {
495+
private BulkCommitBatch getEligibleBatch() {
497496
if (batchQueue.size() > 0) {
498497
BulkCommitBatch lastBatch = batchQueue.get(batchQueue.size() - 1);
499-
if (lastBatch.getState() == UpdateBuilder.BatchState.OPEN
500-
&& !lastBatch.hasDocument(documentReference)) {
498+
if (lastBatch.getState() == UpdateBuilder.BatchState.OPEN) {
501499
return lastBatch;
502500
}
503501
}
@@ -538,7 +536,8 @@ public boolean apply(BulkCommitBatch batch) {
538536
.toList();
539537

540538
int index = 0;
541-
while (index < unsentBatches.size() && isBatchSendable(unsentBatches.get(index))) {
539+
while (index < unsentBatches.size()
540+
&& unsentBatches.get(index).state == BatchState.READY_TO_SEND) {
542541
final BulkCommitBatch batch = unsentBatches.get(index);
543542

544543
// Send the batch if it is under the rate limit, or schedule another attempt after the
@@ -631,8 +630,8 @@ public ApiFuture<Void> apply(Void ignored) {
631630
public ApiFuture<List<BatchWriteResult>> apply(Exception exception) {
632631
List<BatchWriteResult> results = new ArrayList<>();
633632
// If the BatchWrite RPC fails, map the exception to each individual result.
634-
for (DocumentReference documentReference : batch.getPendingDocuments()) {
635-
results.add(new BatchWriteResult(documentReference, null, exception));
633+
for (int i = 0; i < batch.getPendingDocumentPaths().size(); ++i) {
634+
results.add(new BatchWriteResult(null, exception));
636635
}
637636
return ApiFutures.immediateFuture(results);
638637
}
@@ -655,8 +654,8 @@ public ProcessBulkCommitCallback(BulkCommitBatch batch, int attempt) {
655654

656655
@Override
657656
public ApiFuture<Void> apply(List<BatchWriteResult> results) {
658-
batch.processResults(results);
659-
Set<DocumentReference> remainingOps = batch.getPendingDocuments();
657+
batch.processResults(results, /* allowRetry= */ true);
658+
List<String> remainingOps = batch.getPendingDocumentPaths();
660659
if (!remainingOps.isEmpty()) {
661660
logger.log(
662661
Level.WARNING,
@@ -666,52 +665,16 @@ public ApiFuture<Void> apply(List<BatchWriteResult> results) {
666665

667666
if (attempt < MAX_RETRY_ATTEMPTS) {
668667
nextAttempt = backoff.createNextAttempt(nextAttempt);
669-
BulkCommitBatch newBatch = new BulkCommitBatch(firestore, batch, remainingOps);
668+
BulkCommitBatch newBatch = new BulkCommitBatch(firestore, batch);
670669
return bulkCommit(newBatch, attempt + 1);
671670
} else {
672-
batch.failRemainingOperations(results);
671+
batch.processResults(results, /* allowRetry= */ false);
673672
}
674673
}
675674
return ApiFutures.immediateFuture(null);
676675
}
677676
}
678677

679-
/**
680-
* Checks that the provided batch is sendable. To be sendable, a batch must: (1) be marked as
681-
* READY_TO_SEND (2) not write to references that are currently in flight.
682-
*/
683-
private boolean isBatchSendable(BulkCommitBatch batch) {
684-
if (!batch.getState().equals(UpdateBuilder.BatchState.READY_TO_SEND)) {
685-
return false;
686-
}
687-
688-
for (final DocumentReference documentReference : batch.getPendingDocuments()) {
689-
boolean isRefInFlight =
690-
FluentIterable.from(batchQueue)
691-
.anyMatch(
692-
new Predicate<BulkCommitBatch>() {
693-
@Override
694-
public boolean apply(BulkCommitBatch batch) {
695-
return batch.getState().equals(BatchState.SENT)
696-
&& batch.hasDocument(documentReference);
697-
}
698-
});
699-
700-
if (isRefInFlight) {
701-
logger.log(
702-
Level.WARNING,
703-
String.format(
704-
"Duplicate write to document %s detected. Writing to the same document multiple"
705-
+ " times will slow down BulkWriter. Write to unique documents in order to "
706-
+ "maximize throughput.",
707-
documentReference.getPath()));
708-
return false;
709-
}
710-
}
711-
712-
return true;
713-
}
714-
715678
@VisibleForTesting
716679
void setMaxBatchSize(int size) {
717680
maxBatchSize = size;

0 commit comments

Comments
 (0)