Skip to content

Commit 7bef33a

Browse files
committed
Use ClusterManager to manage the cluster info
1 parent 9879c7c commit 7bef33a

File tree

81 files changed

+2167
-2436
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

81 files changed

+2167
-2436
lines changed

docs/docs/en/architecture/configuration.md

Lines changed: 24 additions & 24 deletions
Large diffs are not rendered by default.

docs/docs/zh/architecture/configuration.md

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -284,30 +284,30 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId
284284

285285
位置:`master-server/conf/application.yaml`
286286

287-
| 参数 | 默认值 | 描述 |
288-
|-----------------------------------------------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------|
289-
| master.listen-port | 5678 | master监听端口 |
290-
| master.pre-exec-threads | 10 | master准备执行任务的数量,用于限制并行的command |
291-
| master.exec-threads | 100 | master工作线程数量,用于限制并行的流程实例数量 |
292-
| master.dispatch-task-number | 3 | master每个批次的派发任务数量 |
293-
| master.host-selector | lower_weight | master host选择器,用于选择合适的worker执行任务,可选值: random, round_robin, lower_weight |
294-
| master.max-heartbeat-interval | 10s | master最大心跳间隔 |
295-
| master.task-commit-retry-times | 5 | 任务重试次数 |
296-
| master.task-commit-interval | 1000 | 任务提交间隔,单位为毫秒 |
297-
| master.state-wheel-interval | 5 | 轮询检查状态时间 |
298-
| master.server-load-protection.enabled | true | 是否开启系统保护策略 |
299-
| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.7 | master最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统CPU |
300-
| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | master最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的JVM CPU |
301-
| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | master最大系统 内存使用值,只有当前系统内存使用值低于最大系统内存使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统内存 |
302-
| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | master最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统磁盘空间 |
303-
| master.failover-interval | 10 | failover间隔,单位为分钟 |
304-
| master.kill-application-when-task-failover | true | 当任务实例failover时,是否kill掉yarn或k8s application |
305-
| master.registry-disconnect-strategy.strategy | stop | 当Master与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting |
306-
| master.registry-disconnect-strategy.max-waiting-time | 100s | 当Master与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Master与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Master会丢弃目前正在执行的工作流,值为0表示会无限期等待 |
307-
| master.master.worker-group-refresh-interval | 10s | 定期将workerGroup从数据库中同步到内存的时间间隔 |
308-
| master.command-fetch-strategy.type | ID_SLOT_BASED | Command拉取策略, 目前仅支持 `ID_SLOT_BASED` |
309-
| master.command-fetch-strategy.config.id-step | 1 | 数据库中t_ds_command的id自增步长 |
310-
| master.command-fetch-strategy.config.fetch-size | 10 | master拉取command数量 |
287+
| 参数 | 默认值 | 描述 |
288+
|-----------------------------------------------------------------------------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------|
289+
| master.listen-port | 5678 | master监听端口 |
290+
| master.pre-exec-threads | 10 | master准备执行任务的数量,用于限制并行的command |
291+
| master.exec-threads | 100 | master工作线程数量,用于限制并行的流程实例数量 |
292+
| master.dispatch-task-number | 3 | master每个批次的派发任务数量 |
293+
| master.worker-load-balancer-configuration-properties.type | DYNAMIC_WEIGHTED_ROUND_ROBIN | Master 将会使用Worker的动态CPU/Memory/线程池使用率来计算Worker的负载,负载越低的worker将会有更高的机会被分发任务 |
294+
| master.max-heartbeat-interval | 10s | master最大心跳间隔 |
295+
| master.task-commit-retry-times | 5 | 任务重试次数 |
296+
| master.task-commit-interval | 1000 | 任务提交间隔,单位为毫秒 |
297+
| master.state-wheel-interval | 5 | 轮询检查状态时间 |
298+
| master.server-load-protection.enabled | true | 是否开启系统保护策略 |
299+
| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.7 | master最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统CPU |
300+
| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | master最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的JVM CPU |
301+
| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | master最大系统 内存使用值,只有当前系统内存使用值低于最大系统内存使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统内存 |
302+
| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | master最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统磁盘空间 |
303+
| master.failover-interval | 10 | failover间隔,单位为分钟 |
304+
| master.kill-application-when-task-failover | true | 当任务实例failover时,是否kill掉yarn或k8s application |
305+
| master.registry-disconnect-strategy.strategy | stop | 当Master与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting |
306+
| master.registry-disconnect-strategy.max-waiting-time | 100s | 当Master与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Master与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Master会丢弃目前正在执行的工作流,值为0表示会无限期等待 |
307+
| master.master.worker-group-refresh-interval | 10s | 定期将workerGroup从数据库中同步到内存的时间间隔 |
308+
| master.command-fetch-strategy.type | ID_SLOT_BASED | Command拉取策略, 目前仅支持 `ID_SLOT_BASED` |
309+
| master.command-fetch-strategy.config.id-step | 1 | 数据库中t_ds_command的id自增步长 |
310+
| master.command-fetch-strategy.config.fetch-size | 10 | master拉取command数量 |
311311

