Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tools/run_tests/dockerjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class DockerJob:

def __init__(self, spec):
self._spec = spec
self._job = jobset.Job(spec, bin_hash=None, newline_on_success=True, travis=True, add_env={})
self._job = jobset.Job(spec, newline_on_success=True, travis=True, add_env={})
self._container_name = spec.container_name

def mapped_port(self, port):
Expand All @@ -118,4 +118,4 @@ def kill(self, suppress_failure=False):

def is_running(self):
"""Polls a job and returns True if given job is still running."""
return self._job.state(jobset.NoCache()) == jobset._RUNNING
return self._job.state() == jobset._RUNNING
66 changes: 14 additions & 52 deletions tools/run_tests/jobset.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

"""Run a group of subprocesses and then finish."""

import hashlib
import multiprocessing
import os
import platform
Expand Down Expand Up @@ -149,27 +148,22 @@ def which(filename):
class JobSpec(object):
"""Specifies what to run for a job."""

def __init__(self, cmdline, shortname=None, environ=None, hash_targets=None,
def __init__(self, cmdline, shortname=None, environ=None,
cwd=None, shell=False, timeout_seconds=5*60, flake_retries=0,
timeout_retries=0, kill_handler=None, cpu_cost=1.0,
verbose_success=False):
"""
Arguments:
cmdline: a list of arguments to pass as the command line
environ: a dictionary of environment variables to set in the child process
hash_targets: which files to include in the hash representing the jobs version
(or empty, indicating the job should not be hashed)
kill_handler: a handler that will be called whenever job.kill() is invoked
cpu_cost: number of cores per second this job needs
"""
if environ is None:
environ = {}
if hash_targets is None:
hash_targets = []
self.cmdline = cmdline
self.environ = environ
self.shortname = cmdline[0] if shortname is None else shortname
self.hash_targets = hash_targets or []
self.cwd = cwd
self.shell = shell
self.timeout_seconds = timeout_seconds
Expand All @@ -180,7 +174,7 @@ def __init__(self, cmdline, shortname=None, environ=None, hash_targets=None,
self.verbose_success = verbose_success

def identity(self):
return '%r %r %r' % (self.cmdline, self.environ, self.hash_targets)
return '%r %r' % (self.cmdline, self.environ)

def __hash__(self):
return hash(self.identity())
Expand All @@ -205,9 +199,8 @@ def __init__(self):
class Job(object):
"""Manages one job."""

def __init__(self, spec, bin_hash, newline_on_success, travis, add_env):
def __init__(self, spec, newline_on_success, travis, add_env):
self._spec = spec
self._bin_hash = bin_hash
self._newline_on_success = newline_on_success
self._travis = travis
self._add_env = add_env.copy()
Expand Down Expand Up @@ -249,7 +242,7 @@ def start(self):
self._process = try_start()
self._state = _RUNNING

def state(self, update_cache):
def state(self):
"""Poll current state of the job. Prints messages at completion."""
def stdout(self=self):
self._tempfile.seek(0)
Expand Down Expand Up @@ -293,8 +286,6 @@ def stdout(self=self):
stdout() if self._spec.verbose_success else None,
do_newline=self._newline_on_success or self._travis)
self.result.state = 'PASSED'
if self._bin_hash:
update_cache.finished(self._spec.identity(), self._bin_hash)
elif (self._state == _RUNNING and
self._spec.timeout_seconds is not None and
time.time() - self._start > self._spec.timeout_seconds):
Expand Down Expand Up @@ -329,7 +320,7 @@ class Jobset(object):
"""Manages one run of jobs."""

def __init__(self, check_cancelled, maxjobs, newline_on_success, travis,
stop_on_failure, add_env, cache):
stop_on_failure, add_env):
self._running = set()
self._check_cancelled = check_cancelled
self._cancelled = False
Expand All @@ -338,9 +329,7 @@ def __init__(self, check_cancelled, maxjobs, newline_on_success, travis,
self._maxjobs = maxjobs
self._newline_on_success = newline_on_success
self._travis = travis
self._cache = cache
self._stop_on_failure = stop_on_failure
self._hashes = {}
self._add_env = add_env
self.resultset = {}
self._remaining = None
Expand All @@ -367,37 +356,21 @@ def start(self, spec):
if current_cpu_cost + spec.cpu_cost <= self._maxjobs: break
self.reap()
if self.cancelled(): return False
if spec.hash_targets:
if spec.identity() in self._hashes:
bin_hash = self._hashes[spec.identity()]
else:
bin_hash = hashlib.sha1()
for fn in spec.hash_targets:
with open(which(fn)) as f:
bin_hash.update(f.read())
bin_hash = bin_hash.hexdigest()
self._hashes[spec.identity()] = bin_hash
should_run = self._cache.should_run(spec.identity(), bin_hash)
else:
bin_hash = None
should_run = True
if should_run:
job = Job(spec,
bin_hash,
self._newline_on_success,
self._travis,
self._add_env)
self._running.add(job)
if not self.resultset.has_key(job.GetSpec().shortname):
self.resultset[job.GetSpec().shortname] = []
job = Job(spec,
self._newline_on_success,
self._travis,
self._add_env)
self._running.add(job)
if not self.resultset.has_key(job.GetSpec().shortname):
self.resultset[job.GetSpec().shortname] = []
return True

def reap(self):
"""Collect the dead jobs."""
while self._running:
dead = set()
for job in self._running:
st = job.state(self._cache)
st = job.state()
if st == _RUNNING: continue
if st == _FAILURE or st == _KILLED:
self._failures += 1
Expand Down Expand Up @@ -450,15 +423,6 @@ def _never_cancelled():
return False


# cache class that caches nothing
class NoCache(object):
def should_run(self, cmdline, bin_hash):
return True

def finished(self, cmdline, bin_hash):
pass


def tag_remaining(xs):
staging = []
for x in xs:
Expand All @@ -477,12 +441,10 @@ def run(cmdlines,
travis=False,
infinite_runs=False,
stop_on_failure=False,
cache=None,
add_env={}):
js = Jobset(check_cancelled,
maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS,
newline_on_success, travis, stop_on_failure, add_env,
cache if cache is not None else NoCache())
newline_on_success, travis, stop_on_failure, add_env)
for cmdline, remaining in tag_remaining(cmdlines):
if not js.start(cmdline):
break
Expand Down
4 changes: 2 additions & 2 deletions tools/run_tests/run_performance_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ def __init__(self, spec, language, host_and_port):
self._spec = spec
self.language = language
self.host_and_port = host_and_port
self._job = jobset.Job(spec, bin_hash=None, newline_on_success=True, travis=True, add_env={})
self._job = jobset.Job(spec, newline_on_success=True, travis=True, add_env={})

