Skip to content

Commit 9d81eda

Browse files
committed
call resubmitTask() when re-schedule task
Change-Id: I2efa9fd4979492c6901e1ae11fbaf2be451f0354
1 parent 5cd5bbf commit 9d81eda

File tree

2 files changed

+22
-2
lines changed

2 files changed

+22
-2
lines changed

hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -382,11 +382,12 @@ protected boolean checkDependenciesSuccess() {
382382
if (this.dependencies == null || this.dependencies.isEmpty()) {
383383
return true;
384384
}
385+
TaskScheduler scheduler = this.scheduler();
385386
for (Id dependency : this.dependencies) {
386-
HugeTask<?> task = this.scheduler().task(dependency);
387+
HugeTask<?> task = scheduler.task(dependency);
387388
if (!task.completed()) {
388389
// Dependent task not completed, re-schedule self
389-
this.scheduler().schedule(this);
390+
scheduler.schedule(this);
390391
return false;
391392
} else if (task.status() == TaskStatus.CANCELLED) {
392393
this.result(TaskStatus.CANCELLED, String.format(

hugegraph-core/src/main/java/com/baidu/hugegraph/task/StandardTaskScheduler.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,15 @@ private <V> Future<?> restore(HugeTask<V> task) {
205205
public <V> Future<?> schedule(HugeTask<V> task) {
206206
E.checkArgumentNotNull(task, "Task can't be null");
207207

208+
if (task.status() == TaskStatus.QUEUED) {
209+
/*
210+
* Just submit to queue if status=QUEUED (means re-schedule task)
211+
* NOTE: schedule() method may be called multi times by
212+
* HugeTask.checkDependenciesSuccess() method
213+
*/
214+
return this.resubmitTask(task);
215+
}
216+
208217
if (task.callable() instanceof EphemeralJob) {
209218
/*
210219
* Due to EphemeralJob won't be serialized and deserialized through
@@ -252,6 +261,16 @@ private <V> Future<?> submitTask(HugeTask<V> task) {
252261
return this.taskExecutor.submit(task);
253262
}
254263

264+
private <V> Future<?> resubmitTask(HugeTask<V> task) {
265+
E.checkArgument(task.status() == TaskStatus.QUEUED,
266+
"Can't resubmit task '%s' with status %s",
267+
task.id(), TaskStatus.QUEUED);
268+
E.checkArgument(this.tasks.containsKey(task.id()),
269+
"Can't resubmit task '%s' not been submitted before",
270+
task.id());
271+
return this.taskExecutor.submit(task);
272+
}
273+
255274
public <V> void initTaskCallable(HugeTask<V> task) {
256275
task.scheduler(this);
257276

0 commit comments

Comments
 (0)