Skip to content

Commit b992cd1

Browse files
authored
feat: cluster sidecar (snakemake#1397)
1 parent 0593de1 commit b992cd1

File tree

14 files changed

+149
-3
lines changed

14 files changed

+149
-3
lines changed

docs/tutorial/additional_features.rst

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,19 @@ When snakemake is terminated by pressing ``Ctrl-C``, it will cancel all currentl
279279
You can get the same behaviour with ``--cluster`` by adding ``--cluster-cancel`` and passing a command to use for canceling jobs by their jobid (e.g., ``scancel`` for SLURM or ``qdel`` for SGE).
280280
Most job schedulers can be passed multiple jobids and you can use ``--cluster-cancel-nargs`` to limit the number of arguments (default is 1000 which is reasonable for most schedulers).
281281

282+
Using --cluster-sidecar
283+
:::::::::::::::::::::::
284+
285+
In certain situations, it is necessary not to call cluster commands directly, but instead to have a "sidecar" process, e.g., one providing a REST API.
286+
One example is when using SLURM, where regular calls to ``scontrol show job JOBID`` or ``sacct -j JOBID`` put a high load on the controller.
287+
Rather, it is better to use the ``squeue`` command with the ``-i/--iterate`` option.
288+
289+
When using ``--cluster``, you can use ``--cluster-sidecar`` to pass in a command that starts a sidecar server.
290+
The command should print one line to stdout and then block and accept connections.
291+
The line will subsequently be available in the calls to ``--cluster``, ``--cluster-status``, and ``--cluster-cancel`` in the environment variable ``SNAKEMAKE_CLUSTER_SIDECAR_VARS``.
292+
In the case of a REST server, you can use this to return the port that the server is listening on and credentials.
293+
When the Snakemake process terminates, the sidecar process will be terminated as well.
294+
282295
Constraining wildcards
283296
::::::::::::::::::::::
284297

snakemake/__init__.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ def snakemake(
168168
cluster_status=None,
169169
cluster_cancel=None,
170170
cluster_cancel_nargs=None,
171+
cluster_sidecar=None,
171172
export_cwl=None,
172173
show_failed_logs=False,
173174
keep_incomplete=False,
@@ -300,6 +301,7 @@ def snakemake(
300301
cluster_status (str): status command for cluster execution. If None, Snakemake will rely on flag files. Otherwise, it expects the command to return "success", "failure" or "running" when executing with a cluster jobid as a single argument.
301302
cluster_cancel (str): command to cancel multiple job IDs (like SLURM 'scancel') (default None)
302303
cluster_cancel_nargs (int): maximal number of job ids to pass to cluster_cancel (default 1000)
304+
cluster_sidecar (str): command that starts a sidecar process, see cluster documentation (default None)
303305
export_cwl (str): Compile workflow to CWL and save to given file
304306
log_handler (function): redirect snakemake output to this custom log handler, a function that takes a log message dictionary (see below) as its only argument (default None). The log message dictionary for the log handler has to following entries:
305307
keep_incomplete (bool): keep incomplete output files of failed jobs
@@ -697,6 +699,7 @@ def snakemake(
697699
cluster_status=cluster_status,
698700
cluster_cancel=cluster_cancel,
699701
cluster_cancel_nargs=cluster_cancel_nargs,
702+
cluster_sidecar=cluster_sidecar,
700703
max_jobs_per_second=max_jobs_per_second,
701704
max_status_checks_per_second=max_status_checks_per_second,
702705
overwrite_groups=overwrite_groups,
@@ -785,6 +788,7 @@ def snakemake(
785788
cluster_status=cluster_status,
786789
cluster_cancel=cluster_cancel,
787790
cluster_cancel_nargs=cluster_cancel_nargs,
791+
cluster_sidecar=cluster_sidecar,
788792
report=report,
789793
report_stylesheet=report_stylesheet,
790794
export_cwl=export_cwl,
@@ -2152,6 +2156,12 @@ def get_argument_parser(profile=None):
21522156
help="Specify maximal number of job ids to pass to --cluster-cancel "
21532157
"command, defaults to 1000.",
21542158
)
2159+
group_cluster.add_argument(
2160+
"--cluster-sidecar",
2161+
default=None,
2162+
help="Optional command to start a sidecar process during cluster "
2163+
"execution. Only active when --cluster is given as well.",
2164+
)
21552165
group_cluster.add_argument(
21562166
"--drmaa-log-dir",
21572167
metavar="DIR",
@@ -2424,7 +2434,7 @@ def adjust_path(f):
24242434
args.cluster_config = adjust_path(args.cluster_config)
24252435
if args.cluster_sync:
24262436
args.cluster_sync = adjust_path(args.cluster_sync)
2427-
for key in "cluster_status", "cluster_cancel":
2437+
for key in "cluster_status", "cluster_cancel", "cluster_sidecar":
24282438
if getattr(args, key):
24292439
setattr(args, key, adjust_path(getattr(args, key)))
24302440
if args.report_stylesheet:
@@ -2900,6 +2910,7 @@ def open_browser():
29002910
cluster_status=args.cluster_status,
29012911
cluster_cancel=args.cluster_cancel,
29022912
cluster_cancel_nargs=args.cluster_cancel_nargs,
2913+
cluster_sidecar=args.cluster_sidecar,
29032914
export_cwl=args.export_cwl,
29042915
show_failed_logs=args.show_failed_logs,
29052916
keep_incomplete=args.keep_incomplete,

snakemake/executors/__init__.py

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import subprocess
1919
import signal
2020
import tempfile
21+
import threading
2122
from functools import partial
2223
from itertools import chain
2324
from collections import namedtuple
@@ -925,6 +926,7 @@ def __init__(
925926
statuscmd=None,
926927
cancelcmd=None,
927928
cancelnargs=None,
929+
sidecarcmd=None,
928930
cluster_config=None,
929931
jobname="snakejob.{rulename}.{jobid}.sh",
930932
printreason=False,
@@ -947,6 +949,7 @@ def __init__(
947949

948950
self.statuscmd = statuscmd
949951
self.cancelcmd = cancelcmd
952+
self.sidecarcmd = sidecarcmd
950953
self.cancelnargs = cancelnargs
951954
self.external_jobid = dict()
952955
# We need to collect all external ids so we can properly cancel even if
@@ -970,6 +973,10 @@ def __init__(
970973
keepmetadata=keepmetadata,
971974
)
972975

976+
self.sidecar_vars = None
977+
if self.sidecarcmd:
978+
self._launch_sidecar()
979+
973980
if statuscmd:
974981
self.exec_job += " && exit 0 || exit 1"
975982
elif assume_shared_fs:
@@ -982,13 +989,53 @@ def __init__(
982989
"specify a cluster status command."
983990
)
984991

992+
def _launch_sidecar(self):
993+
def copy_stdout(executor, process):
994+
"""Run sidecar process and copy its stdout to our stdout."""
995+
while process.poll() is None and executor.wait:
996+
buf = process.stdout.readline()
997+
if buf:
998+
self.stdout.write(buf)
999+
# one final time ...
1000+
buf = process.stdout.readline()
1001+
if buf:
1002+
self.stdout.write(buf)
1003+
1004+
def wait(executor, process):
1005+
while executor.wait:
1006+
time.sleep(0.5)
1007+
process.terminate()
1008+
process.wait()
1009+
logger.info(
1010+
"Cluster sidecar process has terminated (retcode=%d)."
1011+
% process.returncode
1012+
)
1013+
1014+
logger.info("Launch sidecar process and read first output line.")
1015+
process = subprocess.Popen(
1016+
self.sidecarcmd, stdout=subprocess.PIPE, shell=False, encoding="utf-8"
1017+
)
1018+
self.sidecar_vars = process.stdout.readline()
1019+
while self.sidecar_vars and self.sidecar_vars[-1] in "\n\r":
1020+
self.sidecar_vars = self.sidecar_vars[:-1]
1021+
logger.info("Done reading first output line.")
1022+
1023+
thread_stdout = threading.Thread(
1024+
target=copy_stdout, name="sidecar_stdout", args=(self, process)
1025+
)
1026+
thread_stdout.start()
1027+
thread_wait = threading.Thread(
1028+
target=wait, name="sidecar_stdout", args=(self, process)
1029+
)
1030+
thread_wait.start()
1031+
9851032
def cancel(self):
9861033
def _chunks(lst, n):
9871034
"""Yield successive n-sized chunks from lst."""
9881035
for i in range(0, len(lst), n):
9891036
yield lst[i : i + n]
9901037

991-
if self.cancelcmd: # We have --cluster-[m]cancel
1038+
if self.cancelcmd: # We have --cluster-cancel
9921039
# Enumerate job IDs and create chunks. If cancelnargs evaluates to false (0/None)
9931040
# then pass all job ids at once
9941041
jobids = list(self.all_ext_jobids)
@@ -998,8 +1045,14 @@ def _chunks(lst, n):
9981045
for chunk in chunks:
9991046
try:
10001047
cancel_timeout = 2 # rather fail on timeout than miss canceling all
1048+
env = dict(os.environ)
1049+
if self.sidecar_vars:
1050+
env["SNAKEMAKE_CLUSTER_SIDECAR_VARS"] = self.sidecar_vars
10011051
subprocess.check_call(
1002-
[self.cancelcmd] + chunk, shell=False, timeout=cancel_timeout
1052+
[self.cancelcmd] + chunk,
1053+
shell=False,
1054+
timeout=cancel_timeout,
1055+
env=env,
10031056
)
10041057
except subprocess.SubprocessError:
10051058
failures += 1
@@ -1070,12 +1123,16 @@ def run(self, job, callback=None, submit_callback=None, error_callback=None):
10701123
raise WorkflowError(str(e), rule=job.rule if not job.is_group() else None)
10711124

10721125
try:
1126+
env = dict(os.environ)
1127+
if self.sidecar_vars:
1128+
env["SNAKEMAKE_CLUSTER_SIDECAR_VARS"] = self.sidecar_vars
10731129
ext_jobid = (
10741130
subprocess.check_output(
10751131
'{submitcmd} "{jobscript}"'.format(
10761132
submitcmd=submitcmd, jobscript=jobscript
10771133
),
10781134
shell=True,
1135+
env=env,
10791136
)
10801137
.decode()
10811138
.split("\n")
@@ -1124,11 +1181,15 @@ def _wait_for_jobs(self):
11241181
def job_status(job, valid_returns=["running", "success", "failed"]):
11251182
try:
11261183
# this command shall return "success", "failed" or "running"
1184+
env = dict(os.environ)
1185+
if self.sidecar_vars:
1186+
env["SNAKEMAKE_CLUSTER_SIDECAR_VARS"] = self.sidecar_vars
11271187
ret = subprocess.check_output(
11281188
"{statuscmd} {jobid}".format(
11291189
jobid=job.jobid, statuscmd=self.statuscmd
11301190
),
11311191
shell=True,
1192+
env=env,
11321193
).decode()
11331194
except subprocess.CalledProcessError as e:
11341195
if e.returncode < 0:

snakemake/scheduler.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ def __init__(
6666
cluster_sync=None,
6767
cluster_cancel=None,
6868
cluster_cancel_nargs=None,
69+
cluster_sidecar=None,
6970
drmaa=None,
7071
drmaa_log_dir=None,
7172
kubernetes=None,
@@ -199,6 +200,7 @@ def __init__(
199200
statuscmd=cluster_status,
200201
cancelcmd=cluster_cancel,
201202
cancelnargs=cluster_cancel_nargs,
203+
sidecarcmd=cluster_sidecar,
202204
max_status_checks_per_second=max_status_checks_per_second,
203205
)
204206

snakemake/workflow.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,7 @@ def execute(
617617
cluster_status=None,
618618
cluster_cancel=None,
619619
cluster_cancel_nargs=None,
620+
cluster_sidecar=None,
620621
report=None,
621622
report_stylesheet=None,
622623
export_cwl=False,
@@ -986,6 +987,7 @@ def files(items):
986987
cluster_status=cluster_status,
987988
cluster_cancel=cluster_cancel,
988989
cluster_cancel_nargs=cluster_cancel_nargs,
990+
cluster_sidecar=cluster_sidecar,
989991
cluster_config=cluster_config,
990992
cluster_sync=cluster_sync,
991993
jobname=jobname,
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from snakemake import shell
2+
3+
4+
rule all:
5+
input: 'f.1', 'f.2'
6+
7+
rule one:
8+
output: 'f.1'
9+
shell: "touch {output}"
10+
11+
rule two:
12+
output: 'f.2'
13+
shell: "touch {output}"

tests/test_cluster_sidecar/expected-results/f.1

Whitespace-only changes.

tests/test_cluster_sidecar/expected-results/f.2

Whitespace-only changes.
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
SNAKEMAKE_CLUSTER_SIDECAR_VARS=FIRST_LINE
2+
SNAKEMAKE_CLUSTER_SIDECAR_VARS=FIRST_LINE
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
sidecar started
2+
sidecar stopped

0 commit comments

Comments
 (0)