Skip to content

Conversation

@omercelikceng
Copy link
Contributor

@omercelikceng omercelikceng commented Nov 29, 2023

I found some issues when given multiplex configuration in consumer for ReactorKafkaBinder. The reason is that destinationName is not parsed while partition information is fetched. That is, when the destinationName "testA, testB, testC" is given, the topic name is accepted as "testA, testB, testC". However, this is not just one topic ("testA, testB, testC"). They are 3 different topics (topic1="testA", topic2="testB", topic3="testC"). It is necessary to parse it and process it as 3 different topics.

@sobychacko
Copy link
Contributor

I took a glance at the PR. Please tell me where you added the code to fix the issue of splitting up the topics from the multiplexed destinations. Also, it will be helpful for us to review if you can add some inline comments to the code in this PR itself. Go to the Files Changed tab above, click on the + sign on the code you changed, and add some comments so I can quickly review your changes. Thank you!

int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
boolean groupManagement = properties.getExtension().isAutoRebalanceEnabled();
processTopic(consumerGroup, properties, factory, partitionCount, properties.getExtension().isDestinationIsPattern(),
groupManagement, destination.getName());
Copy link
Contributor Author

@omercelikceng omercelikceng Nov 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sobychacko Actually, this is the main point I want to fix. All the remaining changes I made are for refactoring.

You commented on the processTopic method above saying, "Let's refactor it later." Hence all my remaining changes. I marked your comment with * comment.

Multiplex configuration is handled in the KafkaMessageChannelBinder class. However, this has been forgotten in the ReactorKafkaBinder class. I just wrote code to fix this. All other codes are for refactoring purposes only.

error

return new ReactorMessageHandler(opts, converter, destination.getName(), resultChannel);
}

// TODO: Refactor to provide in a common area since KafkaMessageChannelBinder also provides this.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alterConfigsResult.all().get(this.operationTimeout, TimeUnit.SECONDS);
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KafkaMessageChannelBinder and ReactorKafkaBinder classes performed the same operations. I put these methods here to use them from a common place.

.collect(Collectors.toList());
}).given(provisioningProvider).getPartitionsForTopic(anyInt(), anyBoolean(),
any(), any());
}).given(provisioningProvider).getListenedPartitions(anyString(), any(), any(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I was moving methods, I needed to mock this method.

@sobychacko
Copy link
Contributor

@omercelikceng Thank you for the PR contribution! Merged upstream.

@sobychacko sobychacko closed this Dec 1, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants