Skip to content

Commit fd1102a

Browse files
authored
Merge pull request #4393 from fabian4/rabbitmq_plugin
[ISSUE #4371] Move Rabbitmq plugin into Connector from Storage plugin moudle
2 parents 95ec173 + 3f1783f commit fd1102a

File tree

26 files changed

+1202
-106
lines changed

26 files changed

+1202
-106
lines changed

eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/sink/connector/MongodbSinkConnector.java

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,11 @@
2626
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
2727
import org.apache.eventmesh.openconnect.api.sink.Sink;
2828
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
29+
import org.apache.eventmesh.openconnect.util.CloudEventUtil;
2930

30-
import java.net.URI;
31-
import java.net.URISyntaxException;
3231
import java.util.List;
3332

3433
import io.cloudevents.CloudEvent;
35-
import io.cloudevents.core.builder.CloudEventBuilder;
3634

3735
import com.mongodb.connection.ClusterType;
3836

@@ -98,40 +96,12 @@ public void stop() throws Exception {
9896
public void put(List<ConnectRecord> sinkRecords) {
9997
try {
10098
for (ConnectRecord connectRecord : sinkRecords) {
101-
CloudEvent event = convertRecordToEvent(connectRecord);
99+
CloudEvent event = CloudEventUtil.convertRecordToEvent(connectRecord);
102100
client.publish(event);
103101
log.debug("Produced message to event:{}}", event);
104102
}
105103
} catch (Exception e) {
106104
log.error("Failed to produce message:{}", e.getMessage());
107105
}
108106
}
109-
110-
public CloudEvent convertRecordToEvent(ConnectRecord connectRecord) {
111-
CloudEventBuilder cloudEventBuilder = CloudEventBuilder.v1()
112-
.withData((byte[]) connectRecord.getData());
113-
connectRecord.getExtensions().keySet().forEach(s -> {
114-
switch (s) {
115-
case "id":
116-
cloudEventBuilder.withId(connectRecord.getExtension(s));
117-
break;
118-
case "topic":
119-
cloudEventBuilder.withSubject(connectRecord.getExtension(s));
120-
break;
121-
case "source":
122-
try {
123-
cloudEventBuilder.withSource(new URI(connectRecord.getExtension(s)));
124-
} catch (URISyntaxException e) {
125-
throw new RuntimeException(e);
126-
}
127-
break;
128-
case "type":
129-
cloudEventBuilder.withType(connectRecord.getExtension(s));
130-
break;
131-
default:
132-
cloudEventBuilder.withExtension(s, connectRecord.getExtension(s));
133-
}
134-
});
135-
return cloudEventBuilder.build();
136-
}
137107
}

eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@
2626
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
2727
import org.apache.eventmesh.openconnect.api.source.Source;
2828
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
29+
import org.apache.eventmesh.openconnect.util.CloudEventUtil;
2930

3031
import java.util.ArrayList;
3132
import java.util.List;
32-
import java.util.Objects;
3333
import java.util.concurrent.BlockingQueue;
3434
import java.util.concurrent.LinkedBlockingQueue;
3535
import java.util.concurrent.TimeUnit;
@@ -109,24 +109,11 @@ public List<ConnectRecord> poll() {
109109
break;
110110
}
111111

112-
connectRecords.add(convertEventToRecord(event));
112+
connectRecords.add(CloudEventUtil.convertEventToRecord(event));
113113
} catch (InterruptedException e) {
114114
break;
115115
}
116116
}
117117
return connectRecords;
118118
}
119-
120-
public ConnectRecord convertEventToRecord(CloudEvent event) {
121-
byte[] body = Objects.requireNonNull(event.getData()).toBytes();
122-
ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis(), body);
123-
for (String extensionName : event.getExtensionNames()) {
124-
connectRecord.addExtension(extensionName, Objects.requireNonNull(event.getExtension(extensionName)).toString());
125-
}
126-
connectRecord.addExtension("id", event.getId());
127-
connectRecord.addExtension("topic", event.getSubject());
128-
connectRecord.addExtension("source", event.getSource().toString());
129-
connectRecord.addExtension("type", event.getType());
130-
return connectRecord;
131-
}
132119
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
19+
configurations {
20+
implementation.exclude group: 'ch.qos.logback', module: 'logback-classic'
21+
implementation.exclude group: 'log4j', module: 'log4j'
22+
}
23+
24+
dependencies {
25+
api project(":eventmesh-openconnect:eventmesh-openconnect-java")
26+
implementation project(":eventmesh-common")
27+
// rabbitmq
28+
implementation 'com.rabbitmq:amqp-client:5.16.0'
29+
30+
implementation 'io.cloudevents:cloudevents-json-jackson'
31+
32+
compileOnly 'org.projectlombok:lombok'
33+
annotationProcessor 'org.projectlombok:lombok'
34+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
pluginType=connector
18+
pluginName=rabbitmq
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.rabbitmq.client;
19+
20+
import org.apache.commons.lang3.StringUtils;
21+
22+
import com.rabbitmq.client.BuiltinExchangeType;
23+
import com.rabbitmq.client.Channel;
24+
import com.rabbitmq.client.Connection;
25+
import com.rabbitmq.client.ConnectionFactory;
26+
27+
import lombok.extern.slf4j.Slf4j;
28+
29+
@Slf4j
30+
public class RabbitmqClient {
31+
32+
33+
private final RabbitmqConnectionFactory rabbitmqConnectionFactory;
34+
35+
public RabbitmqClient(RabbitmqConnectionFactory rabbitmqConnectionFactory) {
36+
this.rabbitmqConnectionFactory = rabbitmqConnectionFactory;
37+
}
38+
39+
40+
/**
41+
* get rabbitmq connection
42+
*
43+
* @param host host
44+
* @param username username
45+
* @param passwd password
46+
* @param port port
47+
* @param virtualHost virtual host
48+
* @return connection
49+
* @throws Exception Exception
50+
*/
51+
public Connection getConnection(String host, String username,
52+
String passwd, int port,
53+
String virtualHost) throws Exception {
54+
ConnectionFactory factory = rabbitmqConnectionFactory.createConnectionFactory();
55+
factory.setHost(host.trim());
56+
factory.setPort(port);
57+
if (StringUtils.isNotEmpty(virtualHost)) {
58+
factory.setVirtualHost(virtualHost.trim());
59+
}
60+
factory.setUsername(username);
61+
factory.setPassword(passwd.trim());
62+
63+
return rabbitmqConnectionFactory.createConnection(factory);
64+
}
65+
66+
/**
67+
* send message
68+
*
69+
* @param channel channel
70+
* @param exchangeName exchange name
71+
* @param routingKey routing key
72+
* @param message message
73+
* @throws Exception Exception
74+
*/
75+
public void publish(Channel channel, String exchangeName,
76+
String routingKey, byte[] message) throws Exception {
77+
channel.basicPublish(exchangeName, routingKey, null, message);
78+
}
79+
80+
/**
81+
* binding queue
82+
*
83+
* @param channel channel
84+
* @param builtinExchangeType exchange type
85+
* @param exchangeName exchange name
86+
* @param routingKey routing key
87+
* @param queueName queue name
88+
*/
89+
public void binding(Channel channel, BuiltinExchangeType builtinExchangeType,
90+
String exchangeName, String routingKey, String queueName) {
91+
try {
92+
channel.exchangeDeclare(exchangeName, builtinExchangeType.getType(), true,
93+
false, false, null);
94+
channel.queueDeclare(queueName, false, false,
95+
false, null);
96+
routingKey = builtinExchangeType.getType().equals(BuiltinExchangeType.FANOUT.getType()) ? "" : routingKey;
97+
channel.queueBind(queueName, exchangeName, routingKey);
98+
} catch (Exception ex) {
99+
log.error("[RabbitmqClient] binding happen exception.", ex);
100+
}
101+
}
102+
103+
/**
104+
* unbinding queue
105+
*
106+
* @param channel channel
107+
* @param exchangeName exchange name
108+
* @param routingKey routing key
109+
* @param queueName queue name
110+
*/
111+
public void unbinding(Channel channel, String exchangeName, String routingKey, String queueName) {
112+
try {
113+
channel.queueUnbind(queueName, exchangeName, routingKey);
114+
} catch (Exception ex) {
115+
log.error("[RabbitmqClient] unbinding happen exception.", ex);
116+
}
117+
}
118+
119+
/**
120+
* close connection
121+
*
122+
* @param connection connection
123+
*/
124+
public void closeConnection(Connection connection) {
125+
if (null != connection) {
126+
try {
127+
connection.close();
128+
} catch (Exception ex) {
129+
log.error("[RabbitmqClient] connection close happen exception.", ex);
130+
}
131+
}
132+
}
133+
134+
/**
135+
* close channel
136+
*
137+
* @param channel channel
138+
*/
139+
public void closeChannel(Channel channel) {
140+
if (null != channel) {
141+
try {
142+
channel.close();
143+
} catch (Exception ex) {
144+
log.error("[RabbitmqClient] channel close happen exception.", ex);
145+
}
146+
}
147+
}
148+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.rabbitmq.client;
19+
20+
import java.io.IOException;
21+
import java.util.concurrent.TimeoutException;
22+
23+
import com.rabbitmq.client.Channel;
24+
import com.rabbitmq.client.Connection;
25+
import com.rabbitmq.client.ConnectionFactory;
26+
27+
public class RabbitmqConnectionFactory {
28+
29+
public ConnectionFactory createConnectionFactory() {
30+
return new ConnectionFactory();
31+
}
32+
33+
public Connection createConnection(ConnectionFactory connectionFactory) throws IOException, TimeoutException {
34+
return connectionFactory.newConnection();
35+
}
36+
37+
public Channel createChannel(Connection connection) throws IOException {
38+
return connection.createChannel();
39+
}
40+
}

0 commit comments

Comments
 (0)