Skip to content

Commit 6faec8f

Browse files
authored
bigquery: simplify query types (#2653)
Updates #2591. Previous PR, #2642, broke Job.getQueryResults in two ways: 1. It wasn't possible to get total number of rows. 2. The returned iterator doesn't record schema, so it wasn't possible to call `row.get(String)`. This PR prepares the fix. Actually landing the fix will change tests, so it is punted to another PR to keep changes small. From previous meeting, it was our decision to remove - BigQuery.getQueryResults method - QueryResponse class However, the method is being used internally by the client, and interfaces only have public members. Removing them altogether entails a big surgery, so this PR instead mark them @internalapi.
1 parent 5fa8fa6 commit 6faec8f

12 files changed

Lines changed: 149 additions & 571 deletions

File tree

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static com.google.common.base.Preconditions.checkArgument;
2020

21+
import com.google.api.core.InternalApi;
2122
import com.google.api.gax.paging.Page;
2223
import com.google.cloud.FieldSelector;
2324
import com.google.cloud.FieldSelector.Helper;
@@ -433,8 +434,7 @@ public static QueryResultsOption startIndex(long startIndex) {
433434

434435
/**
435436
* Returns an option that sets how long to wait for the query to complete, in milliseconds,
436-
* before returning. Default is 10 seconds. If the timeout passes before the job completes,
437-
* {@link QueryResponse#jobCompleted()} will be {@code false}.
437+
* before returning. Default is 10 seconds.
438438
*/
439439
public static QueryResultsOption maxWaitTime(long maxWaitTime) {
440440
checkArgument(maxWaitTime >= 0);
@@ -1063,28 +1063,9 @@ Job query(QueryJobConfiguration configuration, JobId jobId, JobOption... options
10631063
/**
10641064
* Returns results of the query associated with the provided job.
10651065
*
1066-
* <p>Example of getting the results of query.
1067-
* <pre> {@code
1068-
* String query = "SELECT distinct(corpus) FROM `bigquery-public-data.samples.shakespeare`";
1069-
* QueryJobConfiguration queryConfig = QueryJobConfiguration.of(query);
1070-
* QueryResponse response = bigquery.query(queryConfig);
1071-
* // Wait for things to finish
1072-
* while (!response.jobCompleted()) {
1073-
* Thread.sleep(1000);
1074-
* response = bigquery.getQueryResults(response.getJobId());
1075-
* }
1076-
* if (response.hasErrors()) {
1077-
* // handle errors
1078-
* }
1079-
* QueryResult result = response.getResult();
1080-
* Iterator<FieldValueList> rowIterator = result.iterateAll();
1081-
* for (FieldValueList row : result.iterateAll()) {
1082-
* // do something with the data
1083-
* }
1084-
* }</pre>
1085-
*
1086-
* @throws BigQueryException upon failure
1066+
* <p>Users are encouraged to use {@link Job#getQueryResults(QueryResultsOption...)} instead.
10871067
*/
1068+
@InternalApi
10881069
QueryResponse getQueryResults(JobId jobId, QueryResultsOption... options);
10891070

10901071
/**

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryError.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,15 @@
2323
import java.io.Serializable;
2424
import java.util.Objects;
2525

26+
// TODO(pongad): Clean up this doc, depending on what we decide to do with query().
27+
2628
/**
2729
* Google Cloud BigQuery Error. Objects of this class represent errors encountered by the BigQuery
2830
* service while executing a request. A BigQuery Job that terminated with an error has a non-null
2931
* {@link JobStatus#getError()}. A job can also encounter errors during its execution that do not
3032
* cause the whole job to fail (see {@link JobStatus#getExecutionErrors()}). Similarly, queries and
3133
* insert all requests can cause BigQuery errors that do not mean the whole operation failed (see
32-
* {@link QueryResponse#getExecutionErrors()} and {@link InsertAllResponse#getInsertErrors()}).
34+
* {@link JobStatus#getExecutionErrors()} and {@link InsertAllResponse#getInsertErrors()}).
3335
* When a {@link BigQueryException} is thrown the BigQuery Error that caused it, if any, can be
3436
* accessed with {@link BigQueryException#getError()}.
3537
*/

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java

Lines changed: 6 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import com.google.common.annotations.VisibleForTesting;
3838
import com.google.common.base.Function;
3939
import com.google.common.collect.ImmutableList;
40-
import com.google.common.collect.ImmutableMap;
4140
import com.google.common.collect.Iterables;
4241
import com.google.common.collect.Lists;
4342
import com.google.common.collect.Maps;
@@ -129,28 +128,6 @@ public Page<FieldValueList> getNextPage() {
129128
}
130129
}
131130

132-
private static class QueryResultsPageFetcherImpl
133-
implements NextPageFetcher<FieldValueList>, QueryResult.QueryResultsPageFetcher {
134-
135-
private static final long serialVersionUID = -9198905840550459803L;
136-
private final Map<BigQueryRpc.Option, ?> requestOptions;
137-
private final BigQueryOptions serviceOptions;
138-
private final JobId job;
139-
140-
QueryResultsPageFetcherImpl(JobId job, BigQueryOptions serviceOptions, String cursor,
141-
Map<BigQueryRpc.Option, ?> optionMap) {
142-
this.requestOptions =
143-
PageImpl.nextRequestOptions(BigQueryRpc.Option.PAGE_TOKEN, cursor, optionMap);
144-
this.serviceOptions = serviceOptions;
145-
this.job = job;
146-
}
147-
148-
@Override
149-
public QueryResult getNextPage() {
150-
return getQueryResults(job, serviceOptions, requestOptions).getResult();
151-
}
152-
}
153-
154131
private final BigQueryRpc bigQueryRpc;
155132

156133
BigQueryImpl(BigQueryOptions options) {
@@ -612,49 +589,17 @@ public GetQueryResultsResponse call() {
612589
completeJobId.getProject(), completeJobId.getJob(), optionsMap);
613590
}
614591
}, serviceOptions.getRetrySettings(), EXCEPTION_HANDLER, serviceOptions.getClock());
615-
QueryResponse.Builder builder = QueryResponse.newBuilder();
616-
builder.setJobId(JobId.fromPb(results.getJobReference()));
617-
builder.setNumDmlAffectedRows(results.getNumDmlAffectedRows());
618-
builder.setEtag(results.getEtag());
619-
builder.setJobCompleted(results.getJobComplete());
620-
List<TableRow> rowsPb = results.getRows();
621-
if (results.getJobComplete()) {
622-
QueryResult.Builder resultBuilder =
623-
transformQueryResults(
624-
completeJobId,
625-
rowsPb,
626-
results.getSchema(),
627-
results.getPageToken(),
628-
serviceOptions,
629-
ImmutableMap.<BigQueryRpc.Option, Object>of());
630-
if (results.getTotalRows() != null) {
631-
resultBuilder.setTotalRows(results.getTotalRows().longValue());
632-
}
633-
builder.setResult(resultBuilder.build());
634-
}
635-
if (results.getErrors() != null) {
636-
builder.setExecutionErrors(
637-
Lists.transform(results.getErrors(), BigQueryError.FROM_PB_FUNCTION));
638-
}
639-
return builder.build();
592+
TableSchema schemaPb = results.getSchema();
593+
return QueryResponse.newBuilder()
594+
.setCompleted(results.getJobComplete())
595+
.setSchema(schemaPb == null ? null : Schema.fromPb(schemaPb))
596+
.setTotalRows(results.getTotalRows() == null ? 0 : results.getTotalRows().longValue())
597+
.build();
640598
} catch (RetryHelper.RetryHelperException e) {
641599
throw BigQueryException.translateAndThrow(e);
642600
}
643601
}
644602

