@@ -37,6 +37,7 @@
 import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.TimePartitioning;
 import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
 import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions;
 import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
@@ -61,7 +62,8 @@
 import java.util.List;
 import java.util.logging.Logger;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.util.Utf8;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -254,7 +256,7 @@ public void testFilter() throws IOException {
         response.getAvroRows(),
         new SimpleRowReader.AvroRowConsumer() {
           @Override
-          public void accept(GenericRecord record) {
+          public void accept(GenericData.Record record) {
             Long wordCount = (Long) record.get("word_count");
             assertWithMessage("Row not matching expectations: %s", record.toString())
                 .that(wordCount)
@@ -336,7 +338,7 @@ public void testColumnSelection() throws IOException {
         response.getAvroRows(),
         new SimpleRowReader.AvroRowConsumer() {
           @Override
-          public void accept(GenericRecord record) {
+          public void accept(GenericData.Record record) {
             String rowAssertMessage =
                 String.format("Row not matching expectations: %s", record.toString());

@@ -373,39 +375,133 @@ public void testReadAtSnapshot() throws InterruptedException, IOException {
             .build();

     Job firstJob =
-        RunQueryJobAndExpectSuccess(
+        RunQueryAppendJobAndExpectSuccess(
             /* destinationTableId = */ testTableId, /* query = */ "SELECT 1 AS col");

     Job secondJob =
-        RunQueryJobAndExpectSuccess(
+        RunQueryAppendJobAndExpectSuccess(
             /* destinationTableId = */ testTableId, /* query = */ "SELECT 2 AS col");

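+    // Reading with snapshotInMillis set to the first job's end time should observe only the rows
+    // committed by that point, i.e. the single row written by the first append job.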
     final List<Long> rowsAfterFirstSnapshot = new ArrayList<>();
     ProcessRowsAtSnapshot(
-        tableReference,
-        firstJob.getStatistics().getEndTime(),
-        new AvroRowConsumer() {
+        /* tableReference = */ tableReference,
+        /* snapshotInMillis = */ firstJob.getStatistics().getEndTime(),
+        /* filter = */ null,
+        /* consumer = */ new AvroRowConsumer() {
           @Override
-          public void accept(GenericRecord record) {
+          public void accept(GenericData.Record record) {
             rowsAfterFirstSnapshot.add((Long) record.get("col"));
           }
         });
     assertEquals(Arrays.asList(1L), rowsAfterFirstSnapshot);

     final List<Long> rowsAfterSecondSnapshot = new ArrayList<>();
     ProcessRowsAtSnapshot(
-        tableReference,
-        secondJob.getStatistics().getEndTime(),
-        new AvroRowConsumer() {
+        /* tableReference = */ tableReference,
+        /* snapshotInMillis = */ secondJob.getStatistics().getEndTime(),
+        /* filter = */ null,
+        /* consumer = */ new AvroRowConsumer() {
           @Override
-          public void accept(GenericRecord record) {
+          public void accept(GenericData.Record record) {
             rowsAfterSecondSnapshot.add((Long) record.get("col"));
           }
         });
     Collections.sort(rowsAfterSecondSnapshot);
     assertEquals(Arrays.asList(1L, 2L), rowsAfterSecondSnapshot);
   }

+  @Test
+  public void testColumnPartitionedTableByDateField() throws InterruptedException, IOException {
+    String partitionedTableName = "test_column_partition_table_by_date";
+    String createTableStatement =
+        String.format(
+            " CREATE TABLE %s.%s (num_field INT64, date_field DATE) "
+                + " PARTITION BY date_field "
+                + " OPTIONS( "
+                + "   description=\"a table partitioned by date_field\" "
+                + " ) "
+                + "AS "
+                + "   SELECT 1, CAST(\"2019-01-01\" AS DATE)"
+                + " UNION ALL"
+                + "   SELECT 2, CAST(\"2019-01-02\" AS DATE)"
+                + " UNION ALL"
+                + "   SELECT 3, CAST(\"2019-01-03\" AS DATE)",
+            DATASET, partitionedTableName);
+
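+    // A single DDL query job both creates the column-partitioned table and populates it with one
+    // row per date, so each row below lands in its own date partition.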
+    RunQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build());
+
+    TableReference tableReference =
+        TableReference.newBuilder()
+            .setTableId(partitionedTableName)
+            .setDatasetId(DATASET)
+            .setProjectId(ServiceOptions.getDefaultProjectId())
+            .build();
+
+    List<GenericData.Record> unfilteredRows =
+        ReadAllRows(/* tableReference = */ tableReference, /* filter = */ null);
+    assertEquals("Actual rows read: " + unfilteredRows.toString(), 3, unfilteredRows.size());
+
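+    // The row restriction below references the partitioning column, which lets BigQuery prune the
+    // read down to the single matching partition on the server side.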
+    List<GenericData.Record> partitionFilteredRows =
+        ReadAllRows(
+            /* tableReference = */ tableReference,
+            /* filter = */ "date_field = CAST(\"2019-01-02\" AS DATE)");
+    assertEquals(
+        "Actual rows read: " + partitionFilteredRows.toString(), 1, partitionFilteredRows.size());
+    assertEquals(2L, partitionFilteredRows.get(0).get("num_field"));
+  }
+
+  @Test
+  public void testIngestionTimePartitionedTable() throws InterruptedException, IOException {
+    Field intFieldSchema =
+        Field.newBuilder("num_field", LegacySQLTypeName.INTEGER)
+            .setMode(Mode.REQUIRED)
+            .setDescription("IntegerDescription")
+            .build();
+    com.google.cloud.bigquery.Schema tableSchema =
+        com.google.cloud.bigquery.Schema.of(intFieldSchema);
+
+    TableId testTableId =
+        TableId.of(/* dataset = */ DATASET, /* table = */ "test_date_partitioned_table");
+    bigquery.create(
+        TableInfo.of(
+            testTableId,
+            StandardTableDefinition.newBuilder()
+                .setTimePartitioning(TimePartitioning.of(TimePartitioning.Type.DAY))
+                .setSchema(tableSchema)
+                .build()));
+
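+    // Appending to "<table>$YYYYMMDD" uses BigQuery's partition decorator syntax to address the
+    // partition for one specific day of the ingestion-time partitioned table.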
+    // Simulate ingestion for 2019-01-01.
+    RunQueryAppendJobAndExpectSuccess(
+        /* destinationTableId = */ TableId.of(
+            /* dataset = */ DATASET, /* table = */ testTableId.getTable() + "$20190101"),
+        /* query = */ "SELECT 1 AS num_field");
+
+    // Simulate ingestion for 2019-01-02.
+    RunQueryAppendJobAndExpectSuccess(
+        /* destinationTableId = */ TableId.of(
+            /* dataset = */ DATASET, /* table = */ testTableId.getTable() + "$20190102"),
+        /* query = */ "SELECT 2 AS num_field");
+
+    TableReference tableReference =
+        TableReference.newBuilder()
+            .setTableId(testTableId.getTable())
+            .setDatasetId(testTableId.getDataset())
+            .setProjectId(ServiceOptions.getDefaultProjectId())
+            .build();
+
+    List<GenericData.Record> unfilteredRows =
+        ReadAllRows(/* tableReference = */ tableReference, /* filter = */ null);
+    assertEquals("Actual rows read: " + unfilteredRows.toString(), 2, unfilteredRows.size());
+
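+    // _PARTITIONDATE is the DATE-typed pseudo-column BigQuery exposes on ingestion-time
+    // partitioned tables, so it can be referenced in a row restriction like any declared column.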
+    List<GenericData.Record> partitionFilteredRows =
+        ReadAllRows(
+            /* tableReference = */ tableReference,
+            /* filter = */ "_PARTITIONDATE > \"2019-01-01\"");
+    assertEquals(
+        "Actual rows read: " + partitionFilteredRows.toString(), 1, partitionFilteredRows.size());
+    assertEquals(2L, partitionFilteredRows.get(0).get("num_field"));
+  }
+
   /**
    * Reads to the specified row offset within the stream. If the stream does not have the desired
    * rows to read, it will read all of them.
@@ -436,18 +532,18 @@ private long ReadStreamToOffset(Stream stream, long rowOffset) {
   }

   /**
-   * Reads all the rows from the specified tableReference that are added up to timestamp defined in
-   * snapshot. If snapshot is not provided, current time will be used.
+   * Reads all the rows from the specified tableReference.
    *
    * <p>For every row, the consumer is called for processing.
    *
    * @param tableReference
-   * @param snapshotInMillis
-   * @param consumer
+   * @param snapshotInMillis Optional. If specified, only rows up to this timestamp are returned.
+   * @param filter Optional. If specified, it is used to restrict the returned data.
+   * @param consumer Consumer that receives all Avro rows.
    * @throws IOException
    */
   private void ProcessRowsAtSnapshot(
-      TableReference tableReference, Long snapshotInMillis, AvroRowConsumer consumer)
+      TableReference tableReference, Long snapshotInMillis, String filter, AvroRowConsumer consumer)
       throws IOException {
     Preconditions.checkNotNull(tableReference);
     Preconditions.checkNotNull(consumer);
@@ -469,6 +565,11 @@ private void ProcessRowsAtSnapshot(
               TableModifiers.newBuilder().setSnapshotTime(snapshotTimestamp).build());
     }

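+    // The row restriction travels with the CreateReadSession request, so filtered-out rows are
+    // dropped server-side rather than being streamed to the client.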
+    if (filter != null && !filter.isEmpty()) {
+      createSessionRequestBuilder.setReadOptions(
+          TableReadOptions.newBuilder().setRowRestriction(filter).build());
+    }
+
     ReadSession session = client.createReadSession(createSessionRequestBuilder.build());
     assertEquals(
         String.format(
@@ -492,6 +593,31 @@ private void ProcessRowsAtSnapshot(
     }
   }

+  /**
+   * Reads all the rows from the specified table reference and returns them as a list of generic
+   * Avro records.
+   *
+   * @param tableReference
+   * @param filter Optional. If specified, it is used to restrict the returned data.
+   * @return
+   */
+  List<GenericData.Record> ReadAllRows(TableReference tableReference, String filter)
+      throws IOException {
+    final List<GenericData.Record> rows = new ArrayList<>();
+    ProcessRowsAtSnapshot(
+        /* tableReference = */ tableReference,
+        /* snapshotInMillis = */ null,
+        /* filter = */ filter,
+        new AvroRowConsumer() {
+          @Override
+          public void accept(GenericData.Record record) {
+            // Clone the record, since the reader reuses the same reference across rows.
+            rows.add(new GenericRecordBuilder(record).build());
+          }
+        });
+    return rows;
+  }
+
   /**
    * Runs a query job with WRITE_APPEND disposition to the destination table and returns the
    * successfully completed job.
@@ -501,16 +627,26 @@ private void ProcessRowsAtSnapshot(
    * @return
    * @throws InterruptedException
    */
-  private Job RunQueryJobAndExpectSuccess(TableId destinationTableId, String query)
+  private Job RunQueryAppendJobAndExpectSuccess(TableId destinationTableId, String query)
       throws InterruptedException {
-    QueryJobConfiguration configuration =
+    return RunQueryJobAndExpectSuccess(
         QueryJobConfiguration.newBuilder(query)
             .setDestinationTable(destinationTableId)
             .setUseQueryCache(false)
             .setUseLegacySql(false)
             .setWriteDisposition(WriteDisposition.WRITE_APPEND)
-            .build();
+            .build());
+  }

+  /**
+   * Runs a query job with the provided configuration and returns the successfully completed job.
+   *
+   * @param configuration
+   * @return
+   * @throws InterruptedException
+   */
+  private Job RunQueryJobAndExpectSuccess(QueryJobConfiguration configuration)
+      throws InterruptedException {
     Job job = bigquery.create(JobInfo.of(configuration));
     Job completedJob =
         job.waitFor(