Skip to content

Commit 5e7fe2e

Browse files
abzymeinsjtuabzymeinsjtu
authored andcommitted
fix dependent task logic
1 parent 0419543 commit 5e7fe2e

File tree

6 files changed

+25
-11
lines changed

6 files changed

+25
-11
lines changed

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,13 +188,15 @@ List<ProcessInstance> queryByProcessDefineCode(@Param("processDefinitionCode") L
188188
/**
189189
* query last scheduler process instance
190190
*
191-
* @param definitionCode definitionCode
191+
* @param processDefinitionCode definitionCode
192+
* @param taskDefinitionCode definitionCode
192193
* @param startTime startTime
193194
* @param endTime endTime
194195
* @param testFlag testFlag
195196
* @return process instance
196197
*/
197-
ProcessInstance queryLastSchedulerProcess(@Param("processDefinitionCode") Long definitionCode,
198+
ProcessInstance queryLastSchedulerProcess(@Param("processDefinitionCode") Long processDefinitionCode,
199+
@Param("taskDefinitionCode") Long taskDefinitionCode,
198200
@Param("startTime") Date startTime,
199201
@Param("endTime") Date endTime,
200202
@Param("testFlag") int testFlag);

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,13 @@ public interface ProcessInstanceDao extends IDao<ProcessInstance> {
4141
/**
4242
* find last scheduler process instance in the date interval
4343
*
44-
* @param definitionCode definitionCode
44+
* @param processDefinitionCode definitionCode
45+
* @param taskDefinitionCode definitionCode
4546
* @param dateInterval dateInterval
4647
* @return process instance
4748
*/
48-
ProcessInstance queryLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
49+
ProcessInstance queryLastSchedulerProcessInterval(Long processDefinitionCode, Long taskDefinitionCode,
50+
DateInterval dateInterval, int testFlag);
4951

5052
/**
5153
* find last manual process instance interval

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,18 @@ public void performTransactionalUpsert(ProcessInstance processInstance) {
6767
/**
6868
* find last scheduler process instance in the date interval
6969
*
70-
* @param definitionCode definitionCode
70+
* @param processDefinitionCode definitionCode
71+
* @param taskDefinitionCode definitionCode
7172
* @param dateInterval dateInterval
7273
* @return process instance
7374
*/
7475
@Override
75-
public ProcessInstance queryLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval,
76+
public ProcessInstance queryLastSchedulerProcessInterval(Long processDefinitionCode, Long taskDefinitionCode,
77+
DateInterval dateInterval,
7678
int testFlag) {
77-
return mybatisMapper.queryLastSchedulerProcess(definitionCode,
79+
return mybatisMapper.queryLastSchedulerProcess(
80+
processDefinitionCode,
81+
taskDefinitionCode,
7882
dateInterval.getStartTime(),
7983
dateInterval.getEndTime(),
8084
testFlag);

dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,12 +197,17 @@
197197
order by start_time desc limit #{size}
198198
</select>
199199
<select id="queryLastSchedulerProcess" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
200-
select
200+
select t1.* from (select
201201
<include refid="baseSql"/>
202202
from t_ds_process_instance
203203
where process_definition_code=#{processDefinitionCode} and test_flag=#{testFlag}
204-
<if test="startTime!=null and endTime != null ">
204+
<if test="startTime != null and endTime != null ">
205205
and schedule_time <![CDATA[ >= ]]> #{startTime} and schedule_time <![CDATA[ <= ]]> #{endTime}
206+
</if>) as t1
207+
<if test="taskDefinitionCode != null and taskDefinitionCode != 0 and taskDefinitionCode != -1">
208+
inner join
209+
t_ds_task_instance as t2
210+
on t1.id = t2.process_instance_id and t2.task_code=#{taskDefinitionCode}
206211
</if>
207212
order by end_time desc limit 1
208213
</select>

dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,8 @@ public void testQueryLastSchedulerProcess() {
263263
processInstanceMapper.updateById(processInstance);
264264

265265
ProcessInstance processInstance1 =
266-
processInstanceMapper.queryLastSchedulerProcess(processInstance.getProcessDefinitionCode(), null, null,
266+
processInstanceMapper.queryLastSchedulerProcess(processInstance.getProcessDefinitionCode(), 0L, null,
267+
null,
267268
processInstance.getTestFlag());
268269
Assertions.assertNotEquals(null, processInstance1);
269270
processInstanceMapper.deleteById(processInstance.getId());

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ private ProcessInstance findLastProcessInterval(Long definitionCode, Long taskCo
320320
int testFlag) {
321321

322322
ProcessInstance lastSchedulerProcess =
323-
processInstanceDao.queryLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag);
323+
processInstanceDao.queryLastSchedulerProcessInterval(definitionCode, taskCode, dateInterval, testFlag);
324324

325325
ProcessInstance lastManualProcess =
326326
processInstanceDao.queryLastManualProcessInterval(definitionCode, taskCode, dateInterval, testFlag);

0 commit comments

Comments
 (0)