-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][broker] fix unacked message count is zero when using exclusive subscription #24376
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
I find it's not work when using Cumulative ACK in EXLUSIVE mode. Any feedback or advice is appreciated ! |
nodece
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work! I left my review result, please check.
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
Outdated
Show resolved
Hide resolved
...broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
Show resolved
Hide resolved
@nodece Thanks for kind review on it! I've been thinking about it for a while about Cumulative ACK. Seems like it still need more work to fix unacked message count when using Cumulative ACK. Any feedback or advice is appreciated ! |
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
Outdated
Show resolved
Hide resolved
…onsumer.java Co-authored-by: Lari Hotari <[email protected]>
|
I have another WIP berg223#1 to solve the issue further when the exlusive or failover consumer using the cumulative ack mode. I believe that it's complete now. Because there is another related flow control issue, I'm not sure whether to merge the WIP into this PR together. The basic idea is that we use the If you think the WIP and this PR should be merged together. Please let me know and I will do it! |
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
Outdated
Show resolved
Hide resolved
...rg/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
Show resolved
Hide resolved
...rg/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
Show resolved
Hide resolved
...rg/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
Outdated
Show resolved
Hide resolved
|
/pulsarbot rerun-failure-checks |
|
I will help you fix the CI:
|
Thanks for kind remind. But we cannot do this. Exclusive consumer shouldn't be blocked because it has no chance to be unblock when ack message. I have considered the block behavior carefully when coding and it's reasonable to keep it never be blocked the same as before. I have read the unit test in detail. And I think the Sorry for the CI breaks. |
codelipenghui
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cumulative acknowledgment is supported for both Inclusive and Failover subscription types, which is likely the main reason why unacknowledged message tracking hasn’t been supported previously.
Tracking unacked messages at the consumer level is quite difficult for Inclusive and Failover subscriptions, as the broker cannot determine how many individual messages were acknowledged in each cumulative ack.
While Shared subscriptions maintain pending acknowledgments with batch_size metadata per entry, I don’t think we should introduce similar tracking for Failover or Inclusive subscriptions.
In practice, users can already estimate unacknowledged messages using internal topic stats, since there’s only one active consumer for Failover and Inclusive subscriptions.
IMO, it's no a simple bug fix here. But we can move to the mail list for the detailed discussion.
|
IMO, since there will be a complete solution in PIP 426 #24400 , we can just close this PR if our user has no urgent need to fix it immediately. And we can leave comments in that PIP. @codelipenghui @nodece |
Sorry for that I'm confused by this. Why it's a reason?
A more complete implemention #24396 get the acknowledged number by
Do you have any worry about increasing memory usage here and that's the reason that you think it's negative to do that? Since consumer maybe acknowledge at any message, we need to record pending acknowledgments with batch_size metadata per entry. Theoretically, it hasn't no difference with shared or key_shared subscription and it will has same impact on increasing memory. We can still add a feature flag here to limit it. On the other hand, the flowcontrol of exlusive consumer is not work for now. And that functionality has a depend on accurate unacked message count. There are some issue like #15189 indicates that our failover consumer lack fine grained limit on pending ackownledge. We will have a better infrastructure to solve issues like that if we introduce similar tracking for Failover or EXclusive subscriptions.
Could you please share us the way to estimate unacknowledged messages using internal topic stats. Is that not a hack way and friendly to our users? Do you think we still need to make the count accurate?
Yes! We have a PIP 426 #24400 . We can discuss there! |
|
@berg223 Thanks for sharing the proposal, let talk on the PIP and I will also close this PR. |
Fixes #24159
Main Issue: #24159
Motivation
The unacked message count is not correct when using exclusive or failover subscription. We need to fix it.
Modifications
addAndGetUnAckedMsgsfor all subscription type inConsumeraddAndGetUnAckedMsgsfor all subscription type when handle ack.Verifying this change
Does this pull request potentially affect one of the following parts:
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: