Skip to content

Commit 6a11a72

Browse files
dhalperi authored and lukecwik committed
Fix BigQueryIO.Read to work the same in Direct and Dataflow runners
This is a partial revert of commits f5e3b8e and 18c82ad. When running a batch Dataflow job on Cloud Dataflow service, the data are produced by running a BigQuery export job and then reading all the files in parallel. When run in the DirectPipelineRunner, BigQuery's JSON API is used directly. These data come back in different formats. To compensate, we use BigQueryTableRowIterator to normalize the behavior in DirectPipelineRunner to the behavior seen when running on the service. (We cannot change this decision without a major breaking change.) This patch fixes some discrepancies in the way that BigQueryTableRowIterator is implemented. Specifically, *) In commit 18c82ad (response to issue apache#20) we updated the format of timestamps to be printed as strings. However, we did not correctly match the behavior of BigQuery export. Here is a sample set of times from the export job vs the JSON API. 2016-01-06 06:38:00 UTC 1.45206228E9 2016-01-06 06:38:11 UTC 1.452062291E9 2016-01-06 06:38:11.1 UTC 1.4520622911E9 2016-01-06 06:38:11.12 UTC 1.45206229112E9 2016-01-06 06:38:11.123 UTC 1.452062291123E9 * 2016-01-06 06:38:11.1234 UTC 1.4520622911234E9 2016-01-06 06:38:11.12345 UTC 1.45206229112345E9 2016-01-06 06:38:11.123456 UTC 1.452062291123456E9 Before, only the * test would have passed. *) In commit f5e3b8e we updated TableRow iterator to preserve the usual TableRow field `f` corresponding to getF(), which returns a list of fields in Schema order. This was my mistaken attempt to better support users who have prior experience with BigQuery's API and expect to use getF()/getV(). However, there were two issues: 1. this change did not affect the behavior in the DataflowPipelineRunner. 2. this was actually a breaking backwards-incompatible change, because common downstream DoFns may iterate over the keys of the TableRow, and it added the field "f". 
So we should not propagate the change to DataflowPipelineRunner, but instead we should revert the change to BigQueryTableRowIterator. (Note this is also a slightly-backwards-incompatible change, but it's reverting to old behavior and users are more likely to be depending on DataflowPipelineRunner rather than DirectPipelineRunner.) Fix both these issues and add tests. This is still ugly for now. The long-term fix here is to support a parser that lets users skip TableRow altogether and goes straight to POJOs of their choosing (See apache#41). That would also eliminate our performance and typing issues using TableRow as an inner type in pipelines (See e.g. http://stackoverflow.com/questions/33622227/dataflow-mixing-integer-long-types). ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=111746236
1 parent a918a31 commit 6a11a72

3 files changed

Lines changed: 130 additions & 98 deletions

File tree

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java

Lines changed: 75 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616

1717
package com.google.cloud.dataflow.sdk.util;
1818

19+
import static com.google.common.base.Preconditions.checkArgument;
1920
import static com.google.common.base.Preconditions.checkNotNull;
2021
import static com.google.common.base.Preconditions.checkState;
2122

2223
import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
2324
import com.google.api.client.util.BackOff;
2425
import com.google.api.client.util.BackOffUtils;
26+
import com.google.api.client.util.ClassInfo;
2527
import com.google.api.client.util.Data;
2628
import com.google.api.client.util.Sleeper;
2729
import com.google.api.services.bigquery.Bigquery;
@@ -49,6 +51,7 @@
4951

5052
import java.io.Closeable;
5153
import java.io.IOException;
54+
import java.util.Collection;
5255
import java.util.Collections;
5356
import java.util.Iterator;
5457
import java.util.List;
@@ -133,8 +136,44 @@ public boolean hasNext() {
133136
}
134137

135138
/**
136-
* Adjusts a field returned from the BigQuery API to match the type that will be seen when
137-
* run on the backend service. The end result is:
139+
* Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and
140+
* immutable.
141+
*/
142+
private static final DateTimeFormatter DATE_AND_SECONDS_FORMATER =
143+
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();
144+
private static String formatTimestamp(String timestamp) {
145+
// timestamp is in "seconds since epoch" format, with scientific notation.
146+
// e.g., "1.45206229112345E9" to mean "2016-01-06 06:38:11.123456 UTC".
147+
// Separate into seconds and microseconds.
148+
double timestampDoubleMicros = Double.parseDouble(timestamp) * 1000000;
149+
long timestampMicros = (long) timestampDoubleMicros;
150+
long seconds = timestampMicros / 1000000;
151+
int micros = (int) (timestampMicros % 1000000);
152+
String dayAndTime = DATE_AND_SECONDS_FORMATER.print(seconds * 1000);
153+
154+
// No sub-second component.
155+
if (micros == 0) {
156+
return String.format("%s UTC", dayAndTime);
157+
}
158+
159+
// Sub-second component.
160+
int digits = 6;
161+
int subsecond = micros;
162+
while (subsecond % 10 == 0) {
163+
digits--;
164+
subsecond /= 10;
165+
}
166+
String formatString = String.format("%%0%dd", digits);
167+
String fractionalSeconds = String.format(formatString, subsecond);
168+
return String.format("%s.%s UTC", dayAndTime, fractionalSeconds);
169+
}
170+
171+
/**
172+
* Adjusts a field returned from the BigQuery API to match what we will receive when running
173+
* BigQuery's export-to-GCS and parallel read, which is the efficient parallel implementation
174+
* used for batch jobs executed on the Cloud Dataflow service.
175+
*
176+
* <p>The following is the relationship between BigQuery schema and Java types:
138177
*
139178
* <ul>
140179
* <li>Nulls are {@code null}.
@@ -143,18 +182,17 @@ public boolean hasNext() {
143182
* <li>{@code BOOLEAN} columns are JSON booleans, hence Java {@code Boolean} objects.
144183
* <li>{@code FLOAT} columns are JSON floats, hence Java {@code Double} objects.
145184
* <li>{@code TIMESTAMP} columns are {@code String} objects that are of the format
146-
* {@code yyyy-MM-dd HH:mm:ss.SSS UTC}.
185+
* {@code yyyy-MM-dd HH:mm:ss[.SSSSSS] UTC}, where the {@code .SSSSSS} has no trailing
186+
* zeros and can be 1 to 6 digits long.
147187
* <li>Every other atomic type is a {@code String}.
148188
* </ul>
149189
*
150-
* <p>Note that currently integers are encoded as strings to match
151-
* the behavior of the backend service.
190+
* <p>Note that integers are encoded as strings to match BigQuery's exported JSON format.
191+
*
192+
* <p>Finally, values are stored in the {@link TableRow} as {"field name": value} pairs
193+
* and are not accessible through the {@link TableRow#getF} function.
152194
*/
153-
private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
154-
// In the input from the BQ API, atomic types all come in as
155-
// strings, while on the Dataflow service they have more precise
156-
// types.
157-
195+
@Nullable private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
158196
if (Data.isNull(v)) {
159197
return null;
160198
}
@@ -185,16 +223,22 @@ private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
185223
}
186224

