Skip to content

Commit d9e7030

Browse files
Merge 0b0786c into 6636e3a
2 parents 6636e3a + 0b0786c commit d9e7030

File tree

2 files changed

+13
-9
lines changed

2 files changed

+13
-9
lines changed

eventmesh-connector-plugin/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/consumer/ConsumerImpl.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,14 @@ public class ConsumerImpl {
5353
private Set<String> topicsSet;
5454

5555
public ConsumerImpl(final Properties properties) {
56+
// Setting the ClassLoader to null is necessary for Kafka consumer configuration
57+
final ClassLoader original = Thread.currentThread().getContextClassLoader();
58+
Thread.currentThread().setContextClassLoader(null);
59+
5660
Properties props = new Properties();
57-
58-
// Other config props
5961
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
60-
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class);
6162
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
63+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class);
6264
props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
6365
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
6466

@@ -67,6 +69,8 @@ public ConsumerImpl(final Properties properties) {
6769
kafkaConsumerRunner = new KafkaConsumerRunner(this.kafkaConsumer);
6870
executorService = Executors.newFixedThreadPool(10);
6971
topicsSet = new HashSet<>();
72+
73+
Thread.currentThread().setContextClassLoader(original);
7074
}
7175

7276
public Properties attributes() {

eventmesh-connector-plugin/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/consumer/KafkaConsumerRunner.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ public long getOffset(CloudEvent cloudEvent) {
6262
public void run() {
6363
try {
6464
while (!closed.get()) {
65+
if (consumer.subscription().isEmpty()) {
66+
// consumer cannot poll if it is subscribe to nothing
67+
continue;
68+
}
6569
ConsumerRecords<String, CloudEvent> records = consumer.poll(Duration.ofMillis(10000));
6670
// Handle new records
6771
records.forEach(rec -> {
@@ -82,8 +86,7 @@ public void commit(EventMeshAction action) {
8286
break;
8387
case ManualAck:
8488
// update offset
85-
log
86-
.info("message ack, topic: {}, current offset:{}", topicName, rec.offset());
89+
log.info("message ack, topic: {}, current offset:{}", topicName, rec.offset());
8790
break;
8891
default:
8992
}
@@ -113,7 +116,4 @@ public void shutdown() {
113116
closed.set(true);
114117
consumer.wakeup();
115118
}
116-
}
117-
118-
119-
119+
}

0 commit comments

Comments
 (0)