LMDeploy Distserve#3304

Merged
lvhan028 merged 86 commits into
InternLM:mainfrom
JimyMa:distserve
May 8, 2025
Conversation

@JimyMa JimyMa commented Mar 22, 2025

Contributor

What lmdeploy-distserve includes:

  1. A Simple Router for PD Scheduling
  2. A Transfer Engine for KVCache Migration
  3. A Migration Scheduler for Overlapping Migration and Decode

State of lmdeploy-distserve:

  1. Passes functional validation (on H800 with RoCE support).

Next Step

Initialization

image

The PD Consolidation process outlines the sequential steps for establishing peer-to-peer (P2P) connections between system components. The process begins with the Router, which acts as the central orchestrator. First, the Router initiates the connection setup by sending a p2p_initialize message to both the Prefill Server and the Decode Server. This ensures that all necessary components are prepared for the subsequent connection phase.

Once the initialization phase is complete for both the Prefill Server and the Decode Server, the Router proceeds to establish the actual P2P connections. It sends a p2p_connect message to the Prefill Server to finalize the connection, followed by another p2p_connect message to the Decode Server. This systematic approach ensures that all components are properly initialized before any connections are established, forming a cohesive network during the system's startup phase.
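
The two-phase handshake described above can be sketched as follows. This is a minimal illustration, not the PR's implementation: `FakeServer` and `pd_consolidation` are hypothetical names, and the servers are in-process stand-ins that only record the messages they receive. Only the message names `p2p_initialize` and `p2p_connect` come from the description.

```python
from dataclasses import dataclass, field


@dataclass
class FakeServer:
    """Hypothetical stand-in for a Prefill or Decode server; records received messages."""
    name: str
    log: list = field(default_factory=list)

    def handle(self, message: str) -> str:
        self.log.append(message)
        return "ok"


def pd_consolidation(prefill: FakeServer, decode: FakeServer) -> list:
    """Run the two-phase handshake and return the ordered (target, message) trace."""
    trace = []
    # Phase 1: both sides must be initialized before any connection is made.
    for server in (prefill, decode):
        if server.handle("p2p_initialize") != "ok":
            raise RuntimeError(f"{server.name} failed to initialize")
        trace.append((server.name, "p2p_initialize"))
    # Phase 2: connect the Prefill Server first, then the Decode Server.
    for server in (prefill, decode):
        if server.handle("p2p_connect") != "ok":
            raise RuntimeError(f"{server.name} failed to connect")
        trace.append((server.name, "p2p_connect"))
    return trace


trace = pd_consolidation(FakeServer("prefill"), FakeServer("decode"))
```

Keeping the two phases in separate loops means a failed initialization on either side aborts before any `p2p_connect` is sent.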

Control Plane

image

The diagram illustrates the workflow and interactions between various components involved in the system's prefill and decode processes. This process is designed to manage tasks efficiently, ensuring smooth operation and scalability.

Prefill Process:

The Prefill Server initiates the prefill process by sending a Prefill Message to the Prefill Engine.
The Prefill Engine processes the request and generates an EngineOutput, which includes details such as FirstToken and Cache BlockIds.
The Prefill Scheduler receives the output from the Prefill Engine and manages task scheduling. Tasks are placed into a Waiting Queue with a status of Status.WAITING.
Once ready, the tasks are forwarded to the Forward Executor, which processes them with a status of Status.RUNNING. The status is then changed to Status.ToBeMigrated, and the task's cache is freed once the Decode Engine has finished migration.
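
The status lifecycle on the prefill side can be sketched as a small state machine. This is an assumption-laden illustration: the enum below mirrors the status names in the description, but `FREED`, `ALLOWED`, and `PrefillTask` are hypothetical and are not taken from the lmdeploy code.

```python
from enum import Enum, auto


class Status(Enum):
    WAITING = auto()
    RUNNING = auto()
    TO_BE_MIGRATED = auto()
    FREED = auto()  # hypothetical terminal state: cache released after migration


# Legal transitions, following the order given in the description.
ALLOWED = {
    Status.WAITING: {Status.RUNNING},
    Status.RUNNING: {Status.TO_BE_MIGRATED},
    Status.TO_BE_MIGRATED: {Status.FREED},
    Status.FREED: set(),
}


class PrefillTask:
    """Hypothetical prefill task that owns a list of cache block ids."""

    def __init__(self, block_ids):
        self.status = Status.WAITING
        self.block_ids = list(block_ids)

    def advance(self, new_status: Status) -> None:
        if new_status not in ALLOWED[self.status]:
            raise ValueError(f"illegal transition {self.status.name} -> {new_status.name}")
        self.status = new_status
        if new_status is Status.FREED:
            # The blocks are released only after the decode side has migrated them.
            self.block_ids.clear()


task = PrefillTask([3, 4])
for step in (Status.RUNNING, Status.TO_BE_MIGRATED, Status.FREED):
    task.advance(step)
```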

Decode Process:

The Decode Server sends requests to the Decode Engine, which processes the input and generates an EngineOutput. This output may include details like GenToken. The Decode Scheduler manages the decoded tasks and places them into a Migration Queue with a status of Status.WaitingMigration. The Migration Executor processes these tasks, transitioning their status to Status.Running. Completed tasks are then sent back to the Forward Executor for further processing (Prefill Engine cache_free).
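
A rough sketch of the decode-side flow, under simplifying assumptions: caches are plain dictionaries, block ids are the same on both sides, and the prefill-side free happens inline rather than through a message to the Prefill Engine. `run_migration` is a hypothetical helper, not a function from this PR.

```python
from collections import deque


def run_migration(queue: deque, prefill_cache: dict, decode_cache: dict) -> list:
    """Drain the migration queue; return the block ids freed on the prefill side."""
    freed = []
    while queue:
        task = queue.popleft()
        if task["status"] != "WaitingMigration":
            raise ValueError(f"unexpected status {task['status']}")
        # Copy the KV blocks from the prefill cache into the decode cache.
        for block_id in task["block_ids"]:
            decode_cache[block_id] = prefill_cache[block_id]
        task["status"] = "Running"
        # Only after the copy is done may the prefill side release its blocks.
        for block_id in task["block_ids"]:
            del prefill_cache[block_id]
            freed.append(block_id)
    return freed


prefill_cache = {7: "kv7", 8: "kv8"}
decode_cache = {}
queue = deque([{"status": "WaitingMigration", "block_ids": [7, 8]}])
freed = run_migration(queue, prefill_cache, decode_cache)
```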

Key Features

  • Task Management: Both the Prefill Scheduler and Decode Scheduler play crucial roles in managing task queues and ensuring efficient execution.
  • Executor Roles: The Forward Executor and Migration Executor handle the execution of tasks in different stages of the pipeline.
  • Scalability and Monitoring (Future work): The inclusion of load balancing, offline profiling, online monitoring, and scaling mechanisms ensures the system remains robust and adaptable under varying workloads.

This structured approach enables seamless coordination between components, facilitating efficient task execution and system control within the Control Plane.

Data Plane

image

The diagram illustrates the workflow and interactions between key components responsible for managing cache operations, migration, and load balancing. This process is designed to optimize data handling and ensure efficient resource utilization.

Prefill CacheEngine:

The Prefill CacheEngine handles caching operations for prefill tasks. It interacts with the MigrationBackend.Store to store cached data, which can be migrated or loaded as needed.

Decode CacheEngine:

The Decode CacheEngine manages caching operations for decode tasks. It interacts with the MigrationBackend.Load to retrieve cached data when required.

Optional Store Component:

An optional Store component is included, which can be utilized for additional storage needs. This component may interact with the MigrationBackend.Store to manage persistent storage or backup mechanisms.

Migration Operations:

Both the Prefill CacheEngine and Decode CacheEngine utilize the MigrationBackend.Migrate functionality to migrate cached data as necessary. This ensures that cached data can be efficiently moved between different storage locations or systems, maintaining data consistency and availability.
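
One way to picture the three backend operations named above (Store, Load, Migrate) is as a small abstract interface. The sketch below is illustrative only: `MigrationBackendSketch` and `InMemoryBackend` are hypothetical names, the method signatures are assumptions, and caches are modelled as dictionaries rather than GPU memory.

```python
from abc import ABC, abstractmethod


class MigrationBackendSketch(ABC):
    """Hypothetical interface covering the Store, Load, and Migrate operations."""

    @abstractmethod
    def store(self, key: str, blocks: dict) -> None:
        """Persist a copy of the given blocks under `key`."""

    @abstractmethod
    def load(self, key: str) -> dict:
        """Return a copy of the blocks stored under `key`."""

    @abstractmethod
    def migrate(self, src: dict, dst: dict, block_ids: list) -> None:
        """Copy the listed blocks from the source cache to the destination cache."""


class InMemoryBackend(MigrationBackendSketch):
    """Toy backend that keeps everything in process memory."""

    def __init__(self):
        self._stored = {}

    def store(self, key, blocks):
        self._stored[key] = dict(blocks)

    def load(self, key):
        return dict(self._stored[key])

    def migrate(self, src, dst, block_ids):
        for block_id in block_ids:
            dst[block_id] = src[block_id]


backend = InMemoryBackend()
prefill_cache = {0: b"k0", 1: b"k1"}
decode_cache = {}
backend.migrate(prefill_cache, decode_cache, [0, 1])  # prefill -> decode
backend.store("session-1", prefill_cache)             # optional Store path
restored = backend.load("session-1")
```

Separating the interface from the transport is what would let different backends be swapped in without touching the cache engines.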

Key Features

  • Cache Management: The Prefill CacheEngine and Decode CacheEngine are responsible for managing cached data during prefill and decode operations, respectively.
  • Migration and Load Balancing: The MigrationBackend facilitates data migration and loading, ensuring that cached data is appropriately managed and accessible.
  • Scalability and Flexibility: The inclusion of an optional Store component allows for additional storage flexibility, while the Router (Load Balance) ensures efficient distribution of workloads.

This structured approach enables seamless coordination between components, facilitating efficient data handling and system control within the Data Plane.

How to build

pip install dlslime==0.0.1.post2
pip install -v -e .

How to Run

  • On the server side
    Step 1. Start Prefill Engine
lmdeploy serve api_server Qwen/Qwen2.5-72B --server-port 23333 --role Prefill --tp 8 --cache-block-seq-len 256

    Step 2. Start Decode Engine
lmdeploy serve api_server Qwen/Qwen2.5-72B --server-port 23334 --role Decode --tp 8 --cache-block-seq-len 256

    Step 3. Start Router
python -m lmdeploy.disagg.router --host 0.0.0.0 --port 5000 --prefill-endpoint http://10.130.8.139:23333 --decode-endpoint http://10.130.8.139:23334

  • On the client side
curl -X POST "http://localhost:5000/v1/completions" \
-H "Content-Type: application/json" \
-d '{"model": "Qwen/Qwen2.5-72B", "prompt": "Shanghai is a city that", "max_tokens": 32, "stream": true}'
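
The same client request can be issued from Python with only the standard library. The sketch below builds the request shown in the curl command and includes a small parser for the streamed reply. The parser assumes the router streams OpenAI-style server-sent events (`data: {...}` lines ending with `data: [DONE]`); that format is an assumption here, not something this description states. `build_completion_request` and `iter_stream_text` are hypothetical helper names.

```python
import json
import urllib.request


def build_completion_request(base_url, model, prompt, max_tokens=32):
    """Build (but do not send) the POST request from the curl example."""
    payload = {"model": model, "prompt": prompt, "max_tokens": max_tokens, "stream": True}
    return urllib.request.Request(
        f"{base_url}/v1/completions",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def iter_stream_text(lines):
    """Yield generated text from 'data: {...}' lines; stop at 'data: [DONE]'."""
    for raw in lines:
        line = raw.decode("utf-8").strip() if isinstance(raw, bytes) else raw.strip()
        if not line.startswith("data:"):
            continue  # skip blank keep-alive lines
        body = line[len("data:"):].strip()
        if body == "[DONE]":
            break
        yield json.loads(body)["choices"][0]["text"]


request = build_completion_request(
    "http://localhost:5000", "Qwen/Qwen2.5-72B", "Shanghai is a city that")

# To send it (requires the three servers above to be running):
# with urllib.request.urlopen(request) as response:
#     for piece in iter_stream_text(response):
#         print(piece, end="", flush=True)
```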

@lvhan028 lvhan028 added the enhancement New feature or request label Mar 24, 2025
@JimyMa JimyMa force-pushed the distserve branch 2 times, most recently from 636fb5b to 94eee2b Compare April 1, 2025 03:27
Comment thread lmdeploy/disagg/router.py Outdated
Comment thread lmdeploy/cli/serve.py Outdated
@@ -1,5 +1,5 @@
# Copyright (c) OpenMMLab. All rights reserved.

from lmdeploy.disagg.messages import EngineRole, MigrationBackend, MigrationTransportProtocol

Collaborator

We can move this import to after line 307 to avoid unnecessary import time.

Comment thread lmdeploy/serve/proxy/proxy.py Outdated
JimyMa and others added 2 commits April 14, 2025 07:31
1. [PD Connection more efficiently][High Priority] In the DSV3 DP + EP condition, we need to concurrently construct prefill_dp_size (for example 32) * decode_dp_size (for example 144) links. We add a function `pd_consolidation_multi_thread` to do this. However, we need to check whether the construction operation is thread-safe.
2. [Combine with proxy] Maybe we should save conn_config to avoid repeated reconnection of the PD Link.
3. [PD Control Plane][High Priority] For DP + EP, we need to reconstruct DisaggEngineConfig to record more information (e.g. dp_idx, tp_idx ...)
4. [Combine with router][Important] How to perform PD Load Balance in disaggregated LLM Serving.
5. [PD Data Plane] Adapt to open-source KVCache managers such as Mooncake, infiniStore, or NiXL, and to more transport media.
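
The thread-safety concern in item 1 can be illustrated with a small sketch. It is not the PR's `pd_consolidation_multi_thread`; `LinkTable` and `connect_all` are hypothetical, and the "link" is just a string. The point is only that many worker threads build the prefill_dp_size * decode_dp_size pairs while a lock guards the shared table.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from itertools import product


class LinkTable:
    """Hypothetical shared registry of established prefill/decode links."""

    def __init__(self):
        self._lock = threading.Lock()
        self._links = {}

    def connect(self, prefill_rank: int, decode_rank: int) -> None:
        # Stand-in for the real link setup, which may not be thread-safe.
        link = f"link-{prefill_rank}-{decode_rank}"
        # Serialize updates to the shared table.
        with self._lock:
            self._links[(prefill_rank, decode_rank)] = link

    def __len__(self) -> int:
        with self._lock:
            return len(self._links)


def connect_all(table: LinkTable, prefill_dp_size: int, decode_dp_size: int, max_workers: int = 16) -> None:
    pairs = product(range(prefill_dp_size), range(decode_dp_size))
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # list() drains the results so any worker exception is re-raised here.
        list(pool.map(lambda pair: table.connect(*pair), pairs))


table = LinkTable()
connect_all(table, prefill_dp_size=32, decode_dp_size=144)
```

With the example sizes from the list, this builds 32 * 144 = 4608 links. If the underlying connection call is itself not thread-safe, the lock would need to cover that call too, at the cost of serializing the setup.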
Comment thread lmdeploy/pytorch/engine/engine.py Outdated
self._loop_main = None

# for migration loop management
self.migration_event = asyncio.Event()

Collaborator

The engine is started lazily, since we might not have an event loop when the engine is created.
I don't know if it is safe to initialize asyncio.Event here.
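
One common way to sidestep this concern is to create the event on first use, so that it is always constructed while an event loop is running. The sketch below shows the pattern with a hypothetical `EngineSketch` class; it is not the engine code from this PR. On Python 3.10 and later, `asyncio.Event` no longer takes a `loop` argument and attaches to a loop only when first awaited, but older versions looked up the event loop at construction time, which is where eager creation could go wrong.

```python
import asyncio


class EngineSketch:
    """Hypothetical engine that defers creating its asyncio.Event."""

    def __init__(self):
        # No event loop is needed at construction time.
        self._migration_event = None

    @property
    def migration_event(self) -> asyncio.Event:
        # Created on first access, which happens inside a running loop.
        if self._migration_event is None:
            self._migration_event = asyncio.Event()
        return self._migration_event

    async def notify_and_wait(self) -> bool:
        self.migration_event.set()
        await self.migration_event.wait()  # returns immediately, the event is set
        return self.migration_event.is_set()


engine = EngineSketch()  # safe even though no loop exists yet
result = asyncio.run(engine.notify_and_wait())
```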

Comment thread lmdeploy/pytorch/engine/engine.py Outdated
  cache_block_ids = resp.data.get('cache_block_ids', None)
  if resp.type == ResponseType.SUCCESS:
-     token_ids = resp.data['token_ids'].tolist()
+     token_ids = resp.data['token_ids']

Collaborator

EngineInstance would output an ndarray instead of list[int]. Is that acceptable, @lvhan028?

Collaborator

No, it's not

Comment thread lmdeploy/pytorch/engine/model_agent.py Outdated
@lvhan028 lvhan028 merged commit 50b0ef7 into InternLM:main May 8, 2025
5 checks passed
oliveagle pushed a commit to oliveagle/lmdeploy that referenced this pull request May 22, 2026
* sync main

* typo correct

* 1. typo 2. add migration event

* 1. move slime to 'https://github.com/JimyMa/DLSlime.git' and init readme.

* Update disagg README

* mute slime when disable distserve

* remove build_migration.sh

* revert debug code

* 1. identify interface. 2. add multi backend registry

* add dlslime max transfer batch

* add an infinistore interface

* add load/store

* conditional register of Multi Migration Backend

* merge router to proxy

* remove redundant print

* 1. remove redundant print 2. revert safe_run

* dsv3 kvtransfer support (bypass v cache)

* dsv3 debug, 1. change log info to log debug of log resp. 2. add num_cpus to ray.init for run in dlc

* DSV3 Debug, known issue:
1. [PD Connection more efficiently][High Priority] In the DSV3 DP + EP condition, we need to concurrently construct prefill_dp_size (for example 32) * decode_dp_size (for example 144) links. We add a function `pd_consolidation_multi_thread` to do this. However, we need to check whether the construction operation is thread-safe.
2. [Combine with proxy] Maybe we should save conn_config to avoid repeated reconnection of the PD Link.
3. [PD Control Plane][High Priority] For DP + EP, we need to reconstruct DisaggEngineConfig to record more information (e.g. dp_idx, tp_idx ...)
4. [Combine with router][Important] How to perform PD Load Balance in disaggregated LLM Serving.
5. [PD Data Plane] Adapt to open-source KVCache managers such as Mooncake, infiniStore, or NiXL, and to more transport media.

* revert match to if,else

* [bugfix] rename typo

* [refactor] refactor pd_conn

* 1. format code. 2. add engine_role for passing ut test

* 1. format code 2. parse dp, ep, and dp rank to DisaggEngineConfig

* 1. add pd conn timeout, 2. add default EngineRole to Hybrid, 3. fix disagg strategy proxy typo

* 1. refactor PDConnection Pool

* refactor debug

* fix migration loop bug

* add proxy arguments about distserve

* bugfix

* debug interface

* remove unnecessary EngineRole Check.

* add v1/chat/completions support

* remove redundant print

* async free cache

* async free cache

* 1. add some comments.

* 1. bugfix

* [proxy] add connection_warmup api

* 1. bugfix (warmup_connection_typo and wrong args) 2. preserve cache bugfix

* [disagg] update readme, 1. fault tolerance and 2. replace router to proxy.

* bugfix

* fix decode back pressure bug

* 1. add migration_request to chat/completions for correctly cache free

* 2. free cache bugfix

* 1. fix lock running bug

* 1. fix dist.broadcast deadlock

* [lint] 1. fix lint

* rename Ethernet to RoCE

* change enum.Enum.__members__[elem] to enum.Enum[elem] directly

* update readme

* update migration-backend

* 1. update readme 2. move module to string for conditional import

* 1. update readme

* 1. remove magic number and handle long assignments in dlslime. 2. add uniexecutor support

* fix error migration in dummy situation

* 1. bugfix when token is not a decodable utf-8 (in test)

* 1. overlapping migration and forward.

* bump dlslime to v0.0.1.post5

* remove print

* remove free in decode engine because already freed in proxy

* 1. bump dlslime to 0.0.1.post7

* 1. [proxy] revert self.nodes to nodes 2. [api_server] remove redundant api

* 1. [cli] remove available_nic args

* format comments

* [pytorch paging] remove redundant logger

* [model_agent] bugfix caused by merge

* [model agent] bypass model agent migrate

* revert migrate to sync mode

* bypass model agent migrate in uni_executor

* [proxy] set default serving strategy to DistServe

* 1. [disagg] update readme

* info -> debug

* remove unused code

* lazily initialize migration event

* add nvlink support

* mute TCP support by now

* update readme for exception

* set migration token_ids output to numpy array

* update readme

* In PD Disaggregation Mode, fallback next token ids to CPU

* 1. [disagg] update readme

* move disagg to pytorch backend
5 participants