def is_running(self):
"""Polls a job and returns True if given job is still running."""
return self._job.state(jobset.NoCache()) == jobset._RUNNING
return self._job.state() == jobset._RUNNING

def kill(self):
return self._job.kill()
Expand Down
69 changes: 5 additions & 64 deletions tools/run_tests/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import argparse
import ast
import glob
import hashlib
import itertools
import json
import multiprocessing
Expand Down Expand Up @@ -78,24 +77,18 @@ def __init__(self, config, environ=None, timeout_multiplier=1, tool_prefix=[]):
if environ is None:
environ = {}
self.build_config = config
self.allow_hashing = (config != 'gcov')
self.environ = environ
self.environ['CONFIG'] = config
self.tool_prefix = tool_prefix
self.timeout_multiplier = timeout_multiplier

def job_spec(self, cmdline, hash_targets, timeout_seconds=5*60,
def job_spec(self, cmdline, timeout_seconds=5*60,
shortname=None, environ={}, cpu_cost=1.0, flaky=False):
"""Construct a jobset.JobSpec for a test under this config

Args:
cmdline: a list of strings specifying the command line the test
would like to run
hash_targets: either None (don't do caching of test results), or
a list of strings specifying files to include in a
binary hash to check if a test has changed
-- if used, all artifacts needed to run the test must
be listed
"""
actual_environ = self.environ.copy()
for k, v in environ.iteritems():
Expand All @@ -105,8 +98,6 @@ def job_spec(self, cmdline, hash_targets, timeout_seconds=5*60,
environ=actual_environ,
cpu_cost=cpu_cost,
timeout_seconds=(self.timeout_multiplier * timeout_seconds if timeout_seconds else None),
hash_targets=hash_targets
if self.allow_hashing else None,
flake_retries=5 if flaky or args.allow_flakes else 0,
timeout_retries=3 if args.allow_flakes else 0)

Expand Down Expand Up @@ -399,7 +390,6 @@ def test_specs(self):
if self.config.build_config != 'gcov':
return [self.config.job_spec(
['tools/run_tests/run_python.sh', tox_env],
None,
environ=dict(environment.items() +
[('GRPC_PYTHON_TESTRUNNER_FILTER', suite_name)]),
shortname='%s.test.%s' % (tox_env, suite_name),
Expand All @@ -408,7 +398,6 @@ def test_specs(self):
for tox_env in self._tox_envs]
else:
return [self.config.job_spec(['tools/run_tests/run_python.sh', tox_env],
None,
environ=environment,
shortname='%s.test.coverage' % tox_env,
timeout_seconds=15*60)
Expand All @@ -425,7 +414,7 @@ def make_options(self):
return []

def build_steps(self):
return [['tools/run_tests/build_python.sh', tox_env]
return [['tools/run_tests/build_python.sh', tox_env]
for tox_env in self._tox_envs]

def post_tests_steps(self):
Expand Down Expand Up @@ -460,7 +449,7 @@ def configure(self, config, args):
_check_compiler(self.args.compiler, ['default'])

def test_specs(self):
return [self.config.job_spec(['tools/run_tests/run_ruby.sh'], None,
return [self.config.job_spec(['tools/run_tests/run_ruby.sh'],
timeout_seconds=10*60,
environ=_FORCE_ENVIRON_FOR_WRAPPERS)]

Expand Down Expand Up @@ -670,7 +659,7 @@ def configure(self, config, args):
def test_specs(self):
import yaml
with open('tools/run_tests/sanity/sanity_tests.yaml', 'r') as f:
return [self.config.job_spec(cmd['script'].split(), None,
return [self.config.job_spec(cmd['script'].split(),
timeout_seconds=None, environ={'TEST': 'true'},
cpu_cost=cmd.get('cpu_cost', 1))
for cmd in yaml.load(f)]
Expand Down Expand Up @@ -1058,46 +1047,6 @@ def build_step_environ(cfg):
forever = args.forever


class TestCache(object):
"""Cache for running tests."""

def __init__(self, use_cache_results):
self._last_successful_run = {}
self._use_cache_results = use_cache_results
self._last_save = time.time()

def should_run(self, cmdline, bin_hash):
if cmdline not in self._last_successful_run:
return True
if self._last_successful_run[cmdline] != bin_hash:
return True
if not self._use_cache_results:
return True
return False

def finished(self, cmdline, bin_hash):
self._last_successful_run[cmdline] = bin_hash
if time.time() - self._last_save > 1:
self.save()

def dump(self):
return [{'cmdline': k, 'hash': v}
for k, v in self._last_successful_run.iteritems()]

def parse(self, exdump):
self._last_successful_run = dict((o['cmdline'], o['hash']) for o in exdump)

def save(self):
with open('.run_tests_cache', 'w') as f:
f.write(json.dumps(self.dump()))
self._last_save = time.time()

def maybe_load(self):
if os.path.exists('.run_tests_cache'):
with open('.run_tests_cache') as f:
self.parse(json.loads(f.read()))


def _start_port_server(port_server_port):
# check if a compatible port server is running
# if incompatible (version mismatch) ==> start a new one
Expand Down Expand Up @@ -1217,7 +1166,7 @@ class BuildAndRunError(object):

# returns a list of things that failed (or an empty list on success)
def _build_and_run(
check_cancelled, newline_on_success, cache, xml_report=None, build_only=False):
check_cancelled, newline_on_success, xml_report=None, build_only=False):
"""Do one pass of building & running tests."""
# build latest sequentially
num_failures, resultset = jobset.run(
Expand Down Expand Up @@ -1266,7 +1215,6 @@ def _build_and_run(
all_runs, check_cancelled, newline_on_success=newline_on_success,
travis=args.travis, infinite_runs=infinite_runs, maxjobs=args.jobs,
stop_on_failure=args.stop_on_failure,
cache=cache if not xml_report else None,
add_env={'GRPC_TEST_PORT_SERVER': 'localhost:%d' % port_server_port})
if resultset:
for k, v in sorted(resultset.items()):
Expand Down Expand Up @@ -1295,14 +1243,9 @@ def _build_and_run(
if num_test_failures:
out.append(BuildAndRunError.TEST)

if cache: cache.save()

return out


test_cache = TestCache(runs_per_test == 1)
test_cache.maybe_load()

if forever:
success = True
while True:
Expand All @@ -1312,7 +1255,6 @@ def _build_and_run(
previous_success = success
errors = _build_and_run(check_cancelled=have_files_changed,
newline_on_success=False,
cache=test_cache,
build_only=args.build_only) == 0
if not previous_success and not errors:
jobset.message('SUCCESS',
Expand All @@ -1324,7 +1266,6 @@ def _build_and_run(
else:
errors = _build_and_run(check_cancelled=lambda: False,
newline_on_success=args.newline_on_success,
cache=test_cache,
xml_report=args.xml_report,
build_only=args.build_only)
if not errors:
Expand Down