Skip to content

Commit 099e8d5

Browse files
Lucio Anderlini
authored and committed
improved test quality following coderabbit suggestions
1 parent e59e02f commit 099e8d5

File tree

4 files changed

+71
-40
lines changed

4 files changed

+71
-40
lines changed

src/snakemake/cli.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1587,9 +1587,9 @@ def get_argument_parser(profiles=None):
15871587
"generated job id to the first stdout line, {dependencies} will "
15881588
"be filled with space separated job ids this job depends on. "
15891589
"Does not work for workflows that contain checkpoint rules, "
1590-
"and localrules will fail. The additional argument `--notemp` "
1590+
"and localrules will be skipped. The additional argument `--notemp` "
15911591
"should be specified. Most often, `--not-retrieve-storage` is "
1592-
"also required to avoid Snakemake trying to download output files "
1592+
"also recommended to avoid Snakemake trying to download output files "
15931593
"before the jobs producing them are executed. ",
15941594
)
15951595
group_cluster.add_argument(

tests/test_immediate_submit_without_shared_fs/Snakefile

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,16 @@ rule produce:
77
txt="{fname}.txt"
88

99

10-
shell: "echo {wildcards.fname} > {output.txt}"
10+
shell: "printf '%s\n' {wildcards.fname} > {output.txt}"
11+
1112

1213
rule count:
1314
input:
1415
txt="{fname}.txt"
1516
output:
1617
dat="{fname}.dat"
1718

18-
shell: "cat {input.txt} | wc -m > {output.dat}"
19+
shell: "wc -m < {input.txt} > {output.dat}"
1920

2021
rule sum:
2122
input:
@@ -28,7 +29,7 @@ rule sum:
2829
"""
2930
SUM=0
3031
for i in {input.counts}; do
31-
SUM=$(($SUM + $(cat $i)));
32+
SUM=$(($SUM + $(cat "$i")));
3233
done;
3334
echo "sum: $SUM" > {output.yaml}
3435
"""

tests/test_immediate_submit_without_shared_fs/clustersubmit

Lines changed: 57 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
#!/usr/bin/env python3
2+
import sys
23
import asyncio
34
import json
4-
import sys
55
from argparse import ArgumentParser
66
import logging
77
import sqlite3
88
import string
99
import random
1010
import os
11+
import time
12+
import tempfile
1113

1214
## Configuration
13-
DATABASE_FILE = "/tmp/clustersubmit.sqlite3"
14-
LOG_FILE = "/tmp/clustersubmit.log"
15+
DATABASE_FILE = os.environ.get("DATABASE_FILE", os.path.join(tempfile.gettempdir(), "clustersubmit.sqlite3"))
16+
LOG_FILE = os.environ.get("LOG_FILE", os.path.join(tempfile.gettempdir(), "clustersubmit.log"))
17+
TIMEOUT_SECONDS = float(os.environ.get("TIMEOUT_SECONDS", "300"))
1518

1619
logging.basicConfig(
1720
level=logging.DEBUG,
@@ -41,10 +44,27 @@ args = parser.parse_args()
4144

4245
## Initialization
4346
with sqlite3.connect(DATABASE_FILE) as db:
44-
db.execute("CREATE TABLE IF NOT EXISTS jobs(jobid TEXT, jobscript TEXT, done INT, testtime)")
45-
db.execute("CREATE TABLE IF NOT EXISTS edges(parent TEXT, child TEXT, testtime)")
46-
db.execute("DELETE FROM jobs WHERE testtime <= datetime('now', '-1 hour')")
47-
db.execute("DELETE FROM edges WHERE testtime <= datetime('now', '-1 hour')")
47+
db.execute("""
48+
CREATE TABLE IF NOT EXISTS jobs(
49+
jobid TEXT PRIMARY KEY,
50+
jobscript TEXT,
51+
done INT,
52+
created_at TEXT
53+
)
54+
""")
55+
56+
db.execute("""
57+
CREATE TABLE IF NOT EXISTS edges(
58+
parent TEXT,
59+
child TEXT,
60+
created_at TEXT,
61+
FOREIGN KEY(parent) REFERENCES jobs(jobid),
62+
FOREIGN KEY(child) REFERENCES jobs(jobid)
63+
);
64+
""")
65+
66+
db.execute("DELETE FROM jobs WHERE created_at <= datetime('now', '-1 hour')")
67+
db.execute("DELETE FROM edges WHERE created_at <= datetime('now', '-1 hour')")
4868

4969

5070

@@ -57,69 +77,79 @@ async def execute_single_job(jobid, jobscript):
5777
"""
5878
logging.info(f"Executing {jobid}")
5979
proc = await asyncio.create_subprocess_shell(jobscript)
60-
await proc.communicate()
80+
stdout, stderr = await proc.communicate()
81+
if proc.returncode != 0:
82+
logging.critical(stdout)
83+
logging.critical(stderr)
84+
raise RuntimeError(f"Failed executing job {jobid}")
85+
6186
async with db_lock:
6287
with sqlite3.connect(DATABASE_FILE, timeout=30) as db:
6388
db.execute("PRAGMA journal_mode=WAL")
6489
db.execute("UPDATE jobs SET done = 1 WHERE jobid = ?", (jobid,))
6590
logging.info(f"Marked {jobid} as done")
66-
async def execute_jobs():
91+
92+
93+
async def execute_jobs(timeout_seconds=TIMEOUT_SECONDS):
6794
"""
6895
Simplistic scheduler checking jobs that can run and executing them
6996
"""
70-
for _ in range(100):
97+
start = time.perf_counter()
98+
while (time.perf_counter() - start) < timeout_seconds:
7199
with sqlite3.connect(DATABASE_FILE) as db:
72100
result = db.execute(QUERY_RUNNABLE).fetchall()
73-
if len(result):
74-
async with asyncio.TaskGroup() as tg:
75-
for row in result:
76-
tg.create_task(execute_single_job(*row))
77-
else:
101+
if not result:
78102
logging.info("Could not find any other job to execute.")
79103
return
104+
105+
async with asyncio.TaskGroup() as tg:
106+
for row in result:
107+
tg.create_task(execute_single_job(*row))
108+
109+
raise TimeoutError(f"Exceeded timeout of {timeout_seconds} seconds")
80110

81111

82112
if args.execute:
83113
asyncio.run(execute_jobs())
84114
exit(0)
85115
elif args.jobscript is None:
116+
logging.critical(f"Failed to submit a job: missing jobscript. {sys.argv}")
86117
raise ValueError("Expected jobscript as the last positional argument")
87118

88119

89120

90-
91121
## Submission
92122
with open(args.jobscript) as input_file:
93-
for line in input_file:
94-
if line.startswith("# properties = "):
95-
properties = json.loads(line[len("# properties = "):-1])
96-
break
123+
jobscript_content = input_file.read()
124+
125+
for line in jobscript_content.splitlines():
126+
if line.startswith("# properties = "):
127+
properties = json.loads(line[len("# properties = "):])
128+
break
97129

98130
logging.info(f"---")
99131
logging.info(f"Processing rule: {properties['rule']}")
100132

101133
job_name = ''.join(
102-
[properties['rule'], '-'] +
134+
[properties['rule'], '-', str(time.time()), '-'] +
103135
[random.choice(string.ascii_lowercase) for _ in range(5)]
104136
)
105137

106-
dependencies = [s for s in args.dependencies.split(" ") if s not in [' ', '', None]]
138+
dependencies = args.dependencies.split()
107139
logging.info(f"Submitting `{job_name}` which depends on {dependencies}")
108140

109141
with sqlite3.connect(DATABASE_FILE) as db:
110142
db.execute(
111-
"INSERT INTO jobs(jobid, jobscript, done, testtime) VALUES (?, ?, 0, datetime('now'))",
112-
(job_name, open(args.jobscript).read())
143+
"INSERT INTO jobs(jobid, jobscript, done, created_at) VALUES (?, ?, 0, datetime('now'))",
144+
(job_name, jobscript_content)
113145
)
114146

115147
for dependency in dependencies:
116148
db.execute(
117-
"INSERT INTO edges(parent, child, testtime) VALUES (?, ?, datetime('now'))",
149+
"INSERT INTO edges(parent, child, created_at) VALUES (?, ?, datetime('now'))",
118150
(dependency, job_name)
119151
)
120152

121-
122-
123153
print(job_name)
124154

125155

tests/tests.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2478,14 +2478,14 @@ def test_params_empty_inherit():
24782478
def test_immediate_submit_without_shared_fs():
24792479
run(
24802480
dpath("test_immediate_submit_without_shared_fs"),
2481-
shellcmd="snakemake "
2482-
"--executor cluster-generic "
2483-
"--cluster-generic-submit-cmd \"./clustersubmit --dependencies \\\"{dependencies}\\\"\" "
2484-
"--forceall "
2485-
"--immediate-submit "
2486-
"--notemp "
2487-
"--jobs 10 "
2488-
"; ./clustersubmit --execute"
2481+
shellcmd="""snakemake \
2482+
--executor cluster-generic \
2483+
--cluster-generic-submit-cmd "./clustersubmit --dependencies \\\"{dependencies}\\\"" \
2484+
--forceall \
2485+
--immediate-submit \
2486+
--notemp \
2487+
--jobs 10 \
2488+
; ./clustersubmit --execute """
24892489
)
24902490

24912491

0 commit comments

Comments
 (0)