Skip to content

Commit aa5fdd7

Browse files
authored
Merge 1410015 into c18ef05
2 parents c18ef05 + 1410015 commit aa5fdd7

File tree

14 files changed

+538
-0
lines changed

14 files changed

+538
-0
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
List pulsar = [
19+
"org.apache.pulsar:pulsar-client:$pulsar_version"
20+
]
21+
dependencies {
22+
implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
23+
implementation pulsar
24+
compileOnly 'org.projectlombok:lombok'
25+
annotationProcessor 'org.projectlombok:lombok'
26+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
pulsar_version=2.11.1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.pulsar.config;
19+
20+
21+
import org.apache.eventmesh.openconnect.api.config.Config;
22+
23+
import lombok.Data;
24+
25+
@Data
26+
public class PulsarServerConfig extends Config {
27+
private boolean sourceEnable;
28+
29+
private boolean sinkEnable;
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.pulsar.server;
19+
20+
21+
import org.apache.eventmesh.connector.pulsar.config.PulsarServerConfig;
22+
import org.apache.eventmesh.connector.pulsar.sink.connector.PulsarSinkConnector;
23+
import org.apache.eventmesh.openconnect.Application;
24+
import org.apache.eventmesh.openconnect.util.ConfigUtil;
25+
26+
import lombok.extern.slf4j.Slf4j;
27+
28+
@Slf4j
29+
public class PulsarConnectServer {
30+
31+
32+
public static void main(String[] args) throws Exception {
33+
34+
PulsarServerConfig serverConfig = ConfigUtil.parse(PulsarServerConfig.class, "server-config.yml");
35+
36+
if (serverConfig.isSourceEnable()) {
37+
Application rocketmqSourceApp = new Application();
38+
rocketmqSourceApp.run(PulsarSinkConnector.class);
39+
}
40+
41+
if (serverConfig.isSinkEnable()) {
42+
Application rocketmqSinkApp = new Application();
43+
rocketmqSinkApp.run(PulsarSinkConnector.class);
44+
}
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.pulsar.sink.config;
19+
20+
21+
22+
import org.apache.eventmesh.openconnect.api.config.SinkConfig;
23+
24+
import lombok.Data;
25+
26+
@Data
27+
public class PulsarSinkConfig extends SinkConfig {
28+
29+
public SinkConnectorConfig connectorConfig;
30+
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.pulsar.sink.config;
19+
20+
import lombok.Data;
21+
22+
@Data
23+
public class SinkConnectorConfig {
24+
25+
private String connectorName;
26+
27+
private String serviceUrl;
28+
29+
private String topic;
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.pulsar.sink.connector;
19+
20+
21+
22+
import org.apache.eventmesh.connector.pulsar.sink.config.PulsarSinkConfig;
23+
import org.apache.eventmesh.openconnect.api.config.Config;
24+
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
25+
import org.apache.eventmesh.openconnect.api.sink.Sink;
26+
27+
import org.apache.pulsar.client.api.MessageId;
28+
import org.apache.pulsar.client.api.Producer;
29+
import org.apache.pulsar.client.api.PulsarClient;
30+
import org.apache.pulsar.client.api.PulsarClientException;
31+
32+
import java.util.HashMap;
33+
import java.util.List;
34+
import java.util.Map;
35+
36+
import lombok.extern.slf4j.Slf4j;
37+
38+
@Slf4j
39+
public class PulsarSinkConnector implements Sink {
40+
41+
private PulsarSinkConfig sinkConfig;
42+
43+
private Producer<byte[]> producer;
44+
45+
@Override
46+
public Class<? extends Config> configClass() {
47+
return PulsarSinkConfig.class;
48+
}
49+
50+
@Override
51+
public void init(Config config) throws Exception {
52+
// init config for pulsar source connector
53+
this.sinkConfig = (PulsarSinkConfig) config;
54+
PulsarClient client = PulsarClient.builder()
55+
.serviceUrl(sinkConfig.getConnectorConfig().getServiceUrl())
56+
.build();
57+
producer = client.newProducer()
58+
.topic(sinkConfig.getConnectorConfig().getTopic())
59+
.create();
60+
}
61+
62+
@Override
63+
public void start() throws Exception {
64+
}
65+
66+
@Override
67+
public void commit(ConnectRecord record) {
68+
69+
}
70+
71+
@Override
72+
public String name() {
73+
return this.sinkConfig.getConnectorConfig().getConnectorName();
74+
}
75+
76+
@Override
77+
public void stop() {
78+
try {
79+
producer.close();
80+
} catch (PulsarClientException e) {
81+
log.error("close pulsar producer failed", e);
82+
}
83+
}
84+
85+
@Override
86+
public void put(List<ConnectRecord> sinkRecords) {
87+
for (ConnectRecord connectRecord : sinkRecords) {
88+
try {
89+
Map props = new HashMap<String, Object>();
90+
for (String key : connectRecord.getExtensions().keySet()) {
91+
props.put(key, connectRecord.getExtension(key));
92+
}
93+
MessageId messageId = producer.newMessage()
94+
.value((byte[]) connectRecord.getData())
95+
.properties(props)
96+
.send();
97+
} catch (Exception e) {
98+
log.error("put records to pulsar failed", e);
99+
}
100+
}
101+
}
102+
103+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.pulsar.source.config;
19+
20+
21+
import org.apache.eventmesh.openconnect.api.config.SourceConfig;
22+
23+
import lombok.Data;
24+
25+
@Data
26+
public class PulsarSourceConfig extends SourceConfig {
27+
28+
public SourceConnectorConfig connectorConfig;
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.pulsar.source.config;
19+
20+
import lombok.Data;
21+
22+
@Data
23+
public class SourceConnectorConfig {
24+
private String connectorName;
25+
26+
private String serviceUrl;
27+
28+
private String topic;
29+
}

0 commit comments

Comments
 (0)