Skip to content

Commit

Permalink
Add tests for responding to Node::*cast methods
Browse files Browse the repository at this point in the history
Add mocks for echo broadcast handle and use them in tests
  • Loading branch information
pool2win committed Oct 9, 2024
1 parent ce515b6 commit 11d52ea
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 39 deletions.
69 changes: 57 additions & 12 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
// along with Frost-Federation. If not, see
// <https://www.gnu.org/licenses/>.

use self::echo_broadcast::{start_echo_broadcast, EchoBroadcastHandle};
use self::protocol::{Broadcast, RoundOnePackage};
#[mockall_double::double]
use self::echo_broadcast::EchoBroadcastHandle;
use self::protocol::Broadcast;
use self::{membership::MembershipHandle, protocol::Message};
use crate::node::echo_broadcast::service::EchoBroadcast;
use crate::node::protocol::{MembershipMessage, RoundOnePackageMessage};
Expand Down Expand Up @@ -67,16 +68,17 @@ impl Node {
pub async fn new() -> Self {
let bind_address = "localhost".to_string();
let message_id_generator = MessageIdGenerator::new(bind_address.clone());
let echo_broadcast_handle = start_echo_broadcast().await;
let echo_broadcast_handle = EchoBroadcastHandle::start().await;
let state = State {
membership_handle: MembershipHandle::start(bind_address.clone()).await,
message_id_generator,
};
Node {
seeds: vec!["localhost:6680".to_string()],
bind_address: bind_address.clone(),
static_key_pem: String::new(),
delivery_timeout: 500,
state: State {
membership_handle: MembershipHandle::start(bind_address).await,
message_id_generator,
},
state,
echo_broadcast_handle,
}
}
Expand Down Expand Up @@ -385,7 +387,7 @@ impl Node {

let protocol_service = protocol::Protocol::new(node_id.clone(), state.clone());
let echo_broadcast_service =
EchoBroadcast::new(protocol_service, echo_broadcast_handle.clone(), state);
EchoBroadcast::new(protocol_service, echo_broadcast_handle, state);
let timeout_layer =
tower::timeout::TimeoutLayer::new(tokio::time::Duration::from_millis(timeout));
let _ = timeout_layer
Expand All @@ -397,22 +399,30 @@ impl Node {

#[cfg(test)]
mod node_tests {
use super::{membership, Node};
use super::Node;
#[mockall_double::double]
use crate::node::echo_broadcast::EchoBroadcastHandle;
use crate::node::membership::MembershipHandle;
use crate::node::protocol::message_id_generator::MessageIdGenerator;
use crate::node::protocol::{Message, PingMessage};
use crate::node::protocol::message_id_generator::{MessageId, MessageIdGenerator};
use crate::node::protocol::{Message, PingMessage, RoundOnePackageMessage};
#[mockall_double::double]
use crate::node::reliable_sender::ReliableSenderHandle;
use futures::FutureExt;

#[tokio::test]
async fn it_should_return_well_formed_node_id() {
let ctx = EchoBroadcastHandle::start_context();
ctx.expect().returning(|| EchoBroadcastHandle::default());

let node = Node::new().await;
assert_eq!(node.get_node_id(), "localhost");
}

#[tokio::test]
async fn it_should_create_nodew_with_config() {
async fn it_should_create_node_with_config() {
let ctx = EchoBroadcastHandle::start_context();
ctx.expect().returning(|| EchoBroadcastHandle::default());

let node = Node::new()
.await
.seeds(vec![
Expand All @@ -433,6 +443,9 @@ mod node_tests {

#[tokio::test]
async fn it_should_start_listen_without_error() {
let ctx = EchoBroadcastHandle::start_context();
ctx.expect().returning(|| EchoBroadcastHandle::default());

mockall::mock! {
TcpListener{}
}
Expand All @@ -442,6 +455,9 @@ mod node_tests {

#[tokio::test]
async fn it_should_respond_to_unicast_messages() {
let ctx = EchoBroadcastHandle::start_context();
ctx.expect().returning(|| EchoBroadcastHandle::default());

let unicast_message: Message = PingMessage::default().into();
let mut reliable_sender_handle = ReliableSenderHandle::default();

Expand All @@ -465,4 +481,33 @@ mod node_tests {
)
.await;
}

#[tokio::test]
async fn it_should_respond_to_broadcast_messages() {
let ctx = EchoBroadcastHandle::start_context();
ctx.expect().returning(|| EchoBroadcastHandle::default());

let broadcast_message = crate::node::protocol::Broadcast::RoundOnePackage(
RoundOnePackageMessage::new("local".into(), "hello".into()),
Some(MessageId(1)),
);
let mut echo_broadcast_handle = EchoBroadcastHandle::default();
echo_broadcast_handle
.expect_clone()
.returning(EchoBroadcastHandle::default);
echo_broadcast_handle
.expect_send()
.return_once(|_, _| Ok(()));

let membership_handle = MembershipHandle::start("local".into()).await;
let state = super::State::new(membership_handle, MessageIdGenerator::new("local".into()));
let res = Node::respond_to_broadcast_message(
"local".into(),
100,
broadcast_message,
echo_broadcast_handle,
state,
)
.await;
}
}
52 changes: 28 additions & 24 deletions src/node/echo_broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ use super::connection::{ConnectionResult, ConnectionResultSender};
use super::membership::ReliableSenderMap;
use super::protocol::message_id_generator::MessageId;
use super::protocol::message_id_generator::MessageIdGenerator;
use super::protocol::{Broadcast, NetworkMessage};
use super::protocol::NetworkMessage;
use crate::node::protocol::Message;
#[mockall_double::double]
use crate::node::reliable_sender::ReliableSenderHandle;
use serde::Serialize;
use std::collections::HashMap;
use std::error::Error;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::sync::{mpsc, oneshot};

pub mod service;

Expand Down Expand Up @@ -184,26 +184,19 @@ pub(crate) struct EchoBroadcastHandle {
sender: mpsc::Sender<EchoBroadcastMessage>,
}

/// Start the echo broadcast actor by listening to any messages on the
/// receiver channel
pub async fn start_echo_broadcast() -> EchoBroadcastHandle {
let (tx, rx) = mpsc::channel(512);
let mut actor = EchoBroadcastActor::start(rx);
tokio::spawn(async move {
while let Some(message) = actor.command_receiver.recv().await {
actor.handle_message(message).await;
}
});
EchoBroadcastHandle { sender: tx }
}

/// Handle for the echo broadcast actor
impl EchoBroadcastHandle {
pub fn start(
message_id_generator: MessageIdGenerator,
actor_tx: mpsc::Sender<EchoBroadcastMessage>,
) -> Self {
Self { sender: actor_tx }
/// Start the echo broadcast actor by listening to any messages on the
/// receiver channel
pub async fn start() -> Self {
let (tx, rx) = mpsc::channel(512);
let mut actor = EchoBroadcastActor::start(rx);
tokio::spawn(async move {
while let Some(message) = actor.command_receiver.recv().await {
actor.handle_message(message).await;
}
});
Self { sender: tx }
}

/// Keep the same signature to send, so we can convert that into a Trait later if we want.
Expand Down Expand Up @@ -237,12 +230,23 @@ impl EchoBroadcastHandle {
}
}

mockall::mock! {
pub EchoBroadcastHandle{
pub async fn start() -> Self;
pub async fn send(&self, message: Message, members: ReliableSenderMap) -> ConnectionResult<()>;
pub async fn receive(&self, message: Message, message_id: MessageId) -> ConnectionResult<()>;
}

impl Clone for EchoBroadcastHandle {
fn clone(&self) -> Self;
}
}

#[cfg(test)]
mod echo_broadcast_actor_tests {
use futures::FutureExt;

use super::*;
use crate::node::protocol::RoundOnePackageMessage;
use crate::node::protocol::{Broadcast, RoundOnePackageMessage};
use futures::FutureExt;

#[tokio::test]
async fn it_should_create_actor_with_echos_setup() {
Expand Down Expand Up @@ -331,7 +335,7 @@ mod echo_broadcast_actor_tests {
("a".to_string(), first_reliable_sender_handle),
("b".to_string(), second_reliable_sender_handle),
]);
let echo_bcast_handle = start_echo_broadcast().await;
let echo_bcast_handle = EchoBroadcastHandle::start().await;

let result = echo_bcast_handle.send(msg, reliable_senders_map).await;
assert!(result.is_err());
Expand Down
11 changes: 8 additions & 3 deletions src/node/echo_broadcast/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
// along with Frost-Federation. If not, see
// <https://www.gnu.org/licenses/>.

use crate::node::echo_broadcast::{EchoBroadcastHandle, EchoBroadcastMessage};
#[mockall_double::double]
use crate::node::echo_broadcast::EchoBroadcastHandle;
use crate::node::protocol::Message;
use crate::node::state::State;
use futures::Future;
Expand Down Expand Up @@ -85,7 +86,8 @@ mod echo_broadcast_service_tests {
use tower::ServiceExt;

use super::*;
use crate::node::echo_broadcast::start_echo_broadcast;
#[mockall_double::double]
use crate::node::echo_broadcast::EchoBroadcastHandle;
use crate::node::membership::MembershipHandle;
use crate::node::protocol::message_id_generator::MessageIdGenerator;
use crate::node::protocol::{HeartbeatMessage, Protocol};
Expand All @@ -105,7 +107,10 @@ mod echo_broadcast_service_tests {
.await;
let message_id_generator = MessageIdGenerator::new("localhost".to_string());
let state = State::new(membership_handle, message_id_generator);
let echo_bcast_handle = start_echo_broadcast().await;
let mut echo_bcast_handle = EchoBroadcastHandle::default();
echo_bcast_handle
.expect_clone()
.returning(EchoBroadcastHandle::default);
let message = HeartbeatMessage {
sender_id: "localhost".into(),
time: SystemTime::now(),
Expand Down

0 comments on commit 11d52ea

Please sign in to comment.