Skip to content

Commit

Permalink
Run init protocol in an async task
Browse files Browse the repository at this point in the history
  • Loading branch information
pool2win committed Oct 13, 2024
1 parent 896f06a commit 002d76e
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 67 deletions.
69 changes: 14 additions & 55 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@ use self::echo_broadcast::EchoBroadcastHandle;
use self::protocol::BroadcastProtocol;
use self::{membership::MembershipHandle, protocol::Message};
use crate::node::echo_broadcast::service::EchoBroadcast;
use crate::node::noise_handler::{NoiseHandler, NoiseIO};
use crate::node::protocol::init::initialize;
use crate::node::protocol::{MembershipMessage, RoundOnePackageMessage};
use crate::node::reliable_sender::service::ReliableSend;
use crate::node::reliable_sender::ReliableNetworkMessage;
#[mockall_double::double]
use crate::node::reliable_sender::ReliableSenderHandle;
use crate::node::state::State;
use crate::node::{
noise_handler::{NoiseHandler, NoiseIO},
protocol::HandshakeMessage,
};
#[mockall_double::double]
use connection::ConnectionHandle;
use protocol::message_id_generator::MessageIdGenerator;
Expand Down Expand Up @@ -192,62 +190,23 @@ impl Node {
}

let node_id = self.get_node_id();

// let handshake_service = protocol::Protocol::new(node_id.clone(), self.state.clone());
// let reliable_sender_service =
// ReliableSend::new(handshake_service, reliable_sender_handle.clone());
// let timeout_layer = tower::timeout::TimeoutLayer::new(
// tokio::time::Duration::from_millis(self.delivery_timeout),
// );
// let _ = timeout_layer
// .layer(reliable_sender_service)
// .oneshot(HandshakeMessage::default().into())
// .await;

// log::info!("Handshake finished");

let round_one_service = protocol::Protocol::new(node_id.clone(), self.state.clone());
let echo_broadcast_service = EchoBroadcast::new(
round_one_service,
self.echo_broadcast_handle.clone(),
self.state.clone(),
self.get_node_id(),
);

log::info!("Sending echo broadcast");

let _ = echo_broadcast_service
.oneshot(
RoundOnePackageMessage::new(
self.get_node_id(),
"hello from round one package".into(),
)
.into(),
let state = self.state.clone();
let echo_broadcast_handle = self.echo_broadcast_handle.clone();
let delivery_timeout = self.delivery_timeout;
let reliable_sender_handle = reliable_sender_handle.clone();
tokio::spawn(async move {
let _ = initialize(
node_id,
state,
echo_broadcast_handle,
reliable_sender_handle,
delivery_timeout,
)
.await;

log::info!("Echo broadcast finished");
let _ = self.send_membership(reliable_sender_handle).await;

log::info!("Membership sent");
});
}
}

pub(crate) async fn send_membership(&self, sender: ReliableSenderHandle) {
log::info!("Sending membership information");
let protocol_service =
protocol::Protocol::new(self.get_node_id().clone(), self.state.clone());
let reliable_sender_service = ReliableSend::new(protocol_service, sender);
let timeout_layer = tower::timeout::TimeoutLayer::new(tokio::time::Duration::from_millis(
self.delivery_timeout,
));
let res = timeout_layer
.layer(reliable_sender_service)
.oneshot(MembershipMessage::new(self.get_node_id().clone(), None).into())
.await;
log::debug!("Membership sending result {:?}", res);
}

