Skip to content

Commit 08d89a2

Browse files
[improve][broker] PIP-464: Strict Avro schema validation for SchemaType.JSON (#25362)
1 parent 78281fd commit 08d89a2

11 files changed

Lines changed: 548 additions & 32 deletions

File tree

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3423,6 +3423,16 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
34233423
)
34243424
private SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL;
34253425

3426+
@FieldContext(
3427+
category = CATEGORY_SCHEMA,
3428+
doc = "Whether to allow legacy Jackson JsonSchema format for SchemaType.JSON schema definitions. "
3429+
+ "When false (default), only valid Apache Avro schema format is accepted for SchemaType.JSON, "
3430+
+ "consistent with what the consumer side requires. When true, the pre-2.1 backward-compatible "
3431+
+ "behavior is preserved for deployments that still have topics with legacy-format schemas. "
3432+
+ "See PIP-464 for details."
3433+
)
3434+
private boolean schemaJsonAllowLegacyJacksonFormat = false;
3435+
34263436
/**** --- WebSocket. --- ****/
34273437
@FieldContext(
34283438
category = CATEGORY_WEBSOCKET,

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheck.java

Lines changed: 22 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -36,26 +36,36 @@
3636
@SuppressWarnings("unused")
3737
public class JsonSchemaCompatibilityCheck extends AvroSchemaBasedCompatibilityCheck {
3838

39+
private volatile boolean allowLegacyJacksonFormat = false;
40+
3941
@Override
4042
public SchemaType getSchemaType() {
4143
return SchemaType.JSON;
4244
}
4345

46+
/**
47+
* Set whether to allow legacy Jackson JsonSchema format for backward compatibility.
48+
* When false (default), only valid Avro schema format is accepted (PIP-464).
49+
*/
50+
public void setAllowLegacyJacksonFormat(boolean allowLegacyJacksonFormat) {
51+
this.allowLegacyJacksonFormat = allowLegacyJacksonFormat;
52+
}
53+
4454
@Override
4555
public void checkCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy)
4656
throws IncompatibleSchemaException {
4757
if (isAvroSchema(from)) {
4858
if (isAvroSchema(to)) {
4959
// if both producer and broker have the schema in avro format
5060
super.checkCompatible(from, to, strategy);
51-
} else if (isJsonSchema(to)) {
61+
} else if (allowLegacyJacksonFormat && isJsonSchema(to)) {
5262
// if the broker has the schema in Avro format but the producer sent a schema in the old JSON format
53-
// allow old schema format for backwards compatibility
63+
// allow old schema format for backwards compatibility (only when legacy format is enabled)
5464
} else {
55-
// unknown schema format
56-
throw new IncompatibleSchemaException("Unknown schema format");
65+
throw new IncompatibleSchemaException(
66+
"Incompatible schema: expected Avro schema format for SchemaType.JSON");
5767
}
58-
} else if (isJsonSchema(from)){
68+
} else if (allowLegacyJacksonFormat && isJsonSchema(from)) {
5969

6070
if (isAvroSchema(to)) {
6171
// if the broker has the schema in the old JSON format but the producer sent a schema in Avro format
@@ -64,9 +74,14 @@ public void checkCompatible(SchemaData from, SchemaData to, SchemaCompatibilityS
6474
// if both producer and broker have the schema in old json format
6575
isCompatibleJsonSchema(from, to);
6676
} else {
67-
// unknown schema format
68-
throw new IncompatibleSchemaException("Unknown schema format");
77+
throw new IncompatibleSchemaException(
78+
"Incompatible schema: expected Avro schema format for SchemaType.JSON");
6979
}
80+
} else if (!allowLegacyJacksonFormat && !isAvroSchema(from)) {
81+
// When legacy format is disabled, the existing schema must be valid Avro.
82+
// If it's not, this is a defense-in-depth rejection (PIP-464).
83+
throw new IncompatibleSchemaException(
84+
"Incompatible schema: existing schema is not in valid Avro format for SchemaType.JSON");
7085
} else {
7186
// the schema stored on the broker is in an unknown format
7287
// maybe corrupted?

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java

Lines changed: 12 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -49,8 +49,19 @@ static SchemaRegistryService create(SchemaStorage schemaStorage, Set<String> sch
4949
try {
5050
Map<SchemaType, SchemaCompatibilityCheck> checkers = getCheckers(schemaRegistryCompatibilityCheckers);
5151
checkers.put(SchemaType.KEY_VALUE, new KeyValueSchemaCompatibilityCheck(checkers));
52+
53+
// PIP-464: propagate schemaJsonAllowLegacyJacksonFormat to JsonSchemaCompatibilityCheck
54+
boolean allowLegacyJacksonFormat =
55+
pulsarService.getConfiguration().isSchemaJsonAllowLegacyJacksonFormat();
56+
SchemaCompatibilityCheck jsonCheck = checkers.get(SchemaType.JSON);
57+
if (jsonCheck instanceof JsonSchemaCompatibilityCheck) {
58+
((JsonSchemaCompatibilityCheck) jsonCheck)
59+
.setAllowLegacyJacksonFormat(allowLegacyJacksonFormat);
60+
}
61+
5262
return SchemaRegistryServiceWithSchemaDataValidator.of(
53-
new SchemaRegistryServiceImpl(schemaStorage, checkers, pulsarService));
63+
new SchemaRegistryServiceImpl(schemaStorage, checkers, pulsarService),
64+
allowLegacyJacksonFormat);
5465
} catch (Exception e) {
5566
LOG.warn("Unable to create schema registry storage, defaulting to empty storage", e);
5667
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java

Lines changed: 17 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -34,16 +34,30 @@ public interface SchemaDataValidator {
3434

3535
/**
3636
* Validate if the schema data is well formed.
37+
* Uses strict Avro-only validation for SchemaType.JSON (no legacy Jackson fallback).
3738
*
3839
* @param schemaData schema data to validate
3940
* @throws InvalidSchemaDataException if the schema data is not in a valid form.
4041
*/
4142
static void validateSchemaData(SchemaData schemaData) throws InvalidSchemaDataException {
43+
validateSchemaData(schemaData, false);
44+
}
45+
46+
/**
47+
* Validate if the schema data is well formed.
48+
*
49+
* @param schemaData schema data to validate
50+
* @param allowLegacyJacksonFormat if true, allows legacy Jackson JsonSchema format for SchemaType.JSON
51+
* for backward compatibility with pre-2.1 schemas (PIP-464)
52+
* @throws InvalidSchemaDataException if the schema data is not in a valid form.
53+
*/
54+
static void validateSchemaData(SchemaData schemaData,
55+
boolean allowLegacyJacksonFormat) throws InvalidSchemaDataException {
4256
switch (schemaData.getType()) {
4357
case AVRO:
4458
case JSON:
4559
case PROTOBUF:
46-
StructSchemaDataValidator.of().validate(schemaData);
60+
StructSchemaDataValidator.of(allowLegacyJacksonFormat).validate(schemaData);
4761
break;
4862
case PROTOBUF_NATIVE:
4963
ProtobufNativeSchemaDataValidator.of().validate(schemaData);
@@ -80,8 +94,8 @@ static void validateSchemaData(SchemaData schemaData) throws InvalidSchemaDataEx
8094
case KEY_VALUE:
8195
KeyValue<SchemaData, SchemaData> kvSchema =
8296
KeyValueSchemaCompatibilityCheck.decodeKeyValueSchemaData(schemaData);
83-
validateSchemaData(kvSchema.getKey());
84-
validateSchemaData(kvSchema.getValue());
97+
validateSchemaData(kvSchema.getKey(), allowLegacyJacksonFormat);
98+
validateSchemaData(kvSchema.getValue(), allowLegacyJacksonFormat);
8599
break;
86100
default:
87101
throw new InvalidSchemaDataException("Unknown schema type : " + schemaData.getType());

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java

Lines changed: 13 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -33,13 +33,21 @@
3333
public class SchemaRegistryServiceWithSchemaDataValidator implements SchemaRegistryService {
3434

3535
public static SchemaRegistryServiceWithSchemaDataValidator of(SchemaRegistryService service) {
36-
return new SchemaRegistryServiceWithSchemaDataValidator(service);
36+
return new SchemaRegistryServiceWithSchemaDataValidator(service, false);
37+
}
38+
39+
public static SchemaRegistryServiceWithSchemaDataValidator of(SchemaRegistryService service,
40+
boolean allowLegacyJacksonFormat) {
41+
return new SchemaRegistryServiceWithSchemaDataValidator(service, allowLegacyJacksonFormat);
3742
}
3843

3944
private final SchemaRegistryService service;
45+
private final boolean allowLegacyJacksonFormat;
4046

41-
private SchemaRegistryServiceWithSchemaDataValidator(SchemaRegistryService service) {
47+
private SchemaRegistryServiceWithSchemaDataValidator(SchemaRegistryService service,
48+
boolean allowLegacyJacksonFormat) {
4249
this.service = service;
50+
this.allowLegacyJacksonFormat = allowLegacyJacksonFormat;
4351
}
4452

4553
@Override
@@ -89,7 +97,7 @@ public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId,
8997
SchemaData schema,
9098
SchemaCompatibilityStrategy strategy) {
9199
try {
92-
SchemaDataValidator.validateSchemaData(schema);
100+
SchemaDataValidator.validateSchemaData(schema, allowLegacyJacksonFormat);
93101
} catch (InvalidSchemaDataException e) {
94102
return FutureUtil.failedFuture(e);
95103
}
@@ -115,7 +123,7 @@ public CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId, boo
115123
public CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData schema,
116124
SchemaCompatibilityStrategy strategy) {
117125
try {
118-
SchemaDataValidator.validateSchemaData(schema);
126+
SchemaDataValidator.validateSchemaData(schema, allowLegacyJacksonFormat);
119127
} catch (InvalidSchemaDataException e) {
120128
return FutureUtil.failedFuture(e);
121129
}
@@ -127,7 +135,7 @@ public CompletableFuture<Void> checkCompatible(String schemaId,
127135
SchemaData schema,
128136
SchemaCompatibilityStrategy strategy) {
129137
try {
130-
SchemaDataValidator.validateSchemaData(schema);
138+
SchemaDataValidator.validateSchemaData(schema, allowLegacyJacksonFormat);
131139
} catch (InvalidSchemaDataException e) {
132140
return FutureUtil.failedFuture(e);
133141
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java

Lines changed: 21 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,6 @@
2424
import java.io.IOException;
2525
import org.apache.avro.NameValidator;
2626
import org.apache.avro.Schema;
27-
import org.apache.avro.SchemaParseException;
2827
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
2928
import org.apache.pulsar.common.protocol.schema.SchemaData;
3029
import org.apache.pulsar.common.schema.SchemaType;
@@ -39,10 +38,21 @@ public static StructSchemaDataValidator of() {
3938
return INSTANCE;
4039
}
4140

42-
private static final StructSchemaDataValidator INSTANCE = new StructSchemaDataValidator();
41+
public static StructSchemaDataValidator of(boolean allowLegacyJacksonFormat) {
42+
return allowLegacyJacksonFormat ? LEGACY_INSTANCE : INSTANCE;
43+
}
44+
45+
// Default instance: strict Avro-only validation for SchemaType.JSON (PIP-464)
46+
private static final StructSchemaDataValidator INSTANCE = new StructSchemaDataValidator(false);
47+
// Legacy instance: allows Jackson JsonSchema fallback for backward compatibility
48+
private static final StructSchemaDataValidator LEGACY_INSTANCE = new StructSchemaDataValidator(true);
4349
public static final NameValidator COMPATIBLE_NAME_VALIDATOR = new CompatibleNameValidator();
4450

45-
private StructSchemaDataValidator() {}
51+
private final boolean allowLegacyJacksonFormat;
52+
53+
private StructSchemaDataValidator(boolean allowLegacyJacksonFormat) {
54+
this.allowLegacyJacksonFormat = allowLegacyJacksonFormat;
55+
}
4656

4757
private static final ObjectReader JSON_SCHEMA_READER =
4858
ObjectMapperFactory.getMapper().reader().forType(JsonSchema.class);
@@ -57,11 +67,14 @@ public void validate(SchemaData schemaData) throws InvalidSchemaDataException {
5767
if (SchemaType.AVRO.equals(schemaData.getType())) {
5868
checkAvroSchemaTypeSupported(schema);
5969
}
60-
} catch (SchemaParseException e) {
61-
if (schemaData.getType() == SchemaType.JSON) {
62-
// we used JsonSchema for storing the definition of a JSON schema
63-
// hence for backward compatibility consideration, we need to try
64-
// to use JsonSchema to decode the schema data
70+
} catch (InvalidSchemaDataException invalidSchemaDataException) {
71+
throw invalidSchemaDataException;
72+
} catch (Exception e) {
73+
// Avro 1.12.0 may throw NullPointerException (not SchemaParseException) for
74+
// non-Avro schemas, so the legacy fallback must be in the general catch block.
75+
if (schemaData.getType() == SchemaType.JSON && allowLegacyJacksonFormat) {
76+
// For backward compatibility with pre-2.1 schemas: try Jackson JsonSchema parsing.
77+
// This fallback is only enabled when schemaJsonAllowLegacyJacksonFormat=true (PIP-464).
6578
try {
6679
JSON_SCHEMA_READER.readValue(data);
6780
} catch (IOException ioe) {
@@ -70,10 +83,6 @@ public void validate(SchemaData schemaData) throws InvalidSchemaDataException {
7083
} else {
7184
throwInvalidSchemaDataException(schemaData, e);
7285
}
73-
} catch (InvalidSchemaDataException invalidSchemaDataException) {
74-
throw invalidSchemaDataException;
75-
} catch (Exception e) {
76-
throwInvalidSchemaDataException(schemaData, e);
7786
}
7887
}
7988

0 commit comments

Comments
 (0)