Skip to content

Commit dd2c122

Browse files
authored
Merge bac32c6 into 5523a62
2 parents 5523a62 + bac32c6 commit dd2c122

File tree

21 files changed

+104
-263
lines changed

21 files changed

+104
-263
lines changed

dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.dolphinscheduler.alert.rpc;
1919

2020
import org.apache.dolphinscheduler.alert.config.AlertConfig;
21-
import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
21+
import org.apache.dolphinscheduler.extract.base.NettyRemotingServerFactory;
2222
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
2323
import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
2424

@@ -31,7 +31,8 @@
3131
public class AlertRpcServer extends SpringServerMethodInvokerDiscovery implements AutoCloseable {
3232

3333
public AlertRpcServer(AlertConfig alertConfig) {
34-
super(new NettyRemotingServer(new NettyServerConfig(alertConfig.getPort())));
34+
super(NettyRemotingServerFactory.buildNettyRemotingServer(
35+
NettyServerConfig.builder().serverName("AlertRpcServer").listenPort(alertConfig.getPort()).build()));
3536
}
3637

3738
public void start() {

dolphinscheduler-api/src/main/resources/logback-spring.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
2424
<encoder>
2525
<pattern>
26-
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n
26+
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{10}:[%line] - %msg%n
2727
</pattern>
2828
<charset>UTF-8</charset>
2929
</encoder>
@@ -40,7 +40,7 @@
4040
</rollingPolicy>
4141
<encoder>
4242
<pattern>
43-
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n
43+
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{10}:[%line] - %msg%n
4444
</pattern>
4545
<charset>UTF-8</charset>
4646
</encoder>

dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public class LoggerServiceTest {
9191

9292
@BeforeEach
9393
public void setUp() {
94-
nettyRemotingServer = new NettyRemotingServer(new NettyServerConfig(8080));
94+
nettyRemotingServer = new NettyRemotingServer(NettyServerConfig.builder().listenPort(8080).build());
9595
nettyRemotingServer.start();
9696
SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery =
9797
new SpringServerMethodInvokerDiscovery(nettyRemotingServer);

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java

Lines changed: 1 addition & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ private Constants() {
3535
*/
3636
public static final String COMMON_PROPERTIES_PATH = "/common.properties";
3737

38-
public static final String REGISTRY_DOLPHINSCHEDULER_MASTERS = "/nodes/master";
39-
public static final String REGISTRY_DOLPHINSCHEDULER_WORKERS = "/nodes/worker";
40-
4138
public static final String FORMAT_SS = "%s%s";
4239
public static final String FORMAT_S_S = "%s/%s";
4340
public static final String FORMAT_S_S_COLON = "%s:%s";
@@ -191,11 +188,6 @@ private Constants() {
191188
*/
192189
public static final String DOUBLE_SLASH = "//";
193190

194-
/**
195-
* EQUAL SIGN
196-
*/
197-
public static final String EQUAL_SIGN = "=";
198-
199191
/**
200192
* AT SIGN
201193
*/
@@ -235,11 +227,6 @@ private Constants() {
235227
*/
236228
public static final int SOCKET_TIMEOUT = 60 * 1000;
237229

238-
/**
239-
* registry session timeout
240-
*/
241-
public static final int REGISTRY_SESSION_TIMEOUT = 10 * 1000;
242-
243230
/**
244231
* http header
245232
*/
@@ -300,22 +287,6 @@ private Constants() {
300287
*/
301288
public static final int MAX_TASK_TIMEOUT = 24 * 3600;
302289

303-
/**
304-
* worker host weight
305-
*/
306-
public static final int DEFAULT_WORKER_HOST_WEIGHT = 100;
307-
308-
/**
309-
* unit convertor for minute to second
310-
*/
311-
public static final int MINUTE_2_SECOND_TIME_UNIT = 60;
312-
313-
/***
314-
*
315-
* rpc port
316-
*/
317-
public static final String RPC_PORT = "rpc.port";
318-
319290
/**
320291
* forbid running task
321292
*/
@@ -356,21 +327,6 @@ private Constants() {
356327

357328
public static final Duration SERVER_CLOSE_WAIT_TIME = Duration.ofSeconds(3);
358329

359-
/**
360-
* one second mils
361-
*/
362-
public static final long SECOND_TIME_MILLIS = 1_000L;
363-
364-
/**
365-
* master task instance cache-database refresh interval
366-
*/
367-
public static final long CACHE_REFRESH_TIME_MILLIS = 20 * 1_000L;
368-
369-
/**
370-
* heartbeat for zk info length
371-
*/
372-
public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 14;
373-
374330
/**
375331
* jar
376332
*/
@@ -408,39 +364,10 @@ private Constants() {
408364
*/
409365
public static final int VERSION_FIRST = 1;
410366

411-
/**
412-
* ACCEPTED
413-
*/
414-
public static final String ACCEPTED = "ACCEPTED";
415-
416-
/**
417-
* SUCCEEDED
418-
*/
419-
public static final String SUCCEEDED = "SUCCEEDED";
420-
/**
421-
* ENDED
422-
*/
423-
public static final String ENDED = "ENDED";
424-
/**
425-
* NEW
426-
*/
427-
public static final String NEW = "NEW";
428-
/**
429-
* NEW_SAVING
430-
*/
431-
public static final String NEW_SAVING = "NEW_SAVING";
432-
/**
433-
* SUBMITTED
434-
*/
435-
public static final String SUBMITTED = "SUBMITTED";
436367
/**
437368
* FAILED
438369
*/
439370
public static final String FAILED = "FAILED";
440-
/**
441-
* KILLED
442-
*/
443-
public static final String KILLED = "KILLED";
444371
/**
445372
* RUNNING
446373
*/
@@ -449,25 +376,11 @@ private Constants() {
449376
* underline "_"
450377
*/
451378
public static final String UNDERLINE = "_";
452-
/**
453-
* application regex
454-
*/
455-
public static final String APPLICATION_REGEX = "application_\\d+_\\d+";
379+
456380
public static final String PID = SystemUtils.IS_OS_WINDOWS ? "handle" : "pid";
457381

458-
public static final char SUBTRACT_CHAR = '-';
459-
public static final char ADD_CHAR = '+';
460-
public static final char MULTIPLY_CHAR = '*';
461-
public static final char DIVISION_CHAR = '/';
462-
public static final char LEFT_BRACE_CHAR = '(';
463-
public static final char RIGHT_BRACE_CHAR = ')';
464-
public static final String ADD_STRING = "+";
465382
public static final String STAR = "*";
466-
public static final String DIVISION_STRING = "/";
467-
public static final String LEFT_BRACE_STRING = "(";
468-
public static final char P = 'P';
469383
public static final char N = 'N';
470-
public static final String SUBTRACT_STRING = "-";
471384
public static final String GLOBAL_PARAMS = "globalParams";
472385
public static final String LOCAL_PARAMS = "localParams";
473386
public static final String SUBPROCESS_INSTANCE_ID = "subProcessInstanceId";
@@ -482,9 +395,6 @@ private Constants() {
482395
public static final String QUEUE_NAME = "queueName";
483396
public static final int LOG_QUERY_SKIP_LINE_NUMBER = 0;
484397
public static final int LOG_QUERY_LIMIT = 4096;
485-
public static final String BLOCKING_CONDITION = "blockingCondition";
486-
public static final String ALERT_WHEN_BLOCKING = "alertWhenBlocking";
487-
488398
public static final String ALIAS = "alias";
489399
public static final String CONTENT = "content";
490400
public static final String DEPENDENT_SPLIT = ":||";
@@ -527,11 +437,6 @@ private Constants() {
527437
public static final String HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE =
528438
"hadoop.security.authentication.startup.state";
529439

530-
/**
531-
* com.amazonaws.services.s3.enableV4
532-
*/
533-
public static final String AWS_S3_V4 = "com.amazonaws.services.s3.enableV4";
534-
535440
/**
536441
* loginUserFromKeytab user
537442
*/
@@ -550,11 +455,6 @@ private Constants() {
550455
public static final String WORKFLOW_INSTANCE_ID_MDC_KEY = "workflowInstanceId";
551456
public static final String TASK_INSTANCE_ID_MDC_KEY = "taskInstanceId";
552457

553-
/**
554-
* task log info format
555-
*/
556-
public static final String TASK_LOG_INFO_FORMAT = "TaskLogInfo-%s";
557-
558458
/**
559459
* double brackets left
560460
*/
@@ -647,10 +547,6 @@ private Constants() {
647547
* authorize writable perm
648548
*/
649549
public static final int AUTHORIZE_WRITABLE_PERM = 7;
650-
/**
651-
* authorize readable perm
652-
*/
653-
public static final int AUTHORIZE_READABLE_PERM = 4;
654550

655551
public static final String START_TIME = "start time";
656552
public static final String END_TIME = "end time";
@@ -682,8 +578,6 @@ private Constants() {
682578
*/
683579
public static final String DATA_QUALITY_ERROR_OUTPUT_PATH = "data-quality.error.output.path";
684580

685-
public static final String CACHE_KEY_VALUE_ALL = "'all'";
686-
687581
/**
688582
* use for k8s
689583
*/
@@ -784,12 +678,6 @@ private Constants() {
784678
*/
785679
public static final String SUPPORT_HIVE_ONE_SESSION = "support.hive.oneSession";
786680

787-
public static final String PRINCIPAL = "principal";
788-
public static final String ORACLE_DB_CONNECT_TYPE = "connectType";
789-
public static final String KERBEROS_KRB5_CONF_PATH = "javaSecurityKrb5Conf";
790-
public static final String KERBEROS_KEY_TAB_USERNAME = "loginUserKeytabUsername";
791-
public static final String KERBEROS_KEY_TAB_PATH = "loginUserKeytabPath";
792-
793681
public static final Integer QUERY_ALL_ON_SYSTEM = 0;
794682
public static final Integer QUERY_ALL_ON_PROJECT = 1;
795683
public static final Integer QUERY_ALL_ON_WORKFLOW = 2;

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,6 @@
4141

4242
import com.google.common.collect.Sets;
4343

44-
/**
45-
* NetUtils
46-
*/
4744
@Slf4j
4845
public class NetUtils {
4946

@@ -213,15 +210,17 @@ private static List<NetworkInterface> findSuitableNetworkInterface() {
213210
.collect(Collectors.toList());
214211

215212
// Use the specified network interface if set
216-
if (StringUtils.isNotBlank(specifyNetworkInterfaceName())) {
217-
String specifyNetworkInterfaceName = specifyNetworkInterfaceName();
213+
String specifiedNetworkInterfaceName = specifyNetworkInterfaceName();
214+
if (StringUtils.isNotBlank(specifiedNetworkInterfaceName)) {
218215
validNetworkInterfaces = validNetworkInterfaces.stream()
219-
.filter(networkInterface -> specifyNetworkInterfaceName.equals(networkInterface.getDisplayName()))
216+
.filter(networkInterface -> specifiedNetworkInterfaceName.equals(networkInterface.getDisplayName()))
220217
.collect(Collectors.toList());
221218
if (CollectionUtils.isEmpty(validNetworkInterfaces)) {
222219
throw new IllegalArgumentException(
223-
"The specified network interface: " + specifyNetworkInterfaceName + " is not found");
220+
"The specified network interface: " + specifiedNetworkInterfaceName + " is not found");
224221
}
222+
log.info("Use the specified network interface: {} -> {}", specifiedNetworkInterfaceName,
223+
validNetworkInterfaces);
225224
}
226225

227226
Set<String> restrictNetworkInterfaceName = restrictNetworkInterfaceName();
@@ -307,9 +306,10 @@ private static List<NetworkInterface> getAllNetworkInterfaces() throws SocketExc
307306
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
308307
while (interfaces.hasMoreElements()) {
309308
NetworkInterface networkInterface = interfaces.nextElement();
310-
log.info("Found NetworkInterface: {}", networkInterface);
309+
log.debug("Found NetworkInterface: {}", networkInterface);
311310
validNetworkInterfaces.add(networkInterface);
312311
}
312+
log.info("Get all NetworkInterfaces: {}", validNetworkInterfaces);
313313
return validNetworkInterfaces;
314314
}
315315

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.dolphinscheduler.dao.entity;
1919

20-
import static org.apache.dolphinscheduler.common.constants.Constants.MINUTE_2_SECOND_TIME_UNIT;
2120
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
2221
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
2322
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
@@ -38,6 +37,7 @@
3837
import java.io.Serializable;
3938
import java.util.Date;
4039
import java.util.Map;
40+
import java.util.concurrent.TimeUnit;
4141

4242
import lombok.Data;
4343

@@ -407,7 +407,7 @@ public boolean retryTaskIntervalOverTime() {
407407
Date now = new Date();
408408
long failedTimeInterval = DateUtils.differSec(now, getEndTime());
409409
// task retry does not over time, return false
410-
return getRetryInterval() * MINUTE_2_SECOND_TIME_UNIT < failedTimeInterval;
410+
return TimeUnit.MINUTES.toSeconds(getRetryInterval()) < failedTimeInterval;
411411
}
412412

413413
}

dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,11 @@ public class NettyRemotingServer {
7373
public NettyRemotingServer(final NettyServerConfig serverConfig) {
7474
this.serverConfig = serverConfig;
7575
ThreadFactory bossThreadFactory =
76-
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("NettyServerBossThread_%s").build();
76+
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(serverConfig.getServerName() + "BossThread_%s")
77+
.build();
7778
ThreadFactory workerThreadFactory =
78-
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("NettyServerWorkerThread_%s").build();
79+
new ThreadFactoryBuilder().setDaemon(true)
80+
.setNameFormat(serverConfig.getServerName() + "WorkerThread_%s").build();
7981
if (Epoll.isAvailable()) {
8082
this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
8183
this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
@@ -108,16 +110,23 @@ protected void initChannel(SocketChannel ch) {
108110
try {
109111
future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
110112
} catch (Exception e) {
111-
log.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
112-
throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
113+
log.error("{} bind fail {}, exit", serverConfig.getServerName(), e.getMessage(), e);
114+
throw new RemoteException(
115+
String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()));
113116
}
117+
114118
if (future.isSuccess()) {
115-
log.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
116-
} else if (future.cause() != null) {
117-
throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()),
119+
log.info("{} bind success at port: {}", serverConfig.getServerName(), serverConfig.getListenPort());
120+
return;
121+
}
122+
123+
if (future.cause() != null) {
124+
throw new RemoteException(
125+
String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()),
118126
future.cause());
119127
} else {
120-
throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
128+
throw new RemoteException(
129+
String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()));
121130
}
122131
}
123132
}

0 commit comments

Comments
 (0)