Skip to content

Commit 8434500

Browse files
authored
[fix][client] Add classLoader field for SchemaDefinition (apache#15915)
Fixes apache#15899 ### Motivation Now, don‘t register logical type conversions when use `SchemaDefinition.<T>builder().withJsonDef()` beacase it without classLoader param. See: https://github.com/apache/pulsar/blob/04aa9e8e51869d1621a7e25402a656084eebfc09/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java#L58-L68 We can add the classLoader field for `SchemaDefinition`, user can manually pass a classLoader to register logical type conversions ### Modifications Add classLoader field for `SchemaDefinition`
1 parent c13d1c7 commit 8434500

File tree

7 files changed

+122
-10
lines changed

7 files changed

+122
-10
lines changed

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ static <T> SchemaDefinitionBuilder<T> builder() {
7575
*/
7676
Class<T> getPojo();
7777

78+
/**
79+
* Get pojo classLoader.
80+
*
81+
* @return pojo schema
82+
*/
83+
ClassLoader getClassLoader();
84+
7885
/**
7986
* Get supportSchemaVersioning schema definition.
8087
*

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,15 @@ public interface SchemaDefinitionBuilder<T> {
8080
*/
8181
SchemaDefinitionBuilder<T> withPojo(Class pojo);
8282

83+
/**
84+
* Set schema of pojo classLoader.
85+
*
86+
* @param classLoader pojo classLoader
87+
*
88+
* @return schema definition builder
89+
*/
90+
SchemaDefinitionBuilder<T> withClassLoader(ClassLoader classLoader);
91+
8392
/**
8493
* Set schema of json definition.
8594
*

pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,12 @@ public static <T> AvroSchema<T> of(SchemaDefinition<T> schemaDefinition) {
8989
schemaDefinition.getSchemaWriterOpt().get(), parseSchemaInfo(schemaDefinition, SchemaType.AVRO));
9090
}
9191
ClassLoader pojoClassLoader = null;
92-
if (schemaDefinition.getPojo() != null) {
92+
if (schemaDefinition.getClassLoader() != null) {
93+
pojoClassLoader = schemaDefinition.getClassLoader();
94+
} else if (schemaDefinition.getPojo() != null) {
9395
pojoClassLoader = schemaDefinition.getPojo().getClassLoader();
9496
}
97+
9598
return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO), pojoClassLoader);
9699
}
97100

pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
4040
*/
4141
private Class<T> clazz;
4242

43+
/**
44+
* the classLoader definition class.
45+
*/
46+
private ClassLoader classLoader;
47+
4348
/**
4449
* The flag of schema type always allow null.
4550
*
@@ -100,6 +105,12 @@ public SchemaDefinitionBuilder<T> withPojo(Class clazz) {
100105
return this;
101106
}
102107

108+
@Override
109+
public SchemaDefinitionBuilder<T> withClassLoader(ClassLoader classLoader) {
110+
this.classLoader = classLoader;
111+
return this;
112+
}
113+
103114
@Override
104115
public SchemaDefinitionBuilder<T> withJsonDef(String jsonDef) {
105116
this.jsonDef = jsonDef;
@@ -149,8 +160,8 @@ public SchemaDefinition<T> build() {
149160

150161
properties.put(ALWAYS_ALLOW_NULL, String.valueOf(this.alwaysAllowNull));
151162
properties.put(JSR310_CONVERSION_ENABLED, String.valueOf(this.jsr310ConversionEnabled));
152-
return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning,
153-
jsr310ConversionEnabled, reader, writer);
163+
return new SchemaDefinitionImpl(clazz, jsonDef, classLoader,
164+
alwaysAllowNull, properties, supportSchemaVersioning, jsr310ConversionEnabled, reader, writer);
154165

155166
}
156167
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T> {
5050

5151
private final String jsonDef;
5252

53+
private final ClassLoader classLoader;
54+
5355
private final boolean supportSchemaVersioning;
5456

5557
private final boolean jsr310ConversionEnabled;
@@ -58,13 +60,15 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T> {
5860

5961
private final SchemaWriter<T> writer;
6062

61-
public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String, String> properties,
63+
public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, ClassLoader classLoader,
64+
boolean alwaysAllowNull, Map<String, String> properties,
6265
boolean supportSchemaVersioning, boolean jsr310ConversionEnabled,
6366
SchemaReader<T> reader, SchemaWriter<T> writer) {
6467
this.alwaysAllowNull = alwaysAllowNull;
6568
this.properties = properties;
6669
this.jsonDef = jsonDef;
6770
this.pojo = pojo;
71+
this.classLoader = classLoader;
6872
this.supportSchemaVersioning = supportSchemaVersioning;
6973
this.jsr310ConversionEnabled = jsr310ConversionEnabled;
7074
this.reader = reader;
@@ -104,6 +108,11 @@ public Class<T> getPojo() {
104108
return pojo;
105109
}
106110

111+
@Override
112+
public ClassLoader getClassLoader() {
113+
return this.classLoader;
114+
}
115+
107116
@Override
108117
public boolean getSupportSchemaVersioning() {
109118
return supportSchemaVersioning;

pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
import java.time.temporal.ChronoUnit;
3737
import java.util.Arrays;
3838
import java.util.UUID;
39+
import lombok.AllArgsConstructor;
3940
import lombok.Data;
41+
import lombok.NoArgsConstructor;
4042
import lombok.extern.slf4j.Slf4j;
4143
import org.apache.avro.Schema;
4244
import org.apache.avro.SchemaValidationException;
@@ -463,21 +465,88 @@ public void testAvroBigDecimal() {
463465

464466

465467
@Data
466-
private static class TimestampStruct {
468+
@AllArgsConstructor
469+
@NoArgsConstructor
470+
private static class TimestampPojo {
467471
Instant value;
468472
}
469473

470474
@Test
471475
public void testTimestampWithJsr310Conversion() {
472-
AvroSchema<TimestampStruct> schema = AvroSchema.of(TimestampStruct.class);
476+
AvroSchema<TimestampPojo> schema = AvroSchema.of(TimestampPojo.class);
473477
Assert.assertEquals(
474478
schema.getAvroSchema().getFields().get(0).schema().getTypes().get(1).getLogicalType().getName(),
475479
new TimeConversions.TimestampMicrosConversion().getLogicalTypeName());
476480

477-
AvroSchema<TimestampStruct> schema2 = AvroSchema.of(SchemaDefinition.<TimestampStruct>builder()
478-
.withPojo(TimestampStruct.class).withJSR310ConversionEnabled(true).build());
481+
AvroSchema<TimestampPojo> schema2 = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
482+
.withPojo(TimestampPojo.class).withJSR310ConversionEnabled(true).build());
479483
Assert.assertEquals(
480484
schema2.getAvroSchema().getFields().get(0).schema().getTypes().get(1).getLogicalType().getName(),
481485
new TimeConversions.TimestampMillisConversion().getLogicalTypeName());
482486
}
487+
488+
@Test
489+
public void testTimestampWithJsonDef(){
490+
AvroSchema<TimestampPojo> schemaWithPojo = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
491+
.withPojo(TimestampPojo.class)
492+
.withJSR310ConversionEnabled(false).build());
493+
494+
TimestampPojo timestampPojo = new TimestampPojo(Instant.parse("2022-06-10T12:38:59.039084Z"));
495+
byte[] encode = schemaWithPojo.encode(timestampPojo);
496+
TimestampPojo decodeWithPojo = schemaWithPojo.decode(encode);
497+
498+
Assert.assertEquals(decodeWithPojo, timestampPojo);
499+
500+
String schemaDefinition = new String(schemaWithPojo.schemaInfo.getSchema());
501+
AvroSchema<TimestampPojo> schemaWithJsonDef = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
502+
.withJsonDef(schemaDefinition)
503+
.withClassLoader(TimestampPojo.class.getClassLoader())
504+
.withJSR310ConversionEnabled(false).build());
505+
506+
TimestampPojo decodeWithJson = schemaWithJsonDef.decode(encode);
507+
508+
Assert.assertEquals(decodeWithJson, decodeWithPojo);
509+
Assert.assertEquals(Instant.class, decodeWithJson.getValue().getClass());
510+
511+
AvroSchema<TimestampPojo> schemaWithJsonDefNoClassLoader = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
512+
.withJsonDef(schemaDefinition)
513+
.withJSR310ConversionEnabled(false).build());
514+
515+
TimestampPojo decodeWithJsonNoClassLoader = schemaWithJsonDefNoClassLoader.decode(encode);
516+
Assert.assertNotEquals(decodeWithJsonNoClassLoader, decodeWithPojo);
517+
Assert.assertNotEquals(Instant.class, decodeWithJsonNoClassLoader.getValue().getClass());
518+
}
519+
520+
@Test
521+
public void testTimestampWithJsonDefAndJSR310ConversionEnabled(){
522+
AvroSchema<TimestampPojo> schemaWithPojo = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
523+
.withPojo(TimestampPojo.class)
524+
.withJSR310ConversionEnabled(true).build());
525+
526+
TimestampPojo timestampPojo = new TimestampPojo(Instant.parse("2022-06-10T12:38:59.039084Z"));
527+
byte[] encode = schemaWithPojo.encode(timestampPojo);
528+
TimestampPojo decodeWithPojo = schemaWithPojo.decode(encode);
529+
530+
Assert.assertNotEquals(decodeWithPojo, timestampPojo);
531+
532+
String schemaDefinition = new String(schemaWithPojo.schemaInfo.getSchema());
533+
AvroSchema<TimestampPojo> schemaWithJsonDef = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
534+
.withJsonDef(schemaDefinition)
535+
.withClassLoader(TimestampPojo.class.getClassLoader())
536+
.withJSR310ConversionEnabled(true).build());
537+
538+
TimestampPojo decodeWithJson = schemaWithJsonDef.decode(encode);
539+
540+
Assert.assertEquals(decodeWithJson, decodeWithPojo);
541+
Assert.assertEquals(Instant.class, decodeWithJson.getValue().getClass());
542+
543+
AvroSchema<TimestampPojo> schemaWithJsonDefNoClassLoader = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
544+
.withJsonDef(schemaDefinition)
545+
.withJSR310ConversionEnabled(true).build());
546+
547+
TimestampPojo decodeWithJsonNoClassLoader = schemaWithJsonDefNoClassLoader.decode(encode);
548+
Assert.assertNotEquals(decodeWithJsonNoClassLoader, decodeWithPojo);
549+
Assert.assertNotEquals(Instant.class, decodeWithJsonNoClassLoader.getValue().getClass());
550+
}
551+
483552
}

pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,15 @@
2020

2121
import static java.nio.charset.StandardCharsets.UTF_8;
2222
import static org.testng.Assert.assertEquals;
23-
2423
import lombok.Data;
2524
import org.apache.avro.reflect.Nullable;
2625
import org.apache.pulsar.client.api.Schema;
27-
import org.apache.pulsar.client.api.schema.*;
26+
import org.apache.pulsar.client.api.schema.GenericRecord;
27+
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
28+
import org.apache.pulsar.client.api.schema.GenericSchema;
29+
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
30+
import org.apache.pulsar.client.api.schema.SchemaBuilder;
31+
import org.apache.pulsar.client.api.schema.SchemaDefinition;
2832
import org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader;
2933
import org.apache.pulsar.common.schema.SchemaInfo;
3034
import org.apache.pulsar.common.schema.SchemaType;

0 commit comments

Comments
 (0)