Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions mlflow/pipelines/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ def __init__(self, pipeline_root_path: str, profile: str) -> None:
pipeline.yaml to generate the configuration to run the pipeline.
"""
self._pipeline_root_path = pipeline_root_path
self._run_args = {}
self._profile = profile
self._name = get_pipeline_name(pipeline_root_path)
self._steps = self._resolve_pipeline_steps()
Expand Down Expand Up @@ -75,9 +74,6 @@ def run(self, step: str = None) -> None:
:return: None
"""

# Save the run parameters for later
self._run_args = {"step": step}

# TODO Record performance here.
# Always resolve the steps to load latest step modules before execution.
self._steps = self._resolve_pipeline_steps()
Expand Down Expand Up @@ -178,7 +174,6 @@ def _resolve_pipeline_steps(self) -> List[BaseStep]:
"""
pipeline_config = get_pipeline_config(self._pipeline_root_path, self._profile)
pipeline_config["profile"] = self.profile
pipeline_config["run_args"] = self._run_args
return [
s.from_pipeline_config(pipeline_config, self._pipeline_root_path)
for s in self._get_step_classes()
Expand Down
12 changes: 7 additions & 5 deletions mlflow/pipelines/steps/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
from mlflow.exceptions import MlflowException, INVALID_PARAMETER_VALUE
from mlflow.pipelines.cards import BaseCard
from mlflow.pipelines.step import BaseStep
from mlflow.pipelines.utils.execution import get_step_output_path
from mlflow.pipelines.utils.execution import (
get_step_output_path,
_MLFLOW_PIPELINES_EXECUTION_TARGET_STEP_NAME_ENV_VAR,
)
from mlflow.pipelines.utils.metrics import (
BUILTIN_PIPELINE_METRICS,
_get_primary_metric,
Expand Down Expand Up @@ -117,13 +120,13 @@ def _run(self, output_directory):
estimator = estimator_fn()
mlflow.autolog(log_models=False)

run_args = self.step_config.get("run_args") or {}

tags = {
MLFLOW_SOURCE_TYPE: SourceType.to_string(SourceType.PIPELINE),
MLFLOW_PIPELINE_TEMPLATE_NAME: self.step_config["template_name"],
MLFLOW_PIPELINE_PROFILE_NAME: self.step_config["profile"],
MLFLOW_PIPELINE_STEP_NAME: run_args.get("step", ""),
MLFLOW_PIPELINE_STEP_NAME: os.getenv(
_MLFLOW_PIPELINES_EXECUTION_TARGET_STEP_NAME_ENV_VAR
),
}
with mlflow.start_run(tags=tags) as run:
estimator.fit(X_train, y_train)
Expand Down Expand Up @@ -455,7 +458,6 @@ def from_pipeline_config(cls, pipeline_config, pipeline_root):
step_config["metrics"] = pipeline_config.get("metrics")
step_config["template_name"] = pipeline_config.get("template")
step_config["profile"] = pipeline_config.get("profile")
step_config["run_args"] = pipeline_config.get("run_args")
step_config.update(
get_pipeline_tracking_config(
pipeline_root_path=pipeline_root,
Expand Down
6 changes: 5 additions & 1 deletion mlflow/pipelines/utils/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@


_MLFLOW_PIPELINES_EXECUTION_DIRECTORY_ENV_VAR = "MLFLOW_PIPELINES_EXECUTION_DIRECTORY"
_MLFLOW_PIPELINES_EXECUTION_TARGET_STEP_NAME_ENV_VAR = "MLFLOW_PIPELINES_EXECUTION_TARGET_STEP_NAME"
_STEPS_SUBDIRECTORY_NAME = "steps"
_STEP_OUTPUTS_SUBDIRECTORY_NAME = "outputs"
_STEP_CONF_YAML_NAME = "conf.yaml"
Expand Down Expand Up @@ -69,7 +70,10 @@ def get_execution_state(step):
# Aggregate step-specific environment variables into a single environment dictionary
# that is passed to the Make subprocess. In the future, steps with different environments
# should be isolated in different subprocesses
make_env = {}
make_env = {
# Include target step name in the environment variable set
_MLFLOW_PIPELINES_EXECUTION_TARGET_STEP_NAME_ENV_VAR: target_step.name,
}
for step in pipeline_steps:
make_env.update(step.environment)
# Use Make to run the target step and all of its dependencies
Expand Down
7 changes: 6 additions & 1 deletion tests/pipelines/test_execution_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
_get_or_create_execution_directory,
run_pipeline_step,
get_step_output_path,
_MLFLOW_PIPELINES_EXECUTION_TARGET_STEP_NAME_ENV_VAR,
)

# pylint: disable=unused-import
Expand Down Expand Up @@ -217,7 +218,11 @@ def environment(self):
)

_, subprocess_call_kwargs = mock_run_in_subprocess.call_args
assert subprocess_call_kwargs.get("extra_env") == {"A": "B", "C": "D"}
assert subprocess_call_kwargs.get("extra_env") == {
"A": "B",
"C": "D",
_MLFLOW_PIPELINES_EXECUTION_TARGET_STEP_NAME_ENV_VAR: "test_step_1",
}


def run_test_pipeline_step(pipeline_steps, target_step):
Expand Down
11 changes: 9 additions & 2 deletions tests/pipelines/test_train_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
import sklearn.compose
from mlflow.tracking import MlflowClient
from mlflow.utils.file_utils import read_yaml
from mlflow.pipelines.utils.execution import _MLFLOW_PIPELINES_EXECUTION_DIRECTORY_ENV_VAR
from mlflow.pipelines.utils.execution import (
_MLFLOW_PIPELINES_EXECUTION_DIRECTORY_ENV_VAR,
_MLFLOW_PIPELINES_EXECUTION_TARGET_STEP_NAME_ENV_VAR,
)
from mlflow.pipelines.utils import _PIPELINE_CONFIG_FILE_NAME
from mlflow.pipelines.steps.train import TrainStep
from unittest import mock
Expand Down Expand Up @@ -134,7 +137,11 @@ def test_train_steps_autologs(tmp_pipeline_root_path):

def test_train_steps_with_correct_tags(tmp_pipeline_root_path):
with mock.patch.dict(
os.environ, {_MLFLOW_PIPELINES_EXECUTION_DIRECTORY_ENV_VAR: str(tmp_pipeline_root_path)}
os.environ,
{
_MLFLOW_PIPELINES_EXECUTION_DIRECTORY_ENV_VAR: str(tmp_pipeline_root_path),
_MLFLOW_PIPELINES_EXECUTION_TARGET_STEP_NAME_ENV_VAR: "train",
},
):
train_step, train_step_output_dir = set_up_train_step(tmp_pipeline_root_path)
train_step._run(str(train_step_output_dir))
Expand Down