Skip to content

KAFKA-9216: Enforce internal config topic settings for Connect workers during startup#8270

Merged
kkonstantine merged 2 commits intoapache:trunkfrom
Evelyn-Bayes:KAFKA-9216
Jun 7, 2020
Merged

KAFKA-9216: Enforce internal config topic settings for Connect workers during startup#8270
kkonstantine merged 2 commits intoapache:trunkfrom
Evelyn-Bayes:KAFKA-9216

Conversation

@Evelyn-Bayes
Copy link
Copy Markdown
Contributor

Currently, if Kafka Connect will create its config backing topic with a fire and forget approach.
This is fine unless someone has manually created that topic already with the wrong partition count.

In such a case Kafka Connect "may" run for some time.
Especially if it's in standalone mode and once switched to distributed mode it will almost certainly fail.

To counter this I've added a check when the KafkaConfigBackingStore is starting.
This check will throw a ConfigException if there is more than one partition in the backing store.

This exception is then caught upstream and logged by either:

  • class: DistributedHerder, method: run
  • class: ConnectStandalone, method: main

After a review I don't believe it impacts any other upstream code.

Finally, to supper this new functionality I've added a public method to KafkaBasedLog which returns the partition count and a variable to store this.

And, I've created a unit test in KafkaConfigBackingStoreTest to verify the behaviour.

Copy link
Copy Markdown
Contributor

@kkonstantine kkonstantine left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @Evelyn-Bayes !

Seems quite useful to fix this gap in the validation of misconfigured config topics that are not created by Connect itself.

Left a few, mainly stylistic comments.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: related to the naming comment above.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: traditionally we skip the get/set prefixes in method names in this code base (there maybe a few counter examples floating around).

How about, partitionCount?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have the topic name as member variable topic. Probably best if we use it as well.

Here's a suggestion, after checking https://kafka.apache.org/documentation/#connect_running

            String msg = String.format("Topic '%s' supplied via the '%s' property is required "
                    + "to have a single partition in order to guarantee consistency of "
                    + "connector configurations but found %d partitions.",
                    topic, DistributedConfig.CONFIG_TOPIC_CONFIG, partitionCount);
            throw new ConfigException(msg);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test looks great!
But fyi, we have org.junit.Assert.assertThrows; and org.hamcrest.MatcherAssert.assertThat, org.hamcrest.CoreMatchers.containsString in our disposal now, that allow for better assertions in tests that throw exceptions (added the full classnames because they are conflicts with deprecated methods in junit)

@rhauch
Copy link
Copy Markdown
Contributor

rhauch commented May 20, 2020

ok to test

@rhauch
Copy link
Copy Markdown
Contributor

rhauch commented May 25, 2020

retest this please

1 similar comment
@rhauch
Copy link
Copy Markdown
Contributor

rhauch commented Jun 5, 2020

retest this please

@rhauch
Copy link
Copy Markdown
Contributor

rhauch commented Jun 7, 2020

The build was failing in a streams test, so I rebased to get a fix for that.

@kkonstantine I also included a commit that incorporated your feedback. Please take another look.

@rhauch
Copy link
Copy Markdown
Contributor

rhauch commented Jun 7, 2020

retest this please

@rhauch
Copy link
Copy Markdown
Contributor

rhauch commented Jun 7, 2020

ok to test

Copy link
Copy Markdown
Contributor

@kkonstantine kkonstantine left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Evelyn-Bayes for the contribution and @rhauch for the latest commit.

This LGTM now.

