Skip to content

Commit f4357da

Browse files
igorbernstein2garrettjonesgoogle
authored andcommitted
---
yaml --- r: 9083 b: refs/heads/master c: 487c2a9 h: refs/heads/master i: 9081: 121f9e4 9079: d3024f5
1 parent d7122f3 commit f4357da

8 files changed

Lines changed: 471 additions & 9 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
refs/heads/master: c5d35baa06dbf2e355b56d2a726fccccbe11aacb
2+
refs/heads/master: 487c2a9d45a42da20e6d895503068b6ddd804322
33
refs/heads/travis: 47e4fee4fd5af9b2a8ce46f23c72ec95f9b195b2
44
refs/heads/gh-pages: 8e9b065ba06cd7a4af306aaea1010aade81670e0
55
refs/tags/0.0.9: 22f1839238f66c39e67ed4dfdcd273b1ae2e8444

trunk/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
import com.google.api.gax.rpc.ApiCallContext;
2121
import com.google.api.gax.rpc.Callables;
2222
import com.google.api.gax.rpc.ClientContext;
23-
import com.google.api.gax.rpc.ResponseObserver;
2423
import com.google.api.gax.rpc.ServerStreamingCallable;
2524
import com.google.api.gax.rpc.UnaryCallable;
25+
import com.google.bigtable.v2.ReadRowsRequest;
2626
import com.google.bigtable.v2.SampleRowKeysRequest;
2727
import com.google.bigtable.v2.SampleRowKeysResponse;
2828
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
@@ -34,6 +34,9 @@
3434
import com.google.cloud.bigtable.data.v2.models.Row;
3535
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
3636
import com.google.cloud.bigtable.data.v2.models.RowMutation;
37+
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
38+
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
39+
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
3740
import java.io.IOException;
3841
import java.util.List;
3942

@@ -105,15 +108,38 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
105108
}
106109

107110
// <editor-fold desc="Callable creators">
111+
112+
/**
113+
* Creates a callable chain to handle ReadRows RPCs. The chain will:
114+
*
115+
* <ul>
116+
* <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest} and
117+
* dispatch the RPC.
118+
* <li>Upon receiving the response stream, it will merge the {@link
119+
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
120+
* implementation can be configured in {@link
121+
* com.google.cloud.bigtable.data.v2.BigtableDataSettings}.
122+
* <li>Retry/resume on failure.
123+
* <li>Filter out marker rows.
124+
* </ul>
125+
*/
108126
public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
109127
RowAdapter<RowT> rowAdapter) {
110-
return new ServerStreamingCallable<Query, RowT>() {
111-
@Override
112-
public void call(
113-
Query query, ResponseObserver<RowT> responseObserver, ApiCallContext context) {
114-
throw new UnsupportedOperationException("todo");
115-
}
116-
};
128+
129+
ServerStreamingCallable<ReadRowsRequest, RowT> merging =
130+
new RowMergingCallable<>(stub.readRowsCallable(), rowAdapter);
131+
132+
FilterMarkerRowsCallable<RowT> filtering = new FilterMarkerRowsCallable<>(merging, rowAdapter);
133+
134+
ServerStreamingCallable<ReadRowsRequest, RowT> withContext =
135+
filtering.withDefaultCallContext(clientContext.getDefaultCallContext());
136+
137+
// NOTE: Ideally `withDefaultCallContext` should be the outer-most callable, however the
138+
// ReadRowsUserCallable overrides the first() method. This override would be lost if
139+
// ReadRowsUserCallable is wrapped by another callable. At some point in the future,
140+
// gax-java should allow preserving these kind of overrides through callable chains, at which
141+
// point this should be re-ordered.
142+
return new ReadRowsUserCallable<>(withContext, requestContext);
117143
}
118144

