Skip to content

Commit d29324a

Browse files
authored
Merge da7ae82 into 3392b45
2 parents 3392b45 + da7ae82 commit d29324a

File tree

14 files changed

+567
-0
lines changed

14 files changed

+567
-0
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
dependencies {
19+
implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
20+
implementation 'io.cloudevents:cloudevents-kafka:2.4.2'
21+
implementation 'org.apache.kafka:kafka-clients:3.0.0'
22+
compileOnly 'org.projectlombok:lombok'
23+
annotationProcessor 'org.projectlombok:lombok'
24+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
pluginType=connector
18+
pluginName=kafka
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.kafka.config;
19+
20+
import org.apache.eventmesh.openconnect.api.config.Config;
21+
22+
import lombok.Data;
23+
24+
@Data
25+
public class KafkaServerConfig extends Config {
26+
27+
private boolean sourceEnable;
28+
29+
private boolean sinkEnable;
30+
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.kafka.server;
19+
20+
import org.apache.eventmesh.connector.kafka.config.KafkaServerConfig;
21+
import org.apache.eventmesh.connector.kafka.sink.connector.KafkaSinkConnector;
22+
import org.apache.eventmesh.connector.kafka.source.connector.KafkaSourceConnector;
23+
import org.apache.eventmesh.openconnect.Application;
24+
import org.apache.eventmesh.openconnect.util.ConfigUtil;
25+
26+
import lombok.extern.slf4j.Slf4j;
27+
28+
@Slf4j
29+
public class KafkaConnectServer {
30+
31+
32+
public static void main(String[] args) throws Exception {
33+
34+
KafkaServerConfig serverConfig = ConfigUtil.parse(KafkaServerConfig.class, "server-config.yml");
35+
36+
if (serverConfig.isSourceEnable()) {
37+
Application kafkaSourceApp = new Application();
38+
kafkaSourceApp.run(KafkaSourceConnector.class);
39+
}
40+
41+
if (serverConfig.isSinkEnable()) {
42+
Application kafkaSinkApp = new Application();
43+
kafkaSinkApp.run(KafkaSinkConnector.class);
44+
}
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.kafka.sink.config;
19+
20+
import org.apache.eventmesh.openconnect.api.config.SinkConfig;
21+
22+
import lombok.Data;
23+
24+
@Data
25+
public class KafkaSinkConfig extends SinkConfig {
26+
27+
public SinkConnectorConfig connectorConfig;
28+
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.kafka.sink.config;
19+
20+
import lombok.Data;
21+
22+
@Data
23+
public class SinkConnectorConfig {
24+
private String connectorName;
25+
private String tasksMax;
26+
private String topic;
27+
private String bootstrapServers;
28+
private String groupID;
29+
private String keyConverter;
30+
private String valueConverter;
31+
private String offsetFlushIntervalMS;
32+
private String offsetStorageTopic;
33+
private String offsetStorageReplicationFactor;
34+
private String configStorageTopic;
35+
private String configStorageReplicationFactor;
36+
private String statusStorageTopic;
37+
private String statusStorageReplicationFactor;
38+
private String offsetCommitTimeoutMS;
39+
private String offsetCommitIntervalMS;
40+
private String heartbeatIntervalMS;
41+
private String sessionTimeoutMS;
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.kafka.sink.connector;
19+
20+
import org.apache.eventmesh.connector.kafka.sink.config.KafkaSinkConfig;
21+
import org.apache.eventmesh.openconnect.api.config.Config;
22+
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
23+
import org.apache.eventmesh.openconnect.api.sink.Sink;
24+
25+
import org.apache.kafka.clients.producer.KafkaProducer;
26+
import org.apache.kafka.clients.producer.Producer;
27+
import org.apache.kafka.clients.producer.ProducerConfig;
28+
import org.apache.kafka.clients.producer.ProducerRecord;
29+
import org.apache.kafka.common.header.Header;
30+
import org.apache.kafka.common.header.internals.RecordHeader;
31+
32+
import java.nio.charset.StandardCharsets;
33+
import java.util.ArrayList;
34+
import java.util.List;
35+
import java.util.Properties;
36+
37+
import lombok.extern.slf4j.Slf4j;
38+
39+
@Slf4j
40+
public class KafkaSinkConnector implements Sink {
41+
42+
private KafkaSinkConfig sinkConfig;
43+
44+
private Properties props = new Properties();
45+
Producer<String, String> producer;
46+
47+
@Override
48+
public Class<? extends Config> configClass() {
49+
return KafkaSinkConfig.class;
50+
}
51+
52+
@Override
53+
public void init(Config config) throws Exception {
54+
this.sinkConfig = (KafkaSinkConfig) config;
55+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sinkConfig.getConnectorConfig().getBootstrapServers());
56+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, sinkConfig.getConnectorConfig().getKeyConverter());
57+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, sinkConfig.getConnectorConfig().getValueConverter());
58+
producer = new KafkaProducer<>(props);
59+
}
60+
61+
@Override
62+
public void start() throws Exception {
63+
}
64+
65+
@Override
66+
public void commit(ConnectRecord record) {
67+
68+
}
69+
70+
@Override
71+
public String name() {
72+
return this.sinkConfig.getConnectorConfig().getConnectorName();
73+
}
74+
75+
@Override
76+
public void stop() {
77+
producer.close();
78+
}
79+
80+
@Override
81+
public void put(List<ConnectRecord> sinkRecords) {
82+
try {
83+
for (ConnectRecord connectRecord : sinkRecords) {
84+
ProducerRecord message = convertRecordToMessage(connectRecord);
85+
producer.send(message, (metadata, exception) -> {
86+
if (exception == null) {
87+
log.debug("Produced message to topic:{},partition:{},offset:{}", metadata.topic(), metadata.partition(), metadata.offset());
88+
} else {
89+
log.error("Failed to produce message:{}", exception.getMessage());
90+
}
91+
});
92+
}
93+
} catch (Exception e) {
94+
log.error("Failed to produce message:{}", e.getMessage());
95+
}
96+
}
97+
98+
public ProducerRecord convertRecordToMessage(ConnectRecord connectRecord) {
99+
List<Header> headers = new ArrayList<>();
100+
for (String key : connectRecord.getExtensions().keySet()) {
101+
headers.add(new RecordHeader(key, connectRecord.getExtension(key).getBytes(StandardCharsets.UTF_8)));
102+
}
103+
ProducerRecord message = new ProducerRecord(this.sinkConfig.getConnectorConfig().getTopic(), null, "",
104+
new String((byte[]) connectRecord.getData(), StandardCharsets.UTF_8), headers);
105+
return message;
106+
}
107+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.kafka.source.config;
19+
20+
import org.apache.eventmesh.openconnect.api.config.SourceConfig;
21+
22+
import lombok.Data;
23+
24+
@Data
25+
public class KafkaSourceConfig extends SourceConfig {
26+
27+
public SourceConnectorConfig connectorConfig;
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
19+
package org.apache.eventmesh.connector.kafka.source.config;
20+
21+
import lombok.Data;
22+
23+
@Data
24+
public class SourceConnectorConfig {
25+
private String connectorName;
26+
private String tasksMax;
27+
private String topic;
28+
private String bootstrapServers;
29+
private String groupID;
30+
private String keyConverter;
31+
private String valueConverter;
32+
private String offsetFlushIntervalMS;
33+
private String offsetStorageTopic;
34+
private String offsetStorageReplicationFactor;
35+
private String configStorageTopic;
36+
private String configStorageReplicationFactor;
37+
private String statusStorageTopic;
38+
private String statusStorageReplicationFactor;
39+
private String offsetCommitTimeoutMS;
40+
private String offsetCommitIntervalMS;
41+
private String heartbeatIntervalMS;
42+
private String sessionTimeoutMS;
43+
}

0 commit comments

Comments
 (0)