Skip to content

Commit c58684c

Browse files
authored
fix: Convert Path to IOFile (snakemake#3405)
<!--Add a description of your PR here--> Fixes snakemake#3193 Fixes snakemake#2933 ### QC <!-- Make sure that you can tick the boxes below. --> * [ ] The PR contains a test case for the changes or the changes are already covered by an existing test case. * [ ] The documentation (`docs/`) is updated to reflect the changes or this is not necessary (e.g. if the change does neither modify the language nor the behavior or functionalities of Snakemake). <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **Refactor** - Revised handling of pending file operations to deliver more precise and actionable feedback. - Streamlined metadata cleanup processes to improve overall reliability and consistency. - Enhanced workflow routines for file cleanup, ensuring a more robust and efficient operation. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent a6e45bf commit c58684c

File tree

3 files changed

+18
-14
lines changed

3 files changed

+18
-14
lines changed

src/snakemake/dag.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -499,12 +499,12 @@ async def check_incomplete(self):
499499
"""Check if any output files are incomplete. This is done by looking up
500500
markers in the persistence module."""
501501
if not self.ignore_incomplete:
502-
incomplete = await self.incomplete_files()
503-
if incomplete:
502+
incomplete_files = await self.incomplete_files()
503+
if any(incomplete_files):
504504
if self.workflow.dag_settings.force_incomplete:
505-
self.forcefiles.update(incomplete)
505+
self.forcefiles.update(incomplete_files)
506506
else:
507-
raise IncompleteFilesException(incomplete)
507+
raise IncompleteFilesException(incomplete_files)
508508

509509
def incomplete_external_jobid(self, job) -> Optional[str]:
510510
"""Return the external jobid of the job if it is marked as incomplete.
@@ -599,10 +599,13 @@ async def incomplete_files(self):
599599
"""Yield incomplete files."""
600600
incomplete = list()
601601
for job in filterfalse(self.needrun, self.jobs):
602-
is_incomplete = await self.workflow.persistence.incomplete(job)
603-
if is_incomplete:
604-
for f in job.output:
605-
incomplete.append(f)
602+
incomplete.extend(
603+
[
604+
job
605+
for job in await self.workflow.persistence.incomplete(job)
606+
if job is not None
607+
]
608+
)
606609
return incomplete
607610

608611
@property

src/snakemake/persistence.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,9 @@ def cleanup_locks(self):
228228
shutil.rmtree(self._lockdir)
229229

230230
def cleanup_metadata(self, path):
231-
return self._delete_record(self._metadata_path, path)
231+
return self._delete_record(self._incomplete_path, path) or self._delete_record(
232+
self._metadata_path, path
233+
)
232234

233235
def cleanup_shadow(self):
234236
if os.path.exists(self.shadow_path):
@@ -364,8 +366,7 @@ async def finished(self, job):
364366

365367
def cleanup(self, job):
366368
for f in job.output:
367-
self._delete_record(self._incomplete_path, f)
368-
self._delete_record(self._metadata_path, f)
369+
self.cleanup_metadata(f)
369370

370371
async def incomplete(self, job):
371372
if self._incomplete_cache is None:
@@ -385,12 +386,12 @@ def marked_incomplete(f):
385386
async def is_incomplete(f):
386387
exists = await f.exists()
387388
marked = marked_incomplete(f)
388-
return exists and marked
389+
return f if exists and marked else None
389390

390391
async with asyncio.TaskGroup() as tg:
391392
tasks = [tg.create_task(is_incomplete(f)) for f in job.output]
392393

393-
return any(task.result() for task in tasks)
394+
return [task.result() for task in tasks]
394395

395396
def _cache_incomplete_folder(self):
396397
self._incomplete_cache = {

src/snakemake/workflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -866,7 +866,7 @@ def cleanup_metadata(self, paths: List[Path]):
866866
)
867867
failed = []
868868
for path in paths:
869-
success = self.persistence.cleanup_metadata(path)
869+
success = self.persistence.cleanup_metadata(IOFile(path))
870870
if not success:
871871
failed.append(str(path))
872872
if failed:

0 commit comments

Comments
 (0)