Skip to content

Commit 6125c56

Browse files
authored
bigquery: make query return a Job (#2566)
Eventually we'll delete QueryResponse completely so that we can use the faster listTableData RPC.
1 parent e30b794 commit 6125c56

9 files changed

Lines changed: 85 additions & 72 deletions

File tree

google-cloud-bigquery/src/benchmark/java/com/google/cloud/bigquery/benchmark/Benchmark.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import com.google.cloud.bigquery.BigQuery;
2121
import com.google.cloud.bigquery.BigQueryOptions;
2222
import com.google.cloud.bigquery.FieldValue;
23+
import com.google.cloud.bigquery.Job;
2324
import com.google.cloud.bigquery.QueryJobConfiguration;
24-
import com.google.cloud.bigquery.QueryResponse;
2525
import java.io.FileInputStream;
2626
import java.util.List;
2727
import org.threeten.bp.Clock;
@@ -53,13 +53,13 @@ public static void main(String[] args) throws Exception {
5353
}
5454

5555
Instant start = clock.instant();
56-
QueryResponse queryResponse =
57-
bq.query(QueryJobConfiguration.newBuilder(request).setUseLegacySql(false).build());
56+
Job job = bq.query(QueryJobConfiguration.newBuilder(request).setUseLegacySql(false).build());
57+
job = job.waitFor();
5858

5959
int rows = 0;
6060
int cols = 0;
6161
Duration firstByte = null;
62-
for (List<FieldValue> row : queryResponse.getResult().iterateAll()) {
62+
for (List<FieldValue> row : job.getQueryResults().getResult().iterateAll()) {
6363
rows++;
6464
if (cols == 0) {
6565
cols = row.size();

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.google.common.base.Function;
2828
import com.google.common.collect.ImmutableList;
2929
import com.google.common.collect.Lists;
30-
3130
import java.io.Serializable;
3231
import java.util.ArrayList;
3332
import java.util.List;
@@ -989,11 +988,14 @@ Page<FieldValueList> listTableData(String datasetId, String tableId,
989988
*/
990989
boolean cancel(JobId jobId);
991990

991+
// TODO(pongad): rewrite query() samples.
992+
992993
/**
993-
* Runs the query associated with the request, using an internally-generated random JobId.
994+
* Runs the query associated with the request, using an internally-generated random JobId. The returned job is always completed.
994995
*
995996
* <p>Example of running a query.
996-
* <pre> {@code
997+
*
998+
* <pre>{@code
997999
* String query = "SELECT distinct(corpus) FROM `bigquery-public-data.samples.shakespeare`";
9981000
* QueryJobConfiguration queryConfig = QueryJobConfiguration.of(query);
9991001
*
@@ -1013,7 +1015,8 @@ Page<FieldValueList> listTableData(String datasetId, String tableId,
10131015
* }</pre>
10141016
*
10151017
* <p>Example of running a query with query parameters.
1016-
* <pre> {@code
1018+
*
1019+
* <pre>{@code
10171020
* String query =
10181021
* "SELECT distinct(corpus) FROM `bigquery-public-data.samples.shakespeare` where word_count > ?";
10191022
* QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query)
@@ -1033,32 +1036,31 @@ Page<FieldValueList> listTableData(String datasetId, String tableId,
10331036
* @throws InterruptedException if the current thread gets interrupted while waiting for the query
10341037
* to complete
10351038
*/
1036-
QueryResponse query(QueryJobConfiguration configuration, QueryOption... options)
1037-
throws InterruptedException;
1039+
Job query(QueryJobConfiguration configuration, JobOption... options) throws InterruptedException;
10381040

10391041
/**
1040-
* Runs the query associated with the request, using the given job id.
1042+
* Runs the query associated with the request, using the given JobId. The returned job is always completed.
10411043
*
1042-
* <p>See {@link #query(QueryJobConfiguration, QueryOption...)} for examples on populating a
1043-
* {@link QueryJobConfiguration}.
1044+
* <p>See {@link #query(QueryJobConfiguration, JobOption...)} for examples on populating a {@link
1045+
* QueryJobConfiguration}.
10441046
*
1045-
* <p>
1046-
* The recommended way to create a randomly generated JobId is the following:
1047+
* <p>The recommended way to create a randomly generated JobId is the following:
10471048
*
1048-
* <pre> {@code
1049-
* JobId jobId = JobId.of();
1049+
* <pre>{@code
1050+
* JobId jobId = JobId.of();
10501051
* }</pre>
10511052
*
10521053
* For a user specified job id with an optional prefix use the following:
1053-
* <pre> {@code
1054-
* JobId jobId = JobId.of("my_prefix-my_unique_job_id");
1054+
*
1055+
* <pre>{@code
1056+
* JobId jobId = JobId.of("my_prefix-my_unique_job_id");
10551057
* }</pre>
10561058
*
10571059
* @throws BigQueryException upon failure
10581060
* @throws InterruptedException if the current thread gets interrupted while waiting for the query
10591061
* to complete
10601062
*/
1061-
QueryResponse query(QueryJobConfiguration configuration, JobId jobId, QueryOption... options)
1063+
Job query(QueryJobConfiguration configuration, JobId jobId, JobOption... options)
10621064
throws InterruptedException;
10631065

10641066
/**

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import static com.google.common.base.Preconditions.checkArgument;
2121

2222
import com.google.api.gax.paging.Page;
23-
import com.google.api.gax.retrying.RetrySettings;
2423
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
2524
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
2625
import com.google.api.services.bigquery.model.TableDataInsertAllRequest.Rows;
@@ -32,10 +31,10 @@
3231
import com.google.cloud.PageImpl.NextPageFetcher;
3332
import com.google.cloud.RetryHelper;
3433
import com.google.cloud.RetryHelper.RetryHelperException;
35-
import com.google.cloud.RetryOption;
3634
import com.google.cloud.Tuple;
3735
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
3836
import com.google.cloud.bigquery.spi.v2.BigQueryRpc;
37+
import com.google.common.annotations.VisibleForTesting;
3938
import com.google.common.base.Function;
4039
import com.google.common.collect.ImmutableList;
4140
import com.google.common.collect.ImmutableMap;
@@ -584,24 +583,15 @@ public Boolean call() {
584583
}
585584

586585
@Override
587-
public QueryResponse query(QueryJobConfiguration configuration, QueryOption... options)
586+
public Job query(QueryJobConfiguration configuration, JobOption... options)
588587
throws InterruptedException {
589588
return query(configuration, JobId.of(), options);
590589
}
591590

592591
@Override
593-
public QueryResponse query(QueryJobConfiguration configuration, JobId jobId, QueryOption... options)
592+
public Job query(QueryJobConfiguration configuration, JobId jobId, JobOption... options)
594593
throws InterruptedException {
595-
try {
596-
JobInfo jobInfo = JobInfo.newBuilder(configuration).setJobId(jobId).build();
597-
Job job = create(jobInfo);
598-
RetrySettings waitSettings =
599-
RetryOption.mergeToSettings(
600-
Job.DEFAULT_QUERY_JOB_WAIT_SETTINGS, QueryOption.filterRetryOptions(options));
601-
return job.waitForQueryResults(waitSettings, QueryOption.filterQueryResultsOptions(options));
602-
} catch (RetryHelper.RetryHelperException e) {
603-
throw BigQueryException.translateAndThrow(e);
604-
}
594+
return create(JobInfo.of(jobId, configuration), options).waitFor();
605595
}
606596

607597
@Override
@@ -677,7 +667,8 @@ public TableDataWriteChannel writer(WriteChannelConfiguration writeChannelConfig
677667
writeChannelConfiguration.setProjectId(getOptions().getProjectId()));
678668
}
679669

680-
private Map<BigQueryRpc.Option, ?> optionMap(Option... options) {
670+
@VisibleForTesting
671+
static Map<BigQueryRpc.Option, ?> optionMap(Option... options) {
681672
Map<BigQueryRpc.Option, Object> optionMap = Maps.newEnumMap(BigQueryRpc.Option.class);
682673
for (Option option : options) {
683674
Object prev = optionMap.put(option.getRpcOption(), option.getValue());

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/QueryResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
/**
2828
* Google Cloud BigQuery Query Response. This class contains the results of a Query Job
2929
* ({@link BigQuery#getQueryResults(JobId, BigQuery.QueryResultsOption...)}) or of a
30-
* Query Request ({@link BigQuery#query(QueryJobConfiguration, QueryOption...)}).
30+
* Query Request ({@link BigQuery#query(QueryJobConfiguration, JobOption...)}).
3131
*
3232
* <p>Example usage of a query response:
3333
* <pre> {@code

google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import com.google.cloud.ServiceOptions;
3737
import com.google.cloud.Tuple;
3838
import com.google.cloud.WriteChannel;
39-
import com.google.cloud.bigquery.BigQuery.QueryOption;
4039
import com.google.cloud.bigquery.BigQuery.QueryResultsOption;
4140
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
4241
import com.google.cloud.bigquery.spi.BigQueryRpcFactory;
@@ -1134,15 +1133,24 @@ public void testQueryRequestCompleted() throws InterruptedException {
11341133
Map<BigQueryRpc.Option, Object> optionMap = Maps.newEnumMap(BigQueryRpc.Option.class);
11351134
QueryResultsOption pageSizeOption = QueryResultsOption.pageSize(42L);
11361135
optionMap.put(pageSizeOption.getRpcOption(), pageSizeOption.getValue());
1136+
1137+
EasyMock.expect(
1138+
bigqueryRpcMock.getQueryResults(
1139+
PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
1140+
.andReturn(responsePb);
1141+
EasyMock.expect(
1142+
bigqueryRpcMock.getJob(
1143+
PROJECT, JOB, Collections.<BigQueryRpc.Option, Object>emptyMap()))
1144+
.andReturn(jobResponsePb);
11371145
EasyMock.expect(
11381146
bigqueryRpcMock.getQueryResults(PROJECT, JOB, optionMap)).andReturn(responsePb);
11391147

11401148
EasyMock.replay(bigqueryRpcMock);
11411149
bigquery = options.getService();
1142-
QueryResponse response = bigquery.query(
1143-
QUERY_JOB_CONFIGURATION_FOR_QUERY,
1144-
queryJob,
1145-
QueryOption.of(QueryResultsOption.pageSize(42L)));
1150+
QueryResponse response =
1151+
bigquery
1152+
.query(QUERY_JOB_CONFIGURATION_FOR_QUERY, queryJob)
1153+
.getQueryResults(QueryResultsOption.pageSize(42L));
11461154
assertNull(response.getEtag());
11471155
assertEquals(queryJob, response.getJobId());
11481156
assertEquals(true, response.jobCompleted());
@@ -1190,15 +1198,28 @@ public void testQueryRequestCompletedOnSecondAttempt() throws InterruptedExcepti
11901198
QueryResultsOption pageSizeOption = QueryResultsOption.pageSize(42L);
11911199
Map<BigQueryRpc.Option, Object> optionMap = Maps.newEnumMap(BigQueryRpc.Option.class);
11921200
optionMap.put(pageSizeOption.getRpcOption(), pageSizeOption.getValue());
1201+
1202+
EasyMock.expect(
1203+
bigqueryRpcMock.getQueryResults(
1204+
PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
1205+
.andReturn(responsePb1);
11931206
EasyMock.expect(
1194-
bigqueryRpcMock.getQueryResults(PROJECT, JOB, optionMap)).andReturn(responsePb1);
1207+
bigqueryRpcMock.getQueryResults(
1208+
PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
1209+
.andReturn(responsePb2);
1210+
EasyMock.expect(
1211+
bigqueryRpcMock.getJob(
1212+
PROJECT, JOB, Collections.<BigQueryRpc.Option, Object>emptyMap()))
1213+
.andReturn(jobResponsePb);
11951214
EasyMock.expect(
11961215
bigqueryRpcMock.getQueryResults(PROJECT, JOB, optionMap)).andReturn(responsePb2);
11971216

11981217
EasyMock.replay(bigqueryRpcMock);
11991218
bigquery = options.getService();
1200-
QueryResponse response = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY, queryJob,
1201-
QueryOption.of(QueryResultsOption.pageSize(42L)));
1219+
QueryResponse response =
1220+
bigquery
1221+
.query(QUERY_JOB_CONFIGURATION_FOR_QUERY, queryJob)
1222+
.getQueryResults(QueryResultsOption.pageSize(42L));
12021223
assertNull(response.getEtag());
12031224
assertEquals(queryJob, response.getJobId());
12041225
assertEquals(true, response.jobCompleted());

google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import com.google.cloud.bigquery.BigQuery.JobField;
3434
import com.google.cloud.bigquery.BigQuery.JobListOption;
3535
import com.google.cloud.bigquery.BigQuery.JobOption;
36-
import com.google.cloud.bigquery.BigQuery.QueryOption;
3736
import com.google.cloud.bigquery.BigQuery.QueryResultsOption;
3837
import com.google.cloud.bigquery.BigQuery.TableField;
3938
import com.google.cloud.bigquery.BigQuery.TableOption;
@@ -428,8 +427,10 @@ public void testCreateExternalTable() throws InterruptedException {
428427
.setDefaultDataset(DatasetId.of(DATASET))
429428
.setUseLegacySql(true)
430429
.build();
431-
QueryResponse response = bigquery.query(
432-
config, QueryOption.of(QueryResultsOption.maxWaitTime(60000L)));
430+
QueryResponse response =
431+
bigquery
432+
.query(config)
433+
.getQueryResults();
433434
long integerValue = 0;
434435
int rowCount = 0;
435436
for (FieldValueList row : response.getResult().getValues()) {
@@ -488,10 +489,10 @@ public void testCreateViewTable() throws InterruptedException {
488489
.setDefaultDataset(DatasetId.of(DATASET))
489490
.setUseLegacySql(true)
490491
.build();
491-
QueryResponse response = bigquery.query(
492-
config,
493-
QueryOption.of(QueryResultsOption.maxWaitTime(60000L)),
494-
QueryOption.of(QueryResultsOption.pageSize(1000L)));
492+
QueryResponse response =
493+
bigquery
494+
.query(config)
495+
.getQueryResults(QueryResultsOption.pageSize(1000L));
495496
int rowCount = 0;
496497
for (FieldValueList row : response.getResult().getValues()) {
497498
FieldValue timestampCell = row.get(0);
@@ -778,10 +779,10 @@ public void testQuery() throws InterruptedException {
778779
QueryJobConfiguration config = QueryJobConfiguration.newBuilder(query)
779780
.setDefaultDataset(DatasetId.of(DATASET))
780781
.build();
781-
QueryResponse response = bigquery.query(
782-
config,
783-
QueryOption.of(QueryResultsOption.maxWaitTime(60000L)),
784-
QueryOption.of(QueryResultsOption.pageSize(1000L)));
782+
QueryResponse response =
783+
bigquery
784+
.query(config)
785+
.getQueryResults(QueryResultsOption.pageSize(1000L));
785786
assertEquals(QUERY_RESULT_SCHEMA, response.getResult().getSchema());
786787
int rowCount = 0;
787788
for (FieldValueList row : response.getResult().getValues()) {
@@ -830,10 +831,10 @@ public void testPositionalQueryParameters() throws InterruptedException {
830831
.addPositionalParameter(int64Parameter)
831832
.addPositionalParameter(float64Parameter)
832833
.build();
833-
QueryResponse response = bigquery.query(
834-
config,
835-
QueryOption.of(QueryResultsOption.maxWaitTime(60000L)),
836-
QueryOption.of(QueryResultsOption.pageSize(1000L)));
834+
QueryResponse response =
835+
bigquery
836+
.query(config)
837+
.getQueryResults(QueryResultsOption.pageSize(1000L));
837838
assertEquals(QUERY_RESULT_SCHEMA, response.getResult().getSchema());
838839
assertEquals(2, Iterables.size(response.getResult().getValues()));
839840
}
@@ -853,10 +854,10 @@ public void testNamedQueryParameters() throws InterruptedException {
853854
.addNamedParameter("stringParam", stringParameter)
854855
.addNamedParameter("integerList", intArrayParameter)
855856
.build();
856-
QueryResponse response = bigquery.query(
857-
config,
858-
QueryOption.of(QueryResultsOption.maxWaitTime(60000L)),
859-
QueryOption.of(QueryResultsOption.pageSize(1000L)));
857+
QueryResponse response =
858+
bigquery
859+
.query(config)
860+
.getQueryResults(QueryResultsOption.pageSize(1000L));
860861
assertEquals(QUERY_RESULT_SCHEMA, response.getResult().getSchema());
861862
assertEquals(2, Iterables.size(response.getResult().getValues()));
862863
}
@@ -870,8 +871,8 @@ public void testBytesParameter() throws Exception {
870871
.setUseLegacySql(false)
871872
.addNamedParameter("p", bytesParameter)
872873
.build();
873-
QueryResponse response = bigquery.query(
874-
config, QueryOption.of(QueryResultsOption.maxWaitTime(60000L)));
874+
QueryResponse response =
875+
bigquery.query(config).getQueryResults(QueryResultsOption.pageSize(1000L));
875876
int rowCount = 0;
876877
for (FieldValueList row : response.getResult().getValues()) {
877878
rowCount++;

google-cloud-examples/src/main/java/com/google/cloud/examples/bigquery/BigQueryExample.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -621,7 +621,7 @@ private static class QueryAction extends BigQueryAction<QueryJobConfiguration> {
621621
@Override
622622
void run(BigQuery bigquery, QueryJobConfiguration queryConfig) throws Exception {
623623
System.out.println("Running query");
624-
QueryResponse queryResponse = bigquery.query(queryConfig);
624+
QueryResponse queryResponse = bigquery.query(queryConfig).getQueryResults();
625625
if (!queryResponse.hasErrors()) {
626626
System.out.println("Query succeeded. Results:");
627627
for (FieldValueList row : queryResponse.getResult().iterateAll()) {

google-cloud-examples/src/main/java/com/google/cloud/examples/bigquery/snippets/BigQuerySnippets.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ public QueryResponse runQuery(String query) throws InterruptedException {
577577
// [START runQuery]
578578
QueryJobConfiguration queryConfig =
579579
QueryJobConfiguration.newBuilder(query).setUseLegacySql(true).build();
580-
QueryResponse response = bigquery.query(queryConfig);
580+
QueryResponse response = bigquery.query(queryConfig).getQueryResults();
581581
if (response.hasErrors()) {
582582
// handle errors
583583
}
@@ -600,7 +600,7 @@ public QueryResponse runQueryWithParameters(String query) throws InterruptedExce
600600
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query)
601601
.addNamedParameter("wordCount", QueryParameterValue.int64(5))
602602
.build();
603-
QueryResponse response = bigquery.query(queryConfig);
603+
QueryResponse response = bigquery.query(queryConfig).getQueryResults();
604604
if (response.hasErrors()) {
605605
// handle errors
606606
}
@@ -621,7 +621,7 @@ public QueryResponse queryResults(final String query) throws InterruptedExceptio
621621
// [START queryResults]
622622
QueryJobConfiguration queryConfig =
623623
QueryJobConfiguration.newBuilder(query).setUseLegacySql(true).build();
624-
QueryResponse response = bigquery.query(queryConfig);
624+
QueryResponse response = bigquery.query(queryConfig).getQueryResults();
625625
if (response.hasErrors()) {
626626
// handle errors
627627
}

google-cloud-examples/src/main/java/com/google/cloud/examples/bigquery/snippets/InsertDataAndQueryTable.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,7 @@ public static void main(String... args) throws InterruptedException {
8585
QueryJobConfiguration.newBuilder("SELECT * FROM my_dataset_id.my_table_id").build();
8686
// Request query to be executed and wait for results
8787
QueryResponse queryResponse = bigquery.query(
88-
queryConfig,
89-
QueryOption.of(QueryResultsOption.maxWaitTime(60000L)),
90-
QueryOption.of(QueryResultsOption.pageSize(1000L)));
88+
queryConfig).getQueryResults(QueryResultsOption.pageSize(1000L));
9189
// Read rows
9290
System.out.println("Table rows:");
9391
for (FieldValueList row : queryResponse.getResult().iterateAll()) {

0 commit comments

Comments
 (0)