|
15 | 15 | */ |
16 | 16 | package com.google.cloud.bigtable.data.v2.stub; |
17 | 17 |
|
18 | | -import com.google.api.core.ApiFuture; |
19 | 18 | import com.google.api.core.InternalApi; |
20 | | -import com.google.api.gax.retrying.RetrySettings; |
21 | | -import com.google.api.gax.rpc.ApiCallContext; |
| 19 | +import com.google.api.gax.retrying.ExponentialRetryAlgorithm; |
| 20 | +import com.google.api.gax.retrying.RetryAlgorithm; |
| 21 | +import com.google.api.gax.retrying.RetryingExecutor; |
| 22 | +import com.google.api.gax.retrying.ScheduledRetryingExecutor; |
22 | 23 | import com.google.api.gax.rpc.BatchingCallSettings; |
23 | 24 | import com.google.api.gax.rpc.Callables; |
24 | 25 | import com.google.api.gax.rpc.ClientContext; |
25 | 26 | import com.google.api.gax.rpc.ServerStreamingCallSettings; |
26 | 27 | import com.google.api.gax.rpc.ServerStreamingCallable; |
27 | 28 | import com.google.api.gax.rpc.UnaryCallable; |
28 | 29 | import com.google.bigtable.v2.MutateRowsRequest; |
29 | | -import com.google.bigtable.v2.MutateRowsResponse; |
30 | 30 | import com.google.bigtable.v2.ReadRowsRequest; |
31 | 31 | import com.google.bigtable.v2.SampleRowKeysRequest; |
32 | 32 | import com.google.bigtable.v2.SampleRowKeysResponse; |
|
40 | 40 | import com.google.cloud.bigtable.data.v2.models.RowAdapter; |
41 | 41 | import com.google.cloud.bigtable.data.v2.models.RowMutation; |
42 | 42 | import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor; |
43 | | -import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsSpoolingCallable; |
| 43 | +import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable; |
44 | 44 | import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsUserFacingCallable; |
45 | 45 | import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable; |
46 | 46 | import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy; |
47 | 47 | import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable; |
48 | 48 | import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable; |
49 | 49 | import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable; |
| 50 | +import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm; |
50 | 51 | import java.io.IOException; |
51 | 52 | import java.util.List; |
52 | 53 | import org.threeten.bp.Duration; |
@@ -251,28 +252,41 @@ private UnaryCallable<RowMutation, Void> createMutateRowCallable() { |
251 | 252 | * <li>Convert a {@link RowMutation} into a {@link MutateRowsRequest} with a single entry. |
252 | 253 | * <li>Using gax's {@link com.google.api.gax.rpc.BatchingCallable} to spool the requests and |
253 | 254 | * aggregate the {@link MutateRowsRequest.Entry}s. |
254 | | - * <li>Spool the streamed responses. |
| 255 | + * <li>Process the response and schedule retries. At the end of each attempt, entries that have |
| 256 | + * been applied, are filtered from the next attempt. Also, any entries that failed with a |
| 257 | + * nontransient error, are filtered from the next attempt. This will continue until there |
| 258 | + * are no more entries or there are no more retry attempts left. |
| 259 | + * <li>Wrap batch failures in a {@link |
| 260 | + * com.google.cloud.bigtable.data.v2.models.MutateRowsException}. |
255 | 261 | * <li>Split the responses using {@link MutateRowsBatchingDescriptor}. |
256 | | - * <li>Apply retries to individual mutations |
257 | 262 | * </ul> |
258 | 263 | */ |
259 | 264 | private UnaryCallable<RowMutation, Void> createMutateRowsCallable() { |
260 | | - MutateRowsSpoolingCallable spooling = new MutateRowsSpoolingCallable(stub.mutateRowsCallable()); |
| 265 | + RetryAlgorithm<Void> retryAlgorithm = |
| 266 | + new RetryAlgorithm<>( |
| 267 | + new ApiResultRetryAlgorithm<Void>(), |
| 268 | + new ExponentialRetryAlgorithm( |
| 269 | + settings.mutateRowsSettings().getRetrySettings(), clientContext.getClock())); |
| 270 | + RetryingExecutor<Void> retryingExecutor = |
| 271 | + new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor()); |
| 272 | + |
| 273 | + UnaryCallable<MutateRowsRequest, Void> retrying = |
| 274 | + new MutateRowsRetryingCallable( |
| 275 | + clientContext.getDefaultCallContext(), |
| 276 | + stub.mutateRowsCallable(), |
| 277 | + retryingExecutor, |
| 278 | + settings.mutateRowsSettings().getRetryableCodes()); |
261 | 279 |
|
262 | 280 | // recreate BatchingCallSettings with the correct descriptor |
263 | | - BatchingCallSettings.Builder<MutateRowsRequest, MutateRowsResponse> batchingCallSettings = |
264 | | - BatchingCallSettings.newBuilder( |
265 | | - new MutateRowsBatchingDescriptor(settings.mutateRowsSettings().getRetryableCodes())) |
| 281 | + BatchingCallSettings.Builder<MutateRowsRequest, Void> batchingCallSettings = |
| 282 | + BatchingCallSettings.newBuilder(new MutateRowsBatchingDescriptor()) |
266 | 283 | .setBatchingSettings(settings.mutateRowsSettings().getBatchingSettings()); |
267 | 284 |
|
268 | | - UnaryCallable<MutateRowsRequest, MutateRowsResponse> batching = |
269 | | - Callables.batching(spooling, batchingCallSettings.build(), clientContext); |
270 | | - |
271 | | - UnaryCallable<MutateRowsRequest, MutateRowsResponse> retrying = |
272 | | - Callables.retrying(batching, settings.mutateRowsSettings(), clientContext); |
| 285 | + UnaryCallable<MutateRowsRequest, Void> batching = |
| 286 | + Callables.batching(retrying, batchingCallSettings.build(), clientContext); |
273 | 287 |
|
274 | 288 | MutateRowsUserFacingCallable userFacing = |
275 | | - new MutateRowsUserFacingCallable(retrying, requestContext); |
| 289 | + new MutateRowsUserFacingCallable(batching, requestContext); |
276 | 290 |
|
277 | 291 | return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext()); |
278 | 292 | } |
|
0 commit comments