Skip to content

Commit f606ac3

Browse files
mmladenovski authored and shollyman committed
---
yaml --- r: 33781 b: refs/heads/autosynth-redis c: 756b3d4 h: refs/heads/master i: 33779: 0185e54
1 parent 3eac3a2 commit f606ac3

3 files changed

Lines changed: 278 additions & 2 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ refs/heads/autosynth-iot: 044be280805a59e06d09658688c9ee474a9815ad
135135
refs/heads/autosynth-kms: d31449d6621a50fb16a4bef4f30f0f3051d27d7c
136136
refs/heads/autosynth-language: 6130869312f99a1e7d3aa0485759172a23333cc5
137137
refs/heads/autosynth-os-login: 49028d40ac477fca5f948cc5a3ce7422729fdb67
138-
refs/heads/autosynth-redis: 16cb98eef755a4b8c0828378f120fe54bc88a87b
138+
refs/heads/autosynth-redis: 756b3d4ff0a4253ef45ed1cdce2c1d22500cc922
139139
refs/heads/autosynth-scheduler: 57f9fdb1e7de30c85f4ec7198931a07f50603e55
140140
refs/heads/autosynth-spanner: de02ca32edea133b68b51052e325359a3704b5d2
141141
refs/heads/autosynth-speech: 64692f6db11364f663921be02c08072b966b6e7b

branches/autosynth-redis/google-cloud-clients/google-cloud-bigquerystorage/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@
4141
<artifactId>gax-grpc</artifactId>
4242
<scope>test</scope>
4343
</dependency>
44+
<dependency>
45+
<groupId>com.google.cloud</groupId>
46+
<artifactId>google-cloud-bigquery</artifactId>
47+
<scope>test</scope>
48+
</dependency>
4449
<dependency>
4550
<groupId>org.apache.avro</groupId>
4651
<artifactId>avro</artifactId>

branches/autosynth-redis/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java

Lines changed: 272 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,36 +18,66 @@
1818

1919
import static com.google.common.truth.Truth.assertWithMessage;
2020
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertNotNull;
22+
import static org.junit.Assert.assertNull;
2123
import static org.junit.Assert.assertTrue;
2224

2325
import com.google.api.gax.rpc.ServerStream;
26+
import com.google.cloud.RetryOption;
2427
import com.google.cloud.ServiceOptions;
28+
import com.google.cloud.bigquery.BigQuery;
29+
import com.google.cloud.bigquery.DatasetInfo;
30+
import com.google.cloud.bigquery.Field;
31+
import com.google.cloud.bigquery.Field.Mode;
32+
import com.google.cloud.bigquery.Job;
33+
import com.google.cloud.bigquery.JobInfo;
34+
import com.google.cloud.bigquery.JobInfo.WriteDisposition;
35+
import com.google.cloud.bigquery.LegacySQLTypeName;
36+
import com.google.cloud.bigquery.QueryJobConfiguration;
37+
import com.google.cloud.bigquery.StandardTableDefinition;
38+
import com.google.cloud.bigquery.TableId;
39+
import com.google.cloud.bigquery.TableInfo;
2540
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
2641
import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions;
2742
import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
2843
import com.google.cloud.bigquery.storage.v1beta1.Storage.DataFormat;
2944
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest;
3045
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;
3146
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
47+
import com.google.cloud.bigquery.storage.v1beta1.Storage.Stream;
3248
import com.google.cloud.bigquery.storage.v1beta1.Storage.StreamPosition;
49+
import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto.TableModifiers;
3350
import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto.TableReference;
51+
import com.google.cloud.bigquery.storage.v1beta1.it.SimpleRowReader.AvroRowConsumer;
52+
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
53+
import com.google.common.base.Preconditions;
3454
import com.google.protobuf.TextFormat;
55+
import com.google.protobuf.Timestamp;
3556
import java.io.IOException;
57+
import java.util.ArrayList;
58+
import java.util.Arrays;
59+
import java.util.Collections;
60+
import java.util.Iterator;
61+
import java.util.List;
3662
import java.util.logging.Logger;
3763
import org.apache.avro.Schema;
3864
import org.apache.avro.generic.GenericRecord;
3965
import org.apache.avro.util.Utf8;
4066
import org.junit.AfterClass;
4167
import org.junit.BeforeClass;
4268
import org.junit.Test;
69+
import org.threeten.bp.Duration;
4370

