Skip to content

Commit b039f8a

Browse files
JZentgraf, johannaelenaschmitz, johannaschmitz, and johanneskoester
authored
feat: Maximal file size for checksums (snakemake#3368)
Add a new parameter `--max-checksum-file-size` to adjust the maximal file size for which a checksum is computed when the DAG is built. This is related to snakemake#2949 ### QC <!-- Make sure that you can tick the boxes below. --> * [ ] The PR contains a test case for the changes or the changes are already covered by an existing test case. * [ ] The documentation (`docs/`) is updated to reflect the changes or this is not necessary (e.g. if the change neither modifies the language nor the behavior or functionalities of Snakemake). <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added a new command-line option to let users specify a maximum file size for checksum processing, with a default threshold of 1,000,000 bytes. - **Refactor** - Updated file validation and checksum routines to enforce the new file size limit, ensuring more efficient processing across workflow operations. - Made method signatures more explicit by removing default values for threshold parameters in checksum-related methods. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Johanna <[email protected]> Co-authored-by: johannaschmitz <[email protected]> Co-authored-by: Johannes Köster <[email protected]>
1 parent 4430e23 commit b039f8a

File tree

5 files changed

+38
-11
lines changed

5 files changed

+38
-11
lines changed

snakemake/cli.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@
7070
from snakemake.utils import available_cpu_count, update_config
7171

7272

73+
def parse_size_in_bytes(value):
    """Convert a human-friendly size string (e.g. "1MB", "512 KiB") to an integer byte count."""
    # Imported lazily so the dependency is only loaded when this parser is invoked.
    import humanfriendly

    return humanfriendly.parse_size(value)
77+
78+
7379
def expandvars(atype):
7480
def inner(args):
7581
if isinstance(args, list):
@@ -1270,6 +1276,15 @@ def get_argument_parser(profiles=None):
12701276
"network file systems. Hence, we do not spend more than a given amount of time and fall back "
12711277
"to individual checks for the rest.",
12721278
)
1279+
group_behavior.add_argument(
1280+
"--max-checksum-file-size",
1281+
default=1000000,
1282+
metavar="SIZE",
1283+
parse_func=parse_size_in_bytes,
1284+
help="Compute the checksum during DAG computation and job postprocessing "
1285+
"only for files that are smaller than the provided threshold (given in any valid size "
1286+
"unit, e.g. 1MB, which is also the default). ",
1287+
)
12731288
group_behavior.add_argument(
12741289
"--latency-wait",
12751290
"--output-wait",
@@ -2052,6 +2067,7 @@ def args_to_api(args, parser):
20522067
allowed_rules=args.allowed_rules,
20532068
rerun_triggers=args.rerun_triggers,
20542069
max_inventory_wait_time=args.max_inventory_time,
2070+
max_checksum_file_size=args.max_checksum_file_size,
20552071
strict_evaluation=args.strict_dag_evaluation,
20562072
),
20572073
)

snakemake/dag.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ def __init__(
145145
self._running = set()
146146
self._jobs_with_finished_queue_input = set()
147147
self._storage_input_jobs = defaultdict(list)
148+
self.max_checksum_file_size = self.workflow.dag_settings.max_checksum_file_size
148149

149150
self.job_factory = JobFactory()
150151
self.group_job_factory = GroupJobFactory()
@@ -649,7 +650,9 @@ async def is_not_same_checksum(f, checksum):
649650
e,
650651
rule=job.rule,
651652
)
652-
return not await f.is_same_checksum(checksum, force=True)
653+
return not await f.is_same_checksum(
654+
checksum, self.max_checksum_file_size, force=True
655+
)
653656

654657
checksum_failed_output = [
655658
f
@@ -1254,7 +1257,7 @@ async def is_same_checksum(f, job):
12541257
try:
12551258
return is_same_checksum_cache[(f, job)]
12561259
except KeyError:
1257-
if not await f.is_checksum_eligible():
1260+
if not await f.is_checksum_eligible(self.max_checksum_file_size):
12581261
# no chance to compute checksum, cannot be assumed the same
12591262
is_same = False
12601263
else:
@@ -1268,7 +1271,9 @@ async def is_same_checksum(f, job):
12681271
# no checksums recorded, we cannot assume them to be the same
12691272
is_same = False
12701273
else:
1271-
is_same = await f.is_same_checksum(checksums.pop())
1274+
is_same = await f.is_same_checksum(
1275+
checksums.pop(), self.max_checksum_file_size
1276+
)
12721277

12731278
is_same_checksum_cache[(f, job)] = is_same
12741279
return is_same

snakemake/io.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -596,19 +596,19 @@ async def size_local(self):
596596
await self.check_broken_symlink()
597597
return os.path.getsize(self.file)
598598

599-
async def is_checksum_eligible(self, threshold):
    """Decide whether a checksum should be computed for this file.

    Eligible files exist locally, are not directories, are at most
    ``threshold`` bytes in size, and are not FIFOs.
    """
    if not await self.exists_local():
        return False
    if os.path.isdir(self.file):
        return False
    if not await self.size() <= threshold:
        return False
    return not self.is_fifo()
606606

607-
async def checksum(self, force=False):
607+
async def checksum(self, threshold, force=False):
608608
"""Return checksum if file is small enough, else None.
609609
Returns None if file does not exist. If force is True,
610610
omit eligibility check."""
611-
if force or await self.is_checksum_eligible(): # less than 1 MB
611+
if force or await self.is_checksum_eligible(threshold):
612612
checksum = sha256()
613613
if await self.size() > 0:
614614
# only read if file is bigger than zero
@@ -623,8 +623,8 @@ async def checksum(self, force=False):
623623
else:
624624
return None
625625

626-
async def is_same_checksum(self, other_checksum, force=False):
627-
checksum = await self.checksum(force=force)
626+
async def is_same_checksum(self, other_checksum, threshold, force=False):
627+
checksum = await self.checksum(threshold, force=force)
628628
if checksum is None or other_checksum is None:
629629
# if no checksum available or files too large, not the same
630630
return False

snakemake/persistence.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,9 @@ def __init__(
130130
self.unlock = self.noop
131131

132132
self._read_record = self._read_record_cached
133+
self.max_checksum_file_size = (
134+
self.dag.workflow.dag_settings.max_checksum_file_size
135+
)
133136

134137
@property
135138
def path(self) -> Path:
@@ -326,8 +329,10 @@ async def finished(self, job):
326329
else fallback_time
327330
)
328331

329-
checksums = ((infile, await infile.checksum()) for infile in job.input)
330-
332+
checksums = (
333+
(infile, await infile.checksum(self.max_checksum_file_size))
334+
for infile in job.input
335+
)
331336
self._record(
332337
self._metadata_path,
333338
{

snakemake/settings/types.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ class DAGSettings(SettingsBase):
203203
allowed_rules: AnySet[str] = frozenset()
204204
rerun_triggers: AnySet[RerunTrigger] = RerunTrigger.all()
205205
max_inventory_wait_time: int = 20
206+
max_checksum_file_size: int = 1000000
206207
strict_evaluation: AnySet[StrictDagEvaluation] = frozenset()
207208
# strict_functions_evaluation: bool = False
208209
# strict_cycle_evaluation: bool = False

0 commit comments

Comments
 (0)