Skip to content

Commit 51d29ac

Browse files
authored
feat(core): cluster role automatic management (#1943)
* #1869 cluster role automatic management * improve code * FIX bug * improve code * improve code * improve code * improve code * improve code * improve code
1 parent 4c8069d commit 51d29ac

File tree

9 files changed

+886
-1
lines changed

9 files changed

+886
-1
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2017 HugeGraph Authors
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with this
6+
* work for additional information regarding copyright ownership. The ASF
7+
* licenses this file to You under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16+
* License for the specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package com.baidu.hugegraph.election;
21+
22+
public interface Config {
23+
24+
String node();
25+
26+
int exceedsFailCount();
27+
28+
long randomTimeoutMillisecond();
29+
30+
long heartBeatIntervalSecond();
31+
32+
int exceedsWorkerCount();
33+
34+
long baseTimeoutMillisecond();
35+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2017 HugeGraph Authors
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with this
6+
* work for additional information regarding copyright ownership. The ASF
7+
* licenses this file to You under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16+
* License for the specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package com.baidu.hugegraph.election;
21+
22+
public interface RoleElectionStateMachine {
23+
24+
void shutdown();
25+
26+
void apply(StateMachineCallback stateMachineCallback);
27+
}
Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
/*
2+
* Copyright 2017 HugeGraph Authors
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with this
6+
* work for additional information regarding copyright ownership. The ASF
7+
* licenses this file to You under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16+
* License for the specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package com.baidu.hugegraph.election;
21+
22+
import java.security.SecureRandom;
23+
import java.util.Optional;
24+
import java.util.concurrent.locks.LockSupport;
25+
26+
import com.baidu.hugegraph.util.E;
27+
28+
public class RoleElectionStateMachineImpl implements RoleElectionStateMachine {
29+
30+
private volatile boolean shutdown;
31+
private final Config config;
32+
private volatile RoleState state;
33+
private final RoleTypeDataAdapter roleTypeDataAdapter;
34+
35+
public RoleElectionStateMachineImpl(Config config, RoleTypeDataAdapter adapter) {
36+
this.config = config;
37+
this.roleTypeDataAdapter = adapter;
38+
this.state = new UnknownState(null);
39+
this.shutdown = false;
40+
}
41+
42+
@Override
43+
public void shutdown() {
44+
this.shutdown = true;
45+
}
46+
47+
@Override
48+
public void apply(StateMachineCallback stateMachineCallback) {
49+
int failCount = 0;
50+
StateMachineContextImpl context = new StateMachineContextImpl(this);
51+
while (!this.shutdown) {
52+
E.checkArgumentNotNull(this.state, "State don't be null");
53+
try {
54+
this.state = state.transform(context);
55+
Callback runnable = this.state.callback(stateMachineCallback);
56+
runnable.call(context);
57+
failCount = 0;
58+
} catch (Throwable e) {
59+
stateMachineCallback.error(context, e);
60+
failCount ++;
61+
if (failCount >= this.config.exceedsFailCount()) {
62+
this.state = new AbdicationState(context.epoch());
63+
Callback runnable = this.state.callback(stateMachineCallback);
64+
runnable.call(context);
65+
}
66+
}
67+
}
68+
}
69+
70+
private interface RoleState {
71+
72+
SecureRandom secureRandom = new SecureRandom();
73+
74+
RoleState transform(StateMachineContext context);
75+
76+
Callback callback(StateMachineCallback callback);
77+
78+
static void heartBeatPark(StateMachineContext context) {
79+
long heartBeatIntervalSecond = context.config().heartBeatIntervalSecond();
80+
LockSupport.parkNanos(heartBeatIntervalSecond * 1_000_000_000);
81+
}
82+
83+
static void randomPark(StateMachineContext context) {
84+
long randomTimeout = context.config().randomTimeoutMillisecond();
85+
long baseTime = context.config().baseTimeoutMillisecond();
86+
long timeout = (long) (baseTime + (randomTimeout / 10.0 * secureRandom.nextInt(11)));
87+
LockSupport.parkNanos(timeout * 1_000_000);
88+
}
89+
}
90+
91+
@FunctionalInterface
92+
private interface Callback {
93+
94+
void call(StateMachineContext context);
95+
}
96+
97+
private static class UnknownState implements RoleState {
98+
99+
final Integer epoch;
100+
101+
public UnknownState(Integer epoch) {
102+
this.epoch = epoch;
103+
}
104+
105+
@Override
106+
public RoleState transform(StateMachineContext context) {
107+
RoleTypeDataAdapter adapter = context.adapter();
108+
Optional<RoleTypeData> roleTypeDataOpt = adapter.query();
109+
if (!roleTypeDataOpt.isPresent()) {
110+
context.reset();
111+
Integer nextEpoch = this.epoch == null ? 1 : this.epoch + 1;
112+
context.epoch(nextEpoch);
113+
return new CandidateState(nextEpoch);
114+
}
115+
116+
RoleTypeData roleTypeData = roleTypeDataOpt.get();
117+
if (this.epoch != null && roleTypeData.epoch() < this.epoch) {
118+
context.reset();
119+
Integer nextEpoch = this.epoch + 1;
120+
context.epoch(nextEpoch);
121+
return new CandidateState(nextEpoch);
122+
}
123+
124+
context.epoch(roleTypeData.epoch());
125+
if (roleTypeData.isMaster(context.node())) {
126+
return new MasterState(roleTypeData);
127+
} else {
128+
return new WorkerState(roleTypeData);
129+
}
130+
}
131+
132+
@Override
133+
public Callback callback(StateMachineCallback callback) {
134+
return callback::unknown;
135+
}
136+
}
137+
138+
private static class AbdicationState implements RoleState {
139+
140+
private final Integer epoch;
141+
142+
public AbdicationState(Integer epoch) {
143+
this.epoch = epoch;
144+
}
145+
146+
@Override
147+
public RoleState transform(StateMachineContext context) {
148+
RoleState.heartBeatPark(context);
149+
return new UnknownState(this.epoch).transform(context);
150+
}
151+
152+
@Override
153+
public Callback callback(StateMachineCallback callback) {
154+
return callback::abdication;
155+
}
156+
}
157+
158+
private static class MasterState implements RoleState {
159+
160+
private final RoleTypeData roleTypeData;
161+
162+
public MasterState(RoleTypeData roleTypeData) {
163+
this.roleTypeData = roleTypeData;
164+
}
165+
166+
@Override
167+
public RoleState transform(StateMachineContext context) {
168+
this.roleTypeData.increaseClock();
169+
RoleState.heartBeatPark(context);
170+
if (context.adapter().updateIfNodePresent(this.roleTypeData)) {
171+
return this;
172+
}
173+
context.reset();
174+
context.epoch(this.roleTypeData.epoch());
175+
return new UnknownState(this.roleTypeData.epoch()).transform(context);
176+
}
177+
178+
@Override
179+
public Callback callback(StateMachineCallback callback) {
180+
return callback::master;
181+
}
182+
}
183+
184+
private static class WorkerState implements RoleState {
185+
186+
private RoleTypeData roleTypeData;
187+
private int clock;
188+
189+
public WorkerState(RoleTypeData roleTypeData) {
190+
this.roleTypeData = roleTypeData;
191+
this.clock = 0;
192+
}
193+
194+
@Override
195+
public RoleState transform(StateMachineContext context) {
196+
RoleState.heartBeatPark(context);
197+
RoleState nextState = new UnknownState(this.roleTypeData.epoch()).transform(context);
198+
if (nextState instanceof WorkerState) {
199+
this.merge((WorkerState) nextState);
200+
if (this.clock > context.config().exceedsWorkerCount()) {
201+
return new CandidateState(this.roleTypeData.epoch() + 1);
202+
} else {
203+
return this;
204+
}
205+
} else {
206+
return nextState;
207+
}
208+
}
209+
210+
@Override
211+
public Callback callback(StateMachineCallback callback) {
212+
return callback::worker;
213+
}
214+
215+
public void merge(WorkerState state) {
216+
if (state.roleTypeData.epoch() > this.roleTypeData.epoch()) {
217+
this.clock = 0;
218+
this.roleTypeData = state.roleTypeData;
219+
} else if (state.roleTypeData.epoch() < this.roleTypeData.epoch()){
220+
throw new IllegalStateException("Epoch must increase");
221+
} else if (state.roleTypeData.epoch() == this.roleTypeData.epoch() &&
222+
state.roleTypeData.clock() < this.roleTypeData.clock()) {
223+
throw new IllegalStateException("Clock must increase");
224+
} else if (state.roleTypeData.epoch() == this.roleTypeData.epoch() &&
225+
state.roleTypeData.clock() > this.roleTypeData.clock()) {
226+
this.clock = 0;
227+
this.roleTypeData = state.roleTypeData;
228+
} else {
229+
this.clock++;
230+
}
231+
}
232+
}
233+
234+
private static class CandidateState implements RoleState {
235+
236+
private final Integer epoch;
237+
238+
public CandidateState(Integer epoch) {
239+
this.epoch = epoch;
240+
}
241+
242+
@Override
243+
public RoleState transform(StateMachineContext context) {
244+
RoleState.randomPark(context);
245+
int epoch = this.epoch == null ? 1 : this.epoch;
246+
RoleTypeData roleTypeData = new RoleTypeData(context.config().node(), epoch);
247+
//failover to master success
248+
context.epoch(roleTypeData.epoch());
249+
if (context.adapter().updateIfNodePresent(roleTypeData)) {
250+
return new MasterState(roleTypeData);
251+
} else {
252+
return new UnknownState(epoch).transform(context);
253+
}
254+
}
255+
256+
@Override
257+
public Callback callback(StateMachineCallback callback) {
258+
return callback::candidate;
259+
}
260+
}
261+
262+
private static class StateMachineContextImpl implements StateMachineContext {
263+
264+
private Integer epoch;
265+
private final String node;
266+
private final RoleElectionStateMachineImpl machine;
267+
268+
public StateMachineContextImpl(RoleElectionStateMachineImpl machine) {
269+
this.node = machine.config.node();
270+
this.machine = machine;
271+
}
272+
273+
@Override
274+
public Integer epoch() {
275+
return this.epoch;
276+
}
277+
278+
@Override
279+
public String node() {
280+
return this.node;
281+
}
282+
283+
@Override
284+
public void epoch(Integer epoch) {
285+
this.epoch = epoch;
286+
}
287+
288+
@Override
289+
public RoleTypeDataAdapter adapter() {
290+
return this.machine.adapter();
291+
}
292+
293+
@Override
294+
public Config config() {
295+
return this.machine.config;
296+
}
297+
298+
@Override
299+
public RoleElectionStateMachine stateMachine() {
300+
return this.machine;
301+
}
302+
303+
@Override
304+
public void reset() {
305+
this.epoch = null;
306+
}
307+
}
308+
309+
protected RoleTypeDataAdapter adapter() {
310+
return this.roleTypeDataAdapter;
311+
}
312+
}

0 commit comments

Comments
 (0)