[Core] Read-modify-write for global metadata #3164
Conversation
Force-pushed from cbba7d3 to 6907bb8.
Test Results: 7 files (+3), 7 suites (+3), 4m 58s ⏱️ (+3m 56s). Results for commit a932e8a; comparison against base commit d100b46. This pull request removes 16 tests and adds 54. Note that renamed tests count towards both.

♻️ This comment has been updated with latest results.
Force-pushed from 3332a02 to ed30410.
Thanks for adding `read_modify_write` to the `MetadataClientWrapper` @AhmedSoliman. The changes look good to me. I think it's a good improvement to also include other sources (nodes) to get the latest metadata from.

Reasoning about the retry behavior of the `read_modify_write` call has gotten a little harder because of the added layer of indirection when fetching updates. I guess this is hard to avoid, and it wasn't simple before either.

The one question I had was whether errors that happen in the interaction between the metadata store client and the store are retried indefinitely. If so, this slightly changes the semantics of some of the call sites (the admin API, cluster configuration updates, and the bifrost watchdog). Should we add higher-level timeouts to the affected call sites?
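For context, the pattern under discussion is a compare-and-set style read-modify-write loop. A minimal, self-contained sketch of that shape follows; all names below (`VersionedStore`, `put_if_version`, the string values) are illustrative stand-ins, not restate's actual API:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Illustrative in-memory store holding (value, version) per key.
struct VersionedStore {
    inner: Mutex<HashMap<String, (String, u64)>>,
}

#[derive(Debug)]
enum PutError {
    /// Another writer won the race; the caller should re-read and retry.
    VersionMismatch,
}

impl VersionedStore {
    fn get(&self, key: &str) -> Option<(String, u64)> {
        self.inner.lock().unwrap().get(key).cloned()
    }

    /// Write only if the stored version still matches `expected`.
    fn put_if_version(&self, key: &str, value: String, expected: u64) -> Result<(), PutError> {
        let mut map = self.inner.lock().unwrap();
        let entry = map.entry(key.to_string()).or_insert((String::new(), 0));
        if entry.1 != expected {
            return Err(PutError::VersionMismatch);
        }
        *entry = (value, expected + 1);
        Ok(())
    }
}

/// The read-modify-write loop. Note that this sketch retries only on
/// version conflicts; the question above is about how *store errors*
/// (network failures, etc.) are retried in the real implementation.
fn read_modify_write(
    store: &VersionedStore,
    key: &str,
    modify: impl Fn(Option<&str>) -> String,
) -> String {
    loop {
        let (current, version) = store
            .get(key)
            .map(|(value, version)| (Some(value), version))
            .unwrap_or((None, 0));
        let updated = modify(current.as_deref());
        match store.put_if_version(key, updated.clone(), version) {
            Ok(()) => return updated,
            Err(PutError::VersionMismatch) => continue, // raced; re-read and retry
        }
    }
}

fn main() {
    let store = VersionedStore { inner: Mutex::new(HashMap::new()) };
    let committed = read_modify_write(&store, "cluster-config", |current| {
        format!("{}+update", current.unwrap_or("v0"))
    });
    println!("committed: {committed}");
}
```

In this shape, version-conflict retries are naturally bounded by contention, while store-error retries, if looped over the whole operation, are exactly where an unbounded policy could stall a caller indefinitely.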
```rust
pub trait Extraction: GlobalMetadata {
    type Output;
```
Is there a need for extracting a different `Output` than `Self`?
Not really, but that's the only trick I could find to get the dynamic extraction to work.
So it wouldn't work to change it to `fn extract_as_global_metadata(v: MetadataContainer) -> Option<Arc<Self>>;` and remove the associated type?
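For reference, the two shapes being compared look roughly like this (a sketch with stand-in types; the `extract` method name and the single-variant `MetadataContainer` are my own illustrations, only the `extract_as_global_metadata` signature is quoted from the comment above):

```rust
use std::sync::Arc;

// Stand-ins for the real types.
struct NodesConfiguration;
trait GlobalMetadata {}
impl GlobalMetadata for NodesConfiguration {}

enum MetadataContainer {
    NodesConfiguration(Arc<NodesConfiguration>),
}

// Variant 1 (as in the PR): an associated `Output` type.
trait Extraction: GlobalMetadata {
    type Output;
    fn extract(container: MetadataContainer) -> Option<Arc<Self::Output>>;
}

// Variant 2 (as suggested): extract `Self` directly, no associated type.
trait ExtractionAlt: GlobalMetadata + Sized {
    fn extract_as_global_metadata(v: MetadataContainer) -> Option<Arc<Self>>;
}

impl ExtractionAlt for NodesConfiguration {
    fn extract_as_global_metadata(v: MetadataContainer) -> Option<Arc<Self>> {
        match v {
            MetadataContainer::NodesConfiguration(config) => Some(config),
        }
    }
}
```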
Force-pushed from e3c08df to 9a80ce4.
Force-pushed from 5a114a0 to 44f78c3.
Force-pushed from 83e8432 to 55201e5.
@tillrohrmann Thanks for taking a look and for the feedback. I've adjusted the approach a little to take the total retry duration into account and thereby indirectly limit the wait time. In all fairness, it's a tricky problem to solve, but at least with this change we don't end up being stuck forever.
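A minimal sketch of the idea of bounding retries by total elapsed time rather than by attempt count (the names here are illustrative, not restate's actual `RetryPolicy` API):

```rust
use std::time::{Duration, Instant};

/// Retry `op` with exponential backoff, giving up once the total elapsed
/// time (including the next planned sleep) would exceed `max_total`.
fn retry_with_total_budget<T, E>(
    mut op: impl FnMut() -> Result<T, E>,
    initial_delay: Duration,
    factor: f64,
    max_total: Duration,
) -> Result<T, E> {
    let start = Instant::now();
    let mut delay = initial_delay;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(err) => {
                // Stop retrying if the next sleep would blow the budget.
                if start.elapsed() + delay > max_total {
                    return Err(err);
                }
                std::thread::sleep(delay);
                delay = Duration::from_secs_f64(delay.as_secs_f64() * factor);
            }
        }
    }
}
```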
This commit introduces a new network protocol version with an automatic backward-compatibility layer and an accompanying modified networking API to provide support for the following (a sketch of the sort-code idea follows this list):

1. Native RPC at the protocol level with fabric-level service and service-shard routing. Routing to service shards is offered via an opaque `SortCode`, a u64 value that can be used to route messages to a specific shard of a service (e.g. `PartitionId`, `LogletId`, etc.).
2. Support for cancellation of enqueued egress RPC requests if the caller is no longer interested in the result.
3. Message payload deserialization offloaded from the network reactor to the call-site's thread, to reduce the network reactor's CPU usage and the effect of head-of-line blocking caused by expensive messages.
4. Adds the concept of a `Service` that can handle `rpc`, `unary`, and `watch` messages.
5. Improved ergonomics for using the message fabric to send RPC requests: no need to get access to `MessageRouterBuilder` anymore, which unlocks the ability to create arbitrary connections that are self-managed (not tied to the connection manager).
6. WIP: introduces the `Swimlane` concept to classify streams/connections into different swim lanes. This will provide isolation between fat data streams and low-latency metadata streams in the future.
7. WIP: support for "remote watches". Not fully implemented, but will be available in the future.
8. A variety of fixes and improvements to the existing code.
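As a rough illustration of the sort-code idea from point 1 (entirely hypothetical types; the real fabric's routing is more involved):

```rust
/// The fabric treats the sort code as an opaque u64 routing key.
type SortCode = u64;

#[derive(Clone, Copy)]
struct PartitionId(u16);

impl PartitionId {
    /// Any shard identifier that packs into a u64 can serve as a sort code.
    fn sort_code(self) -> SortCode {
        self.0 as u64
    }
}

/// A receiver-side dispatcher might map the opaque code onto a local shard.
fn route_to_shard(sort_code: SortCode, num_shards: u64) -> u64 {
    sort_code % num_shards
}

fn main() {
    let partition = PartitionId(7);
    println!("routed to shard {}", route_to_shard(partition.sort_code(), 16));
}
```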
Thanks for adding a timeout for the read-modify-write operation @AhmedSoliman. Nice tooling you've added for the `RetryPolicy` :-) LGTM, +1 for merging.
```rust
//-------------------------------------------------------------
// Put all arithmetic in f64 milliseconds for convenience
//-------------------------------------------------------------
let r = *factor as f64;
let d1_ms = next_delay.as_secs_f64() * 1_000.0; // d₁
let cap_ms = max_interval.map(|d| d.as_secs_f64() * 1_000.0); // M

//-------------------------------------------------------------
// How many future delays remain purely exponential (< cap)?
//-------------------------------------------------------------
let n_exp = match cap_ms {
    None => retries_left,       // no cap at all
    Some(m) if d1_ms >= m => 0, // already above / at the cap
    Some(m) => {
        // smallest j s.t. d₁·rʲ ≥ M  →  j = ceil(log_r(M/d₁))
        let ceil_j = ((m / d1_ms).ln() / r.ln()).ceil() as usize;
        retries_left.min(ceil_j)
    }
};

//-------------------------------------------------------------
// Geometric part (those still < cap)
//-------------------------------------------------------------
let geom_ms = if n_exp == 0 {
    0.0
} else {
    d1_ms * (r.powi(n_exp as i32) - 1.0) / (r - 1.0)
};

//-------------------------------------------------------------
// Flat tail at the cap, if any
//-------------------------------------------------------------
let cap_tail_ms = match cap_ms {
    Some(m) => (retries_left - n_exp) as f64 * m,
    None => 0.0,
};
```
Super nicely explained the math 🤩
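As a concrete sanity check of the formula (numbers chosen for illustration, not from the PR): with d₁ = 100 ms, r = 2, cap M = 1000 ms, and 6 retries left, the remaining delays are 100, 200, 400, 800, 1000, 1000 ms:

```rust
fn main() {
    let (d1_ms, r, m, retries_left) = (100.0_f64, 2.0_f64, 1000.0_f64, 6usize);
    // n_exp = min(retries_left, ceil(log_r(M / d1))) = ceil(log2(10)) = 4
    let n_exp = retries_left.min(((m / d1_ms).ln() / r.ln()).ceil() as usize);
    let geom_ms = d1_ms * (r.powi(n_exp as i32) - 1.0) / (r - 1.0); // 100+200+400+800 = 1500
    let cap_tail_ms = (retries_left - n_exp) as f64 * m; // 2 * 1000 = 2000
    assert_eq!(geom_ms + cap_tail_ms, 3500.0); // matches the delays summed by hand
    println!("total remaining delay: {} ms", geom_ms + cap_tail_ms);
}
```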
```rust
None => retries_left,       // no cap at all
Some(m) if d1_ms >= m => 0, // already above / at the cap
Some(m) => {
    // smallest j s.t. d₁·rʲ ≥ M  →  j = ceil(log_r(M/d₁))
```
I guess this works because if M/d1 is a power of r, then we simply count the last exponential step as the first tail step, right?
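Working through that boundary case with concrete numbers (mine, not from the PR): take d₁ = 100 ms, r = 2, M = 800 ms, so M/d₁ = 8 = 2³ exactly. Then j = ceil(log₂ 8) = 3, the geometric part sums 100 + 200 + 400 = 700 ms, and the fourth delay, d₁·r³ = 800 ms, is billed as the first tail step at the cap, which is the same 800 ms either way, so the totals agree.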
Network swimlanes categorize network connections into different groups based on their characteristics. This PR is a continuation of the work done in the previous PR, which introduced the concept of swimlanes. The goal is to enhance the system's network-management capabilities by allowing more granular control and monitoring of network connections. Most notable changes:

- We keep a single connection per swimlane instead of a pool of connections. This removes the configuration option `num-concurrent-connections`.
- Connection/stream window sizes are now configurable and are set to reasonable defaults that suit our workloads better than tonic/hyper's defaults.
- Removal of `outbound-queue-length` in networking in favor of relying on memory-level settings (stream window sizes).
- Introducing `swimlane` in the `Hello` message to communicate the swimlane to the peer during handshake.
- Gossip connections are automatically bidirectional.
Stack created with Sapling. Best reviewed with ReviewStack.