Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions ray-operator/apis/ray/v1/raycluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ type RayClusterStatus struct {
// Important: Run "make" to regenerate code after modifying this file
// Status reflects the status of the cluster
State ClusterState `json:"state,omitempty"`
// ReadyWorkerReplicas indicates how many worker replicas are ready in the cluster
ReadyWorkerReplicas int32 `json:"readyWorkerReplicas,omitempty"`
// AvailableWorkerReplicas indicates how many replicas are available in the cluster
AvailableWorkerReplicas int32 `json:"availableWorkerReplicas,omitempty"`
// DesiredWorkerReplicas indicates overall desired replicas claimed by the user at the cluster level.
Expand Down
2 changes: 2 additions & 0 deletions ray-operator/apis/ray/v1alpha1/raycluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ type RayClusterStatus struct {
// Important: Run "make" to regenerate code after modifying this file
// Status reflects the status of the cluster
State ClusterState `json:"state,omitempty"`
// ReadyWorkerReplicas indicates how many worker replicas are ready in the cluster
ReadyWorkerReplicas int32 `json:"readyWorkerReplicas,omitempty"`
// AvailableWorkerReplicas indicates how many replicas are available in the cluster
AvailableWorkerReplicas int32 `json:"availableWorkerReplicas,omitempty"`
// DesiredWorkerReplicas indicates overall desired replicas claimed by the user at the cluster level.
Expand Down
6 changes: 6 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayclusters.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayservices.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 16 additions & 6 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,13 +400,22 @@ func (r *RayClusterReconciler) inconsistentRayClusterStatus(ctx context.Context,
oldStatus.State, newStatus.State, oldStatus.Reason, newStatus.Reason))
return true
}
if oldStatus.AvailableWorkerReplicas != newStatus.AvailableWorkerReplicas || oldStatus.DesiredWorkerReplicas != newStatus.DesiredWorkerReplicas ||
oldStatus.MinWorkerReplicas != newStatus.MinWorkerReplicas || oldStatus.MaxWorkerReplicas != newStatus.MaxWorkerReplicas {
if oldStatus.ReadyWorkerReplicas != newStatus.ReadyWorkerReplicas ||
oldStatus.AvailableWorkerReplicas != newStatus.AvailableWorkerReplicas ||
oldStatus.DesiredWorkerReplicas != newStatus.DesiredWorkerReplicas ||
oldStatus.MinWorkerReplicas != newStatus.MinWorkerReplicas ||
oldStatus.MaxWorkerReplicas != newStatus.MaxWorkerReplicas {
logger.Info("inconsistentRayClusterStatus", "detect inconsistency", fmt.Sprintf(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: KubeRay introduced structured logging in KubeRay v1.1.0, which logs messages in JSON format. Hence, we are trying to reduce the use of fmt.Sprintf. Instead, we want to use key-value pairs. For example,

logger.Info("inconsistentRayClusterStatus", "detect inconsistency", $KEY1, $VAL1, $KEY2, $VAL2, ...)

so that the logging tools can easily extract the information from the JSON. This is outside the scope of this PR, so it is not necessary to update it here.

"old AvailableWorkerReplicas: %d, new AvailableWorkerReplicas: %d, old DesiredWorkerReplicas: %d, new DesiredWorkerReplicas: %d, "+
"old MinWorkerReplicas: %d, new MinWorkerReplicas: %d, old MaxWorkerReplicas: %d, new MaxWorkerReplicas: %d",
oldStatus.AvailableWorkerReplicas, newStatus.AvailableWorkerReplicas, oldStatus.DesiredWorkerReplicas, newStatus.DesiredWorkerReplicas,
oldStatus.MinWorkerReplicas, newStatus.MinWorkerReplicas, oldStatus.MaxWorkerReplicas, newStatus.MaxWorkerReplicas))
"old ReadyWorkerReplicas: %d, new ReadyWorkerReplicas: %d, "+
"old AvailableWorkerReplicas: %d, new AvailableWorkerReplicas: %d, "+
"old DesiredWorkerReplicas: %d, new DesiredWorkerReplicas: %d, "+
"old MinWorkerReplicas: %d, new MinWorkerReplicas: %d, "+
"old MaxWorkerReplicas: %d, new MaxWorkerReplicas: %d",
oldStatus.ReadyWorkerReplicas, newStatus.ReadyWorkerReplicas,
oldStatus.AvailableWorkerReplicas, newStatus.AvailableWorkerReplicas,
oldStatus.DesiredWorkerReplicas, newStatus.DesiredWorkerReplicas,
oldStatus.MinWorkerReplicas, newStatus.MinWorkerReplicas,
oldStatus.MaxWorkerReplicas, newStatus.MaxWorkerReplicas))
return true
}
if !reflect.DeepEqual(oldStatus.Endpoints, newStatus.Endpoints) || !reflect.DeepEqual(oldStatus.Head, newStatus.Head) {
Expand Down Expand Up @@ -1222,6 +1231,7 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
return nil, err
}

