Commit e10feef

feat: subsample jobs to speed-up scheduler (#3112)
I am running a workflow with ~700k jobs and, at any given time, around 230k jobs are ready to run. The initial building of the DAG is quite slow (~2 h, but I'll leave that for another PR 😄), but the main issue is that the scheduler takes a long time deciding which jobs to submit next. In my case, all jobs are fast and similar in terms of resources, so the cluster is idle most of the time.

The greedy scheduler is considerably faster, but still too slow. The ILP scheduler should fall back to the greedy one after 10 s, but it sometimes ignores the timeout (coin-or/Cbc#487), and it has been reported to be quite slow instantiating large problems (coin-or/pulp#749). In my case, the ILP runs for 60 s (the pulp file is 100 MB) before switching to greedy. Apart from that, and especially on slow file systems, the scheduler can still be quite slow checking all temp and input files.

Here, I propose subsampling the ready jobs, so that only a subset (instead of all ready jobs) is evaluated by the scheduler. In my tests, this greatly reduces the scheduler time:

|                    | ILP          | greedy       |
|--------------------|--------------|--------------|
| Native             | 15 - 20 mins | 30 s - 1 min |
| Sampling 1000 jobs |              | 1 - 2 s      |

### QC
<!-- Make sure that you can tick the boxes below. -->

* [x] The PR contains a test case for the changes or the changes are already covered by an existing test case.
* [ ] The documentation (`docs/`) is updated to reflect the changes, or this is not necessary (e.g. if the change modifies neither the language nor the behavior or functionalities of Snakemake).

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

- **New Features**
  - Introduced a new argument `--scheduler-subsample` to optimize job scheduling by limiting the number of jobs considered for execution.
  - Added a method for inferring resource requirements, enhancing user experience with better error handling.
  - Updated settings to include a new attribute for job subsampling, improving scheduling flexibility.
- **Bug Fixes**
  - Improved error handling and logging for resource evaluation and parsing, providing clearer guidance for users.
  - Enhanced job selection process with a subsampling mechanism to optimize scheduling efficiency.
- **Refactor**
  - Enhanced structure and organization of job scheduling logic for better integration with existing functionality.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
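The core idea above, evaluating only a random subset of the ready jobs, can be sketched as a small standalone function (a minimal sketch; `subsample_ready_jobs` and its parameters are hypothetical names for illustration, not Snakemake's API):

```python
import random
from typing import Optional


def subsample_ready_jobs(ready_jobs: set, subsample: Optional[int]) -> set:
    """Return at most `subsample` randomly chosen jobs from the ready set.

    If subsampling is disabled (None) or no more jobs are ready than the
    limit, the full set is returned unchanged.
    """
    if subsample is None or len(ready_jobs) <= subsample:
        return ready_jobs
    # random.sample needs a sequence, so materialize the set as a tuple first
    return set(random.sample(tuple(ready_jobs), k=subsample))


# Roughly the scale described above: ~230k ready jobs, sampled down to 1000
jobs = {f"job_{i}" for i in range(230_000)}
picked = subsample_ready_jobs(jobs, 1000)
print(len(picked))  # 1000
```

Because only the sampled subset is handed to the ILP/greedy selector, the solver's problem size is bounded by the subsample regardless of how many jobs are ready.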
1 parent 9504bf4 commit e10feef

4 files changed, +33 −6 lines changed

snakemake/cli.py

Lines changed: 11 additions & 0 deletions
```diff
@@ -1414,6 +1414,16 @@ def get_argument_parser(profiles=None):
         "value (1.0) provides the best speed and still acceptable scheduling "
         "quality.",
     )
+    group_behavior.add_argument(
+        "--scheduler-subsample",
+        type=int,
+        default=None,
+        help="Set the number of jobs to be considered for scheduling. If number "
+        "of ready jobs is greater than this value, this number of jobs is randomly "
+        "chosen for scheduling; if number of ready jobs is lower, this option has "
+        "no effect. This can be useful on very large DAGs, where the scheduler can "
+        "take some time selecting which jobs to run.",
+    )
     group_behavior.add_argument(
         "--no-hooks",
         action="store_true",
@@ -2127,6 +2137,7 @@ def args_to_api(args, parser):
             ilp_solver=args.scheduler_ilp_solver,
             solver_path=args.scheduler_solver_path,
             greediness=args.scheduler_greediness,
+            subsample=args.scheduler_subsample,
             max_jobs_per_second=args.max_jobs_per_second,
             max_jobs_per_timespan=args.max_jobs_per_timespan,
         ),
```
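The new flag can be exercised in isolation; here is a minimal sketch using a bare `argparse` parser with the same option definition (not the full Snakemake CLI):

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--scheduler-subsample",
    type=int,
    default=None,
    help="Number of ready jobs to randomly consider for scheduling.",
)

# Passing the flag yields an int; argparse converts via type=int
args = parser.parse_args(["--scheduler-subsample", "1000"])
print(args.scheduler_subsample)  # 1000

# Omitting the flag keeps the default of None, i.e. no subsampling
print(parser.parse_args([]).scheduler_subsample)  # None
```

`default=None` means the feature is opt-in: existing workflows see no behavior change unless the flag is given.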

snakemake/resources.py

Lines changed: 0 additions & 1 deletion
```diff
@@ -654,7 +654,6 @@ def infer_resources(name, value, resources: dict):
         raise WorkflowError(
             f"Cannot parse runtime value into minutes for setting runtime resource: {value}"
         )
-    logger.debug(f"Inferred runtime value of {parsed} minutes from {value}")
     resources["runtime"] = parsed


```

snakemake/scheduler.py

Lines changed: 13 additions & 2 deletions
```diff
@@ -62,6 +62,7 @@ def __init__(self, workflow, executor_plugin: ExecutorPlugin):
         self.failed = set()
         self.finished_jobs = 0
         self.greediness = self.workflow.scheduling_settings.greediness
+        self.subsample = self.workflow.scheduling_settings.subsample
         self._tofinish = []
         self._toerror = []
         self.handle_job_success = True
@@ -263,7 +264,18 @@ def schedule(self):
                 job.reset_params_and_resources()

             logger.debug(f"Resources before job selection: {self.resources}")
-            logger.debug(f"Ready jobs: {len(needrun)}")
+
+            # Subsample jobs to be run (to speedup solver)
+            n_total_needrun = len(needrun)
+            if self.subsample and n_total_needrun > self.subsample:
+                import random
+
+                needrun = set(random.sample(tuple(needrun), k=self.subsample))
+                logger.debug(
+                    f"Ready subsampled jobs: {len(needrun)} (out of {n_total_needrun})"
+                )
+            else:
+                logger.debug(f"Ready jobs: {n_total_needrun}")

             if not self._last_job_selection_empty:
                 logger.info("Select jobs to execute...")
@@ -506,7 +518,6 @@ def job_selector_ilp(self, jobs):
         if not self.resources["_cores"]:
             return set()

-        # assert self.resources["_cores"] > 0
         scheduled_jobs = {
             job: pulp.LpVariable(
                 f"job_{idx}", lowBound=0, upBound=1, cat=pulp.LpInteger
```

snakemake/settings/types.py

Lines changed: 9 additions & 3 deletions
```diff
@@ -289,14 +289,17 @@ class SchedulingSettings(SettingsBase):
     ilp_solver:
         Set solver for ilp scheduler.
     greediness:
-        set the greediness of scheduling. This value between 0 and 1 determines how careful jobs are selected for execution. The default value (0.5 if prioritytargets are used, 1.0 else) provides the best speed and still acceptable scheduling quality.
+        Set the greediness of scheduling. This value, between 0 and 1, determines how careful jobs are selected for execution. The default value (0.5 if prioritytargets are used, 1.0 else) provides the best speed and still acceptable scheduling quality.
+    subsample:
+        Set the number of jobs to be considered for scheduling. If number of ready jobs is greater than this value, this number of jobs is randomly chosen for scheduling; if number of ready jobs is lower, this option has no effect. This can be useful on very large DAGs, where the scheduler can take some time selecting which jobs to run."
     """

     prioritytargets: AnySet[str] = frozenset()
     scheduler: str = "ilp"
     ilp_solver: Optional[str] = None
     solver_path: Optional[Path] = None
     greediness: Optional[float] = None
+    subsample: Optional[int] = None
     max_jobs_per_second: Optional[int] = None
     max_jobs_per_timespan: Optional[MaxJobsPerTimespan] = None

@@ -312,8 +315,11 @@ def _get_greediness(self):
         return self.greediness

     def _check(self):
-        if not (0 < self.greedyness <= 1.0):
-            raise ApiError("greediness must be >0 and <=1")
+        if not (0 <= self.greediness <= 1.0):
+            raise ApiError("greediness must be >=0 and <=1")
+        if self.subsample:
+            if not isinstance(self.subsample, int) or self.subsample < 1:
+                raise ApiError("subsample must be a positive integer")


 @dataclass
```
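The validation logic in `_check` (which this diff also fixes: the old code referenced a misspelled `self.greedyness` and rejected a greediness of 0) can be sketched as a standalone function mirroring the same `ApiError` conditions (hypothetical free function for illustration):

```python
class ApiError(Exception):
    pass


def check_scheduling_settings(greediness: float, subsample=None) -> None:
    # greediness of exactly 0 is now accepted; values above 1 are rejected
    if not (0 <= greediness <= 1.0):
        raise ApiError("greediness must be >=0 and <=1")
    # subsample, when set, must be a positive integer
    if subsample and (not isinstance(subsample, int) or subsample < 1):
        raise ApiError("subsample must be a positive integer")


check_scheduling_settings(0.5, 1000)  # passes silently

try:
    check_scheduling_settings(0.5, -3)
except ApiError as e:
    print(e)  # subsample must be a positive integer
```

Note that, as in the diff, a subsample of 0 is falsy and therefore skips the validation branch entirely, behaving like "no subsampling".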
