[RayJob] Add spec.backoffLimit for retrying RayJobs with new clusters #2192
Conversation
I just had a quick glance, and I think we should reuse the code path of the existing state machine. To elaborate, we can add a new state:
case rayv1.JobDeploymentStatusSuspending, rayv1.JobDeploymentStatusRestarting:
    // Delete RayCluster and submitter K8s Job
    ...
    // Reset the RayCluster and Ray job related status.
    ...
    // Reset the JobStatus to JobStatusNew
    ...
    // Transition the JobDeploymentStatus to `Suspended` if the status is `Suspending`.
    // Transition the JobDeploymentStatus to `New` if the status is `Restarting`.
    rayJobInstance.Status.JobDeploymentStatus = ...
@kevin85421 the reason I didn't use suspend is that I don't think it will play nicely with Kueue, because suspend means each retry goes to the back of the job queue instead of retrying immediately on failure.
Actually, I think I misunderstood what you said; we can use the same state machine logic as suspend without actually suspending the RayJob. I'll update the PR to do that. I'll also move restartRayJobOnFailure into the switch statement.
// Succeeded is the number of times this job succeeded.
// +kubebuilder:default:=0
Succeeded *int32 `json:"succeeded,omitempty"`
Succeeded is always 0 or 1 for now, but this could be useful in the future if we want a success policy similar to Job: https://kubernetes.io/docs/concepts/workloads/controllers/job/#success-policy
@kevin85421 I incorporated your feedback. If the overall approach looks good to you, I can polish the PR and add tests. So far I've manually verified this works.
rayJobInstance.Status.JobStatus == rayv1.JobStatusFailed &&
rayJobInstance.Status.Failed != nil &&
*rayJobInstance.Status.Failed < *rayJobInstance.Spec.BackoffLimit+1 {
    rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusRestarting
Maybe we can transition the status from JobDeploymentStatusRunning to JobDeploymentStatusRestarting directly. If we update the status here, it may enter the if rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 statement.
Maybe we can transition the status from JobDeploymentStatusRunning to JobDeploymentStatusRestarting directly.
I think it'll be confusing to skip the JobDeploymentStatusFailed state though. A job going directly from Running -> Restarting could imply the job was restarted for another reason.
If we update the status here, it may enter the if rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 statement.
This is a good catch. Should we instead just add a check for JobDeploymentStatus == JobDeploymentStatusComplete here?
I think rayv1.JobDeploymentStatusComplete and rayv1.JobDeploymentStatusFailed should be terminal states, meaning that no state transitions from these two states to any other states are allowed.
Maybe consider adding a Failing status, so the state transition would be Running -> Failing -> Restarting -> New ... -> Failed or Complete. This is how the Spark operator does it.
I renamed Restarting -> Retrying and updated the state flow to Running -> Retrying -> New.
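Under the renamed flow, the Retrying case presumably follows the same shape as the Suspending/Restarting pseudocode above; a rough sketch only, with illustrative helper and status field names rather than the PR's actual code:
case rayv1.JobDeploymentStatusRetrying:
    // Delete the RayCluster and the submitter K8s Job (hypothetical helper name;
    // the real cleanup call in the controller may differ).
    if err := r.deleteClusterAndSubmitterJob(ctx, rayJobInstance); err != nil {
        return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
    }
    // Reset the Ray job related status fields and go back to New, so the next
    // reconcile provisions a fresh RayCluster.
    rayJobInstance.Status.RayClusterName = ""
    rayJobInstance.Status.DashboardURL = ""
    rayJobInstance.Status.JobId = ""
    rayJobInstance.Status.JobStatus = rayv1.JobStatusNew
    rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusNew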
if rayJobInstance.Spec.BackoffLimit != nil && *rayJobInstance.Spec.BackoffLimit > 0 &&
    rayJobInstance.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusFailed &&
    rayJobInstance.Status.JobStatus == rayv1.JobStatusFailed &&
Do we need to include the status of JobStatusFailed here? For example, there is a user who collects the driver logs by reading the submitter K8s Job's STDOUT / STDERR. If the submitter K8s job fails but the Ray job succeeds, should we restart? The user doesn't collect any driver logs.
Ah, I thought JobStatusFailed only occurs if the job failed from the Ray API, but that wouldn't work since it's also Failed if the submitter job fails. Will update to only check JobDeploymentStatus.
// Specifies the number of retries before marking this job failed.
// Each retry creates a new RayCluster.
// +kubebuilder:default:=0
BackoffLimit *int32 `json:"backoffLimit,omitempty"`
Should we enforce shutdownAfterJobFinishes=true when setting backoffLimit > 0? Or should we still respect it but only for the last retry?
Maybe the latter one for now?
sounds good, I think this is already the case then
// Specifies the number of retries before marking this job failed.
// Each retry creates a new RayCluster.
// +kubebuilder:default:=0
BackoffLimit *int32 `json:"backoffLimit,omitempty"`
Out of curiosity, what's the reason to use *int32 instead of int32? This PR checks whether it is nil or not multiple times.
I was being safe in case a user updates the kuberay-operator version before they update the CRD manifest. In that case, all the code for backoffLimit is guarded by rayJobInstance.Spec.BackoffLimit != nil and never runs. Also, it affects the omitempty behavior: if it's not a pointer, we will omit the field when it is set to 0.
Thinking about it more, this maybe doesn't need to be a pointer. Unlike with numOfHosts, if we default backoffLimit to 0 there should be no change in behavior if there's version skew between kuberay-operator and the CRD. So I guess this mainly boils down to the omitempty behavior. This would need to be a pointer if we defaulted backoffLimit to a value > 1, though.
@kevin85421 I'm keeping this as a pointer per the recommendation in the Kubernetes API conventions for optional / omitempty fields: https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#optional-vs-required
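As a side note, the omitempty difference discussed above is easy to see with a minimal standalone Go program (plain encoding/json, not KubeRay code): with a non-pointer int32 an explicit 0 is dropped from the serialized spec, while a pointer to 0 is kept and only a nil pointer is omitted.
package main

import (
	"encoding/json"
	"fmt"
)

type withValue struct {
	// Non-pointer: an explicit 0 is indistinguishable from "unset" and gets omitted.
	BackoffLimit int32 `json:"backoffLimit,omitempty"`
}

type withPointer struct {
	// Pointer: only a nil pointer is omitted; a pointer to 0 is serialized.
	BackoffLimit *int32 `json:"backoffLimit,omitempty"`
}

func main() {
	zero := int32(0)
	v, _ := json.Marshal(withValue{BackoffLimit: 0})
	p, _ := json.Marshal(withPointer{BackoffLimit: &zero})
	n, _ := json.Marshal(withPointer{})
	fmt.Println(string(v)) // {}
	fmt.Println(string(p)) // {"backoffLimit":0}
	fmt.Println(string(n)) // {}
}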
if jobDeploymentStatus == rayv1.JobDeploymentStatusFailed &&
Maybe move the logic into this block?
if jobInfo.JobStatus == rayv1.JobStatusFailed {
    jobDeploymentStatus = rayv1.JobDeploymentStatusFailed
    reason = rayv1.AppFailed
    failedCount++
}
done
failedCount++
rayJob.Status.Failed = ptr.To[int32](failedCount)

if rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit > 0 &&
We can also check whether BackoffLimit is smaller than 0 or not in validateRayJobSpec, and then we don't need to check *rayJob.Spec.BackoffLimit > 0 here.
done
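For reference, the added validation presumably looks something like this (a sketch only; the exact message and placement inside validateRayJobSpec may differ in the PR):
// Inside validateRayJobSpec (sketch): reject negative values up front so the
// reconcile loop only ever sees a nil or non-negative backoffLimit.
if rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit < 0 {
	return fmt.Errorf("spec.backoffLimit must be a non-negative integer")
}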
@@ -706,6 +734,21 @@ func (r *RayJobReconciler) checkK8sJobAndUpdateStatusIfNeeded(ctx context.Contex
    rayJob.Status.Reason = rayv1.SubmissionFailed
    rayJob.Status.Message = fmt.Sprintf("Job submission has failed. Reason: %s. Message: %s", cond.Reason, cond.Message)
}

failedCount := int32(0)
add some comments
done
failedCount++
rayJob.Status.Failed = ptr.To[int32](failedCount)

if rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit > 0 &&
There are 3 different places where we check for retry now:
- checkK8sJobAndUpdateStatusIfNeeded
- checkActiveDeadlineAndUpdateStatusIfNeeded
- Reconcile -- inside the rayv1.JobDeploymentStatusRunning case
I'd like to refactor this code a bit so that we only need to check for retry in one place in a future PR.
How about we just update Failed / Succeeded and transition to Retrying if needed in updateRayJobStatus? Then, we don't need to update Failed / Succeeded and check backoffLimit in the switch rayJobInstance.Status.JobDeploymentStatus statement.
func (r *RayJobReconciler) updateRayJobStatus(ctx context.Context, oldRayJob *rayv1.RayJob, newRayJob *rayv1.RayJob) error {
    logger := ctrl.LoggerFrom(ctx)
    oldRayJobStatus := oldRayJob.Status
    newRayJobStatus := newRayJob.Status
    if newRayJobStatus.JobDeploymentStatus == rayv1.JobDeploymentStatusFailed {
        // (1) Update `Failed`
        // (2) Check backoffLimit: update the status to `Retrying` if needed.
    } else if newRayJobStatus.JobDeploymentStatus == rayv1.JobDeploymentStatusComplete {
        // (1) Update `Succeeded`
    }
    ...
    return nil
}
I like that updateRayJobStatus doesn't have any logic for the RayJob state machine and only does the equality check + the API call. I think adding the backoffLimit logic here would be more confusing in the end, because we're pushing the state machine logic outside the switch in Reconcile.
I removed case 2) for checkActiveDeadlineAndUpdateStatusIfNeeded, so there are only 2 places where backoffLimit is checked. I think this is fine for now. What do you think?
I may prefer to add the logic in updateRayJobStatus. It's fine to update the status in this function because it is the only place to update the CR status. Maintaining retry in a single place also enables us to easily understand when we should and should not retry. Sorry for the last-minute change.
Agree it's better to keep it in a single place. How about a function like checkBackoffLimit() before updateRayJobStatus() that checks for retries and updates the deployment status? I had this in the initial version but moved it into the switch case.
How about a function like checkBackoffLimit() before updateRayJobStatus() that checks for retries and updates the deployment status?
SGTM
I had this in the initial version but moved it into the switch case
My bad. I tried to add a GIF from my GIPHY Chrome extension to this comment, but I failed lol
finally figured it out
haha nice :)
Added a new method checkBackoffLimitAndUpdateStatusIfNeeded to consolidate retry checks into a single place.
@@ -339,6 +346,9 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
    return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}

// check for retries and update deployment status to Retrying
r.checkBackoffLimitAndUpdateStatusIfNeeded(ctx, rayJobInstance)

// This is the only place where we update the RayJob status. Please do NOT add any code
// between the above switch statement and the following code.
nit: Please do NOT add any code between checkBackoffLimitAndUpdateStatusIfNeeded and the following code.
done, but is the comment really relevant anymore?
if rayJob.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusFailed && rayJob.Spec.BackoffLimit != nil && *rayJob.Status.Failed < *rayJob.Spec.BackoffLimit+1 {
    logger.Info("RayJob is eligible for retry, setting JobDeploymentStatus to Retrying",
        "backoffLimit", *rayJob.Spec.BackoffLimit, "succeeded", *rayJob.Status.Succeeded, "failed", rayJob.Status.Failed)
    rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusRetrying
We should also check the reason for failure. If the reason is DeadlineExceeded, we should not retry.
Good catch, I'll also have a follow-up PR to add tests for active deadline seconds + backoff limit
Thinking about it more, it's kind of weird having to check DeadlineExceeded inside checkBackoffLimitAndUpdateStatusIfNeeded; it should be agnostic to the reason.
What do you think about calling checkBackoffLimitAndUpdateStatusIfNeeded twice inside the Running switch case:
- Right after calling checkK8sJobAndUpdateStatusIfNeeded
- At the end of the Running switch case
And we can exclude it after checkActiveDeadlineAndUpdateStatusIfNeeded. I think this will be less error-prone since we're not checking retry for all future cases as well.
See the latest commit: a7bc5cd
I may prefer to call checkBackoffLimitAndUpdateStatusIfNeeded once, but I am completely fine with merging this PR for now. I will open a follow-up PR to update it a bit.
Why are these changes needed?
Add a new field spec.backoffLimit to RayJob for retrying failed jobs. A retry involves deleting and recreating the RayCluster. Marking WIP since I haven't added tests yet.
Related issue number
Fixes #1902