Skip endpoint resolution when profile lookup is rejected#691
Conversation
When the service profile client skips resolution or the server responds with InvalidArgument, we now synthesize a default profile response.
When the resolver rejects resolution, we currently propagate that error so that it can be handled via fallback. And due to recent HTTP router changes, these resolution errors can propagate up across splits, etc. This change simplifies this behavior by isntead synthesizing a resolution with a default endpoint. The `not_http` reason has been removed, as it's no longer useful.
hawkw
left a comment
There was a problem hiding this comment.
this all seems good to me after reading over it. there were a couple of things that seemed kind of weird but beyond a couple small nits i think this looks right.
| let tcp = svc::stack(tcp_connect) | ||
| .push_make_thunk() | ||
| .push_on_response(svc::layer::mk(tcp::Forward::new)) | ||
| .instrument(|_: &TcpEndpoint| debug_span!("tcp.forward")) |
There was a problem hiding this comment.
nit/TIOLI: the TCP balance stack has a span that's just called tcp, should we change that to tcp.balance or something by analogy?
| type Future = | ||
| Pin<Box<dyn Future<Output = Result<Option<profiles::Receiver>, Error>> + Send + 'static>>; |
There was a problem hiding this comment.
nit/TIOLI: the closure in the or_else doesn't capture anything, so i think this could be
| type Future = | |
| Pin<Box<dyn Future<Output = Result<Option<profiles::Receiver>, Error>> + Send + 'static>>; | |
| type Future = future::OrElse< | |
| S::Future, | |
| future::Ready<Result<Self::Output, Self::Error>> | |
| fn(S::Error) -> future::Ready<Result<Self::Output, Self::Error>>, | |
| >; |
if we wanted to remove the box. in practice, i don't know if the box here matters at all, but since we're already using combinators with a non-capturing closure, i thought it was worth pointing out.
| let permitted = (&t).into().and_then(|addr| { | ||
| if self.0.matches(&addr) { | ||
| Some(addr) | ||
| } else { | ||
| None | ||
| } | ||
| }); | ||
| tracing::debug!(permitted = %permitted.is_some(), "Resolve"); | ||
| permitted.ok_or_else(|| Rejected(()).into()) |
There was a problem hiding this comment.
i was a little surprised at first that there are two separate implementations for resolve and profiles which look equivalent after a quick skim, but i didn't notice the Option until i took a second look at it. it does occur to me that we could probably only have one permit implementation if we added Into<Option<Addr>> impls that always return Some for profile targets.
i don't think this is like, important --- the small amount of duplication is totally fine with me, i was just surprised until i looked closer.
There was a problem hiding this comment.
I had something like that initially but i think it's a bit clearer to have them separated (for now at least).
| inner: None, | ||
| services: ReadyCache::default(), | ||
| } | ||
| let inner = match Into::<Option<Receiver>>::into(&target) { |
There was a problem hiding this comment.
nit/tioli: personally i might have written this with an early return in the None case instead of putting most of the code inside the match, like
let rx = match Into::<Option<Receiver>>::into(&target) {
Some(rx) => rx,
None => {
trace!("Building default service");
return Self { inner: Inner::Default(self.inner.new_service((None, target))) };
},
};
let mut targets = rx.borrow().targets.clone();
// ...but i'm not actually sure if this is any clearer...
| let mut update = None; | ||
| while let Poll::Ready(Some(up)) = rx.poll_recv_ref(cx) { | ||
| update = Some(up.clone()); | ||
| } | ||
|
|
||
| // If, somehow, the watch hasn't been notified at least once, build the | ||
| // default target. This shouldn't actually be exercised, though. | ||
| if self.inner.is_none() { | ||
| self.update_inner(Vec::new()); | ||
| } | ||
| debug_assert_ne!(self.services.len(), 0); | ||
| // Every time the profile updates, rebuild the distribution, reusing | ||
| // services that existed in the prior state. | ||
| if let Some(Profile { mut targets, .. }) = update { | ||
| if targets.len() == 0 { | ||
| targets.push(Target { | ||
| addr: target.into(), | ||
| weight: 1, | ||
| }) | ||
| } | ||
| debug!(?targets, "Updating"); | ||
|
|
||
| let mut prior_addrs = | ||
| std::mem::replace(addrs, IndexSet::with_capacity(targets.len())); | ||
| let mut weights = Vec::with_capacity(targets.len()); | ||
|
|
||
| // Create an updated distribution and set of services. | ||
| for Target { weight, addr } in targets.into_iter() { | ||
| // Reuse the prior services whenever possible. | ||
| if !prior_addrs.remove(&addr) { | ||
| debug!(%addr, "Creating target"); | ||
| let svc = new_service.new_service((Some(addr.clone()), target.clone())); | ||
| services.push(addr.clone(), svc); | ||
| } else { | ||
| trace!(%addr, "Target already exists"); | ||
| } | ||
| addrs.insert(addr); | ||
| weights.push(weight); | ||
| } | ||
|
|
||
| *distribution = WeightedIndex::new(weights).unwrap(); | ||
|
|
||
| for addr in prior_addrs.into_iter() { | ||
| services.evict(&addr); | ||
| } | ||
| } | ||
|
|
||
| // Wait for all target services to be ready. If any services fail, then | ||
| // the whole service fails. | ||
| Poll::Ready(ready!(self.services.poll_pending(cx)).map_err(Into::into)) | ||
| // Wait for all target services to be ready. If any services fail, then | ||
| // the whole service fails. | ||
| Poll::Ready(ready!(services.poll_pending(cx)).map_err(Into::into)) |
There was a problem hiding this comment.
this logic used to all live in update_inner and was just moved here as a result of inner becoming an enum, right? the diff didn't make that super obvious so i wanted to double check
There was a problem hiding this comment.
There's no functional change, but the block has been cleaned up slightly (mostly pushing a default target on to targets when there are none) to reduce duplication.
| let mut targets = rx.borrow().targets.clone(); | ||
| if targets.len() == 0 { | ||
| targets.push(Target { | ||
| addr: (&target).into(), | ||
| weight: 1, | ||
| }) | ||
| } | ||
| trace!(?targets, "Building split service"); | ||
|
|
||
| let mut addrs = IndexSet::with_capacity(targets.len()); | ||
| let mut weights = Vec::with_capacity(targets.len()); | ||
| let mut services = ReadyCache::default(); | ||
| let mut new_service = self.inner.clone(); | ||
| for Target { weight, addr } in targets.into_iter() { | ||
| services.push( | ||
| addr.clone(), | ||
| new_service.new_service((Some(addr.clone()), target.clone())), | ||
| ); | ||
| addrs.insert(addr); | ||
| weights.push(weight); | ||
| } |
There was a problem hiding this comment.
i guess the only real change in this file besides adding the default case is that we now call new_service eagerly in new_service on splits, instead of doing it lazily in the first poll_ready?
this seems better regardless; less moving parts and new_service is expected to be called eagerly. i can't imagine anything relying on it happening in poll_ready?
When profile lookup is rejected, this error propagates to the stack and is handled by falling back to an alternate stack. In order to move profile discovery out of the HTTP stack and onto the initial TCP accept stack, we want to avoid this sort of fallback behavior. So this change modifies profile discovery to make the profile optional. When a profile lookup is rejected/skipped, we now simply return a `None` profile; and in these cases we avoid performing endpoint resolution by omitting a concrete address.
hawkw
left a comment
There was a problem hiding this comment.
looks great, thanks for adding comments! i noticed a couple copyedits but the change all makes sense
This release includes changes to TCP metrics to ensure that peer identities are encoded via the `client_id` and `server_id` labels. --- * outbound: Explicitly ignore the source address for tap (linkerd/linkerd2-proxy#680) * Update proxy-api and tonic (linkerd/linkerd2-proxy#682) * http: Lazily build http/tcp stacks (linkerd/linkerd2-proxy#681) * outbound: Remove required identity from HttpLogical (linkerd/linkerd2-proxy#683) * profiles: Expose the fully_qualified_name (linkerd/linkerd2-proxy#684) * request-filter: Support altering the request type (linkerd/linkerd2-proxy#685) * tracing: Set contexts in new_service/make_service (linkerd/linkerd2-proxy#686) * discover: Allow resolution streams to terminate (linkerd/linkerd2-proxy#689) * metrics: add peer identities to all TLS metric labels (linkerd/linkerd2-proxy#687) * outbound: Return a default endpoint on reject (linkerd/linkerd2-proxy#690) * Skip endpoint resolution when profile lookup is rejected (linkerd/linkerd2-proxy#691)
This release includes changes to TCP metrics to ensure that peer identities are encoded via the `client_id` and `server_id` labels. --- * outbound: Explicitly ignore the source address for tap (linkerd/linkerd2-proxy#680) * Update proxy-api and tonic (linkerd/linkerd2-proxy#682) * http: Lazily build http/tcp stacks (linkerd/linkerd2-proxy#681) * outbound: Remove required identity from HttpLogical (linkerd/linkerd2-proxy#683) * profiles: Expose the fully_qualified_name (linkerd/linkerd2-proxy#684) * request-filter: Support altering the request type (linkerd/linkerd2-proxy#685) * tracing: Set contexts in new_service/make_service (linkerd/linkerd2-proxy#686) * discover: Allow resolution streams to terminate (linkerd/linkerd2-proxy#689) * metrics: add peer identities to all TLS metric labels (linkerd/linkerd2-proxy#687) * outbound: Return a default endpoint on reject (linkerd/linkerd2-proxy#690) * Skip endpoint resolution when profile lookup is rejected (linkerd/linkerd2-proxy#691)
When profile lookup is rejected, this error propagates to the stack and
is handled by falling back to an alternate stack.
In order to move profile discovery out of the HTTP stack and onto the
initial TCP accept stack, we want to avoid this sort of fallback
behavior. So this change modifies profile discovery to make the profile
optional. When a profile lookup is rejected/skipped, we now simply
return a
Noneprofile; and in these cases we avoid performing endpointresolution by omitting a concrete address.