Skip to content

Commit da8f962

Browse files
authored
Merge branch 'dev' into Improvement-15489
2 parents c7540d0 + 64e1e67 commit da8f962

File tree

23 files changed

+159
-104
lines changed

23 files changed

+159
-104
lines changed

dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
package org.apache.dolphinscheduler.alert;
1919

20+
import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics;
2021
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
2122
import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
2223
import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
2324
import org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
2425
import org.apache.dolphinscheduler.alert.service.ListenerEventPostService;
2526
import org.apache.dolphinscheduler.common.constants.Constants;
2627
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
28+
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
2729
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
2830

2931
import javax.annotation.PreDestroy;
@@ -54,6 +56,8 @@ public class AlertServer {
5456
private AlertRegistryClient alertRegistryClient;
5557

5658
public static void main(String[] args) {
59+
AlertServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
60+
Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
5761
Thread.currentThread().setName(Constants.THREAD_NAME_ALERT_SERVER);
5862
new SpringApplicationBuilder(AlertServer.class).run(args);
5963
}

dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ public void registerPendingAlertGauge(final Supplier<Number> supplier) {
4545
.register(Metrics.globalRegistry);
4646
}
4747

48+
public static void registerUncachedException(final Supplier<Number> supplier) {
49+
Gauge.builder("ds.alert.uncached.exception", supplier)
50+
.description("number of uncached exception")
51+
.register(Metrics.globalRegistry);
52+
}
53+
4854
public void incAlertSuccessCount() {
4955
alertSuccessCounter.increment();
5056
}

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.dolphinscheduler.api;
1919

20+
import org.apache.dolphinscheduler.api.metrics.ApiServerMetrics;
2021
import org.apache.dolphinscheduler.common.enums.PluginType;
22+
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
2123
import org.apache.dolphinscheduler.dao.PluginDao;
2224
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
2325
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
@@ -51,6 +53,8 @@ public class ApiApplicationServer {
5153
private PluginDao pluginDao;
5254

5355
public static void main(String[] args) {
56+
ApiServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
57+
Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
5458
SpringApplication.run(ApiApplicationServer.class);
5559
}
5660

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/metrics/ApiServerMetrics.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
package org.apache.dolphinscheduler.api.metrics;
1919

2020
import java.util.concurrent.TimeUnit;
21+
import java.util.function.Supplier;
2122

2223
import lombok.experimental.UtilityClass;
2324
import io.micrometer.core.instrument.Counter;
2425
import io.micrometer.core.instrument.DistributionSummary;
26+
import io.micrometer.core.instrument.Gauge;
2527
import io.micrometer.core.instrument.Metrics;
2628
import io.micrometer.core.instrument.Timer;
2729

@@ -120,4 +122,10 @@ public void cleanUpApiResponseTimeMetricsByUserId(final int userId) {
120122
"ds.api.response.time",
121123
"user.id", String.valueOf(userId)));
122124
}
125+
126+
public static void registerUncachedException(final Supplier<Number> supplier) {
127+
Gauge.builder("ds.api.uncached.exception", supplier)
128+
.description("number of uncached exception")
129+
.register(Metrics.globalRegistry);
130+
}
123131
}

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ public abstract class BaseDaemonThread extends Thread {
2525
protected BaseDaemonThread(Runnable runnable) {
2626
super(runnable);
2727
this.setDaemon(true);
28+
this.setUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
2829
}
2930

3031
protected BaseDaemonThread(String threadName) {
3132
super();
3233
this.setName(threadName);
3334
this.setDaemon(true);
35+
this.setUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
3436
}
3537

3638
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.common.thread;
19+
20+
import java.util.concurrent.atomic.LongAdder;
21+
22+
import lombok.extern.slf4j.Slf4j;
23+
24+
@Slf4j
25+
public class DefaultUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
26+
27+
private static final DefaultUncaughtExceptionHandler INSTANCE = new DefaultUncaughtExceptionHandler();
28+
29+
private static final LongAdder uncaughtExceptionCount = new LongAdder();
30+
31+
private DefaultUncaughtExceptionHandler() {
32+
}
33+
34+
public static DefaultUncaughtExceptionHandler getInstance() {
35+
return INSTANCE;
36+
}
37+
38+
public static long getUncaughtExceptionCount() {
39+
return uncaughtExceptionCount.longValue();
40+
}
41+
42+
@Override
43+
public void uncaughtException(Thread t, Throwable e) {
44+
uncaughtExceptionCount.add(1);
45+
log.error("Caught an exception in {}.", t, e);
46+
}
47+
}

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,24 +31,20 @@
3131
@Slf4j
3232
public class ThreadUtils {
3333

34-
/**
35-
* Wrapper over newDaemonFixedThreadExecutor.
36-
*
37-
* @param threadName threadName
38-
* @param threadsNum threadsNum
39-
* @return ExecutorService
40-
*/
4134
public static ThreadPoolExecutor newDaemonFixedThreadExecutor(String threadName, int threadsNum) {
42-
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
43-
return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum, threadFactory);
35+
return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum, newDaemonThreadFactory(threadName));
4436
}
4537

