Skip to content

Commit e886f78

Browse files
authored
Merge 7f72d53 into 74f9abf
2 parents 74f9abf + 7f72d53 commit e886f78

File tree

6 files changed

+112
-152
lines changed

6 files changed

+112
-152
lines changed
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.runtime.core.protocol;
19+
20+
import java.util.concurrent.DelayQueue;
21+
import java.util.concurrent.ThreadPoolExecutor;
22+
23+
import lombok.extern.slf4j.Slf4j;
24+
25+
@Slf4j
26+
public abstract class AbstractRetryer {
27+
28+
protected final DelayQueue<DelayRetryable> retrys = new DelayQueue<>();
29+
30+
protected ThreadPoolExecutor pool;
31+
32+
protected Thread dispatcher;
33+
34+
protected abstract void pushRetry(DelayRetryable delayRetryable);
35+
36+
protected abstract void init();
37+
38+
public int getRetrySize() {
39+
return retrys.size();
40+
}
41+
42+
protected void initDispatcher(Thread dispatcher) {
43+
dispatcher = new Thread(() -> {
44+
try {
45+
DelayRetryable retryObj;
46+
while (!Thread.currentThread().isInterrupted() && (retryObj = retrys.take()) != null) {
47+
final DelayRetryable delayRetryable = retryObj;
48+
pool.execute(() -> {
49+
try {
50+
delayRetryable.retry();
51+
if (log.isDebugEnabled()) {
52+
log.debug("retryObj : {}", delayRetryable);
53+
}
54+
} catch (Exception e) {
55+
log.error("retry-dispatcher error!", e);
56+
}
57+
});
58+
}
59+
} catch (Exception e) {
60+
if (e instanceof InterruptedException) {
61+
Thread.currentThread().interrupt();
62+
}
63+
log.error("retry-dispatcher error!", e);
64+
}
65+
}, "retry-dispatcher");
66+
dispatcher.setDaemon(true);
67+
log.info("EventMesh retryer inited......");
68+
}
69+
70+
public void shutdown() {
71+
dispatcher.interrupt();
72+
pool.shutdown();
73+
log.info("EventMesh retryer shutdown......");
74+
}
75+
76+
public void start() throws Exception {
77+
dispatcher.start();
78+
log.info("EventMesh retryer started......");
79+
}
80+
81+
/**
82+
* Get fail-retry queue, this method is just used for metrics.
83+
*/
84+
public DelayQueue<DelayRetryable> getRetryQueue() {
85+
return retrys;
86+
}
87+
}

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/retry/GrpcRetryer.java

Lines changed: 7 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -20,38 +20,34 @@
2020
import org.apache.eventmesh.common.EventMeshThreadFactory;
2121
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
2222
import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;
23+
import org.apache.eventmesh.runtime.core.protocol.AbstractRetryer;
2324
import org.apache.eventmesh.runtime.core.protocol.DelayRetryable;
2425

2526
import java.util.concurrent.ArrayBlockingQueue;
26-
import java.util.concurrent.DelayQueue;
2727
import java.util.concurrent.ThreadPoolExecutor;
2828
import java.util.concurrent.TimeUnit;
2929

3030
import lombok.extern.slf4j.Slf4j;
3131

