1616
1717import argparse
1818from collections import deque
19+ from concurrent .futures import ThreadPoolExecutor , as_completed
1920import configparser
2021import datetime
22+ import math
2123import os
2224import time
2325import shutil
24- import signal
2526import sys
2627import subprocess
2728import tempfile
@@ -333,14 +334,16 @@ def run_tests(test_list, src_dir, build_dir, tmpdir, jobs=1, enable_coverage=Fal
333334 raise
334335
335336 #Run Tests
336- job_queue = TestHandler (jobs , tests_dir , tmpdir , test_list , flags )
337+ job_queue = TestHandler (jobs , tests_dir , tmpdir , test_list , flags ). jobs
337338 start_time = time .time ()
338339 test_results = []
339340
340341 max_len_name = len (max (test_list , key = len ))
341342
342- for _ in range (len (test_list )):
343- test_result , testdir , stdout , stderr = job_queue .get_next ()
343+ for future in as_completed (job_queue ):
344+ if future .cancelled ():
345+ continue
346+ test_result , testdir , stdout , stderr = future .result ()
344347 test_results .append (test_result )
345348
346349 if test_result .status == "Passed" :
@@ -362,7 +365,9 @@ def run_tests(test_list, src_dir, build_dir, tmpdir, jobs=1, enable_coverage=Fal
362365
363366 if failfast :
364367 logging .debug ("Early exiting after test failure" )
365- break
368+ for f in job_queue :
369+ f .cancel ()
370+ logging .debug ("Waiting for unfinished jobs ..." )
366371
367372 print_results (test_results , max_len_name , (int (time .time () - start_time )))
368373
@@ -378,10 +383,6 @@ def run_tests(test_list, src_dir, build_dir, tmpdir, jobs=1, enable_coverage=Fal
378383
379384 all_passed = all (map (lambda test_result : test_result .was_successful , test_results ))
380385
381- # This will be a no-op unless failfast is True in which case there may be dangling
382- # processes which need to be killed.
383- job_queue .kill_and_join ()
384-
385386 sys .exit (not all_passed )
386387
387388def print_results (test_results , max_len_name , runtime ):
@@ -411,77 +412,42 @@ class TestHandler:
411412 Trigger the test scripts passed in via the list.
412413 """
413414
415+ @staticmethod
416+ def _run_test (name , proc_args , testdir , log_out , log_err ):
417+ start_time = time .time ()
418+ ret_code = subprocess .call (proc_args , universal_newlines = True , stdout = log_out , stderr = log_err , timeout = None if not os .getenv ('TRAVIS' ) == 'true' else TRAVIS_TIMEOUT_DURATION )
419+ duration = int (math .ceil (time .time () - start_time ))
420+ log_out .seek (0 ), log_err .seek (0 )
421+ [stdout , stderr ] = [log_file .read ().decode ('utf-8' ) for log_file in (log_out , log_err )]
422+ log_out .close (), log_err .close ()
423+ if ret_code == TEST_EXIT_PASSED and stderr == "" :
424+ status = "Passed"
425+ elif ret_code == TEST_EXIT_SKIPPED :
426+ status = "Skipped"
427+ else :
428+ status = "Failed"
429+
430+ return TestResult (name , status , duration ), testdir , stdout , stderr
431+
414432 def __init__ (self , num_tests_parallel , tests_dir , tmpdir , test_list = None , flags = None ):
415- assert (num_tests_parallel >= 1 )
416- self .num_jobs = num_tests_parallel
417- self .tests_dir = tests_dir
418- self .tmpdir = tmpdir
419- self .test_list = test_list
420- self .flags = flags
421- self .num_running = 0
422- # In case there is a graveyard of zombie bitcoinds, we can apply a
423- # pseudorandom offset to hopefully jump over them.
424- # (625 is PORT_RANGE/MAX_NODES)
425- self .portseed_offset = int (time .time () * 1000 ) % 625
433+ self .executor = ThreadPoolExecutor (max_workers = num_tests_parallel )
426434 self .jobs = []
427435
428- def get_next (self ):
429- while self .num_running < self .num_jobs and self .test_list :
430- # Add tests
431- self .num_running += 1
432- test = self .test_list .pop (0 )
433- portseed = len (self .test_list ) + self .portseed_offset
436+ # Add tests
437+ for i , test in enumerate (test_list ):
438+ portseed = i
434439 portseed_arg = ["--portseed={}" .format (portseed )]
435440 log_stdout = tempfile .SpooledTemporaryFile (max_size = 2 ** 16 )
436441 log_stderr = tempfile .SpooledTemporaryFile (max_size = 2 ** 16 )
437442 test_argv = test .split ()
438- testdir = "{}/{}_{}" .format (self . tmpdir , re .sub (".py$" , "" , test_argv [0 ]), portseed )
443+ testdir = "{}/{}_{}" .format (tmpdir , re .sub (".py$" , "" , test_argv [0 ]), portseed )
439444 tmpdir_arg = ["--tmpdir={}" .format (testdir )]
440- self .jobs .append ((test ,
441- time .time (),
442- subprocess .Popen ([sys .executable , self .tests_dir + test_argv [0 ]] + test_argv [1 :] + self .flags + portseed_arg + tmpdir_arg ,
443- universal_newlines = True ,
444- stdout = log_stdout ,
445- stderr = log_stderr ),
445+ self .jobs .append (self .executor .submit (TestHandler ._run_test ,
446+ test ,
447+ [sys .executable , tests_dir + test_argv [0 ]] + test_argv [1 :] + flags + portseed_arg + tmpdir_arg ,
446448 testdir ,
447449 log_stdout ,
448450 log_stderr ))
449- if not self .jobs :
450- raise IndexError ('pop from empty list' )
451- while True :
452- # Return first proc that finishes
453- time .sleep (.5 )
454- for job in self .jobs :
455- (name , start_time , proc , testdir , log_out , log_err ) = job
456- if os .getenv ('TRAVIS' ) == 'true' and int (time .time () - start_time ) > TRAVIS_TIMEOUT_DURATION :
457- # In travis, timeout individual tests (to stop tests hanging and not providing useful output).
458- proc .send_signal (signal .SIGINT )
459- if proc .poll () is not None :
460- log_out .seek (0 ), log_err .seek (0 )
461- [stdout , stderr ] = [log_file .read ().decode ('utf-8' ) for log_file in (log_out , log_err )]
462- log_out .close (), log_err .close ()
463- if proc .returncode == TEST_EXIT_PASSED and stderr == "" :
464- status = "Passed"
465- elif proc .returncode == TEST_EXIT_SKIPPED :
466- status = "Skipped"
467- else :
468- status = "Failed"
469- self .num_running -= 1
470- self .jobs .remove (job )
471-
472- return TestResult (name , status , int (time .time () - start_time )), testdir , stdout , stderr
473- print ('.' , end = '' , flush = True )
474-
475- def kill_and_join (self ):
476- """Send SIGKILL to all jobs and block until all have ended."""
477- procs = [i [2 ] for i in self .jobs ]
478-
479- for proc in procs :
480- proc .kill ()
481-
482- for proc in procs :
483- proc .wait ()
484-
485451
486452class TestResult ():
487453 def __init__ (self , name , status , time ):
0 commit comments