fix: actually use asyncio#2963
Conversation
|
Well, some observations still show that it is sequential. I may need further investigation. |
|
Warning Rate limit exceeded@johanneskoester has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 0 minutes and 12 seconds before requesting another review. How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. WalkthroughThe recent changes enhance the efficiency and observability of the Snakemake workflow management system. The introduction of Changes
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Actionable comments posted: 5
Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Files selected for processing (2)
- snakemake/jobs.py (2 hunks)
- snakemake/scheduler.py (6 hunks)
Additional context used
Ruff
snakemake/scheduler.py
199-199: Trailing comma missing
Add trailing comma
(COM812)
209-209: Trailing comma missing
Add trailing comma
(COM812)
296-296: Trailing comma missing
Add trailing comma
(COM812)
336-336: Trailing comma missing
Add trailing comma
(COM812)
341-341: Trailing comma missing
Add trailing comma
(COM812)
354-354: Undefined name
asyncio(F821)
365-365: Trailing comma missing
Add trailing comma
(COM812)
373-373: Trailing comma missing
Add trailing comma
(COM812)
384-384: Trailing comma missing
Add trailing comma
(COM812)
401-401: Trailing comma missing
Add trailing comma
(COM812)
418-418: Trailing comma missing
Add trailing comma
(COM812)
snakemake/jobs.py
1142-1142: Trailing comma missing
Add trailing comma
(COM812)
1156-1156: Trailing comma missing
Add trailing comma
(COM812)
1157-1157: Trailing comma missing
Add trailing comma
(COM812)
1171-1171: Trailing comma missing
Add trailing comma
(COM812)
1176-1176: Replace aliased errors with
OSErrorReplace
IOErrorwith builtinOSError(UP024)
1177-1181: Within an
exceptclause, raise exceptions withraise ... from errorraise ... from Noneto distinguish them from errors in exception handling(B904)
1178-1180: Use f-string instead of
formatcallConvert to f-string
(UP032)
1180-1180: Trailing comma missing
Add trailing comma
(COM812)
| handle_log=True, | ||
| handle_touch=True, | ||
| ) | ||
| async with asyncio.TaskGroup() as tg: |
There was a problem hiding this comment.
Import the asyncio module.
The asyncio module is used but not imported, which will cause a NameError at runtime.
+ import asyncioCommittable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async with asyncio.TaskGroup() as tg: | |
| import asyncio | |
| async with asyncio.TaskGroup() as tg: |
Tools
Ruff
354-354: Undefined name
asyncio(F821)
| except IOError as e: | ||
| raise WorkflowError( | ||
| "Error recording metadata for finished job " | ||
| "({}). Please ensure write permissions for the " | ||
| "directory {}".format(e, self.dag.workflow.persistence.path) | ||
| ) |
There was a problem hiding this comment.
Address static analysis suggestions.
- Replace
IOErrorwithOSErrorasIOErroris an alias forOSErrorin Python 3. - Use
raise ... from Noneto distinguish exceptions from errors in exception handling. - Convert to f-string for consistency and readability.
- except IOError as e:
+ except OSError as e:- raise WorkflowError(
- "Error recording metadata for finished job "
- "({}). Please ensure write permissions for the "
- "directory {}".format(e, self.dag.workflow.persistence.path)
- )
+ raise WorkflowError(
+ f"Error recording metadata for finished job ({e}). "
+ f"Please ensure write permissions for the directory {self.dag.workflow.persistence.path}"
+ ) from NoneTools
Ruff
1176-1176: Replace aliased errors with
OSErrorReplace
IOErrorwith builtinOSError(UP024)
1177-1181: Within an
exceptclause, raise exceptions withraise ... from errorraise ... from Noneto distinguish them from errors in exception handling(B904)
1178-1180: Use f-string instead of
formatcallConvert to f-string
(UP032)
1180-1180: Trailing comma missing
Add trailing comma
(COM812)
|
| tg.create_task( | ||
| self.dag.check_and_touch_output( | ||
| self, | ||
| wait=self.dag.workflow.execution_settings.latency_wait, | ||
| ignore_missing_output=ignore_missing_output, | ||
| # storage not yet handled, just require the local files | ||
| wait_for_local=True, | ||
| ) | ||
| ) | ||
| self.dag.unshadow_output(self, only_log=error) |
There was a problem hiding this comment.
I think this is problematic. The idea of the await was that the check_and_touch happens before the stuff below like the unshadow. If I don't get this wrong here, now the check and touch happens in the background, in theory in parallel to the unshadow?
There was a problem hiding this comment.
One example, in general all the steps currently conducted in postprocess should happen in the order they are defined in the code.
|
I think I will close this for now because of my concerns. Nevertheless, feel free to reopen or create a new one as a follow up if I missed something here. Thanks a lot for the suggestion, I still really appreciate the discussion! |



It seems that asyncio is not used correctly. Current implementation runs without IO concurrency, which can be seen by adding some timing outputs.
For example, the
_finish_jobsfunction inscheduler.pywill wait for thepostprocessof all jobs sequentially, thus the total execution time of_finish_jobsis the sum of allpostprocessexecution time, which can be slow when the computation scales up.This PR facilitates the actual use of asyncio. By making the changes in this PR, the total execution time of
_finish_jobswill be in theory the max of allpostprocessexecution times (slightly longer in reality), which can be much shorter than the sum of times. I can already see a significant speedup in one of my use cases.This PR is just a proposal for now. Asyncio usage in other parts of snakemake may have the same issue. We can proceed with a full fix if this proposal is thought to be correct.
The file diff on github may not be intuitive. The change is simply surrounding the codes with
async withand replacingawaitwithasyncio.TaskGroup().create_task()when possible.QC
docs/) is updated to reflect the changes or this is not necessary (e.g. if the change does neither modify the language nor the behavior or functionalities of Snakemake).Summary by CodeRabbit
New Features
Bug Fixes
Documentation