diff --git a/chain-signatures/node/src/cli.rs b/chain-signatures/node/src/cli.rs index 83131e2df..20c9a2d56 100644 --- a/chain-signatures/node/src/cli.rs +++ b/chain-signatures/node/src/cli.rs @@ -1,5 +1,6 @@ use crate::config::{Config, LocalConfig, NetworkConfig, OverrideConfig}; use crate::gcp::GcpService; +use crate::mesh::Mesh; use crate::protocol::{MpcSignProtocol, SignQueue}; use crate::{http_client, indexer, mesh, storage, web}; use clap::Parser; @@ -237,11 +238,12 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { tracing::info!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized"); let signer = InMemorySigner::from_secret_key(account_id.clone(), account_sk); + let (mesh, mesh_state) = Mesh::init(mesh_options); let (protocol, protocol_state) = MpcSignProtocol::init( my_address, - mpc_contract_id, + mpc_contract_id.clone(), account_id, - rpc_client, + rpc_client.clone(), signer, receiver, sign_queue, @@ -255,13 +257,23 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { sign_sk, }, }), - mesh_options, + mesh_state.clone(), message_options, ); + let (contract_updater, contract_state) = + crate::contract_updater::ContractStateUpdater::init(rpc_client, mpc_contract_id); + rt.block_on(async { tracing::info!("protocol initialized"); - let protocol_handle = tokio::spawn(async move { protocol.run().await }); + let contract_state_clone = contract_state.clone(); + let contract_handle = + tokio::spawn(async move { contract_updater.run(contract_state_clone).await }); + let contract_state_clone = contract_state.clone(); + let mesh_handle = + tokio::spawn(async move { mesh.run(contract_state_clone, mesh_state).await }); + let protocol_handle = + tokio::spawn(async move { protocol.run(contract_state).await }); tracing::info!("protocol thread spawned"); let cipher_sk = hpke::SecretKey::try_from_bytes(&hex::decode(cipher_sk)?)?; let web_handle = tokio::spawn(async move { @@ -269,6 +281,8 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { }); tracing::info!("protocol http server 
spawned");
 
+        contract_handle.await??;
+        mesh_handle.await??;
         protocol_handle.await??;
         web_handle.await??;
         tracing::info!("spinning down");
diff --git a/chain-signatures/node/src/contract_updater.rs b/chain-signatures/node/src/contract_updater.rs
new file mode 100644
index 000000000..79f0492ae
--- /dev/null
+++ b/chain-signatures/node/src/contract_updater.rs
@@ -0,0 +1,59 @@
+use crate::protocol::ProtocolState;
+use crate::rpc_client;
+use near_account_id::AccountId;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+use tokio::sync::RwLock;
+
+/// Minimum delay between two consecutive fetches of the contract state.
+const UPDATE_INTERVAL: Duration = Duration::from_millis(1000);
+
+/// Background task that polls the MPC contract and publishes its state to a shared handle.
+pub struct ContractStateUpdater {
+    rpc_client: near_fetch::Client,
+    mpc_contract_id: AccountId,
+}
+
+impl ContractStateUpdater {
+    /// Builds the updater along with the shared state handle it will publish to. The handle
+    /// holds `None` until `run` stores the first fetched state.
+    pub fn init(
+        rpc_client: near_fetch::Client,
+        mpc_contract_id: AccountId,
+    ) -> (Self, Arc<RwLock<Option<ProtocolState>>>) {
+        let updater = Self {
+            rpc_client,
+            mpc_contract_id,
+        };
+        let contract_state = Arc::new(RwLock::new(None));
+        (updater, contract_state)
+    }
+
+    /// Fetches the contract state roughly once per `UPDATE_INTERVAL` and stores the result in
+    /// `contract_state`; a failed fetch is logged and stored as `None`. Never returns.
+    pub async fn run(
+        &self,
+        contract_state: Arc<RwLock<Option<ProtocolState>>>,
+    ) -> anyhow::Result<()> {
+        let mut last_update = Instant::now();
+        loop {
+            // Sleep out the rest of the interval instead of spinning on `elapsed()`, which
+            // would pin a runtime worker at 100% CPU without ever yielding.
+            tokio::time::sleep(UPDATE_INTERVAL.saturating_sub(last_update.elapsed())).await;
+
+            // Fetch before taking the write lock so readers never wait on the RPC round trip.
+            let fetched =
+                rpc_client::fetch_mpc_contract_state(&self.rpc_client, &self.mpc_contract_id)
+                    .await;
+            let latest = match fetched {
+                Ok(state) => Some(state),
+                Err(err) => {
+                    tracing::warn!(?err, "failed to fetch contract state");
+                    None
+                }
+            };
+            *contract_state.write().await = latest;
+            last_update = Instant::now();
+        }
+    }
+}
diff --git a/chain-signatures/node/src/lib.rs b/chain-signatures/node/src/lib.rs
index 5d1d7d058..2d8afca77 100644
--- a/chain-signatures/node/src/lib.rs
+++ b/chain-signatures/node/src/lib.rs
@@ -1,5 +1,6 @@
 pub mod cli;
 pub mod config;
+pub mod contract_updater;
 pub mod gcp;
 pub mod http_client;
 pub mod indexer;
diff --git a/chain-signatures/node/src/mesh/mod.rs b/chain-signatures/node/src/mesh/mod.rs
index 7dcedbcc6..7f475019a 100644
--- a/chain-signatures/node/src/mesh/mod.rs
+++ b/chain-signatures/node/src/mesh/mod.rs
@@ -1,7 +1,9 @@
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
 use 
crate::protocol::contract::primitives::Participants; use crate::protocol::ProtocolState; +use std::sync::Arc; +use tokio::sync::RwLock; pub mod connection; @@ -29,28 +31,49 @@ impl Options { } } +#[derive(Clone)] +pub struct MeshState { + /// Participants that are active at the beginning of each protocol loop. + pub active_participants: Participants, + + /// Potential participants that are active at the beginning of each protocol loop. This + /// includes participants belonging to the next epoch. + pub active_potential_participants: Participants, + + pub potential_participants: Participants, + + pub stable_participants: Participants, +} + pub struct Mesh { /// Pool of connections to participants. Used to check who is alive in the network. - pub connections: connection::Pool, + connections: connection::Pool, /// Participants that are active at the beginning of each protocol loop. - pub active_participants: Participants, + active_participants: Participants, /// Potential participants that are active at the beginning of each protocol loop. This /// includes participants belonging to the next epoch. - pub active_potential_participants: Participants, + active_potential_participants: Participants, } impl Mesh { - pub fn new(options: Options) -> Self { - Self { + pub fn init(options: Options) -> (Self, Arc>) { + let mesh = Self { connections: connection::Pool::new( Duration::from_millis(options.fetch_participant_timeout), Duration::from_millis(options.refresh_active_timeout), ), active_participants: Participants::default(), active_potential_participants: Participants::default(), - } + }; + let mesh_state = Arc::new(RwLock::new(MeshState { + active_participants: Participants::default(), + active_potential_participants: Participants::default(), + potential_participants: Participants::default(), + stable_participants: Participants::default(), + })); + (mesh, mesh_state) } /// Participants that are active at the beginning of each protocol loop. 
@@ -96,7 +119,7 @@ impl Mesh {
         stable
     }
 