119145
/**
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.readrows;
17+
18+
import com.google.api.core.InternalApi;
19+
import com.google.api.gax.rpc.ApiCallContext;
20+
import com.google.api.gax.rpc.ResponseObserver;
21+
import com.google.api.gax.rpc.ServerStreamingCallable;
22+
import com.google.api.gax.rpc.StreamController;
23+
import com.google.bigtable.v2.ReadRowsRequest;
24+
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
25+
26+
/**
27+
* Remove the special marker rows generated by {@link RowMergingCallable}.
28+
*
29+
* <p>This class is considered an internal implementation detail and not meant to be used by
30+
* applications.
31+
*/
32+
@InternalApi
33+
public class FilterMarkerRowsCallable<RowT> extends ServerStreamingCallable<ReadRowsRequest, RowT> {
34+
private final ServerStreamingCallable<ReadRowsRequest, RowT> innerCallable;
35+
private final RowAdapter<RowT> rowAdapter;
36+
37+
public FilterMarkerRowsCallable(
38+
ServerStreamingCallable<ReadRowsRequest, RowT> inner, RowAdapter<RowT> rowAdapter) {
39+
this.rowAdapter = rowAdapter;
40+
this.innerCallable = inner;
41+
}
42+
43+
@Override
44+
public void call(
45+
ReadRowsRequest request, ResponseObserver<RowT> responseObserver, ApiCallContext context) {
46+
FilteringResponseObserver innerObserver = new FilteringResponseObserver(responseObserver);
47+
innerCallable.call(request, innerObserver, context);
48+
}
49+
50+
private class FilteringResponseObserver implements ResponseObserver<RowT> {
51+
private final ResponseObserver<RowT> outerObserver;
52+
private StreamController innerController;
53+
private boolean autoFlowControl = true;
54+
55+
FilteringResponseObserver(ResponseObserver<RowT> outerObserver) {
56+
this.outerObserver = outerObserver;
57+
}
58+
59+
@Override
60+
public void onStart(final StreamController controller) {
61+
innerController = controller;
62+
63+
outerObserver.onStart(
64+
new StreamController() {
65+
@Override
66+
public void cancel() {
67+
controller.cancel();
68+
}
69+
70+
@Override
71+
public void disableAutoInboundFlowControl() {
72+
autoFlowControl = false;
73+
controller.disableAutoInboundFlowControl();
74+
}
75+
76+
@Override
77+
public void request(int count) {
78+
controller.request(count);
79+
}
80+
});
81+
}
82+
83+
@Override
84+
public void onResponse(RowT response) {
85+
if (rowAdapter.isScanMarkerRow(response)) {
86+
if (!autoFlowControl) {
87+
innerController.request(1);
88+
}
89+
} else {
90+
outerObserver.onResponse(response);
91+
}
92+
}
93+
94+
@Override
95+
public void onError(Throwable t) {
96+
outerObserver.onError(t);
97+
}
98+
99+
@Override
100+
public void onComplete() {
101+
outerObserver.onComplete();
102+
}
103+
}
104+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.readrows;
17+
18+
import com.google.api.core.ApiFuture;
19+
import com.google.api.gax.rpc.ApiCallContext;
20+
import com.google.api.gax.rpc.UnaryCallable;
21+
import com.google.cloud.bigtable.data.v2.models.Query;
22+
23+
/**
24+
* Enhancement for `readRowsCallable().first()` to gracefully limit the row count instead of
25+
* cancelling the RPC
26+
*/
27+
class ReadRowsFirstCallable<RowT> extends UnaryCallable<Query, RowT> {
28+
private final UnaryCallable<Query, RowT> inner;
29+
30+
ReadRowsFirstCallable(UnaryCallable<Query, RowT> inner) {
31+
this.inner = inner;
32+
}
33+
34+
@Override
35+
public ApiFuture<RowT> futureCall(Query query, ApiCallContext context) {
36+
return inner.futureCall(query.limit(1), context);
37+
}
38+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.readrows;
17+
18+
import com.google.api.core.InternalApi;
19+
import com.google.api.gax.rpc.ApiCallContext;
20+
import com.google.api.gax.rpc.ResponseObserver;
21+
import com.google.api.gax.rpc.ServerStreamingCallable;
22+
import com.google.api.gax.rpc.UnaryCallable;
23+
import com.google.bigtable.v2.ReadRowsRequest;
24+
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
25+
import com.google.cloud.bigtable.data.v2.models.Query;
26+
27+
/**
28+
* Simple wrapper for ReadRows to wrap the request protobufs.
29+
*
30+
* <p>This class is considered an internal implementation detail and not meant to be used by
31+
* applications.
32+
*/
33+
@InternalApi
34+
public class ReadRowsUserCallable<RowT> extends ServerStreamingCallable<Query, RowT> {
35+
private final ServerStreamingCallable<ReadRowsRequest, RowT> inner;
36+
private final RequestContext requestContext;
37+
private final ReadRowsFirstCallable<RowT> firstCallable;
38+
39+
public ReadRowsUserCallable(
40+
ServerStreamingCallable<ReadRowsRequest, RowT> inner, RequestContext requestContext) {
41+
this.inner = inner;
42+
this.requestContext = requestContext;
43+
44+
this.firstCallable = new ReadRowsFirstCallable<>(super.first());
45+
}
46+
47+
@Override
48+
public void call(Query request, ResponseObserver<RowT> responseObserver, ApiCallContext context) {
49+
ReadRowsRequest innerRequest = request.toProto(requestContext);
50+
inner.call(innerRequest, responseObserver, context);
51+
}
52+
53+
// Optimization: since the server supports row limits, override the first callable.
54+
// This way unnecessary data doesn't need to be buffered and the number of CANCELLED request
55+
// statuses is minimized
56+
@Override
57+
public UnaryCallable<Query, RowT> first() {
58+
return firstCallable;
59+
}
60+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.readrows;
17+
18+
import static com.google.common.truth.Truth.assertThat;
19+
20+
import com.google.api.gax.rpc.ServerStream;
21+
import com.google.bigtable.v2.ReadRowsRequest;
22+
import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
23+
import com.google.cloud.bigtable.data.v2.models.Row;
24+
import com.google.cloud.bigtable.data.v2.models.RowAdapter.RowBuilder;
25+
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable;
26+
import com.google.common.collect.Lists;
27+
import com.google.protobuf.ByteString;
28+
import org.junit.Test;
29+
import org.junit.runner.RunWith;
30+
import org.junit.runners.JUnit4;
31+
32+
@RunWith(JUnit4.class)
33+
public class FilterMarkerRowsCallableTest {
34+
private final DefaultRowAdapter rowAdapter = new DefaultRowAdapter();
35+
36+
@Test
37+
public void testEmpty() {
38+
ServerStreamingStashCallable<ReadRowsRequest, Row> innerCallable =
39+
new ServerStreamingStashCallable<>();
40+
FilterMarkerRowsCallable<Row> filterCallable =
41+
new FilterMarkerRowsCallable<>(innerCallable, rowAdapter);
42+
43+
ServerStream<Row> results = filterCallable.call(ReadRowsRequest.getDefaultInstance());
44+
45+
assertThat(results).isEmpty();
46+
}
47+
48+
@Test
49+
public void testOnlyMarker() {
50+
ServerStreamingStashCallable<ReadRowsRequest, Row> innerCallable =
51+
new ServerStreamingStashCallable<>(Lists.newArrayList(buildScanMarker()));
52+
FilterMarkerRowsCallable<Row> filterCallable =
53+
new FilterMarkerRowsCallable<>(innerCallable, rowAdapter);
54+
55+
ServerStream<Row> results = filterCallable.call(ReadRowsRequest.getDefaultInstance());
56+
57+
assertThat(results).isEmpty();
58+
}
59+
60+
@Test
61+
public void testRealRow() {
62+
Row row = buildRealRow();
63+
64+
ServerStreamingStashCallable<ReadRowsRequest, Row> innerCallable =
65+
new ServerStreamingStashCallable<>(Lists.newArrayList(row));
66+
FilterMarkerRowsCallable<Row> filterCallable =
67+
new FilterMarkerRowsCallable<>(innerCallable, rowAdapter);
68+
69+
ServerStream<Row> results = filterCallable.call(ReadRowsRequest.getDefaultInstance());
70+
71+
assertThat(results).containsExactly(row);
72+
}
73+
74+
@Test
75+
public void testMixed() {
76+
Row row = buildRealRow();
77+
Row markerRow = buildScanMarker();
78+
79+
ServerStreamingStashCallable<ReadRowsRequest, Row> innerCallable =
80+
new ServerStreamingStashCallable<>(Lists.newArrayList(row, markerRow));
81+
FilterMarkerRowsCallable<Row> filterCallable =
82+
new FilterMarkerRowsCallable<>(innerCallable, rowAdapter);
83+
84+
ServerStream<Row> results = filterCallable.call(ReadRowsRequest.getDefaultInstance());
85+
86+
assertThat(results).containsExactly(row);
87+
}
88+
89+
private Row buildRealRow() {
90+
RowBuilder<Row> rowBuilder = rowAdapter.createRowBuilder();
91+
rowBuilder.startRow(ByteString.copyFromUtf8("fake-key"));
92+
rowBuilder.startCell(
93+
"fake-family",
94+
ByteString.copyFromUtf8("fake-qualifer"),
95+
1_000,
96+
Lists.<String>newArrayList(),
97+
0);
98+
rowBuilder.cellValue(ByteString.copyFromUtf8("fake-value"));
99+
rowBuilder.finishCell();
100+
return rowBuilder.finishRow();
101+
}
102+
103+
private Row buildScanMarker() {
104+
return rowAdapter.createRowBuilder().createScanMarkerRow(ByteString.copyFromUtf8("fake-key2"));
105+
}
106+
}

0 commit comments

Comments
 (0)