Skip to content

Commit 01eb8f8

Browse files
authored
fix: start param for wf not work (#15544)
* fix: start param for wf not work fix: #15280 * fix test
1 parent e5a208f commit 01eb8f8

File tree

6 files changed

+99
-57
lines changed

6 files changed

+99
-57
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

Lines changed: 1 addition & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_END_DATE;
2121
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
2222
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_START_DATE;
23-
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_FATHER_PARAMS;
2423
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING;
2524
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
2625
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_NODES;
@@ -29,8 +28,6 @@
2928
import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP;
3029
import static org.apache.dolphinscheduler.common.constants.DateConstants.YYYY_MM_DD_HH_MM_SS;
3130
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
32-
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
33-
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
3431

3532
import org.apache.dolphinscheduler.common.constants.Constants;
3633
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -860,7 +857,7 @@ private void initTaskQueue() throws StateEventHandleException, CronParseExceptio
860857
Map<String, String> cmdParam = JSONUtils.toMap(workflowInstance.getCommandParam());
861858
if (cmdParam != null) {
862859
// reset global params while there are start parameters
863-
setGlobalParamIfCommanded(workflowDefinition, cmdParam);
860+
processService.setGlobalParamIfCommanded(workflowDefinition, cmdParam);
864861

865862
Date start = null;
866863
Date end = null;
@@ -2057,40 +2054,6 @@ public Map<Long, TaskInstance> getWaitToRetryTaskInstanceMap() {
20572054
return waitToRetryTaskInstanceMap;
20582055
}
20592056

2060-
private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
2061-
// get start params from command param
2062-
Map<String, String> startParamMap = new HashMap<>();
2063-
if (cmdParam.containsKey(CMD_PARAM_START_PARAMS)) {
2064-
String startParamJson = cmdParam.get(CMD_PARAM_START_PARAMS);
2065-
startParamMap = JSONUtils.toMap(startParamJson);
2066-
}
2067-
Map<String, String> fatherParamMap = new HashMap<>();
2068-
if (cmdParam.containsKey(CMD_PARAM_FATHER_PARAMS)) {
2069-
String fatherParamJson = cmdParam.get(CMD_PARAM_FATHER_PARAMS);
2070-
fatherParamMap = JSONUtils.toMap(fatherParamJson);
2071-
}
2072-
startParamMap.putAll(fatherParamMap);
2073-
// set start param into global params
2074-
Map<String, String> globalMap = processDefinition.getGlobalParamMap();
2075-
List<Property> globalParamList = processDefinition.getGlobalParamList();
2076-
if (startParamMap.size() > 0 && globalMap != null) {
2077-
// start param to overwrite global param
2078-
for (Map.Entry<String, String> param : globalMap.entrySet()) {
2079-
String val = startParamMap.get(param.getKey());
2080-
if (val != null) {
2081-
param.setValue(val);
2082-
}
2083-
}
2084-
// start param to create new global param if global not exist
2085-
for (Map.Entry<String, String> startParam : startParamMap.entrySet()) {
2086-
if (!globalMap.containsKey(startParam.getKey())) {
2087-
globalMap.put(startParam.getKey(), startParam.getValue());
2088-
globalParamList.add(new Property(startParam.getKey(), IN, VARCHAR, startParam.getValue()));
2089-
}
2090-
}
2091-
}
2092-
}
2093-
20942057
/**
20952058
* clear related data if command of process instance is EXECUTE_TASK
20962059
* 1. find all task code from sub dag (only contains related task)

dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsService.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import java.util.List;
2828
import java.util.Map;
2929

30+
import javax.annotation.Nullable;
31+
3032
import lombok.NonNull;
3133

3234
public interface CuringParamsService {
@@ -80,6 +82,16 @@ Map<String, Property> paramParsingPreparation(@NonNull TaskInstance taskInstance
8082
@NonNull AbstractParameters parameters,
8183
@NonNull ProcessInstance processInstance);
8284

85+
/**
86+
* Parse workflow star parameter
87+
*/
88+
Map<String, Property> parseWorkflowStartParam(@Nullable Map<String, String> cmdParam);
89+
90+
/**
91+
* Parse workflow father parameter
92+
*/
93+
Map<String, Property> parseWorkflowFatherParam(@Nullable Map<String, String> cmdParam);
94+
8395
/**
8496
* preBuildBusinessParams
8597
* @param processInstance

dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_WORKFLOW_DEFINITION_NAME;
2929
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_WORKFLOW_INSTANCE_ID;
3030

31+
import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
3132
import org.apache.dolphinscheduler.common.constants.Constants;
3233
import org.apache.dolphinscheduler.common.constants.DateConstants;
3334
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -55,6 +56,8 @@
5556
import java.util.Set;
5657
import java.util.stream.Collectors;
5758

59+
import javax.annotation.Nullable;
60+
5861
import lombok.NonNull;
5962

6063
import org.springframework.beans.factory.annotation.Autowired;
@@ -141,6 +144,28 @@ public String curingGlobalParams(Integer processInstanceId, Map<String, String>
141144
return JSONUtils.toJsonString(globalParamList);
142145
}
143146

147+
@Override
148+
public Map<String, Property> parseWorkflowStartParam(@Nullable Map<String, String> cmdParam) {
149+
if (cmdParam == null || !cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_START_PARAMS)) {
150+
return new HashMap<>();
151+
}
152+
String startParamJson = cmdParam.get(CommandKeyConstants.CMD_PARAM_START_PARAMS);
153+
Map<String, String> startParamMap = JSONUtils.toMap(startParamJson);
154+
return startParamMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
155+
entry -> new Property(entry.getKey(), Direct.IN, DataType.VARCHAR, entry.getValue())));
156+
}
157+
158+
@Override
159+
public Map<String, Property> parseWorkflowFatherParam(@Nullable Map<String, String> cmdParam) {
160+
if (cmdParam == null || !cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_FATHER_PARAMS)) {
161+
return new HashMap<>();
162+
}
163+
String startParamJson = cmdParam.get(CommandKeyConstants.CMD_PARAM_FATHER_PARAMS);
164+
Map<String, String> startParamMap = JSONUtils.toMap(startParamJson);
165+
return startParamMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
166+
entry -> new Property(entry.getKey(), Direct.IN, DataType.VARCHAR, entry.getValue())));
167+
}
168+
144169
/**
145170
* the global parameters and local parameters used in the worker will be prepared here, and built-in parameters.
146171
*
@@ -199,7 +224,7 @@ public Map<String, Property> paramParsingPreparation(@NonNull TaskInstance taskI
199224
}
200225

201226
if (MapUtils.isNotEmpty(cmdParam)) {
202-
prepareParamsMap.putAll(ParameterUtils.getUserDefParamsMap(cmdParam));
227+
prepareParamsMap.putAll(parseWorkflowStartParam(cmdParam));
203228
}
204229

205230
Iterator<Map.Entry<String, Property>> iter = prepareParamsMap.entrySet().iterator();

dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.dolphinscheduler.service.model.TaskNode;
5151

5252
import java.util.List;
53+
import java.util.Map;
5354
import java.util.Optional;
5455

5556
import javax.annotation.Nullable;
@@ -194,4 +195,6 @@ TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
194195
void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId);
195196

196197
void saveCommandTrigger(Integer commandId, Integer processInstanceId);
198+
199+
void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam);
197200
}

dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
2929
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
3030
import static org.apache.dolphinscheduler.common.constants.Constants.LOCAL_PARAMS;
31-
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
32-
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
3331
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
3432

3533
import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
@@ -587,35 +585,32 @@ private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefi
587585
return processInstance;
588586
}
589587

590-
private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
588+
@Override
589+
public void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
590+
591591
// get start params from command param
592-
Map<String, String> startParamMap = new HashMap<>();
593-
if (cmdParam != null && cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_START_PARAMS)) {
594-
String startParamJson = cmdParam.get(CommandKeyConstants.CMD_PARAM_START_PARAMS);
595-
startParamMap = JSONUtils.toMap(startParamJson);
596-
}
597-
Map<String, String> fatherParamMap = new HashMap<>();
598-
if (cmdParam != null && cmdParam.containsKey(CommandKeyConstants.CMD_PARAM_FATHER_PARAMS)) {
599-
String fatherParamJson = cmdParam.get(CommandKeyConstants.CMD_PARAM_FATHER_PARAMS);
600-
fatherParamMap = JSONUtils.toMap(fatherParamJson);
601-
}
602-
startParamMap.putAll(fatherParamMap);
592+
Map<String, Property> fatherParam = curingGlobalParamsService.parseWorkflowFatherParam(cmdParam);
593+
Map<String, Property> startParamMap = new HashMap<>(fatherParam);
594+
595+
Map<String, Property> currentStartParamMap = curingGlobalParamsService.parseWorkflowStartParam(cmdParam);
596+
startParamMap.putAll(currentStartParamMap);
597+
603598
// set start param into global params
604599
Map<String, String> globalMap = processDefinition.getGlobalParamMap();
605600
List<Property> globalParamList = processDefinition.getGlobalParamList();
606601
if (MapUtils.isNotEmpty(startParamMap) && globalMap != null) {
607602
// start param to overwrite global param
608603
for (Map.Entry<String, String> param : globalMap.entrySet()) {
609-
String val = startParamMap.get(param.getKey());
604+
String val = startParamMap.get(param.getKey()).getValue();
610605
if (val != null) {
611606
param.setValue(val);
612607
}
613608
}
614609
// start param to create new global param if global not exist
615-
for (Entry<String, String> startParam : startParamMap.entrySet()) {
610+
for (Entry<String, Property> startParam : startParamMap.entrySet()) {
616611
if (!globalMap.containsKey(startParam.getKey())) {
617-
globalMap.put(startParam.getKey(), startParam.getValue());
618-
globalParamList.add(new Property(startParam.getKey(), IN, VARCHAR, startParam.getValue()));
612+
globalMap.put(startParam.getKey(), startParam.getValue().getValue());
613+
globalParamList.add(startParam.getValue());
619614
}
620615
}
621616
}

dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
3434
import org.apache.dolphinscheduler.plugin.task.api.parameters.SubProcessParameters;
3535

36+
import org.apache.commons.collections4.MapUtils;
37+
3638
import java.util.ArrayList;
3739
import java.util.Collections;
3840
import java.util.Date;
@@ -234,4 +236,46 @@ public void testParamParsingPreparation() {
234236
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_WORKFLOW_DEFINITION_CODE).getValue(),
235237
String.valueOf(processDefinition.getCode()));
236238
}
239+
240+
@Test
241+
public void testParseWorkflowStartParam() {
242+
Map<String, Property> result = new HashMap<>();
243+
// empty cmd param
244+
Map<String, String> startParamMap = new HashMap<>();
245+
result = dolphinSchedulerCuringGlobalParams.parseWorkflowStartParam(startParamMap);
246+
Assertions.assertTrue(MapUtils.isEmpty(result));
247+
248+
// without key
249+
startParamMap.put("testStartParam", "$[yyyyMMdd]");
250+
result = dolphinSchedulerCuringGlobalParams.parseWorkflowStartParam(startParamMap);
251+
Assertions.assertTrue(MapUtils.isEmpty(result));
252+
253+
startParamMap.put("StartParams", "{\"param1\":\"11111\", \"param2\":\"22222\"}");
254+
result = dolphinSchedulerCuringGlobalParams.parseWorkflowStartParam(startParamMap);
255+
Assertions.assertTrue(MapUtils.isNotEmpty(result));
256+
Assertions.assertEquals(2, result.keySet().size());
257+
Assertions.assertEquals("11111", result.get("param1").getValue());
258+
Assertions.assertEquals("22222", result.get("param2").getValue());
259+
}
260+
261+
@Test
262+
public void testParseWorkflowFatherParam() {
263+
Map<String, Property> result = new HashMap<>();
264+
// empty cmd param
265+
Map<String, String> startParamMap = new HashMap<>();
266+
result = dolphinSchedulerCuringGlobalParams.parseWorkflowFatherParam(startParamMap);
267+
Assertions.assertTrue(MapUtils.isEmpty(result));
268+
269+
// without key
270+
startParamMap.put("testfatherParams", "$[yyyyMMdd]");
271+
result = dolphinSchedulerCuringGlobalParams.parseWorkflowFatherParam(startParamMap);
272+
Assertions.assertTrue(MapUtils.isEmpty(result));
273+
274+
startParamMap.put("fatherParams", "{\"param1\":\"11111\", \"param2\":\"22222\"}");
275+
result = dolphinSchedulerCuringGlobalParams.parseWorkflowFatherParam(startParamMap);
276+
Assertions.assertTrue(MapUtils.isNotEmpty(result));
277+
Assertions.assertEquals(2, result.keySet().size());
278+
Assertions.assertEquals("11111", result.get("param1").getValue());
279+
Assertions.assertEquals("22222", result.get("param2").getValue());
280+
}
237281
}

0 commit comments

Comments
 (0)