Skip to content

Conversation

@berg223
Copy link
Contributor

@berg223 berg223 commented Jun 3, 2025

Fixes #24159

Main Issue: #24159

Motivation

The unacked message count is not correct when using exclusive or failover subscription. We need to fix it.

Modifications

  1. addAndGetUnAckedMsgs for all subscription type in Consumer
  2. addAndGetUnAckedMsgs for all subscription type when handle ack.

Verifying this change

  • Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jun 3, 2025
@berg223
Copy link
Contributor Author

berg223 commented Jun 4, 2025

I find it's not work when using Cumulative ACK in EXLUSIVE mode. Any feedback or advice is appreciated !

Copy link
Member

@nodece nodece left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work! I left my review result, please check.

@berg223
Copy link
Contributor Author

berg223 commented Jun 4, 2025

Great work! I left my review result, please check.

@nodece Thanks for kind review on it! I've been thinking about it for a while about Cumulative ACK. Seems like it still need more work to fix unacked message count when using Cumulative ACK. Any feedback or advice is appreciated !

@berg223
Copy link
Contributor Author

berg223 commented Jun 5, 2025

I have another WIP berg223#1 to solve the issue further when the exlusive or failover consumer using the cumulative ack mode. I believe that it's complete now. Because there is another related flow control issue, I'm not sure whether to merge the WIP into this PR together.

The basic idea is that we use the pendingAcks in Consumer to track unacked message for exlusive or failover subscription.However, pendingAcks is limited to individualAckMode before the WIP. I'm not sure whether the WIP break the origin design. So I have submitted a WIP there.

If you think the WIP and this PR should be merged together. Please let me know and I will do it!

@lhotari @nodece

@nodece
Copy link
Member

nodece commented Jun 5, 2025

@berg223 berg223#1 looks is an improvement, you can make a new PR to handle that after this PR is merged.

@berg223
Copy link
Contributor Author

berg223 commented Jun 5, 2025

@berg223 berg223#1 looks is an improvement, you can make a new PR to handle that after this PR is merged.

Thanks for the kind guide! I will make a new PR.

@berg223
Copy link
Contributor Author

berg223 commented Jun 8, 2025

Since this PR only support individual ack in non–batching mode, I have submitted a new PR #24396 to further support features about:

  1. batching mode
  2. cumulative ack
  3. transaction

Any feedback is appreciated @nodece

@berg223
Copy link
Contributor Author

berg223 commented Jun 10, 2025

/pulsarbot rerun-failure-checks

@nodece
Copy link
Member

nodece commented Jun 10, 2025

I will help you fix the CI:

  • Please remove Subscription.isIndividualAckMode(subType) check from Consumer.shouldBlockConsumerOnUnackMsgs to fix AdminApi2Test#testConsumerStatsLastTimestamp.

@berg223
Copy link
Contributor Author

berg223 commented Jun 10, 2025

I will help you fix the CI:

  • Please remove Subscription.isIndividualAckMode(subType) check from Consumer.shouldBlockConsumerOnUnackMsgs to fix AdminApi2Test#testConsumerStatsLastTimestamp.

Thanks for kind remind. But we cannot do this. Exclusive consumer shouldn't be blocked because it has no chance to be unblock when ack message. I have considered the block behavior carefully when coding and it's reasonable to keep it never be blocked the same as before.

I have read the unit test in detail. And I think the assertEquals(lastConsumedFlowTimestamp, consumedFlowTimestamp); should be changed to assertTrue(ackedTimestampInSubStats < lastAckedTimestampInSubStats);. A second flowPermit() will be triggered for Exclusive mode. The flow behavior is different with before and it's reasonable. Because after we acked message, the exlusive consumer will have more resource to consume messages. And we can give it a chance to flowPermit(). It beahaves same to individual mode ack of shared consumer.

Sorry for the CI breaks.

Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cumulative acknowledgment is supported for both Inclusive and Failover subscription types, which is likely the main reason why unacknowledged message tracking hasn’t been supported previously.

Tracking unacked messages at the consumer level is quite difficult for Inclusive and Failover subscriptions, as the broker cannot determine how many individual messages were acknowledged in each cumulative ack.

While Shared subscriptions maintain pending acknowledgments with batch_size metadata per entry, I don’t think we should introduce similar tracking for Failover or Inclusive subscriptions.

In practice, users can already estimate unacknowledged messages using internal topic stats, since there’s only one active consumer for Failover and Inclusive subscriptions.

IMO, it's no a simple bug fix here. But we can move to the mail list for the detailed discussion.

@berg223
Copy link
Contributor Author

berg223 commented Jun 13, 2025

IMO, since there will be a complete solution in PIP 426 #24400 , we can just close this PR if our user has no urgent need to fix it immediately. And we can leave comments in that PIP. @codelipenghui @nodece

@berg223
Copy link
Contributor Author

berg223 commented Jun 13, 2025

Cumulative acknowledgment is supported for both Inclusive and Failover subscription types, which is likely the main reason why unacknowledged message tracking hasn’t been supported previously.

Sorry for that I'm confused by this. Why it's a reason?

Tracking unacked messages at the consumer level is quite difficult for Inclusive and Failover subscriptions, as the broker cannot determine how many individual messages were acknowledged in each cumulative ack.

A more complete implemention #24396 get the acknowledged number by removeAllUpTo method. Do you have concerns about that? Is there some cases haven't been covered?

While Shared subscriptions maintain pending acknowledgments with batch_size metadata per entry, I don’t think we should introduce similar tracking for Failover or Inclusive subscriptions.

Do you have any worry about increasing memory usage here and that's the reason that you think it's negative to do that? Since consumer maybe acknowledge at any message, we need to record pending acknowledgments with batch_size metadata per entry. Theoretically, it hasn't no difference with shared or key_shared subscription and it will has same impact on increasing memory. We can still add a feature flag here to limit it.

On the other hand, the flowcontrol of exlusive consumer is not work for now. And that functionality has a depend on accurate unacked message count.

There are some issue like #15189 indicates that our failover consumer lack fine grained limit on pending ackownledge. We will have a better infrastructure to solve issues like that if we introduce similar tracking for Failover or EXclusive subscriptions.

In practice, users can already estimate unacknowledged messages using internal topic stats, since there’s only one active consumer for Failover and Inclusive subscriptions.

Could you please share us the way to estimate unacknowledged messages using internal topic stats. Is that not a hack way and friendly to our users? Do you think we still need to make the count accurate?

IMO, it's no a simple bug fix here. But we can move to the mail list for the detailed discussion.

Yes! We have a PIP 426 #24400 . We can discuss there!

@codelipenghui

@codelipenghui
Copy link
Contributor

@berg223 Thanks for sharing the proposal, let talk on the PIP and I will also close this PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] unacked message count is zero when using exclusive subscription.

4 participants