[WIP] [Prototype] ActorMesh API #54760
Conversation
Summary of Changes
Hello @pcmoritz, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces a new ActorMesh API in Ray, which I've designed to provide a unified abstraction for managing and interacting with groups of Ray actors. My aim is to simplify common distributed patterns like sharding, collective operations, and load balancing across actor groups, particularly for post-training workloads such as training, inference, and data processing.
Highlights
- New ActorMesh Abstraction: I've introduced the ActorMesh class to manage a collection of Ray actors, allowing them to be treated as a single logical unit.
- Flexible Actor Grouping: I've enabled defining actor groups with a shape (integer, tuple, or dictionary) to represent different sharding dimensions.
- Three Invocation Patterns: I've provided all, choose, and shard methods for interacting with the actor mesh:
  - all: Executes a method on all actors in the mesh.
  - choose: Selects a single actor (currently random) to execute a method, enabling load balancing.
  - shard: Distributes input data across actors for parallel processing, requiring a user-defined dispatch_fn.
- @method Decorator for Sharding: I've introduced a @method decorator that can be applied to actor methods to specify a dispatch_fn for custom sharding logic.
- Actor Rank Environment Variable: Each actor within the mesh is assigned a RAY_ACTOR_MESH_RANK environment variable, allowing actors to behave differently based on their position.
Code Review
This pull request introduces an ActorMesh API for managing groups of actors, providing functionality for executing methods on all actors, choosing a single actor, and sharding data among them. The API is designed to support post-training workloads such as training, inference, and data processing.
        self._actors.append(actor)

    @property
    def methods(self) -> type[T]:
This is currently introduced to (1) provide correct typing for the methods (the original idea for this was from https://github.com/zen-xu/sunray, see also #54149) and (2) avoid clashes between user-defined methods and methods / properties defined on the ActorMesh class itself.
However, it is slightly clunky having to always type mesh.methods.{method}, so ideally I'd like to get rid of this property and provide the actor methods directly as methods on the ActorMesh[ActorClass] class. (2) could be solved in other ways.
However it is slightly clunky having to always type mesh.methods.{method}, so ideally I'd like to get rid of this property and provide the actor methods directly as methods on the ActorMesh[ActorClass] class.
+1
/gemini review
Code Review
This PR introduces the ActorMesh API, a new abstraction for managing groups of actors in Ray. I've identified a critical bug in the actor count calculation, some performance optimizations, and opportunities to enhance code clarity and maintainability.
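For readers skimming the thread: the "critical bug" presumably refers to the shape handling in the constructor below, where the dimensions of a multi-dimensional mesh are summed rather than multiplied — shape=(2, 4) would create 6 actors instead of the 8 a 2x4 mesh implies. A hedged sketch of the likely intended computation, assuming mesh dimensions multiply as they do in device meshes:

import math

def count_actors(shape):
    # Assumption: mesh dimensions multiply, as in a device mesh.
    dims = shape.values() if isinstance(shape, dict) else shape
    return math.prod(dims)

assert count_actors((2, 4)) == 8   # sum((2, 4)) would wrongly give 6
assert count_actors({"pp": 2, "tp": 4}) == 8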
        runtime_env = copy.deepcopy(runtime_env) or {}
        for i in range(self._num_actors):
            env_vars = {**runtime_env.get("env_vars", {}), "RAY_ACTOR_MESH_RANK": str(i)}
            ray_actor_cls = ray.remote(
                runtime_env={**runtime_env, "env_vars": env_vars},
            )(actor_cls)
            actor = ray_actor_cls.remote(*args, **kwargs)
            self._actors.append(actor)
Calling ray.remote() inside a loop is inefficient. This operation involves class serialization and registration with the GCS, which introduces unnecessary overhead for each actor created. It's much more efficient to decorate the actor class once outside the loop and then use .options() to specify the per-actor runtime_env during instantiation.
runtime_env = copy.deepcopy(runtime_env) or {}
ray_actor_cls = ray.remote(actor_cls)
for i in range(self._num_actors):
env_vars = {**runtime_env.get("env_vars", {}), "RAY_ACTOR_MESH_RANK": str(i)}
actor_runtime_env = {**runtime_env, "env_vars": env_vars}
actor = ray_actor_cls.options(runtime_env=actor_runtime_env).remote(
*args, **kwargs
)
self._actors.append(actor)
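If I'm reading Ray's behavior correctly, this pattern also means the actor class is serialized and exported only once; the per-actor differences reduce to lightweight option overrides that .options() applies without re-registering the class.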
python/ray/actor_mesh.py
        pass

    def choose(self, *args: P.args, **kwargs: P.kwargs) -> "ray.ObjectRef[R]":
        "Call one of the actors in the mesh."
        pass

    def shard(self, *args: P.args, **kwargs: P.kwargs) -> R:
        "Call all the actors in the mesh, shard the argument(s) among them."
        pass


class ActorMethodProxy:

    def __init__(self, actor_mesh: "ActorMesh", actor_method: str) -> None:
        self.actor_mesh = actor_mesh
        self.actor_method = actor_method

    def all(self, *args, **kwargs):
        results = []
        for actor in self.actor_mesh.actors:
            method = getattr(actor, self.actor_method)
            results.append(method.remote(*args, **kwargs))
        return results

    def choose(self, *args, **kwargs):
        # Choosing randomly right now, this is bad and needs to be improved
        actor = random.choice(self.actor_mesh.actors)
        method = getattr(actor, self.actor_method)
        return method.remote(*args, **kwargs)

    def shard(self, *args, **kwargs):
        orig_func = getattr(self.actor_mesh._actor_cls, self.actor_method)
        dispatch_fn = getattr(orig_func, "_dispatch_fn", None)
        assert dispatch_fn, "Sharding only supported if dispatch_fn is specified (later we will have a good default one)"
        return dispatch_fn(self.actor_mesh, self.actor_method, *args, **kwargs)


class ActorMesh(Generic[T]):

    def __init__(
        self,
        actor_cls: Callable[..., T],
        args: Any,
        kwargs: Any,
        shape: Union[int, Tuple[int], Dict[str, int]],
        # TODO: Not used yet
        resources_per_actor: Optional[Dict[str, float]] = None,
        runtime_env: Optional[Dict[str, Any]] = None,
    ):
        if isinstance(shape, int):
            shape = (shape,)

        self.shape = shape

        if isinstance(shape, dict):
            self._num_actors = sum(shape.values())
        else:
            self._num_actors = sum(shape)

        self._actor_cls = actor_cls
        self._actors = []
        runtime_env = copy.deepcopy(runtime_env) or {}
        for i in range(self._num_actors):
            env_vars = {**runtime_env.get("env_vars", {}), "RAY_ACTOR_MESH_RANK": str(i)}
            ray_actor_cls = ray.remote(
                runtime_env={**runtime_env, "env_vars": env_vars},
            )(actor_cls)
            actor = ray_actor_cls.remote(*args, **kwargs)
            self._actors.append(actor)

    @property
    def methods(self) -> type[T]:
        return ActorMeshProxy(self)

    @property
    def actors(self) -> List[ActorHandle]:
        return self._actors

    @property
    def num_actors(self) -> int:
        return self._num_actors


@overload
def method(method: Callable[Concatenate[Any, P], R]) -> ActorMethod[P, R]:
    ...


@overload
def method(dispatch_fn: Optional[Callable] = None,) -> Callable[[Callable[Concatenate[Any, P], R]], ActorMethod[P, R]]:
    ...


def method(method=None, dispatch_fn=None, **kwargs):

    def decorator(f):
        method = ray.method(**kwargs)(f)
        method._dispatch_fn = dispatch_fn
        return method

    if method is not None:
        return decorator(method)
    return decorator
The class name ActorMethod conflicts with ray.actor.ActorMethod. This is confusing because they are different classes with different purposes. The method decorator in this file is type-hinted to return actor_mesh.ActorMethod, but its implementation returns a ray.actor.ActorMethod (or a wrapper around it).
This name collision will likely cause confusion for users and static analysis tools. Please rename ActorMethod in this file to something more specific, like MeshMethod, to avoid the conflict. You'll also need to update the type hints for the method decorator.
Additionally, the method decorator implementation has confusing variable names. The variable method is used for the function itself, its argument, and a local variable. This harms readability. Consider renaming the argument to avoid shadowing.
class MeshMethod(Generic[P, R]):

    def __init__(self, method):
        self.method = method

    def all(self, *args: P.args, **kwargs: P.kwargs) -> List["ray.ObjectRef[R]"]:
        "Call all the actors in the mesh with the same arguments."
        pass

    def choose(self, *args: P.args, **kwargs: P.kwargs) -> "ray.ObjectRef[R]":
        "Call one of the actors in the mesh."
        pass

    def shard(self, *args: P.args, **kwargs: P.kwargs) -> R:
        "Call all the actors in the mesh, shard the argument(s) among them."
        pass


class ActorMethodProxy:

    def __init__(self, actor_mesh: "ActorMesh", actor_method: str) -> None:
        self.actor_mesh = actor_mesh
        self.actor_method = actor_method

    def all(self, *args, **kwargs):
        results = []
        for actor in self.actor_mesh.actors:
            method = getattr(actor, self.actor_method)
            results.append(method.remote(*args, **kwargs))
        return results

    def choose(self, *args, **kwargs):
        # Choosing randomly right now, this is bad and needs to be improved
        actor = random.choice(self.actor_mesh.actors)
        method = getattr(actor, self.actor_method)
        return method.remote(*args, **kwargs)

    def shard(self, *args, **kwargs):
        orig_func = getattr(self.actor_mesh._actor_cls, self.actor_method)
        dispatch_fn = getattr(orig_func, "_dispatch_fn", None)
        assert dispatch_fn, "Sharding only supported if dispatch_fn is specified (later we will have a good default one)"
        return dispatch_fn(self.actor_mesh, self.actor_method, *args, **kwargs)


class ActorMesh(Generic[T]):

    def __init__(
        self,
        actor_cls: Callable[..., T],
        args: Any,
        kwargs: Any,
        shape: Union[int, Tuple[int], Dict[str, int]],
        # TODO: Not used yet
        resources_per_actor: Optional[Dict[str, float]] = None,
        runtime_env: Optional[Dict[str, Any]] = None,
    ):
        if isinstance(shape, int):
            shape = (shape,)
        self.shape = shape
        if isinstance(shape, dict):
            self._num_actors = sum(shape.values())
        else:
            self._num_actors = sum(shape)
        self._actor_cls = actor_cls
        self._actors = []
        runtime_env = copy.deepcopy(runtime_env) or {}
        for i in range(self._num_actors):
            env_vars = {**runtime_env.get("env_vars", {}), "RAY_ACTOR_MESH_RANK": str(i)}
            ray_actor_cls = ray.remote(
                runtime_env={**runtime_env, "env_vars": env_vars},
            )(actor_cls)
            actor = ray_actor_cls.remote(*args, **kwargs)
            self._actors.append(actor)

    @property
    def methods(self) -> type[T]:
        return ActorMeshProxy(self)

    @property
    def actors(self) -> List[ActorHandle]:
        return self._actors

    @property
    def num_actors(self) -> int:
        return self._num_actors


@overload
def method(method: Callable[Concatenate[Any, P], R]) -> MeshMethod[P, R]:
    ...


@overload
def method(dispatch_fn: Optional[Callable] = None,) -> Callable[[Callable[Concatenate[Any, P], R]], MeshMethod[P, R]]:
    ...


def method(fn=None, dispatch_fn=None, **kwargs):
    def decorator(f):
        decorated = ray.method(**kwargs)(f)
        decorated._dispatch_fn = dispatch_fn
        return decorated
    if fn is not None:
        return decorator(fn)
    return decorator
    def choose(self, *args, **kwargs):
        # Choosing randomly right now, this is bad and needs to be improved
        actor = random.choice(self.actor_mesh.actors)
@jjyao @stephanie-wang @kevin85421 Do you know of a good way to do a better job with the load balancing here with existing Ray Core APIs?
I don't think we currently expose the number of pending tasks per actor, but it wouldn't be very hard to add.
It also seems useful to have a version of choose that lets you pass in a specific index? It'd be nice to be able to do this:

mesh = ...
ref = mesh[1, 0].foo.remote(...)

Eventually this could also be used for sub-meshes:

2d_mesh = ...
sub_mesh = 2d_mesh[1, :]
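A rough sketch of how such indexing could map to a flat actor rank, assuming row-major (C-order) layout over a tuple shape — this helper is hypothetical and not part of the PR:

def flat_rank(shape, idx):
    # Hypothetical: map an N-d mesh index to a flat actor rank,
    # assuming row-major (C-order) layout and a tuple shape.
    if isinstance(idx, int):
        idx = (idx,)
    rank = 0
    for dim, i in zip(shape, idx):
        rank = rank * dim + i
    return rank

assert flat_rank((2, 4), (1, 0)) == 4  # mesh[1, 0] -> actors[4]

Slicing for sub-meshes (mesh[1, :]) would additionally need to return a view over the corresponding subset of actors.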
Also, the API that lets Ray choose the least-loaded could be reused for Ray Data actor pools. (cc @jjyao)
That, or we promote the pool itself to a core API and implement it in C++ directly without adding a new low-level API. This will likely be needed to satisfy Ray Data fault-tolerance requirements (retry on any actor in the pool).
Would you mind adding some examples that simulate data communication between two SPMD systems, where each SPMD system is an ActorMesh (e.g., weight syncing)?
            results.append(method.remote(*args, **kwargs))
        return results

    def choose(self, *args, **kwargs):
Should it be push-based or pull-based? Or should we support both? If the use case involves data, pull-based might be a better option because of long-tail tasks.
I was thinking two very reasonable implementations here are:
- Power-of-two scheduling, where you choose two actors and then dispatch onto the one with the smaller number of tasks in the queue (sketched below).
- Having a queue and dispatching new tasks as soon as an actor is finished (that's probably what you mean by pull-based) -- the challenge with that in the current API is that if object references are exposed to the user directly, there is no way for us to know when a task finishes. So maybe that would need to be implemented at the Ray Core level, where we probably have the required callbacks.

Besides these, there should also be ways to make this more user-defined (as an example, think about processing LoRA requests: you might want to schedule them onto actors that already have the LoRA adapter loaded).
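A minimal sketch of the power-of-two-choices idea, assuming a hypothetical pending_counts source (as noted above, Ray does not currently expose per-actor queue lengths):

import random

def choose_power_of_two(actors, pending_counts):
    # Sample two distinct actors and dispatch to the less-loaded one.
    # pending_counts: hypothetical mapping actor -> number of queued tasks.
    a, b = random.sample(actors, 2)
    return a if pending_counts[a] <= pending_counts[b] else b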
This pull request has been automatically marked as stale because it has not had recent activity. You can always ask for help on our discussion forum or Ray's public Slack channel. If you'd like to keep this open, just leave any comment, and the stale label will be removed.
Why are these changes needed?
Many libraries that are built on top of Ray have an abstraction for a group of actors. Prominent use cases include training (for sharding models), inference (for load balancing requests across model replicas or sharding models), and data processing.
Examples include:
In this PR we strive to implement an actor mesh abstraction that can support these workloads. It is based on a "mesh" of actors of a certain shape. Think of the dimensions of this shape as corresponding to different sharding dimensions of a model, like pipeline parallel, tensor parallel, or sequence parallel. The common case of an actor group would be a shape with a single dimension.
The main target of this abstraction is post-training workloads, which is sufficiently general in that it covers all of the above use cases (training, inference, simulation / data processing). See our comparison blog post for how these APIs connect to the use cases and how different open source LLM RL libraries are using actor pools to achieve their goals.
We orient the API around verl's single controller design and offer three invocation methods:
- all -- execute the same method on all actors, on the same data. Different actors will run different code based on their state and their actor rank (e.g. available through the RAY_ACTOR_MESH_RANK environment variable).
- choose -- invoke a single actor and load balance the requests over the actors. Going forward we will implement more serious load balancing strategies and also make the load balancing pluggable so users can implement their own.
- shard -- this is a collective verl-style operation and provides a dispatch hook to customize how data is sharded among the actors (two examples are given in the PR: python_dispatch_fn for distributing a Python list and torch_dispatch_fn for distributing a torch tensor). I assume there will be many such fairly standardized and reusable strategies with high-performance implementations that library authors can offer to their users (see the sketch right after this list).
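The actual python_dispatch_fn / torch_dispatch_fn implementations live in the PR diff and are not reproduced here; purely as an illustration, a list-sharding dispatch function compatible with the shard() hook shown earlier might look roughly like this (the contiguous-chunking strategy is my assumption):

def list_dispatch_fn(mesh, method_name, items, *args, **kwargs):
    # Hypothetical sketch: split a Python list into contiguous chunks,
    # one per actor, and invoke the method on each chunk.
    n = mesh.num_actors
    chunk_size = (len(items) + n - 1) // n  # ceiling division
    refs = []
    for i, actor in enumerate(mesh.actors):
        chunk = items[i * chunk_size:(i + 1) * chunk_size]
        refs.append(getattr(actor, method_name).remote(chunk, *args, **kwargs))
    return refs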
This API is very much work in progress / in flux and we would love to get feedback. Currently we keep it as simple as possible. Follow-up items include:
Related issue number
Checks
- I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I've added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.