@YongGoose YongGoose commented May 11, 2025

  • I have registered the PR changes.

Ⅰ. Describe what this PR did

Ⅱ. Does this pull request fix one issue?

fixes #7058
fixes #5283

Ⅲ. Why don't you add test cases (unit test/integration test)?

Tests will be added once the implementation is complete.

graph TD;
    A[RemotingClient Interface] -->|implements| B[AbstractNettyRemotingClient];
    C[ChannelEventListener Interface] -->|implements| D[Custom Listener];
    B -->|registerChannelEventListener| C;
    B -->|setChannelHandlers| E[ChannelEventHandler];

    E -->|detects| F1[channelInactive];
    E -->|detects| F2[onChannelException];
    E -->|detects| F3[onChannelActive];
    E -->|detects| F4[onChannelIdle];

    F1 -->|invokes| G[cleanupResourcesForChannel];
    F2 -->|invokes| G;
    G -->|cleans up| H[mergeMsgMap];
    G -->|cleans up| I[futures];

    F1 -->|notifies| C;
    F2 -->|notifies| C;
    F3 -->|notifies| C;
    F4 -->|notifies| C;
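The flow in the diagram can be sketched in plain Java without Netty. This is only an illustration under stated assumptions: `EventType`, `Listener`, `Client`, and `dispatch` are hypothetical stand-ins, channels are represented by string IDs, and the real `ChannelEventListener` and `ChannelEventHandler` signatures may differ. Only the names `registerChannelEventListener` and `cleanupResourcesForChannel` come from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ChannelEventFlowSketch {

    // Hypothetical event kinds mirroring the four events in the diagram.
    enum EventType { ACTIVE, INACTIVE, EXCEPTION, IDLE }

    // Hypothetical listener shape; the real ChannelEventListener may differ.
    interface Listener {
        void onEvent(EventType type, String channelId);
    }

    // Stand-in for the remoting client: holds listeners and the cleanup hook.
    static class Client {
        final List<Listener> listeners = new CopyOnWriteArrayList<>();
        final List<String> cleanedUp = new ArrayList<>();

        void registerChannelEventListener(Listener listener) {
            listeners.add(listener);
        }

        // Records the channel here; the PR releases pending requests instead.
        void cleanupResourcesForChannel(String channelId) {
            cleanedUp.add(channelId);
        }

        // Stand-in for the handler: clean up on disconnect or error,
        // then notify every registered listener of the event.
        void dispatch(EventType type, String channelId) {
            if (type == EventType.INACTIVE || type == EventType.EXCEPTION) {
                cleanupResourcesForChannel(channelId);
            }
            for (Listener listener : listeners) {
                listener.onEvent(type, channelId);
            }
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        List<String> seen = new ArrayList<>();
        client.registerChannelEventListener((type, id) -> seen.add(type + ":" + id));

        client.dispatch(EventType.ACTIVE, "ch-1");
        client.dispatch(EventType.INACTIVE, "ch-1");

        System.out.println(seen);             // [ACTIVE:ch-1, INACTIVE:ch-1]
        System.out.println(client.cleanedUp); // [ch-1]
    }
}
```

Registering listeners up front means every channel event reaches them, including disconnects that happen when no write is in flight.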

@YongGoose YongGoose changed the title feature: Add channel event listener support to prevent memory leaks feature: Add ChannelEventListener support to prevent memory leaks May 11, 2025

codecov bot commented May 11, 2025

Codecov Report

Attention: Patch coverage is 78.90625% with 27 lines in your changes missing coverage. Please review.

Project coverage is 54.88%. Comparing base (3b40167) to head (02e4c90).
Report is 1 commits behind head on 2.x.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...ta/core/rpc/netty/AbstractNettyRemotingClient.java | 86.48% | 3 Missing and 7 partials ⚠️ |
| ...he/seata/core/rpc/netty/RmNettyRemotingClient.java | 10.00% | 9 Missing ⚠️ |
| ...che/seata/core/rpc/netty/ChannelEventListener.java | 0.00% | 4 Missing ⚠️ |
| ...ache/seata/core/rpc/netty/ChannelEventHandler.java | 88.00% | 0 Missing and 3 partials ⚠️ |
| ...he/seata/core/rpc/netty/TmNettyRemotingClient.java | 90.00% | 0 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##                2.x    #7337      +/-   ##
============================================
- Coverage     55.79%   54.88%   -0.91%     
+ Complexity     7476     7398      -78     
============================================
  Files          1178     1181       +3     
  Lines         41962    42089     +127     
  Branches       4923     4934      +11     
============================================
- Hits          23413    23101     -312     
- Misses        16310    16818     +508     
+ Partials       2239     2170      -69     
| Files with missing lines | Coverage Δ |
|---|---|
| .../apache/seata/core/rpc/netty/ChannelEventType.java | 100.00% <100.00%> (ø) |
| ...he/seata/core/rpc/netty/TmNettyRemotingClient.java | 73.21% <90.00%> (+0.66%) ⬆️ |
| ...ache/seata/core/rpc/netty/ChannelEventHandler.java | 88.00% <88.00%> (ø) |
| ...che/seata/core/rpc/netty/ChannelEventListener.java | 0.00% <0.00%> (ø) |
| ...he/seata/core/rpc/netty/RmNettyRemotingClient.java | 48.96% <10.00%> (-3.63%) ⬇️ |
| ...ta/core/rpc/netty/AbstractNettyRemotingClient.java | 37.11% <86.48%> (+6.37%) ⬆️ |

... and 55 files with indirect coverage changes

@funky-eyes funky-eyes left a comment

This design calls back the ChannelEventListener registered by TM and RM, rather than adding a listener after channel.write(), right?

YongGoose commented May 12, 2025

Yes, that's correct.

The implementation calls back the ChannelEventListener that TM and RM register in advance, so user-defined actions can run whenever a channel event is detected.

@funky-eyes
Contributor

I think this design is good, but since this functionality is not a user-facing API, it shouldn't be a feature-type PR. I believe it should be changed to an optimize type instead. Could you also fix the codecov/patch issue by adding some test cases to increase coverage? Of course, you could also improve unit test coverage in a subsequent PR.

@YongGoose
Member Author

Got it!
I'll update the PR type and add both unit and integration tests. 🚀

@YongGoose
Member Author

@funky-eyes

I believe the part I worked on this time is critical and could cause serious issues if a bug occurs. 🐞

That's why I plan to write more comprehensive unit and integration tests to cover a wide range of cases.
As a result, it might take some extra time.

I'll request a review once all the tests are complete!

@funky-eyes
Contributor

OK, I wish you success. If you have any questions, feel free to contact me.

@funky-eyes funky-eyes changed the title feature: Add ChannelEventListener support to prevent memory leaks optimize: Add ChannelEventListener support to prevent memory leaks May 13, 2025
@YongGoose YongGoose requested a review from funky-eyes May 15, 2025 14:15
Comment on lines +196 to +212
@Test
void testChannelInactiveByServer() throws Exception {
    connectClient();

    DefaultChannelGroup serverChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    serverChannels.addAll(collectServerChannels(workerGroup));
    Channel serverSideClientChannel = serverChannels.stream()
            .filter(ch -> ch.isActive() && ch.remoteAddress() != null)
            .findFirst()
            .orElseThrow(() -> new AssertionError("Failed to find client channel on server side"));

    serverSideClientChannel.close().sync();
    assertTrue(
            channelInactiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS),
            "Channel inactive event was not detected on client side when server closed the connection");
    verify(mockRemotingClient).onChannelInactive(any(Channel.class));
}
Member Author

Please check whether this test verifies what we're trying to implement in this issue!

@funky-eyes funky-eyes added this to the 2.5.0 milestone May 19, 2025
@funky-eyes funky-eyes left a comment

LGTM

@YongGoose YongGoose requested a review from slievrly May 19, 2025 05:19
@funky-eyes funky-eyes merged commit c5efbd9 into apache:2.x May 19, 2025
9 checks passed
@funky-eyes
Contributor

I would like to know if a PR will be submitted soon to fix the issue mentioned in #5283?

Comment on lines +425 to +456
protected void cleanupResourcesForChannel(Channel channel) {
    if (channel == null) {
        return;
    }
    ChannelException cause = new ChannelException(
            String.format("Channel disconnected: %s", channel.remoteAddress()));

    Set<Integer> messageIds = collectMessageIdsForChannel(channel.id());
    cleanupFuturesForMessageIds(messageIds, cause);

    LOGGER.info("Cleaned up {} pending requests for disconnected channel: {}",
            messageIds.size(), channel.remoteAddress());
}

/**
 * Collects message IDs associated with a specific channel.
 * This is used during channel cleanup to identify pending requests.
 *
 * @param channelId the ID of the channel
 * @return a set of message IDs associated with the channel
 */
private Set<Integer> collectMessageIdsForChannel(ChannelId channelId) {
    Set<Integer> messageIds = new HashSet<>();

    String serverAddress = null;
    for (Map.Entry<String, Channel> entry : clientChannelManager.getChannels().entrySet()) {
        Channel channel = entry.getValue();
        if (channelId.equals(channel.id())) {
            serverAddress = entry.getKey();
            break;
        }
    }
Member Author

@funky-eyes

I believe this method addresses issue #5283.
What are your thoughts?
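A minimal sketch of the idea behind `cleanupResourcesForChannel`, assuming simplified types: `CompletableFuture` stands in for `MessageFuture`, channels are plain string IDs, and the `messageChannel` map is hypothetical bookkeeping invented for this example. The excerpt above is truncated, so how the PR actually gathers message IDs is not shown here.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class PendingRequestCleanupSketch {

    // Message ID -> pending response; CompletableFuture stands in for MessageFuture.
    final Map<Integer, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();
    // Message ID -> channel that carried it (hypothetical bookkeeping for this sketch).
    final Map<Integer, String> messageChannel = new ConcurrentHashMap<>();

    // Records a pending request so it can be found again at cleanup time.
    CompletableFuture<Object> send(String channelId, int messageId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        futures.put(messageId, future);
        messageChannel.put(messageId, channelId);
        return future;
    }

    // Fails and removes every pending request tied to the disconnected channel.
    // Returns the number of message IDs that were cleaned up.
    int cleanupResourcesForChannel(String channelId) {
        if (channelId == null) {
            return 0;
        }
        Exception cause = new IllegalStateException("Channel disconnected: " + channelId);

        Set<Integer> messageIds = new HashSet<>();
        for (Map.Entry<Integer, String> entry : messageChannel.entrySet()) {
            if (channelId.equals(entry.getValue())) {
                messageIds.add(entry.getKey());
            }
        }
        for (Integer messageId : messageIds) {
            messageChannel.remove(messageId);
            CompletableFuture<Object> future = futures.remove(messageId);
            if (future != null) {
                // Waiting callers fail immediately instead of blocking until timeout.
                future.completeExceptionally(cause);
            }
        }
        return messageIds.size();
    }

    public static void main(String[] args) {
        PendingRequestCleanupSketch client = new PendingRequestCleanupSketch();
        CompletableFuture<Object> f1 = client.send("ch-1", 1);
        client.send("ch-1", 2);
        CompletableFuture<Object> f3 = client.send("ch-2", 3);

        System.out.println(client.cleanupResourcesForChannel("ch-1")); // 2
        System.out.println(f1.isCompletedExceptionally());             // true
        System.out.println(f3.isDone());                               // false
        System.out.println(client.futures.keySet());                   // [3]
    }
}
```

Removing the entry and completing the future together is what prevents the leak: without it, a request whose response never arrives stays in the map after the channel is gone.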

Comment on lines +82 to +120
@Test
void testCleanupMessageFuturesOnChannelDisconnection() {
    when(channel.id()).thenReturn(channelId);
    when(channel.remoteAddress()).thenReturn(new InetSocketAddress("127.0.0.1", 8091));

    MessageFuture messageFuture1 = new MessageFuture();
    RpcMessage rpcMessage1 = createRpcMessage(1);
    messageFuture1.setRequestMessage(rpcMessage1);
    futures.put(1, messageFuture1);

    MessageFuture messageFuture2 = new MessageFuture();
    RpcMessage rpcMessage2 = createRpcMessage(2);
    messageFuture2.setRequestMessage(rpcMessage2);
    futures.put(2, messageFuture2);

    int parentId = 100;
    MergedWarpMessage mergeMessage = new MergedWarpMessage();
    mergeMessage.msgIds = new ArrayList<>();
    mergeMessage.msgIds.add(1);
    mergeMessage.msgIds.add(2);

    mergeMsgMap.put(parentId, mergeMessage);

    String serverAddress = "127.0.0.1:8091";
    channels.put(serverAddress, channel);

    BlockingQueue<RpcMessage> basket = new LinkedBlockingQueue<>();
    basket.add(rpcMessage1);
    basket.add(rpcMessage2);
    basketMap.put(serverAddress, basket);

    client.cleanupResourcesForChannel(channel);

    assertFalse(futures.containsKey(1), "Future ID 1 has not been removed");
    assertFalse(futures.containsKey(2), "Future ID 2 has not been removed");

    assertThrows(RuntimeException.class, () -> messageFuture1.get(0, java.util.concurrent.TimeUnit.MILLISECONDS));
    assertThrows(RuntimeException.class, () -> messageFuture2.get(0, java.util.concurrent.TimeUnit.MILLISECONDS));
}
Member Author

@funky-eyes

Ditto

@YongGoose
Member Author

Please take a look

 * @param cause the exception to set as the result for each future
 */
private void cleanupFuturesForMessageIds(Set<Integer> messageIds, Exception cause) {
    for (Integer messageId : messageIds) {
Contributor

    @Override
    public void sendAsyncRequest(Channel channel, Object msg) {
        if (channel == null) {
            LOGGER.warn("sendAsyncRequest nothing, caused by null channel.");
            throw new FrameworkException(new Throwable("throw"), "frameworkException", FrameworkErrorCode.ChannelIsNotWritable);
        }
        RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage
            ? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
            : ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
        Object body = rpcMessage.getBody();
        if (body instanceof MergeMessage) {
            Integer parentId = rpcMessage.getId();
            mergeMsgMap.put(parentId, (MergeMessage)rpcMessage.getBody());
            if (body instanceof MergedWarpMessage) {
                for (Integer msgId : ((MergedWarpMessage)rpcMessage.getBody()).msgIds) {
                    childToParentMap.put(msgId, parentId);
                }
            }
        }
        super.sendAsync(channel, rpcMessage);
    }

It seems that childToParentMap is not being cleared.
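One way such a cleanup could look, as a sketch only: the maps here are simplified (a list of child IDs stands in for `MergeMessage`), and `registerMerged` and `removeMerged` are hypothetical helper names, not methods from the PR or from Seata.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ChildToParentCleanupSketch {

    // Parent (merged) message ID -> child message IDs; stands in for mergeMsgMap.
    final Map<Integer, List<Integer>> mergeMsgMap = new ConcurrentHashMap<>();
    // Child message ID -> parent message ID, populated when a merged message is sent.
    final Map<Integer, Integer> childToParentMap = new ConcurrentHashMap<>();

    // Mirrors the bookkeeping done in sendAsyncRequest for a merged message.
    void registerMerged(int parentId, List<Integer> childIds) {
        mergeMsgMap.put(parentId, childIds);
        for (Integer childId : childIds) {
            childToParentMap.put(childId, parentId);
        }
    }

    // Removes the merged message and every child -> parent entry that points at it,
    // so neither map retains entries for a request that will never complete.
    void removeMerged(int parentId) {
        List<Integer> childIds = mergeMsgMap.remove(parentId);
        if (childIds == null) {
            return;
        }
        for (Integer childId : childIds) {
            // Two-argument remove only deletes the entry if it still maps to this parent.
            childToParentMap.remove(childId, parentId);
        }
    }

    public static void main(String[] args) {
        ChildToParentCleanupSketch maps = new ChildToParentCleanupSketch();
        maps.registerMerged(100, List.of(1, 2));
        maps.registerMerged(200, List.of(3));

        maps.removeMerged(100);

        System.out.println(maps.mergeMsgMap.keySet()); // [200]
        System.out.println(maps.childToParentMap);     // {3=200}
    }
}
```

Using the parent's own child list to drive the removal keeps the two maps consistent without scanning all of `childToParentMap`.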

Member Author

I'll create a PR that also clears the childToParentMap as soon as possible—hopefully by tomorrow.

Development

Successfully merging this pull request may close these issues.

RemotingClient does not support setting a listener
ClientOnResponseProcessor#mergeMsgMap&futures is a risk of memory leaks