Skip to content

Commit eace08f

Browse files
fix: in remote jobs, upload storage in topological order such that modification dates are preserved (e.g. in case of group jobs) (#3377)
### Description <!--Add a description of your PR here--> ### QC <!-- Make sure that you can tick the boxes below. --> * [x] The PR contains a test case for the changes or the changes are already covered by an existing test case. * [x] The documentation (`docs/`) is updated to reflect the changes or this is not necessary (e.g. if the change does neither modify the language nor the behavior or functionalities of Snakemake). <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **Refactor** - Improved the job output storage workflow by changing the data structure for job processing. - **New Features** - Included template files in the package distribution for enhanced functionality. - **Chores** - Restructured dependencies in the test environment to treat certain tools as top-level dependencies, streamlining package management. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Tadas Bareikis <[email protected]>
1 parent c7473a6 commit eace08f

File tree

1 file changed

+19
-16
lines changed

1 file changed

+19
-16
lines changed

snakemake/dag.py

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -409,25 +409,28 @@ async def store_storage_outputs(self):
409409
if self.workflow.remote_exec:
410410
logger.info("Storing output in storage.")
411411
try:
412-
async with asyncio.TaskGroup() as tg:
413-
for job in self.needrun_jobs(exclude_finished=False):
414-
benchmark = [job.benchmark] if job.benchmark else []
415-
416-
async def tostore(f):
417-
return (
418-
f.is_storage
419-
and f
420-
not in self.workflow.storage_settings.unneeded_temp_files
421-
and await f.exists_local()
422-
)
412+
for level in self.toposorted(
413+
set(self.needrun_jobs(exclude_finished=False))
414+
):
415+
async with asyncio.TaskGroup() as tg:
416+
for job in level:
417+
benchmark = [job.benchmark] if job.benchmark else []
418+
419+
async def tostore(f):
420+
return (
421+
f.is_storage
422+
and f
423+
not in self.workflow.storage_settings.unneeded_temp_files
424+
and await f.exists_local()
425+
)
423426

424-
if self.finished(job):
425-
for f in chain(job.output, benchmark):
427+
if self.finished(job):
428+
for f in chain(job.output, benchmark):
429+
if await tostore(f):
430+
tg.create_task(f.store_in_storage())
431+
for f in job.log:
426432
if await tostore(f):
427433
tg.create_task(f.store_in_storage())
428-
for f in job.log:
429-
if await tostore(f):
430-
tg.create_task(f.store_in_storage())
431434
except ExceptionGroup as e:
432435
raise WorkflowError("Failed to store output in storage.", e)
433436

0 commit comments

Comments
 (0)