
Conversation

pcmoritz
Contributor

@pcmoritz pcmoritz commented Jul 19, 2025

Why are these changes needed?

Many libraries that are built on top of Ray have an abstraction for a group of actors. Prominent use cases include training (for sharding models), inference (for load balancing requests across model replicas or for sharding models), and data processing.

Examples include:

  • Verl, see also single controller docs: This implementation is very flexible in terms of how data is scattered to the participating actors and how the results are collected at the end (via a dispatch mechanism). It also supports multiplexing actor groups on underlying resources.
  • NemoRL: This implementation is structured around a flexible map-style API and also supports flexible sharding.
  • SkyRL (adapted from OpenReasonerZero)
  • Ray Train
  • Ray Data
  • Older Ray actor group and actor pool – they mainly differ in that for the actor pool, the user is responsible for creating the actors, while for the actor group, the actors are managed by the group itself

In this PR we strive to implement an actor mesh abstraction that can support these workloads. It is based on a "mesh" of actors of a certain shape. Think of the dimensions of this shape as corresponding to different sharding dimensions of a model, like pipeline parallel, tensor parallel or sequence parallel. The common case of an actor group would be a shape with a single dimension.
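
For illustration, here is a rough sketch of what creating such a mesh could look like with the constructor proposed in this PR (the import path and the Worker class are placeholders, not part of the PR):

    import os

    # Import path is an assumption; the PR adds ActorMesh in a new actor_mesh module.
    from ray.experimental.actor_mesh import ActorMesh

    class Worker:
        def __init__(self, model_path: str):
            # The mesh sets RAY_ACTOR_MESH_RANK per actor, so each actor can
            # read its global rank at construction time.
            self.rank = int(os.environ["RAY_ACTOR_MESH_RANK"])
            self.model_path = model_path

        def ping(self) -> int:
            return self.rank

    # A one-dimensional mesh of 4 actors; the shape could also be a tuple or a
    # dict mapping dimension names to sizes to encode several sharding dimensions.
    mesh = ActorMesh(Worker, args=("/tmp/model",), kwargs={}, shape=4)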

The main target of this abstraction is post-training workloads, a setting that is sufficiently general to cover all of the above use cases (training, inference, simulation / data processing). See our comparison blog post for how these APIs connect to the use cases and how different open source LLM RL libraries are using actor pools to achieve their goals.

We orient the API around verl's single controller design and offer three invocation methods (a usage sketch follows the list):

  1. all -- execute the same method on all actors, on the same data. Different actors can run different code based on their state and their actor rank (e.g. available through the RAY_ACTOR_MESH_RANK environment variable).
  2. choose -- invoke a single actor and load balance the requests over the actors. Going forward we will implement more sophisticated load balancing strategies and also make the load balancing pluggable so users can implement their own.
  3. shard -- this is a collective verl-style operation and provides a dispatch hook to customize how data is sharded among the actors (two examples are given in the PR: python_dispatch_fn for distributing a Python list and torch_dispatch_fn for distributing a torch tensor). I assume there will be many such fairly standardized and reusable strategies with high-performance implementations that library authors can offer to their users.
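
To make these patterns concrete, here is a hedged sketch of how they could be used with the mesh from the sketch above (list_dispatch_fn and the process method are illustrative stand-ins, not the python_dispatch_fn / torch_dispatch_fn shipped in the PR):

    import ray
    from ray.experimental.actor_mesh import ActorMesh, method  # import path is an assumption

    # 1. all: run the same method on every actor; returns one ObjectRef per actor.
    refs = mesh.methods.ping.all()
    print(ray.get(refs))  # e.g. [0, 1, 2, 3]

    # 2. choose: route a single call to one actor (currently picked at random).
    ref = mesh.methods.ping.choose()

    # 3. shard: split the input over the actors through a dispatch function.
    def list_dispatch_fn(mesh, method_name, items):
        # Illustrative dispatcher: round-robin a Python list over the actors,
        # call the method once per actor with its chunk, and gather the results.
        chunks = [items[i :: mesh.num_actors] for i in range(mesh.num_actors)]
        refs = [
            getattr(actor, method_name).remote(chunk)
            for actor, chunk in zip(mesh.actors, chunks)
        ]
        return ray.get(refs)

    class ShardedWorker:
        # `method` is the decorator introduced in this PR; it attaches the
        # dispatch_fn that shard() later looks up.
        @method(dispatch_fn=list_dispatch_fn)
        def process(self, items):
            return [x * 2 for x in items]

    sharded = ActorMesh(ShardedWorker, args=(), kwargs={}, shape=4)
    per_actor_results = sharded.methods.process.shard(list(range(100)))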

This API is very much work in progress / in flux and we would love to get feedback. Currently we keep it as simple as possible. Follow up items include:

  • Flesh out the rank environment variable more and make a local rank (according to the shape) available in addition to the global rank
  • Provide more real world usage examples
  • Async methods / calls
  • Show how the API maps to the existing APIs
  • Get the PR merged
  • Provide high performance implementations for the collective RPCs that are involved in this API

Related issue number

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@pcmoritz pcmoritz requested a review from a team as a code owner July 19, 2025 22:50
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Summary of Changes

Hello @pcmoritz, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a new ActorMesh API in Ray, which I've designed to provide a unified abstraction for managing and interacting with groups of Ray actors. My aim is to simplify common distributed patterns like sharding, collective operations, and load balancing across actor groups, particularly for post-training workloads such as training, inference, and data processing.

Highlights

  • New ActorMesh Abstraction: I've introduced the ActorMesh class to manage a collection of Ray actors, allowing them to be treated as a single logical unit.
  • Flexible Actor Grouping: I've enabled defining actor groups with a shape (integer, tuple, or dictionary) to represent different sharding dimensions.
  • Three Invocation Patterns: I've provided all, choose, and shard methods for interacting with the actor mesh:
  • all: Executes a method on all actors in the mesh.
  • choose: Selects a single actor (currently random) to execute a method, enabling load balancing.
  • shard: Distributes input data across actors for parallel processing, requiring a user-defined dispatch_fn.
  • @method Decorator for Sharding: I've introduced a @method decorator that can be applied to actor methods to specify a dispatch_fn for custom sharding logic.
  • Actor Rank Environment Variable: Each actor within the mesh is assigned a RAY_ACTOR_MESH_RANK environment variable, allowing actors to behave differently based on their position.

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces an ActorMesh API for managing groups of actors, providing functionalities for executing methods on all actors, choosing a single actor, and sharding data among them. The API is designed to support post-training workloads such as training, inference, and data processing.

            self._actors.append(actor)

    @property
    def methods(self) -> type[T]:
Contributor Author

This is currently introduced to (1) provide correct typing for the methods (the original idea for this was from https://github.com/zen-xu/sunray, see also #54149) and (2) avoid clashes between user-defined methods and the methods / properties defined on the ActorMesh class itself.

However it is slightly clunky having to always type mesh.methods.{method}, so ideally I'd like to get rid of this property and provide the actor methods directly as methods on the ActorMesh[ActorClass] class. (2) could be solved in other ways.
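
One possible direction (a sketch of the idea, not something this PR implements) is to keep the explicitly defined properties on ActorMesh and forward unknown attribute names to the per-method proxy via __getattr__, so mesh.foo.all(...) works without the .methods hop:

    class ActorMesh(Generic[T]):
        ...

        def __getattr__(self, name: str):
            # Only invoked when normal attribute lookup fails, so ActorMesh's own
            # properties (actors, num_actors, ...) still take precedence. The
            # fallback to the actor class resolves user-defined methods.
            if hasattr(self._actor_cls, name):
                return ActorMethodProxy(self, name)
            raise AttributeError(name)

This would address (2), but it gives up the static typing benefit of the methods property unless combined with additional typing support.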

Member

However it is slightly clunky having to always type mesh.methods.{method}, so ideally I'd like to get rid of this property and provide the actor methods directly as methods on the ActorMesh[ActorClass] class.

+1

@pcmoritz
Contributor Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This PR introduces the ActorMesh API, a new abstraction for managing groups of actors in Ray. I've identified a critical bug in the actor count calculation, some performance optimizations, and opportunities to enhance code clarity and maintainability.

Comment on lines +90 to +97
        runtime_env = copy.deepcopy(runtime_env) or {}
        for i in range(self._num_actors):
            env_vars = {**runtime_env.get("env_vars", {}), "RAY_ACTOR_MESH_RANK": str(i)}
            ray_actor_cls = ray.remote(
                runtime_env={**runtime_env, "env_vars": env_vars},
            )(actor_cls)
            actor = ray_actor_cls.remote(*args, **kwargs)
            self._actors.append(actor)
Contributor

high

Calling ray.remote() inside a loop is inefficient. This operation involves class serialization and registration with the GCS, which introduces unnecessary overhead for each actor created. It's much more efficient to decorate the actor class once outside the loop and then use .options() to specify the per-actor runtime_env during instantiation.

        runtime_env = copy.deepcopy(runtime_env) or {}
        ray_actor_cls = ray.remote(actor_cls)
        for i in range(self._num_actors):
            env_vars = {**runtime_env.get("env_vars", {}), "RAY_ACTOR_MESH_RANK": str(i)}
            actor_runtime_env = {**runtime_env, "env_vars": env_vars}
            actor = ray_actor_cls.options(runtime_env=actor_runtime_env).remote(
                *args, **kwargs
            )
            self._actors.append(actor)

Comment on lines 22 to 128
        pass

    def choose(self, *args: P.args, **kwargs: P.kwargs) -> "ray.ObjectRef[R]":
        "Call one of the actors in the mesh."
        pass

    def shard(self, *args: P.args, **kwargs: P.kwargs) -> R:
        "Call all the actors in the mesh, shard the argument(s) among them."
        pass

class ActorMethodProxy:

    def __init__(self, actor_mesh: "ActorMesh", actor_method: str) -> None:
        self.actor_mesh = actor_mesh
        self.actor_method = actor_method

    def all(self, *args, **kwargs):
        results = []
        for actor in self.actor_mesh.actors:
            method = getattr(actor, self.actor_method)
            results.append(method.remote(*args, **kwargs))
        return results

    def choose(self, *args, **kwargs):
        # Choosing randomly right now, this is bad and needs to be improved
        actor = random.choice(self.actor_mesh.actors)
        method = getattr(actor, self.actor_method)
        return method.remote(*args, **kwargs)

    def shard(self, *args, **kwargs):
        orig_func = getattr(self.actor_mesh._actor_cls, self.actor_method)
        dispatch_fn = getattr(orig_func, "_dispatch_fn", None)
        assert dispatch_fn, "Sharding only supported if dispatch_fn is specified (later we will have a good default one)"
        return dispatch_fn(self.actor_mesh, self.actor_method, *args, **kwargs)


class ActorMesh(Generic[T]):

    def __init__(
        self,
        actor_cls: Callable[..., T],
        args: Any,
        kwargs: Any,
        shape: Union[int, Tuple[int], Dict[str, int]],
        # TODO: Not used yet
        resources_per_actor: Optional[Dict[str, float]] = None,
        runtime_env: Optional[Dict[str, Any]] = None,
    ):

        if isinstance(shape, int):
            shape = (shape,)

        self.shape = shape

        if isinstance(shape, dict):
            self._num_actors = sum(shape.values())
        else:
            self._num_actors = sum(shape)

        self._actor_cls = actor_cls
        self._actors = []
        runtime_env = copy.deepcopy(runtime_env) or {}
        for i in range(self._num_actors):
            env_vars = {**runtime_env.get("env_vars", {}), "RAY_ACTOR_MESH_RANK": str(i)}
            ray_actor_cls = ray.remote(
                runtime_env={**runtime_env, "env_vars": env_vars},
            )(actor_cls)
            actor = ray_actor_cls.remote(*args, **kwargs)
            self._actors.append(actor)

    @property
    def methods(self) -> type[T]:
        return ActorMeshProxy(self)

    @property
    def actors(self) -> List[ActorHandle]:
        return self._actors

    @property
    def num_actors(self) -> int:
        return self._num_actors

@overload
def method(method: Callable[Concatenate[Any, P], R]) -> ActorMethod[P, R]:
    ...

@overload
def method(dispatch_fn: Optional[Callable] = None,) -> Callable[[Callable[Concatenate[Any, P], R]], ActorMethod[P, R]]:
    ...

def method(method=None, dispatch_fn=None, **kwargs):

    def decorator(f):
        method = ray.method(**kwargs)(f)
        method._dispatch_fn = dispatch_fn
        return method

    if method is not None:
        return decorator(method)
    return decorator
Contributor

high

The class name ActorMethod conflicts with ray.actor.ActorMethod. This is confusing because they are different classes with different purposes. The method decorator in this file is type-hinted to return actor_mesh.ActorMethod, but its implementation returns a ray.actor.ActorMethod (or a wrapper around it).

This name collision will likely cause confusion for users and static analysis tools. Please rename ActorMethod in this file to something more specific, like MeshMethod, to avoid the conflict. You'll also need to update the type hints for the method decorator.

Additionally, the method decorator implementation has confusing variable names. The variable method is used for the function itself, its argument, and a local variable. This harms readability. Consider renaming the argument to avoid shadowing.

class MeshMethod(Generic[P, R]):

    def __init__(self, method):
        self.method = method

    def all(self, *args: P.args, **kwargs: P.kwargs) -> List["ray.ObjectRef[R]"]:
        "Call all the actors in the mesh with the same arguments."
        pass

    def choose(self, *args: P.args, **kwargs: P.kwargs) -> "ray.ObjectRef[R]":
        "Call one of the actors in the mesh."
        pass
        
    def shard(self, *args: P.args, **kwargs: P.kwargs) -> R:
        "Call all the actors in the mesh, shard the argument(s) among them."
        pass
    
class ActorMethodProxy:

    def __init__(self, actor_mesh: "ActorMesh", actor_method: str) -> None:
        self.actor_mesh = actor_mesh
        self.actor_method = actor_method

    def all(self, *args, **kwargs):
        results = []
        for actor in self.actor_mesh.actors:
            method = getattr(actor, self.actor_method)
            results.append(method.remote(*args, **kwargs))
        return results
    
    def choose(self, *args, **kwargs):
        # Choosing randomly right now, this is bad and needs to be improved
        actor = random.choice(self.actor_mesh.actors)
        method = getattr(actor, self.actor_method)
        return method.remote(*args, **kwargs)
    
    def shard(self, *args, **kwargs):
        orig_func = getattr(self.actor_mesh._actor_cls, self.actor_method)
        dispatch_fn = getattr(orig_func, "_dispatch_fn", None)
        assert dispatch_fn, "Sharding only supported if dispatch_fn is specified (later we will have a good default one)"
        return dispatch_fn(self.actor_mesh, self.actor_method, *args, **kwargs)


class ActorMesh(Generic[T]):

    def __init__(
            self,
            actor_cls: Callable[..., T],
            args: Any,
            kwargs: Any,
            shape: Union[int, Tuple[int], Dict[str, int]],
            # TODO: Not used yet
            resources_per_actor: Optional[Dict[str, float]] = None,
            runtime_env: Optional[Dict[str, Any]] = None,
        ):

        if isinstance(shape, int):
            shape = (shape,)

        self.shape = shape

        if isinstance(shape, dict):
            self._num_actors = sum(shape.values())
        else:
            self._num_actors = sum(shape)

        self._actor_cls = actor_cls
        self._actors = []
        runtime_env = copy.deepcopy(runtime_env) or {}
        for i in range(self._num_actors):
            env_vars = {**runtime_env.get("env_vars", {}), "RAY_ACTOR_MESH_RANK": str(i)}
            ray_actor_cls = ray.remote(
                runtime_env={**runtime_env, "env_vars": env_vars},
            )(actor_cls)
            actor = ray_actor_cls.remote(*args, **kwargs)
            self._actors.append(actor)
    
    @property
    def methods(self) -> type[T]:
        return ActorMeshProxy(self)

    @property
    def actors(self) -> List[ActorHandle]:
        return self._actors
    
    @property
    def num_actors(self) -> int:
        return self._num_actors

@overload
def method(method: Callable[Concatenate[Any, P], R]) -> MeshMethod[P, R]:
    ...

@overload
def method(dispatch_fn: Optional[Callable] = None,) -> Callable[[Callable[Concatenate[Any, P], R]], MeshMethod[P, R]]:
    ...

def method(fn=None, dispatch_fn=None, **kwargs):

    def decorator(f):
        decorated = ray.method(**kwargs)(f)
        decorated._dispatch_fn = dispatch_fn
        return decorated

    if fn is not None:
        return decorator(fn)
    return decorator


    def choose(self, *args, **kwargs):
        # Choosing randomly right now, this is bad and needs to be improved
        actor = random.choice(self.actor_mesh.actors)
Contributor Author

@jjyao @stephanie-wang @kevin85421 Do you know of a good way to do a better job with the load balancing here with existing Ray Core APIs?

Contributor

I don't think we currently expose the number of pending tasks per actor, but it wouldn't be very hard to add.

Seems also useful to have a version of choose that lets you pass in a specific index? It'd be nice to be able to do this:

mesh = ...
ref = mesh[1, 0].foo.remote(...)

Eventually could also use it for sub-meshes:

mesh_2d = ...
sub_mesh = mesh_2d[1, :]
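
A hypothetical sketch of how such indexing could look (not part of this PR), assuming num_actors equals the product of a tuple shape and the flat actor list is laid out in row-major order:

    import numpy as np

    # Sketch: add to ActorMesh (as defined in this PR).
    def __getitem__(self, idx):
        # Arrange the flat actor handles into an array matching self.shape.
        grid = np.array(self._actors, dtype=object).reshape(self.shape)
        selected = grid[idx]
        if isinstance(selected, np.ndarray):
            # mesh[1, :] -> the list of handles for a sub-mesh.
            return list(selected.ravel())
        # mesh[1, 0] -> a single actor handle.
        return selected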

Contributor

Also, the API that lets Ray choose the least-loaded actor could be reused for Ray Data actor pools. (cc @jjyao)

Collaborator

That, or we promote the pool itself to a core API and implement it in C++ directly without adding a new low-level API. This will likely be needed to satisfy Data fault-tolerance requirements (retry on any actor in the pool).

Member

@kevin85421 kevin85421 left a comment

Would you mind adding some examples that simulate data communication between two SPMD systems, where each SPMD is an ActorMesh (ex: weight syncing)?

            results.append(method.remote(*args, **kwargs))
        return results

    def choose(self, *args, **kwargs):
Member

Should it be push-based or pull-based? Or should we support both? If the use case involves data, pull-based might be a better option because of long-tail tasks.

Contributor Author

@pcmoritz pcmoritz Jul 28, 2025

I was thinking two very reasonable implementations here are:

  • Power-of-two scheduling, where you choose two actors and then dispatch onto the one with fewer tasks in its queue (a minimal sketch of this idea follows after this comment)
  • Having a queue and dispatching new tasks as soon as an actor finishes (that's probably what you mean by pull-based) -- the challenge with the current API is that if object references are exposed to the user directly, there is no way for us to know when a task finishes. So that would probably need to be implemented at the Ray Core level, where we have the required callbacks.

Besides these, there should also be ways to make this more user-defined (as an example, think about processing LoRA requests: you might want to schedule them onto actors that already have the LoRA adapter loaded).
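
For reference, a minimal sketch of the power-of-two idea (hypothetical; it assumes the mesh keeps a per-actor in-flight counter, e.g. decremented from a completion callback, which Ray would need to expose):

    import random

    # Sketch: replacement for ActorMethodProxy.choose, assuming at least two
    # actors and a hypothetical self.actor_mesh._inflight list of counters.
    def choose(self, *args, **kwargs):
        i, j = random.sample(range(self.actor_mesh.num_actors), 2)
        inflight = self.actor_mesh._inflight
        idx = i if inflight[i] <= inflight[j] else j
        inflight[idx] += 1  # decremented when the task completes
        actor = self.actor_mesh.actors[idx]
        return getattr(actor, self.actor_method).remote(*args, **kwargs)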

            self._actors.append(actor)

    @property
    def methods(self) -> type[T]:
Member

However it is slightly clunky having to always type mesh.methods.{method}, so ideally I'd like to get rid of this property and provide the actor methods directly as methods on the ActorMesh[ActorClass] class.

+1


This pull request has been automatically marked as stale because it has not had
any activity for 14 days. It will be closed in another 14 days if no further activity occurs.
Thank you for your contributions.

You can always ask for help on our discussion forum or Ray's public slack channel.

If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@github-actions github-actions bot added the stale The issue is stale. It will be closed within 7 days unless there are further conversation label Aug 12, 2025
@ray-gardener ray-gardener bot added train Ray Train Related Issue core Issues that should be addressed in Ray Core labels Aug 15, 2025
@github-actions github-actions bot added unstale A PR that has been marked unstale. It will not get marked stale again if this label is on it. and removed stale The issue is stale. It will be closed within 7 days unless there are further conversation labels Aug 15, 2025