Skip to content

Commit f9b7c74

Browse files
authored
Revert "Parse v2 record. (#35408)" (#35469)
This reverts commit 3f3f214.
1 parent 4d58e6d commit f9b7c74

File tree

6 files changed

+3
-601
lines changed

6 files changed

+3
-601
lines changed

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -651,7 +651,6 @@ class BeamModulePlugin implements Plugin<Project> {
651651
def arrow_version = "15.0.2"
652652
def jmh_version = "1.34"
653653
def jupiter_version = "5.7.0"
654-
def spanner_grpc_proto_version = "6.95.1"
655654

656655
// Export Spark versions, so they are defined in a single place only
657656
project.ext.spark3_version = spark3_version
@@ -861,7 +860,7 @@ class BeamModulePlugin implements Plugin<Project> {
861860
proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
862861
proto_google_cloud_pubsublite_v1 : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version
863862
proto_google_cloud_secret_manager_v1 : "com.google.api.grpc:proto-google-cloud-secretmanager-v1", // google_cloud_platform_libraries_bom sets version
864-
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1:$spanner_grpc_proto_version", // google_cloud_platform_libraries_bom sets version
863+
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version
865864
proto_google_cloud_spanner_admin_database_v1: "com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // google_cloud_platform_libraries_bom sets version
866865
proto_google_common_protos : "com.google.api.grpc:proto-google-common-protos", // google_cloud_platform_libraries_bom sets version
867866
qpid_jms_client : "org.apache.qpid:qpid-jms-client:$qpid_jms_client_version",

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -108,26 +108,6 @@ public Struct getCurrentRowAsStruct() {
108108
return resultSet.getCurrentRowAsStruct();
109109
}
110110

111-
/**
112-
* Returns the only change stream record proto at the current pointer of the result set. It also
113-
* updates the timestamp at which the record was read. This function enhances the getProtoMessage
114-
* function but only focus on the ChangeStreamRecord type.
115-
*
116-
* @return a change stream record as a proto or null
117-
*/
118-
public com.google.spanner.v1.ChangeStreamRecord getProtoChangeStreamRecord() {
119-
recordReadAt = Timestamp.now();
120-
return resultSet.getProtoMessage(
121-
0, com.google.spanner.v1.ChangeStreamRecord.getDefaultInstance());
122-
}
123-
124-
/** Returns true if the result set at the current pointer contain only one proto change record. */
125-
public boolean isProtoChangeRecord() {
126-
return resultSet.getColumnCount() == 1
127-
&& !resultSet.isNull(0)
128-
&& resultSet.getColumnType(0).getCode() == com.google.cloud.spanner.Type.Code.PROTO;
129-
}
130-
131111
/**
132112
* Returns the record at the current pointer as {@link JsonB}. It also updates the timestamp at
133113
* which the record was read.

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java

Lines changed: 1 addition & 212 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
import com.google.protobuf.InvalidProtocolBufferException;
2424
import com.google.protobuf.Value;
2525
import com.google.protobuf.util.JsonFormat;
26-
import java.util.ArrayList;
27-
import java.util.Arrays;
2826
import java.util.Collections;
2927
import java.util.HashSet;
3028
import java.util.List;
@@ -44,10 +42,7 @@
4442
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
4543
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
4644
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
47-
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord;
48-
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord;
4945
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
50-
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionStartRecord;
5146
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
5247
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
5348
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -228,218 +223,12 @@ public List<ChangeStreamRecord> toChangeStreamRecords(
228223
return Collections.singletonList(
229224
toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), resultSetMetadata));
230225
}
231-
232-
// In GoogleSQL, for `IMMUTABLE_KEY_RANGE` option, change stream records are returned as Protos.
233-
if (resultSet.isProtoChangeRecord()) {
234-
return Arrays.asList(
235-
toChangeStreamRecord(
236-
partition, resultSet.getProtoChangeStreamRecord(), resultSetMetadata));
237-
}
238-
239-
// In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are returned as an array
240-
// of structs.
226+
// In GoogleSQL, change stream records are returned as an array of structs.
241227
return resultSet.getCurrentRowAsStruct().getStructList(0).stream()
242228
.flatMap(struct -> toChangeStreamRecord(partition, struct, resultSetMetadata))
243229
.collect(Collectors.toList());
244230
}
245231

246-
ChangeStreamRecord toChangeStreamRecord(
247-
PartitionMetadata partition,
248-
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto,
249-
ChangeStreamResultSetMetadata resultSetMetadata) {
250-
if (changeStreamRecordProto.hasPartitionStartRecord()) {
251-
return parseProtoPartitionStartRecord(
252-
partition, resultSetMetadata, changeStreamRecordProto.getPartitionStartRecord());
253-
} else if (changeStreamRecordProto.hasPartitionEndRecord()) {
254-
return parseProtoPartitionEndRecord(
255-
partition, resultSetMetadata, changeStreamRecordProto.getPartitionEndRecord());
256-
} else if (changeStreamRecordProto.hasPartitionEventRecord()) {
257-
return parseProtoPartitionEventRecord(
258-
partition, resultSetMetadata, changeStreamRecordProto.getPartitionEventRecord());
259-
} else if (changeStreamRecordProto.hasHeartbeatRecord()) {
260-
return parseProtoHeartbeatRecord(
261-
partition, resultSetMetadata, changeStreamRecordProto.getHeartbeatRecord());
262-
} else if (changeStreamRecordProto.hasDataChangeRecord()) {
263-
return parseProtoDataChangeRecord(
264-
partition, resultSetMetadata, changeStreamRecordProto.getDataChangeRecord());
265-
} else {
266-
throw new IllegalArgumentException(
267-
"Unknown change stream record type " + changeStreamRecordProto.toString());
268-
}
269-
}
270-
271-
ChangeStreamRecord parseProtoPartitionStartRecord(
272-
PartitionMetadata partition,
273-
ChangeStreamResultSetMetadata resultSetMetadata,
274-
com.google.spanner.v1.ChangeStreamRecord.PartitionStartRecord partitionStartRecordProto) {
275-
final Timestamp startTimestamp =
276-
Timestamp.fromProto(partitionStartRecordProto.getStartTimestamp());
277-
return new PartitionStartRecord(
278-
startTimestamp,
279-
partitionStartRecordProto.getRecordSequence(),
280-
partitionStartRecordProto.getPartitionTokensList(),
281-
changeStreamRecordMetadataFrom(partition, startTimestamp, resultSetMetadata));
282-
}
283-
284-
ChangeStreamRecord parseProtoPartitionEndRecord(
285-
PartitionMetadata partition,
286-
ChangeStreamResultSetMetadata resultSetMetadata,
287-
com.google.spanner.v1.ChangeStreamRecord.PartitionEndRecord partitionEndRecordProto) {
288-
final Timestamp endTimestamp = Timestamp.fromProto(partitionEndRecordProto.getEndTimestamp());
289-
return new PartitionEndRecord(
290-
endTimestamp,
291-
partitionEndRecordProto.getRecordSequence(),
292-
changeStreamRecordMetadataFrom(partition, endTimestamp, resultSetMetadata));
293-
}
294-
295-
ChangeStreamRecord parseProtoPartitionEventRecord(
296-
PartitionMetadata partition,
297-
ChangeStreamResultSetMetadata resultSetMetadata,
298-
com.google.spanner.v1.ChangeStreamRecord.PartitionEventRecord partitionEventRecordProto) {
299-
final Timestamp commitTimestamp =
300-
Timestamp.fromProto(partitionEventRecordProto.getCommitTimestamp());
301-
return new PartitionEventRecord(
302-
commitTimestamp,
303-
partitionEventRecordProto.getRecordSequence(),
304-
changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
305-
}
306-
307-
ChangeStreamRecord parseProtoHeartbeatRecord(
308-
PartitionMetadata partition,
309-
ChangeStreamResultSetMetadata resultSetMetadata,
310-
com.google.spanner.v1.ChangeStreamRecord.HeartbeatRecord heartbeatRecordProto) {
311-
final Timestamp heartbeatTimestamp = Timestamp.fromProto(heartbeatRecordProto.getTimestamp());
312-
return new HeartbeatRecord(
313-
heartbeatTimestamp,
314-
changeStreamRecordMetadataFrom(partition, heartbeatTimestamp, resultSetMetadata));
315-
}
316-
317-
ChangeStreamRecord parseProtoDataChangeRecord(
318-
PartitionMetadata partition,
319-
ChangeStreamResultSetMetadata resultSetMetadata,
320-
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord dataChangeRecordProto) {
321-
final Timestamp commitTimestamp =
322-
Timestamp.fromProto(dataChangeRecordProto.getCommitTimestamp());
323-
return new DataChangeRecord(
324-
partition.getPartitionToken(),
325-
commitTimestamp,
326-
dataChangeRecordProto.getServerTransactionId(),
327-
dataChangeRecordProto.getIsLastRecordInTransactionInPartition(),
328-
dataChangeRecordProto.getRecordSequence(),
329-
dataChangeRecordProto.getTable(),
330-
parseProtoColumnMetadata(dataChangeRecordProto.getColumnMetadataList()),
331-
parseProtoMod(
332-
dataChangeRecordProto.getModsList(), dataChangeRecordProto.getColumnMetadataList()),
333-
parseProtoModType(dataChangeRecordProto.getModType()),
334-
parseProtoValueCaptureType(dataChangeRecordProto.getValueCaptureType()),
335-
dataChangeRecordProto.getNumberOfRecordsInTransaction(),
336-
dataChangeRecordProto.getNumberOfPartitionsInTransaction(),
337-
dataChangeRecordProto.getTransactionTag(),
338-
dataChangeRecordProto.getIsSystemTransaction(),
339-
changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
340-
}
341-
342-
List<ColumnType> parseProtoColumnMetadata(
343-
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata>
344-
columnMetadataProtos) {
345-
List<ColumnType> columnTypes = new ArrayList<>();
346-
for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata
347-
columnMetadataProto : columnMetadataProtos) {
348-
// TypeCode class takes json format argument in its constructor, e.g. `{\"code\":\"INT64\"}`.
349-
String typeCodeJson;
350-
try {
351-
typeCodeJson = this.printer.print(columnMetadataProto.getType());
352-
} catch (InvalidProtocolBufferException exc) {
353-
throw new IllegalArgumentException(
354-
"Failed to print type: " + columnMetadataProto.getType().toString());
355-
}
356-
ColumnType columnType =
357-
new ColumnType(
358-
columnMetadataProto.getName(),
359-
new TypeCode(typeCodeJson),
360-
columnMetadataProto.getIsPrimaryKey(),
361-
columnMetadataProto.getOrdinalPosition());
362-
columnTypes.add(columnType);
363-
}
364-
return columnTypes;
365-
}
366-
367-
String convertModValueProtosToJson(
368-
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModValue> modValueProtos,
369-
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata>
370-
columnMetadataProtos) {
371-
com.google.protobuf.Struct.Builder modStructValueBuilder =
372-
com.google.protobuf.Struct.newBuilder();
373-
for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModValue modValueProto :
374-
modValueProtos) {
375-
final String columnName =
376-
columnMetadataProtos.get(modValueProto.getColumnMetadataIndex()).getName();
377-
final Value columnValue = modValueProto.getValue();
378-
modStructValueBuilder.putFields(columnName, columnValue);
379-
}
380-
Value modStructValue = Value.newBuilder().setStructValue(modStructValueBuilder.build()).build();
381-
String modValueJson;
382-
try {
383-
modValueJson = this.printer.print(modStructValue);
384-
} catch (InvalidProtocolBufferException exc) {
385-
throw new IllegalArgumentException("Failed to print type: " + modStructValue);
386-
}
387-
return modValueJson;
388-
}
389-
390-
List<Mod> parseProtoMod(
391-
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.Mod> modProtos,
392-
List<com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ColumnMetadata>
393-
columnMetadataProtos) {
394-
List<Mod> mods = new ArrayList<>();
395-
for (com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.Mod modProto : modProtos) {
396-
final String keysJson =
397-
convertModValueProtosToJson(modProto.getKeysList(), columnMetadataProtos);
398-
final String oldValuesJson =
399-
convertModValueProtosToJson(modProto.getOldValuesList(), columnMetadataProtos);
400-
final String newValuesJson =
401-
convertModValueProtosToJson(modProto.getNewValuesList(), columnMetadataProtos);
402-
Mod mod = new Mod(keysJson, oldValuesJson, newValuesJson);
403-
mods.add(mod);
404-
}
405-
return mods;
406-
}
407-
408-
ModType parseProtoModType(
409-
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType modTypeProto) {
410-
if (modTypeProto == com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.INSERT) {
411-
return ModType.INSERT;
412-
} else if (modTypeProto
413-
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.UPDATE) {
414-
return ModType.UPDATE;
415-
} else if (modTypeProto
416-
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ModType.DELETE) {
417-
return ModType.DELETE;
418-
}
419-
return ModType.UNKNOWN;
420-
}
421-
422-
ValueCaptureType parseProtoValueCaptureType(
423-
com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType
424-
valueCaptureTypeProto) {
425-
if (valueCaptureTypeProto
426-
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType.NEW_ROW) {
427-
return ValueCaptureType.NEW_ROW;
428-
} else if (valueCaptureTypeProto
429-
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType.NEW_VALUES) {
430-
return ValueCaptureType.NEW_VALUES;
431-
} else if (valueCaptureTypeProto
432-
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType
433-
.OLD_AND_NEW_VALUES) {
434-
return ValueCaptureType.OLD_AND_NEW_VALUES;
435-
} else if (valueCaptureTypeProto
436-
== com.google.spanner.v1.ChangeStreamRecord.DataChangeRecord.ValueCaptureType
437-
.NEW_ROW_AND_OLD_VALUES) {
438-
return ValueCaptureType.NEW_ROW_AND_OLD_VALUES;
439-
}
440-
return ValueCaptureType.UNKNOWN;
441-
}
442-
443232
Stream<ChangeStreamRecord> toChangeStreamRecord(
444233
PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata resultSetMetadata) {
445234

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/model/DataChangeRecord.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ public String toString() {
288288
+ '\''
289289
+ ", isSystemTransaction="
290290
+ isSystemTransaction
291-
+ ", metadata="
291+
+ ", metadata"
292292
+ metadata
293293
+ '}';
294294
}

0 commit comments

Comments
 (0)