
Commit 026e8be

mmladenovski authored and shollyman committed
---
yaml
---
r: 30589
b: refs/heads/autosynth-automl
c: a364aab
h: refs/heads/master
i: 30587: 3e41427
1 parent 20140ec commit 026e8be

3 files changed

Lines changed: 170 additions & 27 deletions


refs

Lines changed: 1 addition & 1 deletion
@@ -121,7 +121,7 @@ refs/heads/spanner: b01127f885b4611bf1852abb0ce481eeb7fcc131
 refs/tags/v0.68.0: 9cc799fcf68c82ab431d425fefa58ef615ce8e5b
 refs/tags/v0.69.0: 78f67a29e8b9c46ba01de566a2eae0fd1c03edea
 refs/heads/autosynth-asset: bdb45634a0fe8f7a510692b56b31f5312e25f453
-refs/heads/autosynth-automl: 4ed9a31e938d8184dd0acf7d61a8affe8b64b15a
+refs/heads/autosynth-automl: a364aab78f1b0cbbad87abbe77f020b7ef0a942c
 refs/heads/autosynth-bigquerydatatransfer: d88aa5aae5fd9d3c6d75bbab1a05162c6d4d948f
 refs/heads/autosynth-bigquerystorage: d2c53da3b012e38c662e4df0738042435f19365f
 refs/heads/autosynth-bigtable: 9e5429f45cf9face9fed585d0233534993e36b58

branches/autosynth-automl/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java

Lines changed: 157 additions & 21 deletions
@@ -37,6 +37,7 @@
 import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.TimePartitioning;
 import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
 import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions;
 import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
@@ -61,7 +62,8 @@
 import java.util.List;
 import java.util.logging.Logger;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.util.Utf8;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -254,7 +256,7 @@ public void testFilter() throws IOException {
         response.getAvroRows(),
         new SimpleRowReader.AvroRowConsumer() {
           @Override
-          public void accept(GenericRecord record) {
+          public void accept(GenericData.Record record) {
             Long wordCount = (Long) record.get("word_count");
             assertWithMessage("Row not matching expectations: %s", record.toString())
                 .that(wordCount)
@@ -336,7 +338,7 @@ public void testColumnSelection() throws IOException {
         response.getAvroRows(),
         new SimpleRowReader.AvroRowConsumer() {
           @Override
-          public void accept(GenericRecord record) {
+          public void accept(GenericData.Record record) {
             String rowAssertMessage =
                 String.format("Row not matching expectations: %s", record.toString());
 
@@ -373,39 +375,133 @@ public void testReadAtSnapshot() throws InterruptedException, IOException {
             .build();
 
     Job firstJob =
-        RunQueryJobAndExpectSuccess(
+        RunQueryAppendJobAndExpectSuccess(
             /* destinationTableId = */ testTableId, /* query = */ "SELECT 1 AS col");
 
     Job secondJob =
-        RunQueryJobAndExpectSuccess(
+        RunQueryAppendJobAndExpectSuccess(
            /* destinationTableId = */ testTableId, /* query = */ "SELECT 2 AS col");
 
     final List<Long> rowsAfterFirstSnapshot = new ArrayList<>();
     ProcessRowsAtSnapshot(
-        tableReference,
-        firstJob.getStatistics().getEndTime(),
-        new AvroRowConsumer() {
+        /* tableReference = */ tableReference,
+        /* snapshotInMillis = */ firstJob.getStatistics().getEndTime(),
+        /* filter = */ null,
+        /* consumer = */ new AvroRowConsumer() {
           @Override
-          public void accept(GenericRecord record) {
+          public void accept(GenericData.Record record) {
             rowsAfterFirstSnapshot.add((Long) record.get("col"));
           }
         });
     assertEquals(Arrays.asList(1L), rowsAfterFirstSnapshot);
 
     final List<Long> rowsAfterSecondSnapshot = new ArrayList<>();
     ProcessRowsAtSnapshot(
-        tableReference,
-        secondJob.getStatistics().getEndTime(),
-        new AvroRowConsumer() {
+        /* tableReference = */ tableReference,
+        /* snapshotInMillis = */ secondJob.getStatistics().getEndTime(),
+        /* filter = */ null,
+        /* consumer = */ new AvroRowConsumer() {
           @Override
-          public void accept(GenericRecord record) {
+          public void accept(GenericData.Record record) {
             rowsAfterSecondSnapshot.add((Long) record.get("col"));
           }
         });
     Collections.sort(rowsAfterSecondSnapshot);
     assertEquals(Arrays.asList(1L, 2L), rowsAfterSecondSnapshot);
   }
 
+  @Test
+  public void testColumnPartitionedTableByDateField() throws InterruptedException, IOException {
+    String partitionedTableName = "test_column_partition_table_by_date";
+    String createTableStatement =
+        String.format(
+            " CREATE TABLE %s.%s (num_field INT64, date_field DATE) "
+                + " PARTITION BY date_field "
+                + " OPTIONS( "
+                + "   description=\"a table partitioned by date_field\" "
+                + " ) "
+                + "AS "
+                + "   SELECT 1, CAST(\"2019-01-01\" AS DATE)"
+                + "   UNION ALL"
+                + "   SELECT 2, CAST(\"2019-01-02\" AS DATE)"
+                + "   UNION ALL"
+                + "   SELECT 3, CAST(\"2019-01-03\" AS DATE)",
+            DATASET, partitionedTableName);
+
+    RunQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build());
+
+    TableReference tableReference =
+        TableReference.newBuilder()
+            .setTableId(partitionedTableName)
+            .setDatasetId(DATASET)
+            .setProjectId(ServiceOptions.getDefaultProjectId())
+            .build();
+
+    List<GenericData.Record> unfilteredRows =
+        ReadAllRows(/* tableReference = */ tableReference, /* filter = */ null);
+    assertEquals("Actual rows read: " + unfilteredRows.toString(), 3, unfilteredRows.size());
+
+    List<GenericData.Record> partitionFilteredRows =
+        ReadAllRows(
+            /* tableReference = */ tableReference,
+            /* filter = */ "date_field = CAST(\"2019-01-02\" AS DATE)");
+    assertEquals(
+        "Actual rows read: " + partitionFilteredRows.toString(), 1, partitionFilteredRows.size());
+    assertEquals(2L, partitionFilteredRows.get(0).get("num_field"));
+  }
+
+  @Test
+  public void testIngestionTimePartitionedTable() throws InterruptedException, IOException {
+    Field intFieldSchema =
+        Field.newBuilder("num_field", LegacySQLTypeName.INTEGER)
+            .setMode(Mode.REQUIRED)
+            .setDescription("IntegerDescription")
+            .build();
+    com.google.cloud.bigquery.Schema tableSchema =
+        com.google.cloud.bigquery.Schema.of(intFieldSchema);
+
+    TableId testTableId =
+        TableId.of(/* dataset = */ DATASET, /* table = */ "test_date_partitioned_table");
+    bigquery.create(
+        TableInfo.of(
+            testTableId,
+            StandardTableDefinition.newBuilder()
+                .setTimePartitioning(TimePartitioning.of(TimePartitioning.Type.DAY))
+                .setSchema(tableSchema)
+                .build()));
+
+    // Simulate ingestion for 2019-01-01.
+    RunQueryAppendJobAndExpectSuccess(
+        /* destinationTableId = */ TableId.of(
+            /* dataset = */ DATASET, /* table = */ testTableId.getTable() + "$20190101"),
+        /* query = */ "SELECT 1 AS num_field");
+
+    // Simulate ingestion for 2019-01-02.
+    RunQueryAppendJobAndExpectSuccess(
+        /* destinationTableId = */ TableId.of(
+            /* dataset = */ DATASET, /* table = */ testTableId.getTable() + "$20190102"),
+        /* query = */ "SELECT 2 AS num_field");
+
+    TableReference tableReference =
+        TableReference.newBuilder()
+            .setTableId(testTableId.getTable())
+            .setDatasetId(testTableId.getDataset())
+            .setProjectId(ServiceOptions.getDefaultProjectId())
+            .build();
+
+    List<GenericData.Record> unfilteredRows =
+        ReadAllRows(/* tableReference = */ tableReference, /* filter = */ null);
+    assertEquals("Actual rows read: " + unfilteredRows.toString(), 2, unfilteredRows.size());
+
+    List<GenericData.Record> partitionFilteredRows =
+        ReadAllRows(
+            /* tableReference = */ tableReference,
+            /* filter = */ "_PARTITIONDATE > \"2019-01-01\"");
+    assertEquals(
+        "Actual rows read: " + partitionFilteredRows.toString(), 1, partitionFilteredRows.size());
+    assertEquals(2L, partitionFilteredRows.get(0).get("num_field"));
+  }
+
   /**
    * Reads to the specified row offset within the stream. If the stream does not have the desired
    * rows to read, it will read all of them.
@@ -436,18 +532,18 @@ private long ReadStreamToOffset(Stream stream, long rowOffset) {
   }
 
   /**
-   * Reads all the rows from the specified tableReference that are added up to timestamp defined in
-   * snapshot. If snapshot is not provided, current time will be used.
+   * Reads all the rows from the specified tableReference.
    *
    * <p>For every row, the consumer is called for processing.
    *
    * @param tableReference
-   * @param snapshotInMillis
-   * @param consumer
+   * @param snapshotInMillis Optional. If specified, all rows up to timestamp will be returned.
+   * @param filter Optional. If specified, it will be used to restrict returned data.
+   * @param consumer that receives all Avro rows.
    * @throws IOException
    */
   private void ProcessRowsAtSnapshot(
-      TableReference tableReference, Long snapshotInMillis, AvroRowConsumer consumer)
+      TableReference tableReference, Long snapshotInMillis, String filter, AvroRowConsumer consumer)
       throws IOException {
     Preconditions.checkNotNull(tableReference);
     Preconditions.checkNotNull(consumer);
@@ -469,6 +565,11 @@ private void ProcessRowsAtSnapshot(
           TableModifiers.newBuilder().setSnapshotTime(snapshotTimestamp).build());
     }
 
+    if (filter != null && !filter.isEmpty()) {
+      createSessionRequestBuilder.setReadOptions(
+          TableReadOptions.newBuilder().setRowRestriction(filter).build());
+    }
+
     ReadSession session = client.createReadSession(createSessionRequestBuilder.build());
     assertEquals(
         String.format(
@@ -492,6 +593,31 @@ private void ProcessRowsAtSnapshot(
     }
   }
 
+  /**
+   * Reads all the rows from the specified table reference and returns a list as generic Avro
+   * records.
+   *
+   * @param tableReference
+   * @param filter Optional. If specified, it will be used to restrict returned data.
+   * @return
+   */
+  List<GenericData.Record> ReadAllRows(TableReference tableReference, String filter)
+      throws IOException {
+    final List<GenericData.Record> rows = new ArrayList<>();
+    ProcessRowsAtSnapshot(
+        /* tableReference = */ tableReference,
+        /* snapshotInMillis = */ null,
+        /* filter = */ filter,
+        new AvroRowConsumer() {
+          @Override
+          public void accept(GenericData.Record record) {
+            // clone the record since that reference will be reused by the reader.
+            rows.add(new GenericRecordBuilder(record).build());
+          }
+        });
+    return rows;
+  }
+
   /**
    * Runs a query job with WRITE_APPEND disposition to the destination table and returns the
    * successfully completed job.
@@ -501,16 +627,26 @@ private void ProcessRowsAtSnapshot(
    * @return
    * @throws InterruptedException
    */
-  private Job RunQueryJobAndExpectSuccess(TableId destinationTableId, String query)
+  private Job RunQueryAppendJobAndExpectSuccess(TableId destinationTableId, String query)
       throws InterruptedException {
-    QueryJobConfiguration configuration =
+    return RunQueryJobAndExpectSuccess(
         QueryJobConfiguration.newBuilder(query)
            .setDestinationTable(destinationTableId)
            .setUseQueryCache(false)
            .setUseLegacySql(false)
            .setWriteDisposition(WriteDisposition.WRITE_APPEND)
-            .build();
+            .build());
+  }
 
+  /**
+   * Runs a query job with provided configuration and returns the successfully completed job.
+   *
+   * @param configuration
+   * @return
+   * @throws InterruptedException
+   */
+  private Job RunQueryJobAndExpectSuccess(QueryJobConfiguration configuration)
+      throws InterruptedException {
     Job job = bigquery.create(JobInfo.of(configuration));
     Job completedJob =
         job.waitFor(
branches/autosynth-automl/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/SimpleRowReader.java

Lines changed: 12 additions & 5 deletions
@@ -20,8 +20,8 @@
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
@@ -33,16 +33,23 @@
 public class SimpleRowReader {
 
   public interface AvroRowConsumer {
-    void accept(GenericRecord record);
+
+    /**
+     * Handler for every new Avro row that is read.
+     *
+     * @param record is Avro generic record structure. Consumers should not rely on the reference
+     *     and should copy it if needed. The record reference is reused.
+     */
+    void accept(GenericData.Record record);
   }
 
-  private final DatumReader<GenericRecord> datumReader;
+  private final DatumReader<GenericData.Record> datumReader;
 
   // Decoder object will be reused to avoid re-allocation and too much garbage collection.
   private BinaryDecoder decoder = null;
 
-  // GenericRecord object will be reused.
-  private GenericRecord row = null;
+  // Record object will be reused.
+  private GenericData.Record row = null;
 
   public SimpleRowReader(Schema schema) {
     Preconditions.checkNotNull(schema);
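
The new Javadoc on accept makes the reuse contract explicit: SimpleRowReader hands every consumer the same GenericData.Record instance, so consumers that retain rows must copy them first. A minimal sketch of a conforming consumer follows, using the same GenericRecordBuilder copy that ReadAllRows uses above; the class name is illustrative, not from this commit.

import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;

/** Illustrative consumer that stores a copy of every row it receives. */
public class CopyingRowConsumer implements SimpleRowReader.AvroRowConsumer {

  private final List<GenericData.Record> sink;

  public CopyingRowConsumer(List<GenericData.Record> sink) {
    this.sink = sink;
  }

  @Override
  public void accept(GenericData.Record record) {
    // The reader reuses this record instance for the next row, so copy it
    // before storing the reference.
    sink.add(new GenericRecordBuilder(record).build());
  }
}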
