Skip to content

Commit 79f99af

Browse files
lvfangmineolivelli
authored andcommitted
ZOOKEEPER-3598: Fix potential data inconsistency issue due to CommitProcessor not gracefully shutdown
Note: use exit code 16 for SHUTDOWN_UNGRACEFULLY, since internally we've already using 15 for other exit code, which will be upstreamed later. Author: Fangmin Lyu <[email protected]> Reviewers: Enrico Olivelli <[email protected]>, Michael Han <[email protected]> Closes apache#1130 from lvfangmin/ZOOKEEPER-3598
1 parent 5536393 commit 79f99af

3 files changed

Lines changed: 97 additions & 3 deletions

File tree

zookeeper-server/src/main/java/org/apache/zookeeper/server/ExitCode.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ public enum ExitCode {
4848
QUORUM_PACKET_ERROR(13),
4949

5050
/** Unable to bind to the quorum (election) port after multiple retry */
51-
UNABLE_TO_BIND_QUORUM_PORT(14);
51+
UNABLE_TO_BIND_QUORUM_PORT(14),
52+
53+
/** Failed to shutdown the request processor pipeline gracefully **/
54+
SHUTDOWN_UNGRACEFULLY(16);
5255

5356
private final int value;
5457

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.atomic.AtomicInteger;
3030
import org.apache.zookeeper.ZooDefs.OpCode;
3131
import org.apache.zookeeper.common.Time;
32+
import org.apache.zookeeper.server.ExitCode;
3233
import org.apache.zookeeper.server.Request;
3334
import org.apache.zookeeper.server.RequestProcessor;
3435
import org.apache.zookeeper.server.ServerMetrics;
@@ -621,6 +622,20 @@ public void shutdown() {
621622
workerPool.join(workerShutdownTimeoutMS);
622623
}
623624

625+
try {
626+
this.join(workerShutdownTimeoutMS);
627+
} catch (InterruptedException e) {
628+
LOG.warn("Interrupted while waiting for CommitProcessor to finish");
629+
Thread.currentThread().interrupt();
630+
}
631+
632+
if (this.isAlive()) {
633+
LOG.warn("CommitProcessor does not shutdown gracefully after "
634+
+ "waiting for {} ms, exit to avoid potential "
635+
+ "inconsistency issue", workerShutdownTimeoutMS);
636+
System.exit(ExitCode.SHUTDOWN_UNGRACEFULLY.getValue());
637+
}
638+
624639
if (nextProcessor != null) {
625640
nextProcessor.shutdown();
626641
}

zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
import java.util.ArrayList;
2929
import java.util.Random;
3030
import java.util.concurrent.ConcurrentHashMap;
31+
import java.util.concurrent.CountDownLatch;
3132
import java.util.concurrent.LinkedBlockingQueue;
33+
import java.util.concurrent.TimeUnit;
3234
import java.util.concurrent.atomic.AtomicInteger;
3335
import java.util.concurrent.atomic.AtomicLong;
3436
import org.apache.jute.BinaryOutputArchive;
@@ -82,13 +84,18 @@ public class CommitProcessorTest extends ZKTestCase {
8284
File tmpDir;
8385
ArrayList<TestClientThread> testClients = new ArrayList<TestClientThread>();
8486
CommitProcessor commitProcessor;
87+
DelayRequestProcessor delayProcessor;
8588

8689
public void setUp(int numCommitThreads, int numClientThreads, int writePercent) throws Exception {
90+
setUp(numCommitThreads, numClientThreads, writePercent, false);
91+
}
92+
93+
public void setUp(int numCommitThreads, int numClientThreads, int writePercent, boolean withDelayProcessor) throws Exception {
8794
stopped = false;
8895
System.setProperty(CommitProcessor.ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS, Integer.toString(numCommitThreads));
8996
tmpDir = ClientBase.createTmpDir();
9097
ClientBase.setupTestEnv();
91-
zks = new TestZooKeeperServer(tmpDir, tmpDir, 4000);
98+
zks = new TestZooKeeperServer(tmpDir, tmpDir, 4000, withDelayProcessor);
9299
zks.startup();
93100
for (int i = 0; i < numClientThreads; ++i) {
94101
TestClientThread client = new TestClientThread(writePercent);
@@ -211,6 +218,23 @@ public void testNoCommitWorkersReadOnlyWorkload() throws Exception {
211218
assertTrue("Write requests processed", processedWriteRequests.get() == numClients);
212219
}
213220

221+
@Test
222+
public void testWaitingForWriteToFinishBeforeShutdown() throws Exception {
223+
setUp(1, 0, 0, true);
224+
225+
// send a single write request
226+
TestClientThread client = new TestClientThread(0);
227+
client.sendWriteRequest();
228+
229+
// wait for request being committed
230+
delayProcessor.waitRequestProcessing();
231+
232+
zks.shutdown();
233+
234+
// Make sure we've finished the in-flight request before shutdown returns
235+
assertFalse(commitProcessor.isAlive());
236+
}
237+
214238
@Test
215239
public void testNoCommitWorkersMixedWorkload() throws Exception {
216240
int numClients = 10;
@@ -287,8 +311,15 @@ private synchronized void failTest(String reason) {
287311

288312
private class TestZooKeeperServer extends ZooKeeperServer {
289313

314+
final boolean withDelayProcessor;
315+
290316
public TestZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
317+
this(snapDir, logDir, tickTime, false);
318+
}
319+
320+
public TestZooKeeperServer(File snapDir, File logDir, int tickTime, boolean withDelayProcessor) throws IOException {
291321
super(snapDir, logDir, tickTime);
322+
this.withDelayProcessor = withDelayProcessor;
292323
}
293324

294325
public PrepRequestProcessor getFirstProcessor() {
@@ -303,7 +334,12 @@ protected void setupRequestProcessors() {
303334
// ValidateProcessor is set up in a similar fashion to ToBeApplied
304335
// processor, so it can do pre/post validating of requests
305336
ValidateProcessor validateProcessor = new ValidateProcessor(finalProcessor);
306-
commitProcessor = new CommitProcessor(validateProcessor, "1", true, null);
337+
if (withDelayProcessor) {
338+
delayProcessor = new DelayRequestProcessor(validateProcessor);
339+
commitProcessor = new CommitProcessor(delayProcessor, "1", true, null);
340+
} else {
341+
commitProcessor = new CommitProcessor(validateProcessor, "1", true, null);
342+
}
307343
validateProcessor.setCommitProcessor(commitProcessor);
308344
commitProcessor.start();
309345
MockProposalRequestProcessor proposalProcessor = new MockProposalRequestProcessor(commitProcessor);
@@ -314,6 +350,46 @@ protected void setupRequestProcessors() {
314350

315351
}
316352

353+
private class DelayRequestProcessor implements RequestProcessor {
354+
// delay 1s for each request
355+
static final int DEFAULT_DELAY = 1000;
356+
RequestProcessor nextProcessor;
357+
CountDownLatch waitingProcessRequestBeingCalled;
358+
359+
public DelayRequestProcessor(RequestProcessor nextProcessor) {
360+
this.nextProcessor = nextProcessor;
361+
this.waitingProcessRequestBeingCalled = new CountDownLatch(1);
362+
}
363+
364+
@Override
365+
public void processRequest(Request request) throws RequestProcessorException {
366+
try {
367+
this.waitingProcessRequestBeingCalled.countDown();
368+
LOG.info("Sleeping {} ms for request {}", DEFAULT_DELAY, request);
369+
Thread.sleep(DEFAULT_DELAY);
370+
} catch (InterruptedException e) { /* ignore */ }
371+
nextProcessor.processRequest(request);
372+
}
373+
374+
public void waitRequestProcessing() {
375+
try {
376+
if (!waitingProcessRequestBeingCalled.await(3000, TimeUnit.MILLISECONDS)) {
377+
LOG.info("Did not see request processing in 3s");
378+
}
379+
} catch (InterruptedException e) {
380+
LOG.info("Interrupted when waiting for processRequest being called");
381+
}
382+
}
383+
384+
@Override
385+
public void shutdown() {
386+
LOG.info("shutdown DelayRequestProcessor");
387+
if (nextProcessor != null) {
388+
nextProcessor.shutdown();
389+
}
390+
}
391+
}
392+
317393
private class MockProposalRequestProcessor extends Thread implements RequestProcessor {
318394

319395
private final CommitProcessor commitProcessor;

0 commit comments

Comments
 (0)