[RayJob] Add spec.backoffLimit for retrying RayJobs with new clusters #2192

Merged
3 commits merged on Jul 1, 2024

Conversation

andrewsykim
Member

Why are these changes needed?

Add a new field spec.backoffLimit to RayJob for retrying failed jobs. A retry involves deleting and recreating the RayCluster.

Marking WIP since I haven't added tests yet

Related issue number

Fixes #1902

Checks

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

@kevin85421
Member

I just had a quick glance, and I think we should reuse the code path of the existing state machine. To elaborate, we can add a new state rayv1.JobDeploymentStatusRestarting, which goes through a similar path as rayv1.JobDeploymentStatusSuspending. In addition, all of the state machine transitions should happen inside the switch rayJobInstance.Status.JobDeploymentStatus statement. Currently, restartRayJobOnFailure is not in the switch statement.

case rayv1.JobDeploymentStatusSuspending, rayv1.JobDeploymentStatusRestarting:
    // Delete RayCluster and submitter K8s Job
    ...
    // Reset the RayCluster and Ray job related status.
    ...
    // Reset the JobStatus to JobStatusNew
    ...
    // Transition the JobDeploymentStatus to `Suspended` if the status is `Suspending`.
    // Transition the JobDeploymentStatus to `New` if the status is `Restarting`.
    rayJobInstance.Status.JobDeploymentStatus = ...

@kevin85421 kevin85421 self-assigned this Jun 13, 2024
@andrewsykim
Member Author

@kevin85421 the reason I didn't use suspend is that I don't think it will play nicely with Kueue, because suspend means each retry goes to the back of the job queue instead of an immediate retry on failure.

@andrewsykim
Member Author

Actually, I think I misunderstood what you said. We can use the same state machine logic as suspend without actually suspending the RayJob; I'll update the PR to do that. I'll also move restartRayJobOnFailure into the switch statement.

@andrewsykim andrewsykim changed the title [WIP][RayJob] Add spec.backoffLimit for retrying RayJobs [WIP][RayJob] Add spec.backoffLimit for retrying RayJobs with new clusters Jun 13, 2024

// Succeeded is the number of times this job succeeded.
// +kubebuilder:default:=0
Succeeded *int32 `json:"succeeded,omitempty"`
Member Author

Succeeded is always 0 or 1 for now, but this could be useful in the future if we want a success policy similar to Job's: https://kubernetes.io/docs/concepts/workloads/controllers/job/#success-policy
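
For illustration only (not the PR's actual code; the helper name is invented and the operator's rayv1 and k8s.io/utils/ptr imports are assumed), a minimal sketch of how Succeeded could be bumped once the deployment status reaches Complete, mirroring the Failed counter discussed later in this review:

// Illustration only, not code from this PR: bump Succeeded when the RayJob
// reaches the Complete deployment status, treating a nil counter as 0.
func incrementSucceeded(rayJob *rayv1.RayJob) {
    if rayJob.Status.JobDeploymentStatus != rayv1.JobDeploymentStatusComplete {
        return
    }
    succeeded := int32(0)
    if rayJob.Status.Succeeded != nil {
        succeeded = *rayJob.Status.Succeeded
    }
    rayJob.Status.Succeeded = ptr.To[int32](succeeded + 1)
}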

@andrewsykim
Copy link
Member Author

@kevin85421 I incorporated your feedback; if the overall approach looks good to you, I can polish the PR and add tests.

So far I've manually verified this works:

$ kubectl get rayjob -w                                                                                                              
NAME            JOB STATUS   DEPLOYMENT STATUS   RAY CLUSTER NAME                 START TIME             END TIME   AGE                                                                                            
rayjob-sample                Initializing        rayjob-sample-raycluster-qhdx2   2024-06-13T14:42:08Z              6s                                                                                             
rayjob-sample                Running             rayjob-sample-raycluster-qhdx2   2024-06-13T14:42:08Z              76s                                                                                            
rayjob-sample   PENDING      Running             rayjob-sample-raycluster-qhdx2   2024-06-13T14:42:08Z              78s                                                                                            
rayjob-sample   FAILED       Failed              rayjob-sample-raycluster-qhdx2   2024-06-13T14:42:08Z   2024-06-13T14:43:32Z   84s                                                                                
rayjob-sample   FAILED       Restarting          rayjob-sample-raycluster-qhdx2   2024-06-13T14:42:08Z   2024-06-13T14:43:32Z   84s                                                                                
rayjob-sample                                                                     2024-06-13T14:42:08Z   2024-06-13T14:43:32Z   84s
rayjob-sample                Initializing        rayjob-sample-raycluster-mqt2l   2024-06-13T14:43:32Z   2024-06-13T14:43:32Z   84s
rayjob-sample                Running             rayjob-sample-raycluster-mqt2l   2024-06-13T14:43:32Z   2024-06-13T14:43:32Z   110s
rayjob-sample   PENDING      Running             rayjob-sample-raycluster-mqt2l   2024-06-13T14:43:32Z   2024-06-13T14:43:32Z   112s
rayjob-sample   FAILED       Failed              rayjob-sample-raycluster-mqt2l   2024-06-13T14:43:32Z   2024-06-13T14:44:09Z   2m1s
rayjob-sample   FAILED       Restarting          rayjob-sample-raycluster-mqt2l   2024-06-13T14:43:32Z   2024-06-13T14:44:09Z   2m1s
rayjob-sample                                                                     2024-06-13T14:43:32Z   2024-06-13T14:44:09Z   2m1s
rayjob-sample                Initializing        rayjob-sample-raycluster-x576d   2024-06-13T14:44:09Z   2024-06-13T14:44:09Z   2m1s
rayjob-sample                Running             rayjob-sample-raycluster-x576d   2024-06-13T14:44:09Z   2024-06-13T14:44:09Z   2m27s
rayjob-sample   PENDING      Running             rayjob-sample-raycluster-x576d   2024-06-13T14:44:09Z   2024-06-13T14:44:09Z   2m28s
rayjob-sample   RUNNING      Running             rayjob-sample-raycluster-x576d   2024-06-13T14:44:09Z   2024-06-13T14:44:09Z   2m34s
rayjob-sample   FAILED       Failed              rayjob-sample-raycluster-x576d   2024-06-13T14:44:09Z   2024-06-13T14:44:45Z   2m37s
rayjob-sample   FAILED       Restarting          rayjob-sample-raycluster-x576d   2024-06-13T14:44:09Z   2024-06-13T14:44:45Z   2m37s
rayjob-sample                                                                     2024-06-13T14:44:09Z   2024-06-13T14:44:45Z   2m37s
rayjob-sample                Initializing        rayjob-sample-raycluster-89284   2024-06-13T14:44:45Z   2024-06-13T14:44:45Z   2m37s
rayjob-sample                Running             rayjob-sample-raycluster-89284   2024-06-13T14:44:45Z   2024-06-13T14:44:45Z   3m3s
rayjob-sample   PENDING      Running             rayjob-sample-raycluster-89284   2024-06-13T14:44:45Z   2024-06-13T14:44:45Z   3m5s
rayjob-sample   FAILED       Failed              rayjob-sample-raycluster-89284   2024-06-13T14:44:45Z   2024-06-13T14:45:22Z   3m14s
rayjob-sample   FAILED       Restarting          rayjob-sample-raycluster-89284   2024-06-13T14:44:45Z   2024-06-13T14:45:22Z   3m14s
rayjob-sample                                                                     2024-06-13T14:44:45Z   2024-06-13T14:45:22Z   3m14s
rayjob-sample                Initializing        rayjob-sample-raycluster-xdgv9   2024-06-13T14:45:22Z   2024-06-13T14:45:22Z   3m14s

rayJobInstance.Status.JobStatus == rayv1.JobStatusFailed &&
rayJobInstance.Status.Failed != nil &&
*rayJobInstance.Status.Failed < *rayJobInstance.Spec.BackoffLimit+1 {
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusRestarting
Member

Maybe we can transition the status from JobDeploymentStatusRunning to JobDeploymentStatusRestarting directly. If we update the status here, it may enter the if rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 statement.

Member Author

Maybe we can transition the status from JobDeploymentStatusRunning to JobDeploymentStatusRestarting directly.

I think it'll be confusing to skip the JobDeploymentStatusFailed state though. A job going directly from Running -> Restarting could imply it was restarted for some other reason.

If we update the status here, it may enter the if rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 statement.

This is a good catch. Should we instead just add a check for JobDeploymentStatus == JobDeploymentStatusComplete here?

Member

I think rayv1.JobDeploymentStatusComplete and rayv1.JobDeploymentStatusFailed should be terminal states, meaning that no state transitions from these two states to any other states are allowed.

Member

Maybe consider adding a Failing status, so the state transition would be Running -> Failing -> Restarting -> New ... -> Failed or Complete. This is how the Spark operator does it.

Member Author

I renamed Restarting -> Retrying and updated the state flow to Running -> Retrying -> New.
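
Roughly, the transition step sketched earlier would then look something like this (a simplified sketch following the outline above, not the exact PR code):

// Sketch of the shared teardown-and-transition step: Suspending parks the
// RayJob as Suspended, while Retrying sends it back to New so a fresh
// RayCluster is created on the next reconcile.
case rayv1.JobDeploymentStatusSuspending, rayv1.JobDeploymentStatusRetrying:
    // Delete the RayCluster and the submitter K8s Job, reset job-related status fields...
    rayJobInstance.Status.JobStatus = rayv1.JobStatusNew
    if rayJobInstance.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusSuspending {
        rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusSuspended
    } else {
        rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusNew
    }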


if rayJobInstance.Spec.BackoffLimit != nil && *rayJobInstance.Spec.BackoffLimit > 0 &&
rayJobInstance.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusFailed &&
rayJobInstance.Status.JobStatus == rayv1.JobStatusFailed &&
Member

Do we need to include the check on JobStatusFailed here? For example, consider a user who collects the driver logs by reading the submitter K8s Job's STDOUT / STDERR. If the submitter K8s Job fails but the Ray job succeeds, should we still retry? Otherwise the user doesn't get any driver logs.

Member Author

Ah, I thought JobStatusFailed only occurs when the job fails from the Ray API's perspective, but this check wouldn't work if it is also set to Failed when the submitter job fails. I'll update it to only check JobDeploymentStatus.

@andrewsykim andrewsykim force-pushed the ray-job-retry branch 3 times, most recently from 7a41e17 to e7f5fda Compare June 16, 2024 15:45
@andrewsykim andrewsykim changed the title [WIP][RayJob] Add spec.backoffLimit for retrying RayJobs with new clusters [RayJob] Add spec.backoffLimit for retrying RayJobs with new clusters Jun 16, 2024
@andrewsykim andrewsykim force-pushed the ray-job-retry branch 2 times, most recently from cc185b6 to 14277b9 Compare June 17, 2024 03:21
// Specifies the number of retries before marking this job failed.
// Each retry creates a new RayCluster.
// +kubebuilder:default:=0
BackoffLimit *int32 `json:"backoffLimit,omitempty"`
Member Author

Should we enforce shutdownAfterJobFinishes=true when setting backoffLimit > 0? Or should we still respect it but only for the last retry?

Member

Maybe the latter one for now?

Member Author

Sounds good, I think this is already the case then.
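
In other words (a hedged sketch, not the PR's code; the helper name is invented): intermediate failures presumably go through Retrying, which deletes and recreates the cluster anyway, so shutdownAfterJobFinishes effectively only matters once the RayJob reaches a terminal deployment status.

// Hypothetical helper for illustration: shutdownAfterJobFinishes only has an
// effect once no retries remain, i.e. the RayJob is in a terminal state.
func isTerminalDeploymentStatus(status rayv1.JobDeploymentStatus) bool {
    return status == rayv1.JobDeploymentStatusComplete ||
        status == rayv1.JobDeploymentStatusFailed
}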

@andrewsykim andrewsykim force-pushed the ray-job-retry branch 2 times, most recently from e7a803c to 55ca9f2 Compare June 17, 2024 14:12
// Specifies the number of retries before marking this job failed.
// Each retry creates a new RayCluster.
// +kubebuilder:default:=0
BackoffLimit *int32 `json:"backoffLimit,omitempty"`
Member

Out of curiosity, what's the reason to use *int32 instead of int32? This PR checks whether it is nil or not multiple times.

Member Author

I was being safe in case a user updates the kuberay-operator version before they update the CRD manifest. In that case, all the code for backoffLimit is guarded by rayJobInstance.Spec.BackoffLimit != nil and never runs. It also affects the omitempty behavior: if it's not a pointer, the field is omitted when set to 0.

Member Author

Thinking about it more, this maybe doesn't need to be a pointer. Unlike with numOfHosts, if we default backoffLimit to 0 there should be no change in behavior if there's version skew between kuberay-operator and the CRD. So I guess this mainly boils down to the omitempty behavior. This would need to be a pointer if we defaulted backoffLimit to a value > 1 though.

Member Author

@kevin85421 I'm keeping this as a pointer per the recommendation in the Kubernetes API conventions for optional / omitempty fields: https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#optional-vs-required
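
A minimal sketch of the resulting read pattern (helper name invented for illustration; assumes the operator's rayv1 import):

// Nil-safe read of the optional pointer field; a nil value behaves like the
// +kubebuilder:default:=0 marker, i.e. no retries.
func backoffLimitOrDefault(rayJob *rayv1.RayJob) int32 {
    if rayJob.Spec.BackoffLimit == nil {
        return 0
    }
    return *rayJob.Spec.BackoffLimit
}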

@andrewsykim andrewsykim force-pushed the ray-job-retry branch 7 times, most recently from d3c6f6b to 9756e46 Compare June 27, 2024 15:07
}
}

if jobDeploymentStatus == rayv1.JobDeploymentStatusFailed &&
Member

Maybe move the logic to

			if jobInfo.JobStatus == rayv1.JobStatusFailed {
				jobDeploymentStatus = rayv1.JobDeploymentStatusFailed
				reason = rayv1.AppFailed
				failedCount++
			}

?

Member Author

done

failedCount++
rayJob.Status.Failed = ptr.To[int32](failedCount)

if rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit > 0 &&
Member

We can also check in validateRayJobSpec that BackoffLimit is not negative, and then we don't need to check *rayJob.Spec.BackoffLimit > 0 here.
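
Something along these lines, presumably (a sketch of the suggestion, not the PR's exact code; assumes fmt and the operator's rayv1 import, and the function name is invented):

// Reject negative values up front so the controller can assume backoffLimit >= 0.
func validateBackoffLimit(rayJob *rayv1.RayJob) error {
    if rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit < 0 {
        return fmt.Errorf("backoffLimit must be a non-negative integer")
    }
    return nil
}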

Member Author

done

@@ -706,6 +734,21 @@ func (r *RayJobReconciler) checkK8sJobAndUpdateStatusIfNeeded(ctx context.Contex
rayJob.Status.Reason = rayv1.SubmissionFailed
rayJob.Status.Message = fmt.Sprintf("Job submission has failed. Reason: %s. Message: %s", cond.Reason, cond.Message)
}

failedCount := int32(0)
Member

add some comments

Member Author

done

@andrewsykim andrewsykim force-pushed the ray-job-retry branch 3 times, most recently from 360f86b to 912e8b9 Compare June 28, 2024 17:19
failedCount++
rayJob.Status.Failed = ptr.To[int32](failedCount)

if rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit > 0 &&
Member Author

There are 3 different places where we check for retry now:

  1. checkK8sJobAndUpdateStatusIfNeeded
  2. checkActiveDeadlineAndUpdateStatusIfNeeded
  3. Reconcile -- inside rayv1.JobDeploymentStatusRunning case

I'd like to refactor this code a bit so that we only need to check for retry in one place in a future PR

Member

How about we just update Failed / Succeeded and transition to Retrying if needed in updateRayJobStatus? Then, we don't need to update Failed / Succeeded and check backoffLimit in the switch rayJobInstance.Status.JobDeploymentStatus statement.

func (r *RayJobReconciler) updateRayJobStatus(ctx context.Context, oldRayJob *rayv1.RayJob, newRayJob *rayv1.RayJob) error {
	logger := ctrl.LoggerFrom(ctx)
	oldRayJobStatus := oldRayJob.Status
	newRayJobStatus := newRayJob.Status
	if newRayJobStatus.JobDeploymentStatus == rayv1.JobDeploymentStatusFailed {
		// (1) Update `Failed`
		// (2) Check backoffLimit: update the status to `Retrying` if needed.
	} else if newRayJobStatus.JobDeploymentStatus == rayv1.JobDeploymentStatusComplete {
		// (1) Update `Succeeded`
	}

	...
	return nil
}

Member Author

I like that updateRayJobStatus doesn't have any logic for the RayJob state machine and only does the equality check + the API call. I think adding the backoffLimit logic there would end up being more confusing, because we'd be pushing the state machine logic outside the switch in Reconcile.

I removed case 2) for checkActiveDeadlineAndUpdateStatusIfNeeded so there's only 2 places where backoffLimit is checked. I think this is fine for now. What do you think?

Member

I may prefer to add the logic in updateRayJobStatus. It's fine to update the status in this function because it is the only place where we update the CR status. Maintaining retry in a single place also makes it easy to understand when we should and should not retry. Sorry for the last-minute change.

Member Author

Agree it's better to keep it in a single place. How about a function like checkBackoffLimit() before updateRayJobStatus() that checks for retries and updates the deployment status? I had this in the initial version but moved it into the switch case.

Member
@kevin85421 kevin85421 Jun 30, 2024

How about a function like checkBackoffLimit() before updateRayJobStatus() that checks for retries and updates the deployment status?

SGTM

I had this in the initial version but moved it into the switch case

My bad. I tried to add a GIF from my GIPHY Chrome extension to this comment, but I failed lol

Member

finally figured it out

Member Author

haha nice :)

Member Author

Added a new method checkBackoffLimitAndUpdateStatusIfNeeded to consolidate the retry checks into a single place.
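
For readers skimming the thread, the consolidated check looks roughly like this (a simplified sketch, not a verbatim copy of the PR; the quoted snippet further down shows the actual condition):

// Simplified sketch of the consolidated retry check: when the RayJob has
// failed and attempts remain within backoffLimit, flip the deployment status
// to Retrying so the next reconcile tears down and recreates the cluster.
func (r *RayJobReconciler) checkBackoffLimitAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) {
    logger := ctrl.LoggerFrom(ctx)
    if rayJob.Status.JobDeploymentStatus != rayv1.JobDeploymentStatusFailed ||
        rayJob.Spec.BackoffLimit == nil || rayJob.Status.Failed == nil {
        return
    }
    if *rayJob.Status.Failed < *rayJob.Spec.BackoffLimit+1 {
        logger.Info("RayJob is eligible for retry, setting JobDeploymentStatus to Retrying")
        rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusRetrying
    }
}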

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>
@andrewsykim andrewsykim force-pushed the ray-job-retry branch 4 times, most recently from 6783a5b to 37e1542 Compare July 1, 2024 04:13
@@ -339,6 +346,9 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}

// check for retries and update deployment status to Retrying
r.checkBackoffLimitAndUpdateStatusIfNeeded(ctx, rayJobInstance)

// This is the only place where we update the RayJob status. Please do NOT add any code
// between the above switch statement and the following code.
Member

nit: Please do NOT add any code between checkBackoffLimitAndUpdateStatusIfNeeded and the following code.

Member Author

Done, but is the comment really relevant anymore?

if rayJob.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusFailed && rayJob.Spec.BackoffLimit != nil && *rayJob.Status.Failed < *rayJob.Spec.BackoffLimit+1 {
logger.Info("RayJob is eligible for retry, setting JobDeploymentStatus to Retrying",
"backoffLimit", *rayJob.Spec.BackoffLimit, "succeeded", *rayJob.Status.Succeeded, "failed", rayJob.Status.Failed)
rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusRetrying
Member

We should also check the reason for failure. If the reason is DeadlineExceeded, we should not retry.
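
A sketch of that check (helper name invented; assuming rayv1.DeadlineExceeded is the failure reason recorded when the active deadline is hit): retrying presumably makes no sense here because the deadline applies to the RayJob as a whole rather than to a single attempt.

// Hypothetical helper for illustration: a failure caused by activeDeadlineSeconds
// should not be retried, since the deadline bounds the whole RayJob.
func isRetriableFailure(rayJob *rayv1.RayJob) bool {
    return rayJob.Status.Reason != rayv1.DeadlineExceeded
}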

Member Author

Good catch, I'll also have a follow-up PR to add tests for active deadline seconds + backoff limit

Member Author

Thinking about it more, it's kind of weird having to check DeadlineExceeded inside checkBackoffLimitAndUpdateStatusIfNeeded; it should be agnostic to the reason.

What do you think about calling checkBackoffLimitAndUpdateStatusIfNeeded twice inside the Running switch case:

  1. Right after calling checkK8sJobAndUpdateStatusIfNeeded
  2. At the end of the Running switch case

And we can exclude it after checkActiveDeadlineAndUpdateStatusIfNeeded. I think this will be less error-prone since we're not checking for retry in all future cases as well.

Member Author

See the latest commit: a7bc5cd

Member

I may prefer to call checkBackoffLimitAndUpdateStatusIfNeeded once, but I am completely fine with merging this PR for now. I will open a follow-up PR to update it a bit.

…try checks into a single place

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>
…itch case

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>
Development

Successfully merging this pull request may close these issues.

[Feature] Make RayJob recover automatically from K8S submitter job and Ray cluster head node failures