Conversation

@ddcprg
Contributor

@ddcprg ddcprg commented Jul 13, 2022

Helps fix #9045

@codecov-commenter

Codecov Report

Merging #9051 (eb02424) into master (8e7ca65) will decrease coverage by 45.33%.
The diff coverage is n/a.

❗ Current head eb02424 differs from pull request most recent head 4023afa. Consider uploading reports for the commit 4023afa to get more accurate results

@@              Coverage Diff              @@
##             master    #9051       +/-   ##
=============================================
- Coverage     70.09%   24.75%   -45.34%     
+ Complexity     4965       51     -4914     
=============================================
  Files          1831     1819       -12     
  Lines         96270    95917      -353     
  Branches      14390    14352       -38     
=============================================
- Hits          67483    23749    -43734     
- Misses        24135    69876    +45741     
+ Partials       4652     2292     -2360     
Flag           Coverage Δ
integration1   ?
integration2   24.75% <ø> (-0.15%) ⬇️
unittests1     ?
unittests2     ?

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
.../java/org/apache/pinot/spi/utils/BooleanUtils.java 0.00% <0.00%> (-100.00%) ⬇️
...java/org/apache/pinot/spi/trace/BaseRecording.java 0.00% <0.00%> (-100.00%) ⬇️
...java/org/apache/pinot/spi/trace/NoOpRecording.java 0.00% <0.00%> (-100.00%) ⬇️
...ava/org/apache/pinot/spi/config/table/FSTType.java 0.00% <0.00%> (-100.00%) ⬇️
...ava/org/apache/pinot/spi/config/user/RoleType.java 0.00% <0.00%> (-100.00%) ⬇️
...ava/org/apache/pinot/spi/data/MetricFieldSpec.java 0.00% <0.00%> (-100.00%) ⬇️
...java/org/apache/pinot/common/tier/TierFactory.java 0.00% <0.00%> (-100.00%) ⬇️
...a/org/apache/pinot/spi/config/table/TableType.java 0.00% <0.00%> (-100.00%) ⬇️
.../org/apache/pinot/spi/data/DimensionFieldSpec.java 0.00% <0.00%> (-100.00%) ⬇️
.../org/apache/pinot/spi/data/readers/FileFormat.java 0.00% <0.00%> (-100.00%) ⬇️
... and 1344 more

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 8e7ca65...4023afa. Read the comment docs.

Contributor

@mcvsubbu mcvsubbu left a comment

It is better to declare a new class of decoder exception (e.g. TransientDecoderException, in the same package as PermanentConsumerException) and throw that particular exception when the decoder fails to decode a message.

You can change the interface signature in StreamMessageDecoder to throw that additional exception.

You can also change the code in LLRealtimeSegmentDataManager to handle this exception and use the retry logic already present there.

This way, the new feature will be available for other streams that have a similar requirement (of accessing the stream endpoint again in order to decode a message).
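
A rough sketch of the shape being suggested (class name and call sites are hypothetical, not what this PR ends up implementing):

```java
// Hypothetical exception type, living alongside PermanentConsumerException as suggested above.
public class TransientDecoderException extends RuntimeException {
  public TransientDecoderException(String message, Throwable cause) {
    super(message, cause);
  }
}
```

A decoder could then throw this when a message cannot be decoded for a retriable reason (e.g. the stream endpoint is temporarily unreachable), and LLRealtimeSegmentDataManager could catch it and reuse the retry logic it already has.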

thanks


@ddcprg
Contributor Author

ddcprg commented Jul 13, 2022

@mcvsubbu thanks for making a proposal. The particular exception that I am addressing in this PR is not transient though, and the stream cannot recover from it.

@kishoreg
Member

Discussed on Slack and I think this does the job for now.

We can file follow-up PRs to address this in a generic way. Please add docs and a test case if possible.

It will be great if you can take up the task of solving this in a generic way for all formats and decoders.

@mcvsubbu
Contributor

OK, then can you please add a comment in the StreamMessageDecoder class that a null return is OK, and that the row will be dropped if null is returned.
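
Something along these lines, assuming the existing decode(payload, destination) method (the exact wording is of course up to the author):

```java
/**
 * Decodes a stream message into a row.
 *
 * @return the decoded row, or {@code null} if the message cannot be decoded;
 *         returning {@code null} is valid and the offending message will simply be
 *         dropped (and counted as an invalid row) instead of aborting consumption.
 */
GenericRow decode(T payload, GenericRow destination);
```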

@ddcprg
Contributor Author

ddcprg commented Jul 13, 2022

@kishoreg I'll address the other points in #9045 in a separate PR as discussed and see if I can easily add an integration test
@mcvsubbu of course, I'll update the javadocs

@mcvsubbu
Contributor

@kishoreg I'll address the other points in #9045 in a separate PR as discussed and see if I can easily add an integration test
@mcvsubbu of course, I'll update the javadocs

thanks, with that I am good

@ddcprg ddcprg force-pushed the issue_9045 branch 2 times, most recently from 4623d2a to 8003067 on July 15, 2022 22:01
Contributor Author

I couldn't find a better place for this class

Contributor Author

@ddcprg ddcprg Jul 15, 2022

I'm using testcontainers because I had loads of dependency issues flagged by the enforcer plugin and I didn't find an easy way to fix them; otherwise the schema registry could have been embedded in the code instead of running in a container. Also, this spins up its own Kafka broker because I wasn't able to connect the container to the Kafka broker that the test creates: the hostname is translated to a special DN and I couldn't get the Kafka listeners to play nicely with that.
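
For reference, a minimal sketch of that kind of setup with the testcontainers API (image tags, aliases and environment values are illustrative, not necessarily the ones used in the actual test):

```java
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

// Inside the test set-up:
Network network = Network.newNetwork();

// Kafka broker, reachable as "kafka" from other containers on the same network.
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.2.1"))
    .withNetwork(network)
    .withNetworkAliases("kafka");

// Confluent Schema Registry pointed at that broker's internal listener.
GenericContainer<?> schemaRegistry =
    new GenericContainer<>(DockerImageName.parse("confluentinc/cp-schema-registry:7.2.1"))
        .withNetwork(network)
        .withExposedPorts(8081)
        .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
        .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:9092");

kafka.start();
schemaRegistry.start();

// The test talks to the broker via kafka.getBootstrapServers() and to the registry via
// "http://" + schemaRegistry.getHost() + ":" + schemaRegistry.getMappedPort(8081).
```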

Contributor

That is fine. We already use test-containers and localstack in other tests.

Contributor

Just make sure to shut it down properly in case of failure

@ddcprg ddcprg force-pushed the issue_9045 branch 2 times, most recently from c3cd429 to c6da272 on July 17, 2022 19:08
@ddcprg
Contributor Author

ddcprg commented Jul 18, 2022

@mcvsubbu @kishoreg if you don't mind please take a look at this PR when you get some spare time

Contributor

@mcvsubbu mcvsubbu left a comment

This is a relatively small change that can be verified via a unit test. How much time does the integration test take? We generally try to keep the number of integration tests small.

@ddcprg
Contributor Author

ddcprg commented Jul 18, 2022

On my laptop the IT takes ~2.5 min (not counting Docker image download time). The good thing about it is that it also tests the plugin's integration with the schema registry. I could replace it with a UT, but then I would have to mock the schema registry requests and responses.

@mcvsubbu
Contributor

My suggestion is not to have the integration test enabled.

@Jackie-Jiang or @kishoreg thoughts?

Contributor

@Jackie-Jiang Jackie-Jiang left a comment

@mcvsubbu I feel it is okay to add an IT for Confluent Kafka since we don't have one yet.

Contributor

We should take the kafka:9092 value from the _kafkaContainer object so that it doesn't betray us if we change the port.

Contributor Author

The port number is hardcoded in KafkaContainer, and the function that returns the BROKER listener is protected. The most I can do is define a constant with the hostname for this container and replace "kafka" with this new constant. The port cannot be changed.
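
So the most that can be factored out looks something like this (the constant name is hypothetical):

```java
// Only the network alias can be pulled into a constant; KafkaContainer fixes the internal
// BROKER listener port at 9092 and does not expose a public getter for that listener.
private static final String KAFKA_CONTAINER_HOSTNAME = "kafka";

// e.g. when wiring other containers on the same docker network:
//   _kafkaContainer.withNetworkAliases(KAFKA_CONTAINER_HOSTNAME);
//   String bootstrapServers = KAFKA_CONTAINER_HOSTNAME + ":9092";
```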

Comment on lines +131 to +134
Contributor

What do you think of incrementing a counter metric here? Normally people don't go through the logs, but they can get alerted on metrics.

Contributor Author

good idea indeed! I'll add one

Contributor Author

@ddcprg ddcprg Jul 21, 2022

Actually, there is an error counter in the LL data manager already. Unless we want a counter per error type, this won't be necessary right now, if you agree.

Contributor

I just saw it: INVALID_REALTIME_ROWS_DROPPED. No need to create a new counter then.
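
For context, incrementing that existing meter when a row fails to decode looks roughly like this (field and method names are illustrative; the PR simply relies on the counter the data manager already maintains):

```java
// Illustrative only: when the decoder returns null, the message is counted as an
// invalid dropped row rather than getting a new per-error-type counter.
GenericRow decodedRow = _messageDecoder.decode(messageBytes, reuse);
if (decodedRow == null) {
  _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1L);
  // skip this message and continue with the next one
}
```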

@ddcprg
Contributor Author

ddcprg commented Jul 22, 2022

@mcvsubbu are we good with this PR now?

@mcvsubbu
Contributor

@mcvsubbu are we good with this PR now?

yup, good to go

@ddcprg
Contributor Author

ddcprg commented Jul 22, 2022

I think only maintainers can merge PRs

@Jackie-Jiang
Contributor

@ddcprg Please take a look at the failures in the CI pipeline

@ddcprg
Contributor Author

ddcprg commented Jul 24, 2022

@mcvsubbu @Jackie-Jiang all green now but I still can't merge

@mcvsubbu mcvsubbu merged commit bbec0a8 into apache:master Jul 24, 2022
@ddcprg ddcprg deleted the issue_9045 branch July 25, 2022 09:09
Comment on lines +154 to +155
nonAvroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, "Rubbish".getBytes(UTF_8)));
avroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, genericRecord));
Contributor

Hi @ddcprg, can you help us understand whether we need to produce one non-Avro rubbish message per Avro producer row?
We are seeing a lot of rubbish in the logs and we were trying to determine whether we should slim down the number of rubbish rows published in this test case.

see: #9516

Contributor Author

Hey @walterddr, not at all, we only need to produce a few messages to make sure they are skipped. Let me know if you'd like me to raise a PR.
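
A possible slimming of the test data, reusing the identifiers from the snippet above (the count of 5 is arbitrary):

```java
// A handful of undecodable messages up front is enough to exercise the skip path...
for (int i = 0; i < 5; i++) {
  nonAvroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, "Rubbish".getBytes(UTF_8)));
}
// ...and the main loop then only produces valid Avro records.
avroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, genericRecord));
```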

Contributor

No worries. We will limit the rubbish rows produced. Thank you for confirming!
