3232import java .util .ArrayList ;
3333import java .util .List ;
3434import java .util .Map ;
35- import java .util .Set ;
3635import java .util .concurrent .CopyOnWriteArrayList ;
3736import java .util .concurrent .ExecutionException ;
3837import java .util .concurrent .ScheduledExecutorService ;
@@ -129,7 +128,7 @@ final class BulkWriter implements AutoCloseable {
129128 public ApiFuture <WriteResult > create (
130129 @ Nonnull DocumentReference documentReference , @ Nonnull Map <String , Object > fields ) {
131130 verifyNotClosed ();
132- BulkCommitBatch bulkCommitBatch = getEligibleBatch (documentReference );
131+ BulkCommitBatch bulkCommitBatch = getEligibleBatch ();
133132 ApiFuture <WriteResult > future = bulkCommitBatch .create (documentReference , fields );
134133 sendReadyBatches ();
135134 return future ;
@@ -147,7 +146,7 @@ public ApiFuture<WriteResult> create(
147146 public ApiFuture <WriteResult > create (
148147 @ Nonnull DocumentReference documentReference , @ Nonnull Object pojo ) {
149148 verifyNotClosed ();
150- BulkCommitBatch bulkCommitBatch = getEligibleBatch (documentReference );
149+ BulkCommitBatch bulkCommitBatch = getEligibleBatch ();
151150 ApiFuture <WriteResult > future = bulkCommitBatch .create (documentReference , pojo );
152151 sendReadyBatches ();
153152 return future ;
@@ -163,7 +162,7 @@ public ApiFuture<WriteResult> create(
163162 @ Nonnull
164163 public ApiFuture <WriteResult > delete (@ Nonnull DocumentReference documentReference ) {
165164 verifyNotClosed ();
166- BulkCommitBatch bulkCommitBatch = getEligibleBatch (documentReference );
165+ BulkCommitBatch bulkCommitBatch = getEligibleBatch ();
167166 ApiFuture <WriteResult > future = bulkCommitBatch .delete (documentReference );
168167 sendReadyBatches ();
169168 return future ;
@@ -181,7 +180,7 @@ public ApiFuture<WriteResult> delete(@Nonnull DocumentReference documentReferenc
181180 public ApiFuture <WriteResult > delete (
182181 @ Nonnull DocumentReference documentReference , @ Nonnull Precondition precondition ) {
183182 verifyNotClosed ();
184- BulkCommitBatch bulkCommitBatch = getEligibleBatch (documentReference );
183+ BulkCommitBatch bulkCommitBatch = getEligibleBatch ();
185184 ApiFuture <WriteResult > future = bulkCommitBatch .delete (documentReference , precondition );
186185 sendReadyBatches ();
187186 return future ;
@@ -200,7 +199,7 @@ public ApiFuture<WriteResult> delete(
200199 public ApiFuture <WriteResult > set (
201200 @ Nonnull DocumentReference documentReference , @ Nonnull Map <String , Object > fields ) {
202201 verifyNotClosed ();
203- BulkCommitBatch bulkCommitBatch = getEligibleBatch (documentReference );
202+ BulkCommitBatch bulkCommitBatch = getEligibleBatch ();
204203 ApiFuture <WriteResult > future = bulkCommitBatch .set (documentReference , fields );
205204 sendReadyBatches ();
206205 return future ;
@@ -222,7 +221,7 @@ public ApiFuture<WriteResult> set(
222221 @ Nonnull Map <String , Object > fields ,
223222 @ Nonnull SetOptions options ) {
224223 verifyNotClosed ();
225- BulkCommitBatch bulkCommitBatch = getEligibleBatch (documentReference );
224+ BulkCommitBatch bulkCommitBatch = getEligibleBatch ();
226225 ApiFuture <WriteResult > future = bulkCommitBatch .set (documentReference , fields , options );
227226 sendReadyBatches ();
228227 return future ;
@@ -242,7 +241,7 @@ public ApiFuture<WriteResult> set(
242241 public ApiFuture <WriteResult > set (
243242 @ Nonnull DocumentReference documentReference , Object pojo , @ Nonnull SetOptions options ) {
244243 verifyNotClosed ();
245- BulkCommitBatch bulkCommitBatch = getEligibleBatch (documentReference );
244+ BulkCommitBatch bulkCommitBatch = getEligibleBatch ();
246245 ApiFuture <WriteResult > future = bulkCommitBatch .set (documentReference , pojo , options );
247246 sendReadyBatches ();
248247 return future ;
@@ -259,7 +258,7 @@ public ApiFuture<WriteResult> set(
259258 @ Nonnull
260259 public ApiFuture <WriteResult > set (@ Nonnull DocumentReference documentReference , Object pojo ) {
261260 verifyNotClosed ();
262- BulkCommitBatch bulkCommitBatch = getEligibleBatch (documentReference );
261+ BulkCommitBatch bulkCommitBatch = getEligibleBatch ();
263262 ApiFuture <WriteResult > future = bulkCommitBatch .set (documentReference , pojo );
264263 sendReadyBatches ();
265264 return future ;
@@ -282,7 +281,7 @@ public ApiFuture<WriteResult> set(@Nonnull DocumentReference documentReference,
282281 public ApiFuture <WriteResult > update (
283282 @ Nonnull DocumentReference documentReference , @ Nonnull Map <String , Object > fields ) {
284283 verifyNotClosed ();
285- BulkCommitBatch bulkCommitBatch = getEligibleBatch (documentReference );
284+ BulkCommitBatch bulkCommitBatch = getEligibleBatch ();
286285 ApiFuture <WriteResult > future = bulkCommitBatch .update (documentReference , fields );
287286 sendReadyBatches ();
288287 return future ;
@@ -308,7 +307,7 @@ public ApiFuture<WriteResult> update(
308307 @ Nonnull Map <String , Object > fields ,
309308 Precondition precondition ) {
310309 verifyNotClosed ();
311- BulkCommitBatch bulkCommitBatch = getEligibleBatch (documentReference );
310+ BulkCommitBatch bulkCommitBatch = getEligibleBatch ();
312311 ApiFuture <WriteResult > future = bulkCommitBatch .update (documentReference , fields , precondition );
313312 sendReadyBatches ();
314313 return future ;
@@ -336,7 +335,7 @@ public ApiFuture<WriteResult> update(
336335 @ Nullable Object value ,
337336 Object ... moreFieldsAndValues ) {
338337 verifyNotClosed ();
339- BulkCommitBatch bulkCommitBatch = getEligibleBatch (documentReference );
338+ BulkCommitBatch bulkCommitBatch = getEligibleBatch ();
340339 ApiFuture <WriteResult > future =
341340 bulkCommitBatch .update (documentReference , field , value , moreFieldsAndValues );
342341 sendReadyBatches ();
@@ -365,7 +364,7 @@ public ApiFuture<WriteResult> update(
365364 @ Nullable Object value ,
366365 Object ... moreFieldsAndValues ) {
367366 verifyNotClosed ();
368- BulkCommitBatch bulkCommitBatch = getEligibleBatch (documentReference );
367+ BulkCommitBatch bulkCommitBatch = getEligibleBatch ();
369368 ApiFuture <WriteResult > future =
370369 bulkCommitBatch .update (documentReference , fieldPath , value , moreFieldsAndValues );
371370 sendReadyBatches ();
@@ -395,7 +394,7 @@ public ApiFuture<WriteResult> update(
395394 @ Nullable Object value ,
396395 Object ... moreFieldsAndValues ) {
397396 verifyNotClosed ();
398- BulkCommitBatch bulkCommitBatch = getEligibleBatch (documentReference );
397+ BulkCommitBatch bulkCommitBatch = getEligibleBatch ();
399398 ApiFuture <WriteResult > future =
400399 bulkCommitBatch .update (documentReference , precondition , field , value , moreFieldsAndValues );
401400 sendReadyBatches ();
@@ -426,7 +425,7 @@ public ApiFuture<WriteResult> update(
426425 @ Nullable Object value ,
427426 Object ... moreFieldsAndValues ) {
428427 verifyNotClosed ();
429- BulkCommitBatch bulkCommitBatch = getEligibleBatch (documentReference );
428+ BulkCommitBatch bulkCommitBatch = getEligibleBatch ();
430429 ApiFuture <WriteResult > future =
431430 bulkCommitBatch .update (
432431 documentReference , precondition , fieldPath , value , moreFieldsAndValues );
@@ -493,11 +492,10 @@ private void verifyNotClosed() {
493492 * Return the first eligible batch that can hold a write to the provided reference, or creates one
494493 * if no eligible batches are found.
495494 */
496- private BulkCommitBatch getEligibleBatch (DocumentReference documentReference ) {
495+ private BulkCommitBatch getEligibleBatch () {
497496 if (batchQueue .size () > 0 ) {
498497 BulkCommitBatch lastBatch = batchQueue .get (batchQueue .size () - 1 );
499- if (lastBatch .getState () == UpdateBuilder .BatchState .OPEN
500- && !lastBatch .hasDocument (documentReference )) {
498+ if (lastBatch .getState () == UpdateBuilder .BatchState .OPEN ) {
501499 return lastBatch ;
502500 }
503501 }
@@ -538,7 +536,8 @@ public boolean apply(BulkCommitBatch batch) {
538536 .toList ();
539537
540538 int index = 0 ;
541- while (index < unsentBatches .size () && isBatchSendable (unsentBatches .get (index ))) {
539+ while (index < unsentBatches .size ()
540+ && unsentBatches .get (index ).state == BatchState .READY_TO_SEND ) {
542541 final BulkCommitBatch batch = unsentBatches .get (index );
543542
544543 // Send the batch if it is under the rate limit, or schedule another attempt after the
@@ -631,8 +630,8 @@ public ApiFuture<Void> apply(Void ignored) {
631630 public ApiFuture <List <BatchWriteResult >> apply (Exception exception ) {
632631 List <BatchWriteResult > results = new ArrayList <>();
633632 // If the BatchWrite RPC fails, map the exception to each individual result.
634- for (DocumentReference documentReference : batch .getPendingDocuments () ) {
635- results .add (new BatchWriteResult (documentReference , null , exception ));
633+ for (int i = 0 ; i < batch .getPendingDocumentPaths (). size (); ++ i ) {
634+ results .add (new BatchWriteResult (null , exception ));
636635 }
637636 return ApiFutures .immediateFuture (results );
638637 }
@@ -655,8 +654,8 @@ public ProcessBulkCommitCallback(BulkCommitBatch batch, int attempt) {
655654
656655 @ Override
657656 public ApiFuture <Void > apply (List <BatchWriteResult > results ) {
658- batch .processResults (results );
659- Set < DocumentReference > remainingOps = batch .getPendingDocuments ();
657+ batch .processResults (results , /* allowRetry= */ true );
658+ List < String > remainingOps = batch .getPendingDocumentPaths ();
660659 if (!remainingOps .isEmpty ()) {
661660 logger .log (
662661 Level .WARNING ,
@@ -666,52 +665,16 @@ public ApiFuture<Void> apply(List<BatchWriteResult> results) {
666665
667666 if (attempt < MAX_RETRY_ATTEMPTS ) {
668667 nextAttempt = backoff .createNextAttempt (nextAttempt );
669- BulkCommitBatch newBatch = new BulkCommitBatch (firestore , batch , remainingOps );
668+ BulkCommitBatch newBatch = new BulkCommitBatch (firestore , batch );
670669 return bulkCommit (newBatch , attempt + 1 );
671670 } else {
672- batch .failRemainingOperations (results );
671+ batch .processResults (results , /* allowRetry= */ false );
673672 }
674673 }
675674 return ApiFutures .immediateFuture (null );
676675 }
677676 }
678677
679- /**
680- * Checks that the provided batch is sendable. To be sendable, a batch must: (1) be marked as
681- * READY_TO_SEND (2) not write to references that are currently in flight.
682- */
683- private boolean isBatchSendable (BulkCommitBatch batch ) {
684- if (!batch .getState ().equals (UpdateBuilder .BatchState .READY_TO_SEND )) {
685- return false ;
686- }
687-
688- for (final DocumentReference documentReference : batch .getPendingDocuments ()) {
689- boolean isRefInFlight =
690- FluentIterable .from (batchQueue )
691- .anyMatch (
692- new Predicate <BulkCommitBatch >() {
693- @ Override
694- public boolean apply (BulkCommitBatch batch ) {
695- return batch .getState ().equals (BatchState .SENT )
696- && batch .hasDocument (documentReference );
697- }
698- });
699-
700- if (isRefInFlight ) {
701- logger .log (
702- Level .WARNING ,
703- String .format (
704- "Duplicate write to document %s detected. Writing to the same document multiple"
705- + " times will slow down BulkWriter. Write to unique documents in order to "
706- + "maximize throughput." ,
707- documentReference .getPath ()));
708- return false ;
709- }
710- }
711-
712- return true ;
713- }
714-
715678 @ VisibleForTesting
716679 void setMaxBatchSize (int size ) {
717680 maxBatchSize = size ;
0 commit comments