@AhmedSoliman (Contributor) commented Apr 22, 2025

This commit introduces a new network protocol version with an automatic backward-compatibility layer, and an accompanying modified networking API, to provide support for the following:

  1. Native RPC at the protocol level, with fabric-level service and service-shard routing. Routing to service shards is done via an opaque SortCode, a u64 value that can be used to route messages to a specific shard of a service (e.g. PartitionId or LogletId).
  2. Support for cancelling enqueued egress RPC requests if the caller is no longer interested in the result.
  3. Message payload deserialization is offloaded from the network reactor to the call site's thread, reducing the network reactor's CPU usage and mitigating head-of-line blocking caused by expensive messages.
  4. Adds the concept of a Service that can handle rpc, unary, and watch messages.
  5. Improved ergonomics for sending RPC requests over the message fabric: there is no longer any need to access MessageRouterBuilder. This unlocks the ability to create arbitrary self-managed connections (not tied to the connection manager).
  6. WIP: Introduces the Swimlanes concept to classify streams/connections into different swim lanes. In the future, this will provide isolation between fat data streams and low-latency metadata streams.
  7. WIP: Support for "remote watches". Not fully implemented yet, but it will be available in the future.
  8. A variety of fixes and improvements to the existing code.
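A minimal sketch of the shard routing in item 1, assuming a SortCode is simply a u64 key that selects one registered shard of a service. `ServiceTag`, `ShardRouter`, and `RouteError` are hypothetical names for illustration, not the fabric's actual API, and the in-memory inboxes stand in for real service handlers.

```rust
use std::collections::HashMap;

/// Opaque shard-routing key (stand-in for the PR's SortCode).
pub type SortCode = u64;

/// Hypothetical service identifiers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ServiceTag {
    PartitionProcessor,
    Sequencer,
}

#[derive(Debug, PartialEq, Eq)]
pub enum RouteError {
    UnknownShard(ServiceTag, SortCode),
}

/// Routes raw messages to (service, shard) inboxes keyed by sort code.
#[derive(Default)]
pub struct ShardRouter {
    inboxes: HashMap<(ServiceTag, SortCode), Vec<Vec<u8>>>,
}

impl ShardRouter {
    /// Registers a shard, e.g. one partition processor per PartitionId.
    pub fn register(&mut self, service: ServiceTag, sort_code: SortCode) {
        self.inboxes.entry((service, sort_code)).or_default();
    }

    /// Delivers a message to the shard selected by `sort_code`, if registered.
    pub fn route(
        &mut self,
        service: ServiceTag,
        sort_code: SortCode,
        msg: Vec<u8>,
    ) -> Result<(), RouteError> {
        match self.inboxes.get_mut(&(service, sort_code)) {
            Some(inbox) => {
                inbox.push(msg);
                Ok(())
            }
            None => Err(RouteError::UnknownShard(service, sort_code)),
        }
    }

    /// Number of messages waiting in a shard's inbox (0 if unknown).
    pub fn pending(&self, service: ServiceTag, sort_code: SortCode) -> usize {
        self.inboxes.get(&(service, sort_code)).map_or(0, Vec::len)
    }
}
```

Because the router never interprets the sort code, the same mechanism works whether the shard key is a partition id, a loglet id, or something else.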
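Item 2 can be illustrated with a hypothetical egress queue in which each enqueued request holds only a weak reference to the caller's interest. If the caller drops its handle before the request reaches the wire, the request is skipped. `EgressQueue` and `RpcHandle` are illustrative names, and using `Weak` as the cancellation signal is an assumption made for this sketch, not necessarily how the PR implements it.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Weak};

/// Held by the caller; dropping it signals that the result is no longer wanted.
pub struct RpcHandle {
    _interest: Arc<()>,
}

struct Enqueued {
    payload: Vec<u8>,
    // Weak: the queue observes the caller's interest without keeping it alive.
    interest: Weak<()>,
}

/// Hypothetical per-connection queue of outgoing RPC requests.
#[derive(Default)]
pub struct EgressQueue {
    pending: VecDeque<Enqueued>,
}

impl EgressQueue {
    pub fn enqueue(&mut self, payload: Vec<u8>) -> RpcHandle {
        let interest = Arc::new(());
        self.pending.push_back(Enqueued {
            payload,
            interest: Arc::downgrade(&interest),
        });
        RpcHandle { _interest: interest }
    }

    /// Returns the next request whose caller is still waiting, silently
    /// discarding requests whose handles were dropped while they were queued.
    pub fn next_to_send(&mut self) -> Option<Vec<u8>> {
        while let Some(req) = self.pending.pop_front() {
            if req.interest.strong_count() > 0 {
                return Some(req.payload);
            }
        }
        None
    }
}
```

Cancelled requests cost nothing beyond their slot in the queue: they are never serialized onto the connection.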
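For item 3, a sketch of deferred decoding under stated assumptions: the reactor thread only forwards raw bytes, and the receiving call site decodes them into a typed message on its own thread. `RawMessage`, `Decode`, `GetLogletInfo` (with its 8-byte little-endian encoding), and `spawn_reactor` are hypothetical stand-ins; the actual wire format is not described here.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// What the simulated reactor hands over: undecoded payload bytes.
pub struct RawMessage {
    pub payload: Vec<u8>,
}

/// Hypothetical decoding trait implemented by each typed message.
pub trait Decode: Sized {
    fn decode(bytes: &[u8]) -> Option<Self>;
}

/// Hypothetical message carrying one id, encoded as 8 little-endian bytes.
#[derive(Debug, PartialEq, Eq)]
pub struct GetLogletInfo {
    pub loglet_id: u64,
}

impl Decode for GetLogletInfo {
    fn decode(bytes: &[u8]) -> Option<Self> {
        if bytes.len() != 8 {
            return None;
        }
        let mut buf = [0u8; 8];
        buf.copy_from_slice(bytes);
        Some(GetLogletInfo { loglet_id: u64::from_le_bytes(buf) })
    }
}

impl RawMessage {
    /// Decoding runs on whichever thread calls this, never on the reactor.
    pub fn into_typed<M: Decode>(self) -> Option<M> {
        M::decode(&self.payload)
    }
}

/// Simulated reactor: forwards frames as raw bytes without inspecting them.
pub fn spawn_reactor(frames: Vec<Vec<u8>>) -> Receiver<RawMessage> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for payload in frames {
            if tx.send(RawMessage { payload }).is_err() {
                break; // the call site went away
            }
        }
    });
    rx
}
```

Since the reactor never calls `decode`, a large or expensive message only consumes CPU on the consumer that asked for it, instead of delaying every other message behind it.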

Stack created with Sapling. Best reviewed with ReviewStack.

github-actions bot commented Apr 22, 2025

Test Results

  7 files (+3), 7 suites (+3), 4m 59s ⏱️ (+3m 57s)
 54 tests (+38): 53 ✅ (+37), 1 💤 (+1), 0 ❌ (±0)
223 runs (+191): 220 ✅ (+188), 3 💤 (+3), 0 ❌ (±0)

Results for commit 9fd454d. ± Comparison against base commit d100b46.

This pull request removes 16 and adds 54 tests. Note that renamed tests count towards both.
dev.restate.sdktesting.tests.AwakeableIngressEndpoint ‑ completeWithFailure(Client)
dev.restate.sdktesting.tests.AwakeableIngressEndpoint ‑ completeWithSuccess(Client)
dev.restate.sdktesting.tests.FrontCompatibilityTest$NewVersion ‑ completeAwakeable(Client)
dev.restate.sdktesting.tests.FrontCompatibilityTest$NewVersion ‑ completeRetryableOperation(Client)
dev.restate.sdktesting.tests.FrontCompatibilityTest$NewVersion ‑ proxyCallShouldBeDone(Client)
dev.restate.sdktesting.tests.FrontCompatibilityTest$NewVersion ‑ proxyOneWayCallShouldBeDone(Client)
dev.restate.sdktesting.tests.FrontCompatibilityTest$OldVersion ‑ createAwakeable(Client)
dev.restate.sdktesting.tests.FrontCompatibilityTest$OldVersion ‑ startOneWayProxyCall(Client)
dev.restate.sdktesting.tests.FrontCompatibilityTest$OldVersion ‑ startProxyCall(Client)
dev.restate.sdktesting.tests.FrontCompatibilityTest$OldVersion ‑ startRetryableOperation(Client)
…
dev.restate.sdktesting.tests.CallOrdering ‑ ordering(boolean[], Client)[1]
dev.restate.sdktesting.tests.CallOrdering ‑ ordering(boolean[], Client)[2]
dev.restate.sdktesting.tests.CallOrdering ‑ ordering(boolean[], Client)[3]
dev.restate.sdktesting.tests.Cancellation ‑ cancelFromAdminAPI(BlockingOperation, Client, URI)[1]
dev.restate.sdktesting.tests.Cancellation ‑ cancelFromAdminAPI(BlockingOperation, Client, URI)[2]
dev.restate.sdktesting.tests.Cancellation ‑ cancelFromAdminAPI(BlockingOperation, Client, URI)[3]
dev.restate.sdktesting.tests.Cancellation ‑ cancelFromContext(BlockingOperation, Client)[1]
dev.restate.sdktesting.tests.Cancellation ‑ cancelFromContext(BlockingOperation, Client)[2]
dev.restate.sdktesting.tests.Cancellation ‑ cancelFromContext(BlockingOperation, Client)[3]
dev.restate.sdktesting.tests.Combinators ‑ awakeableOrTimeoutUsingAwaitAny(Client)
…

♻️ This comment has been updated with latest results.

@AhmedSoliman AhmedSoliman force-pushed the pr3183 branch 5 times, most recently from d3e4ab5 to 1d71d12, April 23, 2025 15:25
@AhmedSoliman AhmedSoliman changed the title from "[WIP] Network Fabric V2" to "Introduction of Message Fabric V2" Apr 23, 2025
@AhmedSoliman AhmedSoliman marked this pull request as ready for review April 23, 2025 15:36
@AhmedSoliman AhmedSoliman force-pushed the pr3183 branch 5 times, most recently from c2f225c to 26a9ab8, April 24, 2025 08:12
Contributor

@tillrohrmann tillrohrmann left a comment

Thanks a lot for reworking our network stack @AhmedSoliman. This is monumental work and it looks amazing 🦸 It simplifies and improves things as well as preparing the ground for the future. Really great! What's really cool is the way rpc types are bound to services, thereby enforcing type safety when receiving and responding to rpcs. I only left some minor comments/questions. +1 for merging :-)

network_sender: Networking<T>,
cluster_state_tx: Arc<watch::Sender<Arc<ClusterState>>>,
) -> Result<Option<TaskHandle<anyhow::Result<()>>>, ShutdownError> {
let refresh = async move {
trace!("Refreshing cluster state");
let last_state = Arc::clone(&cluster_state_tx.borrow());
let heartbeat_timeout = Configuration::pinned().networking.keep_alive_timeout();
Contributor

Is it intentional that we are using the keep-alive timeout as the heartbeat_timeout? Previously we set a timeout of 1s; now it is 5s.

Contributor Author

Yes. Two thoughts on this: 1) it aligns it with our connection-reset detection period, and 2) it's a transient change until we have a replacement failure detector, which will happen before the next release.
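A hypothetical sketch of the effect discussed in this thread: a heartbeat reply only counts if it arrives within heartbeat_timeout, so raising the timeout from 1s to the 5s keep-alive value lets a slower reply count as alive instead of timing out. Channels and threads stand in for the real networking calls, and `ProbeOutcome`, `await_heartbeat`, and `probe_peer` are illustrative names, not code from this PR.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq, Eq)]
pub enum ProbeOutcome {
    Alive,
    NoReplyInTime,
    Disconnected,
}

/// Waits at most `heartbeat_timeout` for a peer's heartbeat reply.
pub fn await_heartbeat(reply: &mpsc::Receiver<()>, heartbeat_timeout: Duration) -> ProbeOutcome {
    match reply.recv_timeout(heartbeat_timeout) {
        Ok(()) => ProbeOutcome::Alive,
        Err(RecvTimeoutError::Timeout) => ProbeOutcome::NoReplyInTime,
        Err(RecvTimeoutError::Disconnected) => ProbeOutcome::Disconnected,
    }
}

/// Simulates a peer that replies after `delay` and reports the probe outcome.
pub fn probe_peer(delay: Duration, heartbeat_timeout: Duration) -> ProbeOutcome {
    let (tx, rx) = mpsc::channel();
    let peer = thread::spawn(move || {
        thread::sleep(delay);
        let _ = tx.send(()); // the prober may already have given up
    });
    let outcome = await_heartbeat(&rx, heartbeat_timeout);
    peer.join().unwrap();
    outcome
}
```

With scaled-down durations, a peer that replies after 100ms is reported as NoReplyInTime under a 20ms timeout, while one replying after 20ms is Alive under a 500ms timeout.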

Comment on lines -173 to -189
Err(NetworkError::RemoteVersionMismatch(msg)) => {
// When **this** node has just started, other peers might not have
// learned about the new metadata version and then they can
// return a RemoteVersionMismatch error.
// In this case we are not sure about the peer state but it's
// definitely not dead!
// Hence we set it as Suspect node. This gives it enough time to update
// its metadata, before we know the exact state
debug!("Node {node_id} is marked as Suspect: {msg}");
nodes.insert(
node_id.as_plain(),
NodeState::Suspect(SuspectNode {
generational_node_id: node_id,
last_attempt: MillisSinceEpoch::now(),
}),
);
}
Contributor

This case already became obsolete with eef7ea5, right?

Contributor Author

Correct

@@ -323,46 +302,33 @@ impl RemoteSequencerConnection {
connection: Connection,
Contributor

Do we still need to pass in the Connection here?

Contributor Author

Can be removed but it's currently used to get the peer (sequencer node) id in an error message.

@@ -76,12 +76,11 @@ xxhash-rust = { workspace = true }
tonic-build = { workspace = true }

[dev-dependencies]
restate-core = {path = ".", default-features = false, features = ["test-util"]}
Contributor

We seem to sometimes use #[cfg(any(test, feature = "test-util"))] and sometimes #[cfg(feature = "test-util")] in this crate. Is it because of this that we enable the test-util feature? Maybe something to unify in the future.

Contributor Author

Yes. I didn't want to make this PR even bigger :)
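A sketch of the distinction raised in this thread, with hypothetical helper names. Items gated on any(test, feature = "test-util") compile for the crate's own unit tests even without the feature, whereas items gated only on feature = "test-util" do not. Assuming Cargo's usual feature unification, the self dev-dependency on restate-core with features = ["test-util"] is what enables the second kind during cargo test.

```rust
// Compiled into this crate's own unit-test builds (cfg(test)) even when the
// test-util feature is off, and into any build that enables test-util.
#[cfg(any(test, feature = "test-util"))]
pub fn helper_visible_in_unit_tests() {}

// Compiled only when test-util is enabled. A plain `cargo test` without the
// self dev-dependency would not see this item.
#[cfg(feature = "test-util")]
pub fn helper_needs_feature() {}

/// Reports which of the two gates above are active in the current build:
/// (any(test, test-util), test-util).
pub fn active_gates() -> (bool, bool) {
    (
        cfg!(any(test, feature = "test-util")),
        cfg!(feature = "test-util"),
    )
}
```

Unifying on one of the two forms would remove the question of which helpers exist in which build.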

Comment on lines 293 to +294
if let Err(e) = self.self_proposer.propose(partition_key, cmd).await {
respond_to_rpc(
reciprocal
.prepare(Err(PartitionProcessorRpcError::Internal(e.to_string()))),
);
reciprocal.send(Err(PartitionProcessorRpcError::Internal(e.to_string())));
Contributor

Unrelated to this PR: it looks like we only fail if we cannot propose. In this case, we might return a PartitionProcessorRpcError::Busy (or another error telling the caller that the rpc wasn't processed and can be retried).
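A minimal sketch of the suggestion above, assuming a failed proposal guarantees the command was never appended, so the caller can safely retry. The enum below is a trimmed-down stand-in with only two variants, and `error_for_failed_proposal` is a hypothetical helper, not code from this PR.

```rust
use std::fmt::Display;

/// Trimmed-down stand-in for PartitionProcessorRpcError.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PartitionProcessorRpcError {
    /// The request was not processed; the caller may retry.
    Busy,
    /// The request may or may not have been processed.
    Internal(String),
}

impl PartitionProcessorRpcError {
    pub fn is_retryable(&self) -> bool {
        matches!(self, PartitionProcessorRpcError::Busy)
    }
}

/// Hypothetical mapping for the `propose` failure path: since the command
/// never made it into the log, report Busy instead of Internal.
pub fn error_for_failed_proposal<E: Display>(_err: E) -> PartitionProcessorRpcError {
    PartitionProcessorRpcError::Busy
}
```

The benefit is that callers can distinguish "safe to retry" from "outcome unknown" without parsing the error message.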

Contributor Author

I'll leave this out of the scope of this PR

reciprocal.prepare(Err(PartitionProcessorRpcError::Internal(e.to_string()))),
);
}
Err(e) => reciprocal.send(Err(PartitionProcessorRpcError::Internal(e.to_string()))),
Contributor

Same here.

@AhmedSoliman AhmedSoliman merged commit 9fd454d into main Apr 25, 2025
55 checks passed
@AhmedSoliman AhmedSoliman deleted the pr3183 branch April 25, 2025 14:34
@github-actions github-actions bot locked and limited conversation to collaborators Apr 25, 2025