[core][refactor] move NodeManager::KillWorker to WorkerInterface::KillAsync for better testability #54068

Merged
merged 4 commits into from
Jul 7, 2025

Conversation

Contributor

@rueian rueian commented Jun 25, 2025

Why are these changes needed?

Following the suggestion in #53562 (comment), this PR moves NodeManager::KillWorker to WorkerInterface::Kill so that tests can mock the method and assert whether it was invoked, instead of spawning a real process and checking whether it was killed.

As a side effect, this will also eliminate the confusion between NodeManager::KillWorker and NodeManager::DestroyWorker.
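
The testability argument above can be sketched as follows. This is a minimal illustration, not the code from this PR: the interface is reduced to a single hypothetical `Kill(bool force)` method (the real method also takes an `instrumented_io_context &`), `RecordingWorker` is a hand-rolled stand-in for the gMock-based mock the PR adds, and `DestroyWorker` is a hypothetical caller standing in for the NodeManager logic under test.

```cpp
#include <cassert>
#include <vector>

// Hypothetical, simplified stand-in for the raylet's WorkerInterface.
class WorkerInterface {
 public:
  virtual ~WorkerInterface() = default;
  virtual void Kill(bool force) = 0;
};

// Hand-rolled mock: records each call instead of touching a real process.
class RecordingWorker : public WorkerInterface {
 public:
  void Kill(bool force) override { kill_calls.push_back(force); }
  std::vector<bool> kill_calls;  // One entry per Kill() call, holding `force`.
};

// Hypothetical caller standing in for the NodeManager logic under test.
void DestroyWorker(WorkerInterface &worker, bool force) { worker.Kill(force); }

// Returns true if DestroyWorker forwarded exactly one Kill(force=true).
bool DestroyWorkerForwardsForceKill() {
  RecordingWorker worker;
  DestroyWorker(worker, /*force=*/true);
  return worker.kill_calls.size() == 1 && worker.kill_calls[0];
}
```

Because the caller only depends on the interface, the test can check that the kill was requested without ever starting or signalling a process.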

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@rueian rueian force-pushed the refactor-worker-kill branch 3 times, most recently from 6dac598 to d7132a3 Compare June 25, 2025 23:54
@rueian rueian force-pushed the refactor-worker-kill branch from d7132a3 to c7532d2 Compare June 27, 2025 05:22
@rueian rueian marked this pull request as ready for review June 27, 2025 06:33
@Copilot Copilot AI review requested due to automatic review settings June 27, 2025 06:33
@rueian rueian added the go add ONLY when ready to merge, run all tests label Jun 27, 2025
Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

This PR refactors worker termination logic by moving NodeManager::KillWorker into the WorkerInterface and updating all consumers accordingly. It implements a new Kill method on Worker, updates NodeManager to call it, refreshes mocks and tests to support the change, and removes the old KillWorker helper.

  • Introduces WorkerInterface::Kill and implements it in Worker.
  • Updates NodeManager and tests to use the new Kill API.
  • Adjusts GCS client usage in local object manager tests and BUILD dependencies for mocks.

Reviewed Changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| src/ray/raylet/worker.h | Added Kill to WorkerInterface |
| src/ray/raylet/worker.cc | Implemented Worker::Kill with graceful and force modes |
| src/ray/raylet/test/util.h | Expanded MockWorker to track Kill invocations |
| src/ray/raylet/test/node_manager_test.cc | Updated NodeManager tests to use mock callbacks rather than real threads/processes |
| src/ray/raylet/test/local_object_manager_test.cc | Switched to MockGcsClient and updated pointer type |
| src/ray/raylet/node_manager.h | Removed old KillWorker and exposed HandleRequestWorkerLease |
| src/ray/raylet/node_manager.cc | Replaced calls to KillWorker with worker->Kill |
| src/mock/ray/raylet/worker.h | Added mock for new Kill method |
| BUILD.bazel | Added :ray_mock to test dependencies |
Comments suppressed due to low confidence (4)

src/ray/raylet/worker.h:49

  • Add a doc comment for the new Kill method in WorkerInterface to describe its behavior, parameters, and expected side effects for better maintainability.
  virtual void Kill(instrumented_io_context &io_service, bool force = false) = 0;

src/ray/raylet/node_manager.h:259

  • [nitpick] Consider passing request by const reference (const rpc::RequestWorkerLeaseRequest&) to avoid an unnecessary copy of the protobuf message.
  void HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest request,

src/ray/raylet/test/local_object_manager_test.cc:28

  • Verify that the include path for the mock GCS client is correct—an incorrect path here could cause test compilation failures.
#include "mock/ray/gcs/gcs_client/gcs_client.h"

BUILD.bazel:686

  • [nitpick] Ensure that the ray_mock target is defined and described in the BUILD file or a README so that its purpose and dependencies are clear to future maintainers.
        ":ray_mock",

rueian added 2 commits June 27, 2025 10:48
…ter testability

Signed-off-by: Rueian <rueian@anyscale.com>
Signed-off-by: Rueian <rueiancsie@gmail.com>
…ter testability

Signed-off-by: Rueian <rueian@anyscale.com>
Signed-off-by: Rueian <rueiancsie@gmail.com>
@rueian rueian force-pushed the refactor-worker-kill branch from c7532d2 to 68864a3 Compare June 27, 2025 17:49
@israbbani israbbani self-assigned this Jun 27, 2025
Contributor Author

rueian commented Jun 27, 2025

Hi @israbbani, this PR follows your suggestions to make WorkerInterface::Kill mockable. Please take a look! All tests, including tsan and ubsan, pass.

/// \param force true to kill immediately, false to give time for the worker to
/// clean up and exit gracefully.
/// \return Void.
void Kill(instrumented_io_context &io_service, bool force = false);
Collaborator

Document if Kill is idempotent

I think it is currently not because the PID could be reused after the process exits.

Contributor Author

While I don't think there is much we can do about the PID reuse problem, making Kill idempotent is simple: all we need is a flag that is set on the first call and checked on every later one.

Contributor Author

I have made Kill idempotent by adding a new killed_ flag. I am still figuring out whether we can merge MarkDead with Kill and share one flag, since their semantics overlap a bit. If we can, I will send a new PR to merge them.

Collaborator

Sounds good, I was going to suggest the same :)

Signed-off-by: Rueian <rueian@anyscale.com>
@rueian rueian force-pushed the refactor-worker-kill branch from d6ed584 to 1381cc2 Compare July 2, 2025 22:46
@@ -58,6 +59,36 @@ void Worker::MarkDead() { dead_ = true; }

bool Worker::IsDead() const { return dead_; }

void Worker::Kill(instrumented_io_context &io_service, bool force) {
Collaborator

nit: can we name this KillAsync to make it clear that when the function returns, the worker may still be alive?

Contributor

+1
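
The naming point above (the call returns before the worker is actually gone) can be sketched with a toy event loop. Everything here is hypothetical: `FakeLoop` stands in for an event loop such as `instrumented_io_context`, and `FakeWorker` models the process as a single boolean rather than a real PID.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical stand-in for an event loop; tasks run only when RunAll() is called.
class FakeLoop {
 public:
  void Post(std::function<void()> task) { tasks_.push(std::move(task)); }
  void RunAll() {
    while (!tasks_.empty()) {
      auto task = std::move(tasks_.front());
      tasks_.pop();
      task();
    }
  }

 private:
  std::queue<std::function<void()>> tasks_;
};

// Hypothetical worker whose "process" is just a boolean.
class FakeWorker {
 public:
  // Schedules the kill on the loop and returns immediately.
  void KillAsync(FakeLoop &loop) {
    loop.Post([this] { alive_ = false; });
  }
  bool IsAlive() const { return alive_; }

 private:
  bool alive_ = true;
};
```

Right after `KillAsync` returns, `IsAlive()` is still true; it only becomes false once the loop has run the scheduled task, which is the behavior the `Async` suffix is meant to advertise.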

Comment on lines 63 to 66
if (killed_) { // TODO(rueian): could we just reuse the dead_ flag?
return; // If the worker is already killed by this Kill method, do nothing.
}
killed_ = true;
Collaborator

need to use atomic fetch_add here

Contributor

+1.
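
One way to make the check-then-set above atomic is sketched below. Note the swapped-in technique: the comment asks for an atomic `fetch_add`, but `std::atomic<bool>` does not provide `fetch_add`, so this sketch uses `exchange` on a boolean flag instead. The class, the `killing_` name, and the counter that simulates sending a signal are all assumptions for illustration; the merged code may look different.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical worker; the "signal" is simulated with a counter.
class FakeWorker {
 public:
  // Returns true only for the first call; later calls are no-ops.
  bool KillAsync() {
    // exchange() atomically sets the flag and returns its previous value,
    // so only one caller can ever observe `false` here.
    if (killing_.exchange(true)) {
      return false;
    }
    signals_sent_.fetch_add(1);  // Stand-in for actually signalling the process.
    return true;
  }
  int SignalsSent() const { return signals_sent_.load(); }

 private:
  std::atomic<bool> killing_{false};
  std::atomic<int> signals_sent_{0};
};
```

With a plain `bool`, two concurrent callers could both read `false` before either writes `true`; the single read-modify-write in `exchange` closes that window.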

RayConfig::instance().kill_worker_timeout_milliseconds());
retry_timer->expires_from_now(retry_duration);
retry_timer->async_wait([retry_timer, worker](const boost::system::error_code &error) {
RAY_LOG(DEBUG) << "Send SIGKILL to worker, pid=" << worker->GetProcess().GetId();
Collaborator

Suggested change
RAY_LOG(DEBUG) << "Send SIGKILL to worker, pid=" << worker->GetProcess().GetId();
RAY_LOG(DEBUG) << "Worker with PID=" << worker->GetProcess().GetId() << " did not exit after " << timeout << "ms, force killing with SIGKILL.";

this should be at least INFO-level IMO, maybe WARNING. if you agree please update it

Collaborator

except this will mean it's logged every time on windows because there is no graceful kill above on windows... conditional log based on platform?
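
The platform-conditional logging idea could look roughly like this. It is a sketch under assumptions: `LogLevel`, `ForceKillLogLevel`, and `ForceKillMessage` are hypothetical helpers rather than Ray APIs, and choosing a quieter level on Windows is just one possible reading of the suggestion.

```cpp
#include <cassert>
#include <string>

// Hypothetical log levels; Ray's own logging macros are not used here.
enum class LogLevel { kDebug, kWarning };

// Picks the level for the "force killing" message. On Windows there is no
// graceful step before the force kill, so the message is expected on every
// kill and is kept at a quiet level there.
constexpr LogLevel ForceKillLogLevel() {
#ifdef _WIN32
  return LogLevel::kDebug;
#else
  return LogLevel::kWarning;
#endif
}

// Builds the message text proposed in the suggested change above.
std::string ForceKillMessage(int pid, long timeout_ms) {
  return "Worker with PID=" + std::to_string(pid) + " did not exit after " +
         std::to_string(timeout_ms) + "ms, force killing with SIGKILL.";
}
```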

Contributor

@israbbani israbbani left a comment

Small nits. Thanks for being so proactive about this. We should make the flag atomic and follow up with merging it with the dead_ flag to provide cleaner semantics like you suggested.

Comment on lines 158 to 162
/// Kill the worker. This is idempotent, that is, no effect starting from the second
/// call. \param io_service for scheduling the graceful period timer. \param force true
/// to kill immediately, false to give time for the worker to clean up and exit
/// gracefully. \return Void.
void Kill(instrumented_io_context &io_service, bool force = false);
Contributor

Suggested change
/// Kill the worker. This is idempotent, that is, no effect starting from the second
/// call. \param io_service for scheduling the graceful period timer. \param force true
/// to kill immediately, false to give time for the worker to clean up and exit
/// gracefully. \return Void.
void Kill(instrumented_io_context &io_service, bool force = false);
/// Kill the worker process. This is idempotent
/// \param io_service for scheduling the graceful period timer.
/// \param force true to kill immediately, false to give time for the worker to clean up and exit gracefully.
/// \return Void.
void Kill(instrumented_io_context &io_service, bool force = false);

@@ -297,6 +303,8 @@ class Worker : public WorkerInterface {
BundleID bundle_id_;
/// Whether the worker is dead.
bool dead_;
/// Whether the worker is killed by the Kill method.
bool killed_;
Contributor

Should this be made atomic and be named killing_ instead of killed_? Technically, the process hasn't been killed yet when this is set to true.

return;
}
#ifdef _WIN32
// TODO(mehrdadn): implement graceful process termination mechanism
Contributor

@edoakes is this worth leaving in? Either we create an issue and open it up for a Windows community contribution, or we should just remove the TODO.

Collaborator

I agree -- issue for a windows community contribution would be great!

Comment on lines 75 to 77
// If we're just cleaning up a single worker, allow it some time to clean
// up its state before force killing. The client socket will be closed
// and the worker struct will be freed after the timeout.
Contributor

Suggested change
// If we're just cleaning up a single worker, allow it some time to clean
// up its state before force killing. The client socket will be closed
// and the worker struct will be freed after the timeout.
// Attempt to gracefully shutdown the worker before force killing it.

Signed-off-by: Rueian <rueian@anyscale.com>
@rueian rueian changed the title [core][refactor] move NodeManager::KillWorker to WorkerInterface::Kill for better testability [core][refactor] move NodeManager::KillWorker to WorkerInterface::KillAsync for better testability Jul 4, 2025
Collaborator

@edoakes edoakes left a comment

LGTM. Let's follow up to combine the killing_ and dead_ flags.

@@ -239,8 +239,7 @@ class NodeManagerTest : public ::testing::Test {
return std::make_shared<rpc::MockCoreWorkerClientInterface>();
}) {
RayConfig::instance().initialize(R"({
"raylet_liveness_self_check_interval_ms": 100,
"kill_worker_timeout_milliseconds": 10
Collaborator

nice! 🙌

@edoakes edoakes merged commit 4d9b323 into ray-project:master Jul 7, 2025
5 checks passed
Collaborator

edoakes commented Jul 7, 2025

and can you also file a GH issue for the windows graceful process shutdown?

Contributor Author

rueian commented Jul 7, 2025

> and can you also file a GH issue for the windows graceful process shutdown?

No problem. Filed here #54374.

elliot-barn pushed a commit that referenced this pull request Jul 7, 2025
…lAsync for better testability (#54068)

Following the suggestion in
#53562 (comment),
this PR moves `NodeManager::KillWorker` to `WorkerInterface::Kill` so
that we can mock the method for testing if it is invoked or not, instead
of spawning a real process to see if it is killed or not.

As a side effect, this will also eliminate the confusion between
`NodeManager::KillWorker` and `NodeManager::DestroyWorker`.

---------

Signed-off-by: Rueian <rueian@anyscale.com>
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
rueian added a commit to rueian/ray that referenced this pull request Jul 8, 2025
…ace::KillAsync for better testability (ray-project#54068)"

This reverts commit 4d9b323.
edoakes pushed a commit that referenced this pull request Jul 21, 2025
## Why are these changes needed?

Following [the previous
discussion](#54068 (comment)),
this PR merges the flags used by `Worker::MarkDead` and
`Worker::KillAsync`.

Currently, `Worker::MarkDead` is only called after we ask the
worker to shut down via the `Exit` RPC. After the `Exit` RPC, the worker
should shut down by itself.

This change essentially prevents sending `SIGTERM` from the node manager
to a worker once its `Worker::MarkDead` method has been called, ensuring
that we don't interrupt its shutdown process.
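
The effect described above can be sketched with a single shared flag. This is an assumption-laden illustration rather than the follow-up PR's code: the flag name `killing_or_dead_` and the counter that simulates `SIGTERM` are hypothetical.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical worker sharing one flag between MarkDead and KillAsync.
class FakeWorker {
 public:
  // Called after the worker was asked to exit on its own (e.g. via an RPC).
  void MarkDead() { killing_or_dead_.store(true); }

  // Returns true if a (simulated) SIGTERM was sent.
  bool KillAsync() {
    // If the worker is already marked dead or being killed, do nothing so
    // that its own shutdown is not interrupted.
    if (killing_or_dead_.exchange(true)) {
      return false;
    }
    ++sigterm_count_;  // Stand-in for sending SIGTERM to the process.
    return true;
  }
  int SigtermCount() const { return sigterm_count_; }

 private:
  std::atomic<bool> killing_or_dead_{false};
  int sigterm_count_ = 0;
};
```

With one flag, the order of calls decides the outcome: `MarkDead` followed by `KillAsync` sends nothing, while `KillAsync` on a live worker sends exactly one signal.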




## Related issue number

<!-- For example: "Closes #1234" -->

## Checks

- [x] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

Signed-off-by: Rueian <rueian@anyscale.com>
Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>
alimaazamat pushed a commit to alimaazamat/ray that referenced this pull request Jul 24, 2025
…oject#54377)

ccmao1130 pushed a commit to ccmao1130/ray that referenced this pull request Jul 29, 2025
…lAsync for better testability (ray-project#54068)

krishnakalyan3 pushed a commit to krishnakalyan3/ray that referenced this pull request Jul 30, 2025
…oject#54377)
