[core][refactor] move NodeManager::KillWorker to WorkerInterface::KillAsync for better testability #54068
Pull Request Overview
This PR refactors worker termination logic by moving `NodeManager::KillWorker` into the `WorkerInterface` and updating all consumers accordingly. It implements a new `Kill` method on `Worker`, updates `NodeManager` to call it, refreshes mocks and tests to support the change, and removes the old `KillWorker` helper.
- Introduces `WorkerInterface::Kill` and implements it in `Worker`.
- Updates `NodeManager` and tests to use the new `Kill` API.
- Adjusts GCS client usage in local object manager tests and BUILD dependencies for mocks.
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| src/ray/raylet/worker.h | Added Kill to WorkerInterface |
| src/ray/raylet/worker.cc | Implemented Worker::Kill with graceful and force modes |
| src/ray/raylet/test/util.h | Expanded MockWorker to track Kill invocations |
| src/ray/raylet/test/node_manager_test.cc | Updated NodeManager tests to use mock callbacks rather than real threads/processes |
| src/ray/raylet/test/local_object_manager_test.cc | Switched to MockGcsClient and updated pointer type |
| src/ray/raylet/node_manager.h | Removed old KillWorker and exposed HandleRequestWorkerLease |
| src/ray/raylet/node_manager.cc | Replaced calls to KillWorker with worker->Kill |
| src/mock/ray/raylet/worker.h | Added mock for new Kill method |
| BUILD.bazel | Added :ray_mock to test dependencies |
Comments suppressed due to low confidence (4)
src/ray/raylet/worker.h:49
- Add a doc comment for the new `Kill` method in `WorkerInterface` to describe its behavior, parameters, and expected side effects for better maintainability.
virtual void Kill(instrumented_io_context &io_service, bool force = false) = 0;
src/ray/raylet/node_manager.h:259
- [nitpick] Consider passing `request` by const reference (`const rpc::RequestWorkerLeaseRequest&`) to avoid an unnecessary copy of the protobuf message.
void HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest request,
src/ray/raylet/test/local_object_manager_test.cc:28
- Verify that the include path for the mock GCS client is correct; an incorrect path here could cause test compilation failures.
#include "mock/ray/gcs/gcs_client/gcs_client.h"
BUILD.bazel:686
- [nitpick] Ensure that the `ray_mock` target is defined and described in the BUILD file or a README so that its purpose and dependencies are clear to future maintainers.
":ray_mock",
…ter testability Signed-off-by: Rueian <rueian@anyscale.com> Signed-off-by: Rueian <rueiancsie@gmail.com>
Hi @israbbani, this PR follows your suggestions to make |
src/ray/raylet/worker.h
/// \param force true to kill immediately, false to give time for the worker to
/// clean up and exit gracefully.
/// \return Void.
void Kill(instrumented_io_context &io_service, bool force = false);
Document whether `Kill` is idempotent.
I think it currently is not, because the PID could be reused after the process exits.
While I think there isn't much we can do to address the PID reuse problem, making this `Kill` idempotent is simple: all we need is to set and check a flag recording whether the method has already been invoked.
I have made `Kill` idempotent by adding the new `killed_` flag, but I am figuring out whether we could merge `MarkDead` with `Kill` and share their flags; otherwise, their semantics seem to overlap a bit. If we can, I will send a new PR for merging them.
Sounds good, I was going to suggest the same :)
Signed-off-by: Rueian <rueian@anyscale.com>
src/ray/raylet/worker.cc
@@ -58,6 +59,36 @@ void Worker::MarkDead() { dead_ = true; }

bool Worker::IsDead() const { return dead_; }

void Worker::Kill(instrumented_io_context &io_service, bool force) {
nit: can we name this `KillAsync` to make it clear that when the function returns, the worker may still be alive?
+1
src/ray/raylet/worker.cc
if (killed_) {  // TODO(rueian): could we just reuse the dead_ flag?
  return;  // If the worker is already killed by this Kill method, do nothing.
}
killed_ = true;
need to use atomic `fetch_add` here
+1.
src/ray/raylet/worker.cc
RayConfig::instance().kill_worker_timeout_milliseconds());
retry_timer->expires_from_now(retry_duration);
retry_timer->async_wait([retry_timer, worker](const boost::system::error_code &error) {
  RAY_LOG(DEBUG) << "Send SIGKILL to worker, pid=" << worker->GetProcess().GetId();
Suggested change
- RAY_LOG(DEBUG) << "Send SIGKILL to worker, pid=" << worker->GetProcess().GetId();
+ RAY_LOG(DEBUG) << "Worker with PID=" << worker->GetProcess().GetId() << " did not exit after " << timeout << "ms, force killing with SIGKILL.";
This should be at least `INFO`-level IMO, maybe `WARNING`. If you agree, please update it.
Except this will mean it's logged every time on Windows, because there is no graceful kill above on Windows... Conditional log based on platform?
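One way to act on the platform-conditional suggestion is sketched below, assuming the severity choice is factored into a small helper so it can be unit-tested. `LogLevel`, `ForceKillLogLevel`, and `ForceKillMessage` are hypothetical names; in the raylet itself this would more likely be an `#ifdef _WIN32` around two `RAY_LOG` calls.

```cpp
#include <cassert>
#include <string>

// Hypothetical severity enum standing in for the raylet's RAY_LOG levels.
enum class LogLevel { kDebug, kInfo, kWarning };

// Severity for the "escalating to SIGKILL" message.
constexpr LogLevel ForceKillLogLevel() {
#ifdef _WIN32
  // No graceful step exists on Windows, so every kill reaches this path;
  // logging at DEBUG avoids a warning on every worker shutdown.
  return LogLevel::kDebug;
#else
  // On POSIX the worker already received SIGTERM, so reaching this path means
  // it did not exit within the grace period, which is worth surfacing.
  return LogLevel::kWarning;
#endif
}

// Message text from the review suggestion, with the timeout passed in explicitly.
std::string ForceKillMessage(int pid, long long timeout_ms) {
  return "Worker with PID=" + std::to_string(pid) + " did not exit after " +
         std::to_string(timeout_ms) + "ms, force killing with SIGKILL.";
}
```

Keeping the choice in a `constexpr` function means the platform branch is resolved at compile time, so there is no runtime cost on either platform.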
Small nits. Thanks for being so proactive about this. We should make the flag atomic and follow up with merging it with the `dead_` flag to provide cleaner semantics, like you suggested.
src/ray/raylet/worker.h
/// Kill the worker. This is idempotent, that is, no effect starting from the second
/// call. \param io_service for scheduling the graceful period timer. \param force true
/// to kill immediately, false to give time for the worker to clean up and exit
/// gracefully. \return Void.
void Kill(instrumented_io_context &io_service, bool force = false);
Suggested change
- /// Kill the worker. This is idempotent, that is, no effect starting from the second
- /// call. \param io_service for scheduling the graceful period timer. \param force true
- /// to kill immediately, false to give time for the worker to clean up and exit
- /// gracefully. \return Void.
- void Kill(instrumented_io_context &io_service, bool force = false);
+ /// Kill the worker process. This is idempotent.
+ /// \param io_service for scheduling the graceful period timer.
+ /// \param force true to kill immediately, false to give time for the worker to clean up and exit gracefully.
+ /// \return Void.
+ void Kill(instrumented_io_context &io_service, bool force = false);
src/ray/raylet/worker.h
@@ -297,6 +303,8 @@ class Worker : public WorkerInterface {
  BundleID bundle_id_;
  /// Whether the worker is dead.
  bool dead_;
  /// Whether the worker is killed by the Kill method.
  bool killed_;
Should this be made atomic and be named `killing_` instead of `killed_`? Technically, the process hasn't necessarily been killed yet when this is set to true.
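A minimal sketch of both suggestions, assuming a simplified worker with no process or timer: the flag becomes a `std::atomic<bool>` named `killing_`. Note that `std::atomic<bool>` has no `fetch_add`; `exchange(true)` plays the same role here (an `std::atomic<int>` checked with `fetch_add(1) == 0` would also work). The `Worker` class and the `kill_sequences_started()` counter are illustrative, not the raylet's actual code.

```cpp
#include <atomic>
#include <cassert>

// Simplified stand-in for the raylet Worker; only the idempotency guard is modeled.
class Worker {
 public:
  // Returns true only for the call that actually starts the kill sequence.
  bool KillAsync(bool force = false) {
    // exchange() sets the flag and returns its previous value in one atomic step,
    // so exactly one caller sees `false`, even when calls race across threads.
    if (killing_.exchange(true)) {
      return false;  // A kill is already in progress or finished: no-op.
    }
    (void)force;  // The real method would choose the graceful or forced path here.
    kill_sequences_started_.fetch_add(1);
    return true;
  }

  int kill_sequences_started() const { return kill_sequences_started_.load(); }

 private:
  // "killing" rather than "killed": the flag is set before the process has exited.
  std::atomic<bool> killing_{false};
  std::atomic<int> kill_sequences_started_{0};
};
```

If `KillAsync` can be reached from more than one thread, a plain `bool` check-then-set would let two callers both read `false` before either writes `true`, and both would start a kill sequence.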
return; | ||
} | ||
#ifdef _WIN32 | ||
// TODO(mehrdadn): implement graceful process termination mechanism |
@edoakes is this worth leaving in? Either we create an issue and open it up for a Windows community contribution, or we should just remove the TODO.
I agree, an issue for a Windows community contribution would be great!
src/ray/raylet/worker.cc
// If we're just cleaning up a single worker, allow it some time to clean
// up its state before force killing. The client socket will be closed
// and the worker struct will be freed after the timeout.
Suggested change
- // If we're just cleaning up a single worker, allow it some time to clean
- // up its state before force killing. The client socket will be closed
- // and the worker struct will be freed after the timeout.
+ // Attempt to gracefully shutdown the worker before force killing it.
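The graceful-then-force sequence can be sketched with plain POSIX calls, assuming a blocking wait is acceptable for illustration. The raylet instead schedules the SIGKILL on a timer on its `instrumented_io_context`, which is why the method returns before the worker exits; `TerminateThenKill` and `SpawnSleeper` are hypothetical helpers, and this sketch does not build on Windows.

```cpp
#include <cassert>
#include <chrono>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <thread>
#include <unistd.h>

// Returns the signal that terminated the child, or -1 if it was not killed by a signal.
static int TermSignal(int status) { return WIFSIGNALED(status) ? WTERMSIG(status) : -1; }

// Sends SIGTERM, polls for up to `grace` for the child to exit, then sends SIGKILL.
int TerminateThenKill(pid_t pid, std::chrono::milliseconds grace) {
  kill(pid, SIGTERM);  // Ask the worker to clean up and exit.
  const auto deadline = std::chrono::steady_clock::now() + grace;
  int status = 0;
  while (std::chrono::steady_clock::now() < deadline) {
    if (waitpid(pid, &status, WNOHANG) == pid) {
      return TermSignal(status);  // Exited within the grace period.
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
  }
  kill(pid, SIGKILL);  // Grace period expired: force kill.
  waitpid(pid, &status, 0);
  return TermSignal(status);
}

// Forks a child that blocks forever. If ignore_sigterm is true, SIGTERM is set to
// SIG_IGN before fork() so the child inherits it without a startup race.
pid_t SpawnSleeper(bool ignore_sigterm) {
  void (*previous)(int) = SIG_DFL;
  if (ignore_sigterm) previous = signal(SIGTERM, SIG_IGN);
  pid_t pid = fork();
  if (pid == 0) {
    for (;;) pause();  // Child: wait until a signal terminates it.
  }
  if (ignore_sigterm) signal(SIGTERM, previous);  // Parent: restore its disposition.
  return pid;
}
```

A child that honors SIGTERM is reaped within the grace period, while one that ignores it is only stopped by the SIGKILL escalation.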
Signed-off-by: Rueian <rueian@anyscale.com>
LGTM. Let's follow up to combine the `killing_` and `dead_` flags.
@@ -239,8 +239,7 @@ class NodeManagerTest : public ::testing::Test {
return std::make_shared<rpc::MockCoreWorkerClientInterface>();
}) {
RayConfig::instance().initialize(R"({
"raylet_liveness_self_check_interval_ms": 100,
"kill_worker_timeout_milliseconds": 10
nice! 🙌
And can you also file a GH issue for the Windows graceful process shutdown?
No problem. Filed here: #54374.
…lAsync for better testability (#54068) Following the suggestion in #53562 (comment), this PR moves `NodeManager::KillWorker` to `WorkerInterface::Kill` so that we can mock the method for testing if it is invoked or not, instead of spawning a real process to see if it is killed or not. As a side effect, this will also eliminate the confusion between `NodeManager::KillWorker` and `NodeManager::DestroyWorker`. --------- Signed-off-by: Rueian <rueian@anyscale.com> Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
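The testability argument can be illustrated with a hand-rolled test double, assuming a simplified interface. The PR itself uses gMock-based mocks (`src/mock/ray/raylet/worker.h`); this sketch avoids the gMock dependency, drops the `instrumented_io_context &` parameter, and introduces a hypothetical `WorkerReaper` consumer in place of `NodeManager`.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Simplified stand-in for WorkerInterface (the real method also takes an io_context).
class WorkerInterface {
 public:
  virtual ~WorkerInterface() = default;
  virtual void KillAsync(bool force = false) = 0;
};

// Test double that records calls instead of signaling a real process.
class FakeWorker : public WorkerInterface {
 public:
  // No default argument here: defaults on virtual functions bind to the static
  // type, so repeating one in the override could silently diverge.
  void KillAsync(bool force) override { kill_calls.push_back(force); }
  std::vector<bool> kill_calls;
};

// Hypothetical consumer that depends only on the interface, like NodeManager.
class WorkerReaper {
 public:
  void DisconnectWorker(const std::shared_ptr<WorkerInterface> &worker, bool force) {
    worker->KillAsync(force);
  }
};
```

Because the consumer only sees `WorkerInterface`, a test can assert on `kill_calls` directly instead of spawning a process and checking whether it died.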
…ace::KillAsync for better testability (ray-project#54068)" This reverts commit 4d9b323.
## Why are these changes needed? Following [the previous discussion](#54068 (comment)), this PR merges the flags used by `Worker::MarkDead` and `Worker::KillAsync`. Currently, `Worker::MarkDead` will only be called after we ask the worker to shutdown via the `Exit` RPC. After the `Exit` RPC, the worker should shut down by itself. This change essentially prevents sending `SIGTERM` from the node manager to a worker once its `Worker::MarkDead` method has been called, ensuring that we don't interrupt its shutdown process. ## Related issue number <!-- For example: "Closes #1234" --> ## Checks - [x] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [x] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( Signed-off-by: Rueian <rueian@anyscale.com> Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>
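A minimal sketch of the merged-flag behavior described above, assuming a single `std::atomic<bool>` shared by `MarkDead` and `KillAsync`. The actual field name and signaling code in the raylet may differ; the `sigterm_sent()` counter is a hypothetical test hook standing in for sending `SIGTERM`.

```cpp
#include <atomic>
#include <cassert>

// Sketch of a Worker where MarkDead and KillAsync share one flag.
class Worker {
 public:
  // Called after the Exit RPC: the worker is expected to shut down by itself.
  void MarkDead() { dead_.store(true); }
  bool IsDead() const { return dead_.load(); }

  // Returns true if this call would send SIGTERM.
  bool KillAsync() {
    if (dead_.exchange(true)) {
      return false;  // Already marked dead or already being killed: don't interrupt.
    }
    sigterm_sent_.fetch_add(1);  // Placeholder for sending SIGTERM and arming the timer.
    return true;
  }

  int sigterm_sent() const { return sigterm_sent_.load(); }

 private:
  std::atomic<bool> dead_{false};
  std::atomic<int> sigterm_sent_{0};
};
```

With one flag, a worker that received the `Exit` RPC is never sent SIGTERM, and a second `KillAsync` call stays a no-op.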
Why are these changes needed?
Following the suggestion in #53562 (comment), this PR moves `NodeManager::KillWorker` to `WorkerInterface::Kill` so that we can mock the method to test whether it is invoked, instead of spawning a real process to see whether it is killed. As a side effect, this also eliminates the confusion between `NodeManager::KillWorker` and `NodeManager::DestroyWorker`.
Related issue number
Checks
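- I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.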