Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 9d272dd

Browse files
authored
fix: add string to DATETIME, TIME, NUMERIC, BIGNUMERIC support in JsonStreamWriter v1 (#1345)
* fix: update code comment to reflect max size change * fix: String to DATETIME, TIME, NUMERIC, BIGNUMERIC conversion in JsonWriter * . * . * .
1 parent 691f078 commit 9d272dd

4 files changed

Lines changed: 498 additions & 35 deletions

File tree

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ private JsonStreamWriter(Builder builder)
7575
builder.traceId);
7676
this.streamWriter = streamWriterBuilder.build();
7777
this.streamName = builder.streamName;
78+
this.tableSchema = builder.tableSchema;
7879
}
7980

8081
/**
@@ -105,7 +106,8 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
105106
// of JSON data.
106107
for (int i = 0; i < jsonArr.length(); i++) {
107108
JSONObject json = jsonArr.getJSONObject(i);
108-
Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, json);
109+
Message protoMessage =
110+
JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, this.tableSchema, json);
109111
rowsBuilder.addSerializedRows(protoMessage.toByteString());
110112
}
111113
// Need to make sure refreshAppendAndSetDescriptor finish first before this can run

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java

Lines changed: 166 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.google.cloud.bigquery.storage.v1;
1717

18+
import com.google.api.pathtemplate.ValidationException;
1819
import com.google.common.base.Preconditions;
1920
import com.google.common.collect.ImmutableMap;
2021
import com.google.protobuf.ByteString;
@@ -23,10 +24,14 @@
2324
import com.google.protobuf.DynamicMessage;
2425
import com.google.protobuf.Message;
2526
import com.google.protobuf.UninitializedMessageException;
27+
import java.math.BigDecimal;
28+
import java.util.List;
2629
import java.util.logging.Logger;
2730
import org.json.JSONArray;
2831
import org.json.JSONException;
2932
import org.json.JSONObject;
33+
import org.threeten.bp.LocalDateTime;
34+
import org.threeten.bp.LocalTime;
3035

3136
/**
3237
* Converts Json data to protocol buffer messages given the protocol buffer descriptor. The protobuf
@@ -58,7 +63,28 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J
5863
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
5964
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");
6065

61-
return convertJsonToProtoMessageImpl(protoSchema, json, "root", /*topLevel=*/ true);
66+
return convertJsonToProtoMessageImpl(protoSchema, null, json, "root", /*topLevel=*/ true);
67+
}
68+
69+
/**
70+
* Converts Json data to protocol buffer messages given the protocol buffer descriptor.
71+
*
72+
* @param protoSchema
73+
* @param tableSchema bigquery table schema is needed for type conversion of DATETIME, TIME,
74+
* NUMERIC, BIGNUMERIC
75+
* @param json
76+
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
77+
*/
78+
public static DynamicMessage convertJsonToProtoMessage(
79+
Descriptor protoSchema, TableSchema tableSchema, JSONObject json)
80+
throws IllegalArgumentException {
81+
Preconditions.checkNotNull(json, "JSONObject is null.");
82+
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
83+
Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
84+
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");
85+
86+
return convertJsonToProtoMessageImpl(
87+
protoSchema, tableSchema.getFieldsList(), json, "root", /*topLevel=*/ true);
6288
}
6389

