Introduction of Message Fabric V2 #3183
Test Results: 7 files (+3), 7 suites (+3), 4m 59s ⏱️ (+3m 57s). Results for commit 9fd454d; comparison against base commit d100b46. This pull request removes 16 and adds 54 tests. Note that renamed tests count towards both.
Thanks a lot for reworking our network stack @AhmedSoliman. This is some monumental work which looks amazing 🦸 It simplifies and improves things as well as preparing the ground for the future. Really great! What's really cool is the way rpc types are bound to services, thereby enforcing type safety when receiving and responding to rpcs. I only left some minor comments/questions. +1 for merging :-)
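The type-safety property praised above can be illustrated with a small standalone sketch. All names here are hypothetical for illustration, not the actual Restate API: the idea is that an RPC request type declares its service and response type through a trait, so a handler can only reply with the matching response type, and a mismatch fails at compile time.

```rust
// Hypothetical sketch of binding RPC types to a service at the type level.
// None of these type names are from the actual Restate codebase.

/// Marker for a fabric-level service.
trait Service {
    const NAME: &'static str;
}

/// An RPC request is bound to exactly one service and one response type,
/// so a mismatched response is a compile error rather than a runtime bug.
trait Rpc {
    type Service: Service;
    type Response;
}

struct PartitionProcessorService;
impl Service for PartitionProcessorService {
    const NAME: &'static str = "PartitionProcessor";
}

struct GetState {
    key: u64,
}
struct GetStateResponse {
    value: String,
}

impl Rpc for GetState {
    type Service = PartitionProcessorService;
    type Response = GetStateResponse;
}

/// Responding is only possible with the request's declared response type.
fn respond<R: Rpc>(_request: &R, response: R::Response) -> R::Response {
    response
}

fn main() {
    let req = GetState { key: 7 };
    // Compiles: GetStateResponse is the declared response type for GetState.
    let resp = respond(&req, GetStateResponse { value: "ok".into() });
    assert_eq!(resp.value, "ok");
    println!("{}", resp.value);
}
```

Responding with any type other than `GetStateResponse` would be rejected by the compiler, which is the enforcement the comment refers to.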
network_sender: Networking<T>,
cluster_state_tx: Arc<watch::Sender<Arc<ClusterState>>>,
) -> Result<Option<TaskHandle<anyhow::Result<()>>>, ShutdownError> {
    let refresh = async move {
        trace!("Refreshing cluster state");
        let last_state = Arc::clone(&cluster_state_tx.borrow());
        let heartbeat_timeout = Configuration::pinned().networking.keep_alive_timeout();
Is it on purpose that we are using the keep-alive timeout as the heartbeat_timeout? Before, we set a timeout of 1s; now it is 5s.
Yes. Two thoughts on this: 1) it aligns with our connection-reset detection period, and 2) it's a transient change until a replacement failure detector lands, which will happen before the next release.
Err(NetworkError::RemoteVersionMismatch(msg)) => {
    // When **this** node has just started, other peers might not have
    // learned about the new metadata version yet, and then they can
    // return a RemoteVersionMismatch error.
    // In this case we are not sure about the peer's state, but it's
    // definitely not dead!
    // Hence we mark it as a Suspect node. This gives it enough time to
    // update its metadata before we know the exact state.
    debug!("Node {node_id} is marked as Suspect: {msg}");
    nodes.insert(
        node_id.as_plain(),
        NodeState::Suspect(SuspectNode {
            generational_node_id: node_id,
            last_attempt: MillisSinceEpoch::now(),
        }),
    );
}
This case already became obsolete with eef7ea5, right?
Correct
@@ -323,46 +302,33 @@ impl RemoteSequencerConnection {
connection: Connection,
Do we still need to pass in the Connection here?
Can be removed but it's currently used to get the peer (sequencer node) id in an error message.
@@ -76,12 +76,11 @@ xxhash-rust = { workspace = true }
tonic-build = { workspace = true }

[dev-dependencies]
restate-core = { path = ".", default-features = false, features = ["test-util"] }
We seem to sometimes use #[cfg(any(test, feature = "test-util"))] and sometimes #[cfg(feature = "test-util")] in this crate. Is it because of this that we enable the test-util feature? Maybe something to unify in the future.
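For context, the two gating forms behave differently: `#[cfg(any(test, feature = "test-util"))]` compiles the item for the crate's own unit tests even when the feature is off, whereas `#[cfg(feature = "test-util")]` requires the feature to be enabled, which is what the self-referencing dev-dependency achieves. A minimal illustration (the function names are hypothetical, not from the crate):

```rust
// Visible to this crate's own `cargo test` AND to downstream crates
// that enable the `test-util` feature.
#[cfg(any(test, feature = "test-util"))]
pub fn mock_clock() { /* ... */ }

// Visible ONLY when the `test-util` feature is enabled; the crate's own
// unit tests can't see it unless the feature is on, hence the
// dev-dependency on the crate itself with `features = ["test-util"]`.
#[cfg(feature = "test-util")]
pub fn mock_network() { /* ... */ }
```

Unifying on one form (and dropping or keeping the self dev-dependency accordingly) would remove the ambiguity the comment points out.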
Yes. I didn't want to make this PR even bigger :)
if let Err(e) = self.self_proposer.propose(partition_key, cmd).await {
    respond_to_rpc(
        reciprocal
            .prepare(Err(PartitionProcessorRpcError::Internal(e.to_string()))),
    );
    reciprocal.send(Err(PartitionProcessorRpcError::Internal(e.to_string())));
Unrelated to this PR: It looks like we only fail if we cannot propose. In this case, we might return a PartitionProcessorRpcError::Busy (or another error telling the caller that the rpc wasn't processed and could be retried).
I'll leave this out of the scope of this PR
        reciprocal.prepare(Err(PartitionProcessorRpcError::Internal(e.to_string()))),
    );
}
Err(e) => reciprocal.send(Err(PartitionProcessorRpcError::Internal(e.to_string()))),
Same here.
This commit introduces a new network protocol version with an automatic backward-compatibility layer and an accompanying modified networking API to provide support for the following:

1. Native RPC at the protocol level with fabric-level service and service-shard routing. Routing to service shards is offered via an opaque `SortCode`, a u64 value that can be used to route messages to a specific shard of a service (e.g. `PartitionId`, `LogletId`, etc.).
2. Support for cancellation of enqueued egress RPC requests if the caller is no longer interested in the result.
3. Message payload deserialization offloaded from the network reactor to the call-site's thread, to reduce the network reactor's CPU usage and the effect of head-of-line blocking caused by expensive messages.
4. Adds the concept of `Service` that can handle `rpc`, `unary`, and `watch` messages.
5. Improved ergonomics for using the message fabric for sending rpc requests: no need to get access to `MessageRouterBuilder` anymore, which unlocks the ability to create arbitrary connections that are self-managed (not tied to the connection manager).
6. WIP: Introduces the `Swimlane` concept to classify streams/connections into different swim lanes. This will provide isolation between fat data streams and low-latency metadata streams in the future.
7. WIP: support for "remote watches". Not fully implemented, but will be available in the future.
8. A variety of fixes and improvements to the existing code.
Stack created with Sapling. Best reviewed with ReviewStack.
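The `SortCode`-based shard routing described in the commit message can be sketched as a minimal standalone illustration. The names and the modulo placement strategy here are assumptions for demonstration, not the actual implementation; the point is that the fabric routes on an opaque u64 without knowing whether it is a `PartitionId`, a `LogletId`, or something else.

```rust
/// Opaque routing key: any u64-convertible id (PartitionId, LogletId, ...)
/// can serve as a sort code. Hypothetical sketch, not Restate's actual types.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct SortCode(u64);

impl From<u64> for SortCode {
    fn from(v: u64) -> Self {
        SortCode(v)
    }
}

/// A service with several shards; the fabric only sees the opaque code.
struct ShardedService {
    shard_count: u64,
}

impl ShardedService {
    /// Hypothetical placement: map the opaque code onto a shard index.
    /// The real routing policy is an implementation detail of the service.
    fn route(&self, code: SortCode) -> u64 {
        code.0 % self.shard_count
    }
}

fn main() {
    let svc = ShardedService { shard_count: 4 };
    let partition_id: u64 = 10;
    // The caller routes by PartitionId; the fabric never learns its meaning.
    let shard = svc.route(SortCode::from(partition_id));
    assert_eq!(shard, 2);
    println!("partition {partition_id} -> shard {shard}");
}
```

Because the code is opaque, a future service can route by a different id type without any change to the fabric itself.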