This repository was archived by the owner on Jan 24, 2024. It is now read-only.
[BUG] Encounter corrupted messages after recovered from an OOM #442
Closed
Describe the bug
After the broker restarted from an OOM, KafkaEntryFormatter#decode could throw an exception:
16:17:54.918 [bookkeeper-ml-workers-OrderedExecutor-4-0] ERROR org.apache.pulsar.common.protocol.Commands - [null] [0] Failed to parse message metadata
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 7
at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202102252222.jar:2.8.0-rc-202102252222]
at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202102252222.jar:2.8.0-rc-202102252222]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:426) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202102252222.jar:2.8.0-rc-202102252222]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:416) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202102252222.jar:2.8.0-rc-202102252222]
at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1668) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202102252222.jar:2.8.0-rc-202102252222]
at io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils.peekBaseOffsetFromEntry(MessageIdUtils.java:114) ~[?:?]
at io.streamnative.pulsar.handlers.kop.format.KafkaEntryFormatter.lambda$decode$0(KafkaEntryFormatter.java:52) ~[?:?]
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[?:?]
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) ~[?:?]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:952) ~[?:?]
at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:926) ~[?:?]
at java.util.stream.AbstractTask.compute(AbstractTask.java:327) ~[?:?]
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746) ~[?:?]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) ~[?:?]
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:408) ~[?:?]
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736) ~[?:?]
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:919) ~[?:?]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:?]
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
at io.streamnative.pulsar.handlers.kop.format.KafkaEntryFormatter.decode(KafkaEntryFormatter.java:62) ~[?:?]
at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$null$12(MessageFetchContext.java:371) ~[?:?]
at java.util.concurrent.ConcurrentHashMap$EntrySetView.forEach(ConcurrentHashMap.java:4864) ~[?:?]
at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$readMessagesInternal$13(MessageFetchContext.java:318) ~[?:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
at io.streamnative.pulsar.handlers.kop.MessageFetchContext$2.readEntriesComplete(MessageFetchContext.java:458) ~[?:?]
In addition, there is another unexpected error:
16:18:13.011 [AsyncHttpClient-54-1] ERROR io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [persistent://public/default/test-topic-FYeWwgs-0000-partition-0] failed get pulsar address, returned null.
16:18:13.011 [AsyncHttpClient-54-1] ERROR io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - Not get advertise data for Kafka topic:persistent://public/default/test-topic-FYeWwgs-0000-partition-0. throwable
It looks like the topic was deleted.
To Reproduce
Run 2 producers and 2 consumers against a 3-node KoP cluster, with a producer rate of 200000 messages and a message size of 1 KB.
Expected behavior
At a minimum, we should handle the exception thrown from decode, so that one corrupted entry does not fail the whole fetch.
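One possible shape for that handling is sketched below. It is not KoP's actual code: `SafeDecodeSketch`, `DecodeResult`, and `decodeAll` are hypothetical names, and the `parser` argument stands in for whatever parses the metadata of a single entry. The sketch assumes that skipping a corrupted entry and counting it is acceptable, which may not hold for every consumer. It only catches `IllegalArgumentException`, the exception type seen in the stack trace above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch, not KoP's implementation: decode entries one by one
// and skip those whose parser throws IllegalArgumentException.
final class SafeDecodeSketch {

    // Holds the successfully decoded records and the number of skipped entries.
    static final class DecodeResult<R> {
        final List<R> records;
        final int skipped;

        DecodeResult(List<R> records, int skipped) {
            this.records = records;
            this.skipped = skipped;
        }
    }

    // Applies the parser to each entry in order. A failing entry is logged and
    // counted instead of aborting the whole batch.
    static <E, R> DecodeResult<R> decodeAll(List<E> entries, Function<E, R> parser) {
        List<R> records = new ArrayList<>();
        int skipped = 0;
        for (E entry : entries) {
            try {
                records.add(parser.apply(entry));
            } catch (IllegalArgumentException e) {
                skipped++;
                System.err.println("Skipping corrupted entry: " + e.getMessage());
            }
        }
        return new DecodeResult<>(records, skipped);
    }

    public static void main(String[] args) {
        // Integer::parseInt plays the role of the metadata parser here;
        // NumberFormatException is a subclass of IllegalArgumentException.
        DecodeResult<Integer> result =
                decodeAll(Arrays.asList("1", "corrupted", "3"), Integer::parseInt);
        System.out.println(result.records + " skipped=" + result.skipped);
    }
}
```

Returning the skipped count lets the caller decide whether to answer the fetch with the remaining records or to report an error to the client; which of the two is right for KoP is left open here.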
