Skip to content

Commit 5889c90

Browse files
authored
Merge 378454e into efb2c11
2 parents efb2c11 + 378454e commit 5889c90

File tree

4 files changed

+44
-48
lines changed

4 files changed

+44
-48
lines changed

eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractLiteClient.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,17 @@
1818
package org.apache.eventmesh.client.http;
1919

2020
import org.apache.eventmesh.client.http.conf.LiteClientConfig;
21+
import org.apache.eventmesh.client.http.ssl.MyX509TrustManager;
2122
import org.apache.eventmesh.client.http.util.HttpLoadBalanceUtils;
2223
import org.apache.eventmesh.common.loadbalance.LoadBalanceSelector;
24+
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
25+
import org.apache.http.impl.client.CloseableHttpClient;
26+
import org.apache.http.impl.client.HttpClients;
2327
import org.slf4j.Logger;
2428
import org.slf4j.LoggerFactory;
29+
import javax.net.ssl.SSLContext;
30+
import javax.net.ssl.TrustManager;
31+
import java.security.SecureRandom;
2532

2633
public abstract class AbstractLiteClient {
2734

@@ -46,4 +53,22 @@ public LiteClientConfig getLiteClientConfig() {
4653
public void shutdown() throws Exception {
4754
logger.info("AbstractLiteClient shutdown");
4855
}
56+
57+
public CloseableHttpClient setHttpClient() throws Exception {
58+
if (!liteClientConfig.isUseTls()) {
59+
return HttpClients.createDefault();
60+
}
61+
SSLContext sslContext = null;
62+
try {
63+
String protocol = System.getProperty("ssl.client.protocol", "TLSv1.2");
64+
TrustManager[] tm = new TrustManager[]{new MyX509TrustManager()};
65+
sslContext = SSLContext.getInstance(protocol);
66+
sslContext.init(null, tm, new SecureRandom());
67+
return HttpClients.custom().setSSLContext(sslContext)
68+
.setSSLHostnameVerifier(new DefaultHostnameVerifier()).build();
69+
} catch (Exception e) {
70+
logger.error("Error in creating HttpClient.", e);
71+
throw e;
72+
}
73+
}
4974
}

eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,13 @@ public class LiteConsumer extends AbstractLiteClient {
6565

6666
private ThreadPoolExecutor consumeExecutor;
6767

68-
private static CloseableHttpClient httpClient = HttpClients.createDefault();
69-
7068
protected LiteClientConfig eventMeshClientConfig;
7169

7270
private List<String> subscription = Lists.newArrayList();
7371

7472
private LiteMessageListener messageListener;
7573

76-
protected static final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(4, new EventMeshThreadFactoryImpl("TCPClientScheduler", true));
74+
protected final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(4, new EventMeshThreadFactoryImpl("TCPClientScheduler", true));
7775

7876
public LiteConsumer(LiteClientConfig liteClientConfig) throws Exception {
7977
super(liteClientConfig);
@@ -110,7 +108,10 @@ public void start() throws Exception {
110108
public void shutdown() throws Exception {
111109
logger.info("LiteConsumer shutting down");
112110
super.shutdown();
113-
httpClient.close();
111+
if (consumeExecutor != null) {
112+
consumeExecutor.shutdown();
113+
}
114+
scheduler.shutdown();
114115
started.compareAndSet(true, false);
115116
logger.info("LiteConsumer shutdown");
116117
}
@@ -126,10 +127,9 @@ public boolean subscribe(List<String> topicList, String url) throws Exception {
126127
long startTime = System.currentTimeMillis();
127128
String target = selectEventMesh();
128129
String subRes = "";
129-
try {
130+
131+
try (CloseableHttpClient httpClient = setHttpClient()){
130132
subRes = HttpUtil.post(httpClient, target, subscribeParam);
131-
} catch (Exception ex) {
132-
throw new EventMeshException(ex);
133133
}
134134

135135
if (logger.isDebugEnabled()) {
@@ -211,10 +211,9 @@ public void run() {
211211
long startTime = System.currentTimeMillis();
212212
String target = selectEventMesh();
213213
String res = "";
214-
try {
214+
215+
try (CloseableHttpClient httpClient = setHttpClient()) {
215216
res = HttpUtil.post(httpClient, target, requestParam);
216-
} catch (Exception ex) {
217-
throw new EventMeshException(ex);
218217
}
219218

220219
if (logger.isDebugEnabled()) {
@@ -234,17 +233,16 @@ public void run() {
234233
}, EventMeshCommon.HEATBEAT, EventMeshCommon.HEATBEAT, TimeUnit.MILLISECONDS);
235234
}
236235

237-
public boolean unsubscribe(List<String> topicList, String url) throws EventMeshException {
236+
public boolean unsubscribe(List<String> topicList, String url) throws Exception {
238237
subscription.removeAll(topicList);
239238
RequestParam unSubscribeParam = generateUnSubscribeRequestParam(topicList, url);
240239

241240
long startTime = System.currentTimeMillis();
242241
String target = selectEventMesh();
243242
String unSubRes = "";
244-
try {
243+
244+
try (CloseableHttpClient httpClient = setHttpClient()) {
245245
unSubRes = HttpUtil.post(httpClient, target, unSubscribeParam);
246-
} catch (Exception ex) {
247-
throw new EventMeshException(ex);
248246
}
249247

250248
if (logger.isDebugEnabled()) {

eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,8 @@ public class LiteProducer extends AbstractLiteClient {
6161

6262
public Logger logger = LoggerFactory.getLogger(LiteProducer.class);
6363

64-
private static CloseableHttpClient httpClient = HttpClients.createDefault();
65-
6664
public LiteProducer(LiteClientConfig liteClientConfig) {
6765
super(liteClientConfig);
68-
if (liteClientConfig.isUseTls()) {
69-
setHttpClient();
70-
}
7166
}
7267

7368
private AtomicBoolean started = new AtomicBoolean(Boolean.FALSE);
@@ -92,7 +87,6 @@ public void shutdown() throws Exception {
9287
}
9388
logger.info("LiteProducer shutting down");
9489
super.shutdown();
95-
httpClient.close();
9690
started.compareAndSet(true, false);
9791
logger.info("LiteProducer shutdown");
9892
}
@@ -132,10 +126,9 @@ public boolean publish(LiteMessage message) throws Exception {
132126
long startTime = System.currentTimeMillis();
133127
String target = selectEventMesh();
134128
String res = "";
135-
try {
129+
130+
try (CloseableHttpClient httpClient = setHttpClient()) {
136131
res = HttpUtil.post(httpClient, target, requestParam);
137-
} catch (Exception ex) {
138-
throw new EventMeshException(ex);
139132
}
140133

141134
if (logger.isDebugEnabled()) {
@@ -191,10 +184,9 @@ public LiteMessage request(LiteMessage message, long timeout) throws Exception {
191184
long startTime = System.currentTimeMillis();
192185
String target = selectEventMesh();
193186
String res = "";
194-
try {
187+
188+
try (CloseableHttpClient httpClient = setHttpClient()) {
195189
res = HttpUtil.post(httpClient, target, requestParam);
196-
} catch (Exception ex) {
197-
throw new EventMeshException(ex);
198190
}
199191

200192
if (logger.isDebugEnabled()) {
@@ -246,32 +238,13 @@ public void request(LiteMessage message, RRCallback rrCallback, long timeout) th
246238

247239
long startTime = System.currentTimeMillis();
248240
String target = selectEventMesh();
249-
try {
241+
242+
try (CloseableHttpClient httpClient = setHttpClient()) {
250243
HttpUtil.post(httpClient, null, target, requestParam, new RRCallbackResponseHandlerAdapter(message, rrCallback, timeout));
251-
} catch (Exception ex) {
252-
throw new EventMeshException(ex);
253244
}
254245

255246
if (logger.isDebugEnabled()) {
256247
logger.debug("publish sync message by async, target:{}, cost:{}, message:{}", target, System.currentTimeMillis() - startTime, message);
257248
}
258249
}
259-
260-
public static void setHttpClient() {
261-
SSLContext sslContext = null;
262-
try {
263-
String protocol = System.getProperty("ssl.client.protocol", "TLSv1.1");
264-
TrustManager[] tm = new TrustManager[]{new MyX509TrustManager()};
265-
sslContext = SSLContext.getInstance(protocol);
266-
sslContext.init(null, tm, new SecureRandom());
267-
httpClient = HttpClients.custom().setSSLContext(sslContext)
268-
.setSSLHostnameVerifier(new DefaultHostnameVerifier()).build();
269-
} catch (NoSuchAlgorithmException e) {
270-
e.printStackTrace();
271-
} catch (KeyManagementException e) {
272-
e.printStackTrace();
273-
} catch (Exception e) {
274-
e.printStackTrace();
275-
}
276-
}
277250
}

eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void cleanup() {
8282
logger.info("start destory ....");
8383
try {
8484
liteConsumer.unsubscribe(topicList, url);
85-
} catch (EventMeshException e) {
85+
} catch (Exception e) {
8686
e.printStackTrace();
8787
}
8888
try {

0 commit comments

Comments
 (0)