Skip to content

Commit 9432956

Browse files
igorbernstein2garrettjonesgoogle
authored andcommitted
Bigtable: 16. Implement ReadModifyWriteRow (#2981)
1 parent c64e907 commit 9432956

3 files changed

Lines changed: 252 additions & 6 deletions

File tree

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,12 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
9595
.setRetryableCodes(settings.checkAndMutateRowSettings().getRetryableCodes())
9696
.setRetrySettings(settings.checkAndMutateRowSettings().getRetrySettings());
9797

98+
// ReadModifyWriteRow is a simple passthrough
99+
baseSettingsBuilder
100+
.readModifyWriteRowSettings()
101+
.setRetryableCodes(settings.readModifyWriteRowSettings().getRetryableCodes())
102+
.setRetrySettings(settings.readModifyWriteRowSettings().getRetrySettings());
103+
98104
BigtableStubSettings baseSettings = baseSettingsBuilder.build();
99105
ClientContext clientContext = ClientContext.create(baseSettings);
100106
GrpcBigtableStub stub = new GrpcBigtableStub(baseSettings, clientContext);
@@ -215,13 +221,20 @@ private UnaryCallable<ConditionalRowMutation, Boolean> createCheckAndMutateRowCa
215221
return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
216222
}
217223

224+
/**
225+
* Creates a callable chain to handle ReadModifyWriteRow RPCs. The chain will:
226+
*
227+
* <ul>
228+
* <li>Convert {@link ReadModifyWriteRow}s into {@link
229+
* com.google.bigtable.v2.ReadModifyWriteRowRequest}s.
230+
* <li>Convert the responses into {@link Row}.
231+
* </ul>
232+
*/
218233
private UnaryCallable<ReadModifyWriteRow, Row> createReadModifyWriteRowCallable() {
219-
return new UnaryCallable<ReadModifyWriteRow, Row>() {
220-
@Override
221-
public ApiFuture<Row> futureCall(ReadModifyWriteRow request, ApiCallContext context) {
222-
throw new UnsupportedOperationException("todo");
223-
}
224-
};
234+
ReadModifyWriteRowCallable userFacing =
235+
new ReadModifyWriteRowCallable(stub.readModifyWriteRowCallable(), requestContext);
236+
237+
return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
225238
}
226239
// </editor-fold>
227240

@@ -250,6 +263,10 @@ public UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable(
250263
return checkAndMutateRowCallable;
251264
}
252265

