Commit bc4fcee

perf: query updated input files in parallel (snakemake#3266)
### QC

* [x] The PR contains a test case for the changes or the changes are already covered by an existing test case.
* [x] The documentation (`docs/`) is updated to reflect the changes or this is not necessary (e.g. if the change modifies neither the language nor the behavior or functionalities of Snakemake).

## Summary by CodeRabbit

- **Refactor**
  - Enhanced clarity of comments in the `update_needrun` method regarding input checksums.
  - Improved handling of asynchronous operations related to input file checks.
  - Adjusted overall structure of the `update_needrun` method for better readability and maintainability.

1 parent e5d8ec1 commit bc4fcee

1 file changed: +15 -12 lines changed

snakemake/dag.py

Lines changed: 15 additions & 12 deletions
@@ -1226,7 +1226,8 @@ async def is_same_checksum(f, job):
                     # no chance to compute checksum, cannot be assumed the same
                     is_same = False
                 else:
-                    # obtain the input checksums for the given file for all output files of the job
+                    # obtain the set of input checksums for the given file for all
+                    # output files of the job
                     checksums = self.workflow.persistence.input_checksums(job, f)
                     if len(checksums) > 1:
                         # more than one checksum recorded, cannot be all the same
@@ -1287,19 +1288,21 @@ async def update_needrun(job):
                     )
                 if not reason:
                     output_mintime_ = output_mintime.get(job)
-                    updated_input = None
+                    reason.updated_input.clear()
                     if output_mintime_:
-                        # Input is updated if it is newer that the oldest output file
+                        # Input is updated if it is newer than the oldest output file
                         # and does not have the same checksum as the one previously recorded.
-                        updated_input = [
-                            f
-                            for f in job.input
-                            if await f.exists()
-                            and await f.is_newer(output_mintime_)
-                            and not await is_same_checksum(f, job)
-                        ]
-                        reason.updated_input.update(updated_input)
-                    if not updated_input:
+                        async def updated_input():
+                            for f in job.input:
+                                if (
+                                    await f.exists()
+                                    and await f.is_newer(output_mintime_)
+                                    and not await is_same_checksum(f, job)
+                                ):
+                                    yield f
+
+                        reason.updated_input.update([f async for f in updated_input()])
+                    if not reason.updated_input:
                         reason.unfinished_queue_input = job.has_unfinished_queue_input()
                         if not reason.unfinished_queue_input:
                             # check for other changes like parameters, set of input files, or code
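
The pattern introduced in the second hunk (an async generator that filters input files, consumed by an async comprehension) can be tried outside Snakemake. The following is a minimal sketch, not Snakemake's implementation: `FakeInputFile`, `collect_updated_input`, and the one-argument `is_same_checksum` are hypothetical stand-ins, and the checksum comparison is reduced to a stored flag.

```python
import asyncio


class FakeInputFile:
    """Hypothetical stand-in for an input file; not Snakemake's real class."""

    def __init__(self, name, exists, mtime, same_checksum):
        self.name = name
        self._exists = exists
        self._mtime = mtime
        self.same_checksum = same_checksum

    async def exists(self):
        return self._exists

    async def is_newer(self, time):
        # Newer means modified strictly after the given timestamp.
        return self._mtime > time


async def is_same_checksum(f):
    # Assumed simplification: the real check compares recorded checksums.
    return f.same_checksum


async def collect_updated_input(files, output_mintime):
    """Return the names of files that exist, are newer than the oldest
    output, and whose checksum differs from the recorded one."""

    async def updated_input():
        # Async generator: yields each file that passes all three checks.
        for f in files:
            if (
                await f.exists()
                and await f.is_newer(output_mintime)
                and not await is_same_checksum(f)
            ):
                yield f

    # Async set comprehension consuming the generator.
    return {f.name async for f in updated_input()}


if __name__ == "__main__":
    demo_files = [
        FakeInputFile("old.txt", True, 50.0, False),
        FakeInputFile("changed.txt", True, 200.0, False),
    ]
    print(asyncio.run(collect_updated_input(demo_files, 100.0)))
```

In this sketch the `and` chain short-circuits, so the checksum comparison only runs for files that exist and are newer than the oldest output, matching the order of the checks in the diff.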