4638
public static ScheduledExecutorService newSingleDaemonScheduledExecutorService(String threadName) {
47-
ThreadFactory threadFactory = new ThreadFactoryBuilder()
48-
.setNameFormat(threadName)
39+
return Executors.newSingleThreadScheduledExecutor(newDaemonThreadFactory(threadName));
40+
}
41+
42+
public static ThreadFactory newDaemonThreadFactory(String threadName) {
43+
return new ThreadFactoryBuilder()
4944
.setDaemon(true)
45+
.setNameFormat(threadName)
46+
.setUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance())
5047
.build();
51-
return Executors.newSingleThreadScheduledExecutor(threadFactory);
5248
}
5349

5450
/**
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.common.thread;
19+
20+
import java.util.concurrent.ThreadPoolExecutor;
21+
22+
import org.junit.jupiter.api.Assertions;
23+
import org.junit.jupiter.api.Test;
24+
25+
class ThreadUtilsTest {
26+
27+
@Test
28+
void newDaemonFixedThreadExecutor() throws InterruptedException {
29+
ThreadPoolExecutor threadPoolExecutor = ThreadUtils.newDaemonFixedThreadExecutor("DemonThread", 1);
30+
threadPoolExecutor.execute(() -> {
31+
throw new IllegalArgumentException("I am an exception");
32+
});
33+
Thread.sleep(1_000);
34+
Assertions.assertEquals(1, DefaultUncaughtExceptionHandler.getUncaughtExceptionCount());
35+
36+
}
37+
}

dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ private void processReceived(final Transporter transporter) {
6767
future.release();
6868
if (future.getInvokeCallback() != null) {
6969
future.removeFuture();
70-
this.callbackExecutor.submit(future::executeInvokeCallback);
70+
this.callbackExecutor.execute(future::executeInvokeCallback);
7171
} else {
7272
future.putResponse(deserialize);
7373
}

dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.dolphinscheduler.extract.base;
1919

20+
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
2021
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
2122
import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
2223
import org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
@@ -30,7 +31,6 @@
3031
import org.apache.dolphinscheduler.extract.base.utils.CallerThreadExecutePolicy;
3132
import org.apache.dolphinscheduler.extract.base.utils.Constants;
3233
import org.apache.dolphinscheduler.extract.base.utils.Host;
33-
import org.apache.dolphinscheduler.extract.base.utils.NamedThreadFactory;
3434
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
3535

3636
import java.net.InetSocketAddress;
@@ -40,6 +40,7 @@
4040
import java.util.concurrent.LinkedBlockingQueue;
4141
import java.util.concurrent.ScheduledExecutorService;
4242
import java.util.concurrent.Semaphore;
43+
import java.util.concurrent.ThreadFactory;
4344
import java.util.concurrent.ThreadPoolExecutor;
4445
import java.util.concurrent.TimeUnit;
4546
import java.util.concurrent.atomic.AtomicBoolean;
@@ -80,25 +81,24 @@ public class NettyRemotingClient implements AutoCloseable {
8081

8182
public NettyRemotingClient(final NettyClientConfig clientConfig) {
8283
this.clientConfig = clientConfig;
84+
ThreadFactory nettyClientThreadFactory = ThreadUtils.newDaemonThreadFactory("NettyClientThread-");
8385
if (Epoll.isAvailable()) {
84-
this.workerGroup =
85-
new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient"));
86+
this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory);
8687
} else {
87-
this.workerGroup =
88-
new NioEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient"));
88+
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory);
8989
}
9090
this.callbackExecutor = new ThreadPoolExecutor(
9191
Constants.CPUS,
9292
Constants.CPUS,
9393
1,
9494
TimeUnit.MINUTES,
9595
new LinkedBlockingQueue<>(1000),
96-
new NamedThreadFactory("CallbackExecutor"),
96+
ThreadUtils.newDaemonThreadFactory("NettyClientCallbackThread-"),
9797
new CallerThreadExecutePolicy());
9898
this.clientHandler = new NettyClientHandler(this, callbackExecutor);
9999

100-
this.responseFutureExecutor =
101-
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));
100+
this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(
101+
ThreadUtils.newDaemonThreadFactory("NettyClientResponseFutureThread-"));
102102

103103
this.start();
104104
}

0 commit comments

Comments
 (0)