187225
if (fieldSchema.getType().equals("TIMESTAMP")) {
188-
// Seconds to milliseconds
189-
long milliSecs = (new Double(Double.parseDouble((String) v) * 1000)).longValue();
190-
DateTimeFormatter formatter =
191-
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS").withZoneUTC();
192-
return formatter.print(milliSecs) + " UTC";
226+
return formatTimestamp((String) v);
193227
}
194228

195229
return v;
196230
}
197231

232+
/**
233+
* A list of the field names that cannot be used in BigQuery tables processed by Dataflow,
234+
* because they are reserved keywords in {@link TableRow}.
235+
*/
236+
// TODO: This limitation is unfortunate. We need to give users a way to use BigQueryIO that does
237+
// not indirect through our broken use of {@link TableRow}.
238+
// See discussion: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/41
239+
private static final Collection<String> RESERVED_FIELD_NAMES =
240+
ClassInfo.of(TableRow.class).getNames();
241+
198242
/**
199243
* Converts a row returned from the BigQuery JSON API as a {@code Map<String, Object>} into a
200244
* Java {@link TableRow} with nested {@link TableCell TableCells}. The {@code Object} values in
@@ -206,62 +250,46 @@ private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
206250
private TableRow getTypedTableRow(List<TableFieldSchema> fields, Map<String, Object> rawRow) {
207251
// If rawRow is a TableRow, use it. If not, create a new one.
208252
TableRow row;
253+
List<? extends Map<String, Object>> cells;
209254
if (rawRow instanceof TableRow) {
210-
// Since rawRow is a TableRow, we also know that rawRow.getF() returns a List<TableCell>.
211-
// We do not need to do any type conversion.
255+
// Since rawRow is a TableRow it already has TableCell objects in setF. We do not need to do
256+
// any type conversion, but extract the cells for cell-wise processing below.
212257
row = (TableRow) rawRow;
258+
cells = row.getF();
259+
// Clear the cells from the row, so that row.getF() will return null. This matches the
260+
// behavior of rows produced by the BigQuery export API used on the service.
261+
row.setF(null);
213262
} else {
214263
row = new TableRow();
215264

216265
// Since rawRow is a Map<String, Object> we use Map.get("f") instead of TableRow.getF() to
217266
// get its cells. Similarly, when rawCell is a Map<String, Object> instead of a TableCell,
218267
// we will use Map.get("v") instead of TableCell.getV() get its value.
219268
@SuppressWarnings("unchecked")
220-
List<Map<String, Object>> rawCells = (List<Map<String, Object>>) rawRow.get("f");
221-
222-
ImmutableList.Builder<TableCell> builder = ImmutableList.builder();
223-
for (Map<String, Object> rawCell : rawCells) {
224-
// If rawCell is a TableCell, use it. If not, create a new one.
225-
if (rawCell instanceof TableCell) {
226-
builder.add((TableCell) rawCell);
227-
} else {
228-
builder.add(new TableCell().setV(rawCell.get("v")));
229-
}
230-
}
231-
row.setF(builder.build());
269+
List<? extends Map<String, Object>> rawCells =
270+
(List<? extends Map<String, Object>>) rawRow.get("f");
271+
cells = rawCells;
232272
}
233273

234-
// From here, everything is a TableRow/TableCell, no need to interpret as Map<String,Object>.
235-
List<TableCell> cells = row.getF();
236274
checkState(cells.size() == fields.size(),
237275
"Expected that the row has the same number of cells %s as fields in the schema %s",
238276
cells.size(), fields.size());
239277

240278
// Loop through all the fields in the row, normalizing their types with the TableFieldSchema
241-
// and also storing the normalized values by field name in the Map<String, Object> that
279+
// and storing the normalized values by field name in the Map<String, Object> that
242280
// underlies the TableRow.
243-
Iterator<TableCell> cellIt = cells.iterator();
281+
Iterator<? extends Map<String, Object>> cellIt = cells.iterator();
244282
Iterator<TableFieldSchema> fieldIt = fields.iterator();
245283
while (cellIt.hasNext()) {
246-
TableCell cell = cellIt.next();
284+
Map<String, Object> cell = cellIt.next();
247285
TableFieldSchema fieldSchema = fieldIt.next();
248286

249287
// Convert the object in this cell to the Java type corresponding to its type in the schema.
250-
Object convertedValue = getTypedCellValue(fieldSchema, cell.getV());
251-
cell.setV(convertedValue);
288+
Object convertedValue = getTypedCellValue(fieldSchema, cell.get("v"));
252289

253290
String fieldName = fieldSchema.getName();
254-
if (fieldName.equals("f")) {
255-
// This is a workaround for a crash when the schema has a field named "f". Specifically,
256-
// tableRow.set("f", value) is equivalent to tableRow.setF(value), and value must be a
257-
// List<TableCell> or a ClassCastException will be thrown. To avoid the crash, we simply
258-
// do not set the Map property named "f".
259-
//
260-
// The value for a field named "f" can instead be retrieved by calling tableRow.getF() and
261-
// to get the list of cells, and accessing the positional entry that corresponds to the
262-
// position of the "f" field in the TableSchema.
263-
continue;
264-
}
291+
checkArgument(!RESERVED_FIELD_NAMES.contains(fieldName),
292+
"BigQueryIO does not support records with columns named %s", fieldName);
265293
row.set(fieldName, convertedValue);
266294
}
267295
return row;

sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/worker/BigQueryReaderTest.java

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
import static org.junit.Assert.assertEquals;
2020
import static org.junit.Assert.assertFalse;
21+
import static org.junit.Assert.assertNull;
2122
import static org.junit.Assert.assertTrue;
22-
2323
import static org.mockito.Matchers.contains;
2424
import static org.mockito.Matchers.endsWith;
2525
import static org.mockito.Matchers.eq;
@@ -35,16 +35,16 @@
3535
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
3636
import com.google.api.client.testing.http.MockLowLevelHttpResponse;
3737
import com.google.api.services.bigquery.Bigquery;
38-
import com.google.api.services.bigquery.model.TableCell;
3938
import com.google.api.services.bigquery.model.TableReference;
4039
import com.google.api.services.bigquery.model.TableRow;
4140
import com.google.cloud.dataflow.sdk.util.Transport;
4241
import com.google.cloud.dataflow.sdk.util.WindowedValue;
4342
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
44-
import com.google.common.collect.ImmutableList;
4543
import com.google.common.collect.Lists;
4644

45+
import org.junit.Rule;
4746
import org.junit.Test;
47+
import org.junit.rules.ExpectedException;
4848
import org.junit.runner.RunWith;
4949
import org.junit.runners.JUnit4;
5050
import org.mockito.Mock;
@@ -63,6 +63,8 @@
6363
*/
6464
@RunWith(JUnit4.class)
6565
public class BigQueryReaderTest {
66+
@Rule public final ExpectedException thrown = ExpectedException.none();
67+
6668
private static final String PROJECT_ID = "project";
6769
private static final String DATASET = "dataset";
6870
private static final String TABLE = "table";
@@ -73,14 +75,6 @@ public class BigQueryReaderTest {
7375
private static final String GET_TABLE_REQUEST_PATH =
7476
String.format("projects/%s/datasets/%s/tables/%s", PROJECT_ID, DATASET, TABLE);
7577

76-
private static final List<TableCell> makeCellList(Object... fields) {
77-
ImmutableList.Builder<TableCell> cells = ImmutableList.builder();
78-
for (Object o : fields) {
79-
cells.add(new TableCell().setV(o));
80-
}
81-
return cells.build();
82-
}
83-
8478
// This is a real response (with some unused fields removed) for the table created from this
8579
// schema:
8680
// [
@@ -578,12 +572,12 @@ public void testReadQuery() throws Exception {
578572

579573
assertEquals("Arthur", row.get("name"));
580574
assertEquals("42", row.get("integer"));
581-
assertEquals(makeCellList("Arthur", "42"), row.getF());
575+
assertNull(row.getF());
582576

583577
row = iterator.next().getValue();
584578
assertEquals("Allison", row.get("name"));
585579
assertEquals("79", row.get("integer"));
586-
assertEquals(makeCellList("Allison", "79"), row.getF());
580+
assertNull(row.getF());
587581

588582
iterator.close();
589583

@@ -784,7 +778,7 @@ public void testReadTable() throws Exception {
784778
TableRow nested = (TableRow) row.get("record");
785779
assertEquals("43", nested.get("nestedInt"));
786780
assertEquals(4.14159, nested.get("nestedFloat"));
787-
assertEquals(makeCellList("43", 4.14159), nested.getF());
781+
assertNull(nested.getF());
788782

789783
assertEquals(Lists.newArrayList("42", "43", "79"), row.get("repeatedInt"));
790784
assertTrue(((List<?>) row.get("repeatedFloat")).isEmpty());
@@ -800,7 +794,7 @@ public void testReadTable() throws Exception {
800794
nested = (TableRow) row.get("record");
801795
assertEquals("80", nested.get("nestedInt"));
802796
assertEquals(3.71828, nested.get("nestedFloat"));
803-
assertEquals(makeCellList("80", 3.71828), nested.getF());
797+
assertNull(nested.getF());
804798

805799
assertTrue(((List<?>) row.get("repeatedInt")).isEmpty());
806800
assertEquals(Lists.newArrayList(3.14159, 2.71828), row.get("repeatedFloat"));
@@ -810,10 +804,10 @@ public void testReadTable() throws Exception {
810804
assertEquals(2, nestedRecords.size());
811805
assertEquals("hello", nestedRecords.get(0).get("string"));
812806
assertEquals(true, nestedRecords.get(0).get("bool"));
813-
assertEquals(makeCellList(true, "hello"), nestedRecords.get(0).getF());
807+
assertNull(nestedRecords.get(0).getF());
814808
assertEquals("world", nestedRecords.get(1).get("string"));
815809
assertEquals(false, nestedRecords.get(1).get("bool"));
816-
assertEquals(makeCellList(false, "world"), nestedRecords.get(1).getF());
810+
assertNull(nestedRecords.get(1).getF());
817811

818812
assertFalse(iterator.hasNext());
819813

@@ -909,12 +903,9 @@ public void testReadTableWithFieldF() throws Exception {
909903
Reader.ReaderIterator<WindowedValue<TableRow>> iterator = reader.iterator();
910904
assertTrue(iterator.hasNext());
911905

912-
TableRow row = iterator.next().getValue();
913-
assertEquals(makeCellList("5", "Arthur"), row.getF());
914-
assertEquals("Arthur", row.getF().get(1).getV());
906+
thrown.expect(IllegalArgumentException.class);
907+
thrown.expectMessage("BigQueryIO does not support records with columns named f");
915908

916-
row = iterator.next().getValue();
917-
assertEquals(makeCellList("42", "Allison"), row.getF());
918-
assertEquals("Allison", row.getF().get(1).getV());
909+
iterator.next().getValue();
919910
}
920911
}

0 commit comments

Comments (0)