
Deserialization fails when subject is null #430

@JapuDCret

Description


Our project comprises multiple services, which use different implementations of the CloudEvents standard.

We now have a problem where a service using cloudevents-json-jackson receives a valid CloudEvent whose subject field is set to null. The service then fails to deserialize the event (see the error log below).

If I understand the CloudEvents JSON format correctly (https://github.com/cloudevents/spec/blob/v1.0.1/json-format.md), the subject field should be nullable.
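For illustration, a payload shaped like the following (all field values are made up) is what trips the deserializer, even though the JSON format spec treats a null-valued optional attribute as equivalent to an absent one:

```json
{
  "specversion": "1.0",
  "type": "com.example.my-event",
  "source": "/my/service",
  "id": "b25e2717-a470-45a0-8231-985a99aa9416",
  "subject": null,
  "datacontenttype": "application/json",
  "data": {}
}
```

Removing the `"subject": null` line entirely makes the same payload deserialize without error.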

General information

Dependencies used:

  • cloudevents-core: 2.2.0
  • cloudevents-json-jackson: 2.2.0
  • cloudevents-kafka: 2.2.0

Error log

2021-12-09 15:01:11,027 WARN  [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-10) SRMSG18228:
    A failure has been reported for Kafka topics '[my-topic]':
    io.cloudevents.core.format.EventDeserializationException: com.fasterxml.jackson.databind.exc.MismatchedInputException:
    Wrong type NULL for attribute subject, expecting STRING
at io.cloudevents.jackson.JsonFormat.deserialize(JsonFormat.java:101)
at io.cloudevents.jackson.JsonFormat.deserialize(JsonFormat.java:107)
at io.cloudevents.core.message.MessageReader.lambda$toEvent$0(MessageReader.java:118)
at io.cloudevents.core.message.impl.GenericStructuredMessageReader.read(GenericStructuredMessageReader.java:42)
at io.cloudevents.core.message.MessageReader.toEvent(MessageReader.java:118)
at io.cloudevents.core.message.MessageReader.toEvent(MessageReader.java:102)
at io.cloudevents.kafka.CloudEventDeserializer.deserialize(CloudEventDeserializer.java:62)
at io.cloudevents.kafka.CloudEventDeserializer.deserialize(CloudEventDeserializer.java:34)
at io.smallrye.reactive.messaging.kafka.fault.DeserializerWrapper.deserialize(DeserializerWrapper.java:111)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1386)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:133)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1617)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1453)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:686)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:637)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1303)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.lambda$poll$4(ReactiveKafkaConsumer.java:141)
at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.lambda$runOnPollingThread$0(ReactiveKafkaConsumer.java:108)
at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:28)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

Implementation details

We have an ObjectMapper customizer that registers the CloudEvents Jackson module:

    @Override
    public void customize(ObjectMapper mapper) {
        mapper.registerModule(new JavaTimeModule())
                .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        mapper.registerModule(io.cloudevents.jackson.JsonFormat.getCloudEventJacksonModule());
    }

We are using Quarkus, and our listeners are configured as follows:

mp:
  messaging:
    incoming:
      my-topic:
        connector: smallrye-kafka
        topic: my-topic
        key:
          deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value:
          deserializer: io.cloudevents.kafka.CloudEventDeserializer
        interceptor:
          classes: io.opentracing.contrib.kafka.TracingConsumerInterceptor
        cloud-events: false
        failure-strategy: ignore

Listeners are implemented as follows:

@Startup
@ApplicationScoped
public class MyEventListener {

    @Incoming("my-topic")
    @Blocking
    public CompletionStage<Void> receiveMyEvent(Message<CloudEventV1> event) {

        // do something

        return event.ack();
    }
}
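Until this is fixed upstream, one possible workaround is a wrapper deserializer that strips null-valued attributes from the JSON before delegating to JsonFormat. The sketch below is untested and the class name is ours; it assumes cloudevents-json-jackson 2.2.0, kafka-clients, and Jackson on the classpath:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.cloudevents.CloudEvent;
import io.cloudevents.jackson.JsonFormat;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical workaround: drop attributes that are explicitly null
// before handing the payload to JsonFormat, which currently rejects
// "subject": null.
public class NullTolerantCloudEventDeserializer implements Deserializer<CloudEvent> {

    private final ObjectMapper mapper = new ObjectMapper();
    private final JsonFormat format = new JsonFormat();

    @Override
    public CloudEvent deserialize(String topic, byte[] data) {
        try {
            ObjectNode node = (ObjectNode) mapper.readTree(data);
            // Collect null-valued fields first, then remove them;
            // the JSON format spec treats null as equivalent to absent.
            List<String> nullFields = new ArrayList<>();
            node.fieldNames().forEachRemaining(name -> {
                if (node.get(name).isNull()) {
                    nullFields.add(name);
                }
            });
            nullFields.forEach(node::remove);
            return format.deserialize(mapper.writeValueAsBytes(node));
        } catch (Exception e) {
            throw new RuntimeException("Failed to deserialize CloudEvent", e);
        }
    }
}
```

This class would replace io.cloudevents.kafka.CloudEventDeserializer as the value deserializer in the channel configuration above, at the cost of parsing each payload twice.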

Metadata

Labels: bug (Something isn't working)