Skip to content

Support MLFlow Handler for single process/multi task enviornment#5728

Merged
wyli merged 3 commits intoProject-MONAI:devfrom
SachidanandAlle:mlflow_multi_thread
Dec 15, 2022
Merged

Support MLFlow Handler for single process/multi task enviornment#5728
wyli merged 3 commits intoProject-MONAI:devfrom
SachidanandAlle:mlflow_multi_thread

Conversation

@SachidanandAlle
Copy link
Copy Markdown
Contributor

@SachidanandAlle SachidanandAlle commented Dec 14, 2022

Signed-off-by: Sachidanand Alle [email protected]

Current MLFlow Handler fails when you invoke 2 train requests back to back with different URI. Or multiple train requests within the same process. This is mainly for using global array where it saves active experiment, active run and others share the same. This will cause conflicts between 2 invokes with 2 different URI.

Fixes

  • Use MLFlow Client to create experiment/runs instead of global functions.
  • Save the current run through the lifecycle of handler. If any handler has the same experiment name and same run name, the metrics all will be merged as part of the same run (e.g. train and validation handler).
  • If the run name is not provided (fall back on default) then last active run within the same experiment (sorted based on start time) is used for adding the metrics.

The above two conditions will help create similar behavior compared to using mlflow.active_run()

Verified

  • Running single and multi gpu training on bundles
    • spleen_ct_segmentation_v0.1.0
    • spleen_deepedit_annotation_v0.1.0
    • swin_unetr_btcv_segmentation_v0.1.0
  • Running Training workflows for both single and multi gpu in MONAI Label
  • Verified against running shared/single tracking URI (where all the experiments get saved)
  • Verified against individual eval/mlruns per bundle/workflow

I suggest, original owner of this handler to verify/test all the behaviors that were currently supported.

Error Description

Error stack when you run two train workflows with in the same process (simply one after another).

