|
23 | 23 | import com.google.protobuf.InvalidProtocolBufferException; |
24 | 24 | import com.google.protobuf.Value; |
25 | 25 | import com.google.protobuf.util.JsonFormat; |
26 | | -import java.util.ArrayList; |
27 | | -import java.util.Arrays; |
28 | 26 | import java.util.Collections; |
29 | 27 | import java.util.HashSet; |
30 | 28 | import java.util.List; |
|
44 | 42 | import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition; |
45 | 43 | import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod; |
46 | 44 | import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType; |
47 | | -import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord; |
48 | | -import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord; |
49 | 45 | import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; |
50 | | -import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionStartRecord; |
51 | 46 | import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode; |
52 | 47 | import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType; |
53 | 48 | import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; |
@@ -228,218 +223,12 @@ public List<ChangeStreamRecord> toChangeStreamRecords( |
228 | 223 | return Collections.singletonList( |
229 | 224 | toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), resultSetMetadata)); |
230 | 225 | } |
231 | | - |
232 | | - // In GoogleSQL, for `IMMUTABLE_KEY_RANGE` option, change stream records are returned as Protos. |
233 | | - if (resultSet.isProtoChangeRecord()) { |
234 | | - return Arrays.asList( |
235 | | - toChangeStreamRecord( |
236 | | - partition, resultSet.getProtoChangeStreamRecord(), resultSetMetadata)); |
237 | | - } |
238 | | - |
239 | | - // In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are returned as an array |
240 | | - // of structs. |
| 226 | + // In GoogleSQL, change stream records are returned as an array of structs. |
241 | 227 | return resultSet.getCurrentRowAsStruct().getStructList(0).stream() |
242 | 228 | .flatMap(struct -> toChangeStreamRecord(partition, struct, resultSetMetadata)) |
243 | 229 | .collect(Collectors.toList()); |
244 | 230 | } |
245 | 231 |
|
246 | | - ChangeStreamRecord toChangeStreamRecord( |
247 | | - PartitionMetadata partition, |
248 | | - com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto, |
249 | | - ChangeStreamResultSetMetadata resultSetMetadata) { |
250 | | - if (changeStreamRecordProto.hasPartitionStartRecord()) { |
251 | | - return parseProtoPartitionStartRecord( |
252 | | - partition, resultSetMetadata, changeStreamRecordProto.getPartitionStartRecord()); |
253 | | - } else if (changeStreamRecordProto.hasPartitionEndRecord()) { |
254 | | - return parseProtoPartitionEndRecord( |
255 | | - partition, resultSetMetadata, changeStreamRecordProto.getPartitionEndRecord()); |
256 | | - } else if (changeStreamRecordProto.hasPartitionEventRecord()) { |
257 | | - return parseProtoPartitionEventRecord( |
258 | | - partition, resultSetMetadata, changeStreamRecordProto.getPartitionEventRecord()); |
259 | | - } else if (changeStreamRecordProto.hasHeartbeatRecord()) { |
260 | | - return parseProtoHeartbeatRecord( |
261 | | - partition, resultSetMetadata, changeStreamRecordProto.getHeartbeatRecord()); |
262 | | - } else if (changeStreamRecordProto.hasDataChangeRecord()) { |
263 | | - return parseProtoDataChangeRecord( |
264 | | - partition, resultSetMetadata, changeStreamRecordProto.getDataChangeRecord()); |
265 | | - } else { |
266 | | - throw new IllegalArgumentException( |
267 | | - "Unknown change stream record type " + changeStreamRecordProto.toString()); |
268 | | - } |
269 | | - } |
270 | | - |
271 | | - ChangeStreamRecord parseProtoPartitionStartRecord( |
272 | | - PartitionMetadata partition, |
273 | | - ChangeStreamResultSetMetadata resultSetMetadata, |
274 | | - com.google.spanner.v1.ChangeStreamRecord.PartitionStartRecord partitionStartRecordProto) { |
275 | | - final Timestamp startTimestamp = |
276 | | - Timestamp.fromProto(partitionStartRecordProto.getStartTimestamp()); |
277 | | - return new PartitionStartRecord( |
278 | | - startTimestamp, |
279 | | - partitionStartRecordProto.getRecordSequence(), |
280 | | - partitionStartRecordProto.getPartitionTokensList(), |
281 | | - changeStreamRecordMetadataFrom(partition, startTimestamp, resultSetMetadata)); |
282 | | - } |
283 | | - |
284 | | - ChangeStreamRecord parseProtoPartitionEndRecord( |
285 | | - PartitionMetadata partition, |
286 | | - ChangeStreamResultSetMetadata resultSetMetadata, |
287 | | - com.google.spanner.v1.ChangeStreamRecord.PartitionEndRecord partitionEndRecordProto) { |
288 | | - final Timestamp endTimestamp = Timestamp.fromProto(partitionEndRecordProto.getEndTimestamp()); |
289 | | - return new PartitionEndRecord( |
290 | | - endTimestamp, |
291 | | - partitionEndRecordProto.getRecordSequence(), |
292 | | - changeStreamRecordMetadataFrom(partition, endTimestamp, resultSetMetadata)); |
293 | | - } |
294 | | - |
295 | | - ChangeStreamRecord parseProtoPartitionEventRecord( |
296 | | - PartitionMetadata partition, |
297 | | - ChangeStreamResultSetMetadata resultSetMetadata, |
298 | | - com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord partitionEventRecordProto) { |
299 | | - final Timestamp commitTimestamp = |
300 | | - Timestamp.fromProto(partitionEventRecordProto.getCommitTimestamp()); |
301 | | - return new PartitionEventRecord( |
302 | | - commitTimestamp, |
303 | | - partitionEventRecordProto.getRecordSequence(), |
304 | | - changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata)); |
305 | | - } |
306 | | - |
307 | | - ChangeStreamRecord parseProtoHeartbeatRecord( |
308 | | - PartitionMetadata partition, |
309 | | - ChangeStreamResultSetMetadata resultSetMetadata, |
310 | | - com.google.spanner.v1.ChangeStreamRecord.HeartbeatRecord heartbeatRecordProto) { |
311 | | - final Timestamp heartbeatTimestamp = Timestamp.fromProto(heartbeatRecordProto.getTimestamp()); |
312 | | - return new HeartbeatRecord( |
313 | | - heartbeatTimestamp, |
314 | | - changeStreamRecordMetadataFrom(partition, heartbeatTimestamp, resultSetMetadata)); |
315 | | - } |
316 | | - |
317 | | - ChangeStreamRecord parseProtoDataChangeRecord( |
318 | | - PartitionMetadata partition, |
319 | | - ChangeStreamResultSetMetadata resultSetMetadata, |
320 | | - com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord dataChangeRecordProto) { |
321 | | - final Timestamp commitTimestamp = |
322 | | - Timestamp.fromProto(dataChangeRecordProto.getCommitTimestamp()); |
323 | | - return new DataChangeRecord( |
324 | | - partition.getPartitionToken(), |
325 | | - commitTimestamp, |
326 | | - dataChangeRecordProto.getServerTransactionId(), |
327 | | - dataChangeRecordProto.getIsLastRecordInTransactionInPartition(), |
328 | | - dataChangeRecordProto.getRecordSequence(), |
329 | | - dataChangeRecordProto.getTable(), |
330 | | - parseProtoColumnMetadata(dataChangeRecordProto.getColumnMetadataList()), |
331 | | - parseProtoMod( |
332 | | - dataChangeRecordProto.getModsList(), dataChangeRecordProto.getColumnMetadataList()), |
333 | | - parseProtoModType(dataChangeRecordProto.getModType()), |
334 | | - parseProtoValueCaptureType(dataChangeRecordProto.getValueCaptureType()), |
335 | | - dataChangeRecordProto.getNumberOfRecordsInTransaction(), |
336 | | - dataChangeRecordProto.getNumberOfPartitionsInTransaction(), |
337 | | - dataChangeRecordProto.getTransactionTag(), |
338 | | - dataChangeRecordProto.getIsSystemTransaction(), |
339 | | - changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata)); |
340 | | - } |
341 | | - |
342 | | - List<ColumnType> parseProtoColumnMetadata( |
343 | | - List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata> |
344 | | - columnMetadataProtos) { |
345 | | - List<ColumnType> columnTypes = new ArrayList<>(); |
346 | | - for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata |
347 | | - columnMetadataProto : columnMetadataProtos) { |
348 | | - // TypeCode class takes json format argument in its constructor, e.g. `{\"code\":\"INT64\"}`. |
349 | | - String typeCodeJson; |
350 | | - try { |
351 | | - typeCodeJson = this.printer.print(columnMetadataProto.getType()); |
352 | | - } catch (InvalidProtocolBufferException exc) { |
353 | | - throw new IllegalArgumentException( |
354 | | - "Failed to print type: " + columnMetadataProto.getType().toString()); |
355 | | - } |
356 | | - ColumnType columnType = |
357 | | - new ColumnType( |
358 | | - columnMetadataProto.getName(), |
359 | | - new TypeCode(typeCodeJson), |
360 | | - columnMetadataProto.getIsPrimaryKey(), |
361 | | - columnMetadataProto.getOrdinalPosition()); |
362 | | - columnTypes.add(columnType); |
363 | | - } |
364 | | - return columnTypes; |
365 | | - } |
366 | | - |
367 | | - String convertModValueProtosToJson( |
368 | | - List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModValue> modValueProtos, |
369 | | - List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata> |
370 | | - columnMetadataProtos) { |
371 | | - com.google.protobuf.Struct.Builder modStructValueBuilder = |
372 | | - com.google.protobuf.Struct.newBuilder(); |
373 | | - for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModValue modValueProto : |
374 | | - modValueProtos) { |
375 | | - final String columnName = |
376 | | - columnMetadataProtos.get(modValueProto.getColumnMetadataIndex()).getName(); |
377 | | - final Value columnValue = modValueProto.getValue(); |
378 | | - modStructValueBuilder.putFields(columnName, columnValue); |
379 | | - } |
380 | | - Value modStructValue = Value.newBuilder().setStructValue(modStructValueBuilder.build()).build(); |
381 | | - String modValueJson; |
382 | | - try { |
383 | | - modValueJson = this.printer.print(modStructValue); |
384 | | - } catch (InvalidProtocolBufferException exc) { |
385 | | - throw new IllegalArgumentException("Failed to print type: " + modStructValue); |
386 | | - } |
387 | | - return modValueJson; |
388 | | - } |
389 | | - |
390 | | - List<Mod> parseProtoMod( |
391 | | - List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.Mod> modProtos, |
392 | | - List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata> |
393 | | - columnMetadataProtos) { |
394 | | - List<Mod> mods = new ArrayList<>(); |
395 | | - for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.Mod modProto : modProtos) { |
396 | | - final String keysJson = |
397 | | - convertModValueProtosToJson(modProto.getKeysList(), columnMetadataProtos); |
398 | | - final String oldValuesJson = |
399 | | - convertModValueProtosToJson(modProto.getOldValuesList(), columnMetadataProtos); |
400 | | - final String newValuesJson = |
401 | | - convertModValueProtosToJson(modProto.getNewValuesList(), columnMetadataProtos); |
402 | | - Mod mod = new Mod(keysJson, oldValuesJson, newValuesJson); |
403 | | - mods.add(mod); |
404 | | - } |
405 | | - return mods; |
406 | | - } |
407 | | - |
408 | | - ModType parseProtoModType( |
409 | | - com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType modTypeProto) { |
410 | | - if (modTypeProto == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.INSERT) { |
411 | | - return ModType.INSERT; |
412 | | - } else if (modTypeProto |
413 | | - == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.UPDATE) { |
414 | | - return ModType.UPDATE; |
415 | | - } else if (modTypeProto |
416 | | - == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.DELETE) { |
417 | | - return ModType.DELETE; |
418 | | - } |
419 | | - return ModType.UNKNOWN; |
420 | | - } |
421 | | - |
422 | | - ValueCaptureType parseProtoValueCaptureType( |
423 | | - com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType |
424 | | - valueCaptureTypeProto) { |
425 | | - if (valueCaptureTypeProto |
426 | | - == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType.NEW_ROW) { |
427 | | - return ValueCaptureType.NEW_ROW; |
428 | | - } else if (valueCaptureTypeProto |
429 | | - == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType.NEW_VALUES) { |
430 | | - return ValueCaptureType.NEW_VALUES; |
431 | | - } else if (valueCaptureTypeProto |
432 | | - == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType |
433 | | - .OLD_AND_NEW_VALUES) { |
434 | | - return ValueCaptureType.OLD_AND_NEW_VALUES; |
435 | | - } else if (valueCaptureTypeProto |
436 | | - == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType |
437 | | - .NEW_ROW_AND_OLD_VALUES) { |
438 | | - return ValueCaptureType.NEW_ROW_AND_OLD_VALUES; |
439 | | - } |
440 | | - return ValueCaptureType.UNKNOWN; |
441 | | - } |
442 | | - |
443 | 232 | Stream<ChangeStreamRecord> toChangeStreamRecord( |
444 | 233 | PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) { |
445 | 234 |
|
|
0 commit comments