Skip to content

Commit 5ddbf11

Browse files
authored
Merge f460e23 into 738da1c
2 parents 738da1c + f460e23 commit 5ddbf11

File tree

6 files changed

+38
-17
lines changed

6 files changed

+38
-17
lines changed

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,12 +219,14 @@ ProcessInstance queryLastRunningProcess(@Param("processDefinitionCode") Long def
219219
* query last manual process instance
220220
*
221221
* @param definitionCode definitionCode
222+
* @param taskCode taskCode
222223
* @param startTime startTime
223224
* @param endTime endTime
224225
* @param testFlag testFlag
225226
* @return process instance
226227
*/
227228
ProcessInstance queryLastManualProcess(@Param("processDefinitionCode") Long definitionCode,
229+
@Param("taskCode") Long taskCode,
228230
@Param("startTime") Date startTime,
229231
@Param("endTime") Date endTime,
230232
@Param("testFlag") int testFlag);

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,12 @@ public interface ProcessInstanceDao extends IDao<ProcessInstance> {
5151
* find last manual process instance interval
5252
*
5353
* @param definitionCode process definition code
54+
* @param taskCode taskCode
5455
* @param dateInterval dateInterval
5556
* @return process instance
5657
*/
57-
ProcessInstance queryLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
58+
ProcessInstance queryLastManualProcessInterval(Long definitionCode, Long taskCode, DateInterval dateInterval,
59+
int testFlag);
5860

5961
/**
6062
* query first schedule process instance

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,15 @@ public ProcessInstance queryLastSchedulerProcessInterval(Long definitionCode, Da
8282
* find last manual process instance interval
8383
*
8484
* @param definitionCode process definition code
85+
* @param taskCode taskCode
8586
* @param dateInterval dateInterval
8687
* @return process instance
8788
*/
8889
@Override
89-
public ProcessInstance queryLastManualProcessInterval(Long definitionCode, DateInterval dateInterval,
90+
public ProcessInstance queryLastManualProcessInterval(Long definitionCode, Long taskCode, DateInterval dateInterval,
9091
int testFlag) {
9192
return mybatisMapper.queryLastManualProcess(definitionCode,
93+
taskCode,
9294
dateInterval.getStartTime(),
9395
dateInterval.getEndTime(),
9496
testFlag);

dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -224,15 +224,25 @@
224224
order by start_time desc limit 1
225225
</select>
226226
<select id="queryLastManualProcess" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
227-
select
228-
<include refid="baseSql"/>
229-
from t_ds_process_instance
230-
where process_definition_code=#{processDefinitionCode} and test_flag=#{testFlag}
231-
and schedule_time is null
232-
<if test="startTime!=null and endTime != null ">
233-
and start_time <![CDATA[ >= ]]> #{startTime} and start_time <![CDATA[ <= ]]> #{endTime}
234-
</if>
235-
order by end_time desc limit 1
227+
select t1.*
228+
from
229+
(
230+
select
231+
<include refid="baseSql"/>
232+
from t_ds_process_instance
233+
where
234+
process_definition_code=#{processDefinitionCode} and test_flag=#{testFlag}
235+
and schedule_time is null
236+
<if test="startTime!=null and endTime != null ">
237+
and start_time <![CDATA[ >= ]]> #{startTime} and start_time <![CDATA[ <= ]]> #{endTime}
238+
</if>
239+
) as t1
240+
<if test="taskCode != null and taskCode!=0 and taskCode!=-1">
241+
inner join
242+
t_ds_task_instance as t2
243+
on t1.id = t2.process_instance_id and t2.task_code=#{taskCode}
244+
</if>
245+
order by t1.end_time desc limit 1
236246
</select>
237247

238248
<select id="queryFirstScheduleProcessInstance" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">

dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,13 +301,15 @@ public void testQueryLastManualProcess() {
301301
Date start = new Date(2019 - 1900, 1 - 1, 01, 0, 0, 0);
302302
Date end = new Date(2019 - 1900, 1 - 1, 01, 5, 0, 0);
303303
ProcessInstance processInstance1 =
304-
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), start, end,
304+
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), null, start,
305+
end,
305306
processInstance.getTestFlag());
306307
Assertions.assertEquals(processInstance1.getId(), processInstance.getId());
307308

308309
start = new Date(2019 - 1900, 1 - 1, 01, 1, 0, 0);
309310
processInstance1 =
310-
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), start, end,
311+
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), null, start,
312+
end,
311313
processInstance.getTestFlag());
312314
Assertions.assertNull(processInstance1);
313315

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,9 @@ private DependResult calculateResultForTasks(DependentItem dependentItem,
149149

150150
DependResult result = DependResult.FAILED;
151151
for (DateInterval dateInterval : dateIntervals) {
152-
ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionCode(),
153-
dateInterval, testFlag);
152+
ProcessInstance processInstance =
153+
findLastProcessInterval(dependentItem.getDefinitionCode(), dependentItem.getDepTaskCode(),
154+
dateInterval, testFlag);
154155
if (processInstance == null) {
155156
return DependResult.WAITING;
156157
}
@@ -311,16 +312,18 @@ private void addItemVarPool(String varPoolStr, Long endTime) {
311312
* 2. schedule run and schedule time between the interval
312313
*
313314
* @param definitionCode definition code
315+
* @param taskCode task code
314316
* @param dateInterval date interval
315317
* @return ProcessInstance
316318
*/
317-
private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) {
319+
private ProcessInstance findLastProcessInterval(Long definitionCode, Long taskCode, DateInterval dateInterval,
320+
int testFlag) {
318321

319322
ProcessInstance lastSchedulerProcess =
320323
processInstanceDao.queryLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag);
321324

322325
ProcessInstance lastManualProcess =
323-
processInstanceDao.queryLastManualProcessInterval(definitionCode, dateInterval, testFlag);
326+
processInstanceDao.queryLastManualProcessInterval(definitionCode, taskCode, dateInterval, testFlag);
324327

325328
if (lastManualProcess == null) {
326329
return lastSchedulerProcess;

0 commit comments

Comments
 (0)