This repository was archived by the owner on Jan 24, 2024. It is now read-only.

[BUG] Encountered corrupted messages after recovering from an OOM #442

@BewareMyPower


Describe the bug
After the broker restarted following an OOM, KafkaEntryFormatter#decode could throw an exception:

16:17:54.918 [bookkeeper-ml-workers-OrderedExecutor-4-0] ERROR org.apache.pulsar.common.protocol.Commands - [null] [0] Failed to parse message metadata
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 7
    at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202102252222.jar:2.8.0-rc-202102252222]
    at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202102252222.jar:2.8.0-rc-202102252222]
    at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:426) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202102252222.jar:2.8.0-rc-202102252222]
    at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:416) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202102252222.jar:2.8.0-rc-202102252222]
    at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1668) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202102252222.jar:2.8.0-rc-202102252222]
    at io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils.peekBaseOffsetFromEntry(MessageIdUtils.java:114) ~[?:?]
    at io.streamnative.pulsar.handlers.kop.format.KafkaEntryFormatter.lambda$decode$0(KafkaEntryFormatter.java:52) ~[?:?]
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[?:?]
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) ~[?:?]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:952) ~[?:?]
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:926) ~[?:?]
    at java.util.stream.AbstractTask.compute(AbstractTask.java:327) ~[?:?]
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746) ~[?:?]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) ~[?:?]
    at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:408) ~[?:?]
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736) ~[?:?]
    at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:919) ~[?:?]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:?]
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
    at io.streamnative.pulsar.handlers.kop.format.KafkaEntryFormatter.decode(KafkaEntryFormatter.java:62) ~[?:?]
    at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$null$12(MessageFetchContext.java:371) ~[?:?]
    at java.util.concurrent.ConcurrentHashMap$EntrySetView.forEach(ConcurrentHashMap.java:4864) ~[?:?]
    at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$readMessagesInternal$13(MessageFetchContext.java:318) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
    at io.streamnative.pulsar.handlers.kop.MessageFetchContext$2.readEntriesComplete(MessageFetchContext.java:458) ~[?:?]
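The message "Invalid unknonwn tag type: 7" (raised from LightProtoCodec.skipUnknownField) suggests that the bytes parsed as MessageMetadata are not valid protobuf. In the protobuf wire format each field key is a varint (field_number << 3) | wire_type, and only wire types 0 to 5 are defined, so 6 and 7 never occur in well-formed data. The sketch below illustrates that rule; the class and method names are hypothetical and are not LightProto's API.

```java
import java.util.Map;

/** Hypothetical helper illustrating protobuf key decoding; not part of Pulsar or LightProto. */
public final class WireTypeCheck {
    // Wire types defined by the protobuf encoding; values 6 and 7 are unassigned.
    private static final Map<Integer, String> WIRE_TYPES = Map.of(
            0, "VARINT",
            1, "I64",
            2, "LEN",
            3, "SGROUP",
            4, "EGROUP",
            5, "I32");

    private WireTypeCheck() {}

    /** The low three bits of a field key hold the wire type. */
    public static int wireType(int key) {
        return key & 0x7;
    }

    /** The remaining bits hold the field number. */
    public static int fieldNumber(int key) {
        return key >>> 3;
    }

    /** Returns the wire type name, or throws for an unassigned wire type as a strict decoder would. */
    public static String describe(int key) {
        int type = wireType(key);
        String name = WIRE_TYPES.get(type);
        if (name == null) {
            throw new IllegalArgumentException(
                    "Invalid wire type " + type + " in key 0x" + Integer.toHexString(key));
        }
        return name;
    }

    public static void main(String[] args) {
        // 0x08 = field 1 with wire type VARINT: a well-formed key.
        System.out.println(fieldNumber(0x08) + " " + describe(0x08));
        // 0x0F = field 1 with wire type 7: can only appear if the bytes are not protobuf
        // (for example, corrupted data or a read starting at the wrong offset).
        try {
            describe(0x0F);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Seeing wire type 7 therefore points at damaged or misaligned entry bytes rather than at a metadata field the parser merely does not recognize.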

In addition, there is another unexpected error:

16:18:13.011 [AsyncHttpClient-54-1] ERROR io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [persistent://public/default/test-topic-FYeWwgs-0000-partition-0] failed get pulsar address, returned null.
16:18:13.011 [AsyncHttpClient-54-1] ERROR io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - Not get advertise data for Kafka topic:persistent://public/default/test-topic-FYeWwgs-0000-partition-0. throwable

It looks like the topic was deleted.

To Reproduce
Run 2 producers and 2 consumers against a 3-node KoP cluster, with a producer rate of 200000 messages and a message size of 1 KB.

Expected behavior
At a minimum, the exception thrown from decode should be handled.
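The stack trace shows decode mapping entries through a parallel stream (ReduceOps.evaluateParallel) into peekBaseOffsetFromEntry, so an exception for a single entry escapes collect() and fails the whole batch. A minimal sketch of per-entry handling follows, assuming it is acceptable to log and skip an undecodable entry; the class TolerantDecoder and its generic entry/parser parameters are hypothetical placeholders, not KoP's actual types.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch: decode a batch of entries so that one corrupted entry
 * is logged and skipped instead of failing the whole batch.
 * The entry type and parser are placeholders, not KoP's actual classes.
 */
public final class TolerantDecoder {
    private static final Logger LOG = Logger.getLogger(TolerantDecoder.class.getName());

    private TolerantDecoder() {}

    public static <E, R> List<R> decodeAll(List<E> entries, Function<E, R> parser) {
        return entries.parallelStream()
                .map(entry -> tryParse(entry, parser))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList()); // encounter order is preserved
    }

    private static <E, R> Optional<R> tryParse(E entry, Function<E, R> parser) {
        try {
            return Optional.ofNullable(parser.apply(entry));
        } catch (RuntimeException e) {
            // Without this catch, the exception escapes collect() and aborts the whole batch.
            LOG.warning("Skipping undecodable entry: " + e.getMessage());
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        List<String> entries = List.of("1", "2", "corrupted", "4");
        List<Integer> decoded = decodeAll(entries, Integer::parseInt);
        System.out.println(decoded); // [1, 2, 4]
    }
}
```

Skipping silently means a consumer may never see the damaged records; returning an error for the affected partition instead would surface the corruption to the client, so which behavior fits depends on how KoP should report such entries.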

Screenshots
[Two screenshots were attached to the original issue.]

