Closed
Description
Our project consists of multiple services that use different implementations of the CloudEvents standard.
We have run into a problem where a service using cloudevents-json-jackson receives a valid CloudEvent whose subject field is explicitly set to null. The service then fails to deserialize the event (see the error log).
If I understand the CloudEvents JSON format correctly (https://github.com/cloudevents/spec/blob/v1.0.1/json-format.md), the subject field should be nullable.
General information
Dependencies used:
- cloudevents-core: 2.2.0
- cloudevents-json-jackson: 2.2.0
- cloudevents-kafka: 2.2.0
Error log
2021-12-09 15:01:11,027 WARN [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-10) SRMSG18228:
A failure has been reported for Kafka topics '[my-topic]':
io.cloudevents.core.format.EventDeserializationException: com.fasterxml.jackson.databind.exc.MismatchedInputException:
Wrong type NULL for attribute subject, expecting STRING
at io.cloudevents.jackson.JsonFormat.deserialize(JsonFormat.java:101)
at io.cloudevents.jackson.JsonFormat.deserialize(JsonFormat.java:107)
at io.cloudevents.core.message.MessageReader.lambda$toEvent$0(MessageReader.java:118)
at io.cloudevents.core.message.impl.GenericStructuredMessageReader.read(GenericStructuredMessageReader.java:42)
at io.cloudevents.core.message.MessageReader.toEvent(MessageReader.java:118)
at io.cloudevents.core.message.MessageReader.toEvent(MessageReader.java:102)
at io.cloudevents.kafka.CloudEventDeserializer.deserialize(CloudEventDeserializer.java:62)
at io.cloudevents.kafka.CloudEventDeserializer.deserialize(CloudEventDeserializer.java:34)
at io.smallrye.reactive.messaging.kafka.fault.DeserializerWrapper.deserialize(DeserializerWrapper.java:111)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1386)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:133)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1617)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1453)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:686)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:637)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1303)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.lambda$poll$4(ReactiveKafkaConsumer.java:141)
at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.lambda$runOnPollingThread$0(ReactiveKafkaConsumer.java:108)
at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:28)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Implementation details
We have an ObjectMapper that has the CloudEvent module configured:
@Override
public void customize(ObjectMapper mapper) {
    mapper.registerModule(new JavaTimeModule())
        .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    // Register the CloudEvents serializers and deserializers with this mapper.
    mapper.registerModule(io.cloudevents.jackson.JsonFormat.getCloudEventJacksonModule());
}
We are using Quarkus and our listeners are configured as follows:
mp:
  messaging:
    incoming:
      my-topic:
        connector: smallrye-kafka
        topic: my-topic
        key:
          deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value:
          deserializer: io.cloudevents.kafka.CloudEventDeserializer
        interceptor:
          classes: io.opentracing.contrib.kafka.TracingConsumerInterceptor
        cloud-events: false
        failure-strategy: ignore
Listeners are implemented as follows:
@Startup
@ApplicationScoped
public class MyEventListener {
    @Incoming("my-topic")
    @Blocking
    public CompletionStage<Void> receiveMyEvent(Message<CloudEventV1> event) {
        // do something
        return event.ack();
    }
}
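As a possible stopgap on the consuming side, the payload could be cleaned before it reaches the deserializer, so that attributes explicitly set to null look the same as omitted ones. The following is only a minimal, dependency-free sketch of that idea: `NullAttributeStripper` and `stripTopLevelNulls` are hypothetical names and not part of the CloudEvents SDK, the input is assumed to be a well-formed JSON object, and the sample payload in the usage note is made up for illustration. In a real service the same filtering would more naturally be done with Jackson's tree model, which is already on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the CloudEvents SDK.
final class NullAttributeStripper {
    private NullAttributeStripper() {}

    // Removes top-level members whose value is the JSON literal null.
    // Nested objects and arrays (for example the data member) are left untouched.
    static String stripTopLevelNulls(String json) {
        String trimmed = json.trim();
        if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
            throw new IllegalArgumentException("expected a JSON object");
        }
        String body = trimmed.substring(1, trimmed.length() - 1);
        List<String> kept = new ArrayList<>();
        for (String member : splitTopLevel(body, ',')) {
            String m = member.trim();
            if (m.isEmpty()) {
                continue; // empty object
            }
            // A well-formed member has exactly one colon outside strings and nesting.
            List<String> keyAndValue = splitTopLevel(m, ':');
            if (keyAndValue.size() != 2) {
                throw new IllegalArgumentException("malformed member: " + m);
            }
            if (!keyAndValue.get(1).trim().equals("null")) {
                kept.add(m);
            }
        }
        return "{" + String.join(",", kept) + "}";
    }

    // Splits text on the separator, ignoring separators inside string
    // literals or inside nested objects and arrays.
    private static List<String> splitTopLevel(String text, char separator) {
        List<String> parts = new ArrayList<>();
        int depth = 0;
        boolean inString = false;
        boolean escaped = false;
        int start = 0;
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (inString) {
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
            } else if (c == '{' || c == '[') {
                depth++;
            } else if (c == '}' || c == ']') {
                depth--;
            } else if (c == separator && depth == 0) {
                parts.add(text.substring(start, i));
                start = i + 1;
            }
        }
        parts.add(text.substring(start));
        return parts;
    }
}
```

For an illustrative payload such as `{"specversion":"1.0","id":"1","source":"/s","type":"t","subject":null}`, `stripTopLevelNulls` returns the same object without the `subject` member. This only hides the symptom for consumers; it does not change how cloudevents-json-jackson treats null attributes.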
Labels: bug