Skip to content

Commit 1780ef7

Browse files
committed
[ISSUE #3288]Refactor ConsumerGroupManager
1 parent 7ec90de commit 1780ef7

File tree

1 file changed

+20
-13
lines changed

1 file changed

+20
-13
lines changed

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerGroupManager.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,31 +29,36 @@
2929

3030
public class ConsumerGroupManager {
3131

32-
protected AtomicBoolean started = new AtomicBoolean(Boolean.FALSE);
32+
private final AtomicBoolean started = new AtomicBoolean(false);
3333

34-
protected AtomicBoolean inited = new AtomicBoolean(Boolean.FALSE);
34+
private final AtomicBoolean inited = new AtomicBoolean(false);
3535

36-
private EventMeshHTTPServer eventMeshHTTPServer;
36+
private final EventMeshHTTPServer eventMeshHTTPServer;
3737

38-
private EventMeshConsumer eventMeshConsumer;
38+
private final EventMeshConsumer eventMeshConsumer;
3939

4040
private ConsumerGroupConf consumerGroupConfig;
4141

42-
public ConsumerGroupManager(EventMeshHTTPServer eventMeshHTTPServer, ConsumerGroupConf consumerGroupConfig) {
42+
public ConsumerGroupManager(final EventMeshHTTPServer eventMeshHTTPServer, final ConsumerGroupConf consumerGroupConfig) {
4343
this.eventMeshHTTPServer = eventMeshHTTPServer;
4444
this.consumerGroupConfig = consumerGroupConfig;
45-
eventMeshConsumer = new EventMeshConsumer(this.eventMeshHTTPServer, this.consumerGroupConfig);
45+
this.eventMeshConsumer = new EventMeshConsumer(this.eventMeshHTTPServer, this.consumerGroupConfig);
4646
}
4747

48-
public synchronized void init() throws Exception {
48+
public void init() throws Exception {
49+
if (!inited.compareAndSet(false, true)) {
50+
return;
51+
}
4952
eventMeshConsumer.init();
50-
inited.compareAndSet(false, true);
53+
5154
}
5255

53-
public synchronized void start() throws Exception {
56+
public void start() throws Exception {
57+
if (!started.compareAndSet(false, true)) {
58+
return;
59+
}
5460
setupEventMeshConsumer(consumerGroupConfig);
5561
eventMeshConsumer.start();
56-
started.compareAndSet(false, true);
5762
}
5863

5964
private synchronized void setupEventMeshConsumer(ConsumerGroupConf consumerGroupConfig) throws Exception {
@@ -62,12 +67,14 @@ private synchronized void setupEventMeshConsumer(ConsumerGroupConf consumerGroup
6267
}
6368
}
6469

65-
public synchronized void shutdown() throws Exception {
70+
public void shutdown() throws Exception {
71+
if (!started.compareAndSet(true, false)) {
72+
return;
73+
}
6674
eventMeshConsumer.shutdown();
67-
started.compareAndSet(true, false);
6875
}
6976

70-
public synchronized void refresh(ConsumerGroupConf consumerGroupConfig) throws Exception {
77+
public synchronized void refresh(final ConsumerGroupConf consumerGroupConfig) throws Exception {
7178

7279
if (consumerGroupConfig == null || this.consumerGroupConfig.equals(consumerGroupConfig)) {
7380
return;

0 commit comments

Comments
 (0)