Skip to content

Commit c59b2d5

Browse files
remove sub workflow finish notify (#15057)
Co-authored-by: xiangzihao <[email protected]>
1 parent f0f12c9 commit c59b2d5

File tree

1 file changed

+0
-103
lines changed

1 file changed

+0
-103
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

Lines changed: 0 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,15 @@
1717

1818
package org.apache.dolphinscheduler.server.master.runner;
1919

20-
import org.apache.dolphinscheduler.common.enums.Flag;
21-
import org.apache.dolphinscheduler.common.enums.StateEventType;
22-
import org.apache.dolphinscheduler.common.utils.NetUtils;
23-
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
24-
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
25-
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
26-
import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
27-
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;
28-
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
2920
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
3021
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
3122
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
3223
import org.apache.dolphinscheduler.server.master.event.StateEvent;
33-
import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
34-
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
35-
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
36-
import org.apache.dolphinscheduler.service.process.ProcessService;
3724

38-
import java.util.Map;
3925
import java.util.concurrent.ConcurrentHashMap;
4026

4127
import javax.annotation.PostConstruct;
4228

43-
import lombok.NonNull;
4429
import lombok.extern.slf4j.Slf4j;
4530

4631
import org.springframework.beans.factory.annotation.Autowired;
@@ -49,8 +34,6 @@
4934
import org.springframework.util.concurrent.ListenableFuture;
5035
import org.springframework.util.concurrent.ListenableFutureCallback;
5136

52-
import com.google.common.base.Strings;
53-
5437
/**
5538
* Used to execute {@link WorkflowExecuteRunnable}.
5639
*/
@@ -61,9 +44,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
6144
@Autowired
6245
private MasterConfig masterConfig;
6346

64-
@Autowired
65-
private ProcessService processService;
66-
6747
@Autowired
6848
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
6949

@@ -122,8 +102,6 @@ public void onFailure(Throwable ex) {
122102
LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId);
123103
try {
124104
log.error("Workflow instance events handle failed", ex);
125-
notifyProcessChanged(
126-
workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance());
127105
multiThreadFilterMap.remove(workflowInstanceId);
128106
} finally {
129107
LogUtils.removeWorkflowInstanceIdMDC();
@@ -140,8 +118,6 @@ public void onSuccess(Object result) {
140118
.removeProcess4TimeoutCheck(workflowExecuteThread.getWorkflowExecuteContext()
141119
.getWorkflowInstance().getId());
142120
processInstanceExecCacheManager.removeByProcessInstanceId(workflowInstanceId);
143-
notifyProcessChanged(
144-
workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance());
145121
log.info("Workflow instance is finished.");
146122
}
147123
} catch (Exception e) {
@@ -155,83 +131,4 @@ public void onSuccess(Object result) {
155131
});
156132
}
157133

158-
/**
159-
* notify process change
160-
*/
161-
private void notifyProcessChanged(ProcessInstance finishProcessInstance) {
162-
if (Flag.NO == finishProcessInstance.getIsSubProcess()) {
163-
return;
164-
}
165-
Map<ProcessInstance, TaskInstance> fatherMaps = processService.notifyProcessList(finishProcessInstance.getId());
166-
for (Map.Entry<ProcessInstance, TaskInstance> entry : fatherMaps.entrySet()) {
167-
ProcessInstance processInstance = entry.getKey();
168-
TaskInstance taskInstance = entry.getValue();
169-
crossWorkflowParameterPassing(finishProcessInstance, taskInstance);
170-
String address = NetUtils.getAddr(masterConfig.getListenPort());
171-
try {
172-
LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(), taskInstance.getId());
173-
if (processInstance.getHost().equalsIgnoreCase(address)) {
174-
log.info("Process host is local master, will notify it");
175-
this.notifyMyself(processInstance, taskInstance);
176-
} else {
177-
log.info("Process host is remote master, will notify it");
178-
this.notifyProcess(finishProcessInstance, processInstance, taskInstance);
179-
}
180-
} finally {
181-
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
182-
}
183-
}
184-
}
185-
186-
private void crossWorkflowParameterPassing(ProcessInstance finishProcessInstance, TaskInstance taskInstance) {
187-
try {
188-
MasterTaskExecuteRunnable masterTaskExecuteRunnable =
189-
MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskInstance.getId());
190-
masterTaskExecuteRunnable.getILogicTask().getTaskParameters()
191-
.setVarPool(finishProcessInstance.getVarPool());
192-
log.info("Cross workflow parameter passing success, finishProcessInstanceId: {}, taskInstanceId: {}",
193-
finishProcessInstance.getId(), taskInstance.getId());
194-
} catch (Exception ex) {
195-
log.error("Cross workflow parameter passing error, finishProcessInstanceId: {}, taskInstanceId: {}",
196-
finishProcessInstance.getId(), taskInstance.getId(), ex);
197-
}
198-
}
199-
200-
/**
201-
* notify myself
202-
*/
203-
private void notifyMyself(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
204-
if (!processInstanceExecCacheManager.contains(processInstance.getId())) {
205-
log.warn("The execute cache manager doesn't contains this workflow instance");
206-
return;
207-
}
208-
TaskStateEvent stateEvent = TaskStateEvent.builder()
209-
.processInstanceId(processInstance.getId())
210-
.taskInstanceId(taskInstance.getId())
211-
.type(StateEventType.TASK_STATE_CHANGE)
212-
.status(TaskExecutionStatus.RUNNING_EXECUTION)
213-
.build();
214-
this.submitStateEvent(stateEvent);
215-
}
216-
217-
/**
218-
* notify process's master
219-
*/
220-
private void notifyProcess(ProcessInstance finishProcessInstance, ProcessInstance processInstance,
221-
TaskInstance taskInstance) {
222-
String processInstanceHost = processInstance.getHost();
223-
if (Strings.isNullOrEmpty(processInstanceHost)) {
224-
log.error("Process {} host is empty, cannot notify task {} now, taskId: {}", processInstance.getName(),
225-
taskInstance.getName(), taskInstance.getId());
226-
return;
227-
}
228-
ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener =
229-
SingletonJdkDynamicRpcClientProxyFactory
230-
.getProxyClient(processInstanceHost, ITaskInstanceExecutionEventListener.class);
231-
232-
WorkflowInstanceStateChangeEvent workflowInstanceStateChangeEvent = new WorkflowInstanceStateChangeEvent(
233-
finishProcessInstance.getId(), 0, finishProcessInstance.getState(), processInstance.getId(),
234-
taskInstance.getId());
235-
iTaskInstanceExecutionEventListener.onWorkflowInstanceInstanceStateChange(workflowInstanceStateChangeEvent);
236-
}
237134
}

0 commit comments

Comments
 (0)