2828import java .util .ArrayList ;
2929import java .util .Random ;
3030import java .util .concurrent .ConcurrentHashMap ;
31+ import java .util .concurrent .CountDownLatch ;
3132import java .util .concurrent .LinkedBlockingQueue ;
33+ import java .util .concurrent .TimeUnit ;
3234import java .util .concurrent .atomic .AtomicInteger ;
3335import java .util .concurrent .atomic .AtomicLong ;
3436import org .apache .jute .BinaryOutputArchive ;
@@ -82,13 +84,18 @@ public class CommitProcessorTest extends ZKTestCase {
8284 File tmpDir ;
8385 ArrayList <TestClientThread > testClients = new ArrayList <TestClientThread >();
8486 CommitProcessor commitProcessor ;
87+ DelayRequestProcessor delayProcessor ;
8588
8689 public void setUp (int numCommitThreads , int numClientThreads , int writePercent ) throws Exception {
90+ setUp (numCommitThreads , numClientThreads , writePercent , false );
91+ }
92+
93+ public void setUp (int numCommitThreads , int numClientThreads , int writePercent , boolean withDelayProcessor ) throws Exception {
8794 stopped = false ;
8895 System .setProperty (CommitProcessor .ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS , Integer .toString (numCommitThreads ));
8996 tmpDir = ClientBase .createTmpDir ();
9097 ClientBase .setupTestEnv ();
91- zks = new TestZooKeeperServer (tmpDir , tmpDir , 4000 );
98+ zks = new TestZooKeeperServer (tmpDir , tmpDir , 4000 , withDelayProcessor );
9299 zks .startup ();
93100 for (int i = 0 ; i < numClientThreads ; ++i ) {
94101 TestClientThread client = new TestClientThread (writePercent );
@@ -211,6 +218,23 @@ public void testNoCommitWorkersReadOnlyWorkload() throws Exception {
211218 assertTrue ("Write requests processed" , processedWriteRequests .get () == numClients );
212219 }
213220
221+ @ Test
222+ public void testWaitingForWriteToFinishBeforeShutdown () throws Exception {
223+ setUp (1 , 0 , 0 , true );
224+
225+ // send a single write request
226+ TestClientThread client = new TestClientThread (0 );
227+ client .sendWriteRequest ();
228+
229+ // wait for request being committed
230+ delayProcessor .waitRequestProcessing ();
231+
232+ zks .shutdown ();
233+
234+ // Make sure we've finished the in-flight request before shutdown returns
235+ assertFalse (commitProcessor .isAlive ());
236+ }
237+
214238 @ Test
215239 public void testNoCommitWorkersMixedWorkload () throws Exception {
216240 int numClients = 10 ;
@@ -287,8 +311,15 @@ private synchronized void failTest(String reason) {
287311
288312 private class TestZooKeeperServer extends ZooKeeperServer {
289313
314+ final boolean withDelayProcessor ;
315+
290316 public TestZooKeeperServer (File snapDir , File logDir , int tickTime ) throws IOException {
317+ this (snapDir , logDir , tickTime , false );
318+ }
319+
320+ public TestZooKeeperServer (File snapDir , File logDir , int tickTime , boolean withDelayProcessor ) throws IOException {
291321 super (snapDir , logDir , tickTime );
322+ this .withDelayProcessor = withDelayProcessor ;
292323 }
293324
294325 public PrepRequestProcessor getFirstProcessor () {
@@ -303,7 +334,12 @@ protected void setupRequestProcessors() {
303334 // ValidateProcessor is set up in a similar fashion to ToBeApplied
304335 // processor, so it can do pre/post validating of requests
305336 ValidateProcessor validateProcessor = new ValidateProcessor (finalProcessor );
306- commitProcessor = new CommitProcessor (validateProcessor , "1" , true , null );
337+ if (withDelayProcessor ) {
338+ delayProcessor = new DelayRequestProcessor (validateProcessor );
339+ commitProcessor = new CommitProcessor (delayProcessor , "1" , true , null );
340+ } else {
341+ commitProcessor = new CommitProcessor (validateProcessor , "1" , true , null );
342+ }
307343 validateProcessor .setCommitProcessor (commitProcessor );
308344 commitProcessor .start ();
309345 MockProposalRequestProcessor proposalProcessor = new MockProposalRequestProcessor (commitProcessor );
@@ -314,6 +350,46 @@ protected void setupRequestProcessors() {
314350
315351 }
316352
353+ private class DelayRequestProcessor implements RequestProcessor {
354+ // delay 1s for each request
355+ static final int DEFAULT_DELAY = 1000 ;
356+ RequestProcessor nextProcessor ;
357+ CountDownLatch waitingProcessRequestBeingCalled ;
358+
359+ public DelayRequestProcessor (RequestProcessor nextProcessor ) {
360+ this .nextProcessor = nextProcessor ;
361+ this .waitingProcessRequestBeingCalled = new CountDownLatch (1 );
362+ }
363+
364+ @ Override
365+ public void processRequest (Request request ) throws RequestProcessorException {
366+ try {
367+ this .waitingProcessRequestBeingCalled .countDown ();
368+ LOG .info ("Sleeping {} ms for request {}" , DEFAULT_DELAY , request );
369+ Thread .sleep (DEFAULT_DELAY );
370+ } catch (InterruptedException e ) { /* ignore */ }
371+ nextProcessor .processRequest (request );
372+ }
373+
374+ public void waitRequestProcessing () {
375+ try {
376+ if (!waitingProcessRequestBeingCalled .await (3000 , TimeUnit .MILLISECONDS )) {
377+ LOG .info ("Did not see request processing in 3s" );
378+ }
379+ } catch (InterruptedException e ) {
380+ LOG .info ("Interrupted when waiting for processRequest being called" );
381+ }
382+ }
383+
384+ @ Override
385+ public void shutdown () {
386+ LOG .info ("shutdown DelayRequestProcessor" );
387+ if (nextProcessor != null ) {
388+ nextProcessor .shutdown ();
389+ }
390+ }
391+ }
392+
317393 private class MockProposalRequestProcessor extends Thread implements RequestProcessor {
318394
319395 private final CommitProcessor commitProcessor ;
0 commit comments