Skip to content

Commit 854716f

Browse files
Add Joda time logical type conversion. (#6704)
### Motivation After upgrade to Apache Avro 1.9.x, the default time conversion changed to JSR-310. For forwarding compatibility, we'd better add the Joda time conversion. related to #5938 ### Modifications Add joda time conversions ### Verifying this change New integration test added
1 parent 364876b commit 854716f

File tree

20 files changed

+313
-75
lines changed

20 files changed

+313
-75
lines changed

.github/workflows/ci-integration-process.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ jobs:
5252
- name: clean disk
5353
if: steps.docs.outputs.changed_only == 'no'
5454
run: |
55-
sudo swapoff -a
56-
sudo rm -f /swapfile
55+
sudo swapoff /swapfile
56+
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
5757
sudo apt clean
5858
docker rmi $(docker images -q) -f
5959
df -h

.github/workflows/ci-integration-thread.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ jobs:
5252
- name: clean disk
5353
if: steps.docs.outputs.changed_only == 'no'
5454
run: |
55-
sudo swapoff -a
56-
sudo rm -f /swapfile
55+
sudo swapoff /swapfile
56+
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
5757
sudo apt clean
5858
docker rmi $(docker images -q) -f
5959
df -h

.github/workflows/ci-unit-broker.yml

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,21 +49,20 @@ jobs:
4949
with:
5050
java-version: 1.8
5151

52+
- name: Set up Maven
53+
uses: apache/pulsar-test-infra/setup-maven@master
54+
if: steps.docs.outputs.changed_only == 'no'
55+
with:
56+
maven-version: 3.6.1
57+
5258
- name: clean disk
5359
if: steps.docs.outputs.changed_only == 'no'
5460
run: |
55-
sudo swapoff -a
56-
sudo rm -f /swapfile
61+
sudo swapoff /swapfile
62+
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
5763
sudo apt clean
5864
docker rmi $(docker images -q) -f
5965
df -h
60-
free -h
61-
62-
- name: Set up Maven
63-
uses: apache/pulsar-test-infra/setup-maven@master
64-
if: steps.docs.outputs.changed_only == 'no'
65-
with:
66-
maven-version: 3.6.1
6766
6867
- name: run unit tests install by skip tests
6968
if: steps.docs.outputs.changed_only == 'no'

.github/workflows/ci-unit.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,15 @@ jobs:
5555
with:
5656
maven-version: 3.6.1
5757

58+
- name: clean disk
59+
if: steps.docs.outputs.changed_only == 'no'
60+
run: |
61+
sudo swapoff /swapfile
62+
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
63+
sudo apt clean
64+
docker rmi $(docker images -q) -f
65+
df -h
66+
5867
- name: run unit tests install by skip tests
5968
if: steps.docs.outputs.changed_only == 'no'
6069
run: mvn clean install -DskipTests

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ static <T> SchemaDefinitionBuilder<T> builder() {
4242
*/
4343
boolean getAlwaysAllowNull();
4444

45+
/**
46+
* Get JSR310 conversion enabled.
47+
*
48+
* @return return true if enable JSR310 conversion. false means use Joda time conversion.
49+
*/
50+
boolean isJsr310ConversionEnabled();
51+
4552
/**
4653
* Get schema class.
4754
*

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,22 @@ public interface SchemaDefinitionBuilder<T> {
3535
*/
3636
SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull);
3737

38+
/**
39+
* Set schema use JRS310 conversion or not.
40+
*
41+
* <p>Before Avro 1.9 the Joda time library was used for handling the logical date(time) values.
42+
* But since the introduction of Java8 the Java Specification Request (JSR) 310 has been included,
43+
* which greatly improves the handling of date and time natively. To keep forwarding compatibility,
44+
* default is use Joda time conversion.
45+
*
46+
* <p>JSR310 conversion is recommended here. Joda time conversion is has been marked deprecated.
47+
* In future versions, joda time conversion may be removed
48+
*
49+
* @param jsr310ConversionEnabled use JRS310 conversion or not, default is false for keep forwarding compatibility
50+
* @return schema definition builder
51+
*/
52+
SchemaDefinitionBuilder<T> withJSR310ConversionEnabled(boolean jsr310ConversionEnabled);
53+
3854
/**
3955
* Set schema info properties.
4056
*

pulsar-client-messagecrypto-bc/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,5 @@
4646
<artifactId>bouncy-castle-bc-shaded</artifactId>
4747
<version>${project.parent.version}</version>
4848
</dependency>
49-
5049
</dependencies>
5150
</project>

pulsar-client/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,12 @@
129129
</exclusions>
130130
</dependency>
131131

132+
<dependency>
133+
<groupId>joda-time</groupId>
134+
<artifactId>joda-time</artifactId>
135+
<scope>provided</scope>
136+
</dependency>
137+
132138
<dependency>
133139
<groupId>com.google.protobuf</groupId>
134140
<artifactId>protobuf-java</artifactId>

pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import lombok.extern.slf4j.Slf4j;
2222
import org.apache.avro.Conversions;
23+
import org.apache.avro.data.JodaTimeConversions;
2324
import org.apache.avro.data.TimeConversions;
2425
import org.apache.avro.reflect.ReflectData;
2526
import org.apache.pulsar.client.api.Schema;
@@ -42,36 +43,14 @@
4243
public class AvroSchema<T> extends StructSchema<T> {
4344
private static final Logger LOG = LoggerFactory.getLogger(AvroSchema.class);
4445

45-
// the aim to fix avro's bug
46-
// https://issues.apache.org/jira/browse/AVRO-1891 bug address explain
47-
// fix the avro logical type read and write
48-
static {
49-
ReflectData reflectDataAllowNull = ReflectData.AllowNull.get();
50-
51-
reflectDataAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
52-
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
53-
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
54-
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
55-
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
56-
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
57-
58-
ReflectData reflectDataNotAllowNull = ReflectData.get();
59-
60-
reflectDataNotAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
61-
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
62-
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
63-
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
64-
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
65-
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
66-
}
67-
6846
private ClassLoader pojoClassLoader;
6947

7048
private AvroSchema(SchemaInfo schemaInfo, ClassLoader pojoClassLoader) {
7149
super(schemaInfo);
7250
this.pojoClassLoader = pojoClassLoader;
73-
setReader(new AvroReader<>(schema, pojoClassLoader));
74-
setWriter(new AvroWriter<>(schema));
51+
boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo);
52+
setReader(new AvroReader<>(schema, pojoClassLoader, jsr310ConversionEnabled));
53+
setWriter(new AvroWriter<>(schema, jsr310ConversionEnabled));
7554
}
7655

7756
@Override
@@ -116,7 +95,8 @@ protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
11695
log.info("Load schema reader for version({}), schema is : {}, schemaInfo: {}",
11796
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
11897
schemaInfo.getSchemaDefinition(), schemaInfo.toString());
119-
return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema, pojoClassLoader);
98+
boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo);
99+
return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema, pojoClassLoader, jsr310ConversionEnabled);
120100
} else {
121101
log.warn("No schema found for version({}), use latest schema : {}",
122102
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
@@ -125,4 +105,30 @@ protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
125105
}
126106
}
127107

108+
private static boolean getJsr310ConversionEnabledFromSchemaInfo(SchemaInfo schemaInfo) {
109+
if (schemaInfo != null) {
110+
return Boolean.parseBoolean(schemaInfo.getProperties()
111+
.getOrDefault(SchemaDefinitionBuilderImpl.JSR310_CONVERSION_ENABLED, "false"));
112+
}
113+
return false;
114+
}
115+
116+
public static void addLogicalTypeConversions(ReflectData reflectData, boolean jsr310ConversionEnabled) {
117+
reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion());
118+
reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
119+
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
120+
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
121+
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
122+
if (jsr310ConversionEnabled) {
123+
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
124+
} else {
125+
try {
126+
Class.forName("org.joda.time.DateTime");
127+
reflectData.addLogicalTypeConversion(new JodaTimeConversions.TimestampConversion());
128+
} catch (ClassNotFoundException e) {
129+
// Skip if have not provide joda-time dependency.
130+
}
131+
}
132+
}
133+
128134
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,13 @@
3333
public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T> {
3434

3535
public static final String ALWAYS_ALLOW_NULL = "__alwaysAllowNull";
36+
public static final String JSR310_CONVERSION_ENABLED = "__jsr310ConversionEnabled";
3637

3738
/**
3839
* the schema definition class
3940
*/
4041
private Class<T> clazz;
42+
4143
/**
4244
* The flag of schema type always allow null
4345
*
@@ -48,6 +50,13 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
4850
*/
4951
private boolean alwaysAllowNull = true;
5052

53+
/**
54+
* The flag of use JSR310 conversion or Joda time conversion.
55+
*
56+
* If value is true, use JSR310 conversion in the Avro schema. Otherwise, use Joda time conversion.
57+
*/
58+
private boolean jsr310ConversionEnabled = false;
59+
5160
/**
5261
* The schema info properties
5362
*/
@@ -69,6 +78,12 @@ public SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull) {
6978
return this;
7079
}
7180

81+
@Override
82+
public SchemaDefinitionBuilder<T> withJSR310ConversionEnabled(boolean jsr310ConversionEnabled) {
83+
this.jsr310ConversionEnabled = jsr310ConversionEnabled;
84+
return this;
85+
}
86+
7287
@Override
7388
public SchemaDefinitionBuilder<T> addProperty(String key, String value) {
7489
this.properties.put(key, value);
@@ -107,8 +122,10 @@ public SchemaDefinition<T> build() {
107122
checkArgument(!(StringUtils.isNotBlank(jsonDef) && clazz != null),
108123
"Not allowed to set pojo and jsonDef both for the schema definition.");
109124

110-
properties.put(ALWAYS_ALLOW_NULL, this.alwaysAllowNull ? "true" : "false");
111-
return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning);
125+
properties.put(ALWAYS_ALLOW_NULL, String.valueOf(this.alwaysAllowNull));
126+
properties.put(JSR310_CONVERSION_ENABLED, String.valueOf(this.jsr310ConversionEnabled));
127+
return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning,
128+
jsr310ConversionEnabled);
112129

113130
}
114131
}

0 commit comments

Comments
 (0)