[2022-12-13 21:08:11,095] [4047823] [MainThread] [ERROR] (uvicorn.error:369) - Exception in ASGI application
Traceback (most recent call last):
  File "/localhome/sachi/.local/lib/python3.10/site-packages/uvicorn/protocols/http/h11_impl.py", line 366, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "/localhome/sachi/.local/lib/python3.10/site-packages/uvicorn/middleware/proxy_headers.py", line 75, in __call__
    return await self.app(scope, receive, send)
  File "/localhome/sachi/.local/lib/python3.10/site-packages/fastapi/applications.py", line 199, in __call__
    await super().__call__(scope, receive, send)
  File "/localhome/sachi/.local/lib/python3.10/site-packages/starlette/applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/localhome/sachi/.local/lib/python3.10/site-packages/starlette/middleware/errors.py", line 181, in __call__
    raise exc from None
  File "/localhome/sachi/.local/lib/python3.10/site-packages/starlette/middleware/errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "/localhome/sachi/.local/lib/python3.10/site-packages/starlette/middleware/cors.py", line 78, in __call__
    await self.app(scope, receive, send)
  File "/localhome/sachi/.local/lib/python3.10/site-packages/starlette/exceptions.py", line 82, in __call__
    raise exc from None
  File "/localhome/sachi/.local/lib/python3.10/site-packages/starlette/exceptions.py", line 71, in __call__
    await self.app(scope, receive, sender)
  File "/localhome/sachi/.local/lib/python3.10/site-packages/starlette/routing.py", line 580, in __call__
    await route.handle(scope, receive, send)
  File "/localhome/sachi/.local/lib/python3.10/site-packages/starlette/routing.py", line 241, in handle
    await self.app(scope, receive, send)
  File "/localhome/sachi/.local/lib/python3.10/site-packages/starlette/routing.py", line 52, in app
    response = await func(request)
  File "/localhome/sachi/.local/lib/python3.10/site-packages/fastapi/routing.py", line 219, in app
    raw_response = await run_endpoint_function(
  File "/localhome/sachi/.local/lib/python3.10/site-packages/fastapi/routing.py", line 152, in run_endpoint_function
    return await dependant.call(**values)
  File "/localhome/sachi/Projects/monailabel/monailabel/endpoints/train.py", line 96, in api_run_model
    return run_model(model, params, run_sync, enqueue)
  File "/localhome/sachi/Projects/monailabel/monailabel/endpoints/train.py", line 55, in run_model
    res, detail = AsyncTask.run("train", request=request, params=params, force_sync=run_sync, enqueue=enqueue)
  File "/localhome/sachi/Projects/monailabel/monailabel/utils/async_tasks/task.py", line 43, in run
    return instance.train(request), None
  File "/localhome/sachi/Projects/monailabel/monailabel/interfaces/app.py", line 422, in train
    result = task(request, self.datastore())
  File "/localhome/sachi/Projects/monailabel/monailabel/tasks/train/basic_train.py", line 458, in __call__
    res = self.train(0, world_size, req, datalist)
  File "/localhome/sachi/Projects/monailabel/monailabel/tasks/train/basic_train.py", line 545, in train
    context.trainer.run()
  File "/localhome/sachi/Projects/MONAI/monai/engines/trainer.py", line 53, in run
    super().run()
  File "/localhome/sachi/Projects/MONAI/monai/engines/workflow.py", line 281, in run
    super().run(data=self.data_loader, max_epochs=self.state.max_epochs)
  File "/localhome/sachi/.local/lib/python3.10/site-packages/ignite/engine/engine.py", line 892, in run
    return self._internal_run()
  File "/localhome/sachi/.local/lib/python3.10/site-packages/ignite/engine/engine.py", line 935, in _internal_run
    return next(self._internal_run_generator)
  File "/localhome/sachi/.local/lib/python3.10/site-packages/ignite/engine/engine.py", line 993, in _internal_run_as_gen
    self._handle_exception(e)
  File "/localhome/sachi/.local/lib/python3.10/site-packages/ignite/engine/engine.py", line 636, in _handle_exception
    self._fire_event(Events.EXCEPTION_RAISED, e)
  File "/localhome/sachi/.local/lib/python3.10/site-packages/ignite/engine/engine.py", line 425, in _fire_event
    func(*first, *(event_args + others), **kwargs)
  File "/localhome/sachi/Projects/MONAI/monai/handlers/stats_handler.py", line 181, in exception_raised
    raise e
  File "/localhome/sachi/.local/lib/python3.10/site-packages/ignite/engine/engine.py", line 946, in _internal_run_as_gen
    self._fire_event(Events.STARTED)
  File "/localhome/sachi/.local/lib/python3.10/site-packages/ignite/engine/engine.py", line 425, in _fire_event
    func(*first, *(event_args + others), **kwargs)
  File "/localhome/sachi/Projects/MONAI/monai/handlers/mlflow_handler.py", line 183, in start
    self._delete_exist_param_in_dict(attrs)
  File "/localhome/sachi/Projects/MONAI/monai/handlers/mlflow_handler.py", line 141, in _delete_exist_param_in_dict
    log_data = self.client.get_run(cur_run.info.run_id).data
  File "/localhome/sachi/.local/lib/python3.10/site-packages/mlflow/tracking/client.py", line 150, in get_run
    return self._tracking_client.get_run(run_id)
  File "/localhome/sachi/.local/lib/python3.10/site-packages/mlflow/tracking/_tracking_service/client.py", line 72, in get_run
    return self.store.get_run(run_id)
  File "/localhome/sachi/.local/lib/python3.10/site-packages/mlflow/store/tracking/file_store.py", line 623, in get_run
    run_info = self._get_run_info(run_id)
  File "/localhome/sachi/.local/lib/python3.10/site-packages/mlflow/store/tracking/file_store.py", line 646, in _get_run_info
    raise MlflowException(
mlflow.exceptions.MlflowException: Run '1765aea084a3417586d052d9d8240039' not found
FAILED                                                                                                                                                           [ 72%]

Types of changes

  • Non-breaking change (fix or new feature that would not break existing functionality).
  • Breaking change (fix or new feature that would cause existing functionality to change).
  • New tests added to cover the changes.
  • Integration tests passed locally by running ./runtests.sh -f -u --net --coverage.
  • Quick tests passed locally by running ./runtests.sh --quick --unittests --disttests.
  • In-line docstrings updated.
  • Documentation updated, tested make html command in the docs/ folder.

@SachidanandAlle SachidanandAlle added enhancement New feature or request bug Something isn't working labels Dec 14, 2022
@Nic-Ma
Copy link
Copy Markdown
Contributor

Nic-Ma commented Dec 14, 2022

Hi @SachidanandAlle ,

May I know how you tested the MLFlow experiment management for bundles?

  1. Did you use the same method as: https://github.com/Project-MONAI/tutorials/blob/main/experiment_management/bundle_integrate_mlflow.ipynb?
    "--tracking mlflow" arg
    Same in the MONAI FL: https://github.com/Project-MONAI/MONAI/blob/dev/monai/fl/client/monai_algo.py#L403
  2. If you use the MLFlowHandler directly, have you set this close_on_complete arg to True?
    https://github.com/Project-MONAI/MONAI/blob/dev/monai/handlers/mlflow_handler.py#L110

And I agree with your proposal, I think we should try to avoid setting the global param of MLFlow.

Thanks.

@SachidanandAlle
Copy link
Copy Markdown
Contributor Author

SachidanandAlle commented Dec 14, 2022

Hi @SachidanandAlle ,

May I know how you tested the MLFlow experiment management for bundles?

  1. Did you use the same method as: https://github.com/Project-MONAI/tutorials/blob/main/experiment_management/bundle_integrate_mlflow.ipynb?
    "--tracking mlflow" arg
    Same in the MONAI FL: https://github.com/Project-MONAI/MONAI/blob/dev/monai/fl/client/monai_algo.py#L403
  2. If you use the MLFlowHandler directly, have you set this close_on_complete arg to True?
    https://github.com/Project-MONAI/MONAI/blob/dev/monai/handlers/mlflow_handler.py#L110

And I agree with your proposal, I think we should try to avoid setting the global param of MLFlow.

Thanks.

Point 1.. yes..
Point 2.. good.. didn't see that.. that will solve one situation. but multi-thread will have race issues still.. because of the global array sharing. 2 experiments/runs can be active at the same time

@Nic-Ma
Copy link
Copy Markdown
Contributor

Nic-Ma commented Dec 14, 2022

Hi @SachidanandAlle ,
I agree we should avoid setting the global variables of MLFlow, your proposal is a good enhancement for thread-safe.
Could you please help share a simple program to reproduce the "race issues" you said?
Then @binliunls can test and also verify your current PR solution ASAP.

Thanks in advance.

@SachidanandAlle
Copy link
Copy Markdown
Contributor Author

SachidanandAlle commented Dec 14, 2022

Run the handler with two different URI (folder) something like this..

from concurrent.futures import ThreadPoolExecutor

def run_task(uri):
  print(f"Running handler update for uri: {uri}")
  r = train(req)  # Run Transform instead of train with mocked engine for train/eval
  return r

train_tasks = ["uri1", "uri2"]
with ThreadPoolExecutor(2, "Training") as executor:
  for t in train_tasks:
    futures[t["_id"]] = t, executor.submit(run_task, t)

  for tid, (t, future) in futures.items():
    res = future.result()

Actually this keeps the active run open and both handlers will try to race for the current uri/experiment/active run.
Even in their code (mlflow), they have mentioned in couple of places regarding the possible race condition when you invoke certain methods.

https://github.com/mlflow/mlflow/blob/master/mlflow/tracking/fluent.py#L1529-L1532
If two handlers using two different URIs within the same process, the above method always returns the last one. And you will be using that to log the params/metrics etc.. this is similar to not closing the active run before starting another.

Another easiest way to run is via unit tests and invoke with multi threads...

@binliunls
Copy link
Copy Markdown
Contributor

Hi @Nic-Ma ,
I've reproduced the error with multi-thread task and verified that this new version of code can fix it. And the experiment management tutorial run well with this new code since it only have one main thread.

Thanks,
Bin

@wyli
Copy link
Copy Markdown
Contributor

wyli commented Dec 15, 2022

/build

Signed-off-by: Wenqi Li <[email protected]>
@wyli wyli enabled auto-merge (squash) December 15, 2022 09:03
@wyli
Copy link
Copy Markdown
Contributor

wyli commented Dec 15, 2022

/build

2 similar comments
@wyli
Copy link
Copy Markdown
Contributor

wyli commented Dec 15, 2022

/build

@wyli
Copy link
Copy Markdown
Contributor

wyli commented Dec 15, 2022

/build

@Nic-Ma Nic-Ma disabled auto-merge December 15, 2022 10:36
@Nic-Ma
Copy link
Copy Markdown
Contributor

Nic-Ma commented Dec 15, 2022

Hi @Nic-Ma , I've reproduced the error with multi-thread task and verified that this new version of code can fix it. And the experiment management tutorial run well with this new code since it only have one main thread.

Thanks, Bin

Thanks for your testing, the PR looks good to me.
Is it possible to add another unit test case to cover this multi-thread usage?
If not easy, we can consider to add it in another PR later.

Thanks in advance.

@wyli wyli merged commit 9784506 into Project-MONAI:dev Dec 15, 2022
@SachidanandAlle SachidanandAlle deleted the mlflow_multi_thread branch December 15, 2022 19:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something isn't working enhancement New feature or request

Projects

No open projects
Status: Done

Development

Successfully merging this pull request may close these issues.

4 participants