Skip to content

Commit 26b91e4

Browse files
authored
Merge pull request #19 from apache/master
[pull] master from apache:master
2 parents c2dab77 + 0946d5d commit 26b91e4

File tree

3 files changed

+55
-44
lines changed

3 files changed

+55
-44
lines changed

hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,15 @@ public static synchronized ServerOptions instance() {
7373
"master"
7474
);
7575

76+
public static final ConfigOption<Boolean> ENABLE_SERVER_ROLE_ELECTION =
77+
new ConfigOption<>(
78+
"server.role_election",
79+
"Whether to enable role election, if enabled, the server " +
80+
"will elect a master node in the cluster.",
81+
disallowEmpty(),
82+
false
83+
);
84+
7685
public static final ConfigOption<Integer> MAX_WORKER_THREADS =
7786
new ConfigOption<>(
7887
"restserver.max_worker_threads",

hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,8 @@ private void serverStarted(HugeConfig config) {
469469

470470
NodeRole nodeRole = NodeRole.valueOf(role.toUpperCase());
471471
boolean supportRoleElection = !nodeRole.computer() &&
472-
this.supportRoleElection();
472+
this.supportRoleElection() &&
473+
config.get(ServerOptions.ENABLE_SERVER_ROLE_ELECTION);
473474
if (supportRoleElection) {
474475
// Init any server as Worker role, then do role election
475476
nodeRole = NodeRole.WORKER;

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java

Lines changed: 44 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -37,23 +37,29 @@
3737
import org.apache.hugegraph.util.Log;
3838
import org.slf4j.Logger;
3939

40+
/**
41+
* Central task management system that coordinates task scheduling and execution.
42+
* Manages task schedulers for different graphs and handles role-based execution.
43+
* <p>
44+
* Note: The local master-worker mechanism will be deprecated in version 1.7
45+
* (configuration has been removed from config files).
46+
*/
4047
public final class TaskManager {
4148

4249
private static final Logger LOG = Log.logger(TaskManager.class);
4350

4451
public static final String TASK_WORKER_PREFIX = "task-worker";
4552
public static final String TASK_WORKER = TASK_WORKER_PREFIX + "-%d";
4653
public static final String TASK_DB_WORKER = "task-db-worker-%d";
47-
public static final String SERVER_INFO_DB_WORKER =
48-
"server-info-db-worker-%d";
54+
public static final String SERVER_INFO_DB_WORKER = "server-info-db-worker-%d";
4955
public static final String TASK_SCHEDULER = "task-scheduler-%d";
5056

5157
public static final String OLAP_TASK_WORKER = "olap-task-worker-%d";
5258
public static final String SCHEMA_TASK_WORKER = "schema-task-worker-%d";
5359
public static final String EPHEMERAL_TASK_WORKER = "ephemeral-task-worker-%d";
5460
public static final String DISTRIBUTED_TASK_SCHEDULER = "distributed-scheduler-%d";
5561

56-
protected static final long SCHEDULE_PERIOD = 1000L; // unit ms
62+
static final long SCHEDULE_PERIOD = 1000L; // unit ms
5763
private static final long TX_CLOSE_TIMEOUT = 30L; // unit s
5864
private static final int THREADS = 4;
5965
private static final TaskManager MANAGER = new TaskManager(THREADS);
@@ -87,17 +93,13 @@ private TaskManager(int pool) {
8793
this.serverInfoDbExecutor = ExecutorUtil.newFixedThreadPool(
8894
1, SERVER_INFO_DB_WORKER);
8995

90-
this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
91-
SCHEMA_TASK_WORKER);
92-
this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
93-
OLAP_TASK_WORKER);
94-
this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
95-
EPHEMERAL_TASK_WORKER);
96+
this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, SCHEMA_TASK_WORKER);
97+
this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, OLAP_TASK_WORKER);
98+
this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, EPHEMERAL_TASK_WORKER);
9699
this.distributedSchedulerExecutor =
97-
ExecutorUtil.newPausableScheduledThreadPool(1,
98-
DISTRIBUTED_TASK_SCHEDULER);
100+
ExecutorUtil.newPausableScheduledThreadPool(1, DISTRIBUTED_TASK_SCHEDULER);
99101

