3737import org .apache .hugegraph .util .Log ;
3838import org .slf4j .Logger ;
3939
40+ /**
41+ * Central task management system that coordinates task scheduling and execution.
42+ * Manages task schedulers for different graphs and handles role-based execution.
43+ * <p>
44+ * Note: The local master-worker mechanism will be deprecated in version 1.7
45+ * (configuration has been removed from config files).
46+ */
4047public final class TaskManager {
4148
4249 private static final Logger LOG = Log .logger (TaskManager .class );
4350
4451 public static final String TASK_WORKER_PREFIX = "task-worker" ;
4552 public static final String TASK_WORKER = TASK_WORKER_PREFIX + "-%d" ;
4653 public static final String TASK_DB_WORKER = "task-db-worker-%d" ;
47- public static final String SERVER_INFO_DB_WORKER =
48- "server-info-db-worker-%d" ;
54+ public static final String SERVER_INFO_DB_WORKER = "server-info-db-worker-%d" ;
4955 public static final String TASK_SCHEDULER = "task-scheduler-%d" ;
5056
5157 public static final String OLAP_TASK_WORKER = "olap-task-worker-%d" ;
5258 public static final String SCHEMA_TASK_WORKER = "schema-task-worker-%d" ;
5359 public static final String EPHEMERAL_TASK_WORKER = "ephemeral-task-worker-%d" ;
5460 public static final String DISTRIBUTED_TASK_SCHEDULER = "distributed-scheduler-%d" ;
5561
56- protected static final long SCHEDULE_PERIOD = 1000L ; // unit ms
62+ static final long SCHEDULE_PERIOD = 1000L ; // unit ms
5763 private static final long TX_CLOSE_TIMEOUT = 30L ; // unit s
5864 private static final int THREADS = 4 ;
5965 private static final TaskManager MANAGER = new TaskManager (THREADS );
@@ -87,17 +93,13 @@ private TaskManager(int pool) {
8793 this .serverInfoDbExecutor = ExecutorUtil .newFixedThreadPool (
8894 1 , SERVER_INFO_DB_WORKER );
8995
90- this .schemaTaskExecutor = ExecutorUtil .newFixedThreadPool (pool ,
91- SCHEMA_TASK_WORKER );
92- this .olapTaskExecutor = ExecutorUtil .newFixedThreadPool (pool ,
93- OLAP_TASK_WORKER );
94- this .ephemeralTaskExecutor = ExecutorUtil .newFixedThreadPool (pool ,
95- EPHEMERAL_TASK_WORKER );
96+ this .schemaTaskExecutor = ExecutorUtil .newFixedThreadPool (pool , SCHEMA_TASK_WORKER );
97+ this .olapTaskExecutor = ExecutorUtil .newFixedThreadPool (pool , OLAP_TASK_WORKER );
98+ this .ephemeralTaskExecutor = ExecutorUtil .newFixedThreadPool (pool , EPHEMERAL_TASK_WORKER );
9699 this .distributedSchedulerExecutor =
97- ExecutorUtil .newPausableScheduledThreadPool (1 ,
98- DISTRIBUTED_TASK_SCHEDULER );
100+ ExecutorUtil .newPausableScheduledThreadPool (1 , DISTRIBUTED_TASK_SCHEDULER );
99101
100- // For schedule task to run, just one thread is ok
102+ // For a schedule task to run, just one thread is ok
101103 this .schedulerExecutor = ExecutorUtil .newPausableScheduledThreadPool (
102104 1 , TASK_SCHEDULER );
103105 // Start after 10x period time waiting for HugeGraphServer startup
@@ -111,7 +113,9 @@ public void addScheduler(HugeGraphParams graph) {
111113 E .checkArgumentNotNull (graph , "The graph can't be null" );
112114 LOG .info ("Use {} as the scheduler of graph ({})" ,
113115 graph .schedulerType (), graph .name ());
114- // TODO: If the current service is bound to a specified non-DEFAULT graph space, the graph outside of the current graph space will no longer create task schedulers (graph space)
116+ // TODO: If the current service is bound to a specified non-DEFAULT graph space, the
117+ // graph outside of the current graph space will no longer create task schedulers (graph
118+ // space)
115119 switch (graph .schedulerType ()) {
116120 case "distributed" : {
117121 TaskScheduler scheduler =
@@ -194,7 +198,7 @@ private void closeTaskTx(HugeGraphParams graph) {
194198
195199 private void closeSchedulerTx (HugeGraphParams graph ) {
196200 final Callable <Void > closeTx = () -> {
197- // Do close-tx for current thread
201+ // Do close-tx for the current thread
198202 graph .closeTx ();
199203 // Let other threads run
200204 Thread .yield ();
@@ -209,7 +213,7 @@ private void closeSchedulerTx(HugeGraphParams graph) {
209213
210214 private void closeDistributedSchedulerTx (HugeGraphParams graph ) {
211215 final Callable <Void > closeTx = () -> {
212- // Do close-tx for current thread
216+ // Do close-tx for the current thread
213217 graph .closeTx ();
214218 // Let other threads run
215219 Thread .yield ();
@@ -252,8 +256,7 @@ public void shutdown(long timeout) {
252256 if (!this .schedulerExecutor .isShutdown ()) {
253257 this .schedulerExecutor .shutdown ();
254258 try {
255- terminated = this .schedulerExecutor .awaitTermination (timeout ,
256- unit );
259+ terminated = this .schedulerExecutor .awaitTermination (timeout , unit );
257260 } catch (Throwable e ) {
258261 ex = e ;
259262 }
@@ -262,8 +265,7 @@ public void shutdown(long timeout) {
262265 if (terminated && !this .distributedSchedulerExecutor .isShutdown ()) {
263266 this .distributedSchedulerExecutor .shutdown ();
264267 try {
265- terminated = this .distributedSchedulerExecutor .awaitTermination (timeout ,
266- unit );
268+ terminated = this .distributedSchedulerExecutor .awaitTermination (timeout , unit );
267269 } catch (Throwable e ) {
268270 ex = e ;
269271 }
@@ -272,8 +274,7 @@ public void shutdown(long timeout) {
272274 if (terminated && !this .taskExecutor .isShutdown ()) {
273275 this .taskExecutor .shutdown ();
274276 try {
275- terminated = this .taskExecutor .awaitTermination (timeout ,
276- unit );
277+ terminated = this .taskExecutor .awaitTermination (timeout , unit );
277278 } catch (Throwable e ) {
278279 ex = e ;
279280 }
@@ -282,8 +283,7 @@ public void shutdown(long timeout) {
282283 if (terminated && !this .serverInfoDbExecutor .isShutdown ()) {
283284 this .serverInfoDbExecutor .shutdown ();
284285 try {
285- terminated = this .serverInfoDbExecutor .awaitTermination (timeout ,
286- unit );
286+ terminated = this .serverInfoDbExecutor .awaitTermination (timeout , unit );
287287 } catch (Throwable e ) {
288288 ex = e ;
289289 }
@@ -292,8 +292,7 @@ public void shutdown(long timeout) {
292292 if (terminated && !this .taskDbExecutor .isShutdown ()) {
293293 this .taskDbExecutor .shutdown ();
294294 try {
295- terminated = this .taskDbExecutor .awaitTermination (timeout ,
296- unit );
295+ terminated = this .taskDbExecutor .awaitTermination (timeout , unit );
297296 } catch (Throwable e ) {
298297 ex = e ;
299298 }
@@ -302,8 +301,7 @@ public void shutdown(long timeout) {
302301 if (terminated && !this .ephemeralTaskExecutor .isShutdown ()) {
303302 this .ephemeralTaskExecutor .shutdown ();
304303 try {
305- terminated = this .ephemeralTaskExecutor .awaitTermination (timeout ,
306- unit );
304+ terminated = this .ephemeralTaskExecutor .awaitTermination (timeout , unit );
307305 } catch (Throwable e ) {
308306 ex = e ;
309307 }
@@ -312,8 +310,7 @@ public void shutdown(long timeout) {
312310 if (terminated && !this .schemaTaskExecutor .isShutdown ()) {
313311 this .schemaTaskExecutor .shutdown ();
314312 try {
315- terminated = this .schemaTaskExecutor .awaitTermination (timeout ,
316- unit );
313+ terminated = this .schemaTaskExecutor .awaitTermination (timeout , unit );
317314 } catch (Throwable e ) {
318315 ex = e ;
319316 }
@@ -322,8 +319,7 @@ public void shutdown(long timeout) {
322319 if (terminated && !this .olapTaskExecutor .isShutdown ()) {
323320 this .olapTaskExecutor .shutdown ();
324321 try {
325- terminated = this .olapTaskExecutor .awaitTermination (timeout ,
326- unit );
322+ terminated = this .olapTaskExecutor .awaitTermination (timeout , unit );
327323 } catch (Throwable e ) {
328324 ex = e ;
329325 }
@@ -356,9 +352,12 @@ public void enableRoleElection() {
356352 public void onAsRoleMaster () {
357353 try {
358354 for (TaskScheduler entry : this .schedulers .values ()) {
359- StandardTaskScheduler scheduler = (StandardTaskScheduler ) entry ;
360- ServerInfoManager serverInfoManager = scheduler .serverManager ();
361- serverInfoManager .changeServerRole (NodeRole .MASTER );
355+ ServerInfoManager serverInfoManager = entry .serverManager ();
356+ if (serverInfoManager != null ) {
357+ serverInfoManager .changeServerRole (NodeRole .MASTER );
358+ } else {
359+ LOG .warn ("ServerInfoManager is null for graph {}" , entry .graphName ());
360+ }
362361 }
363362 } catch (Throwable e ) {
364363 LOG .error ("Exception occurred when change to master role" , e );
@@ -369,18 +368,21 @@ public void onAsRoleMaster() {
369368 public void onAsRoleWorker () {
370369 try {
371370 for (TaskScheduler entry : this .schedulers .values ()) {
372- StandardTaskScheduler scheduler = (StandardTaskScheduler ) entry ;
373- ServerInfoManager serverInfoManager = scheduler .serverManager ();
374- serverInfoManager .changeServerRole (NodeRole .WORKER );
371+ ServerInfoManager serverInfoManager = entry .serverManager ();
372+ if (serverInfoManager != null ) {
373+ serverInfoManager .changeServerRole (NodeRole .WORKER );
374+ } else {
375+ LOG .warn ("ServerInfoManager is null for graph {}" , entry .graphName ());
376+ }
375377 }
376378 } catch (Throwable e ) {
377379 LOG .error ("Exception occurred when change to worker role" , e );
378380 throw e ;
379381 }
380382 }
381383
382- protected void notifyNewTask (HugeTask <?> task ) {
383- Queue <Runnable > queue = (( ThreadPoolExecutor ) this .schedulerExecutor )
384+ void notifyNewTask (HugeTask <?> task ) {
385+ Queue <Runnable > queue = this .schedulerExecutor
384386 .getQueue ();
385387 if (queue .size () <= 1 ) {
386388 /*
@@ -398,10 +400,9 @@ private void scheduleOrExecuteJob() {
398400 // Called by scheduler timer
399401 try {
400402 for (TaskScheduler entry : this .schedulers .values ()) {
401- TaskScheduler scheduler = entry ;
402- // Maybe other thread close&remove scheduler at the same time
403- synchronized (scheduler ) {
404- this .scheduleOrExecuteJobForGraph (scheduler );
403+ // Maybe other threads close&remove scheduler at the same time
404+ synchronized (entry ) {
405+ this .scheduleOrExecuteJobForGraph (entry );
405406 }
406407 }
407408 } catch (Throwable e ) {
0 commit comments