File tree Expand file tree Collapse file tree 2 files changed +22
-2
lines changed
hugegraph-core/src/main/java/com/baidu/hugegraph/task Expand file tree Collapse file tree 2 files changed +22
-2
lines changed Original file line number Diff line number Diff line change @@ -382,11 +382,12 @@ protected boolean checkDependenciesSuccess() {
382382 if (this .dependencies == null || this .dependencies .isEmpty ()) {
383383 return true ;
384384 }
385+ TaskScheduler scheduler = this .scheduler ();
385386 for (Id dependency : this .dependencies ) {
386- HugeTask <?> task = this . scheduler () .task (dependency );
387+ HugeTask <?> task = scheduler .task (dependency );
387388 if (!task .completed ()) {
388389 // Dependent task not completed, re-schedule self
389- this . scheduler () .schedule (this );
390+ scheduler .schedule (this );
390391 return false ;
391392 } else if (task .status () == TaskStatus .CANCELLED ) {
392393 this .result (TaskStatus .CANCELLED , String .format (
Original file line number Diff line number Diff line change @@ -205,6 +205,15 @@ private <V> Future<?> restore(HugeTask<V> task) {
205205 public <V > Future <?> schedule (HugeTask <V > task ) {
206206 E .checkArgumentNotNull (task , "Task can't be null" );
207207
208+ if (task .status () == TaskStatus .QUEUED ) {
209+ /*
210+ * Just submit to queue if status=QUEUED (means re-schedule task)
211+ * NOTE: schedule() method may be called multi times by
212+ * HugeTask.checkDependenciesSuccess() method
213+ */
214+ return this .resubmitTask (task );
215+ }
216+
208217 if (task .callable () instanceof EphemeralJob ) {
209218 /*
210219 * Due to EphemeralJob won't be serialized and deserialized through
@@ -252,6 +261,16 @@ private <V> Future<?> submitTask(HugeTask<V> task) {
252261 return this .taskExecutor .submit (task );
253262 }
254263
264+ private <V > Future <?> resubmitTask (HugeTask <V > task ) {
265+ E .checkArgument (task .status () == TaskStatus .QUEUED ,
266+ "Can't resubmit task '%s' with status %s" ,
267+ task .id (), TaskStatus .QUEUED );
268+ E .checkArgument (this .tasks .containsKey (task .id ()),
269+ "Can't resubmit task '%s' not been submitted before" ,
270+ task .id ());
271+ return this .taskExecutor .submit (task );
272+ }
273+
255274 public <V > void initTaskCallable (HugeTask <V > task ) {
256275 task .scheduler (this );
257276
You can’t perform that action at this time.
0 commit comments