Skip to content

Commit 817d81a

Browse files
igorbernstein2pongad
authored andcommitted
---
yaml --- r: 8739 b: refs/heads/master c: 7131ecc h: refs/heads/master i: 8737: 84aadf4 8735: 561a332
1 parent 82e605a commit 817d81a

11 files changed

Lines changed: 2245 additions & 4 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
refs/heads/master: 3fd597b73adc551f855e8199eb08ab32c9b4f948
2+
refs/heads/master: 7131ecccd21d8cd51eb6fa01d062d0502c501a4e
33
refs/heads/travis: 47e4fee4fd5af9b2a8ce46f23c72ec95f9b195b2
44
refs/heads/gh-pages: 6daca92127d91b7c2c99490080ecf8a13fa94cde
55
refs/tags/0.0.9: 22f1839238f66c39e67ed4dfdcd273b1ae2e8444
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.internal;
17+
18+
import com.google.api.core.InternalApi;
19+
import com.google.protobuf.ByteString;
20+
import java.util.Comparator;
21+
22+
/** Compares {@link ByteString}s as unsigned byte arrays. */
23+
@InternalApi
24+
public class ByteStringComparator implements Comparator<ByteString> {
25+
public static final ByteStringComparator INSTANCE = new ByteStringComparator();
26+
27+
@Override
28+
public int compare(ByteString o1, ByteString o2) {
29+
int sizeA = o1.size();
30+
int sizeB = o2.size();
31+
int shortestSize = Math.min(sizeA, sizeB);
32+
for (int i = 0; i < shortestSize; i++) {
33+
int byteA = o1.byteAt(i) & 0xff;
34+
int byteB = o2.byteAt(i) & 0xff;
35+
36+
if (byteA != byteB) {
37+
return byteA < byteB ? -1 : 1;
38+
}
39+
}
40+
if (sizeA == sizeB) {
41+
return 0;
42+
}
43+
return sizeA < sizeB ? -1 : 1;
44+
}
45+
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.readrows;
17+
18+
import com.google.api.core.InternalApi;
19+
import com.google.bigtable.v2.ReadRowsResponse;
20+
import com.google.cloud.bigtable.data.v2.wrappers.RowAdapter.RowBuilder;
21+
import com.google.cloud.bigtable.gaxx.reframing.Reframer;
22+
import com.google.common.base.Preconditions;
23+
24+
/**
25+
* An implementation of a {@link Reframer} that feeds the row merging {@link StateMachine}.
26+
*
27+
* <p>{@link com.google.cloud.bigtable.gaxx.reframing.ReframingResponseObserver} pushes {@link
28+
* ReadRowsResponse.CellChunk}s into this class and pops fully merged logical rows. Example usage:
29+
*
30+
* <pre>{@code
31+
* RowMerger<Row> rowMerger = new RowMerger<>(myRowBuilder);
32+
*
33+
* while(responseIterator.hasNext()) {
34+
* ReadRowsResponse response = responseIterator.next();
35+
*
36+
* if (rowMerger.hasFullFrame()) {
37+
* Row row = rowMerger.pop();
38+
* // Do something with row.
39+
* } else {
40+
* rowMerger.push(response);
41+
* }
42+
* }
43+
*
44+
* if (rowMerger.hasPartialFrame()) {
45+
* throw new RuntimeException("Incomplete stream");
46+
* }
47+
*
48+
* }</pre>
49+
*
50+
* <p>This class is considered an internal implementation detail and not meant to be used by
51+
* applications.
52+
*
53+
* <p>Package-private for internal use.
54+
*
55+
* @see com.google.cloud.bigtable.gaxx.reframing.ReframingResponseObserver for more details
56+
*/
57+
@InternalApi
58+
public class RowMerger<RowT> implements Reframer<RowT, ReadRowsResponse> {
59+
private final StateMachine<RowT> stateMachine;
60+
private ReadRowsResponse buffer;
61+
private int nextChunk;
62+
private RowT nextRow;
63+
64+
public RowMerger(RowBuilder<RowT> rowBuilder) {
65+
stateMachine = new StateMachine<>(rowBuilder);
66+
67+
nextChunk = 0;
68+
buffer = ReadRowsResponse.getDefaultInstance();
69+
}
70+
71+
@Override
72+
public void push(ReadRowsResponse response) {
73+
Preconditions.checkState(
74+
buffer.getChunksCount() <= nextChunk, "Previous response not fully consumed");
75+
76+
buffer = response;
77+
nextChunk = 0;
78+
79+
// If the server sends a scan heartbeat, notify the StateMachine. It will generate a synthetic
80+
// row marker. See RowAdapter for more info.
81+
if (!response.getLastScannedRowKey().isEmpty()) {
82+
stateMachine.handleLastScannedRow(response.getLastScannedRowKey());
83+
}
84+
}
85+
86+
@Override
87+
public boolean hasFullFrame() {
88+
// Check if there an assembled row to consume
89+
if (nextRow != null) {
90+
return true;
91+
}
92+
93+
// Otherwise try to assemble a new row (readNextRow will set nextRow)
94+
boolean newRowCompleted = readNextRow();
95+
return newRowCompleted;
96+
}
97+
98+
@Override
99+
public boolean hasPartialFrame() {
100+
// Check if any of the buffers in this class contain data.
101+
// `hasFullFrame()` will check if `nextRow` has a row ready to go or if chunks in `buffer` can
102+
// be used to create a new `nextRow`
103+
if (hasFullFrame()) {
104+
return true;
105+
}
106+
107+
// If an assembled is still not available, then that means `buffer` has been fully consumed.
108+
// The last place to check is the StateMachine buffer, to see if its holding on to an incomplete
109+
// row.
110+
return stateMachine.isRowInProgress();
111+
}
112+
113+
@Override
114+
public RowT pop() {
115+
RowT row = nextRow;
116+
nextRow = null;
117+
return row;
118+
}
119+
120+
private boolean readNextRow() {
121+
// StateMachine might have a complete row already from receiving a scan marker.
122+
if (stateMachine.hasCompleteRow()) {
123+
nextRow = stateMachine.consumeRow();
124+
return true;
125+
}
126+
127+
while (nextChunk < buffer.getChunksCount()) {
128+
stateMachine.handleChunk(buffer.getChunks(nextChunk++));
129+
130+
if (stateMachine.hasCompleteRow()) {
131+
nextRow = stateMachine.consumeRow();
132+
return true;
133+
}
134+
}
135+
return false;
136+
}
137+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.readrows;
17+
18+
import com.google.api.core.InternalApi;
19+
import com.google.api.gax.rpc.ApiCallContext;
20+
import com.google.api.gax.rpc.ResponseObserver;
21+
import com.google.api.gax.rpc.ServerStreamingCallable;
22+
import com.google.bigtable.v2.ReadRowsRequest;
23+
import com.google.bigtable.v2.ReadRowsResponse;
24+
import com.google.cloud.bigtable.data.v2.wrappers.RowAdapter;
25+
import com.google.cloud.bigtable.data.v2.wrappers.RowAdapter.RowBuilder;
26+
import com.google.cloud.bigtable.gaxx.reframing.ReframingResponseObserver;
27+
28+
/**
29+
* A ServerStreamingCallable that will merge {@link
30+
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s into logical rows. This class delegates all
31+
* of the work to gax's {@link ReframingResponseObserver} and the logic to {@link RowMerger}.
32+
*
33+
* <p>This class is considered an internal implementation detail and not meant to be used by
34+
* applications.
35+
*/
36+
@InternalApi
37+
public class RowMergingCallable<RowT> extends ServerStreamingCallable<ReadRowsRequest, RowT> {
38+
private final ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> inner;
39+
private final RowAdapter<RowT> rowAdapter;
40+
41+
public RowMergingCallable(
42+
ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> inner,
43+
RowAdapter<RowT> rowAdapter) {
44+
this.inner = inner;
45+
this.rowAdapter = rowAdapter;
46+
}
47+
48+
@Override
49+
public void call(
50+
ReadRowsRequest request, ResponseObserver<RowT> responseObserver, ApiCallContext context) {
51+
RowBuilder<RowT> rowBuilder = rowAdapter.createRowBuilder();
52+
RowMerger<RowT> merger = new RowMerger<>(rowBuilder);
53+
ReframingResponseObserver<ReadRowsResponse, RowT> innerObserver =
54+
new ReframingResponseObserver<>(responseObserver, merger);
55+
inner.call(request, innerObserver, context);
56+
}
57+
}

0 commit comments

Comments
 (0)