From 557fab91c183974ea326a697ad132a389e019eb9 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 19 Jan 2024 17:07:24 +0600 Subject: [PATCH] SenderLink close notification --- CHANGES.md | 4 ++++ Cargo.toml | 3 ++- src/connection.rs | 1 + src/dispatcher.rs | 7 ++++--- src/rcvlink.rs | 23 +++++++++++++++++++++++ src/session.rs | 34 ++++++++++++++++++++-------------- src/sndlink.rs | 36 ++++++++++++++++++++++++++++++++---- 7 files changed, 86 insertions(+), 22 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 7d7a765..66a6ea0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [1.0.2] - 2024-01-19 + +* SenderLink close notification + ## [1.0.1] - 2024-01-18 * Fix SenderLink closed state, if link is closed remotely diff --git a/Cargo.toml b/Cargo.toml index 91af647..c7a8a08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-amqp" -version = "1.0.1" +version = "1.0.2" authors = ["ntex contributors "] description = "AMQP 1.0 Client/Server framework" documentation = "https://docs.rs/ntex-amqp" @@ -25,6 +25,7 @@ frame-trace = [] [dependencies] ntex = "1.0" +ntex-util = "1.0.1" ntex-amqp-codec = "0.9" bitflags = "2.4" diff --git a/src/connection.rs b/src/connection.rs index b20241b..1a42cac 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -263,6 +263,7 @@ impl ConnectionInner { if self.error.is_none() { self.error = Some(err); } + self.on_close.notify_and_lock_readiness(); } pub(crate) fn post_frame(&mut self, frame: AmqpFrame) { diff --git a/src/dispatcher.rs b/src/dispatcher.rs index 5b22f80..f4af04a 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -256,9 +256,10 @@ where fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { let mut shutdown = self.shutdown.borrow_mut(); if !shutdown.is_some() { - let sink = self.sink.0.get_mut(); - sink.on_close.notify(); - sink.set_error(AmqpProtocolError::Disconnected); + self.sink + .0 + .get_mut() + .set_error(AmqpProtocolError::Disconnected); let fut = self .ctl_service .call_static(ControlFrame::new_kind(ControlFrameKind::Closed)); diff --git a/src/rcvlink.rs b/src/rcvlink.rs index 11c3be1..acfa8ff 100644 --- a/src/rcvlink.rs +++ b/src/rcvlink.rs @@ -415,6 +415,29 @@ impl ReceiverLinkInner { } } +#[derive(Debug)] +pub(crate) struct EstablishedReceiverLink(ReceiverLink); + +impl EstablishedReceiverLink { + pub(crate) fn new(inner: Cell) -> EstablishedReceiverLink { + EstablishedReceiverLink(ReceiverLink::new(inner)) + } +} + +impl std::ops::Deref for EstablishedReceiverLink { + type Target = ReceiverLink; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Drop for EstablishedReceiverLink { + fn drop(&mut self) { + self.0.inner.get_mut().closed = true; + } +} + pub struct ReceiverLinkBuilder { frame: Attach, session: Cell, diff --git a/src/session.rs b/src/session.rs index 6677225..0b14ed9 100644 --- a/src/session.rs +++ b/src/session.rs @@ -12,8 +12,12 @@ use ntex_amqp_codec::protocol::{ use ntex_amqp_codec::AmqpFrame; use crate::error::AmqpProtocolError; -use crate::rcvlink::{ReceiverLink, ReceiverLinkBuilder, ReceiverLinkInner}; -use crate::sndlink::{DeliveryPromise, SenderLink, SenderLinkBuilder, SenderLinkInner}; +use crate::rcvlink::{ + EstablishedReceiverLink, ReceiverLink, ReceiverLinkBuilder, ReceiverLinkInner, +}; +use crate::sndlink::{ + DeliveryPromise, EstablishedSenderLink, SenderLink, SenderLinkBuilder, SenderLinkInner, +}; use crate::{cell::Cell, types::Action, ConnectionRef, ControlFrame}; const INITIAL_OUTGOING_ID: TransferNumber = 0; @@ -235,7 +239,7 @@ impl Session { #[derive(Debug)] enum SenderLinkState { - Established(SenderLink), + Established(EstablishedSenderLink), OpeningRemote, Opening(Option, AmqpProtocolError>>>), Closing(Option>>), @@ -250,7 +254,7 @@ enum ReceiverLinkState { oneshot::Sender>, )>, ), - Established(ReceiverLink), + Established(EstablishedReceiverLink), Closing(Option>>), } @@ -388,7 +392,7 @@ impl SessionInner { self.error = Some(err); self.flags.insert(Flags::ENDED); - self.closed.notify(); + self.closed.notify_and_lock_readiness(); } /// End session. @@ -406,10 +410,10 @@ impl SessionInner { .iter() .filter_map(|(_, st)| match st { Either::Left(SenderLinkState::Established(ref link)) => { - Some(Either::Left(link.clone())) + Some(Either::Left((*link).clone())) } Either::Right(ReceiverLinkState::Established(ref link)) => { - Some(Either::Right(link.clone())) + Some(Either::Right((*link).clone())) } _ => None, }) @@ -482,8 +486,9 @@ impl SessionInner { *self .links .get_mut(token) - .expect("new remote sender entry must exist") = - Either::Left(SenderLinkState::Established(SenderLink::new(link.clone()))); + .expect("new remote sender entry must exist") = Either::Left( + SenderLinkState::Established(EstablishedSenderLink::new(link.clone())), + ); let attach = Attach(Box::new(codec::AttachInner { name: attach.0.name.clone(), @@ -695,7 +700,7 @@ impl SessionInner { desired_capabilities: None, properties: None, })); - *link = ReceiverLinkState::Established(ReceiverLink::new(l)); + *link = ReceiverLinkState::Established(EstablishedReceiverLink::new(l)); self.post_frame(attach.into()); return; } @@ -812,7 +817,7 @@ impl SessionInner { .and_then(|h| self.links.get_mut(h)) { if let SenderLinkState::Established(ref link) = link { - return Ok(Action::Flow(link.clone(), flow)); + return Ok(Action::Flow((*link).clone(), flow)); } log::warn!("{}: Received flow frame", self.tag()); } @@ -920,7 +925,7 @@ impl SessionInner { )); let local_sender = std::mem::replace( item, - SenderLinkState::Established(SenderLink::new(link.clone())), + SenderLinkState::Established(EstablishedSenderLink::new(link.clone())), ); if let SenderLinkState::Opening(Some(tx)) = local_sender { @@ -941,8 +946,9 @@ impl SessionInner { if let Some((link, tx)) = opt_item.take() { self.remote_handles.insert(attach.handle(), *index); - *item = - ReceiverLinkState::Established(ReceiverLink::new(link.clone())); + *item = ReceiverLinkState::Established( + EstablishedReceiverLink::new(link.clone()), + ); let _ = tx.send(Ok(ReceiverLink::new(link))); } else { // TODO: close session diff --git a/src/sndlink.rs b/src/sndlink.rs index 8c9de07..a0bf3a1 100644 --- a/src/sndlink.rs +++ b/src/sndlink.rs @@ -310,8 +310,8 @@ impl SenderLinkInner { self.closed = true; self.error = Some(err); - self.on_close.notify(); - self.on_credit.notify(); + self.on_close.notify_and_lock_readiness(); + self.on_credit.notify_and_lock_readiness(); } pub(crate) fn close( @@ -322,8 +322,8 @@ impl SenderLinkInner { Either::Left(Ready::Ok(())) } else { self.closed = true; - self.on_close.notify(); - self.on_credit.notify(); + self.on_close.notify_and_lock_readiness(); + self.on_credit.notify_and_lock_readiness(); let (tx, rx) = oneshot::channel(); @@ -603,6 +603,34 @@ impl SenderLinkInner { } } +#[derive(Debug)] +pub(crate) struct EstablishedSenderLink(SenderLink); + +impl EstablishedSenderLink { + pub(crate) fn new(inner: Cell) -> EstablishedSenderLink { + EstablishedSenderLink(SenderLink::new(inner)) + } +} + +impl std::ops::Deref for EstablishedSenderLink { + type Target = SenderLink; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Drop for EstablishedSenderLink { + fn drop(&mut self) { + let inner = self.0.inner.get_mut(); + if !inner.closed { + inner.closed = true; + inner.on_close.notify_and_lock_readiness(); + inner.on_credit.notify_and_lock_readiness(); + } + } +} + pub struct SenderLinkBuilder { frame: Attach, session: Cell,