-
Notifications
You must be signed in to change notification settings - Fork 629
The bug for multiplex configuration in ReactorKafkaBinder #2862
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
The bug for multiplex configuration in ReactorKafkaBinder #2862
Conversation
…d and a method was written for common partition-related operations.
…d and a method was written for common partition-related operations.
|
I took a glance at the PR. Please tell me where you added the code to fix the issue of splitting up the topics from the multiplexed destinations. Also, it will be helpful for us to review if you can add some inline comments to the code in this PR itself. Go to the |
| int partitionCount = properties.getInstanceCount() * properties.getConcurrency(); | ||
| boolean groupManagement = properties.getExtension().isAutoRebalanceEnabled(); | ||
| processTopic(consumerGroup, properties, factory, partitionCount, properties.getExtension().isDestinationIsPattern(), | ||
| groupManagement, destination.getName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sobychacko Actually, this is the main point I want to fix. All the remaining changes I made are for refactoring.
You commented on the processTopic method above saying, "Let's refactor it later." Hence all my remaining changes. I marked your comment with * comment.
Multiplex configuration is handled in the KafkaMessageChannelBinder class. However, this has been forgotten in the ReactorKafkaBinder class. I just wrote code to fix this. All other codes are for refactoring purposes only.
| return new ReactorMessageHandler(opts, converter, destination.getName(), resultChannel); | ||
| } | ||
|
|
||
| // TODO: Refactor to provide in a common area since KafkaMessageChannelBinder also provides this. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| alterConfigsResult.all().get(this.operationTimeout, TimeUnit.SECONDS); | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KafkaMessageChannelBinder and ReactorKafkaBinder classes performed the same operations. I put these methods here to use them from a common place.
| .collect(Collectors.toList()); | ||
| }).given(provisioningProvider).getPartitionsForTopic(anyInt(), anyBoolean(), | ||
| any(), any()); | ||
| }).given(provisioningProvider).getListenedPartitions(anyString(), any(), any(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since I was moving methods, I needed to mock this method.
|
@omercelikceng Thank you for the PR contribution! Merged upstream. |

I found some issues when given multiplex configuration in consumer for ReactorKafkaBinder. The reason is that destinationName is not parsed while partition information is fetched. That is, when the destinationName "testA, testB, testC" is given, the topic name is accepted as "testA, testB, testC". However, this is not just one topic ("testA, testB, testC"). They are 3 different topics (topic1="testA", topic2="testB", topic3="testC"). It is necessary to parse it and process it as 3 different topics.