1717
1818package org .apache .dolphinscheduler .server .master .runner ;
1919
20- import org .apache .dolphinscheduler .common .enums .Flag ;
21- import org .apache .dolphinscheduler .common .enums .StateEventType ;
22- import org .apache .dolphinscheduler .common .utils .NetUtils ;
23- import org .apache .dolphinscheduler .dao .entity .ProcessInstance ;
24- import org .apache .dolphinscheduler .dao .entity .TaskInstance ;
25- import org .apache .dolphinscheduler .extract .base .client .SingletonJdkDynamicRpcClientProxyFactory ;
26- import org .apache .dolphinscheduler .extract .master .ITaskInstanceExecutionEventListener ;
27- import org .apache .dolphinscheduler .extract .master .transportor .WorkflowInstanceStateChangeEvent ;
28- import org .apache .dolphinscheduler .plugin .task .api .enums .TaskExecutionStatus ;
2920import org .apache .dolphinscheduler .plugin .task .api .utils .LogUtils ;
3021import org .apache .dolphinscheduler .server .master .cache .ProcessInstanceExecCacheManager ;
3122import org .apache .dolphinscheduler .server .master .config .MasterConfig ;
3223import org .apache .dolphinscheduler .server .master .event .StateEvent ;
33- import org .apache .dolphinscheduler .server .master .event .TaskStateEvent ;
34- import org .apache .dolphinscheduler .server .master .runner .execute .MasterTaskExecuteRunnable ;
35- import org .apache .dolphinscheduler .server .master .runner .execute .MasterTaskExecuteRunnableHolder ;
36- import org .apache .dolphinscheduler .service .process .ProcessService ;
3724
38- import java .util .Map ;
3925import java .util .concurrent .ConcurrentHashMap ;
4026
4127import javax .annotation .PostConstruct ;
4228
43- import lombok .NonNull ;
4429import lombok .extern .slf4j .Slf4j ;
4530
4631import org .springframework .beans .factory .annotation .Autowired ;
4934import org .springframework .util .concurrent .ListenableFuture ;
5035import org .springframework .util .concurrent .ListenableFutureCallback ;
5136
52- import com .google .common .base .Strings ;
53-
5437/**
5538 * Used to execute {@link WorkflowExecuteRunnable}.
5639 */
@@ -61,9 +44,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
6144 @ Autowired
6245 private MasterConfig masterConfig ;
6346
64- @ Autowired
65- private ProcessService processService ;
66-
6747 @ Autowired
6848 private ProcessInstanceExecCacheManager processInstanceExecCacheManager ;
6949
@@ -122,8 +102,6 @@ public void onFailure(Throwable ex) {
122102 LogUtils .setWorkflowInstanceIdMDC (workflowInstanceId );
123103 try {
124104 log .error ("Workflow instance events handle failed" , ex );
125- notifyProcessChanged (
126- workflowExecuteThread .getWorkflowExecuteContext ().getWorkflowInstance ());
127105 multiThreadFilterMap .remove (workflowInstanceId );
128106 } finally {
129107 LogUtils .removeWorkflowInstanceIdMDC ();
@@ -140,8 +118,6 @@ public void onSuccess(Object result) {
140118 .removeProcess4TimeoutCheck (workflowExecuteThread .getWorkflowExecuteContext ()
141119 .getWorkflowInstance ().getId ());
142120 processInstanceExecCacheManager .removeByProcessInstanceId (workflowInstanceId );
143- notifyProcessChanged (
144- workflowExecuteThread .getWorkflowExecuteContext ().getWorkflowInstance ());
145121 log .info ("Workflow instance is finished." );
146122 }
147123 } catch (Exception e ) {
@@ -155,83 +131,4 @@ public void onSuccess(Object result) {
155131 });
156132 }
157133
158- /**
159- * notify process change
160- */
161- private void notifyProcessChanged (ProcessInstance finishProcessInstance ) {
162- if (Flag .NO == finishProcessInstance .getIsSubProcess ()) {
163- return ;
164- }
165- Map <ProcessInstance , TaskInstance > fatherMaps = processService .notifyProcessList (finishProcessInstance .getId ());
166- for (Map .Entry <ProcessInstance , TaskInstance > entry : fatherMaps .entrySet ()) {
167- ProcessInstance processInstance = entry .getKey ();
168- TaskInstance taskInstance = entry .getValue ();
169- crossWorkflowParameterPassing (finishProcessInstance , taskInstance );
170- String address = NetUtils .getAddr (masterConfig .getListenPort ());
171- try {
172- LogUtils .setWorkflowAndTaskInstanceIDMDC (processInstance .getId (), taskInstance .getId ());
173- if (processInstance .getHost ().equalsIgnoreCase (address )) {
174- log .info ("Process host is local master, will notify it" );
175- this .notifyMyself (processInstance , taskInstance );
176- } else {
177- log .info ("Process host is remote master, will notify it" );
178- this .notifyProcess (finishProcessInstance , processInstance , taskInstance );
179- }
180- } finally {
181- LogUtils .removeWorkflowAndTaskInstanceIdMDC ();
182- }
183- }
184- }
185-
186- private void crossWorkflowParameterPassing (ProcessInstance finishProcessInstance , TaskInstance taskInstance ) {
187- try {
188- MasterTaskExecuteRunnable masterTaskExecuteRunnable =
189- MasterTaskExecuteRunnableHolder .getMasterTaskExecuteRunnable (taskInstance .getId ());
190- masterTaskExecuteRunnable .getILogicTask ().getTaskParameters ()
191- .setVarPool (finishProcessInstance .getVarPool ());
192- log .info ("Cross workflow parameter passing success, finishProcessInstanceId: {}, taskInstanceId: {}" ,
193- finishProcessInstance .getId (), taskInstance .getId ());
194- } catch (Exception ex ) {
195- log .error ("Cross workflow parameter passing error, finishProcessInstanceId: {}, taskInstanceId: {}" ,
196- finishProcessInstance .getId (), taskInstance .getId (), ex );
197- }
198- }
199-
200- /**
201- * notify myself
202- */
203- private void notifyMyself (@ NonNull ProcessInstance processInstance , @ NonNull TaskInstance taskInstance ) {
204- if (!processInstanceExecCacheManager .contains (processInstance .getId ())) {
205- log .warn ("The execute cache manager doesn't contains this workflow instance" );
206- return ;
207- }
208- TaskStateEvent stateEvent = TaskStateEvent .builder ()
209- .processInstanceId (processInstance .getId ())
210- .taskInstanceId (taskInstance .getId ())
211- .type (StateEventType .TASK_STATE_CHANGE )
212- .status (TaskExecutionStatus .RUNNING_EXECUTION )
213- .build ();
214- this .submitStateEvent (stateEvent );
215- }
216-
217- /**
218- * notify process's master
219- */
220- private void notifyProcess (ProcessInstance finishProcessInstance , ProcessInstance processInstance ,
221- TaskInstance taskInstance ) {
222- String processInstanceHost = processInstance .getHost ();
223- if (Strings .isNullOrEmpty (processInstanceHost )) {
224- log .error ("Process {} host is empty, cannot notify task {} now, taskId: {}" , processInstance .getName (),
225- taskInstance .getName (), taskInstance .getId ());
226- return ;
227- }
228- ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener =
229- SingletonJdkDynamicRpcClientProxyFactory
230- .getProxyClient (processInstanceHost , ITaskInstanceExecutionEventListener .class );
231-
232- WorkflowInstanceStateChangeEvent workflowInstanceStateChangeEvent = new WorkflowInstanceStateChangeEvent (
233- finishProcessInstance .getId (), 0 , finishProcessInstance .getState (), processInstance .getId (),
234- taskInstance .getId ());
235- iTaskInstanceExecutionEventListener .onWorkflowInstanceInstanceStateChange (workflowInstanceStateChangeEvent );
236- }
237134}
0 commit comments