@@ -2334,14 +2334,17 @@ public boolean acquireTaskGroup(int taskInstanceId,
23342334 TaskGroup taskGroup = taskGroupMapper .selectById (taskGroupId );
23352335 if (taskGroup == null ) {
23362336 // we don't throw exception here, to avoid the task group has been deleted during workflow running
2337+ log .warn ("The taskGroup is not exist no need to acquire taskGroup, taskGroupId: {}" , taskGroupId );
23372338 return true ;
23382339 }
23392340 // if task group is not applicable
23402341 if (taskGroup .getStatus () == Flag .NO .getCode ()) {
2342+ log .warn ("The taskGroup status is {}, no need to acquire taskGroup, taskGroupId: {}" , taskGroup .getStatus (),
2343+ taskGroupId );
23412344 return true ;
23422345 }
23432346 // Create a waiting taskGroupQueue, after acquire resource, we can update the status to ACQUIRE_SUCCESS
2344- TaskGroupQueue taskGroupQueue = this . taskGroupQueueMapper .queryByTaskId (taskInstanceId );
2347+ TaskGroupQueue taskGroupQueue = taskGroupQueueMapper .queryByTaskId (taskInstanceId );
23452348 if (taskGroupQueue == null ) {
23462349 taskGroupQueue = insertIntoTaskGroupQueue (
23472350 taskInstanceId ,
@@ -2350,14 +2353,12 @@ public boolean acquireTaskGroup(int taskInstanceId,
23502353 workflowInstanceId ,
23512354 taskGroupPriority ,
23522355 TaskGroupQueueStatus .WAIT_QUEUE );
2356+ log .info ("Insert TaskGroupQueue: {} successfully" , taskGroupQueue .getId ());
23532357 } else {
23542358 log .info ("The task queue is already exist, taskId: {}" , taskInstanceId );
23552359 if (taskGroupQueue .getStatus () == TaskGroupQueueStatus .ACQUIRE_SUCCESS ) {
23562360 return true ;
23572361 }
2358- taskGroupQueue .setInQueue (Flag .NO .getCode ());
2359- taskGroupQueue .setStatus (TaskGroupQueueStatus .WAIT_QUEUE );
2360- this .taskGroupQueueMapper .updateById (taskGroupQueue );
23612362 }
23622363 // check if there already exist higher priority tasks
23632364 List <TaskGroupQueue > highPriorityTasks = taskGroupQueueMapper .queryHighPriorityTasks (
@@ -2368,14 +2369,15 @@ public boolean acquireTaskGroup(int taskInstanceId,
23682369 return false ;
23692370 }
23702371 // try to get taskGroup
2371- int count = taskGroupMapper .selectAvailableCountById (taskGroupId );
2372- if (count == 1 && robTaskGroupResource (taskGroupQueue )) {
2373- log .info ("Success acquire taskGroup, taskInstanceId: {}, taskGroupId: {}" , taskInstanceId , taskGroupId );
2374- return true ;
2372+ int availableTaskGroupCount = taskGroupMapper .selectAvailableCountById (taskGroupId );
2373+ if (availableTaskGroupCount < 1 ) {
2374+ log .info (
2375+ "Failed to acquire taskGroup, there is no avaliable taskGroup, taskInstanceId: {}, taskGroupId: {}" ,
2376+ taskInstanceId , taskGroupId );
2377+ taskGroupQueueMapper .updateInQueue (Flag .NO .getCode (), taskGroupQueue .getId ());
2378+ return false ;
23752379 }
2376- log .info ("Failed to acquire taskGroup, taskInstanceId: {}, taskGroupId: {}" , taskInstanceId , taskGroupId );
2377- this .taskGroupQueueMapper .updateInQueue (Flag .NO .getCode (), taskGroupQueue .getId ());
2378- return false ;
2380+ return robTaskGroupResource (taskGroupQueue );
23792381 }
23802382
23812383 /**
@@ -2387,10 +2389,13 @@ public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) {
23872389 for (int i = 0 ; i < 10 ; i ++) {
23882390 TaskGroup taskGroup = taskGroupMapper .selectById (taskGroupQueue .getGroupId ());
23892391 if (taskGroup .getGroupSize () <= taskGroup .getUseSize ()) {
2392+ // remove
2393+ taskGroupQueueMapper .updateInQueue (Flag .NO .getCode (), taskGroupQueue .getId ());
23902394 log .info ("The current task Group is full, taskGroup: {}" , taskGroup );
23912395 return false ;
23922396 }
2393- int affectedCount = taskGroupMapper .robTaskGroupResource (taskGroup .getId (),
2397+ int affectedCount = taskGroupMapper .robTaskGroupResource (
2398+ taskGroup .getId (),
23942399 taskGroup .getUseSize (),
23952400 taskGroupQueue .getId (),
23962401 TaskGroupQueueStatus .WAIT_QUEUE .getCode ());
@@ -2404,6 +2409,7 @@ public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) {
24042409 }
24052410 }
24062411 log .info ("Failed to rob taskGroup, taskGroupQueue: {}" , taskGroupQueue );
2412+ taskGroupQueueMapper .updateInQueue (Flag .NO .getCode (), taskGroupQueue .getId ());
24072413 return false ;
24082414 }
24092415
@@ -2431,10 +2437,11 @@ public TaskInstance releaseTaskGroup(TaskInstance taskInstance) {
24312437 do {
24322438 taskGroup = taskGroupMapper .selectById (taskInstance .getTaskGroupId ());
24332439 if (taskGroup == null ) {
2434- log .error ("The taskGroup is null, taskGroupId: {}" , taskInstance .getTaskGroupId ());
2440+ log .error ("The taskGroup is not exist no need to release taskGroup, taskGroupId: {}" ,
2441+ taskInstance .getTaskGroupId ());
24352442 return null ;
24362443 }
2437- thisTaskGroupQueue = this . taskGroupQueueMapper .queryByTaskId (taskInstance .getId ());
2444+ thisTaskGroupQueue = taskGroupQueueMapper .queryByTaskId (taskInstance .getId ());
24382445 if (thisTaskGroupQueue .getStatus () == TaskGroupQueueStatus .RELEASE ) {
24392446 log .info ("The taskGroupQueue's status is release, taskInstanceId: {}" , taskInstance .getId ());
24402447 return null ;
@@ -2458,20 +2465,22 @@ public TaskInstance releaseTaskGroup(TaskInstance taskInstance) {
24582465 changeTaskGroupQueueStatus (taskInstance .getId (), TaskGroupQueueStatus .RELEASE );
24592466 TaskGroupQueue taskGroupQueue ;
24602467 do {
2461- taskGroupQueue = this .taskGroupQueueMapper .queryTheHighestPriorityTasks (taskGroup .getId (),
2468+ taskGroupQueue = taskGroupQueueMapper .queryTheHighestPriorityTasks (
2469+ taskGroup .getId (),
24622470 TaskGroupQueueStatus .WAIT_QUEUE .getCode (),
24632471 Flag .NO .getCode (),
24642472 Flag .NO .getCode ());
24652473 if (taskGroupQueue == null ) {
2466- log .info ("The taskGroupQueue is null, taskGroup: {}" , taskGroup .getId ());
2474+ log .info ("There is no taskGroupQueue need to be wakeup taskGroup: {}" , taskGroup .getId ());
24672475 return null ;
24682476 }
2469- } while (this .taskGroupQueueMapper .updateInQueueCAS (Flag .NO .getCode (),
2477+ } while (this .taskGroupQueueMapper .updateInQueueCAS (
2478+ Flag .NO .getCode (),
24702479 Flag .YES .getCode (),
24712480 taskGroupQueue .getId ()) != 1 );
24722481 log .info ("Finished to release task group queue: taskGroupId: {}, taskGroupQueueId: {}" ,
24732482 taskInstance .getTaskGroupId (), taskGroupQueue .getId ());
2474- return this . taskInstanceMapper .selectById (taskGroupQueue .getTaskId ());
2483+ return taskInstanceMapper .selectById (taskGroupQueue .getTaskId ());
24752484 }
24762485
24772486 /**
@@ -2505,6 +2514,7 @@ public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskInstanceId,
25052514 .processId (workflowInstanceId )
25062515 .priority (taskGroupPriority )
25072516 .status (status )
2517+ .forceStart (Flag .NO .getCode ())
25082518 .inQueue (Flag .NO .getCode ())
25092519 .createTime (now )
25102520 .updateTime (now )
0 commit comments