6490
/**
@@ -71,7 +97,11 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J
7197
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
7298
*/
7399
private static DynamicMessage convertJsonToProtoMessageImpl(
74-
Descriptor protoSchema, JSONObject json, String jsonScope, boolean topLevel)
100+
Descriptor protoSchema,
101+
List<TableFieldSchema> tableSchema,
102+
JSONObject json,
103+
String jsonScope,
104+
boolean topLevel)
75105
throws IllegalArgumentException {
76106

77107
DynamicMessage.Builder protoMsg = DynamicMessage.newBuilder(protoSchema);
@@ -90,10 +120,25 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
90120
throw new IllegalArgumentException(
91121
String.format("JSONObject has fields unknown to BigQuery: %s.", currentScope));
92122
}
123+
TableFieldSchema fieldSchema = null;
124+
if (tableSchema != null) {
125+
// protoSchema is generated from tableSchema so their field ordering should match.
126+
fieldSchema = tableSchema.get(field.getIndex());
127+
if (!fieldSchema.getName().equals(field.getName())) {
128+
throw new ValidationException(
129+
"Field at index "
130+
+ field.getIndex()
131+
+ " has mismatch names ("
132+
+ fieldSchema.getName()
133+
+ ") ("
134+
+ field.getName()
135+
+ ")");
136+
}
137+
}
93138
if (!field.isRepeated()) {
94-
fillField(protoMsg, field, json, jsonName, currentScope);
139+
fillField(protoMsg, field, fieldSchema, json, jsonName, currentScope);
95140
} else {
96-
fillRepeatedField(protoMsg, field, json, jsonName, currentScope);
141+
fillRepeatedField(protoMsg, field, fieldSchema, json, jsonName, currentScope);
97142
}
98143
}
99144