4471
/** Integration tests for BigQuery Storage API. */
4572
public class ITBigQueryStorageTest {
4673

4774
private static final Logger LOG = Logger.getLogger(ITBigQueryStorageTest.class.getName());
75+
private static final String DATASET = RemoteBigQueryHelper.generateDatasetName();
76+
private static final String DESCRIPTION = "BigQuery Storage Java client test dataset";
4877

4978
private static BigQueryStorageClient client;
5079
private static String parentProjectId;
80+
private static BigQuery bigquery;
5181

5282
@BeforeClass
5383
public static void beforeClass() throws IOException {
@@ -58,13 +88,25 @@ public static void beforeClass() throws IOException {
5888
String.format(
5989
"%s tests running with parent project: %s",
6090
ITBigQueryStorageTest.class.getSimpleName(), parentProjectId));
91+
92+
RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
93+
bigquery = bigqueryHelper.getOptions().getService();
94+
DatasetInfo datasetInfo =
95+
DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build();
96+
bigquery.create(datasetInfo);
97+
LOG.info("Created test dataset: " + DATASET);
6198
}
6299

63100
@AfterClass
64101
public static void afterClass() {
65102
if (client != null) {
66103
client.close();
67104
}
105+
106+
if (bigquery != null) {
107+
RemoteBigQueryHelper.forceDelete(bigquery, DATASET);
108+
LOG.info("Deleted test dataset: " + DATASET);
109+
}
68110
}
69111

70112
@Test
@@ -76,7 +118,11 @@ public void testSimpleRead() {
76118
.setTableId("shakespeare")
77119
.build();
78120

79-
ReadSession session = client.createReadSession(tableReference, parentProjectId, 1);
121+
ReadSession session =
122+
client.createReadSession(
123+
/* tableReference = */ tableReference,
124+
/* parent = */ parentProjectId,
125+
/* requestedStreams = */ 1);
80126
assertEquals(
81127
String.format(
82128
"Did not receive expected number of streams for table reference '%s' CreateReadSession response:%n%s",
@@ -104,6 +150,57 @@ public void testSimpleRead() {
104150
assertEquals(164_656, avroRowCount);
105151
}
106152

153+
@Test
154+
public void testSimpleReadAndResume() {
155+
TableReference tableReference =
156+
TableReference.newBuilder()
157+
.setProjectId("bigquery-public-data")
158+
.setDatasetId("samples")
159+
.setTableId("shakespeare")
160+
.build();
161+
162+
ReadSession session =
163+
client.createReadSession(
164+
/* tableReference = */ tableReference,
165+
/* parent = */ parentProjectId,
166+
/* requestedStreams = */ 1);
167+
assertEquals(
168+
String.format(
169+
"Did not receive expected number of streams for table reference '%s' CreateReadSession response:%n%s",
170+
TextFormat.shortDebugString(tableReference), session.toString()),
171+
1,
172+
session.getStreamsCount());
173+
174+
// We have to read some number of rows in order to be able to resume. More details:
175+
// https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#google.cloud.bigquery.storage.v1beta1.ReadRowsRequest
176+
177+
long avroRowCount = ReadStreamToOffset(session.getStreams(0), /* rowOffset = */ 34_846);
178+
179+
StreamPosition readPosition =
180+
StreamPosition.newBuilder()
181+
.setStream(session.getStreams(0))
182+
.setOffset(avroRowCount)
183+
.build();
184+
185+
ReadRowsRequest readRowsRequest =
186+
ReadRowsRequest.newBuilder().setReadPosition(readPosition).build();
187+
188+
ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
189+
190+
for (ReadRowsResponse response : stream) {
191+
assertTrue(
192+
String.format(
193+
"Response is missing 'avro_rows'. Read %d rows so far. ReadRows response:%n%s",
194+
avroRowCount, response.toString()),
195+
response.hasAvroRows());
196+
avroRowCount += response.getAvroRows().getRowCount();
197+
}
198+
199+
// Verifies that the number of rows skipped and read equals to the total number of rows in the
200+
// table.
201+
assertEquals(164_656, avroRowCount);
202+
}
203+
107204
@Test
108205
public void testFilter() throws IOException {
109206
TableReference tableReference =
@@ -254,4 +351,178 @@ public void accept(GenericRecord record) {
254351

255352
assertEquals(1_333, avroRowCount);
256353
}
354+
355+
@Test
356+
public void testReadAtSnapshot() throws InterruptedException, IOException {
357+
Field intFieldSchema =
358+
Field.newBuilder("col", LegacySQLTypeName.INTEGER)
359+
.setMode(Mode.REQUIRED)
360+
.setDescription("IntegerDescription")
361+
.build();
362+
com.google.cloud.bigquery.Schema tableSchema =
363+
com.google.cloud.bigquery.Schema.of(intFieldSchema);
364+
365+
TableId testTableId = TableId.of(/* dataset = */ DATASET, /* table = */ "test_read_snapshot");
366+
bigquery.create(TableInfo.of(testTableId, StandardTableDefinition.of(tableSchema)));
367+
368+
TableReference tableReference =
369+
TableReference.newBuilder()
370+
.setTableId(testTableId.getTable())
371+
.setDatasetId(DATASET)
372+
.setProjectId(ServiceOptions.getDefaultProjectId())
373+
.build();
374+
375+
Job firstJob =
376+
RunQueryJobAndExpectSuccess(
377+
/* destinationTableId = */ testTableId, /* query = */ "SELECT 1 AS col");
378+
379+
Job secondJob =
380+
RunQueryJobAndExpectSuccess(
381+
/* destinationTableId = */ testTableId, /* query = */ "SELECT 2 AS col");
382+
383+
final List<Long> rowsAfterFirstSnapshot = new ArrayList<>();
384+
ProcessRowsAtSnapshot(
385+
tableReference,
386+
firstJob.getStatistics().getEndTime(),
387+
new AvroRowConsumer() {
388+
@Override
389+
public void accept(GenericRecord record) {
390+
rowsAfterFirstSnapshot.add((Long) record.get("col"));
391+
}
392+
});
393+
assertEquals(Arrays.asList(1L), rowsAfterFirstSnapshot);
394+
395+
final List<Long> rowsAfterSecondSnapshot = new ArrayList<>();
396+
ProcessRowsAtSnapshot(
397+
tableReference,
398+
secondJob.getStatistics().getEndTime(),
399+
new AvroRowConsumer() {
400+
@Override
401+
public void accept(GenericRecord record) {
402+
rowsAfterSecondSnapshot.add((Long) record.get("col"));
403+
}
404+
});
405+
Collections.sort(rowsAfterSecondSnapshot);
406+
assertEquals(Arrays.asList(1L, 2L), rowsAfterSecondSnapshot);
407+
}
408+
409+
/**
410+
* Reads to the specified row offset within the stream. If the stream does not have the desired
411+
* rows to read, it will read all of them.
412+
*
413+
* @param stream
414+
* @param rowOffset
415+
* @return the number of requested rows to skip or the total rows read if stream had less rows.
416+
*/
417+
private long ReadStreamToOffset(Stream stream, long rowOffset) {
418+
StreamPosition readPosition = StreamPosition.newBuilder().setStream(stream).build();
419+
420+
ReadRowsRequest readRowsRequest =
421+
ReadRowsRequest.newBuilder().setReadPosition(readPosition).build();
422+
423+
long avroRowCount = 0;
424+
ServerStream<ReadRowsResponse> serverStream = client.readRowsCallable().call(readRowsRequest);
425+
Iterator<ReadRowsResponse> responseIterator = serverStream.iterator();
426+
427+
while (responseIterator.hasNext()) {
428+
ReadRowsResponse response = responseIterator.next();
429+
avroRowCount += response.getAvroRows().getRowCount();
430+
if (avroRowCount >= rowOffset) {
431+
return rowOffset;
432+
}
433+
}
434+
435+
return avroRowCount;
436+
}
437+
438+
/**
439+
* Reads all the rows from the specified tableReference that are added up to timestamp defined in
440+
* snapshot. If snapshot is not provided, current time will be used.
441+
*
442+
* <p>For every row, the consumer is called for processing.
443+
*
444+
* @param tableReference
445+
* @param snapshotInMillis
446+
* @param consumer
447+
* @throws IOException
448+
*/
449+
private void ProcessRowsAtSnapshot(
450+
TableReference tableReference, Long snapshotInMillis, AvroRowConsumer consumer)
451+
throws IOException {
452+
Preconditions.checkNotNull(tableReference);
453+
Preconditions.checkNotNull(consumer);
454+
455+
CreateReadSessionRequest.Builder createSessionRequestBuilder =
456+
CreateReadSessionRequest.newBuilder()
457+
.setParent(parentProjectId)
458+
.setRequestedStreams(1)
459+
.setTableReference(tableReference)
460+
.setFormat(DataFormat.AVRO);
461+
462+
if (snapshotInMillis != null) {
463+
Timestamp snapshotTimestamp =
464+
Timestamp.newBuilder()
465+
.setSeconds(snapshotInMillis / 1_000)
466+
.setNanos((int) ((snapshotInMillis % 1000) * 1000000))
467+
.build();
468+
createSessionRequestBuilder.setTableModifiers(
469+
TableModifiers.newBuilder().setSnapshotTime(snapshotTimestamp).build());
470+
}
471+
472+
ReadSession session = client.createReadSession(createSessionRequestBuilder.build());
473+
assertEquals(
474+
String.format(
475+
"Did not receive expected number of streams for table reference '%s' CreateReadSession response:%n%s",
476+
TextFormat.shortDebugString(tableReference), session.toString()),
477+
1,
478+
session.getStreamsCount());
479+
480+
StreamPosition readPosition =
481+
StreamPosition.newBuilder().setStream(session.getStreams(0)).build();
482+
483+
ReadRowsRequest readRowsRequest =
484+
ReadRowsRequest.newBuilder().setReadPosition(readPosition).build();
485+
486+
SimpleRowReader reader =
487+
new SimpleRowReader(new Schema.Parser().parse(session.getAvroSchema().getSchema()));
488+
489+
ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
490+
for (ReadRowsResponse response : stream) {
491+
reader.processRows(response.getAvroRows(), consumer);
492+
}
493+
}
494+
495+
/**
496+
* Runs a query job with WRITE_APPEND disposition to the destination table and returns the
497+
* successfully completed job.
498+
*
499+
* @param destinationTableId
500+
* @param query
501+
* @return
502+
* @throws InterruptedException
503+
*/
504+
private Job RunQueryJobAndExpectSuccess(TableId destinationTableId, String query)
505+
throws InterruptedException {
506+
QueryJobConfiguration configuration =
507+
QueryJobConfiguration.newBuilder(query)
508+
.setDestinationTable(destinationTableId)
509+
.setUseQueryCache(false)
510+
.setUseLegacySql(false)
511+
.setWriteDisposition(WriteDisposition.WRITE_APPEND)
512+
.build();
513+
514+
Job job = bigquery.create(JobInfo.of(configuration));
515+
Job completedJob =
516+
job.waitFor(
517+
RetryOption.initialRetryDelay(Duration.ofSeconds(1)),
518+
RetryOption.totalTimeout(Duration.ofMinutes(1)));
519+
520+
assertNotNull(completedJob);
521+
assertNull(
522+
/* message = */ "Received a job status that is not a success: "
523+
+ completedJob.getStatus().toString(),
524+
/* object = */ completedJob.getStatus().getError());
525+
526+
return completedJob;
527+
}
257528
}

0 commit comments

Comments
 (0)