optimize: Add ChannelEventListener support to prevent memory leaks
#7337
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

    @@             Coverage Diff              @@
    ##                2.x    #7337      +/-   ##
    ============================================
    - Coverage     55.79%   54.88%   -0.91%
    + Complexity     7476     7398      -78
    ============================================
      Files          1178     1181       +3
      Lines         41962    42089     +127
      Branches       4923     4934      +11
    ============================================
    - Hits          23413    23101     -312
    - Misses        16310    16818     +508
    + Partials       2239     2170      -69
funky-eyes
left a comment
This design is to callback the ChannelEventListener registered by TM and RM, rather than adding a listener after channel.write(), right?
Yes, that's correct. It is implemented in a way that calls back the |
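The callback approach discussed above can be sketched without Netty as follows. This is a minimal illustration, not the code from this PR: `ChannelEventNotifier` is a hypothetical name, a plain `String` stands in for Netty's `Channel`, and the listener method names are taken from the diagram and tests on this page rather than from the real interface.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Simplified stand-in for the listener interface; String replaces Netty's Channel (assumption). */
interface ChannelEventListener {
    default void onChannelActive(String channelId) {}
    default void onChannelInactive(String channelId) {}
    default void onChannelException(String channelId, Throwable cause) {}
    default void onChannelIdle(String channelId) {}
}

/** Hypothetical registry that a remoting client could own and call from its channel handler. */
class ChannelEventNotifier {
    // Copy-on-write list: iteration is safe while listeners are registered or removed concurrently.
    private final List<ChannelEventListener> listeners = new CopyOnWriteArrayList<>();

    void register(ChannelEventListener listener) {
        if (listener != null) {
            listeners.add(listener);
        }
    }

    void unregister(ChannelEventListener listener) {
        listeners.remove(listener);
    }

    /** Called once per channel event, instead of attaching a listener to every write. */
    void fireChannelInactive(String channelId) {
        for (ChannelEventListener listener : listeners) {
            try {
                listener.onChannelInactive(channelId);
            } catch (RuntimeException e) {
                // One failing listener should not prevent the others from being notified.
            }
        }
    }
}
```

The point of the design is that listeners are registered once on the client and invoked from the channel lifecycle events, so nothing per-request has to be attached after each write.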
I think this design is good, but since this functionality is not a user-facing API, it shouldn't be a feature-type PR. I believe it should be changed to an optimize type instead. Could you also fix the codecov/patch issue by adding some test cases to increase coverage? Of course, you could also improve unit test coverage in a subsequent PR.
Got it!
I believe the part I worked on this time is critical and could cause serious issues if a bug occurs. 🐞 That's why I plan to write more comprehensive unit and integration tests to cover a wide range of cases. I'll request a review once all the tests are complete!
OK, I wish you success. If you have any questions, feel free to contact me.
    @Test
    void testChannelInactiveByServer() throws Exception {
        connectClient();

        DefaultChannelGroup serverChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        serverChannels.addAll(collectServerChannels(workerGroup));
        Channel serverSideClientChannel = serverChannels.stream()
                .filter(ch -> ch.isActive() && ch.remoteAddress() != null)
                .findFirst()
                .orElseThrow(() -> new AssertionError("Failed to find client channel on server side"));

        serverSideClientChannel.close().sync();
        assertTrue(
                channelInactiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS),
                "Channel inactive event was not detected on client side when server closed the connection");
        verify(mockRemotingClient).onChannelInactive(any(Channel.class));
    }
Please check whether this test verifies what we're trying to implement in this issue!
core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java
core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java
funky-eyes
left a comment
LGTM
I would like to know if a PR will be submitted soon to fix the issue mentioned in #5283?
    protected void cleanupResourcesForChannel(Channel channel) {
        if (channel == null) {
            return;
        }
        ChannelException cause = new ChannelException(
                String.format("Channel disconnected: %s", channel.remoteAddress()));

        Set<Integer> messageIds = collectMessageIdsForChannel(channel.id());
        cleanupFuturesForMessageIds(messageIds, cause);

        LOGGER.info("Cleaned up {} pending requests for disconnected channel: {}",
                messageIds.size(), channel.remoteAddress());
    }

    /**
     * Collects message IDs associated with a specific channel.
     * This is used during channel cleanup to identify pending requests.
     *
     * @param channelId the ID of the channel
     * @return a set of message IDs associated with the channel
     */
    private Set<Integer> collectMessageIdsForChannel(ChannelId channelId) {
        Set<Integer> messageIds = new HashSet<>();

        String serverAddress = null;
        for (Map.Entry<String, Channel> entry : clientChannelManager.getChannels().entrySet()) {
            Channel channel = entry.getValue();
            if (channelId.equals(channel.id())) {
                serverAddress = entry.getKey();
                break;
            }
        }
I believe this method addresses issue #5283.
What are your thoughts?
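To make the cleanup idea concrete, here is a small stand-alone sketch of failing and removing pending requests when a channel disconnects. It is not the PR's implementation: `PendingRequests` and `Pending` are hypothetical names, `CompletableFuture` stands in for Seata's `MessageFuture`, and the sketch assumes each pending entry records the channel it was sent on, which the truncated hunk above does not show.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical tracker for in-flight requests, keyed by message ID. */
class PendingRequests {
    /** One in-flight request: the channel it was sent on plus the future awaiting the reply. */
    static final class Pending {
        final String channelId;
        final CompletableFuture<Object> future = new CompletableFuture<>();

        Pending(String channelId) {
            this.channelId = channelId;
        }
    }

    private final Map<Integer, Pending> futures = new ConcurrentHashMap<>();

    CompletableFuture<Object> track(int messageId, String channelId) {
        Pending pending = new Pending(channelId);
        futures.put(messageId, pending);
        return pending.future;
    }

    /** Fails and removes every pending request sent on the given channel; returns how many. */
    int cleanupForChannel(String channelId) {
        if (channelId == null) {
            return 0;
        }
        RuntimeException cause = new RuntimeException("Channel disconnected: " + channelId);
        int cleaned = 0;
        // ConcurrentHashMap iterators are weakly consistent, so removing while iterating is safe.
        for (Map.Entry<Integer, Pending> entry : futures.entrySet()) {
            Pending pending = entry.getValue();
            if (channelId.equals(pending.channelId) && futures.remove(entry.getKey(), pending)) {
                // Wake up any caller blocked on the future instead of letting it wait for a timeout.
                pending.future.completeExceptionally(cause);
                cleaned++;
            }
        }
        return cleaned;
    }

    boolean isPending(int messageId) {
        return futures.containsKey(messageId);
    }
}
```

Completing the future exceptionally before dropping the reference matters as much as the removal itself: the entry no longer leaks, and the waiting caller learns about the disconnect immediately.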
    @Test
    void testCleanupMessageFuturesOnChannelDisconnection() {
        when(channel.id()).thenReturn(channelId);
        when(channel.remoteAddress()).thenReturn(new InetSocketAddress("127.0.0.1", 8091));

        MessageFuture messageFuture1 = new MessageFuture();
        RpcMessage rpcMessage1 = createRpcMessage(1);
        messageFuture1.setRequestMessage(rpcMessage1);
        futures.put(1, messageFuture1);

        MessageFuture messageFuture2 = new MessageFuture();
        RpcMessage rpcMessage2 = createRpcMessage(2);
        messageFuture2.setRequestMessage(rpcMessage2);
        futures.put(2, messageFuture2);

        int parentId = 100;
        MergedWarpMessage mergeMessage = new MergedWarpMessage();
        mergeMessage.msgIds = new ArrayList<>();
        mergeMessage.msgIds.add(1);
        mergeMessage.msgIds.add(2);

        mergeMsgMap.put(parentId, mergeMessage);

        String serverAddress = "127.0.0.1:8091";
        channels.put(serverAddress, channel);

        BlockingQueue<RpcMessage> basket = new LinkedBlockingQueue<>();
        basket.add(rpcMessage1);
        basket.add(rpcMessage2);
        basketMap.put(serverAddress, basket);

        client.cleanupResourcesForChannel(channel);

        assertFalse(futures.containsKey(1), "Future ID 1 has not been removed");
        assertFalse(futures.containsKey(2), "Future ID 2 has not been removed");

        assertThrows(RuntimeException.class, () -> messageFuture1.get(0, java.util.concurrent.TimeUnit.MILLISECONDS));
        assertThrows(RuntimeException.class, () -> messageFuture2.get(0, java.util.concurrent.TimeUnit.MILLISECONDS));
    }
Ditto
Please take a look.
     * @param cause the exception to set as the result for each future
     */
    private void cleanupFuturesForMessageIds(Set<Integer> messageIds, Exception cause) {
        for (Integer messageId : messageIds) {
    @Override
    public void sendAsyncRequest(Channel channel, Object msg) {
        if (channel == null) {
            LOGGER.warn("sendAsyncRequest nothing, caused by null channel.");
            throw new FrameworkException(new Throwable("throw"), "frameworkException", FrameworkErrorCode.ChannelIsNotWritable);
        }
        RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage
            ? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
            : ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
        Object body = rpcMessage.getBody();
        if (body instanceof MergeMessage) {
            Integer parentId = rpcMessage.getId();
            mergeMsgMap.put(parentId, (MergeMessage)rpcMessage.getBody());
            if (body instanceof MergedWarpMessage) {
                for (Integer msgId : ((MergedWarpMessage)rpcMessage.getBody()).msgIds) {
                    childToParentMap.put(msgId, parentId);
                }
            }
        }
        super.sendAsync(channel, rpcMessage);
    }
It seems that childToParentMap is not being cleared.
I'll create a PR that also clears the childToParentMap as soon as possible—hopefully by tomorrow.
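One way the two maps could be kept in step is sketched below, assuming the parent entry still knows its child IDs when it is removed. `MergeBookkeeping` is a hypothetical class, and a `List<Integer>` of child IDs stands in for `MergedWarpMessage.msgIds`; the follow-up PR may well do this differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical holder for the parent and child maps populated in sendAsyncRequest. */
class MergeBookkeeping {
    // parentId -> child message IDs (stand-in for mergeMsgMap holding a MergedWarpMessage)
    private final Map<Integer, List<Integer>> mergeMsgMap = new ConcurrentHashMap<>();
    // child message ID -> parentId
    private final Map<Integer, Integer> childToParentMap = new ConcurrentHashMap<>();

    void recordMerged(int parentId, List<Integer> childIds) {
        mergeMsgMap.put(parentId, new ArrayList<>(childIds));
        for (Integer childId : childIds) {
            childToParentMap.put(childId, parentId);
        }
    }

    /** Removes the parent entry and every child mapping that still points at it. */
    void removeMerged(int parentId) {
        List<Integer> childIds = mergeMsgMap.remove(parentId);
        if (childIds == null) {
            return; // already cleaned up, or never recorded
        }
        for (Integer childId : childIds) {
            // Two-argument remove only deletes the mapping if it still refers to this parent.
            childToParentMap.remove(childId, parentId);
        }
    }

    int parentCount() {
        return mergeMsgMap.size();
    }

    int childCount() {
        return childToParentMap.size();
    }
}
```

Driving the child cleanup from the removed parent entry keeps the operation idempotent, so it is harmless if both the response path and the disconnect path call it.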
Ⅰ. Describe what this PR did
Ⅱ. Does this pull request fix one issue?
fixes #7058
fixes #5283
Ⅲ. Why don't you add test cases (unit test/integration test)?
After the implementation is complete, it will be added.
    graph TD;
    A[RemotingClient Interface] -->|implements| B[AbstractNettyRemotingClient];
    C[ChannelEventListener Interface] -->|implements| D[Custom Listener];
    B -->|registerChannelEventListener| C;
    B -->|setChannelHandlers| E[ChannelEventHandler];
    E -->|detects| F1[channelInactive];
    E -->|detects| F2[onChannelException];
    E -->|detects| F3[onChannelActive];
    E -->|detects| F4[onChannelIdle];
    F1 -->|invokes| G[cleanupResourcesForChannel];
    F2 -->|invokes| G;
    G -->|cleans up| H[mergeMsgMap];
    G -->|cleans up| I[futures];
    F1 -->|notifies| C;
    F2 -->|notifies| C;
    F3 -->|notifies| C;
    F4 -->|notifies| C;