645-
private static QueryResult.Builder transformQueryResults(JobId jobId, List<TableRow> rowsPb,
646-
TableSchema schemaPb, String cursor, BigQueryOptions serviceOptions,
647-
Map<BigQueryRpc.Option, ?> optionsMap) {
648-
QueryResultsPageFetcherImpl nextPageFetcher =
649-
new QueryResultsPageFetcherImpl(jobId, serviceOptions, cursor, optionsMap);
650-
Schema schema = schemaPb != null ? Schema.fromPb(schemaPb) : null;
651-
return QueryResult.newBuilder()
652-
.setPageFetcher(nextPageFetcher)
653-
.setCursor(cursor)
654-
.setSchema(schema)
655-
.setResults(transformTableData(rowsPb, schema));
656-
}
657-
658603
@Override
659604
public TableDataWriteChannel writer(WriteChannelConfiguration writeChannelConfiguration) {
660605
return new TableDataWriteChannel(getOptions(),

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/FieldValueList.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ static FieldValueList of(List<FieldValue> row, Field... schema) {
9292
return of(row, schema.length > 0 ? FieldList.of(schema) : null);
9393
}
9494

95+
FieldValueList withSchema(FieldList schema) {
96+
return new FieldValueList(this.row, schema);
97+
}
98+
9599
static FieldValueList fromPb(List<?> rowPb, FieldList schema) {
96100
List<FieldValue> row = new ArrayList<>(rowPb.size());
97101
if (schema != null) {

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,8 @@ public Page<FieldValueList> getQueryResults(QueryResultsOption... options) {
278278

279279
TableId table = ((QueryJobConfiguration) getConfiguration()).getDestinationTable();
280280
// TODO(pongad): return QueryResult so we can inject schema.
281+
// QueryResponse response = bigquery.getQueryResults(getJobId()); // should return immediately
282+
// return new QueryResult(response.getSchema(), response.getTotalRows(), bigquery.listTableData(table));
281283
return bigquery.listTableData(table);
282284
}
283285

@@ -301,7 +303,7 @@ public QueryResponse call() {
301303
new BasicResultRetryAlgorithm<QueryResponse>() {
302304
@Override
303305
public boolean shouldRetry(Throwable prevThrowable, QueryResponse prevResponse) {
304-
return prevResponse != null && !prevResponse.jobCompleted();
306+
return prevResponse != null && !prevResponse.getCompleted();
305307
}
306308
},
307309
options.getClock());

0 commit comments

Comments
 (0)