Skip to content

Commit

Permalink
SenderLink close notification (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Jan 19, 2024
1 parent 8b7212c commit 62e259a
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 22 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [1.0.2] - 2024-01-19

* SenderLink close notification

## [1.0.1] - 2024-01-18

* Fix SenderLink closed state, if link is closed remotely
Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "1.0.1"
version = "1.0.2"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand All @@ -25,6 +25,7 @@ frame-trace = []

[dependencies]
ntex = "1.0"
ntex-util = "1.0.1"
ntex-amqp-codec = "0.9"

bitflags = "2.4"
Expand Down
1 change: 1 addition & 0 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ impl ConnectionInner {
if self.error.is_none() {
self.error = Some(err);
}
self.on_close.notify_and_lock_readiness();
}

pub(crate) fn post_frame(&mut self, frame: AmqpFrame) {
Expand Down
7 changes: 4 additions & 3 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,10 @@ where
fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
let mut shutdown = self.shutdown.borrow_mut();
if !shutdown.is_some() {
let sink = self.sink.0.get_mut();
sink.on_close.notify();
sink.set_error(AmqpProtocolError::Disconnected);
self.sink
.0
.get_mut()
.set_error(AmqpProtocolError::Disconnected);
let fut = self
.ctl_service
.call_static(ControlFrame::new_kind(ControlFrameKind::Closed));
Expand Down
23 changes: 23 additions & 0 deletions src/rcvlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,29 @@ impl ReceiverLinkInner {
}
}

#[derive(Debug)]
pub(crate) struct EstablishedReceiverLink(ReceiverLink);

impl EstablishedReceiverLink {
pub(crate) fn new(inner: Cell<ReceiverLinkInner>) -> EstablishedReceiverLink {
EstablishedReceiverLink(ReceiverLink::new(inner))
}
}

impl std::ops::Deref for EstablishedReceiverLink {
type Target = ReceiverLink;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl Drop for EstablishedReceiverLink {
fn drop(&mut self) {
self.0.inner.get_mut().closed = true;
}
}

pub struct ReceiverLinkBuilder {
frame: Attach,
session: Cell<SessionInner>,
Expand Down
34 changes: 20 additions & 14 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ use ntex_amqp_codec::protocol::{
use ntex_amqp_codec::AmqpFrame;

use crate::error::AmqpProtocolError;
use crate::rcvlink::{ReceiverLink, ReceiverLinkBuilder, ReceiverLinkInner};
use crate::sndlink::{DeliveryPromise, SenderLink, SenderLinkBuilder, SenderLinkInner};
use crate::rcvlink::{
EstablishedReceiverLink, ReceiverLink, ReceiverLinkBuilder, ReceiverLinkInner,
};
use crate::sndlink::{
DeliveryPromise, EstablishedSenderLink, SenderLink, SenderLinkBuilder, SenderLinkInner,
};
use crate::{cell::Cell, types::Action, ConnectionRef, ControlFrame};

const INITIAL_OUTGOING_ID: TransferNumber = 0;
Expand Down Expand Up @@ -235,7 +239,7 @@ impl Session {

#[derive(Debug)]
enum SenderLinkState {
Established(SenderLink),
Established(EstablishedSenderLink),
OpeningRemote,
Opening(Option<oneshot::Sender<Result<Cell<SenderLinkInner>, AmqpProtocolError>>>),
Closing(Option<oneshot::Sender<Result<(), AmqpProtocolError>>>),
Expand All @@ -250,7 +254,7 @@ enum ReceiverLinkState {
oneshot::Sender<Result<ReceiverLink, AmqpProtocolError>>,
)>,
),
Established(ReceiverLink),
Established(EstablishedReceiverLink),
Closing(Option<oneshot::Sender<Result<(), AmqpProtocolError>>>),
}

Expand Down Expand Up @@ -388,7 +392,7 @@ impl SessionInner {

self.error = Some(err);
self.flags.insert(Flags::ENDED);
self.closed.notify();
self.closed.notify_and_lock_readiness();
}

/// End session.
Expand All @@ -406,10 +410,10 @@ impl SessionInner {
.iter()
.filter_map(|(_, st)| match st {
Either::Left(SenderLinkState::Established(ref link)) => {
Some(Either::Left(link.clone()))
Some(Either::Left((*link).clone()))
}
Either::Right(ReceiverLinkState::Established(ref link)) => {
Some(Either::Right(link.clone()))
Some(Either::Right((*link).clone()))
}
_ => None,
})
Expand Down Expand Up @@ -482,8 +486,9 @@ impl SessionInner {
*self
.links
.get_mut(token)
.expect("new remote sender entry must exist") =
Either::Left(SenderLinkState::Established(SenderLink::new(link.clone())));
.expect("new remote sender entry must exist") = Either::Left(
SenderLinkState::Established(EstablishedSenderLink::new(link.clone())),
);

let attach = Attach(Box::new(codec::AttachInner {
name: attach.0.name.clone(),
Expand Down Expand Up @@ -695,7 +700,7 @@ impl SessionInner {
desired_capabilities: None,
properties: None,
}));
*link = ReceiverLinkState::Established(ReceiverLink::new(l));
*link = ReceiverLinkState::Established(EstablishedReceiverLink::new(l));
self.post_frame(attach.into());
return;
}
Expand Down Expand Up @@ -812,7 +817,7 @@ impl SessionInner {
.and_then(|h| self.links.get_mut(h))
{
if let SenderLinkState::Established(ref link) = link {
return Ok(Action::Flow(link.clone(), flow));
return Ok(Action::Flow((*link).clone(), flow));
}
log::warn!("{}: Received flow frame", self.tag());
}
Expand Down Expand Up @@ -920,7 +925,7 @@ impl SessionInner {
));
let local_sender = std::mem::replace(
item,
SenderLinkState::Established(SenderLink::new(link.clone())),
SenderLinkState::Established(EstablishedSenderLink::new(link.clone())),
);

if let SenderLinkState::Opening(Some(tx)) = local_sender {
Expand All @@ -941,8 +946,9 @@ impl SessionInner {
if let Some((link, tx)) = opt_item.take() {
self.remote_handles.insert(attach.handle(), *index);

*item =
ReceiverLinkState::Established(ReceiverLink::new(link.clone()));
*item = ReceiverLinkState::Established(
EstablishedReceiverLink::new(link.clone()),
);
let _ = tx.send(Ok(ReceiverLink::new(link)));
} else {
// TODO: close session
Expand Down
36 changes: 32 additions & 4 deletions src/sndlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,8 @@ impl SenderLinkInner {

self.closed = true;
self.error = Some(err);
self.on_close.notify();
self.on_credit.notify();
self.on_close.notify_and_lock_readiness();
self.on_credit.notify_and_lock_readiness();
}

pub(crate) fn close(
Expand All @@ -322,8 +322,8 @@ impl SenderLinkInner {
Either::Left(Ready::Ok(()))
} else {
self.closed = true;
self.on_close.notify();
self.on_credit.notify();
self.on_close.notify_and_lock_readiness();
self.on_credit.notify_and_lock_readiness();

let (tx, rx) = oneshot::channel();

Expand Down Expand Up @@ -603,6 +603,34 @@ impl SenderLinkInner {
}
}

#[derive(Debug)]
pub(crate) struct EstablishedSenderLink(SenderLink);

impl EstablishedSenderLink {
pub(crate) fn new(inner: Cell<SenderLinkInner>) -> EstablishedSenderLink {
EstablishedSenderLink(SenderLink::new(inner))
}
}

impl std::ops::Deref for EstablishedSenderLink {
type Target = SenderLink;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl Drop for EstablishedSenderLink {
fn drop(&mut self) {
let inner = self.0.inner.get_mut();
if !inner.closed {
inner.closed = true;
inner.on_close.notify_and_lock_readiness();
inner.on_credit.notify_and_lock_readiness();
}
}
}

pub struct SenderLinkBuilder {
frame: Attach,
session: Cell<SessionInner>,
Expand Down

0 comments on commit 62e259a

Please sign in to comment.