Skip to content

Commit f5e3b8e

Browse files
dhalperidavorbonaci
authored andcommitted
BigQueryTableRowIterator: make getF/getV work correctly
BigQuery's recommended API is that users should use the getF and getV commands to read data from TableRows, but we have been breaking that API and supporting a Map<String, Object> interface instead. Switch to make the code support both interfaces, with the eventual goal of deprecating the Map<String, Object> version. Includes a little bit of cleanup to internal function names and arguments. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=105022905
1 parent d6d067e commit f5e3b8e

4 files changed

Lines changed: 259 additions & 63 deletions

File tree

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/BigQueryReader.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -102,24 +102,25 @@ public String getQuery() {
102102
@Override
103103
public ReaderIterator<WindowedValue<TableRow>> iterator() throws IOException {
104104
if (tableRef != null) {
105-
return new BigQueryReaderIterator(bigQueryClient, tableRef);
105+
return new BigQueryReaderIterator(tableRef, bigQueryClient);
106106
} else {
107-
return new BigQueryReaderIterator(bigQueryClient, query, projectId);
107+
return new BigQueryReaderIterator(query, projectId, bigQueryClient);
108108
}
109109
}
110110

111111
/**
112112
* A ReaderIterator that yields TableRow objects for each row of a BigQuery table.
113113
*/
114-
class BigQueryReaderIterator extends AbstractBoundedReaderIterator<WindowedValue<TableRow>> {
114+
private static class BigQueryReaderIterator
115+
extends AbstractBoundedReaderIterator<WindowedValue<TableRow>> {
115116
private BigQueryTableRowIterator rowIterator;
116117

117-
public BigQueryReaderIterator(Bigquery bigQueryClient, TableReference tableRef) {
118-
rowIterator = BigQueryTableRowIterator.of(bigQueryClient, tableRef);
118+
public BigQueryReaderIterator(TableReference tableRef, Bigquery bigQueryClient) {
119+
rowIterator = BigQueryTableRowIterator.fromTable(tableRef, bigQueryClient);
119120
}
120121

121-
public BigQueryReaderIterator(Bigquery bigQueryClient, String query, String projectId) {
122-
rowIterator = BigQueryTableRowIterator.of(bigQueryClient, query, projectId);
122+
public BigQueryReaderIterator(String query, String projectId, Bigquery bigQueryClient) {
123+
rowIterator = BigQueryTableRowIterator.fromQuery(query, projectId, bigQueryClient);
123124
}
124125

125126
@Override

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java

Lines changed: 102 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.google.cloud.dataflow.sdk.util;
1818

1919
import static com.google.common.base.Preconditions.checkNotNull;
20+
import static com.google.common.base.Preconditions.checkState;
2021

2122
import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
2223
import com.google.api.client.util.BackOff;
@@ -32,12 +33,13 @@
3233
import com.google.api.services.bigquery.model.JobConfigurationQuery;
3334
import com.google.api.services.bigquery.model.JobReference;
3435
import com.google.api.services.bigquery.model.Table;
36+
import com.google.api.services.bigquery.model.TableCell;
3537
import com.google.api.services.bigquery.model.TableDataList;
3638
import com.google.api.services.bigquery.model.TableFieldSchema;
3739
import com.google.api.services.bigquery.model.TableReference;
3840
import com.google.api.services.bigquery.model.TableRow;
3941
import com.google.api.services.bigquery.model.TableSchema;
40-
import com.google.common.base.Preconditions;
42+
import com.google.common.collect.ImmutableList;
4143

4244
import org.joda.time.Duration;
4345
import org.joda.time.format.DateTimeFormat;
@@ -47,7 +49,6 @@
4749

4850
import java.io.Closeable;
4951
import java.io.IOException;
50-
import java.util.ArrayList;
5152
import java.util.Collections;
5253
import java.util.Iterator;
5354
import java.util.List;
@@ -56,16 +57,18 @@
5657
import java.util.Objects;
5758
import java.util.Random;
5859

60+
import javax.annotation.Nullable;
61+
5962
/**
6063
* Iterates over all rows in a table.
6164
*/
6265
public class BigQueryTableRowIterator implements Iterator<TableRow>, Closeable {
6366
private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableRowIterator.class);
6467

68+
@Nullable private TableReference ref;
69+
@Nullable private final String projectId;
70+
@Nullable private TableSchema schema;
6571
private final Bigquery client;
66-
private TableReference ref;
67-
private final String projectId;
68-
private TableSchema schema;
6972
private String pageToken;
7073
private Iterator<TableRow> rowIterator;
7174
// Set true when the final page is seen from the service.
@@ -87,28 +90,33 @@ public class BigQueryTableRowIterator implements Iterator<TableRow>, Closeable {
8790
private String temporaryTableId = null;
8891

8992
private BigQueryTableRowIterator(
90-
Bigquery client, TableReference ref, String query, String projectId) {
91-
this.client = checkNotNull(client, "client");
93+
@Nullable TableReference ref, @Nullable String query, @Nullable String projectId,
94+
Bigquery client) {
9295
this.ref = ref;
9396
this.query = query;
9497
this.projectId = projectId;
98+
this.client = checkNotNull(client, "client");
9599
}
96100

97101
/**
98-
* Constructs a {@code BigQueryTableRowIterator} that uses the specified client to read from
99-
* the specified table.
102+
* Constructs a {@code BigQueryTableRowIterator} that reads from the specified table.
100103
*/
101-
public static BigQueryTableRowIterator of(Bigquery client, TableReference ref) {
104+
public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) {
102105
checkNotNull(ref, "ref");
103-
return new BigQueryTableRowIterator(client, ref, null, ref.getProjectId());
106+
checkNotNull(client, "client");
107+
return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client);
104108
}
105109

106110
/**
107-
* Constructs a {@code BigQueryTableRowIterator} that uses the specified client to read from
108-
* the results of executing the specified client in the specified project.
111+
* Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the
112+
* specified query in the specified project.
109113
*/
110-
public static BigQueryTableRowIterator of(Bigquery client, String query, String projectId) {
111-
return new BigQueryTableRowIterator(client, null, query, projectId);
114+
public static BigQueryTableRowIterator fromQuery(
115+
String query, String projectId, Bigquery client) {
116+
checkNotNull(query, "query");
117+
checkNotNull(projectId, "projectId");
118+
checkNotNull(client, "client");
119+
return new BigQueryTableRowIterator(null, query, projectId, client);
112120
}
113121

114122
@Override
@@ -125,20 +133,19 @@ public boolean hasNext() {
125133
}
126134

127135
/**
128-
* Adjusts a field returned from the API to
129-
* match the type that will be seen when run on the
130-
* backend service. The end result is:
136+
* Adjusts a field returned from the BigQuery API to match the type that will be seen when
137+
* run on the backend service. The end result is:
131138
*
132-
* <p><ul>
133-
* <li> Nulls are {@code null}.
134-
* <li> Repeated fields are lists.
135-
* <li> Record columns are {@link TableRow}s.
136-
* <li> {@code BOOLEAN} columns are JSON booleans, hence Java {@link Boolean}s.
137-
* <li> {@code FLOAT} columns are JSON floats, hence Java {@link Double}s.
138-
* <li> {@code TIMESTAMP} columns are {@link String}s that are of the format
139-
* {yyyy-MM-dd HH:mm:ss.SSS UTC}.
140-
* <li> Every other atomic type is a {@link String}.
141-
* </ul></p>
139+
* <ul>
140+
* <li>Nulls are {@code null}.
141+
* <li>Repeated fields are {@code List} of objects.
142+
* <li>Record columns are {@link TableRow} objects.
143+
* <li>{@code BOOLEAN} columns are JSON booleans, hence Java {@code Boolean} objects.
144+
* <li>{@code FLOAT} columns are JSON floats, hence Java {@code Double} objects.
145+
* <li>{@code TIMESTAMP} columns are {@code String} objects that are of the format
146+
* {@code yyyy-MM-dd HH:mm:ss.SSS UTC}.
147+
* <li>Every other atomic type is a {@code String}.
148+
* </ul>
142149
*
143150
* <p>Note that currently integers are encoded as strings to match
144151
* the behavior of the backend service.
@@ -155,12 +162,12 @@ private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
155162
if (Objects.equals(fieldSchema.getMode(), "REPEATED")) {
156163
TableFieldSchema elementSchema = fieldSchema.clone().setMode("REQUIRED");
157164
@SuppressWarnings("unchecked")
158-
List<Map<String, Object>> rawValues = (List<Map<String, Object>>) v;
159-
List<Object> values = new ArrayList<Object>(rawValues.size());
160-
for (Map<String, Object> element : rawValues) {
165+
List<Map<String, Object>> rawCells = (List<Map<String, Object>>) v;
166+
ImmutableList.Builder<Object> values = ImmutableList.builder();
167+
for (Map<String, Object> element : rawCells) {
161168
values.add(getTypedCellValue(elementSchema, element.get("v")));
162169
}
163-
return values;
170+
return values.build();
164171
}
165172

166173
if (fieldSchema.getType().equals("RECORD")) {
@@ -188,19 +195,74 @@ private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
188195
return v;
189196
}
190197

198+
/**
199+
* Converts a row returned from the BigQuery JSON API as a {@code Map<String, Object>} into a
200+
* Java {@link TableRow} with nested {@link TableCell TableCells}. The {@code Object} values in
201+
* the cells are converted to Java types according to the provided field schemas.
202+
*
203+
* <p>See {@link #getTypedCellValue(TableFieldSchema, Object)} for details on how BigQuery
204+
* types are mapped to Java types.
205+
*/
191206
private TableRow getTypedTableRow(List<TableFieldSchema> fields, Map<String, Object> rawRow) {
192-
@SuppressWarnings("unchecked")
193-
List<Map<String, Object>> cells = (List<Map<String, Object>>) rawRow.get("f");
194-
Preconditions.checkState(cells.size() == fields.size());
207+
// If rawRow is a TableRow, use it. If not, create a new one.
208+
TableRow row;
209+
if (rawRow instanceof TableRow) {
210+
// Since rawRow is a TableRow, we also know that rawRow.getF() returns a List<TableCell>.
211+
// We do not need to do any type conversion.
212+
row = (TableRow) rawRow;
213+
} else {
214+
row = new TableRow();
215+
216+
// Since rawRow is a Map<String, Object> we use Map.get("f") instead of TableRow.getF() to
217+
// get its cells. Similarly, when rawCell is a Map<String, Object> instead of a TableCell,
218+
// we will use Map.get("v") instead of TableCell.getV() get its value.
219+
@SuppressWarnings("unchecked")
220+
List<Map<String, Object>> rawCells = (List<Map<String, Object>>) rawRow.get("f");
221+
222+
ImmutableList.Builder<TableCell> builder = ImmutableList.builder();
223+
for (Map<String, Object> rawCell : rawCells) {
224+
// If rawCell is a TableCell, use it. If not, create a new one.
225+
if (rawCell instanceof TableCell) {
226+
builder.add((TableCell) rawCell);
227+
} else {
228+
builder.add(new TableCell().setV(rawCell.get("v")));
229+
}
230+
}
231+
row.setF(builder.build());
232+
}
195233

196-
Iterator<Map<String, Object>> cellIt = cells.iterator();
197-
Iterator<TableFieldSchema> fieldIt = fields.iterator();
234+
// From here, everything is a TableRow/TableCell, no need to interpret as Map<String,Object>.
235+
List<TableCell> cells = row.getF();
236+
checkState(cells.size() == fields.size(),
237+
"Expected that the row has the same number of cells %s as fields in the schema %s",
238+
cells.size(), fields.size());
198239

199-
TableRow row = new TableRow();
240+
// Loop through all the fields in the row, normalizing their types with the TableFieldSchema
241+
// and also storing the normalized values by field name in the Map<String, Object> that
242+
// underlies the TableRow.
243+
Iterator<TableCell> cellIt = cells.iterator();
244+
Iterator<TableFieldSchema> fieldIt = fields.iterator();
200245
while (cellIt.hasNext()) {
201-
Map<String, Object> cell = cellIt.next();
246+
TableCell cell = cellIt.next();
202247
TableFieldSchema fieldSchema = fieldIt.next();
203-
row.set(fieldSchema.getName(), getTypedCellValue(fieldSchema, cell.get("v")));
248+
249+
// Convert the object in this cell to the Java type corresponding to its type in the schema.
250+
Object convertedValue = getTypedCellValue(fieldSchema, cell.getV());
251+
cell.setV(convertedValue);
252+
253+
String fieldName = fieldSchema.getName();
254+
if (fieldName.equals("f")) {
255+
// This is a workaround for a crash when the schema has a field named "f". Specifically,
256+
// tableRow.set("f", value) is equivalent to tableRow.setF(value), and value must be a
257+
// List<TableCell> or a ClassCastException will be thrown. To avoid the crash, we simply
258+
// do not set the Map property named "f".
259+
//
260+
// The value for a field named "f" can instead be retrieved by calling tableRow.getF() and
261+
// to get the list of cells, and accessing the positional entry that corresponds to the
262+
// position of the "f" field in the TableSchema.
263+
continue;
264+
}
265+
row.set(fieldName, convertedValue);
204266
}
205267
return row;
206268
}

0 commit comments

Comments
 (0)