Skip to content

Commit e937402

Browse files
authored
Merge branch 'dev' into improve_e2e_m1_chip_local_mode
2 parents 30e61f4 + fd5a182 commit e937402

File tree

17 files changed

+421
-40
lines changed

17 files changed

+421
-40
lines changed

docs/docs/en/guide/metrics/metrics.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ For example, you can get the master metrics by `curl http://localhost:5679/actua
9191
- stop: the number of stopped workflow instances
9292
- failover: the number of workflow instance fail-overs
9393

94+
### RPC Related Metrics
95+
96+
- ds.rpc.client.sync.request.exception.count: (counter) the number of exceptions occurred in sync rpc requests
97+
- ds.rpc.client.sync.request.duration.time: (histogram) the time cost of sync rpc requests
98+
9499
### Master Server Metrics
95100

96101
- ds.master.overload.count: (counter) the number of times the master overloaded

docs/docs/zh/guide/metrics/metrics.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ metrics exporter端口`server.port`是在application.yaml里定义的: master: `
9191
- stop:停止的工作流实例数量
9292
- failover:容错的工作流实例数量
9393

94+
### RPC相关指标
95+
96+
- ds.rpc.client.sync.request.exception.count: (counter) 同步rpc请求异常数
97+
- ds.rpc.client.sync.request.duration.time: (histogram) 同步rpc请求耗时
98+
9499
### Master Server指标
95100

96101
- ds.master.overload.count: (counter) master过载次数

dolphinscheduler-extract/dolphinscheduler-extract-base/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,13 @@
4747
<artifactId>dolphinscheduler-common</artifactId>
4848
<version>${project.version}</version>
4949
</dependency>
50+
51+
<dependency>
52+
<groupId>org.apache.dolphinscheduler</groupId>
53+
<artifactId>dolphinscheduler-meter</artifactId>
54+
<version>${project.version}</version>
55+
</dependency>
56+
5057
<dependency>
5158
<groupId>io.netty</groupId>
5259
<artifactId>netty-all</artifactId>

dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,6 @@
2828
@Documented
2929
public @interface RpcMethod {
3030

31-
long timeout() default 3000L;
31+
long timeout() default -1;
3232

3333
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.extract.base;
19+
20+
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
21+
import org.apache.dolphinscheduler.extract.base.utils.Host;
22+
23+
import lombok.AllArgsConstructor;
24+
import lombok.Builder;
25+
import lombok.Data;
26+
import lombok.NoArgsConstructor;
27+
28+
@Data
29+
@Builder
30+
@AllArgsConstructor
31+
@NoArgsConstructor
32+
public class SyncRequestDto {
33+
34+
private Host serverHost;
35+
private Transporter transporter;
36+
private long timeoutMillis;
37+
38+
}

dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
7777
.writeAndFlush(HeartBeatTransporter.getHeartBeatTransporter())
7878
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
7979
if (log.isDebugEnabled()) {
80-
log.debug("Client send heart beat to: {}", ChannelUtils.getRemoteAddress(ctx.channel()));
80+
log.info("Client send heartbeat to: {}", ctx.channel().remoteAddress());
8181
}
8282
} else {
8383
super.userEventTriggered(ctx, evt);

dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java

Lines changed: 54 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,17 @@
1919

2020
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
2121
import org.apache.dolphinscheduler.extract.base.IRpcResponse;
22+
import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
2223
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
2324
import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
2425
import org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
2526
import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
27+
import org.apache.dolphinscheduler.extract.base.metrics.ClientSyncDurationMetrics;
28+
import org.apache.dolphinscheduler.extract.base.metrics.ClientSyncExceptionMetrics;
29+
import org.apache.dolphinscheduler.extract.base.metrics.RpcMetrics;
2630
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
2731
import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
2832
import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
29-
import org.apache.dolphinscheduler.extract.base.utils.Constants;
3033
import org.apache.dolphinscheduler.extract.base.utils.Host;
3134
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
3235

@@ -97,8 +100,8 @@ public void initChannel(SocketChannel ch) {
97100
ch.pipeline()
98101
.addLast("client-idle-handler",
99102
new IdleStateHandler(
100-
Constants.NETTY_CLIENT_HEART_BEAT_TIME,
101103
0,
104+
clientConfig.getHeartBeatIntervalMillis(),
102105
0,
103106
TimeUnit.MILLISECONDS))
104107
.addLast(new TransporterDecoder(), clientHandler, new TransporterEncoder());
@@ -107,38 +110,60 @@ public void initChannel(SocketChannel ch) {
107110
isStarted.compareAndSet(false, true);
108111
}
109112

110-
public IRpcResponse sendSync(final Host host,
111-
final Transporter transporter,
112-
final long timeoutMillis) throws InterruptedException, RemotingException {
113-
final Channel channel = getOrCreateChannel(host);
114-
if (channel == null) {
115-
throw new RemotingException(String.format("connect to : %s fail", host));
116-
}
113+
public IRpcResponse sendSync(SyncRequestDto syncRequestDto) throws RemotingException {
114+
long start = System.currentTimeMillis();
115+
116+
final Host host = syncRequestDto.getServerHost();
117+
final Transporter transporter = syncRequestDto.getTransporter();
118+
final long timeoutMillis = syncRequestDto.getTimeoutMillis() < 0 ? clientConfig.getConnectTimeoutMillis()
119+
: syncRequestDto.getTimeoutMillis();
117120
final long opaque = transporter.getHeader().getOpaque();
118-
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis);
119-
channel.writeAndFlush(transporter).addListener(future -> {
120-
if (future.isSuccess()) {
121-
responseFuture.setSendOk(true);
122-
return;
123-
} else {
124-
responseFuture.setSendOk(false);
121+
122+
try {
123+
final Channel channel = getOrCreateChannel(host);
124+
if (channel == null) {
125+
throw new RemotingException(String.format("connect to : %s fail", host));
125126
}
126-
responseFuture.setCause(future.cause());
127-
responseFuture.putResponse(null);
128-
log.error("Send Sync request {} to host {} failed", transporter, host, responseFuture.getCause());
129-
});
130-
/*
131-
* sync wait for result
132-
*/
133-
IRpcResponse iRpcResponse = responseFuture.waitResponse();
134-
if (iRpcResponse == null) {
135-
if (responseFuture.isSendOK()) {
136-
throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
127+
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis);
128+
channel.writeAndFlush(transporter).addListener(future -> {
129+
if (future.isSuccess()) {
130+
responseFuture.setSendOk(true);
131+
return;
132+
} else {
133+
responseFuture.setSendOk(false);
134+
}
135+
responseFuture.setCause(future.cause());
136+
responseFuture.putResponse(null);
137+
log.error("Send Sync request {} to host {} failed", transporter, host, responseFuture.getCause());
138+
});
139+
/*
140+
* sync wait for result
141+
*/
142+
IRpcResponse iRpcResponse = responseFuture.waitResponse();
143+
if (iRpcResponse == null) {
144+
if (responseFuture.isSendOK()) {
145+
throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
146+
} else {
147+
throw new RemotingException(host.toString(), responseFuture.getCause());
148+
}
149+
}
150+
return iRpcResponse;
151+
} catch (Exception ex) {
152+
ClientSyncExceptionMetrics clientSyncExceptionMetrics = ClientSyncExceptionMetrics
153+
.of(syncRequestDto)
154+
.withThrowable(ex);
155+
RpcMetrics.recordClientSyncRequestException(clientSyncExceptionMetrics);
156+
if (ex instanceof RemotingException) {
157+
throw (RemotingException) ex;
137158
} else {
138-
throw new RemotingException(host.toString(), responseFuture.getCause());
159+
throw new RemotingException(ex);
139160
}
161+
} finally {
162+
ClientSyncDurationMetrics clientSyncDurationMetrics = ClientSyncDurationMetrics
163+
.of(syncRequestDto)
164+
.withMilliseconds(System.currentTimeMillis() - start);
165+
RpcMetrics.recordClientSyncRequestDuration(clientSyncDurationMetrics);
140166
}
141-
return iRpcResponse;
142167
}
143168

144169
Channel getOrCreateChannel(Host host) {

dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.dolphinscheduler.extract.base.IRpcResponse;
2121
import org.apache.dolphinscheduler.extract.base.RpcMethod;
2222
import org.apache.dolphinscheduler.extract.base.StandardRpcRequest;
23+
import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
2324
import org.apache.dolphinscheduler.extract.base.exception.MethodInvocationException;
2425
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
2526
import org.apache.dolphinscheduler.extract.base.protocal.TransporterHeader;
@@ -41,8 +42,12 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
4142
transporter.setBody(JsonSerializer.serialize(StandardRpcRequest.of(args)));
4243
transporter.setHeader(TransporterHeader.of(methodIdentifier));
4344

44-
IRpcResponse iRpcResponse =
45-
nettyRemotingClient.sendSync(serverHost, transporter, sync.timeout());
45+
SyncRequestDto syncRequestDto = SyncRequestDto.builder()
46+
.timeoutMillis(sync.timeout())
47+
.transporter(transporter)
48+
.serverHost(serverHost)
49+
.build();
50+
IRpcResponse iRpcResponse = nettyRemotingClient.sendSync(syncRequestDto);
4651
if (!iRpcResponse.isSuccess()) {
4752
throw MethodInvocationException.of(iRpcResponse.getMessage());
4853
}

dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyClientConfig.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.dolphinscheduler.extract.base.config;
1919

20+
import java.time.Duration;
21+
2022
import lombok.AllArgsConstructor;
2123
import lombok.Builder;
2224
import lombok.Data;
@@ -64,4 +66,14 @@ public class NettyClientConfig {
6466
@Builder.Default
6567
private int connectTimeoutMillis = 3000;
6668

69+
/**
70+
* Will send {@link org.apache.dolphinscheduler.extract.base.protocal.HeartBeatTransporter} to netty server every
71+
* heartBeatIntervalMillis, used to keep the {@link io.netty.channel.Channel} active.
72+
*/
73+
@Builder.Default
74+
private long heartBeatIntervalMillis = Duration.ofSeconds(10).toMillis();
75+
76+
@Builder.Default
77+
private int defaultRpcTimeoutMillis = 10_000;
78+
6779
}

dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.dolphinscheduler.extract.base.config;
1919

20+
import java.time.Duration;
21+
2022
import lombok.AllArgsConstructor;
2123
import lombok.Builder;
2224
import lombok.Data;
@@ -66,6 +68,12 @@ public class NettyServerConfig {
6668
@Builder.Default
6769
private int workerThread = Runtime.getRuntime().availableProcessors() * 2;
6870

71+
/**
72+
* If done's receive any data from a {@link io.netty.channel.Channel} during 180s then will close it.
73+
*/
74+
@Builder.Default
75+
private long connectionIdleTime = Duration.ofSeconds(60).toMillis();
76+
6977
/**
7078
* listen port
7179
*/

0 commit comments

Comments
 (0)