-    pub async fn establish_participants(&mut self, contract_state: &ProtocolState) {
+    async fn establish_participants(&mut self, contract_state: &ProtocolState) {
         self.connections
             .establish_participants(contract_state)
             .await;
@@ -110,8 +133,43 @@ impl Mesh {
     }
 
     /// Ping the active participants such that we can see who is alive.
-    pub async fn ping(&mut self) {
+    async fn ping(&mut self) {
         self.active_participants = self.connections.ping().await;
         self.active_potential_participants = self.connections.ping_potential().await;
     }
+
+    /// Keeps `mesh_state` up to date: roughly every 300ms, re-establishes the participants from
+    /// the latest contract state and publishes a fresh snapshot. Rounds are skipped while no
+    /// contract state is available. Never returns.
+    pub async fn run(
+        mut self,
+        contract_state: Arc<RwLock<Option<ProtocolState>>>,
+        mesh_state: Arc<RwLock<MeshState>>,
+    ) -> anyhow::Result<()> {
+        let ping_interval = Duration::from_millis(300);
+        let mut last_pinged = Instant::now();
+        loop {
+            // Sleep out the rest of the interval instead of spinning on `elapsed()`, which
+            // would pin a runtime worker at 100% CPU without ever yielding.
+            tokio::time::sleep(ping_interval.saturating_sub(last_pinged.elapsed())).await;
+
+            // Clone in a separate statement so the read guard is released right away rather
+            // than being held for the whole round.
+            let latest = contract_state.read().await.clone();
+            if let Some(state) = latest {
+                self.establish_participants(&state).await;
+
+                // Build the snapshot before taking the write lock so readers are not blocked
+                // while the participant sets are being gathered.
+                let snapshot = MeshState {
+                    active_participants: self.active_participants().clone(),
+                    active_potential_participants: self.active_potential_participants().clone(),
+                    potential_participants: self.potential_participants().await.clone(),
+                    stable_participants: self.stable_participants().await,
+                };
+                *mesh_state.write().await = snapshot;
+            }
+            last_pinged = Instant::now();
+        }
+    }
 }
diff --git a/chain-signatures/node/src/protocol/contract/mod.rs b/chain-signatures/node/src/protocol/contract/mod.rs
index 44a7a451d..264cfbe47 100644
--- a/chain-signatures/node/src/protocol/contract/mod.rs
+++ b/chain-signatures/node/src/protocol/contract/mod.rs
@@ -9,7 +9,7 @@ use std::{collections::HashSet, str::FromStr};
 
 use self::primitives::{Candidates, Participants, PkVotes, Votes};
 
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Clone, Serialize, Deserialize, Debug)]
 pub struct InitializingContractState {
     pub candidates: Candidates,
     pub threshold: usize,
@@ -26,7 +26,7 @@ impl From for InitializingContractState
     }
 }
 
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Clone, 
Serialize, Deserialize, Debug)] pub struct RunningContractState { pub epoch: u64, pub participants: Participants, @@ -51,7 +51,7 @@ impl From for RunningContractState { } } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Clone, Serialize, Deserialize, Debug)] pub struct ResharingContractState { pub old_epoch: u64, pub old_participants: Participants, @@ -78,7 +78,7 @@ impl From for ResharingContractState { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum ProtocolState { Initializing(InitializingContractState), Running(RunningContractState), diff --git a/chain-signatures/node/src/protocol/contract/primitives.rs b/chain-signatures/node/src/protocol/contract/primitives.rs index 101ec57d9..149722a45 100644 --- a/chain-signatures/node/src/protocol/contract/primitives.rs +++ b/chain-signatures/node/src/protocol/contract/primitives.rs @@ -291,7 +291,7 @@ impl From for PkVotes { } } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Votes { pub votes: BTreeMap>, } diff --git a/chain-signatures/node/src/protocol/cryptography.rs b/chain-signatures/node/src/protocol/cryptography.rs index 24faac927..da901278d 100644 --- a/chain-signatures/node/src/protocol/cryptography.rs +++ b/chain-signatures/node/src/protocol/cryptography.rs @@ -4,9 +4,9 @@ use super::state::{GeneratingState, NodeState, ResharingState, RunningState}; use super::Config; use crate::gcp::error::SecretStorageError; use crate::http_client::SendError; -use crate::mesh::Mesh; use crate::protocol::message::{GeneratingMessage, ResharingMessage}; use crate::protocol::state::{PersistentNodeData, WaitingForConsensusState}; +use crate::protocol::MeshState; use crate::protocol::MpcMessage; use crate::storage::secret_storage::SecretNodeStorageBox; use async_trait::async_trait; @@ -14,6 +14,8 @@ use cait_sith::protocol::{Action, InitializationError, Participant, ProtocolErro use k256::elliptic_curve::group::GroupEncoding; use near_account_id::AccountId; use 
near_crypto::InMemorySigner; +use std::sync::Arc; +use tokio::sync::RwLock; #[async_trait::async_trait] pub trait CryptographicCtx { @@ -26,7 +28,7 @@ pub trait CryptographicCtx { fn cfg(&self) -> &Config; /// Active participants is the active participants at the beginning of each protocol loop. - fn mesh(&self) -> &Mesh; + fn mesh_state(&self) -> &Arc>; } #[derive(thiserror::Error, Debug)] @@ -74,7 +76,8 @@ impl CryptographicProtocol for GeneratingState { mut self, mut ctx: C, ) -> Result { - tracing::info!(active = ?ctx.mesh().active_participants().keys_vec(), "generating: progressing key generation"); + let mesh_state = ctx.mesh_state().read().await.clone(); + tracing::info!(active = ?mesh_state.active_participants.keys_vec(), "generating: progressing key generation"); let mut protocol = self.protocol.write().await; loop { let action = match protocol.poke() { @@ -99,13 +102,13 @@ impl CryptographicProtocol for GeneratingState { ctx.me().await, &ctx.cfg().local.network.sign_sk, ctx.http_client(), - ctx.mesh().active_participants(), + &mesh_state.active_participants, &ctx.cfg().protocol, ) .await; if !failures.is_empty() { tracing::warn!( - active = ?ctx.mesh().active_participants().keys_vec(), + active = ?mesh_state.active_participants.keys_vec(), "generating(wait): failed to send encrypted message; {failures:?}" ); } @@ -115,7 +118,7 @@ impl CryptographicProtocol for GeneratingState { Action::SendMany(data) => { tracing::debug!("generating: sending a message to many participants"); let mut messages = self.messages.write().await; - for (p, info) in ctx.mesh().active_participants().iter() { + for (p, info) in mesh_state.active_participants.iter() { if p == &ctx.me().await { // Skip yourself, cait-sith never sends messages to oneself continue; @@ -161,13 +164,13 @@ impl CryptographicProtocol for GeneratingState { ctx.me().await, &ctx.cfg().local.network.sign_sk, ctx.http_client(), - ctx.mesh().active_participants(), + &mesh_state.active_participants, 
&ctx.cfg().protocol, ) .await; if !failures.is_empty() { tracing::warn!( - active = ?ctx.mesh().active_participants().keys_vec(), + active = ?mesh_state.active_participants.keys_vec(), "generating(return): failed to send encrypted message; {failures:?}" ); } @@ -191,6 +194,7 @@ impl CryptographicProtocol for WaitingForConsensusState { mut self, ctx: C, ) -> Result { + let mesh_state = ctx.mesh_state().read().await.clone(); let failures = self .messages .write() @@ -199,13 +203,13 @@ impl CryptographicProtocol for WaitingForConsensusState { ctx.me().await, &ctx.cfg().local.network.sign_sk, ctx.http_client(), - ctx.mesh().active_participants(), + &mesh_state.active_participants, &ctx.cfg().protocol, ) .await; if !failures.is_empty() { tracing::warn!( - active = ?ctx.mesh().active_participants().keys_vec(), + active = ?mesh_state.active_participants.keys_vec(), "waitingForConsensus: failed to send encrypted message; {failures:?}" ); } @@ -221,13 +225,13 @@ impl CryptographicProtocol for ResharingState { mut self, mut ctx: C, ) -> Result { + let mesh_state = ctx.mesh_state().read().await.clone(); // TODO: we are not using active potential participants here, but we should in the future. // Currently resharing protocol does not timeout and restart with new set of participants. // So if it picks up a participant that is not active, it will never be able to send a message to it. 
- let active = ctx - .mesh() - .active_participants() - .and(&ctx.mesh().potential_participants().await); + let active = mesh_state + .active_participants + .and(&mesh_state.potential_participants); tracing::info!(active = ?active.keys().collect::>(), "progressing key reshare"); let mut protocol = self.protocol.write().await; loop { @@ -356,8 +360,9 @@ impl CryptographicProtocol for RunningState { mut self, ctx: C, ) -> Result { + let mesh_state = ctx.mesh_state().read().await.clone(); let protocol_cfg = &ctx.cfg().protocol; - let active = ctx.mesh().active_participants(); + let active = &mesh_state.active_participants; if active.len() < self.threshold { tracing::warn!( active = ?active.keys_vec(), @@ -429,7 +434,7 @@ impl CryptographicProtocol for RunningState { // stable participants utilizes more than the online status of a node, such as whether or not their // block height is up to date, such that they too can process signature requests. If they cannot // then they are considered unstable and should not be a part of signature generation this round. 
- let stable = ctx.mesh().stable_participants().await; + let stable = mesh_state.stable_participants; tracing::debug!(?stable, "stable participants"); let mut sign_queue = self.sign_queue.write().await; diff --git a/chain-signatures/node/src/protocol/message.rs b/chain-signatures/node/src/protocol/message.rs index e6f0f4545..c635dd827 100644 --- a/chain-signatures/node/src/protocol/message.rs +++ b/chain-signatures/node/src/protocol/message.rs @@ -6,7 +6,7 @@ use super::triple::TripleId; use crate::gcp::error::SecretStorageError; use crate::http_client::SendError; use crate::indexer::ContractSignRequest; -use crate::mesh::Mesh; +use crate::protocol::MeshState; use crate::util; use async_trait::async_trait; @@ -22,7 +22,7 @@ use tokio::sync::RwLock; #[async_trait::async_trait] pub trait MessageCtx { async fn me(&self) -> Participant; - fn mesh(&self) -> &Mesh; + fn mesh_state(&self) -> &Arc>; fn cfg(&self) -> &crate::config::Config; } @@ -236,8 +236,9 @@ impl MessageHandler for RunningState { ctx: C, queue: &mut MpcMessageQueue, ) -> Result<(), MessageHandleError> { + let mesh_state = ctx.mesh_state().read().await.clone(); let protocol_cfg = &ctx.cfg().protocol; - let participants = ctx.mesh().active_participants(); + let participants = &mesh_state.active_participants; let mut triple_manager = self.triple_manager.write().await; // remove the triple_id that has already failed or taken from the triple_bins diff --git a/chain-signatures/node/src/protocol/mod.rs b/chain-signatures/node/src/protocol/mod.rs index 7109e7c4c..af00df24a 100644 --- a/chain-signatures/node/src/protocol/mod.rs +++ b/chain-signatures/node/src/protocol/mod.rs @@ -23,12 +23,10 @@ use self::cryptography::CryptographicCtx; use self::message::MessageCtx; use crate::config::Config; use crate::http_client; -use crate::mesh; -use crate::mesh::Mesh; +use crate::mesh::MeshState; use crate::protocol::consensus::ConsensusProtocol; use crate::protocol::cryptography::CryptographicProtocol; use 
crate::protocol::message::{MessageHandler, MpcMessageQueue}; -use crate::rpc_client; use crate::storage::presignature_storage::PresignatureRedisStorage; use crate::storage::secret_storage::SecretNodeStorageBox; use crate::storage::triple_storage::TripleRedisStorage; @@ -56,7 +54,7 @@ struct Ctx { triple_storage: TripleRedisStorage, presignature_storage: PresignatureRedisStorage, cfg: Config, - mesh: Mesh, + mesh_state: Arc>, message_options: http_client::Options, } @@ -140,8 +138,8 @@ impl CryptographicCtx for &mut MpcSignProtocol { &self.ctx.cfg } - fn mesh(&self) -> &Mesh { - &self.ctx.mesh + fn mesh_state(&self) -> &Arc> { + &self.ctx.mesh_state } } @@ -151,8 +149,8 @@ impl MessageCtx for &MpcSignProtocol { get_my_participant(self).await } - fn mesh(&self) -> &Mesh { - &self.ctx.mesh + fn mesh_state(&self) -> &Arc> { + &self.ctx.mesh_state } fn cfg(&self) -> &Config { @@ -180,7 +178,7 @@ impl MpcSignProtocol { triple_storage: TripleRedisStorage, presignature_storage: PresignatureRedisStorage, cfg: Config, - mesh_options: mesh::Options, + mesh_state: Arc>, message_options: http_client::Options, ) -> (Self, Arc>) { let my_address = my_address.into_url().unwrap(); @@ -208,7 +206,7 @@ impl MpcSignProtocol { triple_storage, presignature_storage, cfg, - mesh: Mesh::new(mesh_options), + mesh_state, message_options, }; let protocol = MpcSignProtocol { @@ -219,7 +217,10 @@ impl MpcSignProtocol { (protocol, state) } - pub async fn run(mut self) -> anyhow::Result<()> { + pub async fn run( + mut self, + contract_state: Arc>>, + ) -> anyhow::Result<()> { let my_account_id = self.ctx.account_id.to_string(); let _span = tracing::info_span!("running", my_account_id); crate::metrics::NODE_RUNNING @@ -229,10 +230,8 @@ impl MpcSignProtocol { .with_label_values(&[my_account_id.as_str()]) .set(node_version()); let mut queue = MpcMessageQueue::default(); - let mut last_state_update = Instant::now(); let mut last_config_update = Instant::now(); let mut last_hardware_pull = 
Instant::now(); - let mut last_pinged = Instant::now(); // Sets the latest configurations from the contract: if let Err(err) = self @@ -274,29 +273,9 @@ impl MpcSignProtocol { } } - let contract_state = if last_state_update.elapsed() > Duration::from_secs(1) { - let contract_state = match rpc_client::fetch_mpc_contract_state( - &self.ctx.rpc_client, - &self.ctx.mpc_contract_id, - ) - .await - { - Ok(contract_state) => contract_state, - Err(_) => { - tokio::time::sleep(Duration::from_secs(1)).await; - continue; - } - }; - - // Establish the participants for this current iteration of the protocol loop. This will - // set which participants are currently active in the protocol and determines who will be - // receiving messages. - self.ctx.mesh.establish_participants(&contract_state).await; - - last_state_update = Instant::now(); - Some(contract_state) - } else { - None + let contract_state = { + let guard = contract_state.read().await; + guard.clone() }; if last_config_update.elapsed() > Duration::from_secs(5 * 60) { @@ -312,11 +291,6 @@ impl MpcSignProtocol { last_config_update = Instant::now(); } - if last_pinged.elapsed() > Duration::from_millis(300) { - self.ctx.mesh.ping().await; - last_pinged = Instant::now(); - } - let state = { let guard = self.state.read().await; guard.clone()