Skip to content

Commit f9cdab5

Browse files
tom-andersengcf-owl-bot[bot]JoeWang1127renovate-botdconeybe
authored
fix: Thread safe UpdateBuilder (#1537)
* Thread safe UpdateBuilder * Add comment * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * chore: add an unmanaged dependency check (#1532) * refactor: Optimize FieldMask instantiation (#1536) * Optimize FieldMask instantiation * Pretty * Use synchronize * Update comment * Make sure commit prevents writes. * Pretty * Refactor * Add comments and make committed volatile. * Make WriteOperation immutable. * Refactor * fix(deps): Update the Java code generator (gapic-generator-java) to 2.32.0 (#1534) * chore: Add FindNearest API to the preview branch docs: Improve the documentation on Document.fields PiperOrigin-RevId: 599602467 Source-Link: googleapis/googleapis@d32bd97 Source-Link: googleapis/googleapis-gen@0545ffc Copy-Tag: eyJwIjoiLmdpdGh1Yi8uT3dsQm90LnlhbWwiLCJoIjoiMDU0NWZmYzQ4OGI4MmQzYTQ3NzExMThjOTIzZDY0Y2QwYjc1OTk1MyJ9 * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix(deps): Update the Java code generator (gapic-generator-java) to 2.32.0 PiperOrigin-RevId: 599914188 Source-Link: googleapis/googleapis@17e6661 Source-Link: googleapis/googleapis-gen@d86ba5b Copy-Tag: eyJwIjoiLmdpdGh1Yi8uT3dsQm90LnlhbWwiLCJoIjoiZDg2YmE1YmU1MzdlNDg5NDM1MTA1Y2E4NTU2NmNjNDEwMzMwMWFiYSJ9 * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com> * test(deps): update dependency com.google.truth:truth to v1.3.0 (#1538) * make some methods static: applyFieldMask() and convertToFieldPaths() * Comment * Inline * use explicit synchronization * Review feedback --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com> Co-authored-by: Joe Wang <[email protected]> Co-authored-by: gcf-owl-bot[bot] <78513119+gcf-owl-bot[bot]@users.noreply.github.com> Co-authored-by: Mend Renovate <[email protected]> Co-authored-by: Denver Coneybeare <[email protected]>
1 parent 46e09aa commit f9cdab5

File tree

3 files changed

+107
-78
lines changed

3 files changed

+107
-78
lines changed

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,24 +69,21 @@ ApiFuture<WriteResult> wrapResult(int writeIndex) {
6969
* <p>The writes in the batch are not applied atomically and can be applied out of order.
7070
*/
7171
ApiFuture<Void> bulkCommit() {
72+
73+
// Follows same thread safety logic as `UpdateBuilder::commit`.
74+
committed = true;
75+
BatchWriteRequest request = buildBatchWriteRequest();
76+
7277
Tracing.getTracer()
7378
.getCurrentSpan()
7479
.addAnnotation(
7580
TraceUtil.SPAN_NAME_BATCHWRITE,
76-
ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(getWrites().size())));
77-
78-
final BatchWriteRequest.Builder request = BatchWriteRequest.newBuilder();
79-
request.setDatabase(firestore.getDatabaseName());
80-
81-
for (WriteOperation writeOperation : getWrites()) {
82-
request.addWrites(writeOperation.write);
83-
}
84-
85-
committed = true;
81+
ImmutableMap.of(
82+
"numDocuments", AttributeValue.longAttributeValue(request.getWritesCount())));
8683

8784
ApiFuture<BatchWriteResponse> response =
8885
processExceptions(
89-
firestore.sendRequest(request.build(), firestore.getClient().batchWriteCallable()));
86+
firestore.sendRequest(request, firestore.getClient().batchWriteCallable()));
9087

9188
return ApiFutures.transformAsync(
9289
response,
@@ -117,6 +114,13 @@ ApiFuture<Void> bulkCommit() {
117114
executor);
118115
}
119116

117+
private BatchWriteRequest buildBatchWriteRequest() {
118+
BatchWriteRequest.Builder builder = BatchWriteRequest.newBuilder();
119+
builder.setDatabase(firestore.getDatabaseName());
120+
forEachWrite(builder::addWrites);
121+
return builder.build();
122+
}
123+
120124
/** Maps an RPC failure to each individual write's result. */
121125
private ApiFuture<BatchWriteResponse> processExceptions(ApiFuture<BatchWriteResponse> response) {
122126
return ApiFutures.catching(

google-cloud-firestore/src/main/java/com/google/cloud/firestore/UpdateBuilder.java

Lines changed: 90 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,20 @@
3030
import com.google.firestore.v1.CommitResponse;
3131
import com.google.firestore.v1.Write;
3232
import com.google.protobuf.ByteString;
33+
import com.google.protobuf.Timestamp;
3334
import io.opencensus.trace.AttributeValue;
3435
import io.opencensus.trace.Tracing;
3536
import java.util.ArrayList;
3637
import java.util.HashMap;
38+
import java.util.HashSet;
3739
import java.util.List;
3840
import java.util.Map;
3941
import java.util.Map.Entry;
42+
import java.util.Set;
4043
import java.util.SortedSet;
4144
import java.util.TreeSet;
45+
import java.util.function.Consumer;
46+
import java.util.stream.Collectors;
4247
import javax.annotation.Nonnull;
4348
import javax.annotation.Nullable;
4449

@@ -48,11 +53,11 @@
4853
*/
4954
@InternalExtensionOnly
5055
public abstract class UpdateBuilder<T> {
51-
static class WriteOperation {
52-
Write.Builder write;
53-
DocumentReference documentReference;
56+
static final class WriteOperation {
57+
final Write write;
58+
final DocumentReference documentReference;
5459

55-
WriteOperation(DocumentReference documentReference, Write.Builder write) {
60+
WriteOperation(DocumentReference documentReference, Write write) {
5661
this.documentReference = documentReference;
5762
this.write = write;
5863
}
@@ -65,13 +70,11 @@ public String toString() {
6570

6671
final FirestoreImpl firestore;
6772

73+
// All reads and writes on `writes` must be done in a block that is synchronized on `writes`;
74+
// otherwise, you get undefined behavior.
6875
private final List<WriteOperation> writes = new ArrayList<>();
6976

70-
protected boolean committed;
71-
72-
boolean isCommitted() {
73-
return committed;
74-
}
77+
protected volatile boolean committed;
7578

7679
UpdateBuilder(FirestoreImpl firestore) {
7780
this.firestore = firestore;
@@ -141,7 +144,6 @@ public T create(
141144

142145
private T performCreate(
143146
@Nonnull DocumentReference documentReference, @Nonnull Map<String, Object> fields) {
144-
verifyNotCommitted();
145147
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_CREATEDOCUMENT);
146148
DocumentSnapshot documentSnapshot =
147149
DocumentSnapshot.fromObject(
@@ -157,17 +159,7 @@ private T performCreate(
157159
write.addAllUpdateTransforms(documentTransform.toPb());
158160
}
159161

160-
writes.add(new WriteOperation(documentReference, write));
161-
162-
return wrapResult(writes.size() - 1);
163-
}
164-
165-
private void verifyNotCommitted() {
166-
Preconditions.checkState(
167-
!isCommitted(),
168-
String.format(
169-
"Cannot modify a %s that has already been committed.",
170-
this.getClass().getSimpleName()));
162+
return addWrite(documentReference, write);
171163
}
172164

173165
/**
@@ -258,7 +250,6 @@ private T performSet(
258250
@Nonnull DocumentReference documentReference,
259251
@Nonnull Map<String, Object> fields,
260252
@Nonnull SetOptions options) {
261-
verifyNotCommitted();
262253
Map<FieldPath, Object> documentData;
263254

264255
if (options.getFieldMask() != null) {
@@ -293,23 +284,36 @@ private T performSet(
293284
write.setUpdateMask(documentMask.toPb());
294285
}
295286

296-
writes.add(new WriteOperation(documentReference, write));
287+
return addWrite(documentReference, write);
288+
}
297289

298-
return wrapResult(writes.size() - 1);
290+
private T addWrite(DocumentReference documentReference, Write.Builder write) {
291+
WriteOperation operation = new WriteOperation(documentReference, write.build());
292+
int writeIndex;
293+
synchronized (writes) {
294+
Preconditions.checkState(
295+
!committed,
296+
String.format(
297+
"Cannot modify a %s that has already been committed.",
298+
this.getClass().getSimpleName()));
299+
writes.add(operation);
300+
writeIndex = writes.size() - 1;
301+
}
302+
return wrapResult(writeIndex);
299303
}
300304

301305
/** Removes all values in 'fields' that are not specified in 'fieldMask'. */
302-
private Map<FieldPath, Object> applyFieldMask(
306+
private static Map<FieldPath, Object> applyFieldMask(
303307
Map<String, Object> fields, List<FieldPath> fieldMask) {
304-
List<FieldPath> remainingFields = new ArrayList<>(fieldMask);
308+
Set<FieldPath> remainingFields = new HashSet<>(fieldMask);
305309
Map<FieldPath, Object> filteredData =
306310
applyFieldMask(fields, remainingFields, FieldPath.empty());
307311

308312
if (!remainingFields.isEmpty()) {
309313
throw new IllegalArgumentException(
310314
String.format(
311315
"Field masks contains invalid path. No data exist at field '%s'.",
312-
remainingFields.get(0)));
316+
remainingFields.iterator().next()));
313317
}
314318

315319
return filteredData;
@@ -319,8 +323,8 @@ private Map<FieldPath, Object> applyFieldMask(
319323
* Strips all values in 'fields' that are not specified in 'fieldMask'. Modifies 'fieldMask'
320324
* inline and removes all matched fields.
321325
*/
322-
private Map<FieldPath, Object> applyFieldMask(
323-
Map<String, Object> fields, List<FieldPath> fieldMask, FieldPath root) {
326+
private static Map<FieldPath, Object> applyFieldMask(
327+
Map<String, Object> fields, Set<FieldPath> fieldMask, FieldPath root) {
324328
Map<FieldPath, Object> filteredMap = new HashMap<>();
325329

326330
for (Entry<String, Object> entry : fields.entrySet()) {
@@ -340,7 +344,7 @@ private Map<FieldPath, Object> applyFieldMask(
340344
return filteredMap;
341345
}
342346

343-
private Map<FieldPath, Object> convertToFieldPaths(
347+
private static Map<FieldPath, Object> convertToFieldPaths(
344348
@Nonnull Map<String, Object> fields, boolean splitOnDots) {
345349
Map<FieldPath, Object> fieldPaths = new HashMap<>();
346350

@@ -532,7 +536,6 @@ private T performUpdate(
532536
@Nonnull DocumentReference documentReference,
533537
@Nonnull final Map<FieldPath, Object> fields,
534538
@Nonnull Precondition precondition) {
535-
verifyNotCommitted();
536539
Preconditions.checkArgument(!fields.isEmpty(), "Data for update() cannot be empty.");
537540
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_UPDATEDOCUMENT);
538541
Map<String, Object> deconstructedMap = expandObject(fields);
@@ -567,9 +570,8 @@ public boolean allowTransform() {
567570
if (!documentTransform.isEmpty()) {
568571
write.addAllUpdateTransforms(documentTransform.toPb());
569572
}
570-
writes.add(new WriteOperation(documentReference, write));
571573

572-
return wrapResult(writes.size() - 1);
574+
return addWrite(documentReference, write);
573575
}
574576

575577
/**
@@ -598,76 +600,98 @@ public T delete(@Nonnull DocumentReference documentReference) {
598600

599601
private T performDelete(
600602
@Nonnull DocumentReference documentReference, @Nonnull Precondition precondition) {
601-
verifyNotCommitted();
602603
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_DELETEDOCUMENT);
603604
Write.Builder write = Write.newBuilder().setDelete(documentReference.getName());
604605

605606
if (!precondition.isEmpty()) {
606607
write.setCurrentDocument(precondition.toPb());
607608
}
608-
writes.add(new WriteOperation(documentReference, write));
609609

610-
return wrapResult(writes.size() - 1);
610+
return addWrite(documentReference, write);
611611
}
612612

613613
/** Commit the current batch. */
614614
ApiFuture<List<WriteResult>> commit(@Nullable ByteString transactionId) {
615+
616+
// Sequence is thread safe.
617+
//
618+
// 1. Set committed = true
619+
// 2. Build commit request
620+
//
621+
// Step 1 sets uses volatile property to ensure committed is visible to all
622+
// threads immediately.
623+
//
624+
// Step 2 uses `forEach(..)` that is synchronized, therefore will be blocked
625+
// until any writes are complete.
626+
//
627+
// Writes will verify `committed==false` within synchronized block of code
628+
// before appending writes. Since committed is set to true before accessing
629+
// writes, we are ensured that no more writes will be appended after commit
630+
// accesses writes.
631+
committed = true;
632+
CommitRequest request = buildCommitRequest(transactionId);
633+
615634
Tracing.getTracer()
616635
.getCurrentSpan()
617636
.addAnnotation(
618637
TraceUtil.SPAN_NAME_COMMIT,
619-
ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(writes.size())));
620-
621-
final CommitRequest.Builder request = CommitRequest.newBuilder();
622-
request.setDatabase(firestore.getDatabaseName());
623-
624-
for (WriteOperation writeOperation : writes) {
625-
request.addWrites(writeOperation.write);
626-
}
627-
628-
if (transactionId != null) {
629-
request.setTransaction(transactionId);
630-
}
631-
632-
committed = true;
638+
ImmutableMap.of(
639+
"numDocuments", AttributeValue.longAttributeValue(request.getWritesCount())));
633640

634641
ApiFuture<CommitResponse> response =
635-
firestore.sendRequest(request.build(), firestore.getClient().commitCallable());
642+
firestore.sendRequest(request, firestore.getClient().commitCallable());
636643

637644
return ApiFutures.transform(
638645
response,
639646
commitResponse -> {
640-
List<com.google.firestore.v1.WriteResult> writeResults =
641-
commitResponse.getWriteResultsList();
642-
643-
List<WriteResult> result = new ArrayList<>();
644-
645-
for (com.google.firestore.v1.WriteResult writeResult : writeResults) {
646-
result.add(WriteResult.fromProto(writeResult, commitResponse.getCommitTime()));
647-
}
648-
649-
return result;
647+
Timestamp commitTime = commitResponse.getCommitTime();
648+
return commitResponse.getWriteResultsList().stream()
649+
.map(writeResult -> WriteResult.fromProto(writeResult, commitTime))
650+
.collect(Collectors.toList());
650651
},
651652
MoreExecutors.directExecutor());
652653
}
653654

655+
private CommitRequest buildCommitRequest(ByteString transactionId) {
656+
CommitRequest.Builder builder = CommitRequest.newBuilder();
657+
builder.setDatabase(firestore.getDatabaseName());
658+
forEachWrite(builder::addWrites);
659+
if (transactionId != null) {
660+
builder.setTransaction(transactionId);
661+
}
662+
return builder.build();
663+
}
664+
654665
/** Checks whether any updates have been queued. */
655666
boolean isEmpty() {
656-
return writes.isEmpty();
667+
synchronized (writes) {
668+
return writes.isEmpty();
669+
}
657670
}
658671

659-
List<WriteOperation> getWrites() {
660-
return writes;
672+
void forEachWrite(Consumer<Write> consumer) {
673+
synchronized (writes) {
674+
for (WriteOperation writeOperation : writes) {
675+
consumer.accept(writeOperation.write);
676+
}
677+
}
661678
}
662679

663680
/** Get the number of writes. */
664681
public int getMutationsSize() {
665-
return writes.size();
682+
synchronized (writes) {
683+
return writes.size();
684+
}
666685
}
667686

668687
@Override
669688
public String toString() {
689+
final String writesAsString;
690+
synchronized (writes) {
691+
writesAsString = writes.toString();
692+
}
693+
670694
return String.format(
671-
"%s{writes=%s, committed=%s}", getClass().getSimpleName(), writes, committed);
695+
"%s{writes=%s, committed=%s}", getClass().getSimpleName(), writesAsString, committed);
672696
}
673697
}

google-cloud-firestore/src/test/java/com/google/cloud/firestore/ToStringTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ public void testWriteOperation() {
8686
documentReference,
8787
Collections.singletonMap("key", "value"),
8888
UserDataConverter.NO_DELETES)
89-
.toPb())
89+
.toPb()
90+
.build())
9091
.toString();
9192
assertThat(toStringResult).startsWith("WriteOperation{");
9293
assertThat(toStringResult)

0 commit comments

Comments
 (0)