Skip to content

Commit

Permalink
More tests
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Jan 18, 2024
1 parent 3921603 commit 8b7212c
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/sndlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ impl SenderLinkInner {
self.closed = true;
self.error = Some(err);
self.on_close.notify();
self.on_credit.notify();
}

pub(crate) fn close(
Expand Down
110 changes: 110 additions & 0 deletions tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,113 @@ async fn test_link_detach() -> std::io::Result<()> {

Ok(())
}

#[ntex::test]
async fn test_link_detach_on_session_end() -> std::io::Result<()> {
let _ = env_logger::try_init();

let srv = test_server(move || {
server::Server::build(|con: server::Handshake| async move {
match con {
server::Handshake::Amqp(con) => {
let con = con.open().await.unwrap();
Ok(con.ack(()))
}
server::Handshake::Sasl(_) => Err(()),
}
})
.finish(
server::Router::<()>::new()
.service(
"test",
fn_factory_with_config(|link: types::Link<()>| async move {
rt::spawn(async move {
sleep(Millis(150)).await;
let _ = link.session().end().await;
});

Ok::<_, LinkError>(boxed::service(fn_service(|_req| async move {
Ok::<_, LinkError>(types::Outcome::Accept)
})))
}),
)
.finish(),
)
});

let uri = Uri::try_from(format!("amqp://{}:{}", srv.addr().ip(), srv.addr().port())).unwrap();
let client = client::Connector::new().connect(uri).await.unwrap();

let sink = client.sink();
ntex::rt::spawn(async move {
let _ = client.start_default().await;
});

let session = sink.open_session().await.unwrap();
let link = session
.build_sender_link("test", "test")
.attach()
.await
.unwrap();

link.on_close().await;
assert!(link.is_closed());
assert!(!link.is_opened());

Ok(())
}

#[ntex::test]
async fn test_link_detach_on_disconnect() -> std::io::Result<()> {
let _ = env_logger::try_init();

let srv = test_server(move || {
server::Server::build(|con: server::Handshake| async move {
match con {
server::Handshake::Amqp(con) => {
let con = con.open().await.unwrap();
Ok(con.ack(()))
}
server::Handshake::Sasl(_) => Err(()),
}
})
.finish(
server::Router::<()>::new()
.service(
"test",
fn_factory_with_config(|link: types::Link<()>| async move {
rt::spawn(async move {
sleep(Millis(150)).await;
let _ = link.session().connection().close().await;
});

Ok::<_, LinkError>(boxed::service(fn_service(|_req| async move {
Ok::<_, LinkError>(types::Outcome::Accept)
})))
}),
)
.finish(),
)
});

let uri = Uri::try_from(format!("amqp://{}:{}", srv.addr().ip(), srv.addr().port())).unwrap();
let client = client::Connector::new().connect(uri).await.unwrap();

let sink = client.sink();
ntex::rt::spawn(async move {
let _ = client.start_default().await;
});

let session = sink.open_session().await.unwrap();
let link = session
.build_sender_link("test", "test")
.attach()
.await
.unwrap();

link.on_close().await;
assert!(link.is_closed());
assert!(!link.is_opened());

Ok(())
}

0 comments on commit 8b7212c

Please sign in to comment.