KAFKA-14996: Handle overly large user operations on the kcontroller #13742
Conversation
Previously, if a user tried to perform an overly large batch operation on the KRaft controller (such as creating a million topics), we would create a very large number of records in memory. Our attempt to write these records to the Raft layer would fail, because there were too many to fit in an atomic batch. This failure, in turn, would trigger a controller failover. (Note: I am assuming here that no topic creation policy was in place that would prevent the creation of a million topics. I am also assuming that the user operation must be done atomically, which is true for all current user operations, since we have not implemented KIP-868 yet.) With this PR, we fail immediately when the number of records we have generated exceeds the threshold that we can apply. This failure does not generate a controller failover. In order to implement this in a simple way, this PR adds the BoundedList class, which wraps any list and adds a maximum length. Attempts to grow the list beyond this length cause an exception to be thrown.
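For illustration, here is a minimal sketch of the BoundedList idea. It is simplified: the real class wraps an arbitrary existing list, implements the full java.util.List interface, and throws a dedicated BoundedListTooLongException rather than the plain RuntimeException used here.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of a bounded list: delegates to an inner list but refuses
// to grow past maxLength. The real implementation overrides every mutating
// method of List; only add() is shown here.
public class BoundedList<E> {
    private final int maxLength;
    private final List<E> underlying = new ArrayList<>();

    public BoundedList(int maxLength) {
        this.maxLength = maxLength;
    }

    public void add(E element) {
        if (underlying.size() >= maxLength) {
            throw new RuntimeException("Cannot add another element, because the " +
                "list would exceed its maximum length of " + maxLength);
        }
        underlying.add(element);
    }

    public int size() {
        return underlying.size();
    }

    public E get(int index) {
        return underlying.get(index);
    }
}
```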
divijvaidya left a comment:
Thank you for this change, Colin.
The bug falls into the category of cases where we want to protect the broker from misuse (and perhaps misconfiguration). The approach you took looks good: we are protecting the system and failing gracefully, instead of crashing or triggering a controller failover as before.
But I was wondering if an additional guard could be a default TopicCreationPolicy with a maximum of X topics per request (X is probably <= 10K). On every topic creation request, we would apply the policy and ensure that the request gets rejected up front, before entering the system. We could extend this pattern to other cases where we would like to restrict the range of certain request parameters. What do you think?
I am concerned about the user-facing aspect of this change. If I am a user and get this exception, what am I expected to do to resolve it? The documentation does not call out any limit on the number of topics I can create in one API call, so how do I know what the limit is? The alternative approach I proposed above (a topic policy limitation) solves this: we can document the policies with their constraints and follow a similar pattern for other out-of-bounds configuration/request parameters.
Thoughts?
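For what it's worth, a hedged sketch of what such a guard could look like with the existing pluggable CreateTopicPolicy interface. Note one limitation: the interface validates one topic at a time, so it can cap partitions per topic but not topics per request; a true per-request cap would need a new mechanism. MAX_PARTITIONS_PER_TOPIC is a hypothetical constant, not an existing Kafka configuration.

```java
import java.util.Map;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.CreateTopicPolicy;

// Hypothetical default policy that rejects oversized topic creation
// requests up front, before any records are generated on the controller.
public class BoundedCreateTopicPolicy implements CreateTopicPolicy {
    // Hypothetical limit, chosen for illustration only.
    private static final int MAX_PARTITIONS_PER_TOPIC = 10_000;

    @Override
    public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
        Integer numPartitions = requestMetadata.numPartitions();
        if (numPartitions != null && numPartitions > MAX_PARTITIONS_PER_TOPIC) {
            throw new PolicyViolationException("Requested " + numPartitions +
                " partitions, but at most " + MAX_PARTITIONS_PER_TOPIC +
                " are allowed per topic.");
        }
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
```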
```java
/**
 * The maximum records any user-initiated operation is allowed to generate.
 */
final static int MAX_RECORDS_PER_USER_OP = 10000;
```
Please add a detailed comment explaining how this number was calculated. It would help future authors to understand the considerations to take into account while changing it.
It was chosen to be the same as MAX_RECORDS_PER_BATCH which is also 10000. I will make this clearer by initializing one to the other.
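Presumably something like this (a sketch; the constant names follow the diff above and the existing per-batch limit the reply mentions):

```java
/**
 * The maximum number of records per batch that the Raft layer will accept.
 */
final static int MAX_RECORDS_PER_BATCH = 10000;

/**
 * The maximum records any user-initiated operation is allowed to generate.
 * Deliberately the same as MAX_RECORDS_PER_BATCH: an operation that generates
 * more records than fit in one atomic batch cannot be committed anyway.
 */
final static int MAX_RECORDS_PER_USER_OP = MAX_RECORDS_PER_BATCH;
```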
@divijvaidya Colin can correct me if I'm mistaken, but I believe this patch is mainly about closing an existing edge case until we implement KIP-868 (metadata transactions). Once we have transactions in the controller, we can allow arbitrarily large batches of records.
Right now, if you create a topic with more than ~10000 partitions, you'll get a server error anyway: the controller fails to commit the batch, throws an exception, and then renounces leadership. So, really, this patch isn't changing anything from the client's perspective. It just prevents the controller from renouncing leadership (which is the real problem).
mumrah left a comment:
The approach looks good to me. I also wondered if we could solve this in ControllerResult rather than in each control manager separately.
Will we remove this logic once transactions are implemented?
LGTM once @divijvaidya's comments are addressed.
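To illustrate the alternative floated above, a hedged sketch only: the cap could be enforced once, centrally, when a result is constructed. Names loosely follow the real ControllerResult class; the check shown here is not what the PR actually does.

```java
import java.util.List;
import org.apache.kafka.server.common.ApiMessageAndVersion;

// Sketch: enforce the record cap at result-construction time, instead of
// bounding each control manager's list separately. (The real code throws
// BoundedListTooLongException; a plain RuntimeException keeps this sketch
// self-contained.)
class ControllerResult<T> {
    static final int MAX_RECORDS_PER_USER_OP = 10000;

    private final List<ApiMessageAndVersion> records;
    private final T response;

    ControllerResult(List<ApiMessageAndVersion> records, T response) {
        if (records.size() > MAX_RECORDS_PER_USER_OP) {
            throw new RuntimeException("Operation generated " + records.size() +
                " records, but the maximum allowed is " + MAX_RECORDS_PER_USER_OP);
        }
        this.records = records;
        this.response = response;
    }

    List<ApiMessageAndVersion> records() {
        return records;
    }

    T response() {
        return response;
    }
}
```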
```java
log.info("{}: failed with {} in {} us. Reason: {}", name,
    exception.getClass().getSimpleName(), deltaUs, exception.getMessage());
if (exception instanceof BoundedListTooLongException) {
    exception = new UnknownServerException(exception.getMessage());
}
```
UnknownServerException is pretty uninformative. Can we use PolicyViolationException with a string indicating the specifics? Not perfect, but the custom string sent back to the user can clarify what's going on.
yeah that's fair.
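In other words, roughly this change to the exception-translation path shown above (a sketch of the suggestion; the squashed commit message below confirms the PR did end up returning PolicyViolationException). The message string is illustrative.

```java
// Translate the internal exception into one that carries an actionable,
// user-facing message instead of an opaque UnknownServerException.
// PolicyViolationException is org.apache.kafka.common.errors.PolicyViolationException.
if (exception instanceof BoundedListTooLongException) {
    exception = new PolicyViolationException("Unable to perform excessively large " +
        "batch operation: " + exception.getMessage());
}
```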
I had just done an alternative, smaller PR for the same issue.
Hi all, thanks for the reviews and comments.
I like this idea (a default topic creation policy), but it would require a KIP to implement.
I think the thing to keep in mind is that this PR doesn't make any request fail that wouldn't have already failed.
Yes, that's correct.
I think the issue is that people can request truly gigantic, garbage-collector-killing lists of records to be constructed. You want to cut them off before they build the giant list, not after.
I think we'll need some kind of limit even with metadata transactions in place. But it will be a limit set not by the implementation, but by our policy.
Thank you, Edoardo. I guess we were thinking along the same lines. One thing to keep in mind is that the problem is bigger than just CreateTopics: any operation can be too big and cause implementation problems.
Thanks for all the reviews, and thanks @mumrah for the LGTM. Since this is a 3.5 blocker, I am getting it in today so that it will be in the next RC. As I said before, this doesn't add any new limits; it just prevents damage to the controller when the existing limits are exceeded. However, the discussion about limits here was good. I think we should consider a follow-on KIP to make the maximum number of records per user operation configurable, and possibly add a configurable partitions_max as well.
KAFKA-14996: Handle overly large user operations on the kcontroller (#13742)

Previously, if a user tried to perform an overly large batch operation on the KRaft controller (such as creating a million topics), we would create a very large number of records in memory. Our attempt to write these records to the Raft layer would fail, because there were too many to fit in an atomic batch. This failure, in turn, would trigger a controller failover.

(Note: I am assuming here that no topic creation policy was in place that would prevent the creation of a million topics. I am also assuming that the user operation must be done atomically, which is true for all current user operations, since we have not implemented KIP-868 yet.)

With this PR, we fail immediately when the number of records we have generated exceeds the threshold that we can apply. This failure does not generate a controller failover. We also now fail with a PolicyViolationException rather than an UnknownServerException.

In order to implement this in a simple way, this PR adds the BoundedList class, which wraps any list and adds a maximum length. Attempts to grow the list beyond this length cause an exception to be thrown.

Reviewers: David Arthur <[email protected]>, Ismael Juma <[email protected]>, Divij Vaidya <[email protected]>
@cmccabe Can we now close KAFKA-14996 or is there more work to do?
Merge into the 3.5 branch (commit c2f6f29ca6e1306ac77ec726bac4cd09bd1aa80b, 76 commits), including:
- KAFKA-15019: Improve handling of broker heartbeat timeouts (apache#13759)
- KAFKA-15003: Fix ZK sync logic for partition assignments (apache#13735)
- MINOR: Add 3.5 upgrade steps for ZK and KRaft (apache#13792)
- KAFKA-15010: ZK migration failover support (apache#13758)
- KAFKA-15017: Fix snapshot load in dual write mode for ClientQuotas and SCRAM (apache#13757)
- MINOR: Update LICENSE-binary following snappy upgrade (apache#13791)
- Upgrade to snappy v1.1.10.0 (apache#13786)
- KAFKA-15004: Fix configuration dual-write during migration (apache#13767)
- KAFKA-8713: JsonConverter replace.null.with.default should prevent emitting default for Struct fields (apache#13781)
- KAFKA-14996: Handle overly large user operations on the kcontroller (apache#13742)
- ...