From 8b7212c807a555b82e77c93fa5a81b0c1f0615bc Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 19 Jan 2024 02:57:54 +0600 Subject: [PATCH] More tests --- src/sndlink.rs | 1 + tests/test_server.rs | 110 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+) diff --git a/src/sndlink.rs b/src/sndlink.rs index d050802..8c9de07 100644 --- a/src/sndlink.rs +++ b/src/sndlink.rs @@ -311,6 +311,7 @@ impl SenderLinkInner { self.closed = true; self.error = Some(err); self.on_close.notify(); + self.on_credit.notify(); } pub(crate) fn close( diff --git a/tests/test_server.rs b/tests/test_server.rs index 092eaac..32f5660 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -264,3 +264,113 @@ async fn test_link_detach() -> std::io::Result<()> { Ok(()) } + +#[ntex::test] +async fn test_link_detach_on_session_end() -> std::io::Result<()> { + let _ = env_logger::try_init(); + + let srv = test_server(move || { + server::Server::build(|con: server::Handshake| async move { + match con { + server::Handshake::Amqp(con) => { + let con = con.open().await.unwrap(); + Ok(con.ack(())) + } + server::Handshake::Sasl(_) => Err(()), + } + }) + .finish( + server::Router::<()>::new() + .service( + "test", + fn_factory_with_config(|link: types::Link<()>| async move { + rt::spawn(async move { + sleep(Millis(150)).await; + let _ = link.session().end().await; + }); + + Ok::<_, LinkError>(boxed::service(fn_service(|_req| async move { + Ok::<_, LinkError>(types::Outcome::Accept) + }))) + }), + ) + .finish(), + ) + }); + + let uri = Uri::try_from(format!("amqp://{}:{}", srv.addr().ip(), srv.addr().port())).unwrap(); + let client = client::Connector::new().connect(uri).await.unwrap(); + + let sink = client.sink(); + ntex::rt::spawn(async move { + let _ = client.start_default().await; + }); + + let session = sink.open_session().await.unwrap(); + let link = session + .build_sender_link("test", "test") + .attach() + .await + .unwrap(); + + link.on_close().await; + assert!(link.is_closed()); + assert!(!link.is_opened()); + + Ok(()) +} + +#[ntex::test] +async fn test_link_detach_on_disconnect() -> std::io::Result<()> { + let _ = env_logger::try_init(); + + let srv = test_server(move || { + server::Server::build(|con: server::Handshake| async move { + match con { + server::Handshake::Amqp(con) => { + let con = con.open().await.unwrap(); + Ok(con.ack(())) + } + server::Handshake::Sasl(_) => Err(()), + } + }) + .finish( + server::Router::<()>::new() + .service( + "test", + fn_factory_with_config(|link: types::Link<()>| async move { + rt::spawn(async move { + sleep(Millis(150)).await; + let _ = link.session().connection().close().await; + }); + + Ok::<_, LinkError>(boxed::service(fn_service(|_req| async move { + Ok::<_, LinkError>(types::Outcome::Accept) + }))) + }), + ) + .finish(), + ) + }); + + let uri = Uri::try_from(format!("amqp://{}:{}", srv.addr().ip(), srv.addr().port())).unwrap(); + let client = client::Connector::new().connect(uri).await.unwrap(); + + let sink = client.sink(); + ntex::rt::spawn(async move { + let _ = client.start_default().await; + }); + + let session = sink.open_session().await.unwrap(); + let link = session + .build_sender_link("test", "test") + .attach() + .await + .unwrap(); + + link.on_close().await; + assert!(link.is_closed()); + assert!(!link.is_opened()); + + Ok(()) +}