3030import com .google .firestore .v1 .CommitResponse ;
3131import com .google .firestore .v1 .Write ;
3232import com .google .protobuf .ByteString ;
33+ import com .google .protobuf .Timestamp ;
3334import io .opencensus .trace .AttributeValue ;
3435import io .opencensus .trace .Tracing ;
3536import java .util .ArrayList ;
3637import java .util .HashMap ;
38+ import java .util .HashSet ;
3739import java .util .List ;
3840import java .util .Map ;
3941import java .util .Map .Entry ;
42+ import java .util .Set ;
4043import java .util .SortedSet ;
4144import java .util .TreeSet ;
45+ import java .util .function .Consumer ;
46+ import java .util .stream .Collectors ;
4247import javax .annotation .Nonnull ;
4348import javax .annotation .Nullable ;
4449
4853 */
4954@ InternalExtensionOnly
5055public abstract class UpdateBuilder <T > {
51- static class WriteOperation {
52- Write . Builder write ;
53- DocumentReference documentReference ;
56+ static final class WriteOperation {
57+ final Write write ;
58+ final DocumentReference documentReference ;
5459
55- WriteOperation (DocumentReference documentReference , Write . Builder write ) {
60+ WriteOperation (DocumentReference documentReference , Write write ) {
5661 this .documentReference = documentReference ;
5762 this .write = write ;
5863 }
@@ -65,13 +70,11 @@ public String toString() {
6570
6671 final FirestoreImpl firestore ;
6772
73+ // All reads and writes on `writes` must be done in a block that is synchronized on `writes`;
74+ // otherwise, you get undefined behavior.
6875 private final List <WriteOperation > writes = new ArrayList <>();
6976
70- protected boolean committed ;
71-
72- boolean isCommitted () {
73- return committed ;
74- }
77+ protected volatile boolean committed ;
7578
7679 UpdateBuilder (FirestoreImpl firestore ) {
7780 this .firestore = firestore ;
@@ -141,7 +144,6 @@ public T create(
141144
142145 private T performCreate (
143146 @ Nonnull DocumentReference documentReference , @ Nonnull Map <String , Object > fields ) {
144- verifyNotCommitted ();
145147 Tracing .getTracer ().getCurrentSpan ().addAnnotation (TraceUtil .SPAN_NAME_CREATEDOCUMENT );
146148 DocumentSnapshot documentSnapshot =
147149 DocumentSnapshot .fromObject (
@@ -157,17 +159,7 @@ private T performCreate(
157159 write .addAllUpdateTransforms (documentTransform .toPb ());
158160 }
159161
160- writes .add (new WriteOperation (documentReference , write ));
161-
162- return wrapResult (writes .size () - 1 );
163- }
164-
165- private void verifyNotCommitted () {
166- Preconditions .checkState (
167- !isCommitted (),
168- String .format (
169- "Cannot modify a %s that has already been committed." ,
170- this .getClass ().getSimpleName ()));
162+ return addWrite (documentReference , write );
171163 }
172164
173165 /**
@@ -258,7 +250,6 @@ private T performSet(
258250 @ Nonnull DocumentReference documentReference ,
259251 @ Nonnull Map <String , Object > fields ,
260252 @ Nonnull SetOptions options ) {
261- verifyNotCommitted ();
262253 Map <FieldPath , Object > documentData ;
263254
264255 if (options .getFieldMask () != null ) {
@@ -293,23 +284,36 @@ private T performSet(
293284 write .setUpdateMask (documentMask .toPb ());
294285 }
295286
296- writes .add (new WriteOperation (documentReference , write ));
287+ return addWrite (documentReference , write );
288+ }
297289
298- return wrapResult (writes .size () - 1 );
290+ private T addWrite (DocumentReference documentReference , Write .Builder write ) {
291+ WriteOperation operation = new WriteOperation (documentReference , write .build ());
292+ int writeIndex ;
293+ synchronized (writes ) {
294+ Preconditions .checkState (
295+ !committed ,
296+ String .format (
297+ "Cannot modify a %s that has already been committed." ,
298+ this .getClass ().getSimpleName ()));
299+ writes .add (operation );
300+ writeIndex = writes .size () - 1 ;
301+ }
302+ return wrapResult (writeIndex );
299303 }
300304
301305 /** Removes all values in 'fields' that are not specified in 'fieldMask'. */
302- private Map <FieldPath , Object > applyFieldMask (
306+ private static Map <FieldPath , Object > applyFieldMask (
303307 Map <String , Object > fields , List <FieldPath > fieldMask ) {
304- List <FieldPath > remainingFields = new ArrayList <>(fieldMask );
308+ Set <FieldPath > remainingFields = new HashSet <>(fieldMask );
305309 Map <FieldPath , Object > filteredData =
306310 applyFieldMask (fields , remainingFields , FieldPath .empty ());
307311
308312 if (!remainingFields .isEmpty ()) {
309313 throw new IllegalArgumentException (
310314 String .format (
311315 "Field masks contains invalid path. No data exist at field '%s'." ,
312- remainingFields .get ( 0 )));
316+ remainingFields .iterator (). next ( )));
313317 }
314318
315319 return filteredData ;
@@ -319,8 +323,8 @@ private Map<FieldPath, Object> applyFieldMask(
319323 * Strips all values in 'fields' that are not specified in 'fieldMask'. Modifies 'fieldMask'
320324 * inline and removes all matched fields.
321325 */
322- private Map <FieldPath , Object > applyFieldMask (
323- Map <String , Object > fields , List <FieldPath > fieldMask , FieldPath root ) {
326+ private static Map <FieldPath , Object > applyFieldMask (
327+ Map <String , Object > fields , Set <FieldPath > fieldMask , FieldPath root ) {
324328 Map <FieldPath , Object > filteredMap = new HashMap <>();
325329
326330 for (Entry <String , Object > entry : fields .entrySet ()) {
@@ -340,7 +344,7 @@ private Map<FieldPath, Object> applyFieldMask(
340344 return filteredMap ;
341345 }
342346
343- private Map <FieldPath , Object > convertToFieldPaths (
347+ private static Map <FieldPath , Object > convertToFieldPaths (
344348 @ Nonnull Map <String , Object > fields , boolean splitOnDots ) {
345349 Map <FieldPath , Object > fieldPaths = new HashMap <>();
346350
@@ -532,7 +536,6 @@ private T performUpdate(
532536 @ Nonnull DocumentReference documentReference ,
533537 @ Nonnull final Map <FieldPath , Object > fields ,
534538 @ Nonnull Precondition precondition ) {
535- verifyNotCommitted ();
536539 Preconditions .checkArgument (!fields .isEmpty (), "Data for update() cannot be empty." );
537540 Tracing .getTracer ().getCurrentSpan ().addAnnotation (TraceUtil .SPAN_NAME_UPDATEDOCUMENT );
538541 Map <String , Object > deconstructedMap = expandObject (fields );
@@ -567,9 +570,8 @@ public boolean allowTransform() {
567570 if (!documentTransform .isEmpty ()) {
568571 write .addAllUpdateTransforms (documentTransform .toPb ());
569572 }
570- writes .add (new WriteOperation (documentReference , write ));
571573
572- return wrapResult ( writes . size () - 1 );
574+ return addWrite ( documentReference , write );
573575 }
574576
575577 /**
@@ -598,76 +600,98 @@ public T delete(@Nonnull DocumentReference documentReference) {
598600
599601 private T performDelete (
600602 @ Nonnull DocumentReference documentReference , @ Nonnull Precondition precondition ) {
601- verifyNotCommitted ();
602603 Tracing .getTracer ().getCurrentSpan ().addAnnotation (TraceUtil .SPAN_NAME_DELETEDOCUMENT );
603604 Write .Builder write = Write .newBuilder ().setDelete (documentReference .getName ());
604605
605606 if (!precondition .isEmpty ()) {
606607 write .setCurrentDocument (precondition .toPb ());
607608 }
608- writes .add (new WriteOperation (documentReference , write ));
609609
610- return wrapResult ( writes . size () - 1 );
610+ return addWrite ( documentReference , write );
611611 }
612612
613613 /** Commit the current batch. */
614614 ApiFuture <List <WriteResult >> commit (@ Nullable ByteString transactionId ) {
615+
616+ // Sequence is thread safe.
617+ //
618+ // 1. Set committed = true
619+ // 2. Build commit request
620+ //
621+ // Step 1 sets uses volatile property to ensure committed is visible to all
622+ // threads immediately.
623+ //
624+ // Step 2 uses `forEach(..)` that is synchronized, therefore will be blocked
625+ // until any writes are complete.
626+ //
627+ // Writes will verify `committed==false` within synchronized block of code
628+ // before appending writes. Since committed is set to true before accessing
629+ // writes, we are ensured that no more writes will be appended after commit
630+ // accesses writes.
631+ committed = true ;
632+ CommitRequest request = buildCommitRequest (transactionId );
633+
615634 Tracing .getTracer ()
616635 .getCurrentSpan ()
617636 .addAnnotation (
618637 TraceUtil .SPAN_NAME_COMMIT ,
619- ImmutableMap .of ("numDocuments" , AttributeValue .longAttributeValue (writes .size ())));
620-
621- final CommitRequest .Builder request = CommitRequest .newBuilder ();
622- request .setDatabase (firestore .getDatabaseName ());
623-
624- for (WriteOperation writeOperation : writes ) {
625- request .addWrites (writeOperation .write );
626- }
627-
628- if (transactionId != null ) {
629- request .setTransaction (transactionId );
630- }
631-
632- committed = true ;
638+ ImmutableMap .of (
639+ "numDocuments" , AttributeValue .longAttributeValue (request .getWritesCount ())));
633640
634641 ApiFuture <CommitResponse > response =
635- firestore .sendRequest (request . build () , firestore .getClient ().commitCallable ());
642+ firestore .sendRequest (request , firestore .getClient ().commitCallable ());
636643
637644 return ApiFutures .transform (
638645 response ,
639646 commitResponse -> {
640- List <com .google .firestore .v1 .WriteResult > writeResults =
641- commitResponse .getWriteResultsList ();
642-
643- List <WriteResult > result = new ArrayList <>();
644-
645- for (com .google .firestore .v1 .WriteResult writeResult : writeResults ) {
646- result .add (WriteResult .fromProto (writeResult , commitResponse .getCommitTime ()));
647- }
648-
649- return result ;
647+ Timestamp commitTime = commitResponse .getCommitTime ();
648+ return commitResponse .getWriteResultsList ().stream ()
649+ .map (writeResult -> WriteResult .fromProto (writeResult , commitTime ))
650+ .collect (Collectors .toList ());
650651 },
651652 MoreExecutors .directExecutor ());
652653 }
653654
655+ private CommitRequest buildCommitRequest (ByteString transactionId ) {
656+ CommitRequest .Builder builder = CommitRequest .newBuilder ();
657+ builder .setDatabase (firestore .getDatabaseName ());
658+ forEachWrite (builder ::addWrites );
659+ if (transactionId != null ) {
660+ builder .setTransaction (transactionId );
661+ }
662+ return builder .build ();
663+ }
664+
654665 /** Checks whether any updates have been queued. */
655666 boolean isEmpty () {
656- return writes .isEmpty ();
667+ synchronized (writes ) {
668+ return writes .isEmpty ();
669+ }
657670 }
658671
659- List <WriteOperation > getWrites () {
660- return writes ;
672+ void forEachWrite (Consumer <Write > consumer ) {
673+ synchronized (writes ) {
674+ for (WriteOperation writeOperation : writes ) {
675+ consumer .accept (writeOperation .write );
676+ }
677+ }
661678 }
662679
663680 /** Get the number of writes. */
664681 public int getMutationsSize () {
665- return writes .size ();
682+ synchronized (writes ) {
683+ return writes .size ();
684+ }
666685 }
667686
668687 @ Override
669688 public String toString () {
689+ final String writesAsString ;
690+ synchronized (writes ) {
691+ writesAsString = writes .toString ();
692+ }
693+
670694 return String .format (
671- "%s{writes=%s, committed=%s}" , getClass ().getSimpleName (), writes , committed );
695+ "%s{writes=%s, committed=%s}" , getClass ().getSimpleName (), writesAsString , committed );
672696 }
673697}
0 commit comments