Handle unknown magic byte error in Confluent Avro decoder (#9045) #9051
Conversation
Codecov Report

|            | master | #9051  | +/-     |
|------------|--------|--------|---------|
| Coverage   | 70.09% | 24.75% | -45.34% |
| Complexity | 4965   | 51     | -4914   |
| Files      | 1831   | 1819   | -12     |
| Lines      | 96270  | 95917  | -353    |
| Branches   | 14390  | 14352  | -38     |
| Hits       | 67483  | 23749  | -43734  |
| Misses     | 24135  | 69876  | +45741  |
| Partials   | 4652   | 2292   | -2360   |

Flags with carried forward coverage won't be shown.
mcvsubbu left a comment:
It is better to declare a new class of decoder exception (e.g. TransientDecoderException, in the same package as PermanentConsumerException) and throw that particular exception when the decoder is called.
You can change the interface signature of StreamMessageDecoder to throw that additional exception.
You can also change the code in LLRealtimeSegmentDataManager to field this exception and use the retry logic already present there.
This way, the new feature will be available for other streams that have a similar requirement (of accessing the stream endpoint again in order to decode a message).
Thanks.
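A minimal sketch of the proposed exception, assuming the name and package suggested above; this is an illustration of the proposal, not code that exists in Pinot or in this PR:

```java
// Hypothetical class illustrating the reviewer's suggestion; the name, package and
// intended usage are assumptions taken from the comment above, not existing Pinot code.
package org.apache.pinot.spi.stream;

/**
 * Thrown by a StreamMessageDecoder when decoding fails for a reason that may succeed
 * on retry (e.g. the stream endpoint or schema registry being temporarily unreachable),
 * so LLRealtimeSegmentDataManager could field it and reuse its existing retry logic.
 */
public class TransientDecoderException extends RuntimeException {

  public TransientDecoderException(String message) {
    super(message);
  }

  public TransientDecoderException(String message, Throwable cause) {
    super(message, cause);
  }
}
```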
@mcvsubbu thanks for making a proposal. The particular exception I am addressing in this PR is not transient, though, and the stream cannot recover from it.
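For context on why this failure is permanent: Confluent's wire format requires a serialized value to start with magic byte 0 followed by a 4-byte schema ID and the Avro payload, and a record without that framing (e.g. plain text written by a non-Confluent producer) can never be decoded, so retrying cannot help. A rough sketch of that framing check, illustrative only and not the PR's actual decoder code:

```java
import java.nio.ByteBuffer;

// Sketch of the Confluent wire-format layout; class and method names are illustrative.
public final class ConfluentWireFormat {
  private static final byte MAGIC_BYTE = 0x0;
  private static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

  // True only for values framed by Confluent's Avro serializer.
  public static boolean hasMagicByte(byte[] value) {
    return value != null && value.length >= HEADER_SIZE && value[0] == MAGIC_BYTE;
  }

  // The schema ID is stored as a big-endian int immediately after the magic byte.
  public static int schemaId(byte[] value) {
    return ByteBuffer.wrap(value, 1, 4).getInt();
  }
}
```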
Discussed on Slack, and I think this does the job for now. We can file follow-up PRs to address this in a generic way. Please add docs and a test case if possible. It will be great if you can take up the task of solving this in a generic way for all formats and decoders.
OK, then can you please add a comment in the
Force-pushed from 4623d2a to 8003067
I couldn't find a better place for this class
I'm using Testcontainers because I had loads of dependency issues flagged by the enforcer plugin and I didn't find an easy way to fix them; otherwise the schema registry could have been embedded in the code instead of running in a container. Also, this spins up its own Kafka broker because I wasn't able to connect the container to the Kafka broker that the test creates - the hostname is translated to a special DNS name and I couldn't get the Kafka listeners to play nicely with that.
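A rough sketch of the Testcontainers setup being described here; the image tags, network alias, class name, and environment variable values are assumptions for illustration, not necessarily what the PR's integration test uses:

```java
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

public class SchemaRegistryTestSetup {
  public static void main(String[] args) {
    // Shared Docker network so the schema registry can reach the broker by alias.
    Network network = Network.newNetwork();

    // Kafka broker dedicated to the test (the image tag is an assumption).
    KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
        .withNetwork(network)
        .withNetworkAliases("kafka");

    // Confluent schema registry pointing at that broker over the internal listener.
    GenericContainer<?> schemaRegistry =
        new GenericContainer<>(DockerImageName.parse("confluentinc/cp-schema-registry:6.2.1"))
            .withNetwork(network)
            .withExposedPorts(8081)
            .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
            .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:9092");

    kafka.start();
    schemaRegistry.start();
    try {
      String registryUrl = "http://" + schemaRegistry.getHost() + ":" + schemaRegistry.getMappedPort(8081);
      System.out.println("Schema registry available at " + registryUrl);
    } finally {
      // Shut both containers down even if the test body fails.
      schemaRegistry.stop();
      kafka.stop();
    }
  }
}
```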
That is fine. We already use test-containers and localstack in other tests.
Just make sure to shut it down properly in case of failure.
Force-pushed from c3cd429 to c6da272
mcvsubbu left a comment:
This is a relatively small change that can be verified via a unit test. How much time does the integration test take? We generally try to keep the number of integration tests small.
On my laptop the IT takes ~2.5 min (not counting Docker image download time). The good thing about this is that it also tests the plugin's integration with the schema registry. I could replace it with a UT, but then I would have to mock schema registry requests and responses.
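For reference, the unit-test route mentioned here would likely look something like the following - a hedged sketch using Confluent's in-memory MockSchemaRegistryClient rather than mocking HTTP requests by hand; the class name is real Confluent test tooling, but whether it covers the same ground as the container-based test is the open question:

```java
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class MockRegistryExample {
  public static void main(String[] args) {
    // In-memory registry shared by the serializer and the decoder under test.
    SchemaRegistryClient registry = new MockSchemaRegistryClient();

    // Produces Confluent-framed Avro bytes (magic byte + schema ID + payload)
    // without any running schema registry service.
    KafkaAvroSerializer serializer = new KafkaAvroSerializer(registry);
    // ... feed serializer output (and some non-Avro bytes) to the decoder under test
  }
}
```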
My suggestion is not to have the integration test enabled. @Jackie-Jiang or @kishoreg thoughts?
Jackie-Jiang left a comment:
@mcvsubbu I feel it is okay to add an IT for Confluent Kafka since we don't have one yet.
Outdated review threads on pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java (resolved)
We should take the kafka:9092 value from the _kafkaContainer object so that it doesn't betray us if we change the port.
The port number is hardcoded in KafkaContainer, and the function that returns the BROKER listener is protected. The most I can do is define a constant with the hostname for this container and replace "kafka" with this new constant. The port cannot be changed.
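A tiny sketch of what that constant-based approach could look like - the class and field names here are illustrative assumptions, not the PR's actual identifiers:

```java
public abstract class ConfluentKafkaTestBase {
  // Illustrative constants only; names are assumptions, not the PR's actual fields.
  // Testcontainers' KafkaContainer fixes the internal BROKER listener port (9092),
  // so only the network alias (hostname) can be centralised in a constant.
  protected static final String KAFKA_NETWORK_ALIAS = "kafka";
  protected static final String INTERNAL_BOOTSTRAP_SERVERS = KAFKA_NETWORK_ALIAS + ":9092";
}
```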
What do you think of incrementing a counter metric here? Normally people don't go through the logs, but they can get alerted on metrics.
Good idea indeed! I'll add one.
Actually, there is an error counter in the LL data manager already. Unless we want a counter per error type, this won't be necessary right now, if you agree.
I just saw it: INVALID_REALTIME_ROWS_DROPPED. No need to create a new counter then.
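For anyone following along, bumping that existing counter from the consuming segment's data manager would look roughly like this - a sketch assuming Pinot's ServerMetrics/ServerMeter API and field names, not a quote of the PR's code:

```java
// Sketch only: assumes the surrounding LLRealtimeSegmentDataManager already holds
// a _serverMetrics reference and the table name (field names are assumptions).
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1);
```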
@mcvsubbu are we good with this PR now?

Yup, good to go.

I think only maintainers can merge PRs.

@ddcprg Please take a look at the failures in the CI pipeline.

@mcvsubbu @Jackie-Jiang all green now, but I still can't merge.
nonAvroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, "Rubbish".getBytes(UTF_8)));
avroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, genericRecord));
Hey @walterddr, not at all, we only need to produce a few messages to make sure they are skipped. Let me know if you'd like me to raise a PR.
No worries. We will limit the rubbish rows produced. Thank you for confirming!
Helps fix #9045