Skip to content

Commit af8a6cd

Browse files
committed
[ISSUE #3348]Modify SessionContext's public attributes to private
1 parent 94562e9 commit af8a6cd

File tree

5 files changed

+16
-12
lines changed

5 files changed

+16
-12
lines changed

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ private void cleanClientGroupWrapperByClosePub(Session session) throws Exception
277277
* @param session
278278
*/
279279
private void cleanSubscriptionInSession(Session session) throws Exception {
280-
for (SubscriptionItem item : session.getSessionContext().subscribeTopics.values()) {
280+
for (SubscriptionItem item : session.getSessionContext().getSubscribeTopics().values()) {
281281
ClientGroupWrapper clientGroupWrapper = Objects.requireNonNull(session.getClientGroupWrapper().get());
282282
clientGroupWrapper.removeSubscription(item, session);
283283
if (!clientGroupWrapper.hasSubscription(item.getTopic())) {

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ public void setListenRequestSeq(String listenRequestSeq) {
169169

170170
public void subscribe(List<SubscriptionItem> items) throws Exception {
171171
for (SubscriptionItem item : items) {
172-
sessionContext.subscribeTopics.putIfAbsent(item.getTopic(), item);
172+
sessionContext.getSubscribeTopics().putIfAbsent(item.getTopic(), item);
173173
Objects.requireNonNull(clientGroupWrapper.get()).subscribe(item);
174174

175175
Objects.requireNonNull(clientGroupWrapper.get()).getMqProducerWrapper().getMeshMQProducer()
@@ -182,7 +182,7 @@ public void subscribe(List<SubscriptionItem> items) throws Exception {
182182

183183
public void unsubscribe(List<SubscriptionItem> items) throws Exception {
184184
for (SubscriptionItem item : items) {
185-
sessionContext.subscribeTopics.remove(item.getTopic());
185+
sessionContext.getSubscribeTopics().remove(item.getTopic());
186186
Objects.requireNonNull(clientGroupWrapper.get()).removeSubscription(item, this);
187187

188188
if (!Objects.requireNonNull(clientGroupWrapper.get()).hasSubscription(item.getTopic())) {
@@ -195,7 +195,7 @@ public void unsubscribe(List<SubscriptionItem> items) throws Exception {
195195
public EventMeshTcpSendResult upstreamMsg(Header header, CloudEvent event, SendCallback sendCallback,
196196
long startTime, long taskExecuteTime) {
197197
String topic = event.getSubject();
198-
sessionContext.sendTopics.putIfAbsent(topic, topic);
198+
sessionContext.getSendTopics().putIfAbsent(topic, topic);
199199
return sender.send(header, event, sendCallback, startTime, taskExecuteTime);
200200
}
201201

@@ -358,7 +358,7 @@ public boolean isAvailable(String topic) {
358358
return false;
359359
}
360360

361-
if (!sessionContext.subscribeTopics.containsKey(topic)) {
361+
if (!sessionContext.getSubscribeTopics().containsKey(topic)) {
362362
log.warn("session is not available because session has not subscribe topic:{},client:{}", topic, client);
363363
return false;
364364
}

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,17 @@
2424

2525
import java.util.concurrent.ConcurrentHashMap;
2626

27+
import lombok.Getter;
28+
2729
public class SessionContext {
2830

2931
private Session session;
3032

31-
public ConcurrentHashMap<String, String> sendTopics = new ConcurrentHashMap<String, String>();
33+
@Getter
34+
private final ConcurrentHashMap<String, String> sendTopics = new ConcurrentHashMap<>(64);
3235

33-
public ConcurrentHashMap<String, SubscriptionItem> subscribeTopics = new ConcurrentHashMap<String, SubscriptionItem>();
36+
@Getter
37+
private final ConcurrentHashMap<String/*Topic*/, SubscriptionItem> subscribeTopics = new ConcurrentHashMap<>(64);
3438

3539
public long createTime = System.currentTimeMillis();
3640

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ public void run() {
5555
try {
5656
synchronized (session) {
5757
List<SubscriptionItem> topics = new ArrayList<SubscriptionItem>();
58-
if (MapUtils.isNotEmpty(session.getSessionContext().subscribeTopics)) {
59-
for (Map.Entry<String, SubscriptionItem> entry : session.getSessionContext().subscribeTopics.entrySet()) {
58+
if (MapUtils.isNotEmpty(session.getSessionContext().getSubscribeTopics())) {
59+
for (Map.Entry<String, SubscriptionItem> entry : session.getSessionContext().getSubscribeTopics().entrySet()) {
6060
topics.add(entry.getValue());
6161
}
6262
session.unsubscribe(topics);

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,14 +112,14 @@ public void start() throws Exception {
112112
AtomicLong deliveredMsgsCount = session.getPusher().getDeliveredMsgsCount();
113113
AtomicLong deliveredFailCount = session.getPusher().getDeliverFailMsgsCount();
114114
int unAckMsgsCount = session.getPusher().getTotalUnackMsgs();
115-
int sendTopics = session.getSessionContext().sendTopics.size();
116-
int subscribeTopics = session.getSessionContext().subscribeTopics.size();
115+
int sendTopics = session.getSessionContext().getSendTopics().size();
116+
int subscribeTopics = session.getSessionContext().getSubscribeTopics().size();
117117

118118
tcpLogger.info("session|deliveredFailCount={}|deliveredMsgsCount={}|unAckMsgsCount={}|sendTopics={}|subscribeTopics={}|user={}",
119119
deliveredFailCount.longValue(), deliveredMsgsCount.longValue(),
120120
unAckMsgsCount, sendTopics, subscribeTopics, session.getClient());
121121

122-
topicSet.addAll(session.getSessionContext().subscribeTopics.keySet());
122+
topicSet.addAll(session.getSessionContext().getSubscribeTopics().keySet());
123123
}
124124
tcpSummaryMetrics.setSubTopicNum(topicSet.size());
125125
tcpSummaryMetrics.setAllConnections(EventMeshTcpConnectionHandler.connections.get());

0 commit comments

Comments
 (0)