266+
/**
267+
* Returns the callable chain created in {@link #createReadModifyWriteRowCallable()} ()} during
268+
* stub construction.
269+
*/
253270
public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
254271
return readModifyWriteRowCallable;
255272
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub;
17+
18+
import com.google.api.core.ApiFunction;
19+
import com.google.api.core.ApiFuture;
20+
import com.google.api.core.ApiFutures;
21+
import com.google.api.gax.rpc.ApiCallContext;
22+
import com.google.api.gax.rpc.UnaryCallable;
23+
import com.google.bigtable.v2.Cell;
24+
import com.google.bigtable.v2.Column;
25+
import com.google.bigtable.v2.Family;
26+
import com.google.bigtable.v2.ReadModifyWriteRowRequest;
27+
import com.google.bigtable.v2.ReadModifyWriteRowResponse;
28+
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
29+
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
30+
import com.google.cloud.bigtable.data.v2.models.Row;
31+
import com.google.cloud.bigtable.data.v2.models.RowCell;
32+
import com.google.common.collect.ImmutableList;
33+
34+
/** Simple wrapper for ReadModifyWriteRow to wrap the request and response protobufs. */
35+
class ReadModifyWriteRowCallable extends UnaryCallable<ReadModifyWriteRow, Row> {
36+
private final UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> inner;
37+
private final RequestContext requestContext;
38+
39+
ReadModifyWriteRowCallable(
40+
UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> inner,
41+
RequestContext requestContext) {
42+
this.inner = inner;
43+
this.requestContext = requestContext;
44+
}
45+
46+
@Override
47+
public ApiFuture<Row> futureCall(ReadModifyWriteRow request, ApiCallContext context) {
48+
ApiFuture<ReadModifyWriteRowResponse> rawResponse =
49+
inner.futureCall(request.toProto(requestContext), context);
50+
51+
return ApiFutures.transform(
52+
rawResponse,
53+
new ApiFunction<ReadModifyWriteRowResponse, Row>() {
54+
@Override
55+
public Row apply(ReadModifyWriteRowResponse readModifyWriteRowResponse) {
56+
return convertResponse(readModifyWriteRowResponse);
57+
}
58+
});
59+
}
60+
61+
private Row convertResponse(ReadModifyWriteRowResponse response) {
62+
ImmutableList.Builder<RowCell> cells = ImmutableList.builder();
63+
64+
for (Family family : response.getRow().getFamiliesList()) {
65+
for (Column column : family.getColumnsList()) {
66+
for (Cell cell : column.getCellsList()) {
67+
cells.add(
68+
RowCell.create(
69+
family.getName(),
70+
column.getQualifier(),
71+
cell.getTimestampMicros(),
72+
cell.getLabelsList(),
73+
cell.getValue()));
74+
}
75+
}
76+
}
77+
return Row.create(response.getRow().getKey(), cells.build());
78+
}
79+
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub;
17+
18+
import static com.google.common.truth.Truth.assertThat;
19+
20+
import com.google.api.core.ApiFuture;
21+
import com.google.api.core.SettableApiFuture;
22+
import com.google.api.gax.grpc.GrpcStatusCode;
23+
import com.google.api.gax.rpc.ApiCallContext;
24+
import com.google.api.gax.rpc.NotFoundException;
25+
import com.google.api.gax.rpc.UnaryCallable;
26+
import com.google.bigtable.admin.v2.InstanceName;
27+
import com.google.bigtable.v2.Cell;
28+
import com.google.bigtable.v2.Column;
29+
import com.google.bigtable.v2.Family;
30+
import com.google.bigtable.v2.ReadModifyWriteRowRequest;
31+
import com.google.bigtable.v2.ReadModifyWriteRowResponse;
32+
import com.google.bigtable.v2.ReadModifyWriteRule;
33+
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
34+
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
35+
import com.google.cloud.bigtable.data.v2.models.Row;
36+
import com.google.cloud.bigtable.data.v2.models.RowCell;
37+
import com.google.common.collect.ImmutableList;
38+
import com.google.protobuf.ByteString;
39+
import io.grpc.Status.Code;
40+
import java.util.concurrent.ExecutionException;
41+
import java.util.concurrent.TimeUnit;
42+
import org.junit.Before;
43+
import org.junit.Test;
44+
import org.junit.runner.RunWith;
45+
import org.junit.runners.JUnit4;
46+
47+
@RunWith(JUnit4.class)
48+
public class ReadModifyWriteRowCallableTest {
49+
private final RequestContext requestContext =
50+
RequestContext.create(InstanceName.of("my-project", "my-instance"), "my-app-profile");
51+
private FakeCallable inner;
52+
private ReadModifyWriteRowCallable callable;
53+
54+
@Before
55+
public void setUp() {
56+
inner = new FakeCallable();
57+
callable = new ReadModifyWriteRowCallable(inner, requestContext);
58+
}
59+
60+
@Test
61+
public void requestIsCorrect() {
62+
callable.futureCall(
63+
ReadModifyWriteRow.create("my-table", "my-key").append("my-family", "", "suffix"));
64+
65+
assertThat(inner.request)
66+
.isEqualTo(
67+
ReadModifyWriteRowRequest.newBuilder()
68+
.setTableName(requestContext.getInstanceName() + "/tables/my-table")
69+
.setAppProfileId(requestContext.getAppProfileId())
70+
.setRowKey(ByteString.copyFromUtf8("my-key"))
71+
.addRules(
72+
ReadModifyWriteRule.newBuilder()
73+
.setFamilyName("my-family")
74+
.setColumnQualifier(ByteString.EMPTY)
75+
.setAppendValue(ByteString.copyFromUtf8("suffix")))
76+
.build());
77+
}
78+
79+
@Test
80+
public void responseCorrectlyTransformed() throws Exception {
81+
ApiFuture<Row> result =
82+
callable.futureCall(
83+
ReadModifyWriteRow.create("my-table", "my-key").append("my-family", "col", "suffix"));
84+
85+
inner.response.set(
86+
ReadModifyWriteRowResponse.newBuilder()
87+
.setRow(
88+
com.google.bigtable.v2.Row.newBuilder()
89+
.setKey(ByteString.copyFromUtf8("my-key"))
90+
.addFamilies(
91+
Family.newBuilder()
92+
.setName("my-family")
93+
.addColumns(
94+
Column.newBuilder()
95+
.setQualifier(ByteString.copyFromUtf8("col"))
96+
.addCells(
97+
Cell.newBuilder()
98+
.setTimestampMicros(1_000)
99+
.setValue(ByteString.copyFromUtf8("suffix"))))))
100+
.build());
101+
102+
assertThat(result.get(1, TimeUnit.SECONDS))
103+
.isEqualTo(
104+
Row.create(
105+
ByteString.copyFromUtf8("my-key"),
106+
ImmutableList.of(
107+
RowCell.create(
108+
"my-family",
109+
ByteString.copyFromUtf8("col"),
110+
1_000,
111+
ImmutableList.<String>of(),
112+
ByteString.copyFromUtf8("suffix")))));
113+
}
114+
115+
@Test
116+
public void errorIsPropagated() throws Exception {
117+
ApiFuture<Row> result =
118+
callable.futureCall(
119+
ReadModifyWriteRow.create("my-table", "my-key").append("my-family", "", "suffix"));
120+
121+
Throwable expectedError =
122+
new NotFoundException("fake error", null, GrpcStatusCode.of(Code.NOT_FOUND), false);
123+
inner.response.setException(expectedError);
124+
125+
Throwable actualError = null;
126+
try {
127+
result.get(1, TimeUnit.SECONDS);
128+
} catch (ExecutionException e) {
129+
actualError = e.getCause();
130+
}
131+
132+
assertThat(actualError).isEqualTo(expectedError);
133+
}
134+
135+
static class FakeCallable
136+
extends UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> {
137+
ReadModifyWriteRowRequest request;
138+
ApiCallContext callContext;
139+
SettableApiFuture<ReadModifyWriteRowResponse> response = SettableApiFuture.create();
140+
141+
@Override
142+
public ApiFuture<ReadModifyWriteRowResponse> futureCall(
143+
ReadModifyWriteRowRequest request, ApiCallContext context) {
144+
this.request = request;
145+
this.callContext = context;
146+
147+
return response;
148+
}
149+
}
150+
}

0 commit comments

Comments
 (0)