newInstance.Status.ReadyWorkerReplicas = utils.CalculateReadyReplicas(runtimePods)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my own understanding, we can't use the existing AvailableWorkerReplicas field because that only counts "running" pods and not necessarily "ready" pods, is that correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

newInstance.Status.AvailableWorkerReplicas = utils.CalculateAvailableReplicas(runtimePods)
newInstance.Status.DesiredWorkerReplicas = utils.CalculateDesiredReplicas(ctx, newInstance)
newInstance.Status.MinWorkerReplicas = utils.CalculateMinReplicas(newInstance)
Expand Down
22 changes: 14 additions & 8 deletions ray-operator/controllers/ray/raycluster_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1530,6 +1530,7 @@ func TestInconsistentRayClusterStatus(t *testing.T) {
timeNow := metav1.Now()
oldStatus := rayv1.RayClusterStatus{
State: rayv1.Ready,
ReadyWorkerReplicas: 1,
AvailableWorkerReplicas: 1,
DesiredWorkerReplicas: 1,
MinWorkerReplicas: 1,
Expand Down Expand Up @@ -1564,42 +1565,47 @@ func TestInconsistentRayClusterStatus(t *testing.T) {
newStatus.Reason = "new reason"
assert.True(t, r.inconsistentRayClusterStatus(ctx, oldStatus, *newStatus))

// Case 3: `AvailableWorkerReplicas` is different => return true
// Case 3: `ReadyWorkerReplicas` is different => return true
newStatus = oldStatus.DeepCopy()
newStatus.ReadyWorkerReplicas = oldStatus.ReadyWorkerReplicas + 1
assert.True(t, r.inconsistentRayClusterStatus(ctx, oldStatus, *newStatus))

// Case 4: `AvailableWorkerReplicas` is different => return true
newStatus = oldStatus.DeepCopy()
newStatus.AvailableWorkerReplicas = oldStatus.AvailableWorkerReplicas + 1
assert.True(t, r.inconsistentRayClusterStatus(ctx, oldStatus, *newStatus))

// Case 4: `DesiredWorkerReplicas` is different => return true
// Case 5: `DesiredWorkerReplicas` is different => return true
newStatus = oldStatus.DeepCopy()
newStatus.DesiredWorkerReplicas = oldStatus.DesiredWorkerReplicas + 1
assert.True(t, r.inconsistentRayClusterStatus(ctx, oldStatus, *newStatus))

// Case 5: `MinWorkerReplicas` is different => return true
// Case 6: `MinWorkerReplicas` is different => return true
newStatus = oldStatus.DeepCopy()
newStatus.MinWorkerReplicas = oldStatus.MinWorkerReplicas + 1
assert.True(t, r.inconsistentRayClusterStatus(ctx, oldStatus, *newStatus))

// Case 6: `MaxWorkerReplicas` is different => return true
// Case 7: `MaxWorkerReplicas` is different => return true
newStatus = oldStatus.DeepCopy()
newStatus.MaxWorkerReplicas = oldStatus.MaxWorkerReplicas + 1
assert.True(t, r.inconsistentRayClusterStatus(ctx, oldStatus, *newStatus))

// Case 7: `Endpoints` is different => return true
// Case 8: `Endpoints` is different => return true
newStatus = oldStatus.DeepCopy()
newStatus.Endpoints["fakeEndpoint"] = "10009"
assert.True(t, r.inconsistentRayClusterStatus(ctx, oldStatus, *newStatus))

// Case 8: `Head` is different => return true
// Case 9: `Head` is different => return true
newStatus = oldStatus.DeepCopy()
newStatus.Head.PodIP = "test head pod ip"
assert.True(t, r.inconsistentRayClusterStatus(ctx, oldStatus, *newStatus))

// Case 9: `LastUpdateTime` is different => return false
// Case 10: `LastUpdateTime` is different => return false
newStatus = oldStatus.DeepCopy()
newStatus.LastUpdateTime = &metav1.Time{Time: timeNow.Add(time.Hour)}
assert.False(t, r.inconsistentRayClusterStatus(ctx, oldStatus, *newStatus))

// Case 10: `ObservedGeneration` is different => return false
// Case 11: `ObservedGeneration` is different => return false
newStatus = oldStatus.DeepCopy()
newStatus.ObservedGeneration = oldStatus.ObservedGeneration + 1
assert.False(t, r.inconsistentRayClusterStatus(ctx, oldStatus, *newStatus))
Expand Down
19 changes: 19 additions & 0 deletions ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,25 @@ func CalculateMaxReplicas(cluster *rayv1.RayCluster) int32 {
return count
}

// CalculateReadyReplicas counts the ready worker replicas at the cluster level.
// Pods whose "ray.io/node-type" label is missing or is not the worker node type are skipped.
// A worker is ready if its Pod has a PodCondition with type == Ready and status == True;
// each such Pod adds exactly one to the returned count.
func CalculateReadyReplicas(pods corev1.PodList) int32 {
count := int32(0)
for _, pod := range pods.Items {
// Only worker Pods are counted: skip Pods without the node-type label or with a non-worker value.
if val, ok := pod.Labels["ray.io/node-type"]; !ok || val != string(rayv1.WorkerNode) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use RayNodeTypeLabelKey instead?

continue
}
for _, cond := range pod.Status.Conditions {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use IsRunningAndReady instead?

func IsRunningAndReady(pod *corev1.Pod) bool {

if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
count++
// Stop scanning this Pod's conditions so the Pod is counted at most once.
break
}
}
}

return count
}

// CalculateAvailableReplicas calculates available worker replicas at the cluster level
// A worker is available if its Pod is running
func CalculateAvailableReplicas(pods corev1.PodList) int32 {
Expand Down
20 changes: 18 additions & 2 deletions ray-operator/controllers/ray/utils/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,12 @@ func TestCalculateAvailableReplicas(t *testing.T) {
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
},
},
},
},
{
Expand All @@ -285,6 +291,12 @@ func TestCalculateAvailableReplicas(t *testing.T) {
},
Status: corev1.PodStatus{
Phase: corev1.PodPending,
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionFalse,
},
},
},
},
{
Expand All @@ -300,8 +312,12 @@ func TestCalculateAvailableReplicas(t *testing.T) {
},
},
}
count := CalculateAvailableReplicas(podList)
assert.Equal(t, count, int32(1), "expect 1 available replica")

availableCount := CalculateAvailableReplicas(podList)
assert.Equal(t, availableCount, int32(1), "expect 1 available replica")

readyCount := CalculateReadyReplicas(podList)
assert.Equal(t, readyCount, int32(1), "expect 1 ready replica")
}

func TestFindContainerPort(t *testing.T) {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.