Skip to content

Commit f629730

Browse files
authored
[ISSUE #4414] Add spring sink connector. (#4491)
* Add spring sink connector. * Restructuring consumption logic. * Optimize create extension logic. * Fix naming
1 parent 53e36b5 commit f629730

File tree

19 files changed

+531
-23
lines changed

19 files changed

+531
-23
lines changed

eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,10 @@ public class Constants {
173173

174174
public static final int SUCCESS_CODE = 200;
175175

176+
public static final String SINK = "Sink";
177+
178+
public static final String SOURCE = "Source";
179+
176180
// protocol desc
177181
public static final String PROTOCOL_DESC_GRPC_CLOUD_EVENT = "grpc-cloud-event";
178182

eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.commons.lang3.StringUtils;
2525

2626
import java.io.IOException;
27+
import java.lang.reflect.Type;
2728
import java.util.Objects;
2829

2930
import com.fasterxml.jackson.annotation.JsonInclude;
@@ -98,6 +99,24 @@ public static <T> T parseObject(String text, Class<T> clazz) {
9899
}
99100
}
100101

102+
public static <T> T parseObject(String text, Type type) {
103+
if (StringUtils.isEmpty(text)) {
104+
return null;
105+
}
106+
try {
107+
TypeReference<T> typeReference = new TypeReference<T>() {
108+
109+
@Override
110+
public Type getType() {
111+
return type;
112+
}
113+
};
114+
return OBJECT_MAPPER.readValue(text, typeReference);
115+
} catch (JsonProcessingException e) {
116+
throw new JsonException("deserialize json string to object error", e);
117+
}
118+
}
119+
101120
public static <T> T parseObject(byte[] bytes, Class<T> clazz) {
102121
if (bytes == null || bytes.length == 0) {
103122
return null;

eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/common/SpringApplicationContextHolder.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,21 @@ public class SpringApplicationContextHolder implements ApplicationContextAware {
2525

2626
private static ApplicationContext applicationContext;
2727

28+
public static boolean isStarted() {
29+
return applicationContext != null;
30+
}
31+
2832
@Override
2933
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
3034
SpringApplicationContextHolder.applicationContext = applicationContext;
3135
}
3236

33-
public static Object getBean(String beanName) {
34-
return applicationContext.getBean(beanName);
37+
public static <T> T getBean(Class<T> clazz) {
38+
return applicationContext.getBean(clazz);
3539
}
3640

37-
public static Object getBean(Class<?> beanClass) {
38-
return applicationContext.getBean(beanClass);
41+
public static Object getBean(String beanName) {
42+
return applicationContext.getBean(beanName);
3943
}
4044

41-
public static boolean isStarted() {
42-
return applicationContext != null;
43-
}
4445
}

eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/config/EventMeshAutoConfiguration.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import org.apache.eventmesh.connector.spring.common.SpringApplicationContextHolder;
2121
import org.apache.eventmesh.connector.spring.server.SpringConnectServer;
22+
import org.apache.eventmesh.connector.spring.sink.EventMeshListenerBeanPostProcessor;
23+
import org.apache.eventmesh.connector.spring.sink.connector.SpringSinkConnector;
2224
import org.apache.eventmesh.connector.spring.source.connector.SpringSourceConnector;
2325

2426
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@@ -32,15 +34,23 @@
3234
public class EventMeshAutoConfiguration {
3335

3436
public static final String SPRING_SOURCE_CONNECTOR_BEAN_NAME = "springSourceConnector";
37+
public static final String SPRING_SINK_CONNECTOR_BEAN_NAME = "springSinkConnector";
3538
public static final String SPRING_CONNECT_SERVER_BEAN_NAME = "springConnectServer";
3639
public static final String SPRING_APPLICATION_CONTEXT_HOLDER = "springApplicationContextHolder";
40+
public static final String EVENTMESH_LISTENER_BEAN_POST_PROCESSOR = "eventMeshListenerBeanPostProcessor";
3741

3842
@Bean(name = SPRING_SOURCE_CONNECTOR_BEAN_NAME)
3943
@ConditionalOnMissingBean(name = SPRING_SOURCE_CONNECTOR_BEAN_NAME)
4044
public SpringSourceConnector springSourceConnector() {
4145
return new SpringSourceConnector();
4246
}
4347

48+
@Bean(name = SPRING_SINK_CONNECTOR_BEAN_NAME)
49+
@ConditionalOnMissingBean(name = SPRING_SINK_CONNECTOR_BEAN_NAME)
50+
public SpringSinkConnector springSinkConnector() {
51+
return new SpringSinkConnector();
52+
}
53+
4454
@Bean(name = SPRING_CONNECT_SERVER_BEAN_NAME)
4555
@ConditionalOnMissingBean(name = SPRING_CONNECT_SERVER_BEAN_NAME)
4656
public SpringConnectServer springConnectServer() {
@@ -52,4 +62,11 @@ public SpringConnectServer springConnectServer() {
5262
public SpringApplicationContextHolder springApplicationContextHolder() {
5363
return new SpringApplicationContextHolder();
5464
}
65+
66+
@Bean(name = EVENTMESH_LISTENER_BEAN_POST_PROCESSOR)
67+
@ConditionalOnMissingBean(name = EVENTMESH_LISTENER_BEAN_POST_PROCESSOR)
68+
public EventMeshListenerBeanPostProcessor eventMeshListenerBeanPostProcessor() {
69+
return new EventMeshListenerBeanPostProcessor();
70+
}
71+
5572
}

eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/server/SpringConnectServer.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,27 @@
2323
import org.apache.eventmesh.openconnect.Application;
2424
import org.apache.eventmesh.openconnect.util.ConfigUtil;
2525

26+
import java.util.HashMap;
27+
import java.util.Map;
28+
2629
import org.springframework.boot.CommandLineRunner;
2730

31+
import lombok.extern.slf4j.Slf4j;
32+
33+
@Slf4j
2834
public class SpringConnectServer implements CommandLineRunner {
2935

36+
private static final String SPRING_SOURCE = "springSource";
37+
3038
@Override
3139
public void run(String... args) throws Exception {
3240
SpringConnectServerConfig springConnectServerConfig = ConfigUtil.parse(SpringConnectServerConfig.class,
3341
Constants.CONNECT_SERVER_CONFIG_FILE_NAME);
3442

3543
if (springConnectServerConfig.isSourceEnable()) {
36-
Application application = new Application();
44+
Map<String, String> extensions = new HashMap<>();
45+
extensions.put(Application.CREATE_EXTENSION_KEY, SPRING_SOURCE);
46+
Application application = new Application(extensions);
3747
application.run(SpringSourceConnector.class);
3848
}
3949
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.spring.sink;
19+
20+
import java.lang.reflect.Method;
21+
22+
import lombok.AllArgsConstructor;
23+
import lombok.Data;
24+
25+
@Data
26+
@AllArgsConstructor
27+
public class EventMeshConsumerMetadata {
28+
29+
private Object bean;
30+
31+
private Method method;
32+
33+
private EventMeshListener annotation;
34+
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.spring.sink;
19+
20+
import java.lang.annotation.Documented;
21+
import java.lang.annotation.ElementType;
22+
import java.lang.annotation.Retention;
23+
import java.lang.annotation.RetentionPolicy;
24+
import java.lang.annotation.Target;
25+
26+
import org.springframework.stereotype.Component;
27+
28+
@Target(ElementType.METHOD)
29+
@Retention(RetentionPolicy.RUNTIME)
30+
@Documented
31+
@Component
32+
public @interface EventMeshListener {
33+
34+
/**
35+
* The requestTimeout of client,it is 5s by default.
36+
*/
37+
int requestTimeout() default 5;
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.spring.sink;
19+
20+
import org.apache.eventmesh.common.Constants;
21+
import org.apache.eventmesh.common.ThreadPoolFactory;
22+
import org.apache.eventmesh.common.utils.JsonUtils;
23+
import org.apache.eventmesh.connector.spring.config.SpringConnectServerConfig;
24+
import org.apache.eventmesh.connector.spring.sink.connector.SpringSinkConnector;
25+
import org.apache.eventmesh.openconnect.Application;
26+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
27+
import org.apache.eventmesh.openconnect.util.ConfigUtil;
28+
29+
import java.lang.reflect.Method;
30+
import java.lang.reflect.Type;
31+
import java.util.ArrayList;
32+
import java.util.HashMap;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.concurrent.ThreadPoolExecutor;
36+
import java.util.concurrent.TimeUnit;
37+
38+
import org.springframework.aop.support.AopUtils;
39+
import org.springframework.beans.BeansException;
40+
import org.springframework.beans.factory.config.BeanPostProcessor;
41+
import org.springframework.boot.CommandLineRunner;
42+
import org.springframework.context.ApplicationContext;
43+
import org.springframework.context.ApplicationContextAware;
44+
import org.springframework.core.annotation.AnnotatedElementUtils;
45+
import org.springframework.util.ReflectionUtils;
46+
47+
import lombok.SneakyThrows;
48+
import lombok.extern.slf4j.Slf4j;
49+
50+
@Slf4j
51+
public class EventMeshListenerBeanPostProcessor implements ApplicationContextAware,
52+
CommandLineRunner, BeanPostProcessor {
53+
54+
private static final String SPRING_SINK = "springSink";
55+
56+
private static final ThreadPoolExecutor executor = ThreadPoolFactory.createThreadPoolExecutor(
57+
Runtime.getRuntime().availableProcessors() * 2,
58+
Runtime.getRuntime().availableProcessors() * 2,
59+
"EventMesh-MessageListenerBeanPostProcessor-");
60+
61+
private ApplicationContext applicationContext;
62+
63+
private List<EventMeshConsumerMetadata> metadataList = new ArrayList<>();
64+
65+
@Override
66+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
67+
this.applicationContext = applicationContext;
68+
}
69+
70+
@Override
71+
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
72+
Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass();
73+
ReflectionUtils.doWithMethods(targetClass, new ReflectionUtils.MethodCallback() {
74+
75+
@Override
76+
public void doWith(final Method method) throws IllegalArgumentException, IllegalAccessException {
77+
EventMeshListener annotation = AnnotatedElementUtils.findMergedAnnotation(method, EventMeshListener.class);
78+
if (annotation == null || method.isBridge()) {
79+
return;
80+
}
81+
metadataList.add(new EventMeshConsumerMetadata(bean, method, annotation));
82+
}
83+
});
84+
return bean;
85+
}
86+
87+
@Override
88+
public void run(String... args) throws Exception {
89+
runSinkConnector();
90+
metadataList.forEach(metadata -> {
91+
Object bean = metadata.getBean();
92+
Method method = metadata.getMethod();
93+
EventMeshListener annotation = metadata.getAnnotation();
94+
SpringSinkConnector sinkConnector = applicationContext.getBean(SpringSinkConnector.class);
95+
executor.execute(() -> {
96+
ConnectRecord poll;
97+
while (sinkConnector.isRunning()) {
98+
try {
99+
poll = sinkConnector.getQueue().poll(annotation.requestTimeout(), TimeUnit.SECONDS);
100+
if (null == poll || null == poll.getData()) {
101+
continue;
102+
}
103+
String messageBody = new String((byte[]) poll.getData());
104+
Type[] parameterizedTypes = method.getGenericParameterTypes();
105+
if (parameterizedTypes.length == 0) {
106+
throw new IllegalStateException("There has not any arguments for consumer method.");
107+
}
108+
if (parameterizedTypes.length > 1) {
109+
throw new IllegalStateException("There has more than one arguments for consumer method.");
110+
}
111+
Class<?> rawType;
112+
if (parameterizedTypes[0] instanceof Class) {
113+
rawType = (Class<?>) parameterizedTypes[0];
114+
} else {
115+
throw new IllegalStateException(
116+
"The arguments type for consumer method can't cast to be Class and ParameterizedTypeImpl");
117+
}
118+
if (rawType == String.class) {
119+
metadata.getMethod().invoke(bean, messageBody);
120+
} else {
121+
metadata.getMethod().invoke(bean, JsonUtils.parseObject(messageBody, parameterizedTypes[0]));
122+
}
123+
} catch (Exception e) {
124+
log.warn("Consume snapshot event error", e);
125+
}
126+
}
127+
});
128+
});
129+
}
130+
131+
@SneakyThrows
132+
public void runSinkConnector() {
133+
SpringConnectServerConfig springConnectServerConfig = ConfigUtil.parse(SpringConnectServerConfig.class,
134+
Constants.CONNECT_SERVER_CONFIG_FILE_NAME);
135+
136+
if (springConnectServerConfig.isSinkEnable()) {
137+
Map<String, String> extensions = new HashMap<>();
138+
extensions.put(Application.CREATE_EXTENSION_KEY, SPRING_SINK);
139+
Application application = new Application(extensions);
140+
application.run(SpringSinkConnector.class);
141+
}
142+
}
143+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.spring.sink.config;
19+
20+
import org.apache.eventmesh.connector.spring.sink.connector.SinkConnectorConfig;
21+
import org.apache.eventmesh.openconnect.api.config.SinkConfig;
22+
23+
import lombok.Data;
24+
import lombok.EqualsAndHashCode;
25+
26+
@Data
27+
@EqualsAndHashCode(callSuper = true)
28+
public class SpringSinkConfig extends SinkConfig {
29+
30+
private SinkConnectorConfig sinkConnectorConfig;
31+
}

0 commit comments

Comments
 (0)