3232
@Slf4j
33-
public class GrpcRetryer {
33+
public class GrpcRetryer extends AbstractRetryer {
3434

3535
private final EventMeshGrpcConfiguration grpcConfiguration;
3636

3737
public GrpcRetryer(EventMeshGrpcServer eventMeshGrpcServer) {
3838
this.grpcConfiguration = eventMeshGrpcServer.getEventMeshGrpcConfiguration();
3939
}
4040

41-
private final DelayQueue<DelayRetryable> failed = new DelayQueue<DelayRetryable>();
42-
43-
private ThreadPoolExecutor pool;
44-
45-
private Thread dispatcher;
46-
41+
@Override
4742
public void pushRetry(DelayRetryable delayRetryable) {
48-
if (failed.size() >= grpcConfiguration.getEventMeshServerRetryBlockQueueSize()) {
43+
if (retrys.size() >= grpcConfiguration.getEventMeshServerRetryBlockQueueSize()) {
4944
log.error("[RETRY-QUEUE] is full!");
5045
return;
5146
}
52-
failed.offer(delayRetryable);
47+
retrys.offer(delayRetryable);
5348
}
5449

50+
@Override
5551
public void init() {
5652
pool = new ThreadPoolExecutor(
5753
grpcConfiguration.getEventMeshServerRetryThreadNum(),
@@ -60,46 +56,7 @@ public void init() {
6056
new EventMeshThreadFactory("grpc-retry", true, Thread.NORM_PRIORITY),
6157
new ThreadPoolExecutor.AbortPolicy());
6258

63-
dispatcher = new Thread(() -> {
64-
try {
65-
DelayRetryable retryObj;
66-
while (!Thread.currentThread().isInterrupted()
67-
&& (retryObj = failed.take()) != null) {
68-
final DelayRetryable delayRetryable = retryObj;
69-
pool.execute(() -> {
70-
try {
71-
delayRetryable.retry();
72-
if (log.isDebugEnabled()) {
73-
log.debug("retryObj : {}", delayRetryable);
74-
}
75-
} catch (Exception e) {
76-
log.error("grpc-retry-dispatcher error!", e);
77-
}
78-
});
79-
}
80-
} catch (Exception e) {
81-
if (e instanceof InterruptedException) {
82-
Thread.currentThread().interrupt();
83-
}
84-
log.error("grpc-retry-dispatcher error!", e);
85-
}
86-
}, "grpc-retry-dispatcher");
87-
dispatcher.setDaemon(true);
88-
log.info("GrpcRetryer inited......");
89-
}
90-
91-
public int size() {
92-
return failed.size();
93-
}
94-
95-
public void shutdown() {
96-
dispatcher.interrupt();
97-
pool.shutdown();
98-
log.info("GrpcRetryer shutdown......");
59+
initDispatcher(dispatcher);
9960
}
10061

101-
public void start() throws Exception {
102-
dispatcher.start();
103-
log.info("GrpcRetryer started......");
104-
}
10562
}

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/retry/HttpRetryer.java

Lines changed: 7 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919

2020
import org.apache.eventmesh.common.EventMeshThreadFactory;
2121
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
22+
import org.apache.eventmesh.runtime.core.protocol.AbstractRetryer;
2223
import org.apache.eventmesh.runtime.core.protocol.DelayRetryable;
2324

2425
import java.util.concurrent.ArrayBlockingQueue;
25-
import java.util.concurrent.DelayQueue;
2626
import java.util.concurrent.ThreadPoolExecutor;
2727
import java.util.concurrent.TimeUnit;
2828

@@ -33,7 +33,7 @@
3333
import lombok.extern.slf4j.Slf4j;
3434

3535
@Slf4j
36-
public class HttpRetryer {
36+
public class HttpRetryer extends AbstractRetryer {
3737

3838
private final Logger retryLogger = LoggerFactory.getLogger("retry");
3939

@@ -43,20 +43,16 @@ public HttpRetryer(EventMeshHTTPServer eventMeshHTTPServer) {
4343
this.eventMeshHTTPServer = eventMeshHTTPServer;
4444
}
4545

46-
private final DelayQueue<DelayRetryable> failed = new DelayQueue<>();
47-
48-
private ThreadPoolExecutor pool;
49-
50-
private Thread dispatcher;
51-
46+
@Override
5247
public void pushRetry(DelayRetryable delayRetryable) {
53-
if (failed.size() >= eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryBlockQSize()) {
48+
if (retrys.size() >= eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryBlockQSize()) {
5449
retryLogger.error("[RETRY-QUEUE] is full!");
5550
return;
5651
}
57-
failed.offer(delayRetryable);
52+
retrys.offer(delayRetryable);
5853
}
5954

55+
@Override
6056
public void init() {
6157
pool = new ThreadPoolExecutor(eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryThreadNum(),
6258
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerRetryThreadNum(),
@@ -66,52 +62,7 @@ public void init() {
6662
new EventMeshThreadFactory("http-retry", true, Thread.NORM_PRIORITY),
6763
new ThreadPoolExecutor.AbortPolicy());
6864

69-
dispatcher = new Thread(() -> {
70-
try {
71-
DelayRetryable retryObj;
72-
while (!Thread.currentThread().isInterrupted() && (retryObj = failed.take()) != null) {
73-
final DelayRetryable delayRetryable = retryObj;
74-
pool.execute(() -> {
75-
try {
76-
delayRetryable.retry();
77-
if (retryLogger.isDebugEnabled()) {
78-
retryLogger.debug("retryObj : {}", delayRetryable);
79-
}
80-
} catch (Exception e) {
81-
retryLogger.error("http-retry-dispatcher error!", e);
82-
}
83-
});
84-
}
85-
} catch (Exception e) {
86-
if (e instanceof InterruptedException) {
87-
Thread.currentThread().interrupt();
88-
}
89-
retryLogger.error("http-retry-dispatcher error!", e);
90-
}
91-
}, "http-retry-dispatcher");
92-
dispatcher.setDaemon(true);
93-
log.info("HttpRetryer inited......");
94-
}
95-
96-
public int size() {
97-
return failed.size();
65+
initDispatcher(dispatcher);
9866
}
9967

100-
/**
101-
* Get failed queue, this method is just used for metrics.
102-
*/
103-
public DelayQueue<DelayRetryable> getFailedQueue() {
104-
return failed;
105-
}
106-
107-
public void shutdown() {
108-
dispatcher.interrupt();
109-
pool.shutdown();
110-
log.info("HttpRetryer shutdown......");
111-
}
112-
113-
public void start() throws Exception {
114-
dispatcher.start();
115-
log.info("HttpRetryer started......");
116-
}
11768
}

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.java

Lines changed: 8 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -20,35 +20,32 @@
2020
import org.apache.eventmesh.common.EventMeshThreadFactory;
2121
import org.apache.eventmesh.common.protocol.SubscriptionType;
2222
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
23+
import org.apache.eventmesh.runtime.core.protocol.AbstractRetryer;
24+
import org.apache.eventmesh.runtime.core.protocol.DelayRetryable;
2325
import org.apache.eventmesh.runtime.core.protocol.RetryContext;
2426
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
2527
import org.apache.eventmesh.runtime.util.EventMeshUtil;
2628
import org.apache.eventmesh.runtime.util.ThreadPoolHelper;
2729

2830
import java.util.concurrent.ArrayBlockingQueue;
29-
import java.util.concurrent.DelayQueue;
3031
import java.util.concurrent.ThreadPoolExecutor;
3132
import java.util.concurrent.TimeUnit;
3233

3334

3435
import lombok.extern.slf4j.Slf4j;
3536

3637
@Slf4j
37-
public class EventMeshTcpRetryer {
38+
public class EventMeshTcpRetryer extends AbstractRetryer {
3839

3940
private EventMeshTCPServer eventMeshTCPServer;
4041

41-
private final DelayQueue<RetryContext> retrys = new DelayQueue<>();
42-
4342
private final ThreadPoolExecutor pool = new ThreadPoolExecutor(3,
4443
3,
4544
60000,
4645
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1000),
4746
new EventMeshThreadFactory("eventMesh-tcp-retry", true),
4847
new ThreadPoolExecutor.AbortPolicy());
4948

50-
private Thread dispatcher;
51-
5249
public EventMeshTcpRetryer(EventMeshTCPServer eventMeshTCPServer) {
5350
this.eventMeshTCPServer = eventMeshTCPServer;
5451
}
@@ -61,7 +58,9 @@ public void setEventMeshTCPServer(EventMeshTCPServer eventMeshTCPServer) {
6158
this.eventMeshTCPServer = eventMeshTCPServer;
6259
}
6360

64-
public void pushRetry(RetryContext retryContext) {
61+
@Override
62+
public void pushRetry(DelayRetryable delayRetryable) {
63+
RetryContext retryContext = (RetryContext) delayRetryable;
6564
if (retrys.size() >= eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryQueueSize()) {
6665
log.error("pushRetry fail, retrys is too much,allow max retryQueueSize:{}, retryTimes:{}, seq:{}, bizSeq:{}",
6766
eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryQueueSize(), retryContext.retryTimes,
@@ -88,43 +87,9 @@ public void pushRetry(RetryContext retryContext) {
8887
EventMeshUtil.getMessageBizSeq(retryContext.event));
8988
}
9089

90+
@Override
9191
public void init() {
92-
dispatcher = new Thread(() -> {
93-
try {
94-
RetryContext retryContext;
95-
while ((retryContext = retrys.take()) != null) {
96-
final RetryContext retryCtx = retryContext;
97-
pool.execute(() -> {
98-
try {
99-
retryCtx.retry();
100-
} catch (Exception e) {
101-
log.error("retry-dispatcher error!", e);
102-
}
103-
});
104-
}
105-
} catch (Exception e) {
106-
if (e instanceof InterruptedException) {
107-
Thread.currentThread().interrupt();
108-
}
109-
log.error("retry-dispatcher error!", e);
110-
}
111-
}, "retry-dispatcher");
112-
dispatcher.setDaemon(true);
113-
log.info("EventMeshTcpRetryer inited......");
114-
}
115-
116-
public void start() throws Exception {
117-
dispatcher.start();
118-
log.info("EventMeshTcpRetryer started......");
119-
}
120-
121-
public void shutdown() {
122-
pool.shutdown();
123-
log.info("EventMeshTcpRetryer shutdown......");
124-
}
125-
126-
public int getRetrySize() {
127-
return retrys.size();
92+
initDispatcher(dispatcher);
12893
}
12994

13095
public void printRetryThreadPoolState() {

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/grpc/EventMeshGrpcMonitor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public void start() throws Exception {
6464
scheduleTask = scheduler.scheduleAtFixedRate(() -> {
6565
grpcSummaryMetrics.refreshTpsMetrics(SCHEDULE_PERIOD_MILLS);
6666
grpcSummaryMetrics.clearAllMessageCounter();
67-
grpcSummaryMetrics.setRetrySize(eventMeshGrpcServer.getGrpcRetryer().size());
67+
grpcSummaryMetrics.setRetrySize(eventMeshGrpcServer.getGrpcRetryer().getRetrySize());
6868
grpcSummaryMetrics.setSubscribeTopicNum(eventMeshGrpcServer.getConsumerManager().getAllConsumerTopic().size());
6969
}, DELAY_MILLS, SCHEDULE_PERIOD_MILLS, TimeUnit.MILLISECONDS);
7070
}

0 commit comments

Comments
 (0)