@kkonstantine kkonstantine changed the title KAFKA-9216: Enforce connect internal topic configuration at startup KAFKA-9216: Enforce internal config topic settings for Connect workers during startup Jun 7, 2020
@kkonstantine kkonstantine merged commit 9a0b694 into apache:trunk Jun 7, 2020
kkonstantine pushed a commit that referenced this pull request Jun 7, 2020
…s during startup (#8270)

Currently, Kafka Connect creates its config backing topic with a fire and forget approach.
This is fine unless someone has manually created that topic already with the wrong partition count.

In such a case Kafka Connect may run for some time. Especially if it's in standalone mode and once switched to distributed mode it will almost certainly fail.

This commits adds a check when the KafkaConfigBackingStore is starting.
This check will throw a ConfigException if there is more than one partition in the backing store.

This exception is then caught upstream and logged by either:
- DistributedHerder#run
- ConnectStandalone#main

A unit tests was added in KafkaConfigBackingStoreTest to verify the behaviour.

Author: Evelyn Bayes <[email protected]>
Co-authored-by: Randall Hauch <[email protected]>

Reviewer: Konstantine Karantasis <[email protected]>
kkonstantine pushed a commit that referenced this pull request Jun 7, 2020
…s during startup (#8270)

Currently, Kafka Connect creates its config backing topic with a fire and forget approach.
This is fine unless someone has manually created that topic already with the wrong partition count.

In such a case Kafka Connect may run for some time. Especially if it's in standalone mode and once switched to distributed mode it will almost certainly fail.

This commits adds a check when the KafkaConfigBackingStore is starting.
This check will throw a ConfigException if there is more than one partition in the backing store.

This exception is then caught upstream and logged by either:
- DistributedHerder#run
- ConnectStandalone#main

A unit tests was added in KafkaConfigBackingStoreTest to verify the behaviour.

Author: Evelyn Bayes <[email protected]>
Co-authored-by: Randall Hauch <[email protected]>

Reviewer: Konstantine Karantasis <[email protected]>
kkonstantine pushed a commit that referenced this pull request Jun 7, 2020
…s during startup (#8270)

Currently, Kafka Connect creates its config backing topic with a fire and forget approach.
This is fine unless someone has manually created that topic already with the wrong partition count.

In such a case Kafka Connect may run for some time. Especially if it's in standalone mode and once switched to distributed mode it will almost certainly fail.

This commits adds a check when the KafkaConfigBackingStore is starting.
This check will throw a ConfigException if there is more than one partition in the backing store.

This exception is then caught upstream and logged by either:
- DistributedHerder#run
- ConnectStandalone#main

A unit tests was added in KafkaConfigBackingStoreTest to verify the behaviour.

Author: Evelyn Bayes <[email protected]>
Co-authored-by: Randall Hauch <[email protected]>

Reviewer: Konstantine Karantasis <[email protected]>
kkonstantine pushed a commit that referenced this pull request Jun 7, 2020
…s during startup (#8270)

Currently, Kafka Connect creates its config backing topic with a fire and forget approach.
This is fine unless someone has manually created that topic already with the wrong partition count.

In such a case Kafka Connect may run for some time. Especially if it's in standalone mode and once switched to distributed mode it will almost certainly fail.

This commits adds a check when the KafkaConfigBackingStore is starting.
This check will throw a ConfigException if there is more than one partition in the backing store.

This exception is then caught upstream and logged by either:
- DistributedHerder#run
- ConnectStandalone#main

A unit tests was added in KafkaConfigBackingStoreTest to verify the behaviour.

Author: Evelyn Bayes <[email protected]>
Co-authored-by: Randall Hauch <[email protected]>

Reviewer: Konstantine Karantasis <[email protected]>
ijuma added a commit to confluentinc/kafka that referenced this pull request Jun 8, 2020
Conflicts:
* build.gradle: take upstream changes regarding heap memory
configuration for the build.

* apache-github/trunk: (33 commits)
  MINOR: fix HTML markup (apache#8823)
  KAFKA-10012; Reduce overhead of strings in SelectorMetrics (apache#8684)
KAFKA-9216: Enforce internal config topic settings for Connect workers
during startup (apache#8270)
  KAFKA-10097: Internalize checkpoint data (apache#8820)
KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a
non-existent topic's config
MINOR: improve code encapsulation between StreamThread and TaskManager
(apache#8819)
  Fixing KAFKA-10094 (apache#8797)
KAFKA-9851: Revoking Connect tasks due to connectivity issues should
also clear the running assignment (apache#8804)
KAFKA-9840; Skip End Offset validation when the leader epoch is not
reliable (apache#8486)
  HOT_FIX: Update javadoc since imports added (apache#8817)
  KAFKA-8011: Fix flaky RegexSourceIntegrationTest (apache#8799)
KAFKA-9570: Define SSL configs in all worker config classes, not just
distributed (apache#8135)
KAFKA-10111: Make SinkTaskContext.errantRecordReporter() a default
method (apache#8814)
KAFKA-10110: Corrected potential NPE when null label value added to
KafkaMetricsContext (apache#8811)
MINOR: Change the order that Connect calls `config()` and `validate()`
to avoid validating if the required ConfigDef is null (apache#8810)
MINOR: fix backwards incompatibility in JmxReporter introduced by
KIP-606
  MINOR: Fix javadoc warnings (apache#8809)
  KAFKA-9441: Improve Kafka Streams task management (apache#8776)
  fix the broken links of streams javadoc (apache#8789)
KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric
more efficient (apache#8724)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants