Skip to content

Commit 0e2e9ed

Browse files
authored
bigquery: make query() return iterator (#2685)
This also makes queries require fewer RPCs when detailed job status is not needed. If a query fails, query() throws an exception that explains why it failed and includes the job ID, so users can retrieve more details themselves. Updates #2591.
1 parent f6bcbc2 commit 0e2e9ed

15 files changed

Lines changed: 124 additions & 151 deletions

File tree

google-cloud-bigquery/README.md

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -171,27 +171,18 @@ directly or through a Query Job. In this code snippet we show how to run a query
171171
for the result. Add the following imports at the top of your file:
172172

173173
```java
174-
import com.google.cloud.bigquery.FieldValue;
174+
import com.google.cloud.bigquery.FieldValueList;
175175
import com.google.cloud.bigquery.QueryJobConfiguration;
176-
import com.google.cloud.bigquery.QueryResponse;
177-
178-
import java.util.Iterator;
179-
import java.util.List;
180176
```
181177
Then add the following code to run the query and wait for the result:
182178

183179
```java
184180
// Create a query request
185-
QueryJobConfiguration queryConfig =
186-
QueryJobConfiguration.of("SELECT * FROM my_dataset_id.my_table_id");
187-
// Request query to be executed and wait for results
188-
QueryResponse queryResponse = bigquery.query(
189-
queryConfig,
190-
QueryOption.of(QueryResultsOption.maxWaitTime(60000L)),
191-
QueryOption.of(QueryResultsOption.pageSize(1000L)));
181+
QueryJobConfiguration queryConfig =
182+
QueryJobConfiguration.newBuilder("SELECT * FROM my_dataset_id.my_table_id").build();
192183
// Read rows
193184
System.out.println("Table rows:");
194-
for (FieldValues row : queryResponse.getResult().iterateAll()) {
185+
for (FieldValueList row : bigquery.query(queryConfig).iterateAll()) {
195186
System.out.println(row);
196187
}
197188
```

google-cloud-bigquery/src/benchmark/java/com/google/cloud/bigquery/benchmark/Benchmark.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import com.google.cloud.bigquery.BigQuery;
2121
import com.google.cloud.bigquery.BigQueryOptions;
2222
import com.google.cloud.bigquery.FieldValue;
23-
import com.google.cloud.bigquery.Job;
2423
import com.google.cloud.bigquery.QueryJobConfiguration;
24+
import com.google.cloud.bigquery.QueryResult;
2525
import java.io.FileInputStream;
2626
import java.util.List;
2727
import org.threeten.bp.Clock;
@@ -53,13 +53,13 @@ public static void main(String[] args) throws Exception {
5353
}
5454

5555
Instant start = clock.instant();
56-
Job job = bq.query(QueryJobConfiguration.newBuilder(request).setUseLegacySql(false).build());
57-
job = job.waitFor();
56+
QueryResult result =
57+
bq.query(QueryJobConfiguration.newBuilder(request).setUseLegacySql(false).build());
5858

5959
int rows = 0;
6060
int cols = 0;
6161
Duration firstByte = null;
62-
for (List<FieldValue> row : job.getQueryResults().iterateAll()) {
62+
for (List<FieldValue> row : result.iterateAll()) {
6363
rows++;
6464
if (cols == 0) {
6565
cols = row.size();

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -989,8 +989,7 @@ Page<FieldValueList> listTableData(String datasetId, String tableId,
989989
boolean cancel(JobId jobId);
990990

991991
/**
992-
* Runs the query associated with the request, using an internally-generated random JobId. The
993-
* returned job is always completed.
992+
* Runs the query associated with the request, using an internally-generated random JobId.
994993
*
995994
* <p>Example of running a query.
996995
*
@@ -1003,11 +1002,7 @@ Page<FieldValueList> listTableData(String datasetId, String tableId,
10031002
* // QueryJobConfiguration queryConfig =
10041003
* // QueryJobConfiguration.newBuilder(query).setUseLegacySql(true).build();
10051004
*
1006-
* Job job = bigquery.query(queryConfig);
1007-
* if (job.getStatus().getError() != null) {
1008-
* // handle errors
1009-
* }
1010-
* for (FieldValueList row : job.getQueryResults().iterateAll()) {
1005+
* for (FieldValueList row : bigquery.query(queryConfig).iterateAll()) {
10111006
* // do something with the data
10121007
* }
10131008
* }</pre>
@@ -1020,23 +1015,21 @@ Page<FieldValueList> listTableData(String datasetId, String tableId,
10201015
* QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query)
10211016
* .addPositionalParameter(QueryParameterValue.int64(5))
10221017
* .build();
1023-
* Job job = bigquery.query(queryConfig);
1024-
* if (job.getStatus().getError() != null) {
1025-
* // handle errors
1026-
* }
1027-
* for (FieldValueList row : job.getQueryResults().iterateAll()) {
1018+
* for (FieldValueList row : bigquery.query(queryConfig).iterateAll()) {
10281019
* // do something with the data
10291020
* }
10301021
* }</pre>
10311022
*
10321023
* @throws BigQueryException upon failure
10331024
* @throws InterruptedException if the current thread gets interrupted while waiting for the query
10341025
* to complete
1026+
* @throws JobException if the job completes unsuccessfully
10351027
*/
1036-
Job query(QueryJobConfiguration configuration, JobOption... options) throws InterruptedException;
1028+
QueryResult query(QueryJobConfiguration configuration, JobOption... options)
1029+
throws InterruptedException, JobException;
10371030

10381031
/**
1039-
* Runs the query associated with the request, using the given JobId. The returned job is always completed.
1032+
* Runs the query associated with the request, using the given JobId.
10401033
*
10411034
* <p>See {@link #query(QueryJobConfiguration, JobOption...)} for examples on populating a {@link
10421035
* QueryJobConfiguration}.
@@ -1056,9 +1049,10 @@ Page<FieldValueList> listTableData(String datasetId, String tableId,
10561049
* @throws BigQueryException upon failure
10571050
* @throws InterruptedException if the current thread gets interrupted while waiting for the query
10581051
* to complete
1052+
* @throws JobException if the job completes unsuccessfully
10591053
*/
1060-
Job query(QueryJobConfiguration configuration, JobId jobId, JobOption... options)
1061-
throws InterruptedException;
1054+
QueryResult query(QueryJobConfiguration configuration, JobId jobId, JobOption... options)
1055+
throws InterruptedException, JobException;
10621056

10631057
/**
10641058
* Returns results of the query associated with the provided job.

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import com.google.common.collect.Lists;
4242
import com.google.common.collect.Maps;
4343
import java.util.List;
44+
import com.google.api.services.bigquery.model.ErrorProto;
4445
import java.util.Map;
4546
import java.util.concurrent.Callable;
4647

@@ -560,15 +561,15 @@ public Boolean call() {
560561
}
561562

562563
@Override
563-
public Job query(QueryJobConfiguration configuration, JobOption... options)
564-
throws InterruptedException {
564+
public QueryResult query(QueryJobConfiguration configuration, JobOption... options)
565+
throws InterruptedException, JobException {
565566
return query(configuration, JobId.of(), options);
566567
}
567568

568569
@Override
569-
public Job query(QueryJobConfiguration configuration, JobId jobId, JobOption... options)
570-
throws InterruptedException {
571-
return create(JobInfo.of(jobId, configuration), options).waitFor();
570+
public QueryResult query(QueryJobConfiguration configuration, JobId jobId, JobOption... options)
571+
throws InterruptedException, JobException {
572+
return create(JobInfo.of(jobId, configuration), options).getQueryResults();
572573
}
573574

574575
@Override
@@ -590,10 +591,19 @@ public GetQueryResultsResponse call() {
590591
}
591592
}, serviceOptions.getRetrySettings(), EXCEPTION_HANDLER, serviceOptions.getClock());
592593
TableSchema schemaPb = results.getSchema();
594+
595+
ImmutableList.Builder<BigQueryError> errors = ImmutableList.builder();
596+
if (results.getErrors() != null) {
597+
for (ErrorProto error : results.getErrors()) {
598+
errors.add(BigQueryError.fromPb(error));
599+
}
600+
}
601+
593602
return QueryResponse.newBuilder()
594603
.setCompleted(results.getJobComplete())
595604
.setSchema(schemaPb == null ? null : Schema.fromPb(schemaPb))
596605
.setTotalRows(results.getTotalRows() == null ? 0 : results.getTotalRows().longValue())
606+
.setErrors(errors.build())
597607
.build();
598608
} catch (RetryHelper.RetryHelperException e) {
599609
throw BigQueryException.translateAndThrow(e);

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -246,14 +246,16 @@ public Job waitFor(RetryOption... waitOptions) throws InterruptedException {
246246
/**
247247
* Gets the query results of this job. This job must be of type {@code
248248
* JobConfiguration.Type.QUERY}, otherwise this method will throw {@link
249-
* UnsupportedOperationException}. This method does not wait for the job to complete, to ensure
250-
* that the job is completed first call {@link #waitFor(RetryOption...)} method.
249+
* UnsupportedOperationException}.
250+
*
251+
* <p>If the job hasn't finished, this method waits for the job to complete. However, the state of
252+
* the current {@code Job} instance is not updated. To get the new state, call {@link
253+
* #waitFor(RetryOption...)} or {@link #reload(JobOption...)}.
251254
*
252255
* <p>Example of getting the results of a query job.
253256
*
254257
* <pre>{@code
255258
* Job job = bigquery.create(queryJobInfo);
256-
* job = job.waitFor();
257259
* QueryResult result = job.getQueryResults();
258260
* for (FieldValueList row : result.iterateAll()) {
259261
* // do something with the data
@@ -262,23 +264,20 @@ public Job waitFor(RetryOption... waitOptions) throws InterruptedException {
262264
*
263265
* @throws BigQueryException upon failure
264266
*/
265-
public QueryResult getQueryResults(QueryResultsOption... options) {
267+
public QueryResult getQueryResults(QueryResultsOption... options)
268+
throws InterruptedException, JobException {
266269
if (getConfiguration().getType() != Type.QUERY) {
267270
throw new UnsupportedOperationException(
268271
"Getting query results is supported only for " + Type.QUERY + " jobs");
269272
}
270-
if (getStatus().getState() != JobStatus.State.DONE) {
271-
throw new BigQueryException(BigQueryException.UNKNOWN_CODE, "the job hasn't completed yet");
272-
}
273-
if (getStatus().getError() != null) {
274-
throw new BigQueryException(
275-
BigQueryException.UNKNOWN_CODE, "job completed with error", getStatus().getError());
276-
}
277273

278274
TableId table = ((QueryJobConfiguration) getConfiguration()).getDestinationTable();
275+
// TODO(pongad): merge options?
279276
QueryResponse response =
280-
bigquery.getQueryResults(
281-
getJobId(), DEFAULT_QUERY_WAIT_OPTIONS); // should return immediately
277+
waitForQueryResults(DEFAULT_JOB_WAIT_SETTINGS, DEFAULT_QUERY_WAIT_OPTIONS);
278+
if (response.getSchema() == null) {
279+
throw new JobException(getJobId(), response.getErrors());
280+
}
282281
return new QueryResult(
283282
response.getSchema(), response.getTotalRows(), bigquery.listTableData(table));
284283
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2017, Google LLC All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.bigquery;
18+
19+
import com.google.common.collect.ImmutableList;
20+
import java.util.List;
21+
22+
/** Exception describing a failure of a job. */
23+
public class JobException extends RuntimeException {
24+
private final JobId id;
25+
private final ImmutableList<BigQueryError> errors;
26+
27+
JobException(JobId id, ImmutableList<BigQueryError> errors) {
28+
super(String.format("job %s failed with error: %s", id, errors));
29+
this.id = id;
30+
this.errors = errors;
31+
}
32+
33+
/** The ID for the failed job. */
34+
public JobId getId() {
35+
return id;
36+
}
37+
/**
38+
* The errors reported by the job.
39+
*
40+
* <p>The list is immutable.
41+
*/
42+
public List<BigQueryError> getErrors() {
43+
return errors;
44+
}
45+
}

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/QueryResponse.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.api.core.InternalApi;
2020
import com.google.auto.value.AutoValue;
21+
import com.google.common.collect.ImmutableList;
2122
import javax.annotation.Nullable;
2223

2324
@InternalApi
@@ -33,6 +34,8 @@ public abstract class QueryResponse {
3334

3435
abstract long getTotalRows();
3536

37+
abstract ImmutableList<BigQueryError> getErrors();
38+
3639
static Builder newBuilder() {
3740
return new AutoValue_QueryResponse.Builder();
3841
}
@@ -45,6 +48,8 @@ abstract static class Builder {
4548

4649
abstract Builder setTotalRows(long val);
4750

51+
abstract Builder setErrors(ImmutableList<BigQueryError> val);
52+
4853
abstract QueryResponse build();
4954
}
5055

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/QueryResult.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,17 @@
2222
import com.google.common.base.Function;
2323
import com.google.common.base.MoreObjects;
2424
import com.google.common.collect.Iterables;
25+
import java.io.Serializable;
2526
import java.util.Objects;
2627

27-
// TODO(pongad): Serializable ?
28-
public class QueryResult implements Page<FieldValueList> {
28+
public class QueryResult implements Page<FieldValueList>, Serializable {
2929

30-
private static final long serialVersionUID = -4831062717210349818L;
30+
private static final long serialVersionUID = -4831062717210349819L;
3131

3232
private final Schema schema;
3333
private final long totalRows;
3434
private final Page<FieldValueList> pageNoSchema;
35-
private final Function<FieldValueList, FieldValueList> addSchemaFunc;
35+
private final transient Function<FieldValueList, FieldValueList> addSchemaFunc;
3636

3737
QueryResult(final Schema schema, long totalRows, Page<FieldValueList> pageNoSchema) {
3838
// TODO(pongad): read totalRows directly from listTableData.

google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1132,14 +1132,6 @@ public void testQueryRequestCompleted() throws InterruptedException {
11321132
QueryResultsOption pageSizeOption = QueryResultsOption.pageSize(42L);
11331133
optionMap.put(pageSizeOption.getRpcOption(), pageSizeOption.getValue());
11341134

1135-
EasyMock.expect(
1136-
bigqueryRpcMock.getQueryResults(
1137-
PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
1138-
.andReturn(responsePb);
1139-
EasyMock.expect(
1140-
bigqueryRpcMock.getJob(
1141-
PROJECT, JOB, Collections.<BigQueryRpc.Option, Object>emptyMap()))
1142-
.andReturn(jobResponsePb);
11431135
EasyMock.expect(
11441136
bigqueryRpcMock.getQueryResults(
11451137
PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
@@ -1151,10 +1143,8 @@ public void testQueryRequestCompleted() throws InterruptedException {
11511143

11521144
EasyMock.replay(bigqueryRpcMock);
11531145
bigquery = options.getService();
1154-
QueryResult result =
1155-
bigquery
1156-
.query(QUERY_JOB_CONFIGURATION_FOR_QUERY, queryJob)
1157-
.getQueryResults(QueryResultsOption.pageSize(42L));
1146+
// TODO(pongad): pagesize = 42
1147+
QueryResult result = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY, queryJob);
11581148
assertThat(result.getSchema()).isEqualTo(TABLE_SCHEMA);
11591149
assertThat(result.getTotalRows()).isEqualTo(1);
11601150
for (FieldValueList row : result.getValues()) {
@@ -1211,25 +1201,15 @@ public void testQueryRequestCompletedOnSecondAttempt() throws InterruptedExcepti
12111201
bigqueryRpcMock.getQueryResults(
12121202
PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
12131203
.andReturn(responsePb2);
1214-
EasyMock.expect(
1215-
bigqueryRpcMock.getJob(
1216-
PROJECT, JOB, Collections.<BigQueryRpc.Option, Object>emptyMap()))
1217-
.andReturn(jobResponsePb2);
1218-
EasyMock.expect(
1219-
bigqueryRpcMock.getQueryResults(
1220-
PROJECT, JOB, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
1221-
.andReturn(responsePb2);
12221204
EasyMock.expect(
12231205
bigqueryRpcMock.listTableData(
12241206
PROJECT, DATASET, TABLE, Collections.<BigQueryRpc.Option, Object>emptyMap()))
12251207
.andReturn(Tuple.<String, Iterable<TableRow>>of("", ImmutableList.of(TABLE_ROW)));
12261208

12271209
EasyMock.replay(bigqueryRpcMock);
12281210
bigquery = options.getService();
1229-
QueryResult result =
1230-
bigquery
1231-
.query(QUERY_JOB_CONFIGURATION_FOR_QUERY, queryJob)
1232-
.getQueryResults(QueryResultsOption.pageSize(42L));
1211+
// TODO(pongad): pagesize = 42
1212+
QueryResult result = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY, queryJob);
12331213
assertThat(result.getSchema()).isEqualTo(TABLE_SCHEMA);
12341214
assertThat(result.getTotalRows()).isEqualTo(1);
12351215
for (FieldValueList row : result.getValues()) {

google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.google.cloud.RetryOption;
3636
import com.google.cloud.bigquery.JobStatistics.CopyStatistics;
3737
import com.google.cloud.bigquery.JobStatistics.QueryStatistics;
38+
import com.google.common.collect.ImmutableList;
3839
import java.util.Collections;
3940
import org.junit.After;
4041
import org.junit.Rule;
@@ -248,7 +249,7 @@ public void testWaitForAndGetQueryResults() throws InterruptedException {
248249
initializeExpectedJob(2, jobInfo);
249250
JobStatus status = createStrictMock(JobStatus.class);
250251
expect(bigquery.getOptions()).andReturn(mockOptions);
251-
expect(mockOptions.getClock()).andReturn(CurrentMillisClock.getDefaultClock());
252+
expect(mockOptions.getClock()).andReturn(CurrentMillisClock.getDefaultClock()).times(2);
252253
Job completedJob = expectedJob.toBuilder().setStatus(status).build();
253254
// TODO(pongad): remove when https://github.com/googleapis/gax-java/pull/431/ lands.
254255
Page<FieldValueList> emptyPage =
@@ -284,6 +285,7 @@ public Iterable<FieldValueList> getValues() {
284285
.setCompleted(true)
285286
.setTotalRows(0)
286287
.setSchema(Schema.of())
288+
.setErrors(ImmutableList.<BigQueryError>of())
287289
.build();
288290

289291
expect(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)).andReturn(completedQuery);

0 commit comments

Comments
 (0)