Skip to content

Commit fea66b8

Browse files
committed
Fix exception occur in RpcServer side, it will not be sent to RpcClient
1 parent edbf5cd commit fea66b8

File tree

4 files changed

+41
-11
lines changed

4 files changed

+41
-11
lines changed

dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/StandardRpcRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public static StandardRpcRequest of(Object[] args) {
4040
final Class<?>[] argsTypes = new Class[args.length];
4141
for (int i = 0; i < args.length; i++) {
4242
argsBytes[i] = JsonSerializer.serialize(args[i]);
43-
argsTypes[i] = args[i].getClass();
43+
argsTypes[i] = args[i] == null ? null : args[i].getClass();
4444
}
4545
return new StandardRpcRequest(argsBytes, argsTypes);
4646
}

dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,31 +33,37 @@ public class NettyServerConfig {
3333
/**
3434
* init the server connectable queue
3535
*/
36+
@Builder.Default
3637
private int soBacklog = 1024;
3738

3839
/**
3940
* whether tpc delay
4041
*/
42+
@Builder.Default
4143
private boolean tcpNoDelay = true;
4244

4345
/**
4446
* whether keep alive
4547
*/
48+
@Builder.Default
4649
private boolean soKeepalive = true;
4750

4851
/**
4952
* send buffer size
5053
*/
54+
@Builder.Default
5155
private int sendBufferSize = 65535;
5256

5357
/**
5458
* receive buffer size
5559
*/
60+
@Builder.Default
5661
private int receiveBufferSize = 65535;
5762

5863
/**
5964
* worker threads,default get machine cpus
6065
*/
66+
@Builder.Default
6167
private int workerThread = Runtime.getRuntime().availableProcessors() * 2;
6268

6369
/**

dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/ServerMethodInvokerImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.dolphinscheduler.extract.base.server;
1919

20+
import java.lang.reflect.InvocationTargetException;
2021
import java.lang.reflect.Method;
2122

2223
public class ServerMethodInvokerImpl implements ServerMethodInvoker {
@@ -36,7 +37,11 @@ public ServerMethodInvokerImpl(Object serviceBean, Method method) {
3637
@Override
3738
public Object invoke(Object... args) throws Throwable {
3839
// todo: check the request param when register
39-
return method.invoke(serviceBean, args);
40+
try {
41+
return method.invoke(serviceBean, args);
42+
} catch (InvocationTargetException ex) {
43+
throw ex.getTargetException();
44+
}
4045
}
4146

4247
@Override

dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,19 @@
1717

1818
package org.apache.dolphinscheduler.extract.base.client;
1919

20+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
21+
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
2023
import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
2124
import org.apache.dolphinscheduler.extract.base.RpcMethod;
2225
import org.apache.dolphinscheduler.extract.base.RpcService;
2326
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
27+
import org.apache.dolphinscheduler.extract.base.exception.MethodInvocationException;
2428
import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
2529

30+
import org.apache.commons.lang3.RandomUtils;
31+
import org.apache.commons.lang3.StringUtils;
32+
2633
import org.junit.jupiter.api.AfterEach;
2734
import org.junit.jupiter.api.Assertions;
2835
import org.junit.jupiter.api.BeforeEach;
@@ -32,36 +39,45 @@ public class SingletonJdkDynamicRpcClientProxyFactoryTest {
3239

3340
private NettyRemotingServer nettyRemotingServer;
3441

42+
private String serverAddress;
43+
3544
@BeforeEach
3645
public void setUp() {
37-
nettyRemotingServer =
38-
new NettyRemotingServer(NettyServerConfig.builder().serverName("ApiServer").listenPort(12345).build());
46+
int listenPort = RandomUtils.nextInt(10000, 20000);
47+
NettyServerConfig nettyServerConfig = NettyServerConfig.builder()
48+
.serverName("ApiServer")
49+
.listenPort(listenPort)
50+
.build();
51+
nettyRemotingServer = new NettyRemotingServer(nettyServerConfig);
3952
nettyRemotingServer.start();
40-
53+
serverAddress = "localhost:" + listenPort;
4154
new SpringServerMethodInvokerDiscovery(nettyRemotingServer)
4255
.postProcessAfterInitialization(new IServiceImpl(), "iServiceImpl");
4356
}
4457

4558
@Test
4659
public void getProxyClient() {
4760
IService proxyClient =
48-
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345", IService.class);
61+
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(serverAddress, IService.class);
4962
Assertions.assertNotNull(proxyClient);
5063
}
5164

5265
@Test
5366
public void testPing() {
5467
IService proxyClient =
55-
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345", IService.class);
56-
String ping = proxyClient.ping("ping");
57-
Assertions.assertEquals("pong", ping);
68+
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(serverAddress, IService.class);
69+
assertEquals("pong", proxyClient.ping("ping"));
70+
71+
MethodInvocationException methodInvocationException =
72+
Assertions.assertThrows(MethodInvocationException.class, () -> proxyClient.ping(null));
73+
assertEquals("ping: null is illegal", methodInvocationException.getMessage());
5874
}
5975

6076
@Test
6177
public void testVoid() {
6278
IService proxyClient =
63-
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345", IService.class);
64-
Assertions.assertDoesNotThrow(proxyClient::voidMethod);
79+
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(serverAddress, IService.class);
80+
assertDoesNotThrow(proxyClient::voidMethod);
6581
}
6682

6783
@AfterEach
@@ -83,6 +99,9 @@ public static class IServiceImpl implements IService {
8399

84100
@Override
85101
public String ping(String ping) {
102+
if (StringUtils.isEmpty(ping)) {
103+
throw new IllegalArgumentException("ping: " + ping + " is illegal");
104+
}
86105
return "pong";
87106
}
88107

0 commit comments

Comments
 (0)