Skip to content

Commit a5494eb

Browse files
snehashah16andreamlin
authored andcommitted
Support for Batch Read API (#2953)
1 parent 80d6424 commit a5494eb

24 files changed

Lines changed: 1664 additions & 29 deletions
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2017 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
/**
20+
* Interface for the Batch Client that is used to read data from a Cloud Spanner database. An
21+
* instance of this is tied to a specific database.
22+
*
23+
* <p>{@code BatchClient} is useful when one wants to read or query a large amount of data from
24+
* Cloud Spanner across multiple processes, even across different machines. It allows to create
25+
* partitions of Cloud Spanner database and then read or query over each partition independently yet
26+
* at the same snapshot.
27+
*/
28+
public interface BatchClient {
29+
30+
/**
31+
* Returns a {@link BatchReadOnlyTransaction} context in which multiple reads and/or queries can
32+
* be performed. All reads/queries will use the same timestamp, and the timestamp can be inspected
33+
* after this transaction is created successfully. This is a blocking method
34+
* since it waits to finish the rpcs.
35+
*
36+
* <p>Note that the bounded staleness modes, {@link TimestampBound.Mode#MIN_READ_TIMESTAMP} and
37+
* {@link TimestampBound.Mode#MAX_STALENESS}, are not supported for
38+
* {@link BatchReadOnlyTransaction}.
39+
*
40+
* @param bound the timestamp bound at which to perform the read
41+
*/
42+
BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound);
43+
44+
/**
45+
* Returns a {@link BatchReadOnlyTransaction} context in which multiple reads and/or queries can
46+
* be performed. This is a non-blocking method. All reads/queries will use the same timestamp, and
47+
* the timestamp can be inspected after this transaction is created successfully.
48+
*
49+
* <p>This method is useful to recreate a BatchReadOnlyTransaction object from an existing
50+
* batchTransactionId. For example one might send the transaction id to a different process or
51+
* machine and recreate the transaction object there.
52+
*
53+
* @param batchTransactionId to re-initialize the transaction, re-using the timestamp for
54+
* successive read/query.
55+
*/
56+
BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batchTransactionId);
57+
}
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/*
2+
* Copyright 2017 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import static com.google.common.base.Preconditions.checkNotNull;
20+
21+
import com.google.cloud.spanner.Options.QueryOption;
22+
import com.google.cloud.spanner.Options.ReadOption;
23+
import com.google.cloud.spanner.SpannerImpl.MultiUseReadOnlyTransaction;
24+
import com.google.cloud.spanner.SpannerImpl.SessionImpl;
25+
import com.google.cloud.spanner.spi.v1.SpannerRpc;
26+
import com.google.common.base.Preconditions;
27+
import com.google.common.collect.ImmutableList;
28+
import com.google.protobuf.Struct;
29+
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
30+
import com.google.spanner.v1.PartitionQueryRequest;
31+
import com.google.spanner.v1.PartitionReadRequest;
32+
import com.google.spanner.v1.PartitionResponse;
33+
import com.google.spanner.v1.TransactionSelector;
34+
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.concurrent.Callable;
38+
39+
/** Default implementation for Batch Client interface. */
40+
public class BatchClientImpl implements BatchClient {
41+
private final SpannerImpl spanner;
42+
private final DatabaseId db;
43+
44+
BatchClientImpl(DatabaseId db, SpannerImpl spanner) {
45+
this.db = checkNotNull(db);
46+
this.spanner = checkNotNull(spanner);
47+
}
48+
49+
@Override
50+
public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
51+
SessionImpl session = (SessionImpl) spanner.createSession(db);
52+
return new BatchReadOnlyTransactionImpl(spanner, session, checkNotNull(bound));
53+
}
54+
55+
@Override
56+
public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batchTransactionId) {
57+
SessionImpl session =
58+
spanner.sessionWithId(checkNotNull(batchTransactionId).getSessionId());
59+
return new BatchReadOnlyTransactionImpl(spanner, session, batchTransactionId);
60+
}
61+
62+
private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransaction
63+
implements BatchReadOnlyTransaction {
64+
private final String sessionName;
65+
private final Map<SpannerRpc.Option, ?> options;
66+
67+
BatchReadOnlyTransactionImpl(SpannerImpl spanner, SessionImpl session, TimestampBound bound) {
68+
super(
69+
checkNotNull(session),
70+
checkNotNull(bound),
71+
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
72+
spanner.getOptions().getPrefetchChunks());
73+
this.sessionName = session.getName();
74+
this.options = session.getOptions();
75+
initTransaction();
76+
}
77+
78+
BatchReadOnlyTransactionImpl(
79+
SpannerImpl spanner, SessionImpl session, BatchTransactionId batchTransactionId) {
80+
super(
81+
checkNotNull(session),
82+
checkNotNull(batchTransactionId).getTransactionId(),
83+
batchTransactionId.getTimestamp(),
84+
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
85+
spanner.getOptions().getPrefetchChunks());
86+
this.sessionName = session.getName();
87+
this.options = session.getOptions();
88+
}
89+
90+
@Override
91+
public BatchTransactionId getBatchTransactionId() {
92+
return new BatchTransactionId(sessionName, getTransactionId(), getReadTimestamp());
93+
}
94+
95+
@Override
96+
public List<Partition> partitionRead(
97+
PartitionOptions partitionOptions,
98+
String table,
99+
KeySet keys,
100+
Iterable<String> columns,
101+
ReadOption... options) throws SpannerException {
102+
return partitionReadUsingIndex(
103+
partitionOptions, table, null /*index*/, keys, columns, options);
104+
}
105+
106+
@Override
107+
public List<Partition> partitionReadUsingIndex(
108+
PartitionOptions partitionOptions,
109+
String table,
110+
String index,
111+
KeySet keys,
112+
Iterable<String> columns,
113+
ReadOption... option) throws SpannerException {
114+
Options readOptions = Options.fromReadOptions(option);
115+
Preconditions.checkArgument(
116+
!readOptions.hasLimit(),
117+
"Limit option not supported by partitionRead|partitionReadUsingIndex");
118+
final PartitionReadRequest.Builder builder =
119+
PartitionReadRequest.newBuilder()
120+
.setSession(sessionName)
121+
.setTable(checkNotNull(table))
122+
.addAllColumns(columns);
123+
keys.appendToProto(builder.getKeySetBuilder());
124+
if (index != null) {
125+
builder.setIndex(index);
126+
}
127+
TransactionSelector selector = getTransactionSelector();
128+
if (selector != null) {
129+
builder.setTransaction(selector);
130+
}
131+
com.google.spanner.v1.PartitionOptions.Builder pbuilder =
132+
com.google.spanner.v1.PartitionOptions.newBuilder();
133+
if (partitionOptions != null) {
134+
partitionOptions.appendToProto(pbuilder);
135+
}
136+
builder.setPartitionOptions(pbuilder.build());
137+
138+
final PartitionReadRequest request = builder.build();
139+
PartitionResponse response =
140+
SpannerImpl.runWithRetries(
141+
new Callable<PartitionResponse>() {
142+
@Override
143+
public PartitionResponse call() throws Exception {
144+
return rpc.partitionRead(request, options);
145+
}
146+
});
147+
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
148+
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
149+
Partition partition =
150+
Partition.createReadPartition(
151+
p.getPartitionToken(), partitionOptions, table, index, keys, columns, readOptions);
152+
partitions.add(partition);
153+
}
154+
return partitions.build();
155+
}
156+
157+
@Override
158+
public List<Partition> partitionQuery(
159+
PartitionOptions partitionOptions, Statement statement, QueryOption... option)
160+
throws SpannerException {
161+
Options queryOptions = Options.fromQueryOptions(option);
162+
final PartitionQueryRequest.Builder builder =
163+
PartitionQueryRequest.newBuilder()
164+
.setSession(sessionName)
165+
.setSql(statement.getSql());
166+
Map<String, Value> stmtParameters = statement.getParameters();
167+
if (!stmtParameters.isEmpty()) {
168+
Struct.Builder paramsBuilder = builder.getParamsBuilder();
169+
for (Map.Entry<String, Value> param : stmtParameters.entrySet()) {
170+
paramsBuilder.putFields(param.getKey(), param.getValue().toProto());
171+
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
172+
}
173+
}
174+
TransactionSelector selector = getTransactionSelector();
175+
if (selector != null) {
176+
builder.setTransaction(selector);
177+
}
178+
com.google.spanner.v1.PartitionOptions.Builder pbuilder =
179+
com.google.spanner.v1.PartitionOptions.newBuilder();
180+
if (partitionOptions != null) {
181+
partitionOptions.appendToProto(pbuilder);
182+
}
183+
builder.setPartitionOptions(pbuilder.build());
184+
185+
final PartitionQueryRequest request = builder.build();
186+
PartitionResponse response =
187+
SpannerImpl.runWithRetries(
188+
new Callable<PartitionResponse>() {
189+
@Override
190+
public PartitionResponse call() throws Exception {
191+
return rpc.partitionQuery(request, options);
192+
}
193+
});
194+
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
195+
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
196+
Partition partition =
197+
Partition.createQueryPartition(
198+
p.getPartitionToken(), partitionOptions, statement, queryOptions);
199+
partitions.add(partition);
200+
}
201+
return partitions.build();
202+
}
203+
204+
@Override
205+
public ResultSet execute(Partition partition) throws SpannerException {
206+
if (partition.getStatement() != null) {
207+
return executeQueryInternalWithOptions(
208+
partition.getStatement(),
209+
QueryMode.NORMAL,
210+
partition.getQueryOptions(),
211+
partition.getPartitionToken());
212+
}
213+
return readInternalWithOptions(
214+
partition.getTable(),
215+
partition.getIndex(),
216+
partition.getKeys(),
217+
partition.getColumns(),
218+
partition.getReadOptions(),
219+
partition.getPartitionToken());
220+
}
221+
222+
@Override
223+
public void close() {
224+
super.close();
225+
session.close();
226+
}
227+
}
228+
}

0 commit comments

Comments
 (0)