1717package com .google .cloud .dataflow .sdk .util ;
1818
1919import static com .google .common .base .Preconditions .checkNotNull ;
20+ import static com .google .common .base .Preconditions .checkState ;
2021
2122import com .google .api .client .googleapis .services .AbstractGoogleClientRequest ;
2223import com .google .api .client .util .BackOff ;
3233import com .google .api .services .bigquery .model .JobConfigurationQuery ;
3334import com .google .api .services .bigquery .model .JobReference ;
3435import com .google .api .services .bigquery .model .Table ;
36+ import com .google .api .services .bigquery .model .TableCell ;
3537import com .google .api .services .bigquery .model .TableDataList ;
3638import com .google .api .services .bigquery .model .TableFieldSchema ;
3739import com .google .api .services .bigquery .model .TableReference ;
3840import com .google .api .services .bigquery .model .TableRow ;
3941import com .google .api .services .bigquery .model .TableSchema ;
40- import com .google .common .base . Preconditions ;
42+ import com .google .common .collect . ImmutableList ;
4143
4244import org .joda .time .Duration ;
4345import org .joda .time .format .DateTimeFormat ;
4749
4850import java .io .Closeable ;
4951import java .io .IOException ;
50- import java .util .ArrayList ;
5152import java .util .Collections ;
5253import java .util .Iterator ;
5354import java .util .List ;
5657import java .util .Objects ;
5758import java .util .Random ;
5859
60+ import javax .annotation .Nullable ;
61+
5962/**
6063 * Iterates over all rows in a table.
6164 */
6265public class BigQueryTableRowIterator implements Iterator <TableRow >, Closeable {
6366 private static final Logger LOG = LoggerFactory .getLogger (BigQueryTableRowIterator .class );
6467
68+ @ Nullable private TableReference ref ;
69+ @ Nullable private final String projectId ;
70+ @ Nullable private TableSchema schema ;
6571 private final Bigquery client ;
66- private TableReference ref ;
67- private final String projectId ;
68- private TableSchema schema ;
6972 private String pageToken ;
7073 private Iterator <TableRow > rowIterator ;
7174 // Set true when the final page is seen from the service.
@@ -87,28 +90,33 @@ public class BigQueryTableRowIterator implements Iterator<TableRow>, Closeable {
8790 private String temporaryTableId = null ;
8891
8992 private BigQueryTableRowIterator (
90- Bigquery client , TableReference ref , String query , String projectId ) {
91- this . client = checkNotNull ( client , " client" );
93+ @ Nullable TableReference ref , @ Nullable String query , @ Nullable String projectId ,
94+ Bigquery client ) {
9295 this .ref = ref ;
9396 this .query = query ;
9497 this .projectId = projectId ;
98+ this .client = checkNotNull (client , "client" );
9599 }
96100
97101 /**
98- * Constructs a {@code BigQueryTableRowIterator} that uses the specified client to read from
99- * the specified table.
102+ * Constructs a {@code BigQueryTableRowIterator} that reads from the specified table.
100103 */
101- public static BigQueryTableRowIterator of ( Bigquery client , TableReference ref ) {
104+ public static BigQueryTableRowIterator fromTable ( TableReference ref , Bigquery client ) {
102105 checkNotNull (ref , "ref" );
103- return new BigQueryTableRowIterator (client , ref , null , ref .getProjectId ());
106+ checkNotNull (client , "client" );
107+ return new BigQueryTableRowIterator (ref , null , ref .getProjectId (), client );
104108 }
105109
106110 /**
107- * Constructs a {@code BigQueryTableRowIterator} that uses the specified client to read from
108- * the results of executing the specified client in the specified project.
111+ * Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the
112+ * specified query in the specified project.
109113 */
110- public static BigQueryTableRowIterator of (Bigquery client , String query , String projectId ) {
111- return new BigQueryTableRowIterator (client , null , query , projectId );
114+ public static BigQueryTableRowIterator fromQuery (
115+ String query , String projectId , Bigquery client ) {
116+ checkNotNull (query , "query" );
117+ checkNotNull (projectId , "projectId" );
118+ checkNotNull (client , "client" );
119+ return new BigQueryTableRowIterator (null , query , projectId , client );
112120 }
113121
114122 @ Override
@@ -125,20 +133,19 @@ public boolean hasNext() {
125133 }
126134
127135 /**
128- * Adjusts a field returned from the API to
129- * match the type that will be seen when run on the
130- * backend service. The end result is:
136+ * Adjusts a field returned from the BigQuery API to match the type that will be seen when
137+ * run on the backend service. The end result is:
131138 *
132- * <p>< ul>
133- * <li> Nulls are {@code null}.
134- * <li> Repeated fields are lists .
135- * <li> Record columns are {@link TableRow}s .
136- * <li> {@code BOOLEAN} columns are JSON booleans, hence Java {@link Boolean}s .
137- * <li> {@code FLOAT} columns are JSON floats, hence Java {@link Double}s .
138- * <li> {@code TIMESTAMP} columns are {@link String}s that are of the format
139- * { yyyy-MM-dd HH:mm:ss.SSS UTC}.
140- * <li> Every other atomic type is a {@link String}.
141- * </ul></p>
139+ * <ul>
140+ * <li>Nulls are {@code null}.
141+ * <li>Repeated fields are {@code List} of objects .
142+ * <li>Record columns are {@link TableRow} objects .
143+ * <li>{@code BOOLEAN} columns are JSON booleans, hence Java {@code Boolean} objects .
144+ * <li>{@code FLOAT} columns are JSON floats, hence Java {@code Double} objects .
145+ * <li>{@code TIMESTAMP} columns are {@code String} objects that are of the format
146+ * {@code yyyy-MM-dd HH:mm:ss.SSS UTC}.
147+ * <li>Every other atomic type is a {@code String}.
148+ * </ul>
142149 *
143150 * <p>Note that currently integers are encoded as strings to match
144151 * the behavior of the backend service.
@@ -155,12 +162,12 @@ private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
155162 if (Objects .equals (fieldSchema .getMode (), "REPEATED" )) {
156163 TableFieldSchema elementSchema = fieldSchema .clone ().setMode ("REQUIRED" );
157164 @ SuppressWarnings ("unchecked" )
158- List <Map <String , Object >> rawValues = (List <Map <String , Object >>) v ;
159- List <Object > values = new ArrayList < Object >( rawValues . size () );
160- for (Map <String , Object > element : rawValues ) {
165+ List <Map <String , Object >> rawCells = (List <Map <String , Object >>) v ;
166+ ImmutableList . Builder <Object > values = ImmutableList . builder ( );
167+ for (Map <String , Object > element : rawCells ) {
161168 values .add (getTypedCellValue (elementSchema , element .get ("v" )));
162169 }
163- return values ;
170+ return values . build () ;
164171 }
165172
166173 if (fieldSchema .getType ().equals ("RECORD" )) {
@@ -188,19 +195,74 @@ private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
188195 return v ;
189196 }
190197
198+ /**
199+ * Converts a row returned from the BigQuery JSON API as a {@code Map<String, Object>} into a
200+ * Java {@link TableRow} with nested {@link TableCell TableCells}. The {@code Object} values in
201+ * the cells are converted to Java types according to the provided field schemas.
202+ *
203+ * <p>See {@link #getTypedCellValue(TableFieldSchema, Object)} for details on how BigQuery
204+ * types are mapped to Java types.
205+ */
191206 private TableRow getTypedTableRow (List <TableFieldSchema > fields , Map <String , Object > rawRow ) {
192- @ SuppressWarnings ("unchecked" )
193- List <Map <String , Object >> cells = (List <Map <String , Object >>) rawRow .get ("f" );
194- Preconditions .checkState (cells .size () == fields .size ());
207+ // If rawRow is a TableRow, use it. If not, create a new one.
208+ TableRow row ;
209+ if (rawRow instanceof TableRow ) {
210+ // Since rawRow is a TableRow, we also know that rawRow.getF() returns a List<TableCell>.
211+ // We do not need to do any type conversion.
212+ row = (TableRow ) rawRow ;
213+ } else {
214+ row = new TableRow ();
215+
216+ // Since rawRow is a Map<String, Object> we use Map.get("f") instead of TableRow.getF() to
217+ // get its cells. Similarly, when rawCell is a Map<String, Object> instead of a TableCell,
218+ // we will use Map.get("v") instead of TableCell.getV() get its value.
219+ @ SuppressWarnings ("unchecked" )
220+ List <Map <String , Object >> rawCells = (List <Map <String , Object >>) rawRow .get ("f" );
221+
222+ ImmutableList .Builder <TableCell > builder = ImmutableList .builder ();
223+ for (Map <String , Object > rawCell : rawCells ) {
224+ // If rawCell is a TableCell, use it. If not, create a new one.
225+ if (rawCell instanceof TableCell ) {
226+ builder .add ((TableCell ) rawCell );
227+ } else {
228+ builder .add (new TableCell ().setV (rawCell .get ("v" )));
229+ }
230+ }
231+ row .setF (builder .build ());
232+ }
195233
196- Iterator <Map <String , Object >> cellIt = cells .iterator ();
197- Iterator <TableFieldSchema > fieldIt = fields .iterator ();
234+ // From here, everything is a TableRow/TableCell, no need to interpret as Map<String,Object>.
235+ List <TableCell > cells = row .getF ();
236+ checkState (cells .size () == fields .size (),
237+ "Expected that the row has the same number of cells %s as fields in the schema %s" ,
238+ cells .size (), fields .size ());
198239
199- TableRow row = new TableRow ();
240+ // Loop through all the fields in the row, normalizing their types with the TableFieldSchema
241+ // and also storing the normalized values by field name in the Map<String, Object> that
242+ // underlies the TableRow.
243+ Iterator <TableCell > cellIt = cells .iterator ();
244+ Iterator <TableFieldSchema > fieldIt = fields .iterator ();
200245 while (cellIt .hasNext ()) {
201- Map < String , Object > cell = cellIt .next ();
246+ TableCell cell = cellIt .next ();
202247 TableFieldSchema fieldSchema = fieldIt .next ();
203- row .set (fieldSchema .getName (), getTypedCellValue (fieldSchema , cell .get ("v" )));
248+
249+ // Convert the object in this cell to the Java type corresponding to its type in the schema.
250+ Object convertedValue = getTypedCellValue (fieldSchema , cell .getV ());
251+ cell .setV (convertedValue );
252+
253+ String fieldName = fieldSchema .getName ();
254+ if (fieldName .equals ("f" )) {
255+ // This is a workaround for a crash when the schema has a field named "f". Specifically,
256+ // tableRow.set("f", value) is equivalent to tableRow.setF(value), and value must be a
257+ // List<TableCell> or a ClassCastException will be thrown. To avoid the crash, we simply
258+ // do not set the Map property named "f".
259+ //
260+ // The value for a field named "f" can instead be retrieved by calling tableRow.getF() and
261+ // to get the list of cells, and accessing the positional entry that corresponds to the
262+ // position of the "f" field in the TableSchema.
263+ continue ;
264+ }
265+ row .set (fieldName , convertedValue );
204266 }
205267 return row ;
206268 }
0 commit comments