
Conversation


@AhmedSoliman (Contributor) commented Apr 15, 2025


github-actions bot commented Apr 16, 2025

Test Results

7 files (+3), 7 suites (+3), 4m 58s ⏱️ (+3m 56s)
54 tests (+38): 53 ✅ (+37), 1 💤 (+1), 0 ❌ (±0)
223 runs (+191): 220 ✅ (+188), 3 💤 (+3), 0 ❌ (±0)

Results for commit a932e8a. ± Comparison against base commit d100b46.

This pull request removes 16 and adds 54 tests. Note that renamed tests count towards both.
dev.restate.sdktesting.tests.AwakeableIngressEndpoint ‑ completeWithFailure(Client)
dev.restate.sdktesting.tests.AwakeableIngressEndpoint ‑ completeWithSuccess(Client)
dev.restate.sdktesting.tests.FrontCompatibilityTest$NewVersion ‑ completeAwakeable(Client)
dev.restate.sdktesting.tests.FrontCompatibilityTest$NewVersion ‑ completeRetryableOperation(Client)
dev.restate.sdktesting.tests.FrontCompatibilityTest$NewVersion ‑ proxyCallShouldBeDone(Client)
dev.restate.sdktesting.tests.FrontCompatibilityTest$NewVersion ‑ proxyOneWayCallShouldBeDone(Client)
dev.restate.sdktesting.tests.FrontCompatibilityTest$OldVersion ‑ createAwakeable(Client)
dev.restate.sdktesting.tests.FrontCompatibilityTest$OldVersion ‑ startOneWayProxyCall(Client)
dev.restate.sdktesting.tests.FrontCompatibilityTest$OldVersion ‑ startProxyCall(Client)
dev.restate.sdktesting.tests.FrontCompatibilityTest$OldVersion ‑ startRetryableOperation(Client)
…
dev.restate.sdktesting.tests.CallOrdering ‑ ordering(boolean[], Client)[1]
dev.restate.sdktesting.tests.CallOrdering ‑ ordering(boolean[], Client)[2]
dev.restate.sdktesting.tests.CallOrdering ‑ ordering(boolean[], Client)[3]
dev.restate.sdktesting.tests.Cancellation ‑ cancelFromAdminAPI(BlockingOperation, Client, URI)[1]
dev.restate.sdktesting.tests.Cancellation ‑ cancelFromAdminAPI(BlockingOperation, Client, URI)[2]
dev.restate.sdktesting.tests.Cancellation ‑ cancelFromAdminAPI(BlockingOperation, Client, URI)[3]
dev.restate.sdktesting.tests.Cancellation ‑ cancelFromContext(BlockingOperation, Client)[1]
dev.restate.sdktesting.tests.Cancellation ‑ cancelFromContext(BlockingOperation, Client)[2]
dev.restate.sdktesting.tests.Cancellation ‑ cancelFromContext(BlockingOperation, Client)[3]
dev.restate.sdktesting.tests.Combinators ‑ awakeableOrTimeoutUsingAwaitAny(Client)
…

♻️ This comment has been updated with latest results.


@tillrohrmann (Contributor) left a comment


Thanks for adding read_modify_write to the MetadataClientWrapper @AhmedSoliman. The changes look good to me. I think it's a good improvement to also include other sources (nodes) to get the latest metadata from.

Reasoning about the retry behavior of the read_modify_write call has gotten a little bit harder because of the added layer of indirection when fetching updates. I guess this is hard to avoid. And it wasn't simple before either.

The one question I had was whether errors that happen within the interaction between the metadata store client and the store are retried indefinitely. If yes, then this looks as if it slightly changes the semantics of some of the call sites (admin API, cluster configuration updates and the bifrost watchdog). Should we add some higher level timeouts to the affected call sites?
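For reference, the kind of higher-level bound suggested here could look roughly like the sketch below; `read_modify_write_cluster_config` is a hypothetical stand-in for one of the affected call sites, not the actual `MetadataClientWrapper` API:

```rust
use std::time::Duration;
use tokio::time::timeout;

// Hypothetical stand-in for an affected call site that goes through the
// metadata store client and may retry internally for a long time.
async fn read_modify_write_cluster_config() -> Result<(), String> {
    // read -> modify -> conditional write, retried internally on transient errors
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // A call-site budget that is independent of how the inner retries are configured.
    match timeout(Duration::from_secs(30), read_modify_write_cluster_config()).await {
        Ok(result) => result,
        Err(_elapsed) => Err("metadata update did not complete within the budget".to_owned()),
    }
}
```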

}

pub trait Extraction: GlobalMetadata {
    type Output;
Contributor


Is there a need for extracting a different Output than Self?

Contributor Author


Not really but that's the only trick I could find to get the dynamic extraction to work.

Contributor


So it wouldn't work to change fn extract_as_global_metadata(v: MetadataContainer) -> Option<Arc<Self>>; and to remove the associated type?
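For context, a minimal self-contained sketch of the two shapes being discussed; the container variants and type names here are assumptions for illustration, not the actual Restate definitions:

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the real metadata types.
#[derive(Debug)]
struct NodesConfiguration;
struct Logs;

#[allow(dead_code)]
enum MetadataContainer {
    NodesConfiguration(Arc<NodesConfiguration>),
    Logs(Arc<Logs>),
}

trait GlobalMetadata {}
impl GlobalMetadata for NodesConfiguration {}
impl GlobalMetadata for Logs {}

// Shape suggested by the diff: an associated Output, even though in practice
// every implementation would pick Output = Self.
trait Extraction: GlobalMetadata {
    type Output;
    fn extract(container: MetadataContainer) -> Option<Arc<Self::Output>>;
}

// The alternative raised in the review: return Arc<Self> directly and drop
// the associated type.
#[allow(dead_code)]
trait ExtractionAlt: GlobalMetadata + Sized {
    fn extract_as_global_metadata(container: MetadataContainer) -> Option<Arc<Self>>;
}

impl Extraction for NodesConfiguration {
    type Output = NodesConfiguration;
    fn extract(container: MetadataContainer) -> Option<Arc<Self::Output>> {
        match container {
            MetadataContainer::NodesConfiguration(v) => Some(v),
            _ => None,
        }
    }
}

fn main() {
    let container = MetadataContainer::NodesConfiguration(Arc::new(NodesConfiguration));
    println!("extracted: {:?}", NodesConfiguration::extract(container));
}
```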

@AhmedSoliman force-pushed the pr3164 branch 2 times, most recently from 83e8432 to 55201e5 on April 24, 2025 at 17:24
@AhmedSoliman (Contributor Author)

@tillrohrmann Thanks for taking a look and for the feedback. I've adjusted the approach a little to take the total retry duration into account, which indirectly limits the wait time. In all fairness, it's a tricky problem to solve, but at least with this change we don't end up being stuck forever.

This commit introduces a new network protocol version with an automatic backward-compatibility layer and an accompanying, modified networking API to provide support for the following:
1. Native RPC at the protocol level with fabric-level service and service-shard routing. Routing to service shards is offered via an opaque `SortCode`, a u64 value that can be used to route messages to a specific shard of a service (e.g. a `PartitionId` or `LogletId`). A rough sketch of this shape follows the list below.
2. Support for cancellation of enqueued egress RPC requests if the caller is no longer interested in the result.
3. Message payload deserialization offloaded from the network reactor to the call-site's thread, reducing the network reactor's CPU usage and the effect of head-of-line blocking caused by expensive messages.
4. Adds the concept of a `Service` that can handle `rpc`, `unary`, and `watch` messages.
5. Improved ergonomics for using the message fabric to send rpc requests: there is no need to get access to `MessageRouterBuilder` anymore, which unlocks the ability to create arbitrary connections that are self-managed (not tied to the connection manager).
6. WIP: introduces the `Swimlane` concept to classify streams/connections into different swimlanes. This will provide isolation between fat data streams and low-latency metadata streams in the future.
7. WIP: support for "remote watches". Not fully implemented, but will be available in the future.
8. A variety of fixes and improvements to the existing code.
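The sketch referenced in item 1, purely illustrative; the trait, type, and method names are assumptions, not the actual networking API introduced by this commit:

```rust
/// Opaque routing key used to address a specific shard of a service,
/// e.g. derived from a PartitionId or a LogletId.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct SortCode(u64);

/// A service registered on the message fabric that can handle `rpc` and
/// `unary` messages for its shards (`watch` omitted here since it is WIP).
trait Service {
    fn on_rpc(&self, sort_code: Option<SortCode>, payload: Vec<u8>) -> Vec<u8>;
    fn on_unary(&self, sort_code: Option<SortCode>, payload: Vec<u8>);
}

struct PartitionProcessorService;

impl Service for PartitionProcessorService {
    fn on_rpc(&self, sort_code: Option<SortCode>, payload: Vec<u8>) -> Vec<u8> {
        // A real implementation would look up the shard identified by the
        // sort code (e.g. a partition) and deserialize the payload on this
        // thread rather than in the network reactor; here we only log.
        println!("rpc for shard {:?}: {} bytes", sort_code, payload.len());
        Vec::new()
    }

    fn on_unary(&self, sort_code: Option<SortCode>, payload: Vec<u8>) {
        println!("unary for shard {:?}: {} bytes", sort_code, payload.len());
    }
}

fn main() {
    let svc = PartitionProcessorService;
    let _reply = svc.on_rpc(Some(SortCode(42)), b"ping".to_vec());
}
```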

@tillrohrmann (Contributor) left a comment


Thanks for adding a timeout for the read-modify-write operation @AhmedSoliman. Nice tooling for the RetryPolicy you've added :-) LGTM. +1 for merging.

Comment on lines +354 to +389
//-------------------------------------------------------------
// Put all arithmetic in f64 milliseconds for convenience
//-------------------------------------------------------------
let r = *factor as f64;
let d1_ms = next_delay.as_secs_f64() * 1_000.0; // d₁
let cap_ms = max_interval.map(|d| d.as_secs_f64() * 1_000.0); // M

//-------------------------------------------------------------
// How many future delays remain purely exponential (< cap)?
//-------------------------------------------------------------
let n_exp = match cap_ms {
    None => retries_left,       // no cap at all
    Some(m) if d1_ms >= m => 0, // already above / at the cap
    Some(m) => {
        // smallest j s.t. d₁·rʲ ≥ M → j = ceil(log_r(M/d₁))
        let ceil_j = ((m / d1_ms).ln() / r.ln()).ceil() as usize;
        retries_left.min(ceil_j)
    }
};

//-------------------------------------------------------------
// Geometric part (those still < cap)
//-------------------------------------------------------------
let geom_ms = if n_exp == 0 {
    0.0
} else {
    d1_ms * (r.powi(n_exp as i32) - 1.0) / (r - 1.0)
};

//-------------------------------------------------------------
// Flat tail at the cap, if any
//-------------------------------------------------------------
let cap_tail_ms = match cap_ms {
    Some(m) => (retries_left - n_exp) as f64 * m,
    None => 0.0,
};
Contributor


Super nicely explained the math 🤩

    None => retries_left,       // no cap at all
    Some(m) if d1_ms >= m => 0, // already above / at the cap
    Some(m) => {
        // smallest j s.t. d₁·rʲ ≥ M → j = ceil(log_r(M/d₁))
Contributor


I guess this works because if M/d1 is a power of r, then we simply count the last exponential step as the first tail step, right?
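A quick standalone check of that boundary case (this is an illustrative verification, not code from the PR): with d₁ = 100 ms, r = 2, M = 400 ms, and 5 retries left, the 400 ms step can be counted either as the last exponential step or as the first tail step, and the total comes out the same:

```rust
fn main() {
    let d1_ms = 100.0_f64; // first remaining delay d1
    let r = 2.0_f64;       // exponential factor
    let m = 400.0_f64;     // cap M, chosen so that M / d1 is exactly r^2
    let retries_left = 5_usize;

    // Brute force: sum of min(d1 * r^i, M) over the remaining retries.
    let brute: f64 = (0..retries_left)
        .map(|i| (d1_ms * r.powi(i as i32)).min(m))
        .sum();

    // Closed form as in the PR: geometric part below the cap plus a flat
    // tail at the cap. Whichever side of the boundary ceil_j lands on
    // (counting the 400 ms step as geometric or as tail), the sum is the
    // same because d1 * r^2 == M.
    let ceil_j = ((m / d1_ms).ln() / r.ln()).ceil() as usize;
    let n_exp = retries_left.min(ceil_j);
    let geom = d1_ms * (r.powi(n_exp as i32) - 1.0) / (r - 1.0);
    let tail = (retries_left - n_exp) as f64 * m;

    assert!((brute - (geom + tail)).abs() < 1e-9);
    println!("total remaining backoff = {} ms", geom + tail); // 1500 ms
}
```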

Network swimlanes categorize network connections into different groups based on their characteristics. This PR is a continuation of the work done in the previous PR, which introduced the concept of swimlanes. The goal is to give the system more granular control and monitoring of network connections.

Most notable changes:
- We keep a single connection per swimlane instead of a pool of connections. This removes the configuration option `num-concurrent-connections`.
- Connection/stream window sizes are now configurable and are set to reasonable defaults that suit our workloads better than tonic/hyper's defaults.
- Removal of `outbound-queue-length` in networking in favor of relying on memory-level settings (stream window sizes).
- Introducing `swimlane` in the `Hello` message to communicate the swimlane to the peer during the handshake (a rough sketch follows this list).
- Gossip connections are automatically bidirectional.
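The sketch referenced above; the variant names, fields, and handshake shape are assumptions for illustration, not the actual wire types:

```rust
/// Classification of a connection into a swimlane, agreed during handshake.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Swimlane {
    /// Low-latency traffic such as metadata and gossip.
    General,
    /// Fat data streams that may keep the stream window full.
    BulkData,
}

/// Handshake message; the dialing side announces which swimlane this
/// connection belongs to, since only one connection is kept per swimlane.
#[allow(dead_code)]
#[derive(Debug)]
struct Hello {
    min_protocol_version: u16,
    max_protocol_version: u16,
    swimlane: Swimlane,
}

fn main() {
    let hello = Hello {
        min_protocol_version: 1,
        max_protocol_version: 2,
        swimlane: Swimlane::BulkData,
    };
    println!("{hello:?}");
}
```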
@AhmedSoliman merged commit a932e8a into main on Apr 25, 2025
55 checks passed
@AhmedSoliman deleted the pr3164 branch on April 25, 2025 at 14:36
@github-actions bot locked and limited conversation to collaborators on Apr 25, 2025