Skip to content

Commit fa35a02

Browse files
author
MarcoFalke
committed
qa: Remove polling loop from test_runner
1 parent 472fe8a commit fa35a02

File tree

1 file changed

+35
-69
lines changed

1 file changed

+35
-69
lines changed

test/functional/test_runner.py

Lines changed: 35 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,13 @@
1616

1717
import argparse
1818
from collections import deque
19+
from concurrent.futures import ThreadPoolExecutor, as_completed
1920
import configparser
2021
import datetime
22+
import math
2123
import os
2224
import time
2325
import shutil
24-
import signal
2526
import sys
2627
import subprocess
2728
import tempfile
@@ -333,14 +334,16 @@ def run_tests(test_list, src_dir, build_dir, tmpdir, jobs=1, enable_coverage=Fal
333334
raise
334335

335336
#Run Tests
336-
job_queue = TestHandler(jobs, tests_dir, tmpdir, test_list, flags)
337+
job_queue = TestHandler(jobs, tests_dir, tmpdir, test_list, flags).jobs
337338
start_time = time.time()
338339
test_results = []
339340

340341
max_len_name = len(max(test_list, key=len))
341342

342-
for _ in range(len(test_list)):
343-
test_result, testdir, stdout, stderr = job_queue.get_next()
343+
for future in as_completed(job_queue):
344+
if future.cancelled():
345+
continue
346+
test_result, testdir, stdout, stderr = future.result()
344347
test_results.append(test_result)
345348

346349
if test_result.status == "Passed":
@@ -362,7 +365,9 @@ def run_tests(test_list, src_dir, build_dir, tmpdir, jobs=1, enable_coverage=Fal
362365

363366
if failfast:
364367
logging.debug("Early exiting after test failure")
365-
break
368+
for f in job_queue:
369+
f.cancel()
370+
logging.debug("Waiting for unfinished jobs ...")
366371

367372
print_results(test_results, max_len_name, (int(time.time() - start_time)))
368373

@@ -378,10 +383,6 @@ def run_tests(test_list, src_dir, build_dir, tmpdir, jobs=1, enable_coverage=Fal
378383

379384
all_passed = all(map(lambda test_result: test_result.was_successful, test_results))
380385

381-
# This will be a no-op unless failfast is True in which case there may be dangling
382-
# processes which need to be killed.
383-
job_queue.kill_and_join()
384-
385386
sys.exit(not all_passed)
386387

387388
def print_results(test_results, max_len_name, runtime):
@@ -411,77 +412,42 @@ class TestHandler:
411412
Trigger the test scripts passed in via the list.
412413
"""
413414

415+
@staticmethod
416+
def _run_test(name, proc_args, testdir, log_out, log_err):
417+
start_time = time.time()
418+
ret_code = subprocess.call(proc_args, universal_newlines=True, stdout=log_out, stderr=log_err, timeout=None if not os.getenv('TRAVIS') == 'true' else TRAVIS_TIMEOUT_DURATION)
419+
duration = int(math.ceil(time.time() - start_time))
420+
log_out.seek(0), log_err.seek(0)
421+
[stdout, stderr] = [log_file.read().decode('utf-8') for log_file in (log_out, log_err)]
422+
log_out.close(), log_err.close()
423+
if ret_code == TEST_EXIT_PASSED and stderr == "":
424+
status = "Passed"
425+
elif ret_code == TEST_EXIT_SKIPPED:
426+
status = "Skipped"
427+
else:
428+
status = "Failed"
429+
430+
return TestResult(name, status, duration), testdir, stdout, stderr
431+
414432
def __init__(self, num_tests_parallel, tests_dir, tmpdir, test_list=None, flags=None):
415-
assert(num_tests_parallel >= 1)
416-
self.num_jobs = num_tests_parallel
417-
self.tests_dir = tests_dir
418-
self.tmpdir = tmpdir
419-
self.test_list = test_list
420-
self.flags = flags
421-
self.num_running = 0
422-
# In case there is a graveyard of zombie bitcoinds, we can apply a
423-
# pseudorandom offset to hopefully jump over them.
424-
# (625 is PORT_RANGE/MAX_NODES)
425-
self.portseed_offset = int(time.time() * 1000) % 625
433+
self.executor = ThreadPoolExecutor(max_workers=num_tests_parallel)
426434
self.jobs = []
427435

428-
def get_next(self):
429-
while self.num_running < self.num_jobs and self.test_list:
430-
# Add tests
431-
self.num_running += 1
432-
test = self.test_list.pop(0)
433-
portseed = len(self.test_list) + self.portseed_offset
436+
# Add tests
437+
for i, test in enumerate(test_list):
438+
portseed = i
434439
portseed_arg = ["--portseed={}".format(portseed)]
435440
log_stdout = tempfile.SpooledTemporaryFile(max_size=2**16)
436441
log_stderr = tempfile.SpooledTemporaryFile(max_size=2**16)
437442
test_argv = test.split()
438-
testdir = "{}/{}_{}".format(self.tmpdir, re.sub(".py$", "", test_argv[0]), portseed)
443+
testdir = "{}/{}_{}".format(tmpdir, re.sub(".py$", "", test_argv[0]), portseed)
439444
tmpdir_arg = ["--tmpdir={}".format(testdir)]
440-
self.jobs.append((test,
441-
time.time(),
442-
subprocess.Popen([sys.executable, self.tests_dir + test_argv[0]] + test_argv[1:] + self.flags + portseed_arg + tmpdir_arg,
443-
universal_newlines=True,
444-
stdout=log_stdout,
445-
stderr=log_stderr),
445+
self.jobs.append(self.executor.submit(TestHandler._run_test,
446+
test,
447+
[sys.executable, tests_dir + test_argv[0]] + test_argv[1:] + flags + portseed_arg + tmpdir_arg,
446448
testdir,
447449
log_stdout,
448450
log_stderr))
449-
if not self.jobs:
450-
raise IndexError('pop from empty list')
451-
while True:
452-
# Return first proc that finishes
453-
time.sleep(.5)
454-
for job in self.jobs:
455-
(name, start_time, proc, testdir, log_out, log_err) = job
456-
if os.getenv('TRAVIS') == 'true' and int(time.time() - start_time) > TRAVIS_TIMEOUT_DURATION:
457-
# In travis, timeout individual tests (to stop tests hanging and not providing useful output).
458-
proc.send_signal(signal.SIGINT)
459-
if proc.poll() is not None:
460-
log_out.seek(0), log_err.seek(0)
461-
[stdout, stderr] = [log_file.read().decode('utf-8') for log_file in (log_out, log_err)]
462-
log_out.close(), log_err.close()
463-
if proc.returncode == TEST_EXIT_PASSED and stderr == "":
464-
status = "Passed"
465-
elif proc.returncode == TEST_EXIT_SKIPPED:
466-
status = "Skipped"
467-
else:
468-
status = "Failed"
469-
self.num_running -= 1
470-
self.jobs.remove(job)
471-
472-
return TestResult(name, status, int(time.time() - start_time)), testdir, stdout, stderr
473-
print('.', end='', flush=True)
474-
475-
def kill_and_join(self):
476-
"""Send SIGKILL to all jobs and block until all have ended."""
477-
procs = [i[2] for i in self.jobs]
478-
479-
for proc in procs:
480-
proc.kill()
481-
482-
for proc in procs:
483-
proc.wait()
484-
485451

486452
class TestResult():
487453
def __init__(self, name, status, time):

0 commit comments

Comments
 (0)