
Conversation

@cmccabe (Contributor) commented May 22, 2023

Previously, if a user tried to perform an overly large batch operation on the KRaft controller (such as creating a million topics), we would create a very large number of records in memory. Our attempt to write these records to the Raft layer would fail, because there were too many to fit in an atomic batch. This failure, in turn, would trigger a controller failover.

(Note: I am assuming here that no topic creation policy was in place that would prevent the creation of a million topics. I am also assuming that the user operation must be done atomically, which is true for all current user operations, since we have not implemented KIP-868 yet.)

With this PR, we fail immediately when the number of records we have generated exceeds the threshold that we can apply. This failure does not generate a controller failover.

In order to implement this in a simple way, this PR adds the BoundedList class, which wraps any list and adds a maximum length. Attempts to grow the list beyond this length cause an exception to be thrown.
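
As a rough illustration, here is a minimal sketch of such a bounded list. This is not the exact class added by the PR (which wraps an arbitrary List and throws a dedicated BoundedListTooLongException); this sketch subclasses ArrayList and guards only the common growth paths:

import java.util.ArrayList;
import java.util.Collection;

// Sketch only: a list that refuses to grow past maxLength.
public class BoundedArrayList<E> extends ArrayList<E> {
    private final int maxLength;

    public BoundedArrayList(int maxLength) {
        this.maxLength = maxLength;
    }

    @Override
    public boolean add(E e) {
        checkRoom(1);
        return super.add(e);
    }

    @Override
    public void add(int index, E e) {
        checkRoom(1);
        super.add(index, e);
    }

    @Override
    public boolean addAll(Collection<? extends E> c) {
        checkRoom(c.size());
        return super.addAll(c);
    }

    // Throw before mutating, so an oversized operation fails fast without
    // ever materializing the giant list.
    private void checkRoom(int toAdd) {
        if (size() + toAdd > maxLength) {
            throw new IllegalStateException("Cannot add " + toAdd +
                " element(s): the maximum length of " + maxLength + " would be exceeded");
        }
    }
}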

@cmccabe added the kraft label May 22, 2023
@divijvaidya (Member) left a comment
Thank you for this change, Colin.

The bug falls into the category of cases where we want to protect the broker from misuse (and perhaps misconfiguration). The approach you took looks good: we protect the system and fail gracefully, instead of crashing or triggering a controller failover as before.

  1. But I was wondering if an additional guard could be a default TopicCreationPolicy with a maximum of X topics per request (X is probably <= 10K). On every topic creation request, we would apply the policy and reject the request upfront, before it enters the system. We could extend this pattern to other cases where we want to restrict the range of certain request parameters. What do you think? (A sketch of this idea follows below.)

  2. I am concerned about the user-facing aspect of this change. If I am a user and get this exception, what am I expected to do to resolve it? The documentation does not call out any limit on the number of topics I can create in one API call, so how do I know what the limit is? The alternative approach I proposed above (a topic policy limitation) solves this: we can document policies with their constraints and follow a similar pattern for other out-of-bound configuration/request parameters.

Thoughts?
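
For illustration only, here is a hedged sketch of the kind of policy guard described in point 1. Note that Kafka's pluggable CreateTopicPolicy interface validates topics one at a time, so a cap on the total number of topics per request would need additional plumbing; a per-topic guard (here on partition count, with an illustrative cap) might look like this:

import java.util.Map;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.CreateTopicPolicy;

// Hypothetical policy: reject any single topic whose partition count
// exceeds an illustrative cap. Installed via create.topic.policy.class.name.
public class MaxPartitionsCreateTopicPolicy implements CreateTopicPolicy {
    private static final int MAX_PARTITIONS_PER_TOPIC = 10_000; // illustrative

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
        Integer numPartitions = requestMetadata.numPartitions();
        if (numPartitions != null && numPartitions > MAX_PARTITIONS_PER_TOPIC) {
            throw new PolicyViolationException("Topic " + requestMetadata.topic() +
                " requests " + numPartitions + " partitions, exceeding the cap of " +
                MAX_PARTITIONS_PER_TOPIC);
        }
    }

    @Override
    public void close() { }
}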

/**
 * The maximum records any user-initiated operation is allowed to generate.
 */
final static int MAX_RECORDS_PER_USER_OP = 10000;
A reviewer (Member) commented:

Please add a detailed comment explaining how this number was calculated. It would help future authors to understand the considerations to take into account while changing it.

@cmccabe (Contributor, Author) replied May 26, 2023:

It was chosen to be the same as MAX_RECORDS_PER_BATCH which is also 10000. I will make this clearer by initializing one to the other.
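
A hedged sketch of what that clarified definition might look like, assuming MAX_RECORDS_PER_BATCH is the existing Raft-layer cap referenced above:

/**
 * The maximum number of records any user-initiated operation is allowed to
 * generate. Kept equal to MAX_RECORDS_PER_BATCH because, until KIP-868
 * (metadata transactions) lands, every user operation must fit into a
 * single atomic Raft batch.
 */
final static int MAX_RECORDS_PER_USER_OP = MAX_RECORDS_PER_BATCH;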

@mumrah (Member) commented May 26, 2023

@divijvaidya Colin can correct me if I'm mistaken, but I believe this patch is mainly about closing an existing edge case until we implement KIP-868 (metadata transactions). Once we have transactions in the controller, we can allow arbitrarily large batches of records.

@divijvaidya : I am concerned about the user-facing aspect of this change. If I am a user and get this exception, what am I expected to do to resolve it?

Right now, if you create a topic with more than ~10000 partitions, you'll get a server error anyway. The controller fails to commit the batch, throws an exception, and then renounces leadership.

Here's what happens on the controller:

[2023-05-26 10:24:28,308] DEBUG [QuorumController id=1] Got exception while running createTopics(1813420413). Invoking handleException. (org.apache.kafka.queue.KafkaEventQueue)
java.lang.IllegalStateException: Attempted to atomically commit 20001 records, but maxRecordsPerBatch is 10000
	at org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:812)
	at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:719)
	at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
	at java.lang.Thread.run(Thread.java:750)
[2023-05-26 10:24:28,314] INFO [RaftManager id=1] Received user request to resign from the current epoch 3 (org.apache.kafka.raft.KafkaRaftClient)
[2023-05-26 10:24:28,323] INFO [RaftManager id=1] Failed to handle fetch from 2 at 142 due to NOT_LEADER_OR_FOLLOWER (org.apache.kafka.raft.KafkaRaftClient)

And the client sees:

[2023-05-26 10:24:28,351] ERROR org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request.
 (kafka.admin.TopicCommand$)

So really, this patch isn't changing anything from the client's perspective. It just prevents the controller from renouncing leadership (which is the real problem).

@mumrah (Member) left a comment

The approach looks good to me. I also wondered if we could solve this in ControllerResult rather than in each control manager separately.

Will we remove this logic once transactions are implemented?

LGTM once @divijvaidya's comments are addressed.

// Log the failure, then translate the internal exception before it
// reaches the client.
log.info("{}: failed with {} in {} us. Reason: {}", name,
    exception.getClass().getSimpleName(), deltaUs, exception.getMessage());
if (exception instanceof BoundedListTooLongException) {
    exception = new UnknownServerException(exception.getMessage());
}
@ijuma (Member) commented May 26, 2023:
UnknownServerException is pretty uninformative. Can we use PolicyViolationException with a string indicating the specifics? Not perfect, but the custom string sent back to the user can clarify what's going on.

@cmccabe (Contributor, Author) replied:

Yeah, that's fair.
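
The merged commit message below confirms this change landed. A minimal sketch of the revised mapping, reusing the surrounding code from the diff hunk above:

if (exception instanceof BoundedListTooLongException) {
    // Surface the limit to the client with a descriptive message rather
    // than an opaque UnknownServerException.
    exception = new PolicyViolationException(exception.getMessage());
}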

@edoardocomar (Contributor) commented:
I had just done an alternative, smaller PR for the same issue: #13766

@cmccabe (Contributor, Author) commented May 26, 2023

Hi all,

Thanks for the reviews and comments.

@divijvaidya : But I was wondering if an additional guard could be a default TopicCreationPolicy with a maximum of X topics per request (X is probably <= 10K). On every topic creation request, we would apply the policy and reject the request upfront, before it enters the system. We could extend this pattern to other cases where we want to restrict the range of certain request parameters. What do you think?

I like this idea, but it would require a KIP to implement.

@divijvaidya : I am concerned about the user-facing aspect of this change. If I am a user and get this exception, what am I expected to do to resolve it? The documentation does not call out any limit on the number of topics I can create in one API call, so how do I know what the limit is? The alternative approach I proposed above (a topic policy limitation) solves this: we can document policies with their constraints and follow a similar pattern for other out-of-bound configuration/request parameters.

I think the thing to keep in mind is that this PR doesn't make any request fail that wouldn't have already failed.

@mumrah : Colin can correct me if I'm mistaken, but I believe this patch is mainly about closing an existing edge case until we implement KIP-868 (metadata transactions). Once we have transactions in the controller, we can allow arbitrarily large batches of records.

Yes, that's correct.

@mumrah : I also wondered if we could solve this in ControllerResult rather than in each control manager separately.

I think the issue is that people can request truly gigantic, garbage-collector-killing lists of records to be constructed. You want to cut them off before they build the giant list, not after.
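
To make that concrete, here is a hedged sketch of the pattern inside a control manager; the record and helper names are assumptions for illustration:

// Build records into a bounded list so the operation fails as soon as the
// cap is exceeded, before the full list ever materializes in memory.
List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
for (CreatableTopic topic : request.topics()) {
    records.addAll(recordsForTopic(topic)); // throws once the cap is hit
}
return ControllerResult.atomicOf(records, response);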

@mumrah : Will we remove this logic once transactions are implemented?

I think we'll need some kind of limit even with metadata transactions in place. But it will be a limit set not by the implementation, but by our policy.

@edoardocomar : I had just done an alternative smaller PR for the same issue

Thank you, Edoardo; I guess we were thinking along the same lines. One thing to keep in mind is that the problem is bigger than just CreateTopics. Any operation can be too big and cause implementation problems.

@cmccabe merged commit b74204f into apache:trunk May 26, 2023
@cmccabe deleted the KAFKA-14996 branch May 26, 2023 20:16
@cmccabe (Contributor, Author) commented May 26, 2023

Thanks for all the reviews, and thanks @mumrah for the LGTM. Since this is a 3.5 blocker, I am getting it in today so that it will be in the next RC.

As I said before, this doesn't add any new limits, but just prevents damage to the controller when the existing limits are exceeded. However, the discussion about limits here was good. I think we should consider a follow-on KIP to make the maximum number of records per user operation configurable, and possibly add a configurable partitions_max as well.

cmccabe added a commit that referenced this pull request May 26, 2023
…13742)

Previously, if a user tried to perform an overly large batch operation on the KRaft controller
(such as creating a million topics), we would create a very large number of records in memory. Our
attempt to write these records to the Raft layer would fail, because there were too many to fit in
an atomic batch. This failure, in turn, would trigger a controller failover.

(Note: I am assuming here that no topic creation policy was in place that would prevent the
creation of a million topics. I am also assuming that the user operation must be done atomically,
which is true for all current user operations, since we have not implemented KIP-868 yet.)

With this PR, we fail immediately when the number of records we have generated exceeds the
threshold that we can apply. This failure does not generate a controller failover. We also now
fail with a PolicyViolationException rather than an UnknownServerException.

In order to implement this in a simple way, this PR adds the BoundedList class, which wraps any
list and adds a maximum length. Attempts to grow the list beyond this length cause an exception to
be thrown.

Reviewers: David Arthur <[email protected]>, Ismael Juma <[email protected]>, Divij Vaidya <[email protected]>
@mimaison (Member) commented:
@cmccabe Can we now close KAFKA-14996 or is there more work to do?

emissionnebula added a commit to confluentinc/kafka that referenced this pull request Jun 16, 2023
…tream-3.5

* commit 'c2f6f29ca6e1306ac77ec726bac4cd09bd1aa80b': (76 commits)
  KAFKA-15019: Improve handling of broker heartbeat timeouts (apache#13759)
  KAFKA-15003: Fix ZK sync logic for partition assignments (apache#13735)
  MINOR: Add 3.5 upgrade steps for ZK and KRaft (apache#13792)
  KAFKA-15010 ZK migration failover support (apache#13758)
  KAFKA-15017 Fix snapshot load in dual write mode for ClientQuotas and SCRAM  (apache#13757)
  MINOR: Update LICENSE-binary following snappy upgrade (apache#13791)
  Upgrade to snappy v1.1.10.0 (apache#13786)
  KAFKA-15004: Fix configuration dual-write during migration (apache#13767)
  KAFKA-8713: JsonConverter replace.null.with.default should prevent emitting default for Struct fields (apache#13781)
  KAFKA-14996: Handle overly large user operations on the kcontroller (apache#13742)
  ...