
[Core][Labels Scheduling]Finalize the new node affinity scheduling with node labels API in the Python worker #36419

@larrylian

Description

After our offline discussion, we have come up with the following plan. The API will be reviewed and finalized by more people in the future.

API:

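# Hard constraints only: the node must match every label listed.
MyActor.options(
    scheduling_strategy=NodeLabelSchedulingStrategy(
        {
            "region": IN("us"),
            "gpu_type": IN("A100"),
        }
    )
).remote()

# Hard constraints must match; soft constraints are preferences.
MyActor.options(
    scheduling_strategy=NodeLabelSchedulingStrategy(
        hard={
            "region": IN("us"),
        },
        soft={
            "gpu_type": IN("A100"),
        },
    )
).remote()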
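The following is a minimal pure-Python sketch of how a scheduler might apply these two forms to a set of candidate nodes. It assumes that hard constraints filter nodes and that soft constraints only reorder the feasible ones. The operator classes reuse the proposal's names (IN, NOT_IN, EXISTS, DOES_NOT_EXIST), but their bodies are assumptions for illustration, not Ray's implementation. The same goes for the behavior of NOT_IN on a missing label, which is borrowed from Kubernetes label selectors, and for the helpers satisfies and select_nodes.

```python
from typing import Dict, List, Optional


class IN:
    """Hypothetical stand-in: the label must exist and equal one of the values."""

    def __init__(self, *values: str):
        self.values = set(values)

    def matches(self, value: Optional[str]) -> bool:
        return value is not None and value in self.values


class NOT_IN:
    """Hypothetical stand-in: the label is missing or not one of the values."""

    def __init__(self, *values: str):
        self.values = set(values)

    def matches(self, value: Optional[str]) -> bool:
        return value is None or value not in self.values


class EXISTS:
    """Hypothetical stand-in: the label key must be present."""

    def matches(self, value: Optional[str]) -> bool:
        return value is not None


class DOES_NOT_EXIST:
    """Hypothetical stand-in: the label key must be absent."""

    def matches(self, value: Optional[str]) -> bool:
        return value is None


def satisfies(node_labels: Dict[str, str], constraints: dict) -> bool:
    # All keys in one constraint dict are combined with AND.
    return all(op.matches(node_labels.get(key)) for key, op in constraints.items())


def select_nodes(
    nodes: Dict[str, Dict[str, str]], hard: dict, soft: Optional[dict] = None
) -> List[str]:
    # Hard constraints remove infeasible nodes.
    feasible = [name for name, labels in nodes.items() if satisfies(labels, hard)]
    if soft:
        # Stable sort: nodes that also satisfy the soft constraints come first.
        feasible.sort(key=lambda name: not satisfies(nodes[name], soft))
    return feasible
```

Because soft constraints never remove a node, a strategy whose soft labels match nothing still schedules. This fits the "preferably" wording used in scenario 7 below.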

If OR semantics are needed later, the API can be extended as follows. This is backwards compatible.

# Each dict in the list is an alternative (OR); keys within one dict are combined with AND.
MyActor.options(
    scheduling_strategy=NodeLabelSchedulingStrategy(
        hard=[
            {
                "region": IN("us"),
                "gpu_type": IN("A100"),
            },
            {
                "region": IN("asia", "europe"),
                "gpu_type": IN("T100"),
            },
        ],
        soft=[{}, {}],
    )
).remote()
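The backward-compatibility claim rests on treating a single dict as a one-element list of alternatives. Below is a minimal sketch under three assumptions: hard is either a dict or a list of dicts that are ORed, the keys within each dict are ANDed, and soft[i] is a preference attached to branch hard[i]. The proposal does not spell out how the two lists pair up. IN, normalize, and matching_branch are hypothetical names used only for this illustration.

```python
from typing import Dict, List, Optional, Union


class IN:
    # Hypothetical stand-in: the label must be present and equal one of the values.
    def __init__(self, *values: str):
        self.values = set(values)

    def matches(self, value: Optional[str]) -> bool:
        return value is not None and value in self.values


Constraint = Dict[str, IN]


def normalize(hard: Union[Constraint, List[Constraint]]) -> List[Constraint]:
    # A plain dict (the original API form) becomes a single OR branch.
    return [hard] if isinstance(hard, dict) else list(hard)


def matching_branch(node_labels: Dict[str, str], hard) -> Optional[int]:
    # Return the index of the first branch whose keys all match, or None.
    # The index lets the caller look up the paired soft[i] preferences.
    for i, branch in enumerate(normalize(hard)):
        if all(op.matches(node_labels.get(key)) for key, op in branch.items()):
            return i
    return None
```

With this normalization, existing callers that pass one dict see exactly the behavior they had before. Only callers that pass a list opt into OR semantics.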

Usage scenarios:
1. Schedule onto nodes whose "region" label is "us".

scheduling_strategy=NodeLabelSchedulingStrategy({
    "region": IN("us")
})

2. Schedule onto nodes whose "region" label is not "us".

scheduling_strategy=NodeLabelSchedulingStrategy({
    "region": NOT_IN("us")
})

3. Schedule onto nodes whose "gpu_type" label is "A100" or "T100".

scheduling_strategy=NodeLabelSchedulingStrategy({
    "gpu_type": IN("A100", "T100")
})

4. Schedule onto nodes whose IP address is "1.1.1.1" or "2.2.2.2".

scheduling_strategy=NodeLabelSchedulingStrategy({
    "RAY_NODE_IP": IN("1.1.1.1", "2.2.2.2")
})

5. Schedule onto the node whose node ID is "xxxxx".

scheduling_strategy=NodeLabelSchedulingStrategy({
    "RAY_NODE_ID": IN("xxxxx")
})
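Scenarios 4 and 5 treat the node IP and node ID as ordinary labels. One way to get that behavior is to merge built-in labels into each node's user-defined labels before matching. The sketch below assumes that the scheduler injects these keys itself and that user labels may not override keys with the reserved "RAY_" prefix. effective_labels and ip_in are hypothetical helpers, and the key RAY_NODE_ID is an assumption modeled on the proposal's RAY_NODE_IP.

```python
from typing import Dict

# Hypothetical reserved prefix, modeled on the proposal's RAY_NODE_IP key.
RESERVED_PREFIX = "RAY_"


def effective_labels(user_labels: Dict[str, str], node_ip: str, node_id: str) -> Dict[str, str]:
    # Reject user labels that would shadow the built-in ones.
    reserved = [key for key in user_labels if key.startswith(RESERVED_PREFIX)]
    if reserved:
        raise ValueError(f"label keys {reserved} are reserved")
    labels = dict(user_labels)
    # Built-in labels are added so IP/ID selection reuses normal label matching.
    labels["RAY_NODE_IP"] = node_ip
    labels["RAY_NODE_ID"] = node_id
    return labels


def ip_in(labels: Dict[str, str], *ips: str) -> bool:
    # Equivalent of {"RAY_NODE_IP": IN(*ips)} for this single key.
    return labels.get("RAY_NODE_IP") in ips
```

Reserving the prefix keeps a user-supplied "RAY_NODE_IP" label from silently redirecting IP-based scheduling.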

6. Schedule onto nodes whose "region" label is "us" and whose "gpu_type" label is "A100".

scheduling_strategy=NodeLabelSchedulingStrategy({
    "region": IN("us"),
    "gpu_type": IN("A100")
})

7. Schedule onto nodes that have a "gpu_type" label, preferably one whose value is "A100".

scheduling_strategy=NodeLabelSchedulingStrategy(
    {
        "gpu_type": EXISTS()
    },
    soft={
        "gpu_type": IN("A100")
    },
)

8. Schedule onto nodes that have a "gpu_type" label whose value is not "A100".

scheduling_strategy=NodeLabelSchedulingStrategy({
    "gpu_type": [EXISTS(), NOT_IN("A100")]
})
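Scenario 8 attaches a list of operators to one key. The minimal sketch below assumes that a list means every operator must hold for that key's value. It also assumes that NOT_IN on its own accepts nodes lacking the label, as Kubernetes label selectors do, which would explain why EXISTS() is listed alongside it. The operator classes and key_matches are hypothetical stand-ins.

```python
from typing import List, Optional, Union


class EXISTS:
    # Hypothetical stand-in: the label key must be present.
    def matches(self, value: Optional[str]) -> bool:
        return value is not None


class NOT_IN:
    # Hypothetical stand-in: a missing label also counts as "not in" the set.
    def __init__(self, *values: str):
        self.values = set(values)

    def matches(self, value: Optional[str]) -> bool:
        return value is None or value not in self.values


Operator = Union[EXISTS, NOT_IN]


def key_matches(value: Optional[str], ops: Union[Operator, List[Operator]]) -> bool:
    # A single operator is treated as a one-element list; all must hold (AND).
    if not isinstance(ops, list):
        ops = [ops]
    return all(op.matches(value) for op in ops)
```

Under these assumptions, {"gpu_type": NOT_IN("A100")} alone would also admit CPU-only nodes. The EXISTS() entry is what restricts the result to GPU nodes.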

9. Schedule onto either nodes whose "azone" label is "a-zone" and whose "gpu_type" label is "A100", or nodes whose "azone" label is "b-zone" and whose "gpu_type" label is "T100".

scheduling_strategy=NodeLabelSchedulingStrategy([
    {
        "azone": IN("a-zone"),
        "gpu_type": IN("A100")
    },
    {
        "azone": IN("b-zone"),
        "gpu_type": IN("T100")
    }
])

Records of the plans originally discussed:

Plan 1:

Use cases:

# Schedule onto the node with a specific IP.
actor_1 = Actor.options(
    scheduling_strategy=node_affinity(label_in(key="node_ip", values=["xxx.xxx.xx.xx"], is_soft=False))
).remote()

# Prefer a node with an A100 or P100 GPU; if none is available, schedule onto another node.
actor_1 = Actor.options(
    scheduling_strategy=node_affinity(label_in("gpu_type", ["A100", "P100"], is_soft=True))
).remote()

# Do not schedule onto the two nodes whose node IDs are "xxxxxxx" and "aaaaaaaa".
actor_1 = Actor.options(
    scheduling_strategy=node_affinity(label_not_in("node_id", ["xxxxxxx", "aaaaaaaa"], is_soft=False))
).remote()

# Schedule onto a node that has the label key "gpu_type".
actor_1 = Actor.options(
    scheduling_strategy=node_affinity(label_exists("gpu_type"))
).remote()

# Do not schedule onto a node that has the label key "gpu_type".
object_ref = Task.options(
    scheduling_strategy=node_affinity(label_does_not_exist("gpu_type", is_soft=False))
).remote()

# Multiple label expressions can be given at once and are combined with AND:
# every hard expression must hold, and soft expressions act as preferences.
# Here the actor must land on a node with a GPU, preferably one with an A100.
actor_1 = Actor.options(
    scheduling_strategy=node_affinity([
        label_in("gpu_type", ["A100"], True),
        label_exists("gpu_type", False),
    ])
).remote()
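Plan 1 attaches is_soft to each expression, whereas the final API groups constraints into hard and soft dicts. The sketch below shows that regrouping. It uses a simplified, hypothetical Expr tuple in place of LabelMatchExpression and assumes at most one expression per key in each group. The final API's per-key operator lists, as in scenario 8, would lift that restriction. split_hard_soft is a hypothetical helper.

```python
from typing import Dict, List, NamedTuple, Tuple


class Expr(NamedTuple):
    # Simplified, hypothetical stand-in for Plan 1's LabelMatchExpression.
    key: str
    operator: str  # "IN", "NOT_IN", "EXISTS" or "DOES_NOT_EXIST"
    values: Tuple[str, ...]
    soft: bool


def split_hard_soft(exprs: List[Expr]) -> Tuple[Dict[str, Expr], Dict[str, Expr]]:
    hard: Dict[str, Expr] = {}
    soft: Dict[str, Expr] = {}
    for expr in exprs:
        # Route each expression by its own soft flag.
        group = soft if expr.soft else hard
        if expr.key in group:
            kind = "soft" if expr.soft else "hard"
            raise ValueError(f"duplicate {kind} expression for {expr.key!r}")
        group[expr.key] = expr
    return hard, soft
```

Applied to the last Plan 1 example, this yields a hard EXISTS on "gpu_type" and a soft IN("A100") on "gpu_type", which is the shape of scenario 7 in the final API.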

Implementation:

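from enum import Enum
from typing import List, Optional, Union

from ray.util.annotations import PublicAPI

# PlacementGroupSchedulingStrategy is assumed to be defined earlier in the same
# module (ray/util/scheduling_strategies.py).


@PublicAPI(stability="beta")
class NodeAffinitySchedulingStrategy:
    def __init__(
        self,
        node_id: Optional[str] = None,
        soft: bool = False,
        _spill_on_unavailable: bool = False,
        match_expressions: Optional[List["LabelMatchExpression"]] = None,
    ):
        # This will be removed once we standardize on node id being hex string.
        # node_id may now be None when nodes are selected by label expressions instead.
        if node_id is not None and not isinstance(node_id, str):
            node_id = node_id.hex()

        self.node_id = node_id
        self.soft = soft
        # Avoid sharing one mutable default list across instances.
        self.match_expressions = match_expressions if match_expressions is not None else []
        self._spill_on_unavailable = _spill_on_unavailable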

SchedulingStrategyT = Union[
    None,
    str,  # Literal["DEFAULT", "SPREAD"]
    PlacementGroupSchedulingStrategy,
    NodeAffinitySchedulingStrategy,
]

class LabelMatchOperator(Enum):
    IN = "IN"
    NOT_IN = "NOT_IN"
    EXISTS = "EXISTS"
    DOES_NOT_EXIST = "DOES_NOT_EXIST"

class LabelMatchExpression:
    def __init__(self, key: str, operator: LabelMatchOperator,
                 values: List[str], soft: bool):
        self.key = key
        self.operator = operator
        self.values = values
        self.soft = soft

def label_in(key, values, is_soft=False):
    return LabelMatchExpression(key, LabelMatchOperator.IN, values, is_soft)


def label_not_in(key, values, is_soft=False):
    return LabelMatchExpression(key, LabelMatchOperator.NOT_IN, values, is_soft)


def label_exists(key, is_soft=False):
    return LabelMatchExpression(key, LabelMatchOperator.EXISTS, [], is_soft)


def label_does_not_exist(key, is_soft=False):
    return LabelMatchExpression(key, LabelMatchOperator.DOES_NOT_EXIST, [], is_soft)


def node_affinity(match_expressions: List[LabelMatchExpression]):
    return NodeAffinitySchedulingStrategy(match_expressions=match_expressions)
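The LabelMatchExpression constructor above accepts any combination of operator and values. The sketch below shows checks it could add, assuming IN and NOT_IN require at least one value while EXISTS and DOES_NOT_EXIST take none. validate_expression is a hypothetical helper. The operator enum is redeclared so that the snippet runs standalone.

```python
from enum import Enum
from typing import List


class LabelMatchOperator(Enum):
    IN = "IN"
    NOT_IN = "NOT_IN"
    EXISTS = "EXISTS"
    DOES_NOT_EXIST = "DOES_NOT_EXIST"


def validate_expression(key: str, operator: LabelMatchOperator, values: List[str]) -> None:
    # Hypothetical validation rules; the proposal does not define them.
    if not key:
        raise ValueError("label key must be a non-empty string")
    if not isinstance(operator, LabelMatchOperator):
        raise TypeError(f"unknown operator: {operator!r}")
    needs_values = operator in (LabelMatchOperator.IN, LabelMatchOperator.NOT_IN)
    if needs_values and not values:
        raise ValueError(f"{operator.value} requires at least one value")
    if not needs_values and values:
        raise ValueError(f"{operator.value} does not take values")
```

Calling such a check from LabelMatchExpression.__init__ would reject malformed expressions at the call site rather than inside the scheduler.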

Plan 2:

Same as Plan 1, but construct "NodeAffinitySchedulingStrategy" directly instead of going through the "node_affinity" helper.

Use cases:

actor_1 = Actor.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(label_in(key="node_ip", values=["xxx.xxx.xx.xx"], is_soft=False))
).remote()

actor_1 = Actor.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(label_in("gpu_type", ["A100", "P100"], is_soft=True))
).remote()

actor_1 = Actor.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(label_not_in("node_id", ["xxxxxxx", "aaaaaaaa"], is_soft=False))
).remote()

actor_1 = Actor.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(label_exists("gpu_type"))
).remote()

object_ref = Task.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(label_does_not_exist("gpu_type", is_soft=False))
).remote()

actor_1 = Actor.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy([
        label_in("gpu_type", ["A100"], True),
        label_exists("gpu_type", False),
    ])
).remote()

Plan 3:

Use case:

actor = Actor.options(
    labels={
        "node_id": LabelIn("aaa", "bbb"),
        "gpu_type": LabelNotIn("A100", "P100", soft=True),
        "availability_zone": LabelExists(soft=True),
        "market": LabelIn("spot"),
        "taints": LabelDoesNotExist(),
    }
).remote()

Labels: P1, core, core-scheduler, enhancement
