buffer: Port idleness changes to tokio-0.2#505
Conversation
Applies the changes in #502 to the tokio-0.2 branch.
hawkw left a comment
Overall, this looks like it was ported to `std::future` correctly! I had a couple of minor nits, but they're not blockers.
Also, I noticed that the formatting of the `IdleError` appears to be incorrect when the timeout is >= 1 second. We should probably fix that on master as well.
Once that's fixed, this LGTM!
```rust
// .push_spawn_buffer_with_idle_timeout(
//     buffer_capacity,
//     cache_max_idle_age,
// )
```
Thanks for copying over the changes to commented out code as well, that will make my life much easier later :)
I took another pass at this to simplify
```rust
    idle_timeout: Option<Duration>,
) -> (
    Buffer<Req, S::Future>,
    impl std::future::Future<Output = ()> + Send + 'static,
```
nit: import `Future` at the top instead of using the full path here?
```rust
    S::Response: Send + 'static,
    S::Future: Send + 'static,
{
    use futures::future;
```
Similar comment here: this could live with the other `use` statements at the top.
```rust
let idle = move || match idle_timeout {
    Some(t) => future::Either::Left(dispatch::idle(t)),
    None => future::Either::Right(future::pending()),
};
```
Any reason this is a closure rather than being passed directly to `dispatch::run`? If passed directly, it wouldn't need to be evaluated on each iteration.
Oh, is this so `idle` can be repeatedly polled in the loop? The value would be moved if we passed in `idle: I` instead of `idle: Fn() -> I`.
Right, this is a factory for idles, so they can be reset every time we start waiting for a request.
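To illustrate why a factory is needed rather than a single value: a future is consumed when awaited, so one idle future would only cover the first wait, while a `Fn() -> I` closure can mint a fresh, reset timer on every iteration. Here is a minimal sketch using `IdleTimer` and `wait_once` as hypothetical stand-ins for `dispatch::idle(t)` and awaiting it (these names are not from the actual codebase):

```rust
// Hypothetical stand-in for the one-shot idle future `dispatch::idle(t)`;
// like a future, it is non-Clone and consumed when "awaited".
struct IdleTimer(u32);

// "Awaiting" the timer consumes it by move, just as `.await` consumes a future.
fn wait_once(timer: IdleTimer) -> u32 {
    timer.0
}

fn main() {
    let timeout = 10;
    // `idle` is a factory (`Fn() -> IdleTimer`): each loop iteration mints a
    // fresh timer, resetting the idle deadline each time we wait for a request.
    let idle = move || IdleTimer(timeout);

    for _ in 0..3 {
        // A single `IdleTimer` value would have been moved into `wait_once`
        // on the first iteration; the factory sidesteps that.
        let t = wait_once(idle());
        assert_eq!(t, 10);
    }
    println!("three fresh idle timers created");
}
```

The same pattern applies directly to the real code: `dispatch::run` calls the closure each time it starts waiting, so the idle timeout restarts from zero after every dispatched request.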
When `linkerd2-buffer` was updated to `std::future` in PR #505, the behaviour of the buffer changed subtly. The previous implementation of the buffer's `Dispatch` task was _poll-based_; it implemented its logic in `Future::poll` with the following behavior:

1. Call `poll_ready` on the underlying service, returning `NotReady` if it is not ready.
2. Broadcast readiness to senders.
3. Call `poll_next` on the channel of requests. If a request is received, dispatch it to the service. If no request is ready, return `NotReady` (yield).

Since this was an implementation of the `poll` function, if we yield because the request channel is empty, then when we are woken again by the next request, we resume _at the beginning of the `poll` function_.

The new implementation, however, was written using async/await syntax. Async/await generates a state machine which, when woken after yielding at an await point, resumes _from the same await point it yielded at_. This means that if the new implementation yields because the request channel is empty, then when it is woken by a request, it will **not** drive the service to readiness before sending that request. Instead, the readiness acquired before the task yielded is consumed by that request.

This behavior is totally fine with regard to the `tower-service` readiness contract. All the contract requires is that a call to `poll_ready` must return `Ready` before each call to `call`. It doesn't matter how much time passes between `poll_ready` and `call`, as long as the readiness was not consumed by another `call`. However, it is **not** fine from the perspective of the load balancer. The load balancer relies on `poll_ready` to drive updates from service discovery. This means that if a long period of time passes between when the balancer becomes ready and when it is called, it may have a stale service discovery state.

Therefore, this change in behavior broke a large number of the proxy's integration tests that expect changes to service discovery state to be reflected in a timely manner.

This commit fixes the issue by updating the new `dispatch::run` implementation to drive the service to readiness immediately before dispatching a request. Once the service is driven to readiness initially, we advertise that it is ready and call `try_recv` on the request channel. If a request is already in the channel, we can consume the existing readiness. Otherwise, if no request is immediately available and we have to wait on the channel, we drive the service to readiness again before calling it. This ensures that service discovery changes are reflected for the next request after they occur, rather than for the request _after_ that request.

Signed-off-by: Eliza Weisman <[email protected]>
Co-authored-by: Oliver Gould <[email protected]>
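The corrected dispatch order can be sketched synchronously with the standard library only. `MockService` and `run_dispatch` below are hypothetical stand-ins (not the actual `linkerd2-buffer` code): the mock's `poll_ready` both establishes readiness and bumps a `discovery_generation` counter, modeling how driving readiness is what refreshes the balancer's discovery state.

```rust
use std::collections::VecDeque;

// Hypothetical mock of a tower-style service: `poll_ready` must succeed
// before each `call`, and driving readiness also refreshes discovery state.
struct MockService {
    ready: bool,
    discovery_generation: u64, // bumped each time readiness is driven
}

impl MockService {
    fn poll_ready(&mut self) {
        self.discovery_generation += 1; // driving readiness refreshes state
        self.ready = true;
    }

    fn call(&mut self, req: &str) -> (String, u64) {
        assert!(self.ready, "readiness contract: poll_ready before call");
        self.ready = false; // the call consumes readiness
        (format!("handled {}", req), self.discovery_generation)
    }
}

// Sketch of the fixed loop: drive readiness first, then pull from the
// queue; whenever readiness was consumed, re-drive it before the next
// dispatch so no request is served with stale readiness.
fn run_dispatch(service: &mut MockService, queue: &mut VecDeque<&str>) -> Vec<(String, u64)> {
    let mut handled = Vec::new();
    service.poll_ready(); // initial drive to readiness
    while let Some(req) = queue.pop_front() {
        if !service.ready {
            // re-drive readiness, picking up discovery updates that
            // arrived since the previous call
            service.poll_ready();
        }
        handled.push(service.call(req));
    }
    handled
}

fn main() {
    let mut svc = MockService { ready: false, discovery_generation: 0 };
    let mut queue = VecDeque::from(vec!["a", "b"]);
    let results = run_dispatch(&mut svc, &mut queue);
    // every request is handled against a freshly driven discovery generation
    println!("{:?}", results);
}
```

In the real async implementation the "re-drive" step happens while awaiting the channel, but the invariant is the same: readiness (and thus discovery state) is refreshed between consuming it and the next dispatch.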