Skip to content

Commit 5a29253

Browse files
authored
fix: RequestScheduling should handle rejected resource reservations (googleapis#24)
Custom FlowControlerStrategy implementations might, contrary to the default implementation, resolve reservation requests with exception, what we should handle by not performing the action that had to acquire the resources.
1 parent 01df100 commit 5a29253

File tree

5 files changed

+140
-14
lines changed

5 files changed

+140
-14
lines changed

bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,13 @@ public void onFailure(Throwable throwable) {
602602
secondaryWriteErrorConsumer.consume(writeOperationInfo.operations);
603603
}
604604
},
605-
flowController));
605+
flowController,
606+
new Runnable() {
607+
@Override
608+
public void run() {
609+
secondaryWriteErrorConsumer.consume(writeOperationInfo.operations);
610+
}
611+
}));
606612
}
607613

608614
private void scheduleSecondaryWriteBatchOperations(
@@ -631,7 +637,13 @@ private void scheduleSecondaryWriteBatchOperations(
631637
this.secondaryAsyncWrapper.batch(
632638
primarySplitResponse.allSuccessfulOperations, resultsSecondary),
633639
verificationFuture,
634-
this.flowController);
640+
this.flowController,
641+
new Runnable() {
642+
@Override
643+
public void run() {
644+
secondaryWriteErrorConsumer.consume(primarySplitResponse.successfulWrites);
645+
}
646+
});
635647
}
636648

637649
public static class WriteOperationInfo {

bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/RequestScheduling.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,23 @@ public static <T> ListenableFuture<Void> scheduleVerificationAndRequestWithFlowC
4040
final Supplier<ListenableFuture<T>> secondaryResultFutureSupplier,
4141
final FutureCallback<T> verificationCallback,
4242
final FlowController flowController) {
43+
return scheduleVerificationAndRequestWithFlowControl(
44+
requestResourcesDescription,
45+
secondaryResultFutureSupplier,
46+
verificationCallback,
47+
flowController,
48+
new Runnable() {
49+
@Override
50+
public void run() {}
51+
});
52+
}
53+
54+
public static <T> ListenableFuture<Void> scheduleVerificationAndRequestWithFlowControl(
55+
final RequestResourcesDescription requestResourcesDescription,
56+
final Supplier<ListenableFuture<T>> secondaryResultFutureSupplier,
57+
final FutureCallback<T> verificationCallback,
58+
final FlowController flowController,
59+
final Runnable flowControlReservationErrorConsumer) {
4360
final SettableFuture<Void> verificationCompletedFuture = SettableFuture.create();
4461

4562
final ListenableFuture<ResourceReservation> reservationRequest =
@@ -72,14 +89,15 @@ public void onFailure(Throwable throwable) {
7289
}
7390
},
7491
MoreExecutors.directExecutor());
75-
} catch (InterruptedException e) {
92+
} catch (InterruptedException | ExecutionException e) {
93+
flowControlReservationErrorConsumer.run();
7694
FlowController.cancelRequest(reservationRequest);
95+
7796
verificationCompletedFuture.set(null);
78-
Thread.currentThread().interrupt();
79-
} catch (ExecutionException e) {
80-
// We couldn't obtain reservation, this shouldn't happen.
81-
assert false;
82-
verificationCompletedFuture.set(null);
97+
98+
if (e instanceof InterruptedException) {
99+
Thread.currentThread().interrupt();
100+
}
83101
}
84102
return verificationCompletedFuture;
85103
}

bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/flowcontrol/FlowController.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@
2323

