-
-
Notifications
You must be signed in to change notification settings - Fork 9.8k
[KVConnector] Aggregate finished requests on the scheduler #19555
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[KVConnector] Aggregate finished requests on the scheduler #19555
Conversation
👋 Hi! Thank you for contributing to the vLLM project. 💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels. Just a reminder: PRs would not trigger full CI run by default. Instead, it would only run Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging. To run CI, PR reviewers can either: Add 🚀 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of Changes
Hello @orozery, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request refactors the communication mechanism between workers and the scheduler for KV cache transfer status. It introduces a generic metadata channel from workers to the scheduler, allowing workers to send connector-specific information. The scheduler is now responsible for collecting and aggregating this metadata from all workers to determine the global state of KV transfers, particularly for distributed connectors like Nixl where completion requires coordination across multiple ranks.
Highlights
- New Worker-to-Scheduler Metadata API: Introduced a new base method
build_worker_connector_meta
on the worker side of the KV connector interface. This method allows workers to build and return arbitrary metadata (KVConnectorMetadata
) to the scheduler during each execution step. - Scheduler-side Aggregation of KV Transfer Status: Moved the responsibility of aggregating finished KV transfers (sending and receiving) from the worker side to the scheduler side. The scheduler-side base connector now has a
get_finished
method that takes the collected metadata from all workers and determines which requests have completed their transfers across the entire distributed setup. - ModelRunnerOutput Update: The
ModelRunnerOutput
dataclass, which workers return to the scheduler, has been updated to include a list ofkv_connector_metadata
instead of separatefinished_sending
andfinished_recving
sets. This allows workers to pass richer, connector-specific information. - Executor Aggregation: The multiprocess executor (
MultiprocExecutor
) now collectsModelRunnerOutput
from all workers and aggregates thekv_connector_metadata
from each worker into a single list before passing the combined output to the scheduler. - Nixl Connector Adaptation: The Nixl connector implementation has been updated to use the new metadata mechanism. It defines
NixlWorkerConnectorMetadata
to carry finished transfer IDs from the worker. The logic for tracking completion across all tensor parallel ranks has been moved from the worker-sideNixlConnectorWorker
to the scheduler-sideNixlConnectorScheduler
.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command>
or @gemini-code-assist <command>
. Below is a summary of the supported commands.
Feature | Command | Description |
---|---|---|
Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/
folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configureGemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
@sdavidbd we can change the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a mechanism for workers to send metadata to the scheduler via the KV connector, which is a significant architectural change. The core idea is to centralize the aggregation of KV transfer events and status on the scheduler side. Key changes include introducing build_worker_connector_meta
on the worker side and moving/refining get_finished
to the scheduler side.
Overall, the changes seem to implement the described functionality. However, there are a few critical areas to address:
- A potential logic swap in
vllm/v1/core/sched/scheduler.py
regarding howfinished_sending
andfinished_recving
statuses from the connector are interpreted and acted upon. This could lead to incorrect behavior like freeing blocks prematurely or not marking requests as ready when they are. - Type mismatches and potential runtime errors in
NixlConnectorMetadata
,ModelRunnerOutput
, andgpu_worker.py
related to handlingNone
values and list assignments forkv_connector_metadata
.
Addressing these points will be crucial for the correctness and stability of this new metadata flow.
@njhill @robertgshaw2-redhat putting this for preliminary review before weekend starts over here. |
f4351d0
to
916c8e2
Compare
assert isinstance(output, ModelRunnerOutput) | ||
return output if self.is_driver_worker else None | ||
if has_kv_transfer_group(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's safer to use is_v1_kv_transfer_group
until V0 is officially deprecated.
vllm/v1/outputs.py
Outdated
finished_sending: Optional[set[str]] = None | ||
finished_recving: Optional[set[str]] = None | ||
# KV Cache Connector metadata. | ||
kv_connector_metadata: Optional[list["KVConnectorMetadata"]] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KVConnectorMetadata
was originally intended for scheduler-to-worker signaling. Using it in the opposite direction (worker-to-scheduler) could blur its semantics. It might be cleaner to introduce a separate class like KVConnectorOutput
for this purpose.
Also, as mentioned above, I think aggregation in multi-worker setups should be handled at the MultiprocExecutor
level rather than in the Scheduler
. In that case kv_connector_metadata
should be typed as: Optional["KVConnectorMetadata"]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want the scheduler connector to get access to all of the KVConnectorMetadata
from all workers.
Only the connector knows what's inside the metadata. From the MultiprocExecutor
perspective it's opaque.
The Executor
returns a single ModelRunnerOutput
, so the way I found to let the scheduler connector access all metadatas is having the executor simply compose all of the metadatas to a list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand your approach, but I still believe the alternative I suggested - employing connector-specific aggregation logic at the executor - is cleaner. This way, the executor can return a single, aggregated KVConnectorMetadata
object, and the scheduler connector continues to work with a unified metadata instance rather than a list. It keeps the interface consistent and offloads connector-specific logic to where it belongs.
Also, could we revisit the idea of separating the metadata classes for scheduler-to-worker and worker-to-scheduler communication?
return EMPTY_MODEL_RUNNER_OUTPUT | ||
if has_kv_transfer_group(): | ||
with set_forward_context(None, self.vllm_config): | ||
self.maybe_setup_kv_connector(scheduler_output) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're missing a call to clear_connector_metadata
in this case (also before this change).
vllm/v1/worker/gpu_worker.py
Outdated
return output if self.is_driver_worker else None | ||
if has_kv_transfer_group(): | ||
kv_connector_metadata = \ | ||
get_kv_transfer_group().build_worker_connector_meta( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
build_worker_connector_meta
should be called in gpu_model_runner.execute_model
, before invoking clear_connector_metadata
.
Alternatively, we could delegate the state clearing to build_worker_connector_meta
itself and remove the clear_connector_metadata
API. This would also make it symmetric with build_connector_meta
, which is responsible for resetting the scheduler connector’s state.
vllm/v1/core/sched/scheduler.py
Outdated
@@ -1028,21 +1028,27 @@ def _update_waiting_for_remote_kv(self, request: Request) -> bool: | |||
self.finished_recving_kv_req_ids.remove(request.request_id) | |||
return True | |||
|
|||
def _update_from_kv_xfer_finished(self, | |||
model_runner_output: ModelRunnerOutput): | |||
def _update_from_kv_connector_metadata( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kv_connector_metadata
may carry distinct signals for different code paths. I think it would be cleaner if update_from_output
used dedicated connector APIs to extract the relevant information from kv_connector_metadata
, e.g.:
finished_sending, finished_recving = self.connector.get_finished(kv_connector_metadata)
_update_from_kv_xfer_finished(finished_sending, finished_recving)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's better the scheduler connector aggregates the metadatas only once, and outputs everything the scheduler needs back (finished reqs, invalid blocks, etc).
So obviously get_finished
is not a good name. Maybe something like process_worker_output(..) -> ConnectorOutput
where ConnectorOutput
is a new struct that will contain all relevant fields (which were previously laid out flat on ModelRunnerOutput
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that metadata aggregation should happen only once (ideally at the executor level). After that, the scheduler would hold a single aggregated instance of the worker-side KVConnectorMetadata
(which is still opaque and connector-specific).
From there, the scheduler can use dedicated connector APIs (e.g., get_finished
) to extract only the information it needs. This keeps the design more flexible and scalable, rather than relying on a single API to unpack all possible data upfront.
|
||
def get_finished( | ||
self, | ||
model_runner_output: ModelRunnerOutput, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why ModelRunnerOutput
and not KVConnectorMetadata
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To allow the connector full awareness of the model output (maybe someone will want sampled_token_ids
).
Same way the connector gets full access of the SchedulerOutput
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The scheduler connector only gets access to SchedulerOutput
for the purpose of creating metadata for the worker connector. Similarly, the worker connector should only access ModelRunnerOutput
to generate metadata for the scheduler connector.
In any case, I don’t think the scheduler connector should have access to ModelRunnerOutput
. That separation helps keep responsibilities clear and avoids unnecessary coupling.
self, finished_req_ids: set[str] | ||
) -> tuple[Optional[set[str]], Optional[set[str]]]: | ||
def build_worker_connector_meta( | ||
self, scheduler_output: SchedulerOutput, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to pass in SchedulerOutput
? I think we should make it symmetric with the scheduler-side build_connector_meta
and pass just ModelRunnerOutput
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your suggestion will work assuming the connector already got the SchedulerOutput
(by bind_connector_metadata
).
But there's also clear_connector_metadata
in the way, so this seems more fragile to me to try to correspond to the correct scheduler output. I would prefer to directly pass in the scheduler output here to make it easier and more explicit for the worker side connector to build its metadata.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe clear_connector_metadata
should always be the last worker-side API invoked in each step, after which the worker connector’s state is reset. As I mentioned earlier, a cleaner alternative is to make build_worker_connector_meta
responsible for both building the worker-side metadata and resetting the state - mirroring the behavior of build_connector_meta
on the scheduler side - thereby removing the need for a separate clear_connector_metadata
API.
In any case, the worker connector shouldn't need access to the SchedulerOutput
. As you said, it should already receive everything it needs via bind_connector_metadata
.
This keeps the design symmetric:
- The scheduler connector builds the metadata for the worker connector from
SchedulerOutput
and resets its state. - The worker connector builds the metadata for the scheduler connector from
ModelRunnerOutput
and resets its state.
kv_connector_metadata = [] | ||
for i, output in enumerate(outputs): | ||
kv_connector_metadata += output.kv_connector_metadata or [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be cleaner to use connector-specific aggregation logic here - for example, by introducing a new worker-side KVConnector
API dedicated to aggregating the metadata.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about this, but I did not want to introduce the connector inside the executor. Currently it's only in scheduler.py
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're already introducing KVConnectorMetadata
into the executor - so it seems reasonable to also give the executor its own KVConnector
instance. This could be a new EXECUTOR
role with a single API dedicated to aggregating worker-side metadata.
Personally, I find this cleaner than having each worker return a list with a single metadata object and adding ad-hoc logic in the executor to manually merge those lists. Delegating aggregation to the connector keeps the logic encapsulated and consistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@njhill What are your thoughts on introducing the connector to the executor to allow aggregation of workers output there?
As suggested in my review, I think we should introduce a new connector API to extract the invalid block IDs from the worker-side connector metadata. |
This pull request has merge conflicts that must be resolved before it can be |
Thanks @orozery. I like this but just trying to think thorough implications/alternatives. In the interests of keeping the interface as simple as possible and minimizing concerns on the connector impl side. One idea is whether it would make sense to abstract this return flow of information in the form of events, i.e. a generalization of what we already have with the lists of finished request ids. It may be that we can then still encapsulate the TP aggregation of these within the framework, since we would require a positive response from all workers. One or more negative results (failures) would translate to a negative result when aggregated. |
@njhill IIUC (please correct me) your suggestion is as follows: @sdavidbd your thoughts on this? |
I really like the idea of making the worker-side connector metadata explicit rather than opaque - especially since it's ultimately consumed by the framework. Given the choice between:
- I’d strongly prefer the latter. Regarding aggregation, I think we can keep it simple and sufficient by following two principles:
For example:
|
@sdavidbd I started implementing with So I made some re-thinking and came up with a suggestion which is somewhere in the middle between opaque and explicit: This justifies why each worker actually returns a list (in the previous solution, it was not clear why each worker returns a singleton [worker_metadata]). |
d94ea04
to
ce105e7
Compare
finished_sending = set[str]() | ||
finished_recving = set[str]() | ||
for output in outputs: | ||
# update finished_sending | ||
for req_id in output.finished_sending or []: | ||
new_count = self._send_remaining_count[req_id] - 1 | ||
if new_count == 0: | ||
# got response from all workers, report back to scheduler | ||
finished_sending.add(req_id) | ||
del self._send_remaining_count[req_id] | ||
else: | ||
self._send_remaining_count[req_id] = new_count | ||
|
||
# update finished_recving | ||
for req_id in output.finished_recving or []: | ||
new_count = self._recv_remaining_count[req_id] - 1 | ||
if new_count == 0: | ||
# got response from all workers, report back to scheduler | ||
finished_recving.add(req_id) | ||
del self._recv_remaining_count[req_id] | ||
else: | ||
self._recv_remaining_count[req_id] = new_count |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
finished_sending = set[str]() | |
finished_recving = set[str]() | |
for output in outputs: | |
# update finished_sending | |
for req_id in output.finished_sending or []: | |
new_count = self._send_remaining_count[req_id] - 1 | |
if new_count == 0: | |
# got response from all workers, report back to scheduler | |
finished_sending.add(req_id) | |
del self._send_remaining_count[req_id] | |
else: | |
self._send_remaining_count[req_id] = new_count | |
# update finished_recving | |
for req_id in output.finished_recving or []: | |
new_count = self._recv_remaining_count[req_id] - 1 | |
if new_count == 0: | |
# got response from all workers, report back to scheduler | |
finished_recving.add(req_id) | |
del self._recv_remaining_count[req_id] | |
else: | |
self._recv_remaining_count[req_id] = new_count | |
def update_finished_set( | |
req_ids: list[str], remaining_count_dict: dict[str, int], finished_set: set[str] | |
) -> None: | |
for req_id in req_ids or []: | |
new_count = remaining_count_dict[req_id] - 1 | |
if new_count == 0: | |
finished_set.add(req_id) | |
del remaining_count_dict[req_id] | |
else: | |
remaining_count_dict[req_id] = new_count | |
finished_sending = set[str]() | |
finished_recving = set[str]() | |
for output in outputs: | |
update_finished_set(output.finished_sending, self._send_remaining_count, finished_sending) | |
update_finished_set(output.finished_recving, self._recv_remaining_count, finished_recving) |
@njhill @orozery Also, I’m uncomfortable with this being done after the call to |
Thanks @sdavidbd I agree about the clear_connector_metadata call happening last, and that there was a missing call to I'm also unsure why we set it to an empty Would you be interested in making a follow-on PR to address these things? |
…ect#19555) Signed-off-by: Or Ozeri <oro@il.ibm.com>
…ect#19555) Signed-off-by: Or Ozeri <oro@il.ibm.com> Signed-off-by: Patrick von Platen <patrick.v.platen@gmail.com>
@orozery @njhill @sdavidbd why are we moving the logic out of the nixl connector? Since the logic is not replicated across other distributed backends it is breaking ray's support for kv-connector. Essentially the logic has to be repeated inside the workers and executors which doesn't make sense. It seems to have belonged to the right components which was the connector implementation itself. |
My motivation was to allow connectors to re-use this logic without having to re-implement it. BTW I don't think it will be hard to fix the ray executor. |
Apologies, I also hadn't considered the ray executor implications. I agree with @orozery though that it should be straightforward to make a corresponding update to the ray executor, and also that we should cover that in the CI. I am out on vacation this week but can help with that next week if needed. |
IC, the reasoning for reusing this logic connectors make sense. But does it make sense for it to be implemented in a base class / utility that connectors use? or should it belong to the executor? I rather have executor logic agnostic to using a connector or otherwise. It's some sort of conceptual leakage.
Yeah the test coverage on nixl path is unfortunately still low. I just realized this now. We should add more tests (more importantly nixl dependency is not added to CI yet)
I think you are right. This might not need to change since ray creates a wrapper worker around these
True, but I think this logic still belongs to connectors conceptually. |
@njhill @orozery @robertgshaw2-redhat What do you guys think about the reusability of this logic for done receiveing and done trasfering, I think it should be completely abstracted from executor and delegated completely to the connector implementations (reused across them) |
I think that it makes sense to live in a utility class that executors use, since they are responsible for orchestrating the workers and for the multi-worker "abstraction". This could change later though if we have a more significant rethink of the connector interface, for example decoupling it from the workers altogether. |
Signed-off-by: Juncheng Gu <juncgu@gmail.com>
…ect#19555) Signed-off-by: Or Ozeri <oro@il.ibm.com>
…ect#19555) Signed-off-by: Or Ozeri <oro@il.ibm.com> Signed-off-by: avigny <47987522+avigny@users.noreply.github.com>
…ect#19555) Signed-off-by: Or Ozeri <oro@il.ibm.com>
…ect#19555) Signed-off-by: Or Ozeri <oro@il.ibm.com>
…ect#19555) Signed-off-by: Or Ozeri <oro@il.ibm.com> Signed-off-by: Jinzhen Lin <linjinzhen@hotmail.com>
…ect#19555) Signed-off-by: Or Ozeri <oro@il.ibm.com>
…ect#19555) Signed-off-by: Or Ozeri <oro@il.ibm.com> Signed-off-by: Diego-Castan <diego.castan@ibm.com>
…ect#19555) Signed-off-by: Or Ozeri <oro@il.ibm.com>
…ect#19555) Signed-off-by: Or Ozeri <oro@il.ibm.com>
This PR enables the worker-side KV connector to pass on arbitrary metadata to the scheduler-side connector.This allows a standard and easy mechanism to aggregate kv-connector events from all workers.
In a nut-shell, we introduce the following connector APIs:
build_worker_connector_meta
on the worker side, allowing the worker to build metadata to be sent back to the scheduler.get_finished
- on the scheduler side (which was previously a worker side) - gets the connector metadata from all workers and yields the finished request transfers.This PR makes the following changes:
and aggregate the finished_sending and finished_recving from all.