Skip to content

Commit e83e6c0

Browse files
authored
Merge branch 'dev' into dev
2 parents 0363019 + 688f844 commit e83e6c0

File tree

13 files changed

+109
-37
lines changed

13 files changed

+109
-37
lines changed

docs/docs/en/guide/task/dependent.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ The Dependent node provides a logical judgment function, which can detect the ex
3030
Two dependency modes are supported, including workflow-dependent and task-dependent. The task-dependent mode is divided into two cases: depend on all tasks in the workflow and depend on a single task.
3131
The workflow-dependent mode checks the status of the dependent workflow; the all-task-dependent mode checks the status of all tasks in the workflow; and the single-task-dependent mode checks the status of the dependent task.
3232

33+
When the dependent result is success and the parameter passing option is true, the Dependent node will output the output parameters of the dependency to the downstream task. When the parameter names of multiple dependencies are the same, it involves the priority of the parameters. See also [Parameter Priority](../parameter/priority.md)
34+
3335
For example, process A is a weekly task, processes B and C are daily tasks, and task A requires tasks B and C to be successfully executed last week.
3436

3537
![dependent_task01](../../../../img/tasks/demo/dependent_task01.png)

docs/docs/zh/guide/task/dependent.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ Dependent 节点提供了逻辑判断功能,可以按照逻辑来检测所依
3131
依赖工作流的模式会检查所依赖的工作流的状态;依赖所有任务的模式会检查工作流中所有任务的状态;
3232
依赖单个任务的模式会检查所依赖的任务的状态。
3333

34+
当 Dependent 节点结果为 success 且参数传递选项为 true 时,Dependent 节点会将该依赖项的输出参数输出给下游任务。当多个依赖项的参数名称相同时涉及到参数的优先级问题,详见[参数优先级](../parameter/priority.md)
35+
3436
例如,A 流程为周报任务,B、C 流程为天任务,A 任务需要 B、C 任务在上周执行成功,如图示:
3537

3638
![dependent_task01](../../../../img/tasks/demo/dependent_task01.png)
14.1 KB
Loading
17.4 KB
Loading
6.91 KB
Loading

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dependent/DependentAsyncTaskExecuteFunction.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.dolphinscheduler.common.constants.Constants.DEPENDENT_SPLIT;
2121

2222
import org.apache.dolphinscheduler.common.constants.Constants;
23+
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2324
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
2425
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
2526
import org.apache.dolphinscheduler.dao.entity.Project;
@@ -34,6 +35,7 @@
3435
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
3536
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
3637
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
38+
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
3739
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
3840
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
3941
import org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction;
@@ -69,6 +71,7 @@ public class DependentAsyncTaskExecuteFunction implements AsyncTaskExecuteFuncti
6971
private final Date dependentDate;
7072
private final List<DependentExecute> dependentTaskList;
7173
private final Map<String, DependResult> dependResultMap;
74+
private final Map<String, Property> dependVarPoolPropertyMap;
7275

