Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 7 additions & 16 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -896,11 +896,13 @@ dependencies = [
name = "linkerd2-buffer"
version = "0.1.0"
dependencies = [
"futures 0.1.26",
"futures 0.3.4",
"linkerd2-error",
"tokio 0.1.22",
"tower 0.1.1",
"tower-test 0.1.0",
"pin-project",
"tokio 0.2.17",
"tokio-test",
"tower 0.3.1",
"tower-test",
"tracing",
"tracing-futures 0.1.0",
]
Expand Down Expand Up @@ -1490,7 +1492,7 @@ dependencies = [
"tokio-connect",
"tokio-test",
"tower 0.3.1",
"tower-test 0.3.0",
"tower-test",
"tracing",
]

Expand Down Expand Up @@ -3035,17 +3037,6 @@ dependencies = [
"tower-util 0.1.0",
]

[[package]]
name = "tower-test"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99a0b25036e2c58f681a04a5b312e9283d229445ad5e1221a1b7a4630df6fbdf"
dependencies = [
"futures 0.1.26",
"tokio-sync",
"tower-service 0.2.0",
]

[[package]]
name = "tower-test"
version = "0.3.0"
Expand Down
22 changes: 11 additions & 11 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,17 +191,17 @@ impl<S> Stack<S> {
// self.push(stack::MakeReadyLayer::new())
// }

// /// Buffer requests when when the next layer is out of capacity.
// pub fn spawn_buffer<Req>(self, capacity: usize) -> Stack<buffer::Buffer<Req, S::Future>>
// where
// Req: Send + 'static,
// S: Service<Req> + Send + 'static,
// S::Response: Send + 'static,
// S::Error: Into<Error> + Send + Sync,
// S::Future: Send,
// {
// self.push(buffer::SpawnBufferLayer::new(capacity))
// }
/// Buffer requests when when the next layer is out of capacity.
pub fn spawn_buffer<Req>(self, capacity: usize) -> Stack<buffer::Buffer<Req, S::Future>>
where
Req: Send + 'static,
S: Service<Req> + Send + 'static,
S::Response: Send + 'static,
S::Error: Into<Error> + Send + Sync,
S::Future: Send,
{
self.push(buffer::SpawnBufferLayer::new(capacity))
}

// /// Assuming `S` implements `NewService` or `MakeService`, applies the given
// /// `L`-typed layer on each service produced by `S`.
Expand Down
12 changes: 7 additions & 5 deletions linkerd/buffer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ edition = "2018"
publish = false

[dependencies]
futures = "0.1"
futures = "0.3"
linkerd2-error = { path = "../error" }
tokio = "0.1"
tower = "0.1"
tokio = { version = "0.2", features = ["sync", "stream", "macros"] }
tower = "0.3"
tracing = "0.1"
tracing-futures = "0.1"
tracing-futures = { version = "0.1", features = ["std-future"] }
pin-project = "0.4"

[dev-dependencies]
tower-test = "0.1"
tower-test = "0.3"
tokio-test = "0.2"
71 changes: 40 additions & 31 deletions linkerd/buffer/src/dispatch.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use crate::error::ServiceError;
use crate::InFlight;
use futures::{Async, Future, Poll, Stream};
use linkerd2_error::{Error, Never};
use std::sync::Arc;
use linkerd2_error::Error;
use pin_project::pin_project;
use std::task::{Context, Poll};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::sync::{mpsc, watch};
use tracing::trace;

/// A future that drives the inner service.
#[pin_project]
pub struct Dispatch<S, Req, F> {
inner: Option<S>,
#[pin]
rx: mpsc::Receiver<InFlight<Req, F>>,
ready: watch::Sender<Poll<(), ServiceError>>,
ready: watch::Sender<Poll<Result<(), ServiceError>>>,
}

impl<S, Req> Dispatch<S, Req, S::Future>
Expand All @@ -23,7 +26,7 @@ where
pub(crate) fn new(
inner: S,
rx: mpsc::Receiver<InFlight<Req, S::Future>>,
ready: watch::Sender<Poll<(), ServiceError>>,
ready: watch::Sender<Poll<Result<(), ServiceError>>>,
) -> Self {
Self {
inner: Some(inner),
Expand All @@ -36,7 +39,7 @@ where
macro_rules! return_ready {
() => {{
trace!("Complete");
return Ok(Async::Ready(()));
return Poll::Ready(());
}};
}

Expand All @@ -55,76 +58,82 @@ where
S::Response: Send + 'static,
S::Future: Send + 'static,
{
type Item = ();
type Error = Never;
type Output = ();

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut this = self.project();
// Complete the task when all services have dropped.
return_ready_if!(self.ready.poll_close().expect("must not fail").is_ready());
return_ready_if!({
// `watch::Sender::poll_close` is private in `tokio::sync`.
let closed = this.ready.closed();
tokio::pin!(closed);
closed.poll(cx).is_ready()
});

// Drive requests from the queue to the inner service.
loop {
let ready = match self.inner.as_mut() {
Some(inner) => inner.poll_ready(),
let ready = match this.inner.as_mut() {
Some(inner) => inner.poll_ready(cx),
None => {
// This is safe because ready.poll_close has returned NotReady.
return Ok(Async::NotReady);
// This is safe because ready.closed() has returned Pending.
return Poll::Pending;
}
};

match ready {
// If it's not ready, wait for it..
Ok(Async::NotReady) => {
return_ready_if!(self.ready.broadcast(Ok(Async::NotReady)).is_err());
Poll::Pending => {
return_ready_if!(this.ready.broadcast(Poll::Pending).is_err());

trace!("Waiting for inner service");
return Ok(Async::NotReady);
return Poll::Pending;
}

// If the service fails, propagate the failure to all pending
// requests and then complete.
Err(error) => {
Poll::Ready(Err(error)) => {
let shared = ServiceError(Arc::new(error.into()));
trace!(%shared, "Inner service failed");

// First, notify services of the readiness change to prevent new requests from
// being buffered.
let is_active = self.ready.broadcast(Err(shared.clone())).is_ok();
let is_active = this
.ready
.broadcast(Poll::Ready(Err(shared.clone())))
.is_ok();

// Propagate the error to all in-flight requests.
while let Ok(Async::Ready(Some(InFlight { tx, .. }))) = self.rx.poll() {
while let Poll::Ready(Some(InFlight { tx, .. })) = this.rx.poll_recv(cx) {
let _ = tx.send(Err(shared.clone().into()));
}

// Drop the inner Service to free its resources. It won't be used again.
self.inner = None;
let _ = this.inner.take();

// Ensure the task remains active until all services have observed the error.
return_ready_if!(!is_active);

// This is safe because ready.poll_close has returned NotReady. The task will
// This is safe because ready.closed() has returned Pending. The task will
// complete when all observes have dropped their interest in `ready`.
return Ok(Async::NotReady);
return Poll::Pending;
}

// If inner service can receive requests, start polling the channel.
Ok(Async::Ready(())) => {
return_ready_if!(self.ready.broadcast(Ok(Async::Ready(()))).is_err());
Poll::Ready(Ok(())) => {
return_ready_if!(this.ready.broadcast(Poll::Ready(Ok(()))).is_err());
trace!("Ready for requests");
}
}

// The inner service is ready, so poll for new requests.
match self.rx.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),

match futures::ready!(this.rx.poll_recv(cx)) {
// All senders have been dropped, complete.
Err(_) | Ok(Async::Ready(None)) => return_ready!(),
None => return_ready!(),

// If a request was ready return it to the caller.
Ok(Async::Ready(Some(InFlight { request, tx }))) => {
Some(InFlight { request, tx }) => {
trace!("Dispatching a request");
let fut = self
let fut = this
.inner
.as_mut()
.expect("Service must not be dropped")
Expand Down
3 changes: 1 addition & 2 deletions linkerd/buffer/src/layer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::Buffer;
use futures::Future;
use linkerd2_error::Error;
use tracing_futures::Instrument;

Expand Down Expand Up @@ -38,7 +37,7 @@ where

fn layer(&self, inner: S) -> Self::Service {
let (buffer, dispatch) = crate::new(inner, self.capacity);
tokio::spawn(dispatch.in_current_span().map_err(|n| match n {}));
tokio::spawn(dispatch.in_current_span());
buffer
}
}
Loading