Skip to content

Commit 18c82ad

Browse files
chamikaramjdavorbonaci
authored andcommitted
Updated BigQueryReader so that BigQuery values of type TIMESTAMP are formatted similarly for local and service-based runs. This change updates the TIMESTAMP values produced in
local runs to be formatted similar to the values produced by serviced-based runs. This closes apache#20 ----Release Notes---- This change modifies the formatting of the values of type TIMESTAMP produced by Dataflow BigQuery source. This change may only affect Dataflow jobs that are run in direct mode and Dataflow jobs that use BigQuery data as side inputs. This change will not affect Dataflow jobs that only use BigQuery data as a main input. [] ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=93090065
1 parent 3ff3dfb commit 18c82ad

2 files changed

Lines changed: 56 additions & 0 deletions

File tree

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import com.google.api.services.bigquery.model.TableRow;
2727
import com.google.api.services.bigquery.model.TableSchema;
2828

29+
import org.joda.time.format.DateTimeFormat;
30+
import org.joda.time.format.DateTimeFormatter;
31+
2932
import java.io.Closeable;
3033
import java.io.IOException;
3134
import java.util.ArrayList;
@@ -82,6 +85,8 @@ public boolean hasNext() {
8285
* <li> Record columns are {@link TableRow}s.
8386
* <li> {@code BOOLEAN} columns are JSON booleans, hence Java {@link Boolean}s.
8487
* <li> {@code FLOAT} columns are JSON floats, hence Java {@link Double}s.
88+
* <li> {@code TIMESTAMP} columns are {@link String}s that are of the format
89+
* {yyyy-MM-dd HH:mm:ss.SSS UTC}.
8590
* <li> Every other atomic type is a {@link String}.
8691
* </ul></p>
8792
*
@@ -122,6 +127,14 @@ private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
122127
return Boolean.parseBoolean((String) v);
123128
}
124129

130+
if (fieldSchema.getType().equals("TIMESTAMP")) {
131+
// Seconds to milliseconds
132+
long milliSecs = (new Double(Double.parseDouble((String) v) * 1000)).longValue();
133+
DateTimeFormatter formatter =
134+
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS").withZoneUTC();
135+
return formatter.print(milliSecs) + " UTC";
136+
}
137+
125138
return v;
126139
}
127140

sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryUtilTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,49 @@ private Table basicTableSchema() {
166166
)));
167167
}
168168

169+
private Table basicTableSchemaWithTime() {
170+
return new Table()
171+
.setSchema(new TableSchema()
172+
.setFields(Arrays.asList(
173+
new TableFieldSchema()
174+
.setName("name")
175+
.setType("STRING"),
176+
new TableFieldSchema()
177+
.setName("time")
178+
.setType("TIMESTAMP"),
179+
new TableFieldSchema()
180+
.setName("answer")
181+
.setType("INTEGER")
182+
)));
183+
}
184+
185+
@Test
186+
public void testReadWithTime() throws IOException {
187+
onTableGet(basicTableSchemaWithTime());
188+
189+
TableDataList dataList = rawDataList(rawRow("Arthur", "1.430397296789E9", 42));
190+
onTableList(dataList);
191+
192+
BigQueryTableRowIterator iterator = new BigQueryTableRowIterator(
193+
mockClient,
194+
BigQueryIO.parseTableSpec("project:dataset.table"));
195+
196+
Assert.assertTrue(iterator.hasNext());
197+
TableRow row = iterator.next();
198+
199+
Assert.assertTrue(row.containsKey("name"));
200+
Assert.assertTrue(row.containsKey("time"));
201+
Assert.assertTrue(row.containsKey("answer"));
202+
Assert.assertEquals("Arthur", row.get("name"));
203+
Assert.assertEquals("2015-04-30 12:34:56.789 UTC", row.get("time"));
204+
Assert.assertEquals(42, row.get("answer"));
205+
206+
Assert.assertFalse(iterator.hasNext());
207+
208+
verifyTableGet();
209+
verifyTabledataList();
210+
}
211+
169212
private TableRow rawRow(Object...args) {
170213
List<TableCell> cells = new LinkedList<>();
171214
for (Object a : args) {

0 commit comments

Comments
 (0)