1515 */
1616package com .google .cloud .bigquery .storage .v1 ;
1717
18+ import com .google .api .pathtemplate .ValidationException ;
1819import com .google .common .base .Preconditions ;
1920import com .google .common .collect .ImmutableMap ;
2021import com .google .protobuf .ByteString ;
2324import com .google .protobuf .DynamicMessage ;
2425import com .google .protobuf .Message ;
2526import com .google .protobuf .UninitializedMessageException ;
27+ import java .math .BigDecimal ;
28+ import java .util .List ;
2629import java .util .logging .Logger ;
2730import org .json .JSONArray ;
2831import org .json .JSONException ;
2932import org .json .JSONObject ;
33+ import org .threeten .bp .LocalDateTime ;
34+ import org .threeten .bp .LocalTime ;
3035
3136/**
3237 * Converts Json data to protocol buffer messages given the protocol buffer descriptor. The protobuf
@@ -58,7 +63,28 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J
5863 Preconditions .checkNotNull (protoSchema , "Protobuf descriptor is null." );
5964 Preconditions .checkState (json .length () != 0 , "JSONObject is empty." );
6065
61- return convertJsonToProtoMessageImpl (protoSchema , json , "root" , /*topLevel=*/ true );
66+ return convertJsonToProtoMessageImpl (protoSchema , null , json , "root" , /*topLevel=*/ true );
67+ }
68+
69+ /**
70+ * Converts Json data to protocol buffer messages given the protocol buffer descriptor.
71+ *
72+ * @param protoSchema
73+ * @param tableSchema bigquery table schema is needed for type conversion of DATETIME, TIME,
74+ * NUMERIC, BIGNUMERIC
75+ * @param json
76+ * @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
77+ */
78+ public static DynamicMessage convertJsonToProtoMessage (
79+ Descriptor protoSchema , TableSchema tableSchema , JSONObject json )
80+ throws IllegalArgumentException {
81+ Preconditions .checkNotNull (json , "JSONObject is null." );
82+ Preconditions .checkNotNull (protoSchema , "Protobuf descriptor is null." );
83+ Preconditions .checkNotNull (tableSchema , "TableSchema is null." );
84+ Preconditions .checkState (json .length () != 0 , "JSONObject is empty." );
85+
86+ return convertJsonToProtoMessageImpl (
87+ protoSchema , tableSchema .getFieldsList (), json , "root" , /*topLevel=*/ true );
6288 }
6389
6490 /**
@@ -71,7 +97,11 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J
7197 * @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
7298 */
7399 private static DynamicMessage convertJsonToProtoMessageImpl (
74- Descriptor protoSchema , JSONObject json , String jsonScope , boolean topLevel )
100+ Descriptor protoSchema ,
101+ List <TableFieldSchema > tableSchema ,
102+ JSONObject json ,
103+ String jsonScope ,
104+ boolean topLevel )
75105 throws IllegalArgumentException {
76106
77107 DynamicMessage .Builder protoMsg = DynamicMessage .newBuilder (protoSchema );
@@ -90,10 +120,25 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
90120 throw new IllegalArgumentException (
91121 String .format ("JSONObject has fields unknown to BigQuery: %s." , currentScope ));
92122 }
123+ TableFieldSchema fieldSchema = null ;
124+ if (tableSchema != null ) {
125+ // protoSchema is generated from tableSchema so their field ordering should match.
126+ fieldSchema = tableSchema .get (field .getIndex ());
127+ if (!fieldSchema .getName ().equals (field .getName ())) {
128+ throw new ValidationException (
129+ "Field at index "
130+ + field .getIndex ()
131+ + " has mismatch names ("
132+ + fieldSchema .getName ()
133+ + ") ("
134+ + field .getName ()
135+ + ")" );
136+ }
137+ }
93138 if (!field .isRepeated ()) {
94- fillField (protoMsg , field , json , jsonName , currentScope );
139+ fillField (protoMsg , field , fieldSchema , json , jsonName , currentScope );
95140 } else {
96- fillRepeatedField (protoMsg , field , json , jsonName , currentScope );
141+ fillRepeatedField (protoMsg , field , fieldSchema , json , jsonName , currentScope );
97142 }
98143 }
99144
@@ -119,6 +164,7 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
119164 *
120165 * @param protoMsg The protocol buffer message being constructed
121166 * @param fieldDescriptor
167+ * @param fieldSchema
122168 * @param json
123169 * @param exactJsonKeyName Exact key name in JSONObject instead of lowercased version
124170 * @param currentScope Debugging purposes
@@ -127,6 +173,7 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
127173 private static void fillField (
128174 DynamicMessage .Builder protoMsg ,
129175 FieldDescriptor fieldDescriptor ,
176+ TableFieldSchema fieldSchema ,
130177 JSONObject json ,
131178 String exactJsonKeyName ,
132179 String currentScope )
@@ -144,6 +191,25 @@ private static void fillField(
144191 }
145192 break ;
146193 case BYTES :
194+ if (fieldSchema != null ) {
195+ if (fieldSchema .getType () == TableFieldSchema .Type .NUMERIC ) {
196+ if (val instanceof String ) {
197+ protoMsg .setField (
198+ fieldDescriptor ,
199+ BigDecimalByteStringEncoder .encodeToNumericByteString (
200+ new BigDecimal ((String ) val )));
201+ return ;
202+ }
203+ } else if (fieldSchema .getType () == TableFieldSchema .Type .BIGNUMERIC ) {
204+ if (val instanceof String ) {
205+ protoMsg .setField (
206+ fieldDescriptor ,
207+ BigDecimalByteStringEncoder .encodeToNumericByteString (
208+ new BigDecimal ((String ) val )));
209+ return ;
210+ }
211+ }
212+ }
147213 if (val instanceof ByteString ) {
148214 protoMsg .setField (fieldDescriptor , ((ByteString ) val ).toByteArray ());
149215 return ;
@@ -170,6 +236,29 @@ private static void fillField(
170236 }
171237 break ;
172238 case INT64 :
239+ if (fieldSchema != null ) {
240+ if (fieldSchema .getType () == TableFieldSchema .Type .DATETIME ) {
241+ if (val instanceof String ) {
242+ protoMsg .setField (
243+ fieldDescriptor ,
244+ CivilTimeEncoder .encodePacked64DatetimeMicros (LocalDateTime .parse ((String ) val )));
245+ return ;
246+ } else if (val instanceof Long ) {
247+ protoMsg .setField (fieldDescriptor , (Long ) val );
248+ return ;
249+ }
250+ } else if (fieldSchema .getType () == TableFieldSchema .Type .TIME ) {
251+ if (val instanceof String ) {
252+ protoMsg .setField (
253+ fieldDescriptor ,
254+ CivilTimeEncoder .encodePacked64TimeMicros (LocalTime .parse ((String ) val )));
255+ return ;
256+ } else if (val instanceof Long ) {
257+ protoMsg .setField (fieldDescriptor , (Long ) val );
258+ return ;
259+ }
260+ }
261+ }
173262 if (val instanceof Integer ) {
174263 protoMsg .setField (fieldDescriptor , new Long ((Integer ) val ));
175264 return ;
@@ -206,6 +295,7 @@ private static void fillField(
206295 fieldDescriptor ,
207296 convertJsonToProtoMessageImpl (
208297 fieldDescriptor .getMessageType (),
298+ fieldSchema == null ? null : fieldSchema .getFieldsList (),
209299 json .getJSONObject (exactJsonKeyName ),
210300 currentScope ,
211301 /*topLevel =*/ false ));
@@ -224,6 +314,7 @@ private static void fillField(
224314 *
225315 * @param protoMsg The protocol buffer message being constructed
226316 * @param fieldDescriptor
317+ * @param fieldSchema
227318 * @param json If root level has no matching fields, throws exception.
228319 * @param exactJsonKeyName Exact key name in JSONObject instead of lowercased version
229320 * @param currentScope Debugging purposes
@@ -232,6 +323,7 @@ private static void fillField(
232323 private static void fillRepeatedField (
233324 DynamicMessage .Builder protoMsg ,
234325 FieldDescriptor fieldDescriptor ,
326+ TableFieldSchema fieldSchema ,
235327 JSONObject json ,
236328 String exactJsonKeyName ,
237329 String currentScope )
@@ -259,40 +351,81 @@ private static void fillRepeatedField(
259351 }
260352 break ;
261353 case BYTES :
262- if (val instanceof JSONArray ) {
263- try {
264- byte [] bytes = new byte [((JSONArray ) val ).length ()];
265- for (int j = 0 ; j < ((JSONArray ) val ).length (); j ++) {
266- bytes [j ] = (byte ) ((JSONArray ) val ).getInt (j );
267- if (bytes [j ] != ((JSONArray ) val ).getInt (j )) {
268- throw new IllegalArgumentException (
269- String .format (
270- "Error: "
271- + currentScope
272- + "["
273- + index
274- + "] could not be converted to byte[]." ));
354+ Boolean added = false ;
355+ if (fieldSchema != null && fieldSchema .getType () == TableFieldSchema .Type .NUMERIC ) {
356+ if (val instanceof String ) {
357+ protoMsg .addRepeatedField (
358+ fieldDescriptor ,
359+ BigDecimalByteStringEncoder .encodeToNumericByteString (
360+ new BigDecimal ((String ) val )));
361+ added = true ;
362+ }
363+ } else if (fieldSchema != null
364+ && fieldSchema .getType () == TableFieldSchema .Type .BIGNUMERIC ) {
365+ if (val instanceof String ) {
366+ protoMsg .addRepeatedField (
367+ fieldDescriptor ,
368+ BigDecimalByteStringEncoder .encodeToNumericByteString (
369+ new BigDecimal ((String ) val )));
370+ added = true ;
371+ }
372+ }
373+ if (!added ) {
374+ if (val instanceof JSONArray ) {
375+ try {
376+ byte [] bytes = new byte [((JSONArray ) val ).length ()];
377+ for (int j = 0 ; j < ((JSONArray ) val ).length (); j ++) {
378+ bytes [j ] = (byte ) ((JSONArray ) val ).getInt (j );
379+ if (bytes [j ] != ((JSONArray ) val ).getInt (j )) {
380+ throw new IllegalArgumentException (
381+ String .format (
382+ "Error: "
383+ + currentScope
384+ + "["
385+ + index
386+ + "] could not be converted to byte[]." ));
387+ }
275388 }
389+ protoMsg .addRepeatedField (fieldDescriptor , bytes );
390+ } catch (JSONException e ) {
391+ throw new IllegalArgumentException (
392+ String .format (
393+ "Error: "
394+ + currentScope
395+ + "["
396+ + index
397+ + "] could not be converted to byte[]." ));
276398 }
277- protoMsg .addRepeatedField (fieldDescriptor , bytes );
278- } catch (JSONException e ) {
279- throw new IllegalArgumentException (
280- String .format (
281- "Error: "
282- + currentScope
283- + "["
284- + index
285- + "] could not be converted to byte[]." ));
399+ } else if (val instanceof ByteString ) {
400+ protoMsg .addRepeatedField (fieldDescriptor , ((ByteString ) val ).toByteArray ());
401+ return ;
402+ } else {
403+ fail = true ;
286404 }
287- } else if (val instanceof ByteString ) {
288- protoMsg .addRepeatedField (fieldDescriptor , ((ByteString ) val ).toByteArray ());
289- return ;
290- } else {
291- fail = true ;
292405 }
293406 break ;
294407 case INT64 :
295- if (val instanceof Integer ) {
408+ if (fieldSchema != null && fieldSchema .getType () == TableFieldSchema .Type .DATETIME ) {
409+ if (val instanceof String ) {
410+ protoMsg .addRepeatedField (
411+ fieldDescriptor ,
412+ CivilTimeEncoder .encodePacked64DatetimeMicros (LocalDateTime .parse ((String ) val )));
413+ } else if (val instanceof Long ) {
414+ protoMsg .addRepeatedField (fieldDescriptor , (Long ) val );
415+ } else {
416+ fail = true ;
417+ }
418+ } else if (fieldSchema != null && fieldSchema .getType () == TableFieldSchema .Type .TIME ) {
419+ if (val instanceof String ) {
420+ protoMsg .addRepeatedField (
421+ fieldDescriptor ,
422+ CivilTimeEncoder .encodePacked64TimeMicros (LocalTime .parse ((String ) val )));
423+ } else if (val instanceof Long ) {
424+ protoMsg .addRepeatedField (fieldDescriptor , (Long ) val );
425+ } else {
426+ fail = true ;
427+ }
428+ } else if (val instanceof Integer ) {
296429 protoMsg .addRepeatedField (fieldDescriptor , new Long ((Integer ) val ));
297430 } else if (val instanceof Long ) {
298431 protoMsg .addRepeatedField (fieldDescriptor , (Long ) val );
@@ -330,6 +463,7 @@ private static void fillRepeatedField(
330463 fieldDescriptor ,
331464 convertJsonToProtoMessageImpl (
332465 fieldDescriptor .getMessageType (),
466+ fieldSchema == null ? null : fieldSchema .getFieldsList (),
333467 jsonArray .getJSONObject (i ),
334468 currentScope ,
335469 /*topLevel =*/ false ));
0 commit comments