Skip to content

Commit

Permalink
Query Membership in service
Browse files Browse the repository at this point in the history
Node simply has to call the membership service with None members
  • Loading branch information
pool2win committed Oct 9, 2024
1 parent 9a9b0db commit 423511d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 22 deletions.
32 changes: 11 additions & 21 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,27 +234,17 @@ impl Node {

pub(crate) async fn send_membership(&self, sender: ReliableSenderHandle) {
log::info!("Sending membership information");
match self.state.membership_handle.get_members().await {
Err(_) => {
log::debug!("Error reading membership");
}
Ok(members) => {
let members_name = Some(members.into_keys().collect());
let protocol_service =
protocol::Protocol::new(self.get_node_id().clone(), self.state.clone());
let reliable_sender_service = ReliableSend::new(protocol_service, sender);
let timeout_layer = tower::timeout::TimeoutLayer::new(
tokio::time::Duration::from_millis(self.delivery_timeout),
);
let res = timeout_layer
.layer(reliable_sender_service)
.oneshot(
MembershipMessage::new(self.get_node_id().clone(), members_name).into(),
)
.await;
log::debug!("Membership sending result {:?}", res);
}
}
let protocol_service =
protocol::Protocol::new(self.get_node_id().clone(), self.state.clone());
let reliable_sender_service = ReliableSend::new(protocol_service, sender);
let timeout_layer = tower::timeout::TimeoutLayer::new(tokio::time::Duration::from_millis(
self.delivery_timeout,
));
let res = timeout_layer
.layer(reliable_sender_service)
.oneshot(MembershipMessage::new(self.get_node_id().clone(), None).into())
.await;
log::debug!("Membership sending result {:?}", res);
}

/// Connect to all peers and start reader writer tasks
Expand Down
1 change: 0 additions & 1 deletion src/node/protocol/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ impl Service<Message> for Membership {
/// node when anyone connects to it.
fn call(&mut self, msg: Message) -> Self::Future {
let state = self.state.clone();
log::debug!("MSG {:?}", msg);
async move {
match msg {
Message::UnicastMessage(Unicast::Membership(MembershipMessage {
Expand Down

0 comments on commit 423511d

Please sign in to comment.