 import static com.google.common.truth.Truth.assertWithMessage;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import com.google.api.gax.rpc.ServerStream;
+import com.google.cloud.RetryOption;
 import com.google.cloud.ServiceOptions;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.DatasetInfo;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.Field.Mode;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobInfo.WriteDisposition;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
 import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
 import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions;
 import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
 import com.google.cloud.bigquery.storage.v1beta1.Storage.DataFormat;
 import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest;
 import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;
 import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.Stream;
 import com.google.cloud.bigquery.storage.v1beta1.Storage.StreamPosition;
+import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto.TableModifiers;
 import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto.TableReference;
+import com.google.cloud.bigquery.storage.v1beta1.it.SimpleRowReader.AvroRowConsumer;
+import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
+import com.google.common.base.Preconditions;
 import com.google.protobuf.TextFormat;
+import com.google.protobuf.Timestamp;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 import java.util.logging.Logger;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.threeten.bp.Duration;
 
 /** Integration tests for BigQuery Storage API. */
 public class ITBigQueryStorageTest {
 
   private static final Logger LOG = Logger.getLogger(ITBigQueryStorageTest.class.getName());
+  private static final String DATASET = RemoteBigQueryHelper.generateDatasetName();
+  private static final String DESCRIPTION = "BigQuery Storage Java client test dataset";
 
   private static BigQueryStorageClient client;
   private static String parentProjectId;
+  private static BigQuery bigquery;
 
   @BeforeClass
   public static void beforeClass() throws IOException {
@@ -58,13 +88,25 @@ public static void beforeClass() throws IOException {
         String.format(
             "%s tests running with parent project: %s",
             ITBigQueryStorageTest.class.getSimpleName(), parentProjectId));
+
+    RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
+    bigquery = bigqueryHelper.getOptions().getService();
+    DatasetInfo datasetInfo =
+        DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build();
+    bigquery.create(datasetInfo);
+    LOG.info("Created test dataset: " + DATASET);
   }
 
   @AfterClass
   public static void afterClass() {
     if (client != null) {
       client.close();
     }
+
+    if (bigquery != null) {
+      RemoteBigQueryHelper.forceDelete(bigquery, DATASET);
+      LOG.info("Deleted test dataset: " + DATASET);
+    }
   }
 
   @Test
@@ -76,7 +118,11 @@ public void testSimpleRead() {
             .setTableId("shakespeare")
             .build();
 
-    ReadSession session = client.createReadSession(tableReference, parentProjectId, 1);
+    ReadSession session =
+        client.createReadSession(
+            /* tableReference = */ tableReference,
+            /* parent = */ parentProjectId,
+            /* requestedStreams = */ 1);
     assertEquals(
         String.format(
             "Did not receive expected number of streams for table reference '%s' CreateReadSession response:%n%s",
@@ -104,6 +150,57 @@ public void testSimpleRead() {
     assertEquals(164_656, avroRowCount);
   }
 
+  @Test
+  public void testSimpleReadAndResume() {
+    TableReference tableReference =
+        TableReference.newBuilder()
+            .setProjectId("bigquery-public-data")
+            .setDatasetId("samples")
+            .setTableId("shakespeare")
+            .build();
+
+    ReadSession session =
+        client.createReadSession(
+            /* tableReference = */ tableReference,
+            /* parent = */ parentProjectId,
+            /* requestedStreams = */ 1);
+    assertEquals(
+        String.format(
+            "Did not receive expected number of streams for table reference '%s' CreateReadSession response:%n%s",
+            TextFormat.shortDebugString(tableReference), session.toString()),
+        1,
+        session.getStreamsCount());
+
+    // We have to read some number of rows in order to be able to resume. More details:
+    // https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#google.cloud.bigquery.storage.v1beta1.ReadRowsRequest
+
+    long avroRowCount = ReadStreamToOffset(session.getStreams(0), /* rowOffset = */ 34_846);
+
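+    // Resume reading from the same stream, starting at the offset of the last row already read.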
+    StreamPosition readPosition =
+        StreamPosition.newBuilder()
+            .setStream(session.getStreams(0))
+            .setOffset(avroRowCount)
+            .build();
+
+    ReadRowsRequest readRowsRequest =
+        ReadRowsRequest.newBuilder().setReadPosition(readPosition).build();
+
+    ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
+
+    for (ReadRowsResponse response : stream) {
+      assertTrue(
+          String.format(
+              "Response is missing 'avro_rows'. Read %d rows so far. ReadRows response:%n%s",
+              avroRowCount, response.toString()),
+          response.hasAvroRows());
+      avroRowCount += response.getAvroRows().getRowCount();
+    }
+
+    // Verifies that the number of rows skipped and read equals the total number of rows in the
+    // table.
+    assertEquals(164_656, avroRowCount);
+  }
+
   @Test
   public void testFilter() throws IOException {
     TableReference tableReference =
@@ -254,4 +351,178 @@ public void accept(GenericRecord record) {
 
     assertEquals(1_333, avroRowCount);
   }
+
+  @Test
+  public void testReadAtSnapshot() throws InterruptedException, IOException {
+    Field intFieldSchema =
+        Field.newBuilder("col", LegacySQLTypeName.INTEGER)
+            .setMode(Mode.REQUIRED)
+            .setDescription("IntegerDescription")
+            .build();
+    com.google.cloud.bigquery.Schema tableSchema =
+        com.google.cloud.bigquery.Schema.of(intFieldSchema);
+
+    TableId testTableId = TableId.of(/* dataset = */ DATASET, /* table = */ "test_read_snapshot");
+    bigquery.create(TableInfo.of(testTableId, StandardTableDefinition.of(tableSchema)));
+
+    TableReference tableReference =
+        TableReference.newBuilder()
+            .setTableId(testTableId.getTable())
+            .setDatasetId(DATASET)
+            .setProjectId(ServiceOptions.getDefaultProjectId())
+            .build();
+
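+    // Each query job appends a single row, so the end time of each job marks a distinct snapshot
+    // of the table contents.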
+    Job firstJob =
+        RunQueryJobAndExpectSuccess(
+            /* destinationTableId = */ testTableId, /* query = */ "SELECT 1 AS col");
+
+    Job secondJob =
+        RunQueryJobAndExpectSuccess(
+            /* destinationTableId = */ testTableId, /* query = */ "SELECT 2 AS col");
+
+    final List<Long> rowsAfterFirstSnapshot = new ArrayList<>();
+    ProcessRowsAtSnapshot(
+        tableReference,
+        firstJob.getStatistics().getEndTime(),
+        new AvroRowConsumer() {
+          @Override
+          public void accept(GenericRecord record) {
+            rowsAfterFirstSnapshot.add((Long) record.get("col"));
+          }
+        });
+    assertEquals(Arrays.asList(1L), rowsAfterFirstSnapshot);
+
+    final List<Long> rowsAfterSecondSnapshot = new ArrayList<>();
+    ProcessRowsAtSnapshot(
+        tableReference,
+        secondJob.getStatistics().getEndTime(),
+        new AvroRowConsumer() {
+          @Override
+          public void accept(GenericRecord record) {
+            rowsAfterSecondSnapshot.add((Long) record.get("col"));
+          }
+        });
+    Collections.sort(rowsAfterSecondSnapshot);
+    assertEquals(Arrays.asList(1L, 2L), rowsAfterSecondSnapshot);
+  }
+
+  /**
+   * Reads rows from the stream up to the specified row offset. If the stream has fewer rows than
+   * requested, all of them are read.
+   *
+   * @param stream the stream to read rows from
+   * @param rowOffset the target number of rows to read before stopping
+   * @return the requested row offset, or the total number of rows read if the stream had fewer
+   *     rows
+   */
+  private long ReadStreamToOffset(Stream stream, long rowOffset) {
+    StreamPosition readPosition = StreamPosition.newBuilder().setStream(stream).build();
+
+    ReadRowsRequest readRowsRequest =
+        ReadRowsRequest.newBuilder().setReadPosition(readPosition).build();
+
+    long avroRowCount = 0;
+    ServerStream<ReadRowsResponse> serverStream = client.readRowsCallable().call(readRowsRequest);
+    Iterator<ReadRowsResponse> responseIterator = serverStream.iterator();
+
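+    // Consume response batches until at least rowOffset rows have been read.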
+    while (responseIterator.hasNext()) {
+      ReadRowsResponse response = responseIterator.next();
+      avroRowCount += response.getAvroRows().getRowCount();
+      if (avroRowCount >= rowOffset) {
+        return rowOffset;
+      }
+    }
+
+    return avroRowCount;
+  }
+
+  /**
+   * Reads all the rows from the specified tableReference that were added up to the timestamp
+   * defined by the snapshot. If no snapshot is provided, the current time is used.
+   *
+   * <p>For every row, the consumer is called for processing.
+   *
+   * @param tableReference the table to read rows from
+   * @param snapshotInMillis the snapshot time in milliseconds since the epoch, or null to read at
+   *     the current time
+   * @param consumer the consumer invoked for every row read
+   * @throws IOException
+   */
+  private void ProcessRowsAtSnapshot(
+      TableReference tableReference, Long snapshotInMillis, AvroRowConsumer consumer)
+      throws IOException {
+    Preconditions.checkNotNull(tableReference);
+    Preconditions.checkNotNull(consumer);
+
+    CreateReadSessionRequest.Builder createSessionRequestBuilder =
+        CreateReadSessionRequest.newBuilder()
+            .setParent(parentProjectId)
+            .setRequestedStreams(1)
+            .setTableReference(tableReference)
+            .setFormat(DataFormat.AVRO);
+
+    if (snapshotInMillis != null) {
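+      // Split the millisecond timestamp into whole seconds and the remaining nanoseconds for the
+      // protobuf Timestamp.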
+      Timestamp snapshotTimestamp =
+          Timestamp.newBuilder()
+              .setSeconds(snapshotInMillis / 1_000)
+              .setNanos((int) ((snapshotInMillis % 1000) * 1000000))
+              .build();
+      createSessionRequestBuilder.setTableModifiers(
+          TableModifiers.newBuilder().setSnapshotTime(snapshotTimestamp).build());
+    }
+
+    ReadSession session = client.createReadSession(createSessionRequestBuilder.build());
+    assertEquals(
+        String.format(
+            "Did not receive expected number of streams for table reference '%s' CreateReadSession response:%n%s",
+            TextFormat.shortDebugString(tableReference), session.toString()),
+        1,
+        session.getStreamsCount());
+
+    StreamPosition readPosition =
+        StreamPosition.newBuilder().setStream(session.getStreams(0)).build();
+
+    ReadRowsRequest readRowsRequest =
+        ReadRowsRequest.newBuilder().setReadPosition(readPosition).build();
+
+    SimpleRowReader reader =
+        new SimpleRowReader(new Schema.Parser().parse(session.getAvroSchema().getSchema()));
+
+    ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
+    for (ReadRowsResponse response : stream) {
+      reader.processRows(response.getAvroRows(), consumer);
+    }
+  }
+
+  /**
+   * Runs a query job with WRITE_APPEND disposition to the destination table and returns the
+   * successfully completed job.
+   *
+   * @param destinationTableId the table the query results are appended to
+   * @param query the query to run
+   * @return the successfully completed query job
+   * @throws InterruptedException if the wait for job completion is interrupted
+   */
+  private Job RunQueryJobAndExpectSuccess(TableId destinationTableId, String query)
+      throws InterruptedException {
+    QueryJobConfiguration configuration =
+        QueryJobConfiguration.newBuilder(query)
+            .setDestinationTable(destinationTableId)
+            .setUseQueryCache(false)
+            .setUseLegacySql(false)
+            .setWriteDisposition(WriteDisposition.WRITE_APPEND)
+            .build();
+
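+    // Submit the job and wait up to one minute for it to complete.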
+    Job job = bigquery.create(JobInfo.of(configuration));
+    Job completedJob =
+        job.waitFor(
+            RetryOption.initialRetryDelay(Duration.ofSeconds(1)),
+            RetryOption.totalTimeout(Duration.ofMinutes(1)));
+
+    assertNotNull(completedJob);
+    assertNull(
+        /* message = */ "Received a job status that is not a success: "
+            + completedJob.getStatus().toString(),
+        /* object = */ completedJob.getStatus().getError());
+
+    return completedJob;
+  }
 }