Skip to content

Commit 175c17e

Browse files
igorbernstein2pongad
authored andcommitted
Bigtable: 14. Implement Bulk Mutations (#2987)
1 parent b869e87 commit 175c17e

8 files changed

Lines changed: 983 additions & 9 deletions

File tree

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/DummyBatchingDescriptor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
* are exposed to the user using the {@link com.google.cloud.bigtable.data.v2.models.RowMutation}
2929
* wrapper, but the actual descriptor works on the underlying {@link
3030
* com.google.bigtable.v2.MutateRowsRequest}s. This class is used as a placeholder for the settings
31-
* and is replaced with the actual implementation of
32-
* com.google.cloud.bigtable.data.v2.stub.bulkmutaterows.BulkMutateRowsBatchingDescriptor when
33-
* constructing the callable chain.
31+
* and is replaced with the actual implementation of {@link
32+
* com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor} when constructing
33+
* the callable chain.
3434
*
3535
* <p>This class is considered an internal implementation detail and not meant to be used by
3636
* applications.

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,14 @@
1919
import com.google.api.core.InternalApi;
2020
import com.google.api.gax.retrying.RetrySettings;
2121
import com.google.api.gax.rpc.ApiCallContext;
22+
import com.google.api.gax.rpc.BatchingCallSettings;
2223
import com.google.api.gax.rpc.Callables;
2324
import com.google.api.gax.rpc.ClientContext;
2425
import com.google.api.gax.rpc.ServerStreamingCallSettings;
2526
import com.google.api.gax.rpc.ServerStreamingCallable;
2627
import com.google.api.gax.rpc.UnaryCallable;
28+
import com.google.bigtable.v2.MutateRowsRequest;
29+
import com.google.bigtable.v2.MutateRowsResponse;
2730
import com.google.bigtable.v2.ReadRowsRequest;
2831
import com.google.bigtable.v2.SampleRowKeysRequest;
2932
import com.google.bigtable.v2.SampleRowKeysResponse;
@@ -36,6 +39,9 @@
3639
import com.google.cloud.bigtable.data.v2.models.Row;
3740
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
3841
import com.google.cloud.bigtable.data.v2.models.RowMutation;
42+
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
43+
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsSpoolingCallable;
44+
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsUserFacingCallable;
3945
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
4046
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
4147
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable;
@@ -103,6 +109,15 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
103109
.setRetryableCodes(settings.mutateRowSettings().getRetryableCodes())
104110
.setRetrySettings(settings.mutateRowSettings().getRetrySettings());
105111

112+
// MutateRows(BulkMutateRows) retries are handled in the overlay: disable retries in the base
113+
// layer
114+
baseSettingsBuilder
115+
.mutateRowsSettings()
116+
.setSimpleTimeoutNoRetries(Duration.ofHours(2))
117+
.setRetryableCodes(settings.mutateRowsSettings().getRetryableCodes())
118+
.setTimeoutCheckInterval(Duration.ZERO)
119+
.setIdleTimeout(Duration.ZERO);
120+
106121
// CheckAndMutateRow is a simple passthrough
107122
baseSettingsBuilder
108123
.checkAndMutateRowSettings()
@@ -230,13 +245,37 @@ private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
230245
return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
231246
}
232247

248+
/**
249+
* Creates a callable chain to handle MutatesRows RPCs. The chain will:
250+
*
251+
* <ul>
252+
* <li>Convert a {@link RowMutation} into a {@link MutateRowsRequest} with a single entry.
253+
* <li>Using gax's {@link com.google.api.gax.rpc.BatchingCallable} to spool the requests and
254+
* aggregate the {@link MutateRowsRequest.Entry}s.
255+
* <li>Spool the streamed responses.
256+
* <li>Split the responses using {@link MutateRowsBatchingDescriptor}.
257+
* <li>Apply retries to individual mutations
258+
* </ul>
259+
*/
233260
private UnaryCallable<RowMutation, Void> createMutateRowsCallable() {
234-
return new UnaryCallable<RowMutation, Void>() {
235-
@Override
236-
public ApiFuture<Void> futureCall(RowMutation request, ApiCallContext context) {
237-
throw new UnsupportedOperationException("todo");
238-
}
239-
};
261+
MutateRowsSpoolingCallable spooling = new MutateRowsSpoolingCallable(stub.mutateRowsCallable());
262+
263+
// recreate BatchingCallSettings with the correct descriptor
264+
BatchingCallSettings.Builder<MutateRowsRequest, MutateRowsResponse> batchingCallSettings =
265+
BatchingCallSettings.newBuilder(
266+
new MutateRowsBatchingDescriptor(settings.mutateRowsSettings().getRetryableCodes()))
267+
.setBatchingSettings(settings.mutateRowsSettings().getBatchingSettings());
268+
269+
UnaryCallable<MutateRowsRequest, MutateRowsResponse> batching =
270+
Callables.batching(spooling, batchingCallSettings.build(), clientContext);
271+
272+
UnaryCallable<MutateRowsRequest, MutateRowsResponse> retrying =
273+
Callables.retrying(batching, settings.mutateRowsSettings(), clientContext);
274+
275+
MutateRowsUserFacingCallable userFacing =
276+
new MutateRowsUserFacingCallable(retrying, requestContext);
277+
278+
return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
240279
}
241280

