Move resolve api to async-stream #599
Conversation
Integration test failure isn't your fault, BTW; still trying to figure out what's up with that test consistently failing on CI.
Does this actually depend on the other PR?
@olix0r now that I look at it, I do not think it does. I will change the base.
@olix0r It does depend because
Tests hang
@olix0r yes, I just saw that. It is strange that simply moving that to async-stream caused this hang. Must be doing something wrong. Will dig in.
Adding the loop appears to change the stream's behavior so that the underlying gRPC stream is polled again even after it returns `None`. I'm surprised that this fixes a hang --- that implies that the underlying gRPC stream will sometimes return `None` and then return `Some` again after being polled again. That's not the behaviour I would have expected.

I'm also surprised that this manifests as a hang --- "falling off" the end of the `try_stream!` block should result in the stream returning `None` and terminating. If this is the case, I think that adding the loop will result in the resolve stream returning `Poll::Pending` rather than `Poll::Ready(None)` when the underlying stream terminates, and then never being woken up again. If the gRPC stream returns `None`, then the `while` loop terminates. The outer `loop` then loops around and executes the `while` loop again, polling the (now terminated) gRPC stream, which I guess must return `Poll::Pending` or something?

I'd like to figure out why returning `None` here causes a hang. My guess is that some higher-level component that polls the `resolve` stream doesn't expect it to ever return `None`? I don't know if leaking streams that will never finish is a great solution.

Of course, it's also possible that my understanding is wrong, or I'm missing something.
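For reference, the shape being discussed is roughly the following (a simplified sketch with placeholder item and error types, not the PR's actual code):

```rust
use async_stream::try_stream;
use futures::{Stream, StreamExt};

// Simplified stand-in for wrapping a gRPC response stream.
fn wrap(
    mut inner: impl Stream<Item = Result<u64, String>> + Unpin,
) -> impl Stream<Item = Result<u64, String>> {
    try_stream! {
        loop {
            // The `while let` exits once `inner` yields `None`...
            while let Some(item) = inner.next().await {
                let item = item?;
                yield item;
            }
            // ...but the outer `loop` then polls the already-terminated
            // inner stream again, so control never falls off the end of
            // the `try_stream!` block and the wrapper itself never
            // returns `Poll::Ready(None)`.
        }
    }
}
```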
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
```rust
}

// Before polling the resolution, where we could potentially receive
// an `Add`, poll_ready to ensure that `make` is ready to build new
// services. Don't process any updates until we can do so.
ready!(this.make_endpoint.poll_ready(cx)).map_err(Into::into)?;

if let Some(change) = ready!(this.discover.poll_discover(cx)) {
```
@hawkw This is where it was hanging.
ahhh, glad you found that!
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
terser imports
looks good to me! i commented on a couple minor style nits.
```diff
-if let Some(change) = ready!(this.discover.poll_discover(cx)) {
-    match change.map_err(Into::into)? {
+match ready!(this.discover.poll_discover(cx)) {
+    Some(change) => match change.map_err(Into::into)? {
```
Take it or leave it: we could maybe collapse the two matches into a single match, like this:
```rust
match ready!(this.discover.poll_discover(cx)) {
    Some(Ok(Change::Insert(key, target))) => {
        // Start building the service and continue. If a pending
        // service exists for this addr, it will be canceled.
        let fut = this.make_endpoint.call(target);
        this.make_futures.push(key, fut);
    },
    Some(Ok(Change::Remove(key))) => {
        this.pending_removals.push(key);
    },
    Some(Err(e)) => return Poll::Ready(Err(e.into())),
    None => return Poll::Ready(Ok(None)),
}
```
which might be a little easier to read. What do you think?
```rust
let update = if exists {
    Update::Empty
} else {
    Update::DoesNotExist
};
yield update.into();
```
nit, take it or leave it: can this just be
```diff
-let update = if exists {
-    Update::Empty
-} else {
-    Update::DoesNotExist
-};
-yield update.into();
+if exists {
+    yield Update::Empty.into();
+}
+yield Update::DoesNotExist.into();
```
not a blocker.
I might be unclear on the semantics of yield, but this at least seems like it would always yield DNE after yielding Empty?
In either case, the original shows the intent much more clearly imo.
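For reference, a branch-per-arm form that keeps the original behaviour would presumably need an `else`; a minimal self-contained sketch (placeholder `Update` type, not the proxy's code):

```rust
use async_stream::stream;
use futures::Stream;

enum Update { Empty, DoesNotExist }

fn existence(exists: bool) -> impl Stream<Item = Update> {
    stream! {
        // With the `else`, exactly one update is yielded; without it,
        // `DoesNotExist` would follow `Empty` whenever `exists` is true,
        // which is the behaviour questioned above.
        if exists {
            yield Update::Empty;
        } else {
            yield Update::DoesNotExist;
        }
    }
}
```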
Going to merge this once it's green, but happy to ship fixups afterwards.
This release includes internal changes to the service discovery system, especially when discovering control plane components (like the destination and identity controllers). Now, the proxy attempts to balance requests across all pods in each control plane service. This requires control plane changes to use "headless" services so that SRV records are exposed. When the control plane services have a `clusterIP` set, the proxy falls back to using normal A-record lookups.

---

* tracing: add richer verbose spans to http clients (linkerd/linkerd2-proxy#622)
* trace: update tracing dependencies (linkerd/linkerd2-proxy#623)
* Remove `Resolution` trait (linkerd/linkerd2-proxy#606)
* Update proxy-identity to edge-20.8.2 (linkerd/linkerd2-proxy#627)
* Add build arg for skipping identity wrapper (linkerd/linkerd2-proxy#624)
* Wait for proxy thread to terminate in integration tests (linkerd/linkerd2-proxy#625)
* Remove scrubbing for unused headers (linkerd/linkerd2-proxy#628)
* Split orig-proto tests out of discovery tests (linkerd/linkerd2-proxy#629)
* Re-enable outbound timeout test (linkerd/linkerd2-proxy#630)
* profiles: perform profile resolution for IP addresses (linkerd/linkerd2-proxy#626)
* Move resolve api to async-stream (linkerd/linkerd2-proxy#599)
* Decouple discovery buffering from endpoint conversion (linkerd/linkerd2-proxy#631)
* resolve: Add a Reset state (linkerd/linkerd2-proxy#633)
* resolve: Eagerly fail resolutions (linkerd/linkerd2-proxy#634)
* test: replace `net2` dependency with `socket2` (linkerd/linkerd2-proxy#635)
* dns: Run DNS resolutions on the main runtime (linkerd/linkerd2-proxy#637)
* Load balance requests to the control plane (linkerd/linkerd2-proxy#594)
* Unify control plane client construction (linkerd/linkerd2-proxy#638)
This PR implements the suggestion left by @hawkw right here. The result is that the `Resolution` struct is now gone, replaced by an `impl Stream<Item = Result<resolve::Update<Metadata>, grpc::Status>>` that is generated by the `try_stream!` macro.
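Roughly, that means the resolver now hands callers a stream of updates rather than a struct they poll by hand. A minimal sketch of the shape (placeholder `Update` and `Status` types standing in for `resolve::Update<Metadata>` and `grpc::Status`; not the proxy's actual code):

```rust
use async_stream::try_stream;
use futures::{Stream, StreamExt};

// Placeholder stand-ins; only the overall shape of the API matters here.
pub enum Update { Add(Vec<String>), Remove(Vec<String>), Empty, DoesNotExist }
pub struct Status(pub String);

// Forward updates from the underlying (e.g. gRPC) response stream. `?`
// ends the stream with the error; falling off the end ends it with `None`.
pub fn resolve(
    mut responses: impl Stream<Item = Result<Update, Status>> + Unpin,
) -> impl Stream<Item = Result<Update, Status>> {
    try_stream! {
        while let Some(rsp) = responses.next().await {
            let update = rsp?;
            yield update;
        }
    }
}
```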