Skip to content

Commit 6c94d9d

Browse files
authored
[ISSUE #4011] add kafkaConnector module (#4180)
* [ISSUE #4011] add kafkaConnector module * [ISSUE #4011] add kafkaConnector module * [ISSUE #4011] fix codestyle * Fixes #4011. add default config for kafka sink and source
1 parent 7c2c554 commit 6c94d9d

File tree

14 files changed

+572
-0
lines changed

14 files changed

+572
-0
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
dependencies {
19+
implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
20+
implementation 'io.cloudevents:cloudevents-kafka:2.4.2'
21+
implementation 'org.apache.kafka:kafka-clients:3.0.0'
22+
compileOnly 'org.projectlombok:lombok'
23+
annotationProcessor 'org.projectlombok:lombok'
24+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
pluginType=connector
18+
pluginName=kafka
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.kafka.config;
19+
20+
import org.apache.eventmesh.openconnect.api.config.Config;
21+
22+
import lombok.Data;
23+
24+
@Data
25+
public class KafkaServerConfig extends Config {
26+
27+
private boolean sourceEnable;
28+
29+
private boolean sinkEnable;
30+
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.kafka.server;
19+
20+
import org.apache.eventmesh.connector.kafka.config.KafkaServerConfig;
21+
import org.apache.eventmesh.connector.kafka.sink.connector.KafkaSinkConnector;
22+
import org.apache.eventmesh.connector.kafka.source.connector.KafkaSourceConnector;
23+
import org.apache.eventmesh.openconnect.Application;
24+
import org.apache.eventmesh.openconnect.util.ConfigUtil;
25+
26+
import lombok.extern.slf4j.Slf4j;
27+
28+
@Slf4j
29+
public class KafkaConnectServer {
30+
31+
32+
public static void main(String[] args) throws Exception {
33+
34+
KafkaServerConfig serverConfig = ConfigUtil.parse(KafkaServerConfig.class, "server-config.yml");
35+
36+
if (serverConfig.isSourceEnable()) {
37+
Application kafkaSourceApp = new Application();
38+
kafkaSourceApp.run(KafkaSourceConnector.class);
39+
}
40+
41+
if (serverConfig.isSinkEnable()) {
42+
Application kafkaSinkApp = new Application();
43+
kafkaSinkApp.run(KafkaSinkConnector.class);
44+
}
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.kafka.sink.config;
19+
20+
import org.apache.eventmesh.openconnect.api.config.SinkConfig;
21+
22+
import lombok.Data;
23+
24+
@Data
25+
public class KafkaSinkConfig extends SinkConfig {
26+
27+
public SinkConnectorConfig connectorConfig;
28+
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.kafka.sink.config;
19+
20+
import lombok.Data;
21+
22+
@Data
23+
public class SinkConnectorConfig {
24+
25+
private String connectorName = "kafkaSink";
26+
private String topic = "TopicTest";
27+
private String ack = "all";
28+
private String bootstrapServers = "127.0.0.1:9092";
29+
private String keyConverter = "org.apache.kafka.common.serialization.StringSerializer";
30+
private String valueConverter = "org.apache.kafka.common.serialization.StringSerializer";
31+
private String maxRequestSize = "1048576";
32+
private String bufferMemory = "33554432";
33+
private String batchSize = "16384";
34+
private String lingerMs = "0";
35+
private String requestTimeoutMs = "30000";
36+
private String maxInFightRequestsPerConnection = "5";
37+
private String retries = "0";
38+
private String compressionType = "none";
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.kafka.sink.connector;
19+
20+
import org.apache.eventmesh.connector.kafka.sink.config.KafkaSinkConfig;
21+
import org.apache.eventmesh.openconnect.api.config.Config;
22+
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
23+
import org.apache.eventmesh.openconnect.api.sink.Sink;
24+
25+
import org.apache.kafka.clients.producer.KafkaProducer;
26+
import org.apache.kafka.clients.producer.Producer;
27+
import org.apache.kafka.clients.producer.ProducerConfig;
28+
import org.apache.kafka.clients.producer.ProducerRecord;
29+
import org.apache.kafka.common.header.Header;
30+
import org.apache.kafka.common.header.internals.RecordHeader;
31+
32+
import java.nio.charset.StandardCharsets;
33+
import java.util.ArrayList;
34+
import java.util.List;
35+
import java.util.Properties;
36+
37+
import lombok.extern.slf4j.Slf4j;
38+
39+
@Slf4j
40+
public class KafkaSinkConnector implements Sink {
41+
42+
private KafkaSinkConfig sinkConfig;
43+
44+
private Properties props = new Properties();
45+
Producer<String, String> producer;
46+
47+
@Override
48+
public Class<? extends Config> configClass() {
49+
return KafkaSinkConfig.class;
50+
}
51+
52+
@Override
53+
public void init(Config config) {
54+
this.sinkConfig = (KafkaSinkConfig) config;
55+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sinkConfig.getConnectorConfig().getBootstrapServers());
56+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, sinkConfig.getConnectorConfig().getKeyConverter());
57+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, sinkConfig.getConnectorConfig().getValueConverter());
58+
props.put(ProducerConfig.ACKS_CONFIG, sinkConfig.getConnectorConfig().getAck());
59+
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, sinkConfig.getConnectorConfig().getMaxRequestSize());
60+
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, sinkConfig.getConnectorConfig().getBufferMemory());
61+
props.put(ProducerConfig.BATCH_SIZE_CONFIG, sinkConfig.getConnectorConfig().getBatchSize());
62+
props.put(ProducerConfig.LINGER_MS_CONFIG, sinkConfig.getConnectorConfig().getLingerMs());
63+
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, sinkConfig.getConnectorConfig().getRequestTimeoutMs());
64+
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, sinkConfig.getConnectorConfig().getMaxInFightRequestsPerConnection());
65+
props.put(ProducerConfig.RETRIES_CONFIG, sinkConfig.getConnectorConfig().getRetries());
66+
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, sinkConfig.getConnectorConfig().getCompressionType());
67+
producer = new KafkaProducer<>(props);
68+
}
69+
70+
@Override
71+
public void start() throws Exception {
72+
}
73+
74+
@Override
75+
public void commit(ConnectRecord record) {
76+
77+
}
78+
79+
@Override
80+
public String name() {
81+
return this.sinkConfig.getConnectorConfig().getConnectorName();
82+
}
83+
84+
@Override
85+
public void stop() {
86+
producer.close();
87+
}
88+
89+
@Override
90+
public void put(List<ConnectRecord> sinkRecords) {
91+
try {
92+
for (ConnectRecord connectRecord : sinkRecords) {
93+
ProducerRecord message = convertRecordToMessage(connectRecord);
94+
producer.send(message, (metadata, exception) -> {
95+
if (exception == null) {
96+
log.debug("Produced message to topic:{},partition:{},offset:{}", metadata.topic(), metadata.partition(), metadata.offset());
97+
} else {
98+
log.error("Failed to produce message:{}", exception.getMessage());
99+
}
100+
});
101+
}
102+
} catch (Exception e) {
103+
log.error("Failed to produce message:{}", e.getMessage());
104+
}
105+
}
106+
107+
public ProducerRecord convertRecordToMessage(ConnectRecord connectRecord) {
108+
List<Header> headers = new ArrayList<>();
109+
for (String key : connectRecord.getExtensions().keySet()) {
110+
headers.add(new RecordHeader(key, connectRecord.getExtension(key).getBytes(StandardCharsets.UTF_8)));
111+
}
112+
ProducerRecord message = new ProducerRecord(this.sinkConfig.getConnectorConfig().getTopic(), null, "",
113+
new String((byte[]) connectRecord.getData(), StandardCharsets.UTF_8), headers);
114+
return message;
115+
}
116+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.kafka.source.config;
19+
20+
import org.apache.eventmesh.openconnect.api.config.SourceConfig;
21+
22+
import lombok.Data;
23+
24+
@Data
25+
public class KafkaSourceConfig extends SourceConfig {
26+
27+
public SourceConnectorConfig connectorConfig;
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
19+
package org.apache.eventmesh.connector.kafka.source.config;
20+
21+
import lombok.Data;
22+
23+
@Data
24+
public class SourceConnectorConfig {
25+
26+
private String connectorName = "kafkaSource";
27+
private String topic = "TopicTest";
28+
private String bootstrapServers = "127.0.0.1:9092";
29+
private String groupID = "kafkaSource";
30+
private String keyConverter = "org.apache.kafka.common.serialization.StringSerializer";
31+
private String valueConverter = "org.apache.kafka.common.serialization.StringSerializer";
32+
private String autoCommitIntervalMS = "1000";
33+
private String enableAutoCommit = "false";
34+
private String sessionTimeoutMS = "3000";
35+
private String maxPollRecords = "1000";
36+
private int pollTimeOut = 100;
37+
}

0 commit comments

Comments
 (0)