100-
// For schedule task to run, just one thread is ok
102+
// For a schedule task to run, just one thread is ok
101103
this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool(
102104
1, TASK_SCHEDULER);
103105
// Start after 10x period time waiting for HugeGraphServer startup
@@ -111,7 +113,9 @@ public void addScheduler(HugeGraphParams graph) {
111113
E.checkArgumentNotNull(graph, "The graph can't be null");
112114
LOG.info("Use {} as the scheduler of graph ({})",
113115
graph.schedulerType(), graph.name());
114-
// TODO: If the current service is bound to a specified non-DEFAULT graph space, the graph outside of the current graph space will no longer create task schedulers (graph space)
116+
// TODO: If the current service is bound to a specified non-DEFAULT graph space, the
117+
// graph outside of the current graph space will no longer create task schedulers (graph
118+
// space)
115119
switch (graph.schedulerType()) {
116120
case "distributed": {
117121
TaskScheduler scheduler =
@@ -194,7 +198,7 @@ private void closeTaskTx(HugeGraphParams graph) {
194198

195199
private void closeSchedulerTx(HugeGraphParams graph) {
196200
final Callable<Void> closeTx = () -> {
197-
// Do close-tx for current thread
201+
// Do close-tx for the current thread
198202
graph.closeTx();
199203
// Let other threads run
200204
Thread.yield();
@@ -209,7 +213,7 @@ private void closeSchedulerTx(HugeGraphParams graph) {
209213

210214
private void closeDistributedSchedulerTx(HugeGraphParams graph) {
211215
final Callable<Void> closeTx = () -> {
212-
// Do close-tx for current thread
216+
// Do close-tx for the current thread
213217
graph.closeTx();
214218
// Let other threads run
215219
Thread.yield();
@@ -252,8 +256,7 @@ public void shutdown(long timeout) {
252256
if (!this.schedulerExecutor.isShutdown()) {
253257
this.schedulerExecutor.shutdown();
254258
try {
255-
terminated = this.schedulerExecutor.awaitTermination(timeout,
256-
unit);
259+
terminated = this.schedulerExecutor.awaitTermination(timeout, unit);
257260
} catch (Throwable e) {
258261
ex = e;
259262
}
@@ -262,8 +265,7 @@ public void shutdown(long timeout) {
262265
if (terminated && !this.distributedSchedulerExecutor.isShutdown()) {
263266
this.distributedSchedulerExecutor.shutdown();
264267
try {
265-
terminated = this.distributedSchedulerExecutor.awaitTermination(timeout,
266-
unit);
268+
terminated = this.distributedSchedulerExecutor.awaitTermination(timeout, unit);
267269
} catch (Throwable e) {
268270
ex = e;
269271
}
@@ -272,8 +274,7 @@ public void shutdown(long timeout) {
272274
if (terminated && !this.taskExecutor.isShutdown()) {
273275
this.taskExecutor.shutdown();
274276
try {
275-
terminated = this.taskExecutor.awaitTermination(timeout,
276-
unit);
277+
terminated = this.taskExecutor.awaitTermination(timeout, unit);
277278
} catch (Throwable e) {
278279
ex = e;
279280
}
@@ -282,8 +283,7 @@ public void shutdown(long timeout) {
282283
if (terminated && !this.serverInfoDbExecutor.isShutdown()) {
283284
this.serverInfoDbExecutor.shutdown();
284285
try {
285-
terminated = this.serverInfoDbExecutor.awaitTermination(timeout,
286-
unit);
286+
terminated = this.serverInfoDbExecutor.awaitTermination(timeout, unit);
287287
} catch (Throwable e) {
288288
ex = e;
289289
}
@@ -292,8 +292,7 @@ public void shutdown(long timeout) {
292292
if (terminated && !this.taskDbExecutor.isShutdown()) {
293293
this.taskDbExecutor.shutdown();
294294
try {
295-
terminated = this.taskDbExecutor.awaitTermination(timeout,
296-
unit);
295+
terminated = this.taskDbExecutor.awaitTermination(timeout, unit);
297296
} catch (Throwable e) {
298297
ex = e;
299298
}
@@ -302,8 +301,7 @@ public void shutdown(long timeout) {
302301
if (terminated && !this.ephemeralTaskExecutor.isShutdown()) {
303302
this.ephemeralTaskExecutor.shutdown();
304303
try {
305-
terminated = this.ephemeralTaskExecutor.awaitTermination(timeout,
306-
unit);
304+
terminated = this.ephemeralTaskExecutor.awaitTermination(timeout, unit);
307305
} catch (Throwable e) {
308306
ex = e;
309307
}
@@ -312,8 +310,7 @@ public void shutdown(long timeout) {
312310
if (terminated && !this.schemaTaskExecutor.isShutdown()) {
313311
this.schemaTaskExecutor.shutdown();
314312
try {
315-
terminated = this.schemaTaskExecutor.awaitTermination(timeout,
316-
unit);
313+
terminated = this.schemaTaskExecutor.awaitTermination(timeout, unit);
317314
} catch (Throwable e) {
318315
ex = e;
319316
}
@@ -322,8 +319,7 @@ public void shutdown(long timeout) {
322319
if (terminated && !this.olapTaskExecutor.isShutdown()) {
323320
this.olapTaskExecutor.shutdown();
324321
try {
325-
terminated = this.olapTaskExecutor.awaitTermination(timeout,
326-
unit);
322+
terminated = this.olapTaskExecutor.awaitTermination(timeout, unit);
327323
} catch (Throwable e) {
328324
ex = e;
329325
}
@@ -356,9 +352,12 @@ public void enableRoleElection() {
356352
public void onAsRoleMaster() {
357353
try {
358354
for (TaskScheduler entry : this.schedulers.values()) {
359-
StandardTaskScheduler scheduler = (StandardTaskScheduler) entry;
360-
ServerInfoManager serverInfoManager = scheduler.serverManager();
361-
serverInfoManager.changeServerRole(NodeRole.MASTER);
355+
ServerInfoManager serverInfoManager = entry.serverManager();
356+
if (serverInfoManager != null) {
357+
serverInfoManager.changeServerRole(NodeRole.MASTER);
358+
} else {
359+
LOG.warn("ServerInfoManager is null for graph {}", entry.graphName());
360+
}
362361
}
363362
} catch (Throwable e) {
364363
LOG.error("Exception occurred when change to master role", e);
@@ -369,18 +368,21 @@ public void onAsRoleMaster() {
369368
public void onAsRoleWorker() {
370369
try {
371370
for (TaskScheduler entry : this.schedulers.values()) {
372-
StandardTaskScheduler scheduler = (StandardTaskScheduler) entry;
373-
ServerInfoManager serverInfoManager = scheduler.serverManager();
374-
serverInfoManager.changeServerRole(NodeRole.WORKER);
371+
ServerInfoManager serverInfoManager = entry.serverManager();
372+
if (serverInfoManager != null) {
373+
serverInfoManager.changeServerRole(NodeRole.WORKER);
374+
} else {
375+
LOG.warn("ServerInfoManager is null for graph {}", entry.graphName());
376+
}
375377
}
376378
} catch (Throwable e) {
377379
LOG.error("Exception occurred when change to worker role", e);
378380
throw e;
379381
}
380382
}
381383

382-
protected void notifyNewTask(HugeTask<?> task) {
383-
Queue<Runnable> queue = ((ThreadPoolExecutor) this.schedulerExecutor)
384+
void notifyNewTask(HugeTask<?> task) {
385+
Queue<Runnable> queue = this.schedulerExecutor
384386
.getQueue();
385387
if (queue.size() <= 1) {
386388
/*
@@ -398,10 +400,9 @@ private void scheduleOrExecuteJob() {
398400
// Called by scheduler timer
399401
try {
400402
for (TaskScheduler entry : this.schedulers.values()) {
401-
TaskScheduler scheduler = entry;
402-
// Maybe other thread close&remove scheduler at the same time
403-
synchronized (scheduler) {
404-
this.scheduleOrExecuteJobForGraph(scheduler);
403+
// Maybe other threads close&remove scheduler at the same time
404+
synchronized (entry) {
405+
this.scheduleOrExecuteJobForGraph(entry);
405406
}
406407
}
407408
} catch (Throwable e) {

0 commit comments

Comments
 (0)