diff --git a/CHANGES.md b/CHANGES.md index aa30dfb..9b88447 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [3.0.0] - 2024-05-28 + +* Use ntex-service 3.0 + ## [2.1.7] - 2024-05-12 * Cleanup pending transfers and deliveries on link detach diff --git a/Cargo.toml b/Cargo.toml index 28c9ac2..68619fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-amqp" -version = "2.1.7" +version = "3.0.0" authors = ["ntex contributors "] description = "AMQP 1.0 Client/Server framework" documentation = "https://docs.rs/ntex-amqp" @@ -24,8 +24,7 @@ default = [] frame-trace = [] [dependencies] -ntex = "1" -ntex-util = "1.0.1" +ntex = "2" ntex-amqp-codec = "0.9" bitflags = "2" @@ -38,7 +37,7 @@ uuid = { version = "1", features = ["v4"] } [dev-dependencies] env_logger = "0.11" rand = "0.8" -ntex = { version = "1", features = ["tokio"] } +ntex = { version = "2", features = ["tokio"] } ntex-amqp = { path = ".", features = ["frame-trace"] } [patch.crates-io] diff --git a/src/dispatcher.rs b/src/dispatcher.rs index f10f5dc..f10dcef 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -1,9 +1,11 @@ use std::collections::VecDeque; -use std::{cell, future::Future, marker, pin::Pin, rc::Rc, task::Context, task::Poll}; +use std::{ + cell, future::poll_fn, future::Future, marker, pin::Pin, rc::Rc, task::Context, task::Poll, +}; -use ntex::service::{Pipeline, PipelineCall, Service, ServiceCtx}; +use ntex::service::{Pipeline, PipelineBinding, PipelineCall, Service, ServiceCtx}; use ntex::time::{sleep, Millis, Sleep}; -use ntex::util::{ready, BoxFuture, Either}; +use ntex::util::{ready, Either}; use ntex::{io::DispatchItem, rt::spawn, task::LocalWaker}; use crate::codec::{protocol::Frame, AmqpCodec, AmqpFrame}; @@ -24,13 +26,12 @@ impl ControlQueue { } /// Amqp server dispatcher service. -pub(crate) struct Dispatcher> { +pub(crate) struct Dispatcher, Ctl: Service> { sink: Connection, - service: Pipeline, - ctl_service: Pipeline, + service: PipelineBinding, + ctl_service: PipelineBinding, ctl_fut: cell::RefCell)>>, ctl_queue: Rc, - shutdown: cell::RefCell>>, expire: Sleep, idle_timeout: Millis, } @@ -51,17 +52,16 @@ where Dispatcher { sink, idle_timeout, - service, - ctl_service, ctl_queue, + service: service.bind(), + ctl_service: ctl_service.bind(), ctl_fut: cell::RefCell::new(Vec::new()), - shutdown: cell::RefCell::new(None), expire: sleep(idle_timeout), } } fn call_control_service(&self, frame: ControlFrame) { - let fut = self.ctl_service.call_static(frame.clone()); + let fut = self.ctl_service.call(frame.clone()); self.ctl_fut.borrow_mut().push((frame, fut)); self.ctl_queue.waker.wake(); } @@ -152,7 +152,7 @@ where let frm = frm.clone(); let fut = self .service - .call_static(types::Message::Attached(frm.clone(), link.clone())); + .call(types::Message::Attached(frm.clone(), link.clone())); let _ = ntex::rt::spawn(async move { let result = fut.await; if let Err(err) = result { @@ -200,80 +200,72 @@ where type Response = Option; type Error = AmqpDispatcherError; - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - self.ctl_queue.waker.register(cx.waker()); + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + poll_fn(|cx| { + self.ctl_queue.waker.register(cx.waker()); - // idle timeout - self.handle_idle_timeout(cx); + // idle timeout + self.handle_idle_timeout(cx); - // process control frame - let mut control_fut_pending = !self.handle_control_fut(cx)?; + // process control frame + let mut control_fut_pending = !self.handle_control_fut(cx)?; - // check readiness - let service_poll = self.service.poll_ready(cx).map_err(|err| { - let err = Error::from(err); - log::error!( - "{}: Publish service readiness check failed: {:?}", - self.sink.tag(), - err - ); - let _ = self.sink.close_with_error(err); - AmqpDispatcherError::Service - })?; + // check readiness + let service_poll = self.service.poll_ready(cx).map_err(|err| { + let err = Error::from(err); + log::error!( + "{}: Publish service readiness check failed: {:?}", + self.sink.tag(), + err + ); + let _ = self.sink.close_with_error(err); + AmqpDispatcherError::Service + })?; - let ctl_service_poll = self.ctl_service.poll_ready(cx).map_err(|err| { - let err = Error::from(err); - log::error!( - "{}: Control service readiness check failed: {:?}", - self.sink.tag(), - err - ); - let _ = self.sink.close_with_error(err); - AmqpDispatcherError::Service - })?; + let ctl_service_poll = self.ctl_service.poll_ready(cx).map_err(|err| { + let err = Error::from(err); + log::error!( + "{}: Control service readiness check failed: {:?}", + self.sink.tag(), + err + ); + let _ = self.sink.close_with_error(err); + AmqpDispatcherError::Service + })?; - // enqueue pending control frames - if ctl_service_poll.is_ready() && !self.ctl_queue.pending.borrow().is_empty() { - self.ctl_queue - .pending - .borrow_mut() - .drain(..) - .for_each(|frame| { - self.call_control_service(frame); - }); - control_fut_pending = true; - } + // enqueue pending control frames + if ctl_service_poll.is_ready() && !self.ctl_queue.pending.borrow().is_empty() { + self.ctl_queue + .pending + .borrow_mut() + .drain(..) + .for_each(|frame| { + self.call_control_service(frame); + }); + control_fut_pending = true; + } - if control_fut_pending || service_poll.is_pending() || ctl_service_poll.is_pending() { - Poll::Pending - } else { - Poll::Ready(Ok(())) - } + if control_fut_pending || service_poll.is_pending() || ctl_service_poll.is_pending() { + Poll::Pending + } else { + Poll::Ready(Ok(())) + } + }) + .await } - fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { - let mut shutdown = self.shutdown.borrow_mut(); - if !shutdown.is_some() { - self.sink - .0 - .get_mut() - .set_error(AmqpProtocolError::Disconnected); - let fut = self - .ctl_service - .call_static(ControlFrame::new_kind(ControlFrameKind::Closed)); - *shutdown = Some(Box::pin(async move { - let _ = fut.await; - })); - } + async fn shutdown(&self) { + self.sink + .0 + .get_mut() + .set_error(AmqpProtocolError::Disconnected); + let _ = self + .ctl_service + .call(ControlFrame::new_kind(ControlFrameKind::Closed)) + .await; - let res0 = shutdown.as_mut().expect("guard above").as_mut().poll(cx); - let res1 = self.service.poll_shutdown(cx); - let res2 = self.ctl_service.poll_shutdown(cx); - if res0.is_pending() || res1.is_pending() || res2.is_pending() { - Poll::Pending - } else { - Poll::Ready(()) - } + self.service.shutdown().await; + self.ctl_service.shutdown().await; } async fn call( @@ -334,7 +326,7 @@ where } types::Action::DetachReceiver(link, frm) => { let lnk = link.clone(); - let fut = self.service.call_static(types::Message::Detached(lnk)); + let fut = self.service.call(types::Message::Detached(lnk)); let _ = spawn(async move { let _ = fut.await; }); @@ -352,9 +344,7 @@ where }) .collect(); - let fut = self - .service - .call_static(types::Message::DetachedAll(receivers)); + let fut = self.service.call(types::Message::DetachedAll(receivers)); let _ = spawn(async move { let _ = fut.await; }); diff --git a/src/router.rs b/src/router.rs index d9ba7e1..98cb443 100644 --- a/src/router.rs +++ b/src/router.rs @@ -1,4 +1,4 @@ -use std::{future::poll_fn, marker, rc::Rc}; +use std::{marker, rc::Rc}; use ntex::router::{IntoPattern, Router as PatternRouter}; use ntex::service::{ @@ -141,34 +141,41 @@ impl Service for RouterService { log::trace!("Releasing handler service for {}", link.name()); let name = link.name().clone(); let _ = ntex::rt::spawn(async move { - poll_fn(move |cx| srv.poll_shutdown(cx)).await; + srv.shutdown().await; log::trace!("Handler service for {} has shutdown", name); }); } Ok(()) } Message::DetachedAll(links) => { - let futs: Vec<_> = links + let links: Vec<_> = links .into_iter() .filter_map(|link| { - self.0.get_mut().handlers.remove(&link).and_then(|srv| { - srv.map(|srv| { - log::trace!( - "Releasing handler service for {} (session ended)", - link.name() - ); - poll_fn(move |cx| srv.poll_shutdown(cx)) - }) - }) + self.0 + .get_mut() + .handlers + .remove(&link) + .and_then(move |srv| srv.map(|srv| (link, srv))) }) .collect(); log::trace!( "Shutting down {} handler services (session ended)", - futs.len() + links.len() ); let _ = ntex::rt::spawn(async move { + let futs: Vec<_> = links + .iter() + .map(|(link, srv)| { + log::trace!( + "Releasing handler service for {} (session ended)", + link.name() + ); + srv.shutdown() + }) + .collect(); + let len = futs.len(); let _ = join_all(futs).await; log::trace!( @@ -279,8 +286,8 @@ where type Response = Outcome; type Error = Error; - ntex::forward_poll_ready!(service); - ntex::forward_poll_shutdown!(service); + ntex::forward_ready!(service); + ntex::forward_shutdown!(service); async fn call( &self, diff --git a/src/server/service.rs b/src/server/service.rs index 06c2ed9..1e26417 100644 --- a/src/server/service.rs +++ b/src/server/service.rs @@ -231,8 +231,15 @@ where type Response = (); type Error = ServerError; - ntex::forward_poll_ready!(handshake, ServerError::Service); - ntex::forward_poll_shutdown!(handshake); + #[inline] + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + self.handshake.ready().await.map_err(ServerError::Service) + } + + #[inline] + async fn shutdown(&self) { + self.handshake.shutdown().await + } async fn call( &self, @@ -256,9 +263,17 @@ where type Response = (); type Error = ServerError; - ntex::forward_poll_ready!(handshake, ServerError::Service); - ntex::forward_poll_shutdown!(handshake); + #[inline] + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + self.handshake.ready().await.map_err(ServerError::Service) + } + + #[inline] + async fn shutdown(&self) { + self.handshake.shutdown().await + } + #[inline] async fn call( &self, req: IoBoxed,