KAFKA-9216: Enforce internal config topic settings for Connect workers during startup #8270
kkonstantine merged 2 commits into apache:trunk from Evelyn-Bayes:KAFKA-9216
kkonstantine
left a comment
Thanks for the PR @Evelyn-Bayes !
Seems quite useful to fix this gap in the validation of misconfigured config topics that are not created by Connect itself.
Left a few, mainly stylistic comments.
nit: related to the naming comment above.
nit: traditionally we skip the get/set prefixes in method names in this code base (there may be a few counterexamples floating around).
How about, partitionCount?
We have the topic name as the member variable topic. It's probably best if we use it here as well.
Here's a suggestion, after checking https://kafka.apache.org/documentation/#connect_running
    String msg = String.format("Topic '%s' supplied via the '%s' property is required "
            + "to have a single partition in order to guarantee consistency of "
            + "connector configurations but found %d partitions.",
            topic, DistributedConfig.CONFIG_TOPIC_CONFIG, partitionCount);
    throw new ConfigException(msg);
The test looks great!
But fyi, we now have org.junit.Assert.assertThrows, org.hamcrest.MatcherAssert.assertThat and org.hamcrest.CoreMatchers.containsString at our disposal, which allow for better assertions in tests that throw exceptions (I added the full class names because there are conflicts with deprecated methods in junit).
The build was failing in a streams test, so I rebased to get a fix for that. @kkonstantine I also included a commit that incorporated your feedback. Please take another look.
kkonstantine
left a comment
Thanks @Evelyn-Bayes for the contribution and @rhauch for the latest commit.
This LGTM now.
…s during startup (#8270)

Currently, Kafka Connect creates its config backing topic with a fire-and-forget approach. This is fine unless someone has already manually created that topic with the wrong partition count. In such a case Kafka Connect may run for some time, especially in standalone mode, but once switched to distributed mode it will almost certainly fail.

This commit adds a check when the KafkaConfigBackingStore is starting. This check will throw a ConfigException if there is more than one partition in the backing store. This exception is then caught upstream and logged by either:
- DistributedHerder#run
- ConnectStandalone#main

A unit test was added in KafkaConfigBackingStoreTest to verify the behaviour.

Author: Evelyn Bayes <[email protected]>
Co-authored-by: Randall Hauch <[email protected]>
Reviewer: Konstantine Karantasis <[email protected]>
Conflicts:
* build.gradle: take upstream changes regarding heap memory configuration for the build.

* apache-github/trunk: (33 commits)
  MINOR: fix HTML markup (apache#8823)
  KAFKA-10012; Reduce overhead of strings in SelectorMetrics (apache#8684)
  KAFKA-9216: Enforce internal config topic settings for Connect workers during startup (apache#8270)
  KAFKA-10097: Internalize checkpoint data (apache#8820)
  KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a non-existent topic's config
  MINOR: improve code encapsulation between StreamThread and TaskManager (apache#8819)
  Fixing KAFKA-10094 (apache#8797)
  KAFKA-9851: Revoking Connect tasks due to connectivity issues should also clear the running assignment (apache#8804)
  KAFKA-9840; Skip End Offset validation when the leader epoch is not reliable (apache#8486)
  HOT_FIX: Update javadoc since imports added (apache#8817)
  KAFKA-8011: Fix flaky RegexSourceIntegrationTest (apache#8799)
  KAFKA-9570: Define SSL configs in all worker config classes, not just distributed (apache#8135)
  KAFKA-10111: Make SinkTaskContext.errantRecordReporter() a default method (apache#8814)
  KAFKA-10110: Corrected potential NPE when null label value added to KafkaMetricsContext (apache#8811)
  MINOR: Change the order that Connect calls `config()` and `validate()` to avoid validating if the required ConfigDef is null (apache#8810)
  MINOR: fix backwards incompatibility in JmxReporter introduced by KIP-606
  MINOR: Fix javadoc warnings (apache#8809)
  KAFKA-9441: Improve Kafka Streams task management (apache#8776)
  fix the broken links of streams javadoc (apache#8789)
  KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient (apache#8724)
  ...
Currently, Kafka Connect creates its config backing topic with a fire-and-forget approach.
This is fine unless someone has manually created that topic already with the wrong partition count.
In such a case Kafka Connect "may" run for some time, especially if it's in standalone mode, but once switched to distributed mode it will almost certainly fail.
To counter this I've added a check when the KafkaConfigBackingStore is starting.
This check will throw a ConfigException if there is more than one partition in the backing store.
This exception is then caught upstream and logged by either:
- DistributedHerder#run
- ConnectStandalone#main
After a review I don't believe it impacts any other upstream code.
Finally, to support this new functionality I've added a public method to KafkaBasedLog which returns the partition count, and a variable to store it.
I've also created a unit test in KafkaConfigBackingStoreTest to verify the behaviour.
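
For readers who want to see the shape of the check, here is a minimal, self-contained sketch. It is not the code merged in this PR. The class ConfigTopicCheck, the helper verifySinglePartition and the nested ConfigException are hypothetical stand-ins so the example compiles without Kafka on the classpath. The message text follows the wording suggested in the review, and "config.storage.topic" is the value of DistributedConfig.CONFIG_TOPIC_CONFIG.

```java
public class ConfigTopicCheck {

    // Stand-in for org.apache.kafka.common.config.ConfigException (assumption:
    // defined locally only so this sketch has no Kafka dependency).
    static class ConfigException extends RuntimeException {
        ConfigException(String message) {
            super(message);
        }
    }

    // Worker property that names the config topic; in Kafka this string is the
    // value of DistributedConfig.CONFIG_TOPIC_CONFIG.
    static final String CONFIG_TOPIC_CONFIG = "config.storage.topic";

    // Hypothetical helper: reject a config topic that has more than one partition.
    static void verifySinglePartition(String topic, int partitionCount) {
        if (partitionCount > 1) {
            String msg = String.format("Topic '%s' supplied via the '%s' property is required "
                    + "to have a single partition in order to guarantee consistency of "
                    + "connector configurations but found %d partitions.",
                    topic, CONFIG_TOPIC_CONFIG, partitionCount);
            throw new ConfigException(msg);
        }
    }

    public static void main(String[] args) {
        // A single-partition topic passes silently.
        verifySinglePartition("connect-configs", 1);
        try {
            // A topic that was pre-created with three partitions is rejected.
            verifySinglePartition("connect-configs", 3);
        } catch (ConfigException e) {
            System.out.println("Startup rejected: " + e.getMessage());
        }
    }
}
```

In the change described above, the partition count would come from the new KafkaBasedLog method rather than being passed in directly, and the exception would surface through DistributedHerder#run or ConnectStandalone#main.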