Skip to content

Commit 0bc9e15

Browse files
committed
Implement MPIEvaluator for multi-node HPC systems support
Adds a new MPIEvaluator to the EMAworkbench, enabling experiments to be executed on multi-node High-Performance Computing (HPC) systems leveraging the mpi4py library. This evaluator optimizes performance for distributed computing environments by parallelizing experiments across multiple nodes and processors. Changes include: - Definition of the MPIEvaluator class. - Initialization function to set up the global ExperimentRunner for worker processes. - Proper handling to pack and unpack experiments for efficient data transfer between nodes. Note: This addition requires the mpi4py package only when the MPIEvaluator is explicitly used, preventing unnecessary dependencies for users not requiring this feature.
1 parent b76b487 commit 0bc9e15

File tree

3 files changed

+59
-0
lines changed

3 files changed

+59
-0
lines changed

ema_workbench/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
Constant,
1616
Scenario,
1717
Policy,
18+
MPIEvaluator,
1819
MultiprocessingEvaluator,
1920
IpyparallelEvaluator,
2021
SequentialEvaluator,

ema_workbench/em_framework/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
"perform_experiments",
3131
"optimize",
3232
"IpyparallelEvaluator",
33+
"MPIEvaluator",
3334
"MultiprocessingEvaluator",
3435
"SequentialEvaluator",
3536
"ReplicatorModel",
@@ -76,6 +77,7 @@
7677
from .evaluators import (
7778
perform_experiments,
7879
optimize,
80+
MPIEvaluator,
7981
MultiprocessingEvaluator,
8082
SequentialEvaluator,
8183
Samplers,

ema_workbench/em_framework/evaluators.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,62 @@ def evaluate_experiments(self, scenarios, policies, callback, combine="factorial
415415
add_tasks(self.n_processes, self._pool, ex_gen, callback)
416416

417417

418+
# Create a global ExperimentRunner that will be used by all the worker processes
419+
experiment_runner = None
420+
421+
422+
def mpi_initializer(models):
423+
global experiment_runner
424+
experiment_runner = ExperimentRunner(models)
425+
426+
427+
class MPIEvaluator(BaseEvaluator):
428+
"""Evaluator for experiments using MPI Pool Executor from mpi4py"""
429+
430+
def __init__(self, msis, **kwargs):
431+
super().__init__(msis, **kwargs)
432+
self._pool = None
433+
434+
def initialize(self):
435+
# Only import mpi4py if the MPIEvaluator is used, to avoid unnecessary dependencies.
436+
from mpi4py.futures import MPIPoolExecutor
437+
438+
# Instead of instantiating the ExperimentRunner for each experiment, instantiate it once here
439+
models = NamedObjectMap(AbstractModel)
440+
models.extend(self._msis)
441+
442+
# Use the initializer function to set up the ExperimentRunner for all the worker processes
443+
self._pool = MPIPoolExecutor(initializer=mpi_initializer, initargs=(models,))
444+
_logger.info(f"MPI pool started with {self._pool._max_workers} workers")
445+
return self
446+
447+
def finalize(self):
448+
self._pool.shutdown()
449+
_logger.info("MPI pool has been shut down")
450+
451+
def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
452+
ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)
453+
experiments = list(ex_gen) # Convert generator to list
454+
455+
# Instead of sending all models for each experiment, send only the model_name
456+
packed = [(experiment, experiment.model_name) for experiment in experiments]
457+
458+
# Use the pool to execute in parallel
459+
results = self._pool.map(run_experiment_mpi, packed)
460+
461+
for experiment, outcomes in results:
462+
callback(experiment, outcomes)
463+
464+
465+
def run_experiment_mpi(packed_data):
466+
experiment, model_name = packed_data
467+
468+
# Use the global ExperimentRunner created by the initializer
469+
outcomes = experiment_runner.run_experiment(experiment)
470+
471+
return experiment, outcomes
472+
473+
418474
class IpyparallelEvaluator(BaseEvaluator):
419475
"""evaluator for using an ipypparallel pool"""
420476

0 commit comments

Comments
 (0)