@rkooo567 rkooo567 commented Apr 21, 2021

Why are these changes needed?

This is fully review-able

Recommended review workflow.

  • First, look at core_worker.cc and reference_count.cc to see what the new interface looks like.
  • Check subscriber.h and publisher.h.
  • And then review other parts.

The current interface is not ideal, but it works well enough for the current state of the code. Let's iterate on the interface!

I verified that this reduces the number of WaitForRefRemoved requests.

Related issue number

Close #14322
#14762

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@rkooo567 rkooo567 changed the title from "Generalize pubsub 4" to "[Pubsub] Generalize the pubsub interface so that reference counting can use it" Apr 21, 2021
@rkooo567 rkooo567 changed the title from "[Pubsub] Generalize the pubsub interface so that reference counting can use it" to "[Pubsub] Generalize the pubsub interface and adapt it for ref counting protocol" Apr 21, 2021
@rkooo567
Contributor Author

The current test failure should be temporary.

Contributor

@clarkzinzow clarkzinzow left a comment

I did a usage-first pass, and I have a few high-level pubsub design questions before I dig deeper.

It looks like we're using a channel abstraction in order to be able to piggyback off of shared publisher <--> subscriber long-polling connections, where the publisher and subscriber interfaces expose the channel concept to the application code via the channel type and deal only with a generic PubMessage message type that oneofs across the messages for the different channels. This then entails pass-through methods to the subinterfaces for each channel, maintaining channel --> data structure maps internally, and leaking what is essentially a transport detail (the long-polling connection) to the application level by requiring publishers/subscribers to specify the channel type. I'm wondering if there's a better way to structure this such that the application code doesn't have to worry about channels and more of this logic is encoded in the type system so we have compile-time guarantees that things are 👌 .

What if we adopted a pattern similar to gRPC's shared transport channel, generalized from managing a 1-to-1 connection to managing 1-to-n connections, where we have a transport broker concept that encapsulates the publisher <--> subscriber long-polling connections? The subscriber interface for each pubsub instance takes a shared pointer to the broker on construction, and can therefore share the broker across different pubsub instances (e.g. separate subscriber objects for WAIT_FOR_OBJECT_EVICTION and WAIT_FOR_REF_REMOVED), where new publisher <--> subscriber connections can be registered by any of the pubsub instances at subscription time.

subscribers <--> broker <--transport--> broker <--> publishers

I think there could be a few advantages here:

  1. We could then expose a subscriber interface per pubsub instance, templated on the concrete message type (no need for a proto oneof message to be exposed to the application code, that could be isolated to the broker), which will give better compile-time checking of the application-level pubsub messages and will obviate the need for those passthrough methods in a top-level subscriber interface that requires a channel type to be specified.
  2. I think that you'd still need a proto-level channel type enum, but I think that you could get away with specializing the subscriber with the enum value itself. E.g. Subscriber<typename MessageID, typename Message, rpc::ChannelType channel_type> could be specialized as Subscriber<ObjectID, rpc::WaitForObjectEvictionMessage, rpc::WAIT_FOR_OBJECT_EVICTION>, where the channel_type is included when interacting with the broker (e.g. subscribe, unsubscribe). Dispatch of messages received in the long-poll response by the broker will still have to use a channel --> subscriber table, but that can be populated dynamically when the subscriber is constructed: broker_.RegisterSubscriber(channel_type, this).
  3. The transport-level optimization of sharing long-polling connections is pushed down as an implementation detail of the transport broker concept, and is a natural place to evolve other transport-level details, like switching to a bidi streaming connection or switching to a centralized message broker.
  4. Other transport-level things, like the core worker client pool and the subscriber address/port, can be moved to the broker. Publisher addresses will probably continue to be required by subscriber methods, unless we want the broker to always map worker or node IDs to concrete addresses (probably less efficient in some cases?).
  5. The current process of registering a new channel will be replaced by adding a new channel type to the proto enum, adding the channel's proto to the broker's pubsub message oneof, templating a subscriber with a new message type and the channel type, and reusing the existing broker. I don't think there should be much more required than that.
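A rough sketch of how the templated subscriber and shared broker described in this list might fit together. Everything here is a hypothetical stand-in, not Ray's actual code: `ChannelType`, `WireMessage` (in place of the proto oneof), `SubscriberBroker`, the toy message types and their `Decode` helpers are all assumptions, and the long-poll transport is simulated by calling `HandleLongPollReply` directly.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// All names below are hypothetical stand-ins, not Ray's actual types.
enum class ChannelType { WAIT_FOR_OBJECT_EVICTION, WAIT_FOR_REF_REMOVED };

// Stand-in for the oneof pubsub message; only the broker handles it directly.
struct WireMessage {
  ChannelType channel;
  std::string key;  // serialized message ID
};

// Owns the (here simulated) transport and the channel -> subscriber table.
class SubscriberBroker {
 public:
  using Handler = std::function<void(const WireMessage &)>;

  void RegisterSubscriber(ChannelType channel, Handler handler) {
    handlers_[channel] = std::move(handler);
  }

  // Dispatches every message of one simulated long-poll reply by channel type.
  void HandleLongPollReply(const std::vector<WireMessage> &batch) {
    for (const auto &message : batch) {
      auto it = handlers_.find(message.channel);
      if (it != handlers_.end()) it->second(message);
    }
  }

 private:
  std::map<ChannelType, Handler> handlers_;
};

// Toy ID and message types; each message knows how to decode itself.
struct ObjectID {
  std::string binary;
};
struct EvictionMessage {
  std::string object_id;
  static EvictionMessage Decode(const WireMessage &wire) { return {wire.key}; }
};
struct RefRemovedMessage {
  std::string object_id;
  static RefRemovedMessage Decode(const WireMessage &wire) { return {wire.key}; }
};

// One subscriber per channel, specialized on the enum value itself.
template <typename MessageID, typename Message, ChannelType kChannel>
class Subscriber {
 public:
  using Callback = std::function<void(const Message &)>;

  explicit Subscriber(std::shared_ptr<SubscriberBroker> broker)
      : broker_(std::move(broker)) {
    assert(broker_ != nullptr);
    // The subscriber must stay alive for as long as the broker may call this.
    broker_->RegisterSubscriber(
        kChannel, [this](const WireMessage &wire) { Dispatch(wire); });
  }
  Subscriber(const Subscriber &) = delete;
  Subscriber &operator=(const Subscriber &) = delete;

  void Subscribe(const MessageID &id, Callback callback) {
    callbacks_[id.binary] = std::move(callback);
  }

 private:
  void Dispatch(const WireMessage &wire) {
    auto it = callbacks_.find(wire.key);
    if (it != callbacks_.end()) it->second(Message::Decode(wire));
  }

  std::shared_ptr<SubscriberBroker> broker_;
  std::map<std::string, Callback> callbacks_;
};

// Two channels share one broker; returns how many callbacks fired.
int DemoSharedBroker() {
  auto broker = std::make_shared<SubscriberBroker>();
  Subscriber<ObjectID, EvictionMessage, ChannelType::WAIT_FOR_OBJECT_EVICTION>
      evictions(broker);
  Subscriber<ObjectID, RefRemovedMessage, ChannelType::WAIT_FOR_REF_REMOVED>
      ref_removed(broker);

  int fired = 0;
  evictions.Subscribe(ObjectID{"obj-1"},
                      [&fired](const EvictionMessage &) { ++fired; });
  ref_removed.Subscribe(ObjectID{"obj-2"},
                        [&fired](const RefRemovedMessage &) { ++fired; });

  broker->HandleLongPollReply({
      {ChannelType::WAIT_FOR_OBJECT_EVICTION, "obj-1"},
      {ChannelType::WAIT_FOR_REF_REMOVED, "obj-2"},
      {ChannelType::WAIT_FOR_REF_REMOVED, "obj-1"},  // no matching subscription
  });
  return fired;
}
```

In this sketch the application code only ever sees `EvictionMessage` or `RefRemovedMessage` in its callbacks, while the channel enum and the wire message stay inside the broker and the subscriber template.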

I think that you would have an analogous broker abstraction on the publisher side as well (encapsulating the cached long-polling connection, controlling the message batching), although I haven't thought it through as much since I think it should be simpler than the subscriber-side. My guess is that the publisher would be similarly templated, would similarly register itself with the publish-side broker on construction, and each publisher instance (one per channel type) would have its own subscription index. I think that the publisher's Subscriber would be internal to the publish-side broker.
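The publisher side could be sketched along the same lines. This is only an illustration under assumptions: `PublisherBroker`, `Publisher`, `WireMessage`, `Queue` and `Flush` are hypothetical names, the long-poll reply is simulated by `Flush`, and the publisher simply holds a reference to the broker instead of registering itself for incoming subscription requests.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins, not Ray's actual types.
enum class ChannelType { WAIT_FOR_OBJECT_EVICTION, WAIT_FOR_REF_REMOVED };
using SubscriberID = std::string;

struct WireMessage {
  ChannelType channel;
  std::string key;
};

// Publish-side broker: batches outgoing messages per subscriber.
class PublisherBroker {
 public:
  void Queue(const SubscriberID &subscriber, WireMessage message) {
    mailboxes_[subscriber].push_back(std::move(message));
  }

  // Simulates answering one long-poll request: returns and clears the batch.
  std::vector<WireMessage> Flush(const SubscriberID &subscriber) {
    std::vector<WireMessage> batch;
    auto it = mailboxes_.find(subscriber);
    if (it != mailboxes_.end()) {
      batch = std::move(it->second);
      mailboxes_.erase(it);
    }
    return batch;
  }

 private:
  std::map<SubscriberID, std::vector<WireMessage>> mailboxes_;
};

// One publisher per channel type, each with its own subscription index.
template <ChannelType kChannel>
class Publisher {
 public:
  explicit Publisher(PublisherBroker &broker) : broker_(broker) {}

  void RegisterSubscription(const SubscriberID &subscriber,
                            const std::string &key) {
    index_[key].insert(subscriber);
  }

  // Queues one message for every subscriber of this key on this channel.
  void Publish(const std::string &key) {
    auto it = index_.find(key);
    if (it == index_.end()) return;
    for (const auto &subscriber : it->second) {
      broker_.Queue(subscriber, WireMessage{kChannel, key});
    }
  }

 private:
  PublisherBroker &broker_;
  std::map<std::string, std::set<SubscriberID>> index_;
};

// Messages from two channels end up in one batch for the same subscriber.
std::size_t DemoBatchedPublish() {
  PublisherBroker broker;
  Publisher<ChannelType::WAIT_FOR_OBJECT_EVICTION> evictions(broker);
  Publisher<ChannelType::WAIT_FOR_REF_REMOVED> ref_removed(broker);

  evictions.RegisterSubscription("worker-A", "obj-1");
  ref_removed.RegisterSubscription("worker-A", "obj-1");

  evictions.Publish("obj-1");
  ref_removed.Publish("obj-1");
  evictions.Publish("obj-2");  // nobody subscribed, nothing queued

  const std::size_t batch_size = broker.Flush("worker-A").size();
  assert(broker.Flush("worker-A").empty());  // the mailbox was drained
  return batch_size;
}
```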

The cons with this broker approach, as I see them:

  1. Broker concept is exposed to the application level, although what's needed to configure that concept (worker client pool, subscriber address/port, etc.) is already exposed to the application level, so I'm not sure if this is any worse?
  2. A shared transport-level broker will obviously have to be thread-safe, although that shouldn't be very difficult.
  3. Hidden dragons that I haven't thought of.

Lmk what you think about this approach and whether I'm missing anything! This is also something that could wait until a future PR, e.g. when I port the OBOD object location subscriptions or the GCS node/actor tables to this pubsub I'm sure that I'll have to refactor some stuff anyway.

@rkooo567
Contributor Author

Thanks for the suggestion! Do I understand correctly that the API examples below are what you are describing?

// Either this:
// The broker manages the connection.
broker_;
// The channel object manages the tracking.
wait_for_object_channel = Channel<ObjectID, MessageType>(broker_);
wait_for_object_channel->interface();

// Or this:
subscriber_broker->Channel(WaitForObjectEviction)->interface();

@rkooo567
Contributor Author

@jovany-wang

BTW, I'd prefer to name the channels OBJECT_EVICTION and REF_REMOVED (i.e., remove the WAIT_FOR_ prefix).

Yeah, that makes sense. But we probably want a prefix to distinguish worker-based channels from GCS-based channels. What about WORKER_OBJECT_EVICTION?

@clarkzinzow
Contributor

But one subscriber can still subscribe to one channel, right? Are you saying "not quite a channel" because a subscriber "subscribes" to channels (that is, for semantic reasons), or are there other reasons?

I'm saying that the different pubsub "channels" are presented to the application-level code as independent subscriber objects, with APIs that are channel-agnostic. The broker should hide all of the details around managing subscriber <--> publisher connections, and the multiplexing of multiple different pubsub channels over a single subscriber <--> publisher connection should be hidden from the application code.

So, I suggest we move forward with the current impl, as you mentioned here ("This is also something that could wait until a future PR"), compile a design doc that covers the general interface, and do the refactor in a separate PR if necessary. What do you think?

Definitely, that sounds like the best route to me! I should be able to get to another review pass of this PR today.

@jovany-wang
Contributor

@jovany-wang

BTW, I'd prefer to name the channels OBJECT_EVICTION and REF_REMOVED (i.e., remove the WAIT_FOR_ prefix).

Yeah, that makes sense. But we probably want a prefix to distinguish worker-based channels from GCS-based channels. What about WORKER_OBJECT_EVICTION?

I totally agree with adding a prefix to distinguish worker-based channels from GCS-based channels, but I don't have a good idea of what the best prefix format is.

Contributor

@clarkzinzow clarkzinzow left a comment

LGTM! I mostly have nits, although we should definitely fix the more-than-one-move when queueing messages before merging, but I trust that you'll fix that. I'm also still of the opinion that we should try to register subscriptions before adding ref deletion callbacks (which include unregistering subscriptions) to prevent surprise leaks of subscriptions in the future (thread here), lmk what you think about that.
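To illustrate the ordering concern, here is a toy sketch of one way such a leak could arise. It assumes that a deletion callback runs immediately when the reference is already gone; whether the real reference counter behaves this way is not established in this thread. `SubscriptionTable`, `RefTracker` and `LeakedSubscriptions` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <set>
#include <string>
#include <utility>

// Hypothetical toy registry of active subscriptions.
class SubscriptionTable {
 public:
  void Subscribe(const std::string &key) { keys_.insert(key); }
  void Unsubscribe(const std::string &key) { keys_.erase(key); }
  std::size_t Size() const { return keys_.size(); }

 private:
  std::set<std::string> keys_;
};

// Hypothetical toy reference tracker. Assumption: if the reference is already
// deleted, a newly added deletion callback runs right away.
class RefTracker {
 public:
  explicit RefTracker(bool ref_alive) : ref_alive_(ref_alive) {}

  void AddDeletionCallback(std::function<void()> callback) {
    assert(callback != nullptr);
    if (!ref_alive_) {
      callback();
      return;
    }
    on_delete_ = std::move(callback);
  }

  void DeleteRef() {
    ref_alive_ = false;
    if (on_delete_) {
      auto callback = std::move(on_delete_);
      on_delete_ = nullptr;
      callback();
    }
  }

 private:
  bool ref_alive_;
  std::function<void()> on_delete_;
};

// Returns how many subscriptions remain when the ref was already deleted.
std::size_t LeakedSubscriptions(bool subscribe_first) {
  SubscriptionTable table;
  RefTracker refs(/*ref_alive=*/false);
  auto cleanup = [&table] { table.Unsubscribe("obj-1"); };

  if (subscribe_first) {
    table.Subscribe("obj-1");
    refs.AddDeletionCallback(cleanup);  // runs now and removes the subscription
  } else {
    refs.AddDeletionCallback(cleanup);  // runs now, but there is nothing to remove
    table.Subscribe("obj-1");           // nobody will ever unsubscribe this
  }
  return table.Size();
}
```

Under that assumption, subscribing first leaves no subscription behind, while adding the callback first leaves one that is never cleaned up.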

I also didn't get a chance to say this on the initial PR, but great work on the tests! 🙌

@clarkzinzow
Contributor

Btw, it looks like there's an ASAN failure for a reference count test that might need attention.

@rkooo567
Contributor Author

rkooo567 commented May 3, 2021

@clarkzinzow Yeah, I am aware of that one. It is super weird because it is not a memory-related error, and it has never been reproducible on my laptop... I will make sure to fix this issue (there is probably some silly mistake).

@clarkzinzow clarkzinzow added the @author-action-required label May 4, 2021
Contributor

@fishbone fishbone left a comment

This PR is too big; we should avoid this as much as possible in the future. I haven't reviewed subscriber.h/cc yet, but I have reviewed everything else.

/// Subscriber
///////////////////////////////////////////////////////////////////////////////

inline std::shared_ptr<SubscribeChannelInterface> Subscriber::Channel(
Contributor

why inline here?

Contributor Author

Because this function is called frequently and is pretty tiny.

Contributor Author

Looks like inline doesn't mean the function is actually inlined. I will move the definition to the header instead to get true inline functions.
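For context, the `inline` keyword in C++ mainly allows the same definition to appear in multiple translation units; it does not force the compiler to inline calls. Without link-time optimization, the compiler can only inline a call when the definition is visible at the call site, which is what defining the function in the header provides. A minimal sketch, using simplified hypothetical versions of the types from the snippet above (`ToyChannel` and `DemoChannelLookup` are invented for illustration):

```cpp
#include <cassert>
#include <map>
#include <memory>

// Hypothetical, simplified versions of the types in the snippet above.
enum class ChannelType { WAIT_FOR_OBJECT_EVICTION, WAIT_FOR_REF_REMOVED };

class SubscribeChannelInterface {
 public:
  virtual ~SubscribeChannelInterface() = default;
};
class ToyChannel : public SubscribeChannelInterface {};

// Imagine this whole class lives in subscriber.h.
class Subscriber {
 public:
  Subscriber() {
    channels_[ChannelType::WAIT_FOR_OBJECT_EVICTION] = std::make_shared<ToyChannel>();
    channels_[ChannelType::WAIT_FOR_REF_REMOVED] = std::make_shared<ToyChannel>();
    assert(channels_.size() == 2);
  }

  // Defined inside the class body: implicitly inline, and the definition is
  // visible in every translation unit that includes the header. Whether a
  // given call is actually inlined is still the compiler's decision.
  std::shared_ptr<SubscribeChannelInterface> Channel(ChannelType type) const {
    auto it = channels_.find(type);
    if (it == channels_.end()) return nullptr;
    return it->second;
  }

 private:
  std::map<ChannelType, std::shared_ptr<SubscribeChannelInterface>> channels_;
};

// Returns true if the lookup finds a registered channel.
bool DemoChannelLookup() {
  Subscriber subscriber;
  return subscriber.Channel(ChannelType::WAIT_FOR_REF_REMOVED) != nullptr;
}
```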

@rkooo567 rkooo567 requested review from jovany-wang and fishbone May 12, 2021 16:47
@rkooo567 rkooo567 removed the @author-action-required label May 12, 2021
Contributor

@jovany-wang jovany-wang left a comment

Looks pretty good now. I left some minor comments.

@rkooo567
Contributor Author

Docker / Mac wheel build times out frequently. The flaky test seems to pass, but it failed for some weird reasons. I will just merge it.

@rkooo567 rkooo567 dismissed fishbone’s stale review May 13, 2021 16:28

There are already 2 approvals, and I didn't receive new reviews for a while, so I will just merge it.

@rkooo567 rkooo567 merged commit 259fcbd into ray-project:master May 13, 2021
Development

Successfully merging this pull request may close these issues.

Excessive heap memory usage in raylet / owner process when shuffling many objects
5 participants