Skip to content

Commit 495a4e7

Browse files
Hocnonsense, johanneskoester, coderabbitai[bot]
authored
fix: force check all required outputs (#3341)
This may fix #3036. This fix completely replaces `future_output` with `created_output`, and directly checks whether all outputs the rule requires have been created. I am unsure whether `future_output` is used anywhere else. Does it need to be added back? ### QC <!-- Make sure that you can tick the boxes below. --> * [x] The PR contains a test case <`tests/tests.py::test_checkpoints_many`> for the changes. * [x] The change modifies neither the language nor the behavior or functionality of Snakemake. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Enhanced workflow execution with refined job ordering, checkpoint validation, file parsing, checksum extraction, and updated CLI help text. - **Documentation** - Updated examples and guidelines for input parsing and configuration validations, with typographical corrections for improved clarity. - **Tests** - Introduced new test cases for checkpoint and data format validation, and updated expected outputs and schema definitions. - **Chores** - Revised CI workflows with conditional execution and updated environment dependencies to streamline the build process. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Johannes Köster <[email protected]> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Co-authored-by: Johannes Köster <[email protected]>
1 parent 4ba62fe commit 495a4e7

File tree

6 files changed

+94
-18
lines changed

6 files changed

+94
-18
lines changed

snakemake/checkpoints.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
1-
from snakemake.common import async_run
1+
from typing import TYPE_CHECKING
22
from snakemake.exceptions import IncompleteCheckpointException, WorkflowError
33
from snakemake.io import checkpoint_target
44

5+
if TYPE_CHECKING:
6+
from snakemake.rules import Rule
7+
58

69
class Checkpoints:
710
"""A namespace for checkpoints so that they can be accessed via dot notation."""
811

912
def __init__(self):
10-
self.future_output = None
1113
self.created_output = None
1214

1315
def register(self, rule, fallback_name=None):
@@ -20,7 +22,7 @@ def register(self, rule, fallback_name=None):
2022
class Checkpoint:
2123
__slots__ = ["rule", "checkpoints"]
2224

23-
def __init__(self, rule, checkpoints):
25+
def __init__(self, rule: "Rule", checkpoints: Checkpoints):
2426
self.rule = rule
2527
self.checkpoints = checkpoints
2628

@@ -32,11 +34,9 @@ def get(self, **wildcards):
3234
)
3335

3436
output, _ = self.rule.expand_output(wildcards)
35-
if self.checkpoints.future_output is not None:
36-
for iofile in output:
37-
if iofile in self.checkpoints.future_output:
38-
break
39-
else:
37+
if self.checkpoints.created_output is not None:
38+
may_not_created = set(output) - set(self.checkpoints.created_output)
39+
if not may_not_created:
4040
return CheckpointJob(self.rule, output)
4141

4242
raise IncompleteCheckpointException(self.rule, checkpoint_target(output[0]))
@@ -45,6 +45,6 @@ def get(self, **wildcards):
4545
class CheckpointJob:
4646
__slots__ = ["rule", "output"]
4747

48-
def __init__(self, rule, output):
48+
def __init__(self, rule: "Rule", output):
4949
self.output = output
5050
self.rule = rule

snakemake/dag.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -267,17 +267,16 @@ def checkpoint_jobs(self):
267267
return self._checkpoint_jobs
268268

269269
@property
270-
def finished_checkpoint_jobs(self):
271-
for job in self.finished_jobs:
272-
if job.is_checkpoint:
270+
def finished_and_not_needrun_checkpoint_jobs(self):
271+
for job in self.jobs:
272+
if job.is_checkpoint and (self.finished(job) or not self.needrun(job)):
273273
yield job
274274

275275
def update_checkpoint_outputs(self):
276-
workflow.checkpoints.future_output = set(
277-
f for job in self.checkpoint_jobs for f in job.output
278-
)
279276
workflow.checkpoints.created_output = set(
280-
f for job in self.finished_checkpoint_jobs for f in job.output
277+
f
278+
for job in self.finished_and_not_needrun_checkpoint_jobs
279+
for f in job.output
281280
)
282281

283282
def update_jobids(self):
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import glob
2+
import random
3+
from pathlib import Path
4+
5+
ALL_SAMPLES = ["s1"]
6+
7+
8+
rule all:
9+
input:
10+
expand("collect/{sample}/all_done.txt", sample=ALL_SAMPLES),
11+
12+
13+
rule before:
14+
output:
15+
ALL_SAMPLES,
16+
run:
17+
for sample in ALL_SAMPLES:
18+
Path(sample).touch()
19+
20+
21+
checkpoint first:
22+
input:
23+
expand("{sample}", sample=ALL_SAMPLES),
24+
output:
25+
directory("first/{sample}"),
26+
run:
27+
for i in range(1, 5):
28+
Path(f"{output[0]}/{i}").mkdir(parents=True, exist_ok=True)
29+
Path(f"{output[0]}/{i}/test.txt").touch()
30+
31+
32+
checkpoint second:
33+
input:
34+
"first/{sample}/{i}/test.txt",
35+
output:
36+
directory("second/{sample}/{i}"),
37+
run:
38+
for j in range(6, 10):
39+
Path(f"{output[0]}/{j}").mkdir(parents=True, exist_ok=True)
40+
Path(f"{output[0]}/{j}/test2.txt").touch()
41+
42+
43+
rule copy:
44+
input:
45+
"second/{sample}/{i}/{j}/test2.txt",
46+
output:
47+
touch("copy/{sample}/{i}/{j}/test2.txt"),
48+
49+
50+
def aggregate(wildcards):
51+
52+
outputs_i = glob.glob(f"{checkpoints.first.get(**wildcards).output}/*/")
53+
54+
outputs_i = [output.split("/")[-2] for output in outputs_i]
55+
56+
split_files = []
57+
for i in outputs_i:
58+
s2out = checkpoints.second.get(**wildcards, i=i).output[0]
59+
assert Path(s2out).exists()
60+
output_j = glob.glob(f"{s2out}/*/")
61+
outputs_j = [output.split("/")[-2] for output in output_j]
62+
for j in outputs_j:
63+
split_files.extend(
64+
expand(f"copy/{{sample}}/{i}/{j}/test2.txt", sample=wildcards.sample)
65+
)
66+
return split_files
67+
68+
69+
rule collect:
70+
input:
71+
aggregate,
72+
output:
73+
touch("collect/{sample}/all_done.txt"),

tests/test_checkpoints_many/expected-results/collect/s1/all_done.txt

Whitespace-only changes.

tests/test_issue1092/Snakefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ rule aggregate:
2929
aggregate_input
3030
output:
3131
"aggregated.txt"
32-
#shell:
33-
# "cat {input} > {output}"
32+
# shell:
33+
# "cat {input} > {output}"
3434
run:
3535
shell("cat {input} > {output}")
3636

tests/tests.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,6 +1167,10 @@ def test_checkpoints_dir():
11671167
run(dpath("test_checkpoints_dir"))
11681168

11691169

1170+
def test_checkpoints_many():
1171+
run(dpath("test_checkpoints_many"))
1172+
1173+
11701174
def test_issue1092():
11711175
run(dpath("test_issue1092"))
11721176

0 commit comments

Comments
 (0)