242281
/**
@@ -284,6 +323,10 @@ public UnaryCallable<RowMutation, Void> mutateRowCallable() {
284323
return mutateRowCallable;
285324
}
286325

326+
/**
327+
* Returns the callable chain created in {@link #createMutateRowsCallable()} during stub
328+
* construction.
329+
*/
287330
public UnaryCallable<RowMutation, Void> mutateRowsCallable() {
288331
return mutateRowsCallable;
289332
}
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.mutaterows;
17+
18+
import com.google.api.core.InternalApi;
19+
import com.google.api.gax.batching.PartitionKey;
20+
import com.google.api.gax.batching.RequestBuilder;
21+
import com.google.api.gax.grpc.GrpcStatusCode;
22+
import com.google.api.gax.rpc.ApiException;
23+
import com.google.api.gax.rpc.ApiExceptionFactory;
24+
import com.google.api.gax.rpc.BatchedRequestIssuer;
25+
import com.google.api.gax.rpc.BatchingDescriptor;
26+
import com.google.api.gax.rpc.StatusCode;
27+
import com.google.bigtable.v2.MutateRowsRequest;
28+
import com.google.bigtable.v2.MutateRowsResponse;
29+
import com.google.common.base.Preconditions;
30+
import com.google.common.collect.ImmutableSet;
31+
import com.google.common.primitives.Ints;
32+
import com.google.rpc.Code;
33+
import com.google.rpc.Status;
34+
import io.grpc.StatusException;
35+
import io.grpc.StatusRuntimeException;
36+
import java.util.Collection;
37+
import java.util.Set;
38+
39+
/**
40+
* A custom implementation of a {@link BatchingDescriptor} to split individual results of a bulk
41+
* MutateRowsResponse. Each individual result will be matched with its issuer. Since the embedded
42+
* results bypass gax's result processing chains, this class is responsible for wrapping errors in
43+
* {@link ApiException}s and marking each error as retryable.
44+
*
45+
* <p>This class is considered an internal implementation detail and not meant to be used by
46+
* applications directly.
47+
*/
48+
@InternalApi
49+
public class MutateRowsBatchingDescriptor
50+
implements BatchingDescriptor<MutateRowsRequest, MutateRowsResponse> {
51+
52+
// Shared response to notify individual issuers of a successful mutation.
53+
private static final MutateRowsResponse OK_RESPONSE =
54+
MutateRowsResponse.newBuilder()
55+
.addEntries(
56+
MutateRowsResponse.Entry.newBuilder()
57+
.setIndex(0)
58+
.setStatus(Status.newBuilder().setCode(Code.OK_VALUE)))
59+
.build();
60+
61+
private final ImmutableSet<StatusCode.Code> retryableCodes;
62+
63+
public MutateRowsBatchingDescriptor(Set<StatusCode.Code> retryableCodes) {
64+
this.retryableCodes = ImmutableSet.copyOf(retryableCodes);
65+
}
66+
67+
/** Return the target table name. This will be used to combine batcheable requests */
68+
@Override
69+
public PartitionKey getBatchPartitionKey(MutateRowsRequest request) {
70+
return new PartitionKey(request.getTableName());
71+
}
72+
73+
/** {@inheritDoc} */
74+
@Override
75+
public RequestBuilder<MutateRowsRequest> getRequestBuilder() {
76+
return new MyRequestBuilder();
77+
}
78+
79+
/** {@inheritDoc} */
80+
@Override
81+
public void splitResponse(
82+
MutateRowsResponse batchResponse,
83+
Collection<? extends BatchedRequestIssuer<MutateRowsResponse>> batch) {
84+
85+
// Sort the result entries by index.
86+
Status[] sortedEntries = new Status[batchResponse.getEntriesCount()];
87+
88+
for (MutateRowsResponse.Entry entry : batchResponse.getEntriesList()) {
89+
int index = Ints.checkedCast(entry.getIndex());
90+
Preconditions.checkState(
91+
sortedEntries[index] == null, "Got multiple results for the same sub-mutation");
92+
sortedEntries[index] = entry.getStatus();
93+
}
94+
95+
// Notify all of issuers of the corresponding result.
96+
int i = 0;
97+
for (BatchedRequestIssuer<MutateRowsResponse> issuer : batch) {
98+
Status entry = sortedEntries[i++];
99+
Preconditions.checkState(entry != null, "Missing result for entry");
100+
101+
if (entry.getCode() == Code.OK_VALUE) {
102+
issuer.setResponse(OK_RESPONSE);
103+
} else {
104+
issuer.setException(createElementException(entry));
105+
}
106+
}
107+
}
108+
109+
/** {@inheritDoc} */
110+
@Override
111+
public void splitException(
112+
Throwable throwable, Collection<? extends BatchedRequestIssuer<MutateRowsResponse>> batch) {
113+
throwable = createElementException(throwable);
114+
115+
for (BatchedRequestIssuer<MutateRowsResponse> responder : batch) {
116+
responder.setException(throwable);
117+
}
118+
}
119+
120+
/** {@inheritDoc} */
121+
@Override
122+
public long countElements(MutateRowsRequest request) {
123+
return request.getEntriesCount();
124+
}
125+
126+
/** {@inheritDoc} */
127+
@Override
128+
public long countBytes(MutateRowsRequest request) {
129+
return request.getSerializedSize();
130+
}
131+
132+
/** Convert an element error Status into an ApiException */
133+
private ApiException createElementException(Status protoStatus) {
134+
Preconditions.checkArgument(protoStatus.getCode() != Code.OK_VALUE, "OK is not an error");
135+
136+
StatusRuntimeException throwable =
137+
io.grpc.Status.fromCodeValue(protoStatus.getCode())
138+
.withDescription(protoStatus.getMessage())
139+
.asRuntimeException();
140+
141+
return createElementException(throwable);
142+
}
143+
144+
/** Convert a Throwable into an ApiException, marking it as retryable when appropriate. */
145+
private ApiException createElementException(Throwable throwable) {
146+
final io.grpc.Status.Code code;
147+
148+
if (throwable instanceof ApiException) {
149+
return (ApiException) throwable;
150+
} else if (throwable instanceof StatusRuntimeException) {
151+
code = ((StatusRuntimeException) throwable).getStatus().getCode();
152+
} else if (throwable instanceof StatusException) {
153+
code = ((StatusException) throwable).getStatus().getCode();
154+
} else {
155+
code = io.grpc.Status.Code.UNKNOWN;
156+
}
157+
158+
GrpcStatusCode gaxStatusCode = GrpcStatusCode.of(code);
159+
boolean isRetryable = retryableCodes.contains(gaxStatusCode.getCode());
160+
161+
return ApiExceptionFactory.createException(throwable, gaxStatusCode, isRetryable);
162+
}
163+
164+
/** A {@link com.google.api.gax.batching.RequestBuilder} that can aggregate MutateRowsRequest */
165+
static class MyRequestBuilder implements RequestBuilder<MutateRowsRequest> {
166+
private MutateRowsRequest.Builder builder;
167+
168+
@Override
169+
public void appendRequest(MutateRowsRequest request) {
170+
if (builder == null) {
171+
builder = request.toBuilder();
172+
} else {
173+
builder.addAllEntries(request.getEntriesList());
174+
}
175+
}
176+
177+
@Override
178+
public MutateRowsRequest build() {
179+
return builder.build();
180+
}
181+
}
182+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.mutaterows;
17+
18+
import com.google.api.core.ApiFunction;
19+
import com.google.api.core.ApiFuture;
20+
import com.google.api.core.ApiFutures;
21+
import com.google.api.core.InternalApi;
22+
import com.google.api.gax.rpc.ApiCallContext;
23+
import com.google.api.gax.rpc.ServerStreamingCallable;
24+
import com.google.api.gax.rpc.UnaryCallable;
25+
import com.google.bigtable.v2.MutateRowsRequest;
26+
import com.google.bigtable.v2.MutateRowsResponse;
27+
import java.util.List;
28+
29+
/**
30+
* Converts a stream of {@link MutateRowsResponse}s into a unary MutateRowsResponse. This is
31+
* necessary to adapt Cloud Bigtable API to work with gax's batching infrastructure.
32+
*
33+
* <p>This class is considered an internal implementation detail and not meant to be used by
34+
* applications.
35+
*/
36+
@InternalApi
37+
public class MutateRowsSpoolingCallable
38+
extends UnaryCallable<MutateRowsRequest, MutateRowsResponse> {
39+
40+
private final ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> inner;
41+
42+
public MutateRowsSpoolingCallable(
43+
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> inner) {
44+
this.inner = inner;
45+
}
46+
47+
@Override
48+
public ApiFuture<MutateRowsResponse> futureCall(
49+
MutateRowsRequest request, ApiCallContext context) {
50+
ApiFuture<List<MutateRowsResponse>> rawResponse = inner.all().futureCall(request, context);
51+
52+
return ApiFutures.transform(
53+
rawResponse,
54+
new ApiFunction<List<MutateRowsResponse>, MutateRowsResponse>() {
55+
@Override
56+
public MutateRowsResponse apply(List<MutateRowsResponse> input) {
57+
return convertResponse(input);
58+
}
59+
});
60+
}
61+
62+
private MutateRowsResponse convertResponse(List<MutateRowsResponse> responses) {
63+
if (responses.size() == 1) {
64+
return responses.get(0);
65+
} else {
66+
MutateRowsResponse.Builder fullResponseBuilder = MutateRowsResponse.newBuilder();
67+
for (MutateRowsResponse subResponse : responses) {
68+
fullResponseBuilder.addAllEntries(subResponse.getEntriesList());
69+
}
70+
return fullResponseBuilder.build();
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)