flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/JsonDebeziumDeserializationSchema.java
@@ -16,6 +16,7 @@

package com.ververica.cdc.debezium;

import com.ververica.cdc.debezium.utils.SourceRecordUtil;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
@@ -44,35 +45,53 @@ public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
     * schema in messages.
     */
    private final Boolean includeSchema;

    /** The time zone offset (in hours) used to correct temporal fields in records. */
    private final int timeZoneOffset;

    /**
     * The custom configurations for {@link JsonConverter}.
     */
    private Map<String, Object> customConverterConfigs;

    public JsonDebeziumDeserializationSchema() {
        this(false);
    }

    public JsonDebeziumDeserializationSchema(Boolean includeSchema) {
        this(includeSchema, 8);
Contributor:

Thanks @youyangkou for the contribution, but the PR appears to work in the wrong way.
Firstly, the timeZoneOffset of 8 only works because you happen to be in the GMT+8 time zone; for any other time zone it does not.
Secondly, not all sources need to consider the time zone issue; for example, PostgreSQL has no such issue, and even in MySQL only records coming from the changelog need the time zone offset applied.
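
For illustration, a minimal sketch of deriving the offset from an explicit server time zone instead of hard-coding 8 (this helper is hypothetical and not part of the PR; the zone id would come from something like the connector's server-time-zone option):

    // Hypothetical helper, using only java.time (ZoneId, ZoneOffset, Instant).
    // Computes the offset in hours for a named zone at the current instant,
    // which also honors daylight saving time.
    static int offsetHoursFor(String serverTimeZone) {
        ZoneOffset offset = ZoneId.of(serverTimeZone).getRules().getOffset(Instant.now());
        return offset.getTotalSeconds() / 3600;
    }

    // e.g. new JsonDebeziumDeserializationSchema(false, offsetHoursFor("Asia/Shanghai"))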

    }

    public JsonDebeziumDeserializationSchema(Boolean includeSchema, int timeZoneOffset) {
        this.includeSchema = includeSchema;
        this.timeZoneOffset = timeZoneOffset;
    }

    public JsonDebeziumDeserializationSchema(
            Boolean includeSchema, Map<String, Object> customConverterConfigs) {
        this(includeSchema, customConverterConfigs, 8);
    }

    public JsonDebeziumDeserializationSchema(
            Boolean includeSchema,
            Map<String, Object> customConverterConfigs,
            int timeZoneOffset) {
        this.includeSchema = includeSchema;
        this.customConverterConfigs = customConverterConfigs;
        this.timeZoneOffset = timeZoneOffset;
    }

    @Override
    public void deserialize(SourceRecord record, Collector<String> out) {
        if (jsonConverter == null) {
            initializeJsonConverter();
        }
        // Correct the time zone of temporal fields in the record.
        SourceRecord sourceRecord =
                SourceRecordUtil.correctTimeZoneSourceRecord(record, timeZoneOffset);
        byte[] bytes =
                jsonConverter.fromConnectData(
                        sourceRecord.topic(), sourceRecord.valueSchema(), sourceRecord.value());
        out.collect(new String(bytes));
    }

    /**
     * Initialize {@link JsonConverter} with given configs.
     */
    private void initializeJsonConverter() {
        jsonConverter = new JsonConverter();
        final HashMap<String, Object> configs = new HashMap<>(2);
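For reference, a short sketch of constructing the schema above with custom JsonConverter options plus an explicit offset (the decimal.format key comes from Kafka's JsonConverterConfig and assumes the bundled connect-json version supports it; the offset value 8 is only illustrative):

    // Requires org.apache.kafka.connect.json.JsonConverterConfig and DecimalFormat.
    Map<String, Object> converterConfigs = new HashMap<>();
    // Emit decimals as plain JSON numbers instead of base64-encoded bytes.
    converterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
    JsonDebeziumDeserializationSchema schema =
            new JsonDebeziumDeserializationSchema(false, converterConfigs, 8);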
flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/PairDebeziumDeserializationSchema.java
@@ -0,0 +1,76 @@
package com.ververica.cdc.debezium;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.ververica.cdc.debezium.utils.SourceRecordUtil;
import javafx.util.Pair;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;

import java.util.HashMap;
import java.util.Map;


/**
 * A {@link DebeziumDeserializationSchema} which deserializes the received {@link SourceRecord}
 * into a {@link Pair} of key and value JSON trees.
 */
public class PairDebeziumDeserializationSchema implements DebeziumDeserializationSchema<Pair> {

    private transient ObjectMapper objectMapper;
    private transient JsonConverter valueConverter;
    private transient JsonConverter keyConverter;
    private transient boolean initialized;
    /** The time zone offset (in hours) used to correct temporal fields in records. */
    private final int timeZoneOffset;

    public PairDebeziumDeserializationSchema() {
        this.timeZoneOffset = 8;
    }

    public PairDebeziumDeserializationSchema(int timeZoneOffset) {
        this.timeZoneOffset = timeZoneOffset;
    }

    @Override
    public void deserialize(SourceRecord record, Collector<Pair> collector) throws Exception {
        if (!initialized) {
            initialize();
        }

        // Correct the time zone of temporal fields before converting to JSON.
        SourceRecord sourceRecord =
                SourceRecordUtil.correctTimeZoneSourceRecord(record, timeZoneOffset);

        byte[] value =
                valueConverter.fromConnectData(
                        sourceRecord.topic(), sourceRecord.valueSchema(), sourceRecord.value());

        if (sourceRecord.key() == null) {
            collector.collect(new Pair(null, objectMapper.readTree(value)));
        } else {
            byte[] key =
                    keyConverter.fromConnectData(
                            sourceRecord.topic(), sourceRecord.keySchema(), sourceRecord.key());
            collector.collect(new Pair(objectMapper.readTree(key), objectMapper.readTree(value)));
        }
    }

    private void initialize() {
        initialized = true;
        objectMapper = new ObjectMapper();

        valueConverter = new JsonConverter();
        Map<String, Object> valueConfigs = new HashMap<>(2);
        valueConfigs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
        valueConfigs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false);
        valueConverter.configure(valueConfigs);

        keyConverter = new JsonConverter();
        Map<String, Object> keyConfigs = new HashMap<>(2);
        keyConfigs.put(ConverterConfig.TYPE_CONFIG, ConverterType.KEY.getName());
        keyConfigs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false);
        keyConverter.configure(keyConfigs);
    }

    @Override
    public TypeInformation<Pair> getProducedType() {
        return TypeInformation.of(Pair.class);
    }
}
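For context, a sketch of wiring this schema into a MySQL CDC source (builder and option names follow flink-cdc's MySqlSource API; treat the exact options, hostnames, and credentials as placeholder assumptions):

    // env is a StreamExecutionEnvironment; Pair is the javafx.util.Pair used above.
    SourceFunction<Pair> source =
            MySqlSource.<Pair>builder()
                    .hostname("localhost")
                    .port(3306)
                    .databaseList("mydb")
                    .tableList("mydb.orders")
                    .username("flinkuser")
                    .password("secret")
                    .serverTimeZone("UTC")
                    .deserializer(new PairDebeziumDeserializationSchema(8))
                    .build();
    env.addSource(source).print();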
flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/StringDebeziumDeserializationSchema.java
@@ -16,6 +16,7 @@

package com.ververica.cdc.debezium;

import com.ververica.cdc.debezium.utils.SourceRecordUtil;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
@@ -28,10 +29,20 @@
*/
public class StringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
    private static final long serialVersionUID = -3168848963265670603L;
    /** The time zone offset (in hours) used to correct temporal fields in records. */
    private final int timeZoneOffset;

    public StringDebeziumDeserializationSchema() {
        this.timeZoneOffset = 8;
    }

    public StringDebeziumDeserializationSchema(int timeZoneOffset) {
        this.timeZoneOffset = timeZoneOffset;
    }

    @Override
    public void deserialize(SourceRecord record, Collector<String> out) {
        SourceRecord sourceRecord =
                SourceRecordUtil.correctTimeZoneSourceRecord(record, timeZoneOffset);
        out.collect(sourceRecord.toString());
    }

    @Override
flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/utils/SourceRecordUtil.java
@@ -0,0 +1,82 @@
package com.ververica.cdc.debezium.utils;

import io.debezium.data.Envelope;
import io.debezium.time.Timestamp;
import io.debezium.time.ZonedTimestamp;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.time.LocalDateTime;
import java.time.ZoneOffset;

/**
 * Utilities for {@link SourceRecord}.
 */
public class SourceRecordUtil {

    private SourceRecordUtil() {}

    /**
     * Returns a {@link SourceRecord} whose temporal fields are shifted by the given
     * time zone offset.
     */
    public static SourceRecord correctTimeZoneSourceRecord(
            SourceRecord record, int timeZoneOffset) {
        Envelope.Operation op = Envelope.operationFor(record);
        Struct value = (Struct) record.value();
        Schema valueSchema = record.valueSchema();

        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
            // Insert / snapshot read: only the "after" image is present.
            Struct after = extractAfterStruct(value, valueSchema, timeZoneOffset);
            value.put(Envelope.FieldName.AFTER, after);
        } else if (op == Envelope.Operation.DELETE) {
            // Delete: only the "before" image is present.
            Struct before = extractBeforeStruct(value, valueSchema, timeZoneOffset);
            value.put(Envelope.FieldName.BEFORE, before);
        } else {
            // Update: both the "before" and "after" images are present.
            Struct before = extractBeforeStruct(value, valueSchema, timeZoneOffset);
            value.put(Envelope.FieldName.BEFORE, before);
            Struct after = extractAfterStruct(value, valueSchema, timeZoneOffset);
            value.put(Envelope.FieldName.AFTER, after);
        }

        return new SourceRecord(
                record.sourcePartition(),
                record.sourceOffset(),
                record.topic(),
                record.kafkaPartition(),
                record.keySchema(),
                record.key(),
                valueSchema,
                value);
    }

    private static Struct extractBeforeStruct(Struct value, Schema valueSchema, int timeZoneOffset) {
        Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
        Struct beforeStruct = value.getStruct(Envelope.FieldName.BEFORE);
        for (Field field : beforeSchema.fields()) {
            beforeStruct = convertTimestamp(beforeStruct, field, timeZoneOffset);
        }
        return beforeStruct;
    }

    private static Struct extractAfterStruct(Struct value, Schema valueSchema, int timeZoneOffset) {
        Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
        Struct afterStruct = value.getStruct(Envelope.FieldName.AFTER);
        for (Field field : afterSchema.fields()) {
            afterStruct = convertTimestamp(afterStruct, field, timeZoneOffset);
        }
        return afterStruct;
    }

    private static Struct convertTimestamp(Struct struct, Field field, int timeZoneOffset) {
        // The struct may be null for records that do not carry this image.
        if (struct != null && struct.get(field) != null) {
            // DATETIME type: Debezium encodes it as epoch milliseconds without a time zone.
            if (Timestamp.SCHEMA_NAME.equals(field.schema().name())) {
                struct.put(
                        field,
                        (Long) struct.get(field)
                                - ZoneOffset.ofHours(timeZoneOffset).getTotalSeconds() * 1000L);
            }
            // TIMESTAMP type: Debezium encodes it as an ISO-8601 string with a zone.
            if (ZonedTimestamp.SCHEMA_NAME.equals(field.schema().name())) {
                if (struct.get(field) instanceof String) {
                    LocalDateTime localDateTime =
                            TemporalConversions.toLocalDateTime(struct.get(field), ZoneOffset.UTC);
                    String timeStamp =
                            localDateTime.toInstant(ZoneOffset.ofHours(-timeZoneOffset)).toString();
                    struct.put(field, timeStamp);
                }
            }
        }
        return struct;
    }
}
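
To make the conversion concrete, a minimal, hypothetical smoke test for the utility (the envelope and record are hand-built with Debezium's Envelope builder; all names and values are made up):

    // Assumed to live next to SourceRecordUtil, with io.debezium and kafka-connect
    // classes on the classpath (SchemaBuilder, Struct, SourceRecord, Envelope).
    public class SourceRecordUtilDemo {
        public static void main(String[] args) {
            // One DATETIME-style column, encoded by Debezium as epoch millis
            // with schema name io.debezium.time.Timestamp.
            Schema rowSchema =
                    SchemaBuilder.struct()
                            .name("demo.orders.Value")
                            .optional()
                            .field("created_at", Timestamp.builder().optional().build())
                            .build();
            Envelope envelope =
                    Envelope.defineSchema()
                            .withName("demo.orders.Envelope")
                            .withRecord(rowSchema)
                            .withSource(SchemaBuilder.struct().name("demo.Source").build())
                            .build();
            Struct after = new Struct(rowSchema).put("created_at", 1_600_000_000_000L);
            Struct value = envelope.create(after, null, java.time.Instant.now()); // op = "c"
            SourceRecord record =
                    new SourceRecord(
                            java.util.Collections.emptyMap(),
                            java.util.Collections.emptyMap(),
                            "demo-topic",
                            null,
                            null,
                            null,
                            envelope.schema(),
                            value);

            SourceRecord corrected = SourceRecordUtil.correctTimeZoneSourceRecord(record, 8);
            Struct correctedAfter =
                    ((Struct) corrected.value()).getStruct(Envelope.FieldName.AFTER);
            // Expect 1_600_000_000_000 - 8 * 3_600_000 = 1_599_971_200_000.
            System.out.println(correctedAfter.getInt64("created_at"));
        }
    }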