Skip to content

Commit 0125a13

Browse files
igorbernstein2 authored and garrettjonesgoogle committed
Bigtable: 21. Refactor Batching - Move retries behind batching (#3026)
1 parent 73f6c9a commit 0125a13

12 files changed

Lines changed: 1168 additions & 313 deletions
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.models;
17+
18+
import com.google.api.core.InternalApi;
19+
import com.google.api.gax.rpc.ApiException;
20+
import com.google.api.gax.rpc.StatusCode;
21+
import com.google.auto.value.AutoValue;
22+
import com.google.bigtable.v2.MutateRowsRequest;
23+
import com.google.common.base.Preconditions;
24+
import java.util.List;
25+
import javax.annotation.Nonnull;
26+
import javax.annotation.Nullable;
27+
28+
/**
29+
* Thrown by the MutateRows when at least one Mutation failed. If the last failure was caused by an
30+
* RPC error (as opposed to a single entry failing), then this exception's cause will be set to that
31+
* error and {@link #getFailedMutations()} will contain synthetic errors for all of the entries that
32+
* were part of that RPC.
33+
*/
34+
public final class MutateRowsException extends ApiException {
35+
// Synthetic status to use for this ApiException subclass.
36+
private static final StatusCode LOCAL_STATUS =
37+
new StatusCode() {
38+
@Override
39+
public Code getCode() {
40+
return Code.INTERNAL;
41+
}
42+
43+
@Override
44+
public Object getTransportCode() {
45+
return null;
46+
}
47+
};
48+
49+
private final List<FailedMutation> failedMutations;
50+
51+
/**
52+
* This constructor is considered an internal implementation detail and not meant to be used by
53+
* applications.
54+
*/
55+
@InternalApi
56+
public MutateRowsException(
57+
@Nullable Throwable rpcError,
58+
@Nonnull List<FailedMutation> failedMutations,
59+
boolean retryable) {
60+
super("Some mutations failed to apply", rpcError, LOCAL_STATUS, retryable);
61+
Preconditions.checkNotNull(failedMutations);
62+
Preconditions.checkArgument(!failedMutations.isEmpty(), "failedMutations can't be empty");
63+
this.failedMutations = failedMutations;
64+
}
65+
66+
/**
67+
* Retrieve all of the failed mutations. This list will contain failures for all of the mutations
68+
* that have failed across all of the retry attempts so far.
69+
*/
70+
@Nonnull
71+
public List<FailedMutation> getFailedMutations() {
72+
return failedMutations;
73+
}
74+
75+
/**
76+
* Identifies which mutation failed and the reason it failed. The mutation is identified by it's
77+
* index in the original request's {@link MutateRowsRequest#getEntriesList()}.
78+
*/
79+
@AutoValue
80+
public abstract static class FailedMutation {
81+
/**
82+
* This method is considered an internal implementation detail and not meant to be used by
83+
* applications.
84+
*/
85+
@InternalApi
86+
@Nonnull
87+
public static FailedMutation create(int index, ApiException error) {
88+
return new AutoValue_MutateRowsException_FailedMutation(index, error);
89+
}
90+
91+
/**
92+
* The index of the mutation in the original request's {@link
93+
* MutateRowsRequest#getEntriesList()}.
94+
*/
95+
public abstract int getIndex();
96+
97+
/**
98+
* The error that prevented this mutation from being applied. Please note, that if the entire
99+
* RPC attempt failed, all mutations that were part of the attempt will have take on the same
100+
* error.
101+
*/
102+
@Nonnull
103+
public abstract ApiException getError();
104+
}
105+
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,18 @@
1515
*/
1616
package com.google.cloud.bigtable.data.v2.stub;
1717

18-
import com.google.api.core.ApiFuture;
1918
import com.google.api.core.InternalApi;
20-
import com.google.api.gax.retrying.RetrySettings;
21-
import com.google.api.gax.rpc.ApiCallContext;
19+
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
20+
import com.google.api.gax.retrying.RetryAlgorithm;
21+
import com.google.api.gax.retrying.RetryingExecutor;
22+
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
2223
import com.google.api.gax.rpc.BatchingCallSettings;
2324
import com.google.api.gax.rpc.Callables;
2425
import com.google.api.gax.rpc.ClientContext;
2526
import com.google.api.gax.rpc.ServerStreamingCallSettings;
2627
import com.google.api.gax.rpc.ServerStreamingCallable;
2728
import com.google.api.gax.rpc.UnaryCallable;
2829
import com.google.bigtable.v2.MutateRowsRequest;
29-
import com.google.bigtable.v2.MutateRowsResponse;
3030
import com.google.bigtable.v2.ReadRowsRequest;
3131
import com.google.bigtable.v2.SampleRowKeysRequest;
3232
import com.google.bigtable.v2.SampleRowKeysResponse;
@@ -40,13 +40,14 @@
4040
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
4141
import com.google.cloud.bigtable.data.v2.models.RowMutation;
4242
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
43-
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsSpoolingCallable;
43+
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
4444
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsUserFacingCallable;
4545
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
4646
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
4747
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable;
4848
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
4949
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
50+
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
5051
import java.io.IOException;
5152
import java.util.List;
5253
import org.threeten.bp.Duration;
@@ -251,28 +252,41 @@ private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
251252
* <li>Convert a {@link RowMutation} into a {@link MutateRowsRequest} with a single entry.
252253
* <li>Using gax's {@link com.google.api.gax.rpc.BatchingCallable} to spool the requests and
253254
* aggregate the {@link MutateRowsRequest.Entry}s.
254-
* <li>Spool the streamed responses.
255+
* <li>Process the response and schedule retries. At the end of each attempt, entries that have
256+
* been applied, are filtered from the next attempt. Also, any entries that failed with a
257+
* nontransient error, are filtered from the next attempt. This will continue until there
258+
* are no more entries or there are no more retry attempts left.
259+
* <li>Wrap batch failures in a {@link
260+
* com.google.cloud.bigtable.data.v2.models.MutateRowsException}.
255261
* <li>Split the responses using {@link MutateRowsBatchingDescriptor}.
256-
* <li>Apply retries to individual mutations
257262
* </ul>
258263
*/
259264
private UnaryCallable<RowMutation, Void> createMutateRowsCallable() {
260-
MutateRowsSpoolingCallable spooling = new MutateRowsSpoolingCallable(stub.mutateRowsCallable());
265+
RetryAlgorithm<Void> retryAlgorithm =
266+
new RetryAlgorithm<>(
267+
new ApiResultRetryAlgorithm<Void>(),
268+
new ExponentialRetryAlgorithm(
269+
settings.mutateRowsSettings().getRetrySettings(), clientContext.getClock()));
270+
RetryingExecutor<Void> retryingExecutor =
271+
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
272+
273+
UnaryCallable<MutateRowsRequest, Void> retrying =
274+
new MutateRowsRetryingCallable(
275+
clientContext.getDefaultCallContext(),
276+
stub.mutateRowsCallable(),
277+
retryingExecutor,
278+
settings.mutateRowsSettings().getRetryableCodes());
261279

262280
// recreate BatchingCallSettings with the correct descriptor
263-
BatchingCallSettings.Builder<MutateRowsRequest, MutateRowsResponse> batchingCallSettings =
264-
BatchingCallSettings.newBuilder(
265-
new MutateRowsBatchingDescriptor(settings.mutateRowsSettings().getRetryableCodes()))
281+
BatchingCallSettings.Builder<MutateRowsRequest, Void> batchingCallSettings =
282+
BatchingCallSettings.newBuilder(new MutateRowsBatchingDescriptor())
266283
.setBatchingSettings(settings.mutateRowsSettings().getBatchingSettings());
267284

268-
UnaryCallable<MutateRowsRequest, MutateRowsResponse> batching =
269-
Callables.batching(spooling, batchingCallSettings.build(), clientContext);
270-
271-
UnaryCallable<MutateRowsRequest, MutateRowsResponse> retrying =
272-
Callables.retrying(batching, settings.mutateRowsSettings(), clientContext);
285+
UnaryCallable<MutateRowsRequest, Void> batching =
286+
Callables.batching(retrying, batchingCallSettings.build(), clientContext);
273287

274288
MutateRowsUserFacingCallable userFacing =
275-
new MutateRowsUserFacingCallable(retrying, requestContext);
289+
new MutateRowsUserFacingCallable(batching, requestContext);
276290

277291
return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
278292
}

0 commit comments

Comments
 (0)