From 2c7e8becf0890bdac8a3629366078ecac07a2751 Mon Sep 17 00:00:00 2001 From: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> Date: Thu, 7 Jul 2022 11:49:55 +0200 Subject: [PATCH 1/2] feat(timeout): add a timeout which can also timeout on poll_ready Signed-off-by: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> --- tower/src/timeout/future.rs | 4 +- tower/src/timeout/layer.rs | 25 +++++++++++- tower/src/timeout/mod.rs | 80 ++++++++++++++++++++++++++++++++++++- 3 files changed, 105 insertions(+), 4 deletions(-) diff --git a/tower/src/timeout/future.rs b/tower/src/timeout/future.rs index b4eb3f4e3..6ee7e950c 100644 --- a/tower/src/timeout/future.rs +++ b/tower/src/timeout/future.rs @@ -18,12 +18,12 @@ pin_project! { #[pin] response: T, #[pin] - sleep: Sleep, + sleep: Pin>, } } impl ResponseFuture { - pub(crate) fn new(response: T, sleep: Sleep) -> Self { + pub(crate) fn new(response: T, sleep: Pin>) -> Self { ResponseFuture { response, sleep } } } diff --git a/tower/src/timeout/layer.rs b/tower/src/timeout/layer.rs index 3397fd85f..27334ad90 100644 --- a/tower/src/timeout/layer.rs +++ b/tower/src/timeout/layer.rs @@ -1,4 +1,4 @@ -use super::Timeout; +use super::{GlobalTimeout, Timeout}; use std::time::Duration; use tower_layer::Layer; @@ -22,3 +22,26 @@ impl Layer for TimeoutLayer { Timeout::new(service, self.timeout) } } + +/// Applies a timeout to requests via the supplied inner service (including poll_ready). +/// The main difference with [`TimeoutLayer`] is that we're starting the timeout before the `poll_ready` call of the inner service. +/// Which means you can use it with the `RateLimitLayer` service if you want to abort a rate limited call +#[derive(Debug, Clone)] +pub struct GlobalTimeoutLayer { + timeout: Duration, +} + +impl GlobalTimeoutLayer { + /// Create a timeout from a duration + pub fn new(timeout: Duration) -> Self { + GlobalTimeoutLayer { timeout } + } +} + +impl Layer for GlobalTimeoutLayer { + type Service = GlobalTimeout; + + fn layer(&self, service: S) -> Self::Service { + GlobalTimeout::new(service, self.timeout) + } +} diff --git a/tower/src/timeout/mod.rs b/tower/src/timeout/mod.rs index 0e65a3f65..2095aa2e5 100644 --- a/tower/src/timeout/mod.rs +++ b/tower/src/timeout/mod.rs @@ -7,11 +7,17 @@ pub mod error; pub mod future; mod layer; +use crate::timeout::error::Elapsed; + +pub use self::layer::GlobalTimeoutLayer; pub use self::layer::TimeoutLayer; use self::future::ResponseFuture; +use futures_core::future::Future; +use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; +use tokio::time::Sleep; use tower_service::Service; /// Applies a timeout to requests. @@ -65,6 +71,78 @@ where let response = self.inner.call(request); let sleep = tokio::time::sleep(self.timeout); - ResponseFuture::new(response, sleep) + ResponseFuture::new(response, Box::pin(sleep)) + } +} + +/// Applies a timeout to requests (including poll_ready). +/// The main difference with [`Timeout`] is that we're starting the timeout before the `poll_ready` call of the inner service. +/// Which means you can use it with the `RateLimit` service if you want to abort a rate limited call +#[derive(Debug)] +pub struct GlobalTimeout { + inner: T, + timeout: Duration, + sleep: Option>>, +} + +// ===== impl GlobalTimeout ===== + +impl GlobalTimeout { + /// Creates a new [`GlobalTimeout`] + pub fn new(inner: T, timeout: Duration) -> Self { + GlobalTimeout { + inner, + timeout, + sleep: None, + } + } +} + +impl Service for GlobalTimeout +where + S: Service, + S::Error: Into, +{ + type Response = S::Response; + type Error = crate::BoxError; + type Future = ResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + if self.sleep.is_none() { + self.sleep = Some(Box::pin(tokio::time::sleep(self.timeout))); + } + match self.inner.poll_ready(cx) { + Poll::Pending => {} + Poll::Ready(r) => return Poll::Ready(r.map_err(Into::into)), + }; + + // Checking if we don't timeout on `poll_ready` + if Pin::new( + &mut self + .sleep + .as_mut() + .expect("we can unwrap because we set it just before"), + ) + .poll(cx) + .is_ready() + { + tracing::trace!("timeout exceeded."); + self.sleep = None; + + return Poll::Ready(Err(Elapsed::new().into())); + } + + Poll::Pending + } + + fn call(&mut self, request: Request) -> Self::Future { + let response = self.inner.call(request); + + ResponseFuture::new( + response, + self.sleep + .take() + .expect("poll_ready must been called before"), + ) } } From 9f5470ff3f49489d607ecd026fe274d647aed48c Mon Sep 17 00:00:00 2001 From: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> Date: Fri, 8 Jul 2022 09:43:48 +0200 Subject: [PATCH 2/2] fix dependencies Signed-off-by: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> --- tower/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tower/Cargo.toml b/tower/Cargo.toml index f80b7b1fb..7ab12f1aa 100644 --- a/tower/Cargo.toml +++ b/tower/Cargo.toml @@ -61,7 +61,7 @@ reconnect = ["make", "tokio/io-std", "tracing"] retry = ["__common", "tokio/time"] spawn-ready = ["__common", "futures-util", "tokio/sync", "tokio/rt", "util", "tracing"] steer = [] -timeout = ["pin-project-lite", "tokio/time"] +timeout = ["pin-project-lite", "tokio/time", "futures-core", "tracing"] util = ["__common", "futures-util", "pin-project-lite"] [dependencies]