7376
public DependentAsyncTaskExecuteFunction(TaskExecutionContext taskExecutionContext,
7477
DependentParameters dependentParameters,
@@ -89,6 +92,7 @@ public DependentAsyncTaskExecuteFunction(TaskExecutionContext taskExecutionConte
8992
this.dependentTaskList = initializeDependentTaskList();
9093
log.info("Initialized dependent task list successfully");
9194
this.dependResultMap = new HashMap<>();
95+
this.dependVarPoolPropertyMap = new HashMap<>();
9296
}
9397

9498
@Override
@@ -97,8 +101,13 @@ public DependentAsyncTaskExecuteFunction(TaskExecutionContext taskExecutionConte
97101
log.info("All dependent task finished, will calculate the dependent result");
98102
DependResult dependResult = calculateDependResult();
99103
log.info("The Dependent result is: {}", dependResult);
100-
return dependResult == DependResult.SUCCESS ? AsyncTaskExecutionStatus.SUCCESS
101-
: AsyncTaskExecutionStatus.FAILED;
104+
if (dependResult == DependResult.SUCCESS) {
105+
dependentParameters.setVarPool(JSONUtils.toJsonString(dependVarPoolPropertyMap.values()));
106+
log.info("Set dependentParameters varPool: {}", dependentParameters.getVarPool());
107+
return AsyncTaskExecutionStatus.SUCCESS;
108+
} else {
109+
return AsyncTaskExecutionStatus.FAILED;
110+
}
102111
}
103112
return AsyncTaskExecutionStatus.RUNNING;
104113
}
@@ -189,9 +198,16 @@ private List<DependentExecute> initializeDependentTaskList() {
189198

190199
private DependResult calculateDependResult() {
191200
List<DependResult> dependResultList = new ArrayList<>();
201+
Map<String, Long> dependVarPoolEndTimeMap = new HashMap<>();
192202
for (DependentExecute dependentExecute : dependentTaskList) {
193203
DependResult dependResult =
194204
dependentExecute.getModelDependResult(dependentDate, processInstance.getTestFlag());
205+
if (dependResult == DependResult.SUCCESS) {
206+
Map<String, Property> varPoolPropertyMap = dependentExecute.getDependTaskVarPoolPropertyMap();
207+
Map<String, Long> varPoolEndTimeMap = dependentExecute.getDependTaskVarPoolEndTimeMap();
208+
DependentUtils.addTaskVarPool(varPoolPropertyMap, varPoolEndTimeMap, dependVarPoolPropertyMap,
209+
dependVarPoolEndTimeMap);
210+
}
195211
dependResultList.add(dependResult);
196212
}
197213
return DependentUtils.getDependResultForRelation(this.dependentParameters.getRelation(),

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java

Lines changed: 47 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.dolphinscheduler.common.constants.Constants;
2323
import org.apache.dolphinscheduler.common.enums.Flag;
2424
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
25+
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2526
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
2627
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
2728
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -33,9 +34,11 @@
3334
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
3435
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
3536
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
37+
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
3638
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
3739
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
3840
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
41+
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
3942
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
4043
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
4144
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -48,6 +51,7 @@
4851
import java.util.HashMap;
4952
import java.util.List;
5053
import java.util.Map;
54+
import java.util.function.Function;
5155
import java.util.stream.Collectors;
5256

5357
import lombok.extern.slf4j.Slf4j;
@@ -97,6 +101,14 @@ public class DependentExecute {
97101
*/
98102
private final TaskDefinitionDao taskDefinitionDao = SpringApplicationContext.getBean(TaskDefinitionDao.class);
99103

104+
private Map<String, Property> dependTaskVarPoolPropertyMap = new HashMap<>();
105+
106+
private Map<String, Long> dependTaskVarPoolEndTimeMap = new HashMap<>();
107+
108+
private Map<String, Property> dependItemVarPoolPropertyMap = new HashMap<>();
109+
110+
private Map<String, Long> dependItemVarPoolEndTimeMap = new HashMap<>();
111+
100112
/**
101113
* constructor
102114
*
@@ -168,6 +180,7 @@ private DependResult dependResultByProcessInstance(ProcessInstance processInstan
168180
return DependResult.WAITING;
169181
}
170182
if (processInstance.getState().isSuccess()) {
183+
addItemVarPool(processInstance.getVarPool(), processInstance.getEndTime().getTime());
171184
return DependResult.SUCCESS;
172185
}
173186
log.warn(
@@ -221,46 +234,12 @@ private DependResult dependResultByAllTaskOfProcessInstance(ProcessInstance proc
221234
}
222235
}
223236
}
237+
addItemVarPool(processInstance.getVarPool(), processInstance.getEndTime().getTime());
224238
return DependResult.SUCCESS;
225239
}
226240
return DependResult.FAILED;
227241
}
228242

229-
/**
230-
* get depend task result
231-
*
232-
* @param taskCode
233-
* @param processInstance
234-
* @return
235-
*/
236-
private DependResult getDependTaskResult(long taskCode, ProcessInstance processInstance, int testFlag) {
237-
DependResult result;
238-
TaskInstance taskInstance = null;
239-
List<TaskInstance> taskInstanceList =
240-
taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processInstance.getId(), testFlag);
241-
242-
for (TaskInstance task : taskInstanceList) {
243-
if (task.getTaskCode() == taskCode) {
244-
taskInstance = task;
245-
break;
246-
}
247-
}
248-
249-
if (taskInstance == null) {
250-
// cannot find task in the process instance
251-
// maybe because process instance is running or failed.
252-
if (processInstance.getState().isFinished()) {
253-
result = DependResult.FAILED;
254-
} else {
255-
return DependResult.WAITING;
256-
}
257-
} else {
258-
result = getDependResultByState(taskInstance.getState());
259-
}
260-
261-
return result;
262-
}
263-
264243
/**
265244
* depend type = depend_task
266245
*
@@ -303,12 +282,31 @@ private DependResult dependResultBySingleTaskInstance(ProcessInstance processIns
303282
log.info(
304283
"The dependent task is a streaming task, so return depend success. Task code: {}, task name: {}.",
305284
taskInstance.getTaskCode(), taskInstance.getName());
285+
addItemVarPool(taskInstance.getVarPool(), taskInstance.getEndTime().getTime());
306286
return DependResult.SUCCESS;
307287
}
308288
return getDependResultByState(taskInstance.getState());
309289
}
310290
}
311291

292+
/**
293+
* add varPool to dependItemVarPoolMap
294+
*
295+
* @param varPoolStr
296+
* @param endTime
297+
*/
298+
private void addItemVarPool(String varPoolStr, Long endTime) {
299+
List<Property> varPool = new ArrayList<>(JSONUtils.toList(varPoolStr, Property.class));
300+
if (!varPool.isEmpty()) {
301+
Map<String, Property> varPoolPropertyMap = varPool.stream().filter(p -> p.getDirect().equals(Direct.OUT))
302+
.collect(Collectors.toMap(Property::getProp, Function.identity()));
303+
Map<String, Long> varPoolEndTimeMap = varPool.stream().filter(p -> p.getDirect().equals(Direct.OUT))
304+
.collect(Collectors.toMap(Property::getProp, d -> endTime));
305+
dependItemVarPoolPropertyMap.putAll(varPoolPropertyMap);
306+
dependItemVarPoolEndTimeMap.putAll(varPoolEndTimeMap);
307+
}
308+
}
309+
312310
/**
313311
* find the last one process instance that :
314312
* 1. manual run and finish between the interval
@@ -399,7 +397,13 @@ public DependResult getModelDependResult(Date currentTime, int testFlag) {
399397
DependResult dependResult = getDependResultForItem(dependentItem, currentTime, testFlag);
400398
if (dependResult != DependResult.WAITING && dependResult != DependResult.FAILED) {
401399
dependResultMap.put(dependentItem.getKey(), dependResult);
400+
if (dependentItem.getParameterPassing() && !dependItemVarPoolPropertyMap.isEmpty()) {
401+
DependentUtils.addTaskVarPool(dependItemVarPoolPropertyMap, dependItemVarPoolEndTimeMap,
402+
dependTaskVarPoolPropertyMap, dependTaskVarPoolEndTimeMap);
403+
}
402404
}
405+
dependItemVarPoolPropertyMap.clear();
406+
dependItemVarPoolEndTimeMap.clear();
403407
dependResultList.add(dependResult);
404408
}
405409
return DependentUtils.getDependResultForRelation(this.relation, dependResultList);
@@ -424,6 +428,14 @@ public Map<String, DependResult> getDependResultMap() {
424428
return dependResultMap;
425429
}
426430

431+
public Map<String, Property> getDependTaskVarPoolPropertyMap() {
432+
return dependTaskVarPoolPropertyMap;
433+
}
434+
435+
public Map<String, Long> getDependTaskVarPoolEndTimeMap() {
436+
return dependTaskVarPoolEndTimeMap;
437+
}
438+
427439
/**
428440
* check for self-dependent
429441
* @param dependentItem

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentItem.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class DependentItem {
3535
private String dateValue;
3636
private DependResult dependResult;
3737
private TaskExecutionStatus status;
38+
private Boolean parameterPassing;
3839

3940
public String getKey() {
4041
return String.format("%d-%d-%s-%s",

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/DependentUtils.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
2121
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
2222
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
23+
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
2324

2425
import java.util.ArrayList;
2526
import java.util.Date;
2627
import java.util.List;
28+
import java.util.Map;
2729

2830
public class DependentUtils {
2931

@@ -151,4 +153,30 @@ public static List<DateInterval> getDateIntervalList(Date businessDate, String d
151153
return result;
152154
}
153155

156+
/**
157+
* add varPool from dependItemVarPoolMap to dependTaskVarPoolMap
158+
*
159+
* @param dependItemVarPoolPropertyMap dependItemVarPoolPropertyMap
160+
* @param dependItemVarPoolEndTimeMap dependItemVarPoolEndTimeMap
161+
* @param dependTaskVarPoolPropertyMap dependTaskVarPoolPropertyMap
162+
* @param dependTaskVarPoolEndTimeMap dependTaskVarPoolEndTimeMap
163+
*/
164+
public static void addTaskVarPool(Map<String, Property> dependItemVarPoolPropertyMap,
165+
Map<String, Long> dependItemVarPoolEndTimeMap,
166+
Map<String, Property> dependTaskVarPoolPropertyMap,
167+
Map<String, Long> dependTaskVarPoolEndTimeMap) {
168+
dependItemVarPoolPropertyMap.forEach((prop, property) -> {
169+
Long itemEndTime = dependItemVarPoolEndTimeMap.get(prop);
170+
if (dependTaskVarPoolPropertyMap.containsKey(prop)) {
171+
if (itemEndTime < dependTaskVarPoolEndTimeMap.get(prop)) {
172+
dependTaskVarPoolPropertyMap.put(prop, property);
173+
dependTaskVarPoolEndTimeMap.put(prop, itemEndTime);
174+
}
175+
} else {
176+
dependTaskVarPoolPropertyMap.put(prop, property);
177+
dependTaskVarPoolEndTimeMap.put(prop, itemEndTime);
178+
}
179+
});
180+
}
181+
154182
}

dolphinscheduler-ui/src/locales/en_US/project.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,7 @@ export default {
644644
hour: 'hour',
645645
add_dependency: 'Add dependency',
646646
waiting_dependent_start: 'Waiting Dependent start',
647+
dependent_task_parameter_passing: 'Parameter Passing',
647648
check_interval: 'Check interval',
648649
check_interval_tips: 'Check interval must be a positive integer',
649650
waiting_dependent_complete: 'Waiting Dependent complete',

0 commit comments

Comments
 (0)