@@ -119,6 +164,7 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
119164
*
120165
* @param protoMsg The protocol buffer message being constructed
121166
* @param fieldDescriptor
167+
* @param fieldSchema
122168
* @param json
123169
* @param exactJsonKeyName Exact key name in JSONObject instead of lowercased version
124170
* @param currentScope Debugging purposes
@@ -127,6 +173,7 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
127173
private static void fillField(
128174
DynamicMessage.Builder protoMsg,
129175
FieldDescriptor fieldDescriptor,
176+
TableFieldSchema fieldSchema,
130177
JSONObject json,
131178
String exactJsonKeyName,
132179
String currentScope)
@@ -144,6 +191,25 @@ private static void fillField(
144191
}
145192
break;
146193
case BYTES:
194+
if (fieldSchema != null) {
195+
if (fieldSchema.getType() == TableFieldSchema.Type.NUMERIC) {
196+
if (val instanceof String) {
197+
protoMsg.setField(
198+
fieldDescriptor,
199+
BigDecimalByteStringEncoder.encodeToNumericByteString(
200+
new BigDecimal((String) val)));
201+
return;
202+
}
203+
} else if (fieldSchema.getType() == TableFieldSchema.Type.BIGNUMERIC) {
204+
if (val instanceof String) {
205+
protoMsg.setField(
206+
fieldDescriptor,
207+
BigDecimalByteStringEncoder.encodeToNumericByteString(
208+
new BigDecimal((String) val)));
209+
return;
210+
}
211+
}
212+
}
147213
if (val instanceof ByteString) {
148214
protoMsg.setField(fieldDescriptor, ((ByteString) val).toByteArray());
149215
return;
@@ -170,6 +236,29 @@ private static void fillField(
170236
}
171237
break;
172238
case INT64:
239+
if (fieldSchema != null) {
240+
if (fieldSchema.getType() == TableFieldSchema.Type.DATETIME) {
241+
if (val instanceof String) {
242+
protoMsg.setField(
243+
fieldDescriptor,
244+
CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.parse((String) val)));
245+
return;
246+
} else if (val instanceof Long) {
247+
protoMsg.setField(fieldDescriptor, (Long) val);
248+
return;
249+
}
250+
} else if (fieldSchema.getType() == TableFieldSchema.Type.TIME) {
251+
if (val instanceof String) {
252+
protoMsg.setField(
253+
fieldDescriptor,
254+
CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.parse((String) val)));
255+
return;
256+
} else if (val instanceof Long) {
257+
protoMsg.setField(fieldDescriptor, (Long) val);
258+
return;
259+
}
260+
}
261+
}
173262
if (val instanceof Integer) {
174263
protoMsg.setField(fieldDescriptor, new Long((Integer) val));
175264
return;
@@ -206,6 +295,7 @@ private static void fillField(
206295
fieldDescriptor,
207296
convertJsonToProtoMessageImpl(
208297
fieldDescriptor.getMessageType(),
298+
fieldSchema == null ? null : fieldSchema.getFieldsList(),
209299
json.getJSONObject(exactJsonKeyName),
210300
currentScope,
211301
/*topLevel =*/ false));
@@ -224,6 +314,7 @@ private static void fillField(
224314
*
225315
* @param protoMsg The protocol buffer message being constructed
226316
* @param fieldDescriptor
317+
* @param fieldSchema
227318
* @param json If root level has no matching fields, throws exception.
228319
* @param exactJsonKeyName Exact key name in JSONObject instead of lowercased version
229320
* @param currentScope Debugging purposes
@@ -232,6 +323,7 @@ private static void fillField(
232323
private static void fillRepeatedField(
233324
DynamicMessage.Builder protoMsg,
234325
FieldDescriptor fieldDescriptor,
326+
TableFieldSchema fieldSchema,
235327
JSONObject json,
236328
String exactJsonKeyName,
237329
String currentScope)
@@ -259,40 +351,81 @@ private static void fillRepeatedField(
259351
}
260352
break;
261353
case BYTES:
262-
if (val instanceof JSONArray) {
263-
try {
264-
byte[] bytes = new byte[((JSONArray) val).length()];
265-
for (int j = 0; j < ((JSONArray) val).length(); j++) {
266-
bytes[j] = (byte) ((JSONArray) val).getInt(j);
267-
if (bytes[j] != ((JSONArray) val).getInt(j)) {
268-
throw new IllegalArgumentException(
269-
String.format(
270-
"Error: "
271-
+ currentScope
272-
+ "["
273-
+ index
274-
+ "] could not be converted to byte[]."));
354+
Boolean added = false;
355+
if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.NUMERIC) {
356+
if (val instanceof String) {
357+
protoMsg.addRepeatedField(
358+
fieldDescriptor,
359+
BigDecimalByteStringEncoder.encodeToNumericByteString(
360+
new BigDecimal((String) val)));
361+
added = true;
362+
}
363+
} else if (fieldSchema != null
364+
&& fieldSchema.getType() == TableFieldSchema.Type.BIGNUMERIC) {
365+
if (val instanceof String) {
366+
protoMsg.addRepeatedField(
367+
fieldDescriptor,
368+
BigDecimalByteStringEncoder.encodeToNumericByteString(
369+
new BigDecimal((String) val)));
370+
added = true;
371+
}
372+
}
373+
if (!added) {
374+
if (val instanceof JSONArray) {
375+
try {
376+
byte[] bytes = new byte[((JSONArray) val).length()];
377+
for (int j = 0; j < ((JSONArray) val).length(); j++) {
378+
bytes[j] = (byte) ((JSONArray) val).getInt(j);
379+
if (bytes[j] != ((JSONArray) val).getInt(j)) {
380+
throw new IllegalArgumentException(
381+
String.format(
382+
"Error: "
383+
+ currentScope
384+
+ "["
385+
+ index
386+
+ "] could not be converted to byte[]."));
387+
}
275388
}
389+
protoMsg.addRepeatedField(fieldDescriptor, bytes);
390+
} catch (JSONException e) {
391+
throw new IllegalArgumentException(
392+
String.format(
393+
"Error: "
394+
+ currentScope
395+
+ "["
396+
+ index
397+
+ "] could not be converted to byte[]."));
276398
}
277-
protoMsg.addRepeatedField(fieldDescriptor, bytes);
278-
} catch (JSONException e) {
279-
throw new IllegalArgumentException(
280-
String.format(
281-
"Error: "
282-
+ currentScope
283-
+ "["
284-
+ index
285-
+ "] could not be converted to byte[]."));
399+
} else if (val instanceof ByteString) {
400+
protoMsg.addRepeatedField(fieldDescriptor, ((ByteString) val).toByteArray());
401+
return;
402+
} else {
403+
fail = true;
286404
}
287-
} else if (val instanceof ByteString) {
288-
protoMsg.addRepeatedField(fieldDescriptor, ((ByteString) val).toByteArray());
289-
return;
290-
} else {
291-
fail = true;
292405
}
293406
break;
294407
case INT64:
295-
if (val instanceof Integer) {
408+
if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.DATETIME) {
409+
if (val instanceof String) {
410+
protoMsg.addRepeatedField(
411+
fieldDescriptor,
412+
CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.parse((String) val)));
413+
} else if (val instanceof Long) {
414+
protoMsg.addRepeatedField(fieldDescriptor, (Long) val);
415+
} else {
416+
fail = true;
417+
}
418+
} else if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.TIME) {
419+
if (val instanceof String) {
420+
protoMsg.addRepeatedField(
421+
fieldDescriptor,
422+
CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.parse((String) val)));
423+
} else if (val instanceof Long) {
424+
protoMsg.addRepeatedField(fieldDescriptor, (Long) val);
425+
} else {
426+
fail = true;
427+
}
428+
} else if (val instanceof Integer) {
296429
protoMsg.addRepeatedField(fieldDescriptor, new Long((Integer) val));
297430
} else if (val instanceof Long) {
298431
protoMsg.addRepeatedField(fieldDescriptor, (Long) val);
@@ -330,6 +463,7 @@ private static void fillRepeatedField(
330463
fieldDescriptor,
331464
convertJsonToProtoMessageImpl(
332465
fieldDescriptor.getMessageType(),
466+
fieldSchema == null ? null : fieldSchema.getFieldsList(),
333467
jsonArray.getJSONObject(i),
334468
currentScope,
335469
/*topLevel =*/ false));

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.api.gax.grpc.testing.LocalChannelProvider;
2525
import com.google.api.gax.grpc.testing.MockGrpcService;
2626
import com.google.api.gax.grpc.testing.MockServiceHelper;
27+
import com.google.cloud.bigquery.storage.test.JsonTest;
2728
import com.google.cloud.bigquery.storage.test.Test.FooType;
2829
import com.google.protobuf.Descriptors.DescriptorValidationException;
2930
import com.google.protobuf.Int64Value;
@@ -42,6 +43,7 @@
4243
import org.junit.runner.RunWith;
4344
import org.junit.runners.JUnit4;
4445
import org.threeten.bp.Instant;
46+
import org.threeten.bp.LocalTime;
4547

4648
@RunWith(JUnit4.class)
4749
public class JsonStreamWriterTest {
@@ -195,6 +197,56 @@ public void testSingleAppendSimpleJson() throws Exception {
195197
}
196198
}
197199

200+
@Test
201+
public void testSpecialTypeAppend() throws Exception {
202+
TableFieldSchema field =
203+
TableFieldSchema.newBuilder()
204+
.setName("time")
205+
.setType(TableFieldSchema.Type.TIME)
206+
.setMode(TableFieldSchema.Mode.REPEATED)
207+
.build();
208+
TableSchema tableSchema = TableSchema.newBuilder().addFields(field).build();
209+
210+
JsonTest.TestTime expectedProto =
211+
JsonTest.TestTime.newBuilder()
212+
.addTime(CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.of(1, 0, 1)))
213+
.build();
214+
JSONObject foo = new JSONObject();
215+
foo.put("time", new JSONArray(new String[] {"01:00:01"}));
216+
JSONArray jsonArr = new JSONArray();
217+
jsonArr.put(foo);
218+
219+
try (JsonStreamWriter writer =
220+
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {
221+
222+
testBigQueryWrite.addResponse(
223+
AppendRowsResponse.newBuilder()
224+
.setAppendResult(
225+
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
226+
.build());
227+
228+
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
229+
assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue());
230+
appendFuture.get();
231+
assertEquals(
232+
1,
233+
testBigQueryWrite
234+
.getAppendRequests()
235+
.get(0)
236+
.getProtoRows()
237+
.getRows()
238+
.getSerializedRowsCount());
239+
assertEquals(
240+
testBigQueryWrite
241+
.getAppendRequests()
242+
.get(0)
243+
.getProtoRows()
244+
.getRows()
245+
.getSerializedRows(0),
246+
expectedProto.toByteString());
247+
}
248+
}
249+
198250
@Test
199251
public void testSingleAppendMultipleSimpleJson() throws Exception {
200252
FooType expectedProto = FooType.newBuilder().setFoo("allen").build();

0 commit comments

Comments
 (0)