Skip to content

Commit 58c42cf

Browse files
landerlini (Lucio Anderlini), coderabbitai[bot], and johanneskoester
authored
fix: avoid checking output files in immediate-submit mode (#3569)
<!--Add a description of your PR here--> Disable checking output files when snakemake is run with flag `--immediate-submit`. Notes: * The flag `--notemp` should be additionally indicated (the CLI suggests it) * The flag `--not-retrieve-storage` should be indicated to avoid Snakemake trying to retrieve remote data that may not yet exist * `localrule`s break the DAG because their dependency pattern cannot be handled by cluster logic. A warning is printed and the scheduler stops processing jobs requiring their output. ### QC <!-- Make sure that you can tick the boxes below. --> * [x] The PR contains a test case for the changes or the changes are already covered by an existing test case. * [x] The documentation (`docs/`) is updated to reflect the changes or this is not necessary (e.g. if the change neither modifies the language nor the behavior or functionalities of Snakemake). <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added support for immediate job submission mode that skips local job execution and adjusts output handling accordingly. - Introduced a new test workflow and cluster submission script to validate execution without a shared filesystem. - **Bug Fixes** - Improved handling of missing outputs during job postprocessing to avoid waiting for intentionally ignored files. - **Documentation** - Expanded help text for the immediate submission option with additional usage guidance. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Lucio Anderlini <[email protected]> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Co-authored-by: Johannes Koester <[email protected]>
1 parent 2f663be commit 58c42cf

File tree

7 files changed

+260
-19
lines changed

7 files changed

+260
-19
lines changed

src/snakemake/cli.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1584,7 +1584,11 @@ def get_argument_parser(profiles=None):
15841584
"Assuming that your submit script (here sbatch) outputs the "
15851585
"generated job id to the first stdout line, {dependencies} will "
15861586
"be filled with space separated job ids this job depends on. "
1587-
"Does not work for workflows that contain checkpoint rules.",
1587+
"Does not work for workflows that contain checkpoint rules, "
1588+
"and localrules will be skipped. The additional argument `--notemp` "
1589+
"should be specified. Most often, `--not-retrieve-storage` is "
1590+
"also recommended to avoid Snakemake trying to download output files "
1591+
"before the jobs producing them are executed. ",
15881592
)
15891593
group_cluster.add_argument(
15901594
"--jobscript",

src/snakemake/jobs.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1245,7 +1245,12 @@ async def postprocess(
12451245
)
12461246
if not error:
12471247
self.dag.handle_protected(self)
1248-
elif not shared_input_output and not wait_for_local and not error:
1248+
elif (
1249+
not shared_input_output
1250+
and not wait_for_local
1251+
and not error
1252+
and not ignore_missing_output
1253+
):
12491254
expanded_output = list(self.output)
12501255
if self.benchmark:
12511256
expanded_output.append(self.benchmark)

src/snakemake/scheduling/job_scheduler.py

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -338,26 +338,34 @@ def schedule(self):
338338
local_runjobs = [job for job in run if job.is_local]
339339
runjobs = [job for job in run if not job.is_local]
340340
if local_runjobs:
341-
if (
342-
not self.workflow.remote_exec
343-
and not self.workflow.local_exec
344-
):
345-
# Workflow uses a remote plugin and this scheduling run
346-
# is on the main process. Hence, we have to download
347-
# non-shared remote files for the local jobs.
348-
async_run(
349-
self.workflow.dag.retrieve_storage_inputs(
350-
jobs=local_runjobs, also_missing_internal=True
341+
if self.workflow.remote_execution_settings.immediate_submit:
342+
logger.warning(
343+
"The following local rules cannot run when the "
344+
"--immediate-submit flag is specified. Skipping: "
345+
f"{', '.join(j.name for j in local_runjobs)}."
346+
)
347+
else:
348+
if (
349+
not self.workflow.remote_exec
350+
and not self.workflow.local_exec
351+
):
352+
# Workflow uses a remote plugin and this scheduling run
353+
# is on the main process. Hence, we have to download
354+
# non-shared remote files for the local jobs.
355+
async_run(
356+
self.workflow.dag.retrieve_storage_inputs(
357+
jobs=local_runjobs, also_missing_internal=True
358+
)
351359
)
360+
361+
self.run(
362+
local_runjobs,
363+
executor=self._local_executor or self._executor,
352364
)
353-
self.run(
354-
local_runjobs,
355-
executor=self._local_executor or self._executor,
356-
)
357365
if runjobs:
358366
self.run(runjobs)
359-
if not self.dryrun:
360367

368+
if not self.dryrun:
361369
if self._run_performed is None or self._run_performed:
362370
if self.running:
363371
logger.debug("Waiting for running jobs to complete.")
@@ -388,6 +396,10 @@ def _schedule_reevalutation(self, delay: int) -> None:
388396
def _finish_jobs(self):
389397
# must be called from within lock
390398
# clear the global tofinish such that parallel calls do not interfere
399+
400+
# shortcut to "--immediate-submit" flag
401+
immediate_submit = self.workflow.remote_execution_settings.immediate_submit
402+
391403
async def postprocess():
392404
for job in self._tofinish:
393405
# IMPORTANT: inside of this loop, there may be no calls that have
@@ -400,13 +412,14 @@ async def postprocess():
400412
store_in_storage=not self.touch,
401413
handle_log=True,
402414
handle_touch=not self.touch,
403-
ignore_missing_output=self.touch,
415+
ignore_missing_output=self.touch or immediate_submit,
404416
)
405417
elif self.workflow.exec_mode == ExecMode.SUBPROCESS:
406418
await job.postprocess(
407419
store_in_storage=False,
408420
handle_log=True,
409421
handle_touch=True,
422+
ignore_missing_output=immediate_submit,
410423
)
411424
else:
412425
# remote job execution
@@ -417,12 +430,15 @@ async def postprocess():
417430
store_in_storage=False,
418431
handle_log=True,
419432
handle_touch=True,
433+
ignore_missing_output=immediate_submit,
420434
)
421435
except (RuleException, WorkflowError) as e:
422436
# if an error occurs while processing job output,
423437
# we do the same as in case of errors during execution
424438
print_exception(e, self.workflow.linemaps)
425-
await job.postprocess(error=True)
439+
await job.postprocess(
440+
error=True, ignore_missing_output=immediate_submit
441+
)
426442
self._handle_error(job, postprocess_job=False)
427443
continue
428444

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
rule all:
2+
input: "report.yaml"
3+
4+
5+
rule produce:
6+
output:
7+
txt="{fname}.txt"
8+
9+
10+
shell: "printf '%s\n' {wildcards.fname} > {output.txt}"
11+
12+
13+
rule count:
14+
input:
15+
txt="{fname}.txt"
16+
output:
17+
dat="{fname}.dat"
18+
19+
shell: "wc -m < {input.txt} > {output.dat}"
20+
21+
rule sum:
22+
input:
23+
counts=expand("{f}.dat", f=['foo', 'bar', 'baz', 'qux', 'quux']),
24+
25+
output:
26+
yaml="report.yaml"
27+
28+
shell:
29+
"""
30+
SUM=0
31+
for i in {input.counts}; do
32+
SUM=$(($SUM + $(cat "$i")));
33+
done;
34+
echo "sum: $SUM" > {output.yaml}
35+
"""
36+
37+
38+
39+
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
#!/usr/bin/env python3
2+
import sys
3+
import asyncio
4+
import json
5+
from argparse import ArgumentParser
6+
import logging
7+
import sqlite3
8+
import string
9+
import random
10+
import os
11+
import time
12+
import tempfile
13+
14+
## Configuration
15+
DATABASE_FILE = os.environ.get("DATABASE_FILE", os.path.join(tempfile.gettempdir(), "clustersubmit.sqlite3"))
16+
LOG_FILE = os.environ.get("LOG_FILE", os.path.join(tempfile.gettempdir(), "clustersubmit.log"))
17+
TIMEOUT_SECONDS = float(os.environ.get("TIMEOUT_SECONDS", "300"))
18+
19+
logging.basicConfig(
20+
level=logging.DEBUG,
21+
format="%(asctime)s | %(levelname)-8s | %(message)s",
22+
datefmt="%Y-%m-%d %H:%M:%S",
23+
filename=LOG_FILE,
24+
filemode="a",
25+
)
26+
QUERY_RUNNABLE = """
27+
SELECT child.jobid, child.jobscript
28+
FROM jobs AS child
29+
WHERE done = 0 AND NOT EXISTS (
30+
SELECT * FROM jobs AS parents
31+
JOIN edges AS e ON e.parent = parents.jobid
32+
WHERE e.child = child.jobid AND done = 0
33+
);
34+
"""
35+
36+
37+
## Arguments
38+
parser = ArgumentParser("Mock script representing cluster-managed DAG execution")
39+
parser.add_argument("-d", "--dependencies", help="Space-separated job ids", default="")
40+
parser.add_argument("--execute", action='store_true', help="Execute pending jobs and exit")
41+
parser.add_argument("jobscript", type=str, default=None, nargs='?')
42+
args = parser.parse_args()
43+
44+
45+
## Initialization
46+
with sqlite3.connect(DATABASE_FILE) as db:
47+
db.execute("""
48+
CREATE TABLE IF NOT EXISTS jobs(
49+
jobid TEXT PRIMARY KEY,
50+
jobscript TEXT,
51+
done INT,
52+
created_at TEXT
53+
)
54+
""")
55+
56+
db.execute("""
57+
CREATE TABLE IF NOT EXISTS edges(
58+
parent TEXT,
59+
child TEXT,
60+
created_at TEXT,
61+
FOREIGN KEY(parent) REFERENCES jobs(jobid),
62+
FOREIGN KEY(child) REFERENCES jobs(jobid)
63+
);
64+
""")
65+
66+
db.execute("DELETE FROM jobs WHERE created_at <= datetime('now', '-1 hour')")
67+
db.execute("DELETE FROM edges WHERE created_at <= datetime('now', '-1 hour')")
68+
69+
70+
71+
## execution
72+
db_lock = asyncio.Lock()
73+
74+
async def execute_single_job(jobid, jobscript):
75+
"""
76+
Coroutine executing one job
77+
"""
78+
logging.info(f"Executing {jobid}")
79+
proc = await asyncio.create_subprocess_shell(jobscript)
80+
stdout, stderr = await proc.communicate()
81+
if proc.returncode != 0:
82+
logging.error(f"Job {jobid} failed with return code {proc.returncode}")
83+
if stdout:
84+
logging.debug(f"Job {jobid} stdout: {stdout.decode()[:500]}...")
85+
if stderr:
86+
logging.error(f"Job {jobid} stderr: {stderr.decode()[:500]}...")
87+
88+
        raise RuntimeError(f"Failed executing job {jobid}")
89+
90+
async with db_lock:
91+
with sqlite3.connect(DATABASE_FILE, timeout=30) as db:
92+
db.execute("PRAGMA journal_mode=WAL")
93+
db.execute("UPDATE jobs SET done = 1 WHERE jobid = ?", (jobid,))
94+
logging.info(f"Marked {jobid} as done")
95+
96+
97+
async def execute_jobs(timeout_seconds=TIMEOUT_SECONDS):
98+
"""
99+
Simplistic scheduler checking jobs that can run and executing them
100+
"""
101+
start = time.perf_counter()
102+
while (time.perf_counter() - start) < timeout_seconds:
103+
with sqlite3.connect(DATABASE_FILE) as db:
104+
result = db.execute(QUERY_RUNNABLE).fetchall()
105+
if not result:
106+
logging.info("Could not find any other job to execute.")
107+
return
108+
109+
async with asyncio.TaskGroup() as tg:
110+
for row in result:
111+
tg.create_task(execute_single_job(*row))
112+
113+
raise TimeoutError(f"Exceeded timeout of {timeout_seconds} seconds")
114+
115+
116+
if args.execute:
117+
asyncio.run(execute_jobs())
118+
exit(0)
119+
elif args.jobscript is None:
120+
logging.critical(f"Failed to submit a job: missing jobscript. {sys.argv}")
121+
raise ValueError("Expected jobscript as the last positional argument")
122+
123+
124+
125+
## Submission
126+
with open(args.jobscript) as input_file:
127+
jobscript_content = input_file.read()
128+
129+
for line in jobscript_content.splitlines():
130+
if line.startswith("# properties = "):
131+
properties = json.loads(line[len("# properties = "):])
132+
break
133+
134+
logging.info(f"---")
135+
logging.info(f"Processing rule: {properties['rule']}")
136+
137+
job_name = ''.join(
138+
[properties['rule'], '-', str(time.time()), '-'] +
139+
[random.choice(string.ascii_lowercase) for _ in range(5)]
140+
)
141+
142+
dependencies = args.dependencies.split()
143+
logging.info(f"Submitting `{job_name}` which depends on {dependencies}")
144+
145+
with sqlite3.connect(DATABASE_FILE) as db:
146+
db.execute(
147+
"INSERT INTO jobs(jobid, jobscript, done, created_at) VALUES (?, ?, 0, datetime('now'))",
148+
(job_name, jobscript_content)
149+
)
150+
151+
for dependency in dependencies:
152+
db.execute(
153+
"INSERT INTO edges(parent, child, created_at) VALUES (?, ?, datetime('now'))",
154+
(dependency, job_name)
155+
)
156+
157+
print(job_name)
158+
159+
160+
161+
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
sum: 21

tests/tests.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2538,6 +2538,21 @@ def test_params_empty_inherit():
25382538
run(dpath("test_params_empty_inherit"))
25392539

25402540

2541+
@skip_on_windows
2542+
def test_immediate_submit_without_shared_fs():
2543+
run(
2544+
dpath("test_immediate_submit_without_shared_fs"),
2545+
shellcmd="""snakemake \
2546+
--executor cluster-generic \
2547+
--cluster-generic-submit-cmd "./clustersubmit --dependencies \\\"{dependencies}\\\"" \
2548+
--forceall \
2549+
--immediate-submit \
2550+
--notemp \
2551+
--jobs 10 \
2552+
&& ./clustersubmit --execute """,
2553+
)
2554+
2555+
25412556
def test_ambiguousruleexception():
25422557
try:
25432558
run(dpath("test_ambiguousruleexception"))

0 commit comments

Comments
 (0)