Skip to content

Commit ec974d0

Browse files
authored
Add a callback 'onBusy' used to adaptive rate limit (#1401)
* Add a callback 'onBusy' used to adaptive rate limit Change-Id: I2a1139f6e436744ee6b20557ec18f2181b2e63be
1 parent 39b9474 commit ec974d0

File tree

9 files changed

+36
-16
lines changed

9 files changed

+36
-16
lines changed

hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ContextGremlinServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.baidu.hugegraph.HugeGraph;
3434
import com.baidu.hugegraph.auth.HugeGraphAuthProxy.Context;
3535
import com.baidu.hugegraph.auth.HugeGraphAuthProxy.ContextThreadPoolExecutor;
36+
import com.baidu.hugegraph.config.CoreOptions;
3637

3738
/**
3839
* GremlinServer with custom ServerGremlinExecutor, which can pass Context
@@ -76,7 +77,7 @@ public void injectTraversalSource(String prefix) {
7677

7778
static ExecutorService newGremlinExecutorService(Settings settings) {
7879
if (settings.gremlinPool == 0) {
79-
settings.gremlinPool = Runtime.getRuntime().availableProcessors();
80+
settings.gremlinPool = CoreOptions.CPUS;
8081
}
8182
int size = settings.gremlinPool;
8283
ThreadFactory factory = ThreadFactoryUtil.create("exec-%d");

hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public static synchronized ServerOptions instance() {
7171
"restserver.max_worker_threads",
7272
"The maxmium worker threads of rest server.",
7373
rangeInt(2, Integer.MAX_VALUE),
74-
2 * Runtime.getRuntime().availableProcessors()
74+
2 * CoreOptions.CPUS
7575
);
7676

7777
public static final ConfigOption<Integer> MIN_FREE_MEMORY =
@@ -132,7 +132,7 @@ public static synchronized ServerOptions instance() {
132132
"gremlinserver.max_route",
133133
"The max route number for gremlin server.",
134134
positiveInt(),
135-
2 * Runtime.getRuntime().availableProcessors()
135+
2 * CoreOptions.CPUS
136136
);
137137

138138
public static final ConfigListOption<String> GRAPHS =

hugegraph-api/src/main/java/com/baidu/hugegraph/license/LicenseVerifyManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.slf4j.Logger;
3030

3131
import com.baidu.hugegraph.HugeException;
32+
import com.baidu.hugegraph.config.CoreOptions;
3233
import com.baidu.hugegraph.config.HugeConfig;
3334
import com.baidu.hugegraph.config.ServerOptions;
3435
import com.baidu.hugegraph.core.GraphManager;
@@ -218,7 +219,7 @@ private void checkCpu(ExtraParam param) {
218219
if (expectCpus == NO_LIMIT) {
219220
return;
220221
}
221-
int actualCpus = Runtime.getRuntime().availableProcessors();
222+
int actualCpus = CoreOptions.CPUS;
222223
if (actualCpus > expectCpus) {
223224
throw newLicenseException(
224225
"The server's cpus '%s' exceeded the limit '%s'",

hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,16 @@ public void onError(PeerId peer, Status status) {
285285
}
286286
}
287287

288+
// NOTE: Jraft itself doesn't have this callback, it's added by us
289+
public void onBusy(PeerId peer, Status status) {
290+
/*
291+
* If follower is busy then increase busy counter,
292+
* it will lead to submit thread wait more time
293+
*/
294+
int count = RaftNode.this.busyCounter.incrementAndGet();
295+
LOG.info("Increase busy counter: [{}]", count);
296+
}
297+
288298
private boolean isWriteBufferOverflow(Status status) {
289299
String expectMsg = "maybe write overflow";
290300
return RaftError.EINTERNAL == status.getRaftError() &&

hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.io.IOException;
2424
import java.nio.file.Paths;
2525
import java.util.List;
26+
import java.util.concurrent.ArrayBlockingQueue;
2627
import java.util.concurrent.BlockingQueue;
2728
import java.util.concurrent.ExecutorService;
2829
import java.util.concurrent.LinkedBlockingQueue;
@@ -73,6 +74,7 @@ public final class RaftSharedContext {
7374
public static final int WAIT_RPC_TIMEOUT = 30 * 60 * 1000;
7475
// compress block size
7576
public static final int BLOCK_SIZE = 8192;
77+
public static final int QUEUE_SIZE = CoreOptions.CPUS;
7678

7779
public static final String DEFAULT_GROUP = "default";
7880

@@ -362,7 +364,7 @@ private ExecutorService createBackendExecutor(int threads) {
362364
private static ExecutorService newPool(int coreThreads, int maxThreads,
363365
String name,
364366
RejectedExecutionHandler handler) {
365-
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
367+
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
366368
return ThreadPoolUtil.newBuilder()
367369
.poolName(name)
368370
.enableMetric(false)

hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.util.ArrayList;
2323
import java.util.List;
24+
import java.util.concurrent.Future;
2425

2526
import org.slf4j.Logger;
2627

@@ -103,6 +104,7 @@ public void onApply(Iterator iter) {
103104
LOG.debug("Node role: {}", this.node().selfIsLeader() ?
104105
"leader" : "follower");
105106
StoreClosure closure = null;
107+
List<Future<?>> futures = new ArrayList<>();
106108
try {
107109
while (iter.hasNext()) {
108110
closure = (StoreClosure) iter.done();
@@ -122,9 +124,8 @@ public void onApply(Iterator iter) {
122124
} else {
123125
// Follower need readMutation data
124126
byte[] bytes = iter.getData().array();
125-
// Follower seems no way to wait future
126127
// Let the backend thread do it directly
127-
this.context.backendExecutor().submit(() -> {
128+
futures.add(this.context.backendExecutor().submit(() -> {
128129
BytesBuffer buffer = LZ4Util.decompress(bytes,
129130
RaftSharedContext.BLOCK_SIZE);
130131
buffer.forReadWritten();
@@ -137,10 +138,14 @@ public void onApply(Iterator iter) {
137138
action, e);
138139
throw new BackendException("Backend error", e);
139140
}
140-
});
141+
}));
141142
}
142143
iter.next();
143144
}
145+
// Follower wait tasks finished
146+
for (Future<?> future : futures) {
147+
future.get();
148+
}
144149
} catch (Throwable e) {
145150
LOG.error("StateMachine occured critical error", e);
146151
Status status = new Status(RaftError.ESTATEMACHINE,
@@ -150,6 +155,7 @@ public void onApply(Iterator iter) {
150155
closure.failure(status, e);
151156
}
152157
// Will cause current node inactive
158+
// TODO: rollback to correct index
153159
iter.setErrorAndRollback(1L, status);
154160
}
155161
}

hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@
3030

3131
public class CoreOptions extends OptionHolder {
3232

33+
public static final int CPUS = Runtime.getRuntime().availableProcessors();
34+
3335
private CoreOptions() {
3436
super();
3537
}
3638

3739
private static volatile CoreOptions instance;
3840

39-
private static final int CPUS = Runtime.getRuntime().availableProcessors();
40-
4141
public static synchronized CoreOptions instance() {
4242
if (instance == null) {
4343
instance = new CoreOptions();

hugegraph-core/src/main/java/com/baidu/hugegraph/util/Consumers.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@
3737
import org.slf4j.Logger;
3838

3939
import com.baidu.hugegraph.HugeException;
40+
import com.baidu.hugegraph.config.CoreOptions;
4041
import com.baidu.hugegraph.task.TaskManager.ContextCallable;
4142

4243
public final class Consumers<V> {
4344

44-
public static final int CPUS = Runtime.getRuntime().availableProcessors();
45-
public static final int THREADS = 4 + CPUS / 4;
45+
public static final int THREADS = 4 + CoreOptions.CPUS / 4;
4646
public static final int QUEUE_WORKER_SIZE = 1000;
4747
public static final long CONSUMER_WAKE_PERIOD = 1;
4848

@@ -240,8 +240,8 @@ public static ExecutorService newThreadPool(String prefix, int workers) {
240240
if (workers < 0) {
241241
assert workers == -1;
242242
workers = Consumers.THREADS;
243-
} else if (workers > Consumers.CPUS * 2) {
244-
workers = Consumers.CPUS * 2;
243+
} else if (workers > CoreOptions.CPUS * 2) {
244+
workers = CoreOptions.CPUS * 2;
245245
}
246246
String name = prefix + "-worker-%d";
247247
return ExecutorUtil.newFixedThreadPool(workers, name);
@@ -262,7 +262,7 @@ public static RuntimeException wrapException(Throwable e) {
262262

263263
public static class ExecutorPool {
264264

265-
private final static int POOL_CAPACITY = 2 * CPUS;
265+
private final static int POOL_CAPACITY = 2 * CoreOptions.CPUS;
266266

267267
private final String threadNamePrefix;
268268
private final int executorWorkers;

hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,7 @@ public static void initOptions(HugeConfig conf,
493493

494494
// Optimize RocksDB
495495
if (optimize) {
496-
int processors = Runtime.getRuntime().availableProcessors();
496+
int processors = CoreOptions.CPUS;
497497
db.setIncreaseParallelism(Math.max(processors / 2, 1));
498498

499499
db.setAllowConcurrentMemtableWrite(true);

0 commit comments

Comments
 (0)