2424
/**
2525
* FlowController limits the number of concurrently performed requests to the secondary database.
26-
* Call to {@link #asyncRequestResource(RequestResourcesDescription)} returns object with a future
27-
* that will be notified when {@link FlowControlStrategy} decides that it can be allowed to perform
28-
* the requests.
26+
* Call to {@link #asyncRequestResource(RequestResourcesDescription)} returns a future that will be
27+
* completed when {@link FlowControlStrategy} decides that it can be allowed to perform the
28+
* requests. The future might also be completed exceptionally if the resource was not allowed to
29+
* obtain the resources.
2930
*
3031
* <p>Order of allowing requests in determined by {@link FlowControlStrategy}.
3132
*
@@ -45,15 +46,17 @@ public ListenableFuture<ResourceReservation> asyncRequestResource(
4546
}
4647

4748
public static void cancelRequest(Future<ResourceReservation> resourceReservationFuture) {
48-
// The cancellation may fail - then the resources have already been allocated by FlowController.
49-
// Then we must release them - the user wouldn't be able to do it on their own.
49+
// The cancellation may fail if the resources were already allocated by the FlowController, then
50+
// we should free them, or when the reservation was rejected, which we should ignore.
5051
if (!resourceReservationFuture.cancel(true)) {
5152
try {
5253
resourceReservationFuture.get().release();
53-
} catch (InterruptedException | ExecutionException ex) {
54+
} catch (InterruptedException ex) {
5455
// If we couldn't cancel the request, it must have already been set, we assume
5556
// that we will get the reservation without problems
5657
assert false;
58+
} catch (ExecutionException ex) {
59+
// The request was rejected.
5760
}
5861
}
5962
}

bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController.ResourceReservation;
3939
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription;
4040
import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector;
41+
import com.google.common.collect.ImmutableList;
4142
import com.google.common.primitives.Longs;
4243
import com.google.common.util.concurrent.ListenableFuture;
4344
import com.google.common.util.concurrent.SettableFuture;
@@ -1121,4 +1122,86 @@ public void testBatchCallbackWithoutResultParameter() throws IOException, Interr
11211122
verify(primaryTable, times(1)).batchCallback(eq(mutations), any(Object[].class), eq(callback));
11221123
verify(secondaryTable, never()).batch(ArgumentMatchers.<Row>anyList(), any(Object[].class));
11231124
}
1125+
1126+
@Test
1127+
public void testFlowControllerExceptionInGetPreventsSecondaryOperation() throws IOException {
1128+
SettableFuture<ResourceReservation> resourceReservationFuture = SettableFuture.create();
1129+
resourceReservationFuture.setException(new Exception("test"));
1130+
1131+
doReturn(resourceReservationFuture)
1132+
.when(flowController)
1133+
.asyncRequestResource(any(RequestResourcesDescription.class));
1134+
1135+
Get request = createGet("test");
1136+
Result expectedResult = createResult("test", "value");
1137+
1138+
when(primaryTable.get(request)).thenReturn(expectedResult);
1139+
1140+
Result result = mirroringTable.get(request);
1141+
executorServiceRule.waitForExecutor();
1142+
1143+
assertThat(result).isEqualTo(expectedResult);
1144+
1145+
verify(primaryTable, times(1)).get(request);
1146+
verify(secondaryTable, never()).get(any(Get.class));
1147+
}
1148+
1149+
@Test
1150+
public void testFlowControllerExceptionInPutExecutesWriteErrorHandler() throws IOException {
1151+
SettableFuture<ResourceReservation> resourceReservationFuture = SettableFuture.create();
1152+
resourceReservationFuture.setException(new Exception("test"));
1153+
1154+
doReturn(resourceReservationFuture)
1155+
.when(flowController)
1156+
.asyncRequestResource(any(RequestResourcesDescription.class));
1157+
1158+
Put request = createPut("test", "f1", "q1", "v1");
1159+
1160+
mirroringTable.put(request);
1161+
executorServiceRule.waitForExecutor();
1162+
1163+
verify(primaryTable, times(1)).put(request);
1164+
verify(secondaryTable, never()).get(any(Get.class));
1165+
verify(secondaryWriteErrorConsumer, times(1)).consume(ImmutableList.of(request));
1166+
}
1167+
1168+
@Test
1169+
public void testFlowControllerExceptionInBatchExecutesWriteErrorHandler()
1170+
throws IOException, InterruptedException {
1171+
SettableFuture<ResourceReservation> resourceReservationFuture = SettableFuture.create();
1172+
resourceReservationFuture.setException(new Exception("test"));
1173+
1174+
doReturn(resourceReservationFuture)
1175+
.when(flowController)
1176+
.asyncRequestResource(any(RequestResourcesDescription.class));
1177+
1178+
Put put1 = createPut("test0", "f1", "q1", "v1");
1179+
Put put2 = createPut("test1", "f1", "q2", "v1");
1180+
List<? extends Row> request = ImmutableList.of(put1, put2, createGet("test2"));
1181+
1182+
doAnswer(
1183+
new Answer<Void>() {
1184+
@Override
1185+
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
1186+
Object[] args = invocationOnMock.getArguments();
1187+
Object[] result = (Object[]) args[1];
1188+
1189+
// secondary
1190+
result[0] = Result.create(new Cell[0]);
1191+
result[1] = Result.create(new Cell[0]);
1192+
result[2] = Result.create(new Cell[0]);
1193+
return null;
1194+
}
1195+
})
1196+
.when(primaryTable)
1197+
.batch(eq(request), any(Object[].class));
1198+
1199+
Object[] results = new Object[3];
1200+
mirroringTable.batch(request, results);
1201+
executorServiceRule.waitForExecutor();
1202+
1203+
verify(primaryTable, times(1)).batch(request, results);
1204+
verify(secondaryTable, never()).batch(ArgumentMatchers.<Row>anyList(), any(Object[].class));
1205+
verify(secondaryWriteErrorConsumer, times(1)).consume(ImmutableList.of(put1, put2));
1206+
}
11241207
}

bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/flowcontrol/TestFlowController.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,4 +236,14 @@ public void testCancellingPendingReservationFuture() {
236236
FlowController.cancelRequest(grantedFuture);
237237
verify(reservation, never()).release();
238238
}
239+
240+
@Test
241+
public void testCancellingRejectedReservationFuture() {
242+
ResourceReservation reservation = mock(ResourceReservation.class);
243+
SettableFuture<ResourceReservation> notGrantedFuture = SettableFuture.create();
244+
notGrantedFuture.setException(new Exception("test"));
245+
246+
FlowController.cancelRequest(notGrantedFuture);
247+
verify(reservation, never()).release();
248+
}
239249
}

0 commit comments

Comments
 (0)