/// Connect to all peers and start reader writer tasks
pub async fn connect_to_seeds(&mut self) -> Result<(), Box<dyn Error>> {
log::debug!("Connecting to seeds...");
Expand Down
24 changes: 12 additions & 12 deletions src/node/echo_broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub mod service;
type EchosMap = HashMap<MessageId, HashMap<String, bool>>;

/// Message types for echo broadcast actor -> handle communication
pub(crate) enum EchoBroadcast {
pub(crate) enum EchoBroadcastMessage {
Send {
data: Message,
members: ReliableSenderMap,
Expand All @@ -56,15 +56,15 @@ pub(crate) struct EchoBroadcastActor {
/// A map from message id to the members in a map: message id -> [node id -> [reliable sender handles]]
reliable_senders: HashMap<MessageId, ReliableSenderMap>,
/// RX for actor to receive requests on
command_receiver: mpsc::Receiver<EchoBroadcast>,
command_receiver: mpsc::Receiver<EchoBroadcastMessage>,
/// Map of echo messages received from the nodes in membership when this broadcast was initiated
message_echos: EchosMap,
/// TX for the message id's echo broadcast to finish
message_client_txs: HashMap<MessageId, ConnectionResultSender>,
}

impl EchoBroadcastActor {
pub fn start(receiver: mpsc::Receiver<EchoBroadcast>) -> Self {
pub fn start(receiver: mpsc::Receiver<EchoBroadcastMessage>) -> Self {
Self {
command_receiver: receiver,
message_echos: EchosMap::new(),
Expand All @@ -73,24 +73,24 @@ impl EchoBroadcastActor {
}
}

pub async fn handle_message(&mut self, message: EchoBroadcast) {
pub async fn handle_message(&mut self, message: EchoBroadcastMessage) {
match message {
EchoBroadcast::Send {
EchoBroadcastMessage::Send {
data,
respond_to,
members,
} => {
let _ = self.send_message(data, members, respond_to).await;
}
EchoBroadcast::EchoSend {
EchoBroadcastMessage::EchoSend {
data,
members,
respond_to,
} => {
// send echo to all members
let _ = self.send_message(data.clone(), members, respond_to).await;
}
EchoBroadcast::EchoReceive { data, peer_addr } => {
EchoBroadcastMessage::EchoReceive { data, peer_addr } => {
// manage echo data structures and confirm delivered
log::debug!("Handle received echo in actor");
self.handle_received_echo(data, peer_addr).await;
Expand Down Expand Up @@ -196,7 +196,7 @@ impl EchoBroadcastActor {
/// broadcast was originally sent.
#[derive(Clone)]
pub(crate) struct EchoBroadcastHandle {
sender: mpsc::Sender<EchoBroadcast>,
sender: mpsc::Sender<EchoBroadcastMessage>,
}

/// Handle for the echo broadcast actor
Expand All @@ -217,7 +217,7 @@ impl EchoBroadcastHandle {
/// Keep the same signature to send, so we can convert that into a Trait later if we want.
pub async fn send(&self, message: Message, members: ReliableSenderMap) -> ConnectionResult<()> {
let (sender_from_actor, receiver_from_actor) = oneshot::channel();
let msg = EchoBroadcast::Send {
let msg = EchoBroadcastMessage::Send {
data: message,
members,
respond_to: sender_from_actor,
Expand Down Expand Up @@ -245,7 +245,7 @@ impl EchoBroadcastHandle {
members: ReliableSenderMap,
) -> ConnectionResult<()> {
let (sender_from_actor, receiver_from_actor) = oneshot::channel();
let msg = EchoBroadcast::EchoSend {
let msg = EchoBroadcastMessage::EchoSend {
data: message,
members,
respond_to: sender_from_actor,
Expand All @@ -265,7 +265,7 @@ impl EchoBroadcastHandle {
/// Receive an echo broadcast message from connection
/// Pass this to the actor, which will respond to the echo
pub async fn receive_echo(&self, message: Message, peer_addr: String) -> ConnectionResult<()> {
let msg = EchoBroadcast::EchoReceive {
let msg = EchoBroadcastMessage::EchoReceive {
data: message,
peer_addr,
};
Expand Down Expand Up @@ -417,7 +417,7 @@ mod echo_broadcast_actor_tests {
MessageId(1),
);

let msg = EchoBroadcast::EchoReceive {
let msg = EchoBroadcastMessage::EchoReceive {
data: msg,
peer_addr: "a".into(),
};
Expand Down
1 change: 1 addition & 0 deletions src/node/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ extern crate serde;

mod handshake;
mod heartbeat;
pub mod init;
mod membership;
pub(crate) mod message_id_generator;
mod ping;
Expand Down
96 changes: 96 additions & 0 deletions src/node/protocol/init.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2024 Kulpreet Singh

// This file is part of Frost-Federation

// Frost-Federation is free software: you can redistribute it and/or
// modify it under the terms of the GNU General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.

// Frost-Federation is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Frost-Federation. If not, see
// <https://www.gnu.org/licenses/>.

use crate::node::echo_broadcast::service::EchoBroadcast;
#[mockall_double::double]
use crate::node::echo_broadcast::EchoBroadcastHandle;
use crate::node::protocol::{MembershipMessage, Protocol, RoundOnePackageMessage};
use crate::node::reliable_sender::service::ReliableSend;
#[mockall_double::double]
use crate::node::reliable_sender::ReliableSenderHandle;
use crate::node::{reliable_sender, State};

use tower::{Layer, ServiceExt};

/// Run initial protocols for Node
pub(crate) async fn initialize(
node_id: String,
state: State,
echo_broadcast_handle: EchoBroadcastHandle,
reliable_sender_handle: ReliableSenderHandle,
delivery_time: u64,
) {
// let handshake_service = protocol::Protocol::new(node_id.clone(), self.state.clone());
// let reliable_sender_service =
// ReliableSend::new(handshake_service, reliable_sender_handle.clone());
// let timeout_layer = tower::timeout::TimeoutLayer::new(
// tokio::time::Duration::from_millis(self.delivery_timeout),
// );
// let _ = timeout_layer
// .layer(reliable_sender_service)
// .oneshot(HandshakeMessage::default().into())
// .await;

// log::info!("Handshake finished");

let round_one_service = Protocol::new(node_id.clone(), state.clone());
let echo_broadcast_service = EchoBroadcast::new(
round_one_service,
echo_broadcast_handle,
state.clone(),
node_id.clone(),
);

log::info!("Sending echo broadcast");

let _ = echo_broadcast_service
.oneshot(
RoundOnePackageMessage::new(node_id.clone(), "hello from round one package".into())
.into(),
)
.await;

log::info!("Echo broadcast finished");
let _ = send_membership(
node_id.clone(),
reliable_sender_handle,
state.clone(),
delivery_time,
)
.await;

log::info!("Membership sent");
}

pub(crate) async fn send_membership(
node_id: String,
sender: ReliableSenderHandle,
state: State,
delivery_time: u64,
) {
log::info!("Sending membership information");
let protocol_service = Protocol::new(node_id.clone(), state);
let reliable_sender_service = ReliableSend::new(protocol_service, sender);
let timeout_layer =
tower::timeout::TimeoutLayer::new(tokio::time::Duration::from_millis(delivery_time));
let res = timeout_layer
.layer(reliable_sender_service)
.oneshot(MembershipMessage::new(node_id, None).into())
.await;
log::debug!("Membership sending result {:?}", res);
}

0 comments on commit 002d76e

Please sign in to comment.