312312
## Worker Server相关配置
313313

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,6 @@
2929
public class WorkerHeartBeat extends BaseHeartBeat implements HeartBeat {
3030

3131
private int workerHostWeight; // worker host weight
32-
private int threadPoolUsage; // worker waiting task count
32+
private double threadPoolUsage; // worker waiting task count
3333

3434
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.common.utils;
19+
20+
import java.util.HashSet;
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.Set;
24+
import java.util.stream.Collectors;
25+
26+
public class MapComparator<K, V> {
27+
28+
private final Map<K, V> oldMap;
29+
private final Map<K, V> newMap;
30+
31+
public MapComparator(Map<K, V> oldMap, Map<K, V> newMap) {
32+
this.oldMap = oldMap;
33+
this.newMap = newMap;
34+
}
35+
36+
public Set<K> getKeysToAdd() {
37+
Set<K> keysToAdd = new HashSet<>(newMap.keySet());
38+
keysToAdd.removeAll(oldMap.keySet());
39+
return keysToAdd;
40+
}
41+
42+
public List<V> getValuesToAdd() {
43+
return getKeysToAdd().stream().map(newMap::get).collect(Collectors.toList());
44+
}
45+
46+
public Set<K> getKeysToRemove() {
47+
Set<K> keysToRemove = new HashSet<>(oldMap.keySet());
48+
keysToRemove.removeAll(newMap.keySet());
49+
return keysToRemove;
50+
}
51+
52+
public List<V> getValuesToRemove() {
53+
return getKeysToRemove().stream().map(oldMap::get).collect(Collectors.toList());
54+
}
55+
56+
public Set<K> getKeysToUpdate() {
57+
Set<K> keysToUpdate = new HashSet<>(newMap.keySet());
58+
keysToUpdate.retainAll(oldMap.keySet());
59+
keysToUpdate.removeIf(key -> newMap.get(key).equals(oldMap.get(key)));
60+
61+
return keysToUpdate;
62+
}
63+
64+
public List<V> getNewValuesToUpdate() {
65+
return getKeysToUpdate().stream().map(newMap::get).collect(Collectors.toList());
66+
}
67+
}

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ public class AlertDao {
6565

6666
private static final Integer QUERY_ALERT_THRESHOLD = 100;
6767

68+
private static final int ADMIN_ALERT_GROUP_ID = 1;
69+
6870
@Value("${alert.alarm-suppression.crash:60}")
6971
private Integer crashAlarmSuppression;
7072

@@ -159,11 +161,10 @@ public int insertAlertSendStatus(List<AlertSendStatus> alertSendStatuses) {
159161
/**
160162
* MasterServer or WorkerServer stopped
161163
*
162-
* @param alertGroupId alertGroupId
163164
* @param host host
164165
* @param serverType serverType
165166
*/
166-
public void sendServerStoppedAlert(int alertGroupId, String host, String serverType) {
167+
public void sendServerStoppedAlert(String host, String serverType) {
167168
ServerAlertContent serverStopAlertContent = ServerAlertContent.newBuilder().type(serverType)
168169
.host(host)
169170
.event(AlertEvent.SERVER_DOWN)
@@ -175,7 +176,7 @@ public void sendServerStoppedAlert(int alertGroupId, String host, String serverT
175176
alert.setWarningType(WarningType.FAILURE);
176177
alert.setAlertStatus(AlertStatus.WAIT_EXECUTION);
177178
alert.setContent(content);
178-
alert.setAlertGroupId(alertGroupId);
179+
alert.setAlertGroupId(ADMIN_ALERT_GROUP_ID);
179180
alert.setCreateTime(new Date());
180181
alert.setUpdateTime(new Date());
181182
alert.setAlertType(AlertType.FAULT_TOLERANCE_WARNING);

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,21 @@
1919

2020
import java.util.Date;
2121

22+
import lombok.AllArgsConstructor;
23+
import lombok.Builder;
2224
import lombok.Data;
25+
import lombok.NoArgsConstructor;
2326

2427
import com.baomidou.mybatisplus.annotation.IdType;
2528
import com.baomidou.mybatisplus.annotation.TableField;
2629
import com.baomidou.mybatisplus.annotation.TableId;
2730
import com.baomidou.mybatisplus.annotation.TableName;
2831

29-
/**
30-
* worker group
31-
*/
3232
@TableName("t_ds_worker_group")
3333
@Data
34+
@Builder
35+
@NoArgsConstructor
36+
@AllArgsConstructor
3437
public class WorkerGroup {
3538

3639
@TableId(value = "id", type = IdType.AUTO)

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java renamed to dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkerGroupDao.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,9 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.dolphinscheduler.server.master.dispatch.enums;
18+
package org.apache.dolphinscheduler.dao.repository;
1919

20-
// todo: refactor this enum
21-
public enum ExecutorType {
20+
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
2221

23-
WORKER,
24-
25-
CLIENT,
26-
MASTER,
27-
;
22+
public interface WorkerGroupDao extends IDao<WorkerGroup> {
2823
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.dao.repository.impl;
19+
20+
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
21+
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
22+
import org.apache.dolphinscheduler.dao.repository.BaseDao;
23+
import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;
24+
25+
import lombok.NonNull;
26+
27+
import org.springframework.stereotype.Repository;
28+
29+
@Repository
30+
public class WorkerGroupDaoImpl extends BaseDao<WorkerGroup, WorkerGroupMapper> implements WorkerGroupDao {
31+
32+
public WorkerGroupDaoImpl(@NonNull WorkerGroupMapper workerGroupMapper) {
33+
super(workerGroupMapper);
34+
}
35+
36+
}

0 commit comments

Comments
 (0)