[RayJob][Refactor] use `ray job status` and `ray job logs` to be tolerant of duplicated job submissions #2579
Conversation
changed the title: “`ray job logs` to capture logs and be tolerant to duplicated job submission” → “`ray job logs` to capture logs and be tolerant of duplicated job submissions”
… to duplicated job submission Signed-off-by: Rueian <rueiancsie@gmail.com>
(force-pushed from 09ea70a to 20d6d38)
I used a similar workaround here: https://github.com/ray-project/kuberay/blob/master/benchmark/perf-tests/10000-rayjob/pytorch-mnist-rayjob.yaml#L23-L24. The problem is that with either solution, either …
Hi @andrewsykim, thanks for the workaround! I didn't notice that before. Here is an example scenario of a duplicated submission with this PoC:

```sh
▶ kubectl logs -f rayjob-sample-9gvpq
2024-11-26 18:40:07,699 INFO cli.py:36 -- Job submission server address: http://rayjob-sample-raycluster-m9p8c-head-svc.default.svc.cluster.local:8265
Traceback (most recent call last):
  File "/home/ray/anaconda3/bin/ray", line 8, in <module>
    sys.exit(main())
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/scripts/scripts.py", line 2498, in main
    return cli()
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/cli_utils.py", line 54, in wrapper
    return func(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/autoscaler/_private/cli_logger.py", line 856, in wrapper
    return f(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/cli.py", line 272, in submit
    job_id = client.submit_job(
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/sdk.py", line 254, in submit_job
    self._raise_error(r)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/dashboard_sdk.py", line 283, in _raise_error
    raise RuntimeError(
RuntimeError: Request failed with status code 500: Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/job_head.py", line 287, in submit_job
    resp = await job_agent_client.submit_job_internal(submit_request)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/job_head.py", line 80, in submit_job_internal
    await self._raise_error(resp)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/job_head.py", line 68, in _raise_error
    raise RuntimeError(f"Request failed with status code {status}: {error_text}.")
RuntimeError: Request failed with status code 400: Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/job_agent.py", line 45, in submit_job
    submission_id = await self.get_job_manager().submit_job(
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/job_manager.py", line 945, in submit_job
    raise ValueError(
ValueError: Job with submission_id rayjob-sample-6tqdm already exists. Please use a different submission_id.
.
.
2024-11-26 18:40:08,975 INFO cli.py:36 -- Job submission server address: http://rayjob-sample-raycluster-m9p8c-head-svc.default.svc.cluster.local:8265
2024-11-26 18:40:03,772 INFO worker.py:1405 -- Using address 10.244.0.31:6379 set in the environment variable RAY_ADDRESS
2024-11-26 18:40:03,772 INFO worker.py:1540 -- Connecting to existing Ray cluster at address: 10.244.0.31:6379...
2024-11-26 18:40:03,778 INFO worker.py:1715 -- Connected to Ray cluster. View the dashboard at http://10.244.0.31:8265
test_counter got 1
test_counter got 2
test_counter got 3
test_counter got 4
test_counter got 5
2024-11-26 18:40:17,022 SUCC cli.py:60 -- -----------------------------------
2024-11-26 18:40:17,023 SUCC cli.py:61 -- Job 'rayjob-sample-6tqdm' succeeded
2024-11-26 18:40:17,023 SUCC cli.py:62 -- -----------------------------------
```

Note that in this PoC, we do …

To summarize: …

Besides this PoC, I am also working on a lightweight submitter, where I will use the same approach but implement it in Go. It will probably be packaged into the same image as the KubeRay operator, so that we don't need to maintain another Docker release.
This lines up with the results I had. The problem is that it still dumps a stack trace from an exception; I think users will find that really confusing. I wonder if there's an option where we can run the first command but just pipe the output to …
I think we can pipe to `grep -zv`:

```sh
▶ kubectl logs -f rayjob-sample-rxw2m
2024-11-26 22:04:56,033 INFO cli.py:36 -- Job submission server address: http://rayjob-sample-raycluster-5npzh-head-svc.default.svc.cluster.local:8265
2024-11-26 22:04:59,053 INFO worker.py:1405 -- Using address 10.244.0.41:6379 set in the environment variable RAY_ADDRESS
2024-11-26 22:04:59,053 INFO worker.py:1540 -- Connecting to existing Ray cluster at address: 10.244.0.41:6379...
2024-11-26 22:04:59,058 INFO worker.py:1715 -- Connected to Ray cluster. View the dashboard at http://10.244.0.41:8265
test_counter got 1
test_counter got 2
test_counter got 3
test_counter got 4
test_counter got 5
2024-11-26 22:05:12,112 SUCC cli.py:60 -- -----------------------------------
2024-11-26 22:05:12,112 SUCC cli.py:61 -- Job 'rayjob-sample-964gd' succeeded
2024-11-26 22:05:12,113 SUCC cli.py:62 -- -----------------------------------
```

while the stack trace can still be printed for other errors:

```sh
▶ kubectl logs -f rayjob-sample-5vpwg
Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.8/site-packages/urllib3/connection.py", line 174, in _new_conn
    conn = connection.create_connection(
  File "/home/ray/anaconda3/lib/python3.8/site-packages/urllib3/util/connection.py", line 72, in create_connection
    for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
  File "/home/ray/anaconda3/lib/python3.8/socket.py", line 918, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -2] Name or service not known

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
```
…ion error message Signed-off-by: Rueian <rueiancsie@gmail.com>
```go
@@ -115,6 +119,10 @@ func GetK8sJobCommand(rayJobInstance *rayv1.RayJob) ([]string, error) {
	}
	k8sJobCommand = append(k8sJobCommand, commandSlice...)

	k8sJobCommand = append(k8sJobCommand, "2>&1", "|", "grep", "-zv", "'Please use a different submission_id'")
```
Does this obscure any output of the normal case where `ray job submit` doesn't fail due to a duplicate submission ID?
No, it will only hide the output of a duplicate submission; any other output will still be printed.
For example:

```sh
echo "
normal output 1
normal output 2
normal output 3
" | grep -vz 'Please use a different submission_id'
```

still outputs:

```
normal output 1
normal output 2
normal output 3
```
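It may help to spell out why `-z` matters here: without NUL bytes in the input, `grep -z` treats the entire multi-line stream as a single record, so `-v` drops the whole error (trailing log lines included) when the pattern matches anywhere, while untouched output passes through. A minimal sketch (the sample strings are invented for illustration):

```shell
# Simulated submitter output containing the duplicate-submission error.
err_output='RuntimeError: Request failed with status code 400:
ValueError: Job with submission_id rayjob-sample already exists.
Please use a different submission_id.'

# With -z the whole multi-line record matches, so -v prints nothing
# and grep exits non-zero.
printf '%s\n' "$err_output" \
  | grep -zv 'Please use a different submission_id' \
  || echo "error output suppressed"

# Normal output contains no match, so the record passes through.
printf 'normal output 1\nnormal output 2\n' \
  | grep -zv 'Please use a different submission_id' | tr -d '\0'
```

Note that without `-z`, `grep -v` would only remove the single matching line and the rest of the stack trace would still reach the user.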
If there's no duplicate submission ID, will it print the logs twice?
It won’t, because we use the `--no-wait` flag when submitting the job. The flag skips log tailing.
Here is an output example of a normal case:

```sh
2024-12-02 18:19:21,800 INFO cli.py:36 -- Job submission server address: http://rayjob-sample-raycluster-bc774-head-svc.default.svc.cluster.local:8265
2024-12-02 18:19:22,405 SUCC cli.py:60 -- ------------------------------------------------
2024-12-02 18:19:22,405 SUCC cli.py:61 -- Job 'rayjob-sample-wvtph' submitted successfully
2024-12-02 18:19:22,405 SUCC cli.py:62 -- ------------------------------------------------
2024-12-02 18:19:22,405 INFO cli.py:285 -- Next steps
2024-12-02 18:19:22,405 INFO cli.py:286 -- Query the logs of the job:
2024-12-02 18:19:22,405 INFO cli.py:288 -- ray job logs rayjob-sample-wvtph
2024-12-02 18:19:22,405 INFO cli.py:290 -- Query the status of the job:
2024-12-02 18:19:22,405 INFO cli.py:292 -- ray job status rayjob-sample-wvtph
2024-12-02 18:19:22,405 INFO cli.py:294 -- Request the job to be stopped:
2024-12-02 18:19:22,406 INFO cli.py:296 -- ray job stop rayjob-sample-wvtph
2024-12-02 18:19:23,626 INFO cli.py:36 -- Job submission server address: http://rayjob-sample-raycluster-bc774-head-svc.default.svc.cluster.local:8265
2024-12-02 18:19:34,567 INFO worker.py:1405 -- Using address 10.244.0.12:6379 set in the environment variable RAY_ADDRESS
2024-12-02 18:19:34,568 INFO worker.py:1540 -- Connecting to existing Ray cluster at address: 10.244.0.12:6379...
2024-12-02 18:19:34,572 INFO worker.py:1715 -- Connected to Ray cluster. View the dashboard at http://10.244.0.12:8265
test_counter got 1
test_counter got 2
test_counter got 3
test_counter got 4
test_counter got 5
2024-12-02 18:19:47,758 SUCC cli.py:60 -- -----------------------------------
2024-12-02 18:19:47,758 SUCC cli.py:61 -- Job 'rayjob-sample-wvtph' succeeded
2024-12-02 18:19:47,758 SUCC cli.py:62 -- -----------------------------------
```
Worth mentioning that this may not be required if ray-project/ray#45498 were merged, but I think there was pushback.
We still need this PR if we want to maintain the same compatibility matrix between KubeRay and Ray.
Alternatives to consider:

- Add an inline bash script that runs `ray job list` and greps for the submission ID, then runs either `ray job submit` or `ray job logs`
- Add an inline bash script that runs `ray job status <submission_id>` and, based on the exit code, runs `ray job submit` or `ray job logs`
@andrewsykim, I prefer the first one:

```sh
ray job list | grep <submission_id> && ray job logs --follow <submission_id> || \
ray job submit --submission-id <submission_id> -- python job.py
```

The second one, …
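One subtlety with the `a && b || c` one-liner form (and possibly a reason the explicit if/else was ultimately adopted): the `||` branch fires whenever anything to its left fails, so a failure inside `ray job logs` itself would also fall through to a re-submit. A minimal illustration:

```shell
# The || branch reacts to the exit status of the whole chain,
# not just the initial condition: here the condition (true)
# succeeds, the middle command (false) fails, and the fallback
# still runs.
true && false || echo "fallback runs"

# An explicit if/else branches only on the condition's exit
# status, so a failure inside the 'then' branch cannot fall
# through to 'else'. This prints nothing.
if true; then false; else echo "never printed"; fi
```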
The nice thing about …
…d job submission Signed-off-by: Rueian <rueiancsie@gmail.com>
Updated. Now the full command looks like:

```sh
if ray job status --address http://test-url test-job-id >/dev/null 2>&1 ;
then ray job logs --address http://test-url --follow test-job-id ;
else ray job submit --address http://test-url --submission-id test-job-id -- echo hello world ;
fi
```
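To see how this command branches on retries without a real cluster, here is a sketch where `ray` is replaced by a hypothetical stub function; the stub's behavior and the `JOB_EXISTS` switch are inventions for illustration only, not part of the PR:

```shell
# Hypothetical stub standing in for the real `ray` CLI, so the branch
# selection can be exercised locally. JOB_EXISTS=1 simulates a job that
# was already submitted (i.e. `ray job status` succeeding).
ray() {
  case "$2" in
    status) [ "${JOB_EXISTS:-0}" = "1" ] ;;  # exit 0 only if the job exists
    logs)   echo "following logs" ;;
    submit) echo "submitting job" ;;
  esac
}

run() {
  if ray job status --address http://test-url test-job-id >/dev/null 2>&1 ;
  then ray job logs --address http://test-url --follow test-job-id ;
  else ray job submit --address http://test-url --submission-id test-job-id -- echo hello world ;
  fi
}

JOB_EXISTS=0 run   # first attempt: status fails, so the submit branch runs
JOB_EXISTS=1 run   # retry: the job exists, so we follow logs instead
```

The key property is that the submitter pod can be restarted any number of times: only the first run submits, and every later run attaches to the existing job's logs.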
changed the title: “`ray job logs` to capture logs and be tolerant of duplicated job submissions” → “`ray job status` and `ray jog lobs` to be tolerant of duplicated job submissions”
Thanks, that seems much cleaner IMO. Can you confirm that …
Thanks @rueian, can you think of any other reason why …
Yes, if the Ray head is not reachable, it will fail and run … Actually, …
LGTM, just left some minor comments about code documentation.
```go
		address = "http://" + address
	}

	jobStatusCommand := []string{"ray", "job", "status", "--address", address, jobId, ">/dev/null", "2>&1"}
```
Can you add some comments here for why we run `ray job status` first? (to avoid the duplicate submission ID issue on retry)
```go
	jobStatusCommand := []string{"ray", "job", "status", "--address", address, jobId, ">/dev/null", "2>&1"}
	jobFollowCommand := []string{"ray", "job", "logs", "--address", address, "--follow", jobId}
	jobSubmitCommand := []string{"ray", "job", "submit", "--address", address}
	k8sJobCommand := append([]string{"if"}, jobStatusCommand...)
```
Can you include the full example of what the command looks like in the comments? i.e.

```sh
if ray job status --address http://$RAY_ADDRESS >/dev/null 2>&1 ;
then ray job logs --address http://$RAY_ADDRESS --follow test-job-id ;
else ray job submit --address http://$RAY_ADDRESS --submission-id $RAY_JOB_SUBMISSION_ID -- ... ;
fi
```
Sure. Comments are added.
…d job submission Signed-off-by: Rueian <rueiancsie@gmail.com>
LGTM, thanks @rueian -- will defer final review to @kevin85421
changed the title: “`ray job status` and `ray jog lobs` to be tolerant of duplicated job submissions” → “`ray job status` and `ray job logs` to be tolerant of duplicated job submissions”
…rant of duplicated job submissions (ray-project#2579)
Why are these changes needed?
Use `ray job submit --no-wait` + `ray job logs --follow` to be tolerant of duplicated job submissions.

In summary, we can use this shell script to work around the duplicated-submission issue:
Related issue number
Checks