Skip to content

Conversation

@Markliniubility
Copy link
Member

@Markliniubility Markliniubility commented Mar 1, 2023

Fixes #3306.

Motivation

Kafka connectors may raise two exceptions during the polling process.

  1. when the consumer polls but is not subscribed to any topic, the thread raises an exception and the consumer shuts down permanently.
  2. RecordDeserializationException, which happens when the consumer fails to deserialize a record.

Modifications

For the first problem, I added a check on the subscription of the consumer.
The second issue is a bit more tricky. It was a bug in initializing the Kafka consumer. The added lines will fix the bug for the following reasons:

kafka client use Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader()) to get the Class object, and the create the instance, the key point is the classLoader, which is specified by the last param, the implementation of method Utils.getContextOrKafkaClassLoader() is

public static ClassLoader getContextOrKafkaClassLoader() {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    if (cl == null)
        return getKafkaClassLoader();
    else
        return cl;
}

Documentation

  • Does this pull request introduce a new feature?
  • No
  • If yes, how is the feature documented? N/A
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@codecov
Copy link

codecov bot commented Mar 1, 2023

Codecov Report

Merging #3302 (d9e7030) into master (8a014d4) will decrease coverage by 0.01%.
The diff coverage is 0.00%.

❗ Current head d9e7030 differs from pull request most recent head 0b0786c. Consider uploading reports for the commit 0b0786c to get more accurate results

@@             Coverage Diff              @@
##             master    #3302      +/-   ##
============================================
- Coverage     13.08%   13.08%   -0.01%     
  Complexity     1159     1159              
============================================
  Files           550      550              
  Lines         28715    28719       +4     
  Branches       2852     2853       +1     
============================================
  Hits           3757     3757              
- Misses        24632    24636       +4     
  Partials        326      326              
Impacted Files Coverage Δ
...entmesh/connector/kafka/consumer/ConsumerImpl.java 0.00% <0.00%> (ø)
.../connector/kafka/consumer/KafkaConsumerRunner.java 0.00% <0.00%> (ø)

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@Markliniubility Markliniubility changed the title Fix bug on the Initialization of Deserializer [ISSUE #3306] Fix bug on the Initialization of Deserializer Mar 1, 2023
@Markliniubility Markliniubility changed the title [ISSUE #3306] Fix bug on the Initialization of Deserializer [ISSUE #3306] Fix the Initialization of Deserializer Mar 1, 2023
@Markliniubility Markliniubility changed the title [ISSUE #3306] Fix the Initialization of Deserializer [ISSUE #3306] Fix the Initialization of Deserializer on Kafka Connector Mar 1, 2023
@Markliniubility Markliniubility requested a review from xwm1992 March 2, 2023 03:37
@xwm1992 xwm1992 merged commit 155bd51 into master Mar 2, 2023
@qqeasonchen qqeasonchen deleted the kafka-bug-fix branch March 30, 2023 12:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] KafkaConsumerRunner Thread Closes Unexceptedly

2 participants