[Pubsub] Generalize the pubsub interface and adapt it for ref counting protocol #15446
Conversation
The current test failure should be temporary.
I did a first pass over the usage; I have a few high-level pubsub design questions before I dig deeper.
It looks like we're using a channel abstraction so that we can piggyback off of shared publisher <--> subscriber long-polling connections: the publisher and subscriber interfaces expose the channel concept to application code via the channel type, and deal only with a generic `PubMessage` type that oneofs across the messages for the different channels. This then entails pass-through methods to the sub-interfaces for each channel, maintaining channel --> data structure maps internally, and leaking what is essentially a transport detail (the long-polling connection) to the application level by requiring publishers/subscribers to specify the channel type. I'm wondering if there's a better way to structure this so that application code doesn't have to worry about channels, and more of this logic is encoded in the type system so we have compile-time guarantees that things are 👌.
What if we adopted a pattern similar to gRPC's shared transport channel, generalized from managing a 1-to-1 connection to managing 1-to-n connections? We'd have a transport broker concept that encapsulates the publisher <--> subscriber long-polling connections. The subscriber interface for each pubsub instance takes a shared pointer to the broker on construction, and can therefore share the broker across different pubsub instances (e.g. separate subscriber objects for `WAIT_FOR_OBJECT_EVICTION` and `WAIT_FOR_REF_REMOVED`), where new publisher <--> subscriber connections can be registered by any of the pubsub instances at subscription time.
subscribers <--> broker <--transport--> broker <--> publishers
I think there could be a few advantages here:
- We could then expose a subscriber interface per pubsub instance, templated on the concrete message type (no need for a proto oneof message to be exposed to the application code, that could be isolated to the broker), which will give better compile-time checking of the application-level pubsub messages and will obviate the need for those passthrough methods in a top-level subscriber interface that requires a channel type to be specified.
- I think that you'd still need a proto-level channel type enum, but I think that you could get away with specializing the subscriber with the enum value itself. E.g. `Subscriber<typename MessageID, typename Message, rpc::ChannelType channel_type>` could be specialized as `Subscriber<ObjectID, rpc::WaitForObjectEvictionMessage, rpc::WAIT_FOR_OBJECT_EVICTION>`, where the `channel_type` is included when interacting with the broker (e.g. subscribe, unsubscribe). Dispatch of messages received in the long-poll response by the broker will still have to use a channel --> subscriber table, but that can be populated dynamically when the subscriber is constructed: `broker_.RegisterSubscriber(channel_type, this)`.
- The transport-level optimization of sharing long-polling connections is pushed down as an implementation detail of the transport broker concept, which is a natural place to evolve other transport-level details, like switching to a bidi streaming connection or switching to a centralized message broker.
- Other transport-level things, like the core worker client pool and the subscriber address/port, can be moved to the broker. Publisher addresses will probably continue to be required by subscriber methods, unless we want the broker to always map worker or node IDs to concrete addresses (probably less efficient in some cases?).
- The current process of registering a new channel will be replaced by adding a new channel type to the proto enum, adding the channel's proto to the broker's pubsub message oneof, templating a subscriber with a new message type and the channel type, and reusing the existing broker. I don't think there should be much more required than that.
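To make the templated-subscriber idea above concrete, here is a rough sketch. This is a hypothetical illustration, not Ray's actual API: `ChannelType`, `Broker`, `Subscriber`, and `RegisterSubscriber` are stand-in names, and the string payload stands in for a serialized proto message.

```cpp
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>

// Stand-in for the proto-level channel type enum.
enum class ChannelType { WAIT_FOR_OBJECT_EVICTION, WAIT_FOR_REF_REMOVED };

// The broker owns the channel --> handler dispatch table, populated when each
// subscriber is constructed, so application code never touches channels.
class Broker {
 public:
  void RegisterSubscriber(ChannelType channel,
                          std::function<void(const std::string &)> handler) {
    handlers_[channel] = std::move(handler);
  }
  // Called when a long-poll response arrives; dispatches by channel.
  void Dispatch(ChannelType channel, const std::string &payload) {
    handlers_.at(channel)(payload);
  }

 private:
  std::unordered_map<ChannelType, std::function<void(const std::string &)>>
      handlers_;
};

// Subscriber specialized on the concrete message type and the channel enum
// value; the channel is a compile-time detail invisible to Subscribe() callers.
template <typename Message, ChannelType channel_type>
class Subscriber {
 public:
  explicit Subscriber(std::shared_ptr<Broker> broker)
      : broker_(std::move(broker)) {
    // Populate the broker's dispatch table dynamically at construction time.
    broker_->RegisterSubscriber(channel_type,
                                [this](const std::string &payload) {
      // A real implementation would parse the serialized payload into Message;
      // here we assume Message is constructible from the payload string.
      if (callback_) callback_(Message(payload));
    });
  }
  void Subscribe(std::function<void(const Message &)> callback) {
    callback_ = std::move(callback);
  }

 private:
  std::shared_ptr<Broker> broker_;
  std::function<void(const Message &)> callback_;
};
```

Application code would then construct e.g. a `Subscriber<std::string, ChannelType::WAIT_FOR_OBJECT_EVICTION>` against a shared broker and never mention the channel again.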
I think that you would have an analogous broker abstraction on the publisher side as well (encapsulating the cached long-polling connection, controlling the message batching), although I haven't thought it through as much since I think it should be simpler than the subscriber side. My guess is that the publisher would be similarly templated, would similarly register itself with the publish-side broker on construction, and each publisher instance (one per channel type) would have its own subscription index. I think that the publisher's `Subscriber` would be internal to the publish-side broker.
The cons with this broker approach, as I see them:
- Broker concept is exposed to the application level, although what's needed to configure that concept (worker client pool, subscriber address/port, etc.) is already exposed to the application level, so I'm not sure if this is any worse?
- A shared transport-level broker will obviously have to be thread-safe, although that shouldn't be very difficult.
- Hidden dragons that I haven't thought of.
Lmk what you think about this approach and whether I'm missing anything! This is also something that could wait until a future PR; e.g. when I port the OBOD object location subscriptions or the GCS node/actor tables to this pubsub, I'm sure that I'll have to refactor some stuff anyway.
Thanks for the suggestion! Did I understand correctly that the below API examples are what you are describing?

```
# either this
# manages the connection
broker_
# manages the tracking
wait_for_object_channel = Channel<ObjectID, MessageType>(broker)
wait_for_object_channel->interface();

# or this
subscriber_broker->Channel(WaitForObjectEviction)->interface()
```
Yeah, it makes sense. But we probably want to have a prefix to distinguish worker-based channels and GCS-based channels? What about `WORKER_OBJECT_EVICTION`?
I'm saying that the different pubsub "channels" are presented to the application-level code as independent subscriber objects, with APIs that are channel-agnostic. The broker should hide all of the details around managing subscriber <--> publisher connections, and the multiplexing of multiple different pubsub channels over a single subscriber <--> publisher connection should be hidden from the application code.
Definitely, that sounds like the best route to me! I should be able to get to another review pass of this PR today.
I totally agree that we should add a prefix to distinguish worker-based channels and GCS-based channels. But I don't have an idea of what the best prefix format is~
LGTM! I mostly have nits, although we should definitely fix the more-than-one-move when queueing messages before merging; I trust that you'll fix that. I'm also still of the opinion that we should try to register subscriptions before adding ref deletion callbacks (which include unregistering subscriptions) to prevent surprise leaks of subscriptions in the future (thread here); lmk what you think about that.
I also didn't get a chance to say this on the initial PR, but great work on the tests! 🙌
Btw, it looks like there's an ASAN failure for a reference count test that might need attention.
@clarkzinzow Yeah, I am aware of that one. It is super weird because it is not a memory-related error, and it has never been reproducible on my laptop... I will make sure to fix this issue (probably there are some funny mistakes).
This PR is too big; we should avoid this as much as possible in the future. I haven't reviewed `subscriber.h/cc` yet. Except for those, all the rest has been reviewed.
src/ray/pubsub/subscriber.cc (outdated)

```cpp
/// Subscriber
///////////////////////////////////////////////////////////////////////////////

inline std::shared_ptr<SubscribeChannelInterface> Subscriber::Channel(
```
why inline here?
Because this function is frequently called but pretty tiny.
Looks like the `inline` keyword doesn't guarantee the function is actually inlined. I will move the definition to the header instead so the compiler can truly inline it.
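For context: in C++, `inline` is primarily an ODR/linkage keyword and only a hint to the optimizer; the compiler can only actually inline a call when the definition is visible in the calling translation unit. A minimal sketch of the header-definition pattern, with illustrative names (this is not the actual Ray code):

```cpp
// subscriber.h (hypothetical excerpt): defining the small, hot accessor
// inside the class body makes it implicitly inline and visible at every call
// site, giving the compiler a real chance to inline it.
#include <map>
#include <memory>

class SubscribeChannelInterface {};

class Subscriber {
 public:
  std::shared_ptr<SubscribeChannelInterface> Channel(int channel_type) const {
    auto it = channels_.find(channel_type);
    return it == channels_.end() ? nullptr : it->second;
  }

 private:
  std::map<int, std::shared_ptr<SubscribeChannelInterface>> channels_;
};
```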
This reverts commit 9a6a521.
Looks pretty good now. I left some minor comments.
The Docker / Mac wheel build times out frequently. The flaky test seems to pass, but it failed for some weird reasons. I will just merge it.
There are already 2 approvals, and I haven't received new reviews for a while, so I will just merge it.
Why are these changes needed?
This is fully reviewable.
Recommended review workflow.
The current interface is not ideal, but it is working pretty well with the current status. Let's iterate on the interface!
I verified this reduced the number of WaitForRefRemoved requests.
Related issue number
Close #14322
#14762
Checks
- I've run `scripts/format.sh` to lint the changes in this PR.