From 52ba5a2d1cce0f362f2f41c03eebdf88b0256bcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Zwoli=C5=84ski?= Date: Tue, 13 Feb 2024 20:15:28 +0100 Subject: [PATCH 1/6] replace NodeConfig with NodeBuilder --- Cargo.lock | 2 +- cli/Cargo.toml | 1 - node/Cargo.toml | 1 + node/src/lib.rs | 3 +- node/src/network.rs | 68 ++++----- node/src/node.rs | 56 +------- node/src/node_builder.rs | 295 ++++++++++++++++++++++++++++++++++++++ node/src/p2p.rs | 15 +- node/src/p2p/header_ex.rs | 9 +- node/src/test_utils.rs | 39 ++--- node/src/utils.rs | 7 + 11 files changed, 376 insertions(+), 120 deletions(-) create mode 100644 node/src/node_builder.rs diff --git a/Cargo.lock b/Cargo.lock index 2f887328..1fea7c47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2790,7 +2790,6 @@ dependencies = [ "celestia-rpc", "celestia-types", "clap", - "directories", "dotenvy", "envy", "libp2p", @@ -2821,6 +2820,7 @@ dependencies = [ "celestia-types", "cid", "dashmap", + "directories", "dotenvy", "function_name", "futures", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index eb1e93a3..8e30073c 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -32,7 +32,6 @@ lumina-node = { workspace = true } anyhow = "1.0.71" axum = "0.6.20" clap = { version = "4.4.4", features = ["derive"] } -directories = "5.0.1" dotenvy = "0.15.7" mime_guess = "2.0" rust-embed = { version = "8.0.0", features = ["interpolate-folder-path"] } diff --git a/node/Cargo.toml b/node/Cargo.toml index de914002..5f62df3f 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -52,6 +52,7 @@ tracing = "0.1.37" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] backoff = { version = "0.4.0", features = ["tokio"] } blockstore = { workspace = true, features = ["sled"] } +directories = "5.0.1" # Upgrading this dependency invalidates existing persistent dbs. # Those can be restored by migrating between versions: # https://docs.rs/sled/latest/sled/struct.Db.html#examples-1 diff --git a/node/src/lib.rs b/node/src/lib.rs index 4ca3aef5..857b7c28 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -5,6 +5,7 @@ pub mod blockstore; mod executor; pub mod network; pub mod node; +mod node_builder; pub mod p2p; pub mod peer_tracker; pub mod store; @@ -12,4 +13,4 @@ pub mod syncer; #[cfg(any(test, feature = "test-utils"))] #[cfg_attr(docs_rs, doc(cfg(feature = "test-utils")))] pub mod test_utils; -mod utils; +pub mod utils; diff --git a/node/src/network.rs b/node/src/network.rs index a0bddf82..d19fc0f3 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -40,33 +40,34 @@ impl FromStr for Network { } /// Get the string id of the given network. -pub fn network_id(network: Network) -> &'static str { - match network { - Network::Arabica => "arabica-10", - Network::Mocha => "mocha-4", - Network::Private => "private", - Network::Mainnet => "celestia", +impl Network { + pub fn id(&self) -> &'static str { + match self { + Network::Arabica => "arabica-10", + Network::Mocha => "mocha-4", + Network::Private => "private", + Network::Mainnet => "celestia", + } } -} -/// Get the hash of a genesis block for the given network. -pub fn network_genesis(network: Network) -> Option { - let hex = match network { - Network::Mainnet => "6BE39EFD10BA412A9DB5288488303F5DD32CF386707A5BEF33617F4C43301872", - Network::Arabica => "5904E55478BA4B3002EE885621E007A2A6A2399662841912219AECD5D5CBE393", - Network::Mocha => "B93BBE20A0FBFDF955811B6420F8433904664D45DB4BF51022BE4200C1A1680D", - Network::Private => return None, - }; + /// Get the hash of a genesis block for the given network. + pub fn genesis(&self) -> Option { + let hex = match self { + Network::Mainnet => "6BE39EFD10BA412A9DB5288488303F5DD32CF386707A5BEF33617F4C43301872", + Network::Arabica => "5904E55478BA4B3002EE885621E007A2A6A2399662841912219AECD5D5CBE393", + Network::Mocha => "B93BBE20A0FBFDF955811B6420F8433904664D45DB4BF51022BE4200C1A1680D", + Network::Private => return None, + }; - let bytes = hex::decode(hex).expect("failed decoding genesis hash"); - let array = bytes.try_into().expect("invalid genesis hash lenght"); + let bytes = hex::decode(hex).expect("failed decoding genesis hash"); + let array = bytes.try_into().expect("invalid genesis hash lenght"); - Some(Hash::Sha256(array)) -} + Some(Hash::Sha256(array)) + } -/// Get official Celestia and Lumina bootnodes for the given network. -pub fn canonical_network_bootnodes(network: Network) -> impl Iterator { - let peers: &[_] = match network { + /// Get official Celestia and Lumina bootnodes for the given network. + pub fn canonical_bootnodes(&self) -> impl Iterator { + let peers: &[_] = match self { Network::Mainnet => &[ "/dns4/lumina.eiger.co/tcp/2121/p2p/12D3KooW9z4jLqwodwNRcSa5qgcSgtJ13kN7CYLcwZQjPRYodqWx", "/dns4/lumina.eiger.co/udp/2121/quic-v1/webtransport/p2p/12D3KooW9z4jLqwodwNRcSa5qgcSgtJ13kN7CYLcwZQjPRYodqWx", @@ -94,9 +95,10 @@ pub fn canonical_network_bootnodes(network: Network) -> impl Iterator &[], }; - peers - .iter() - .map(|s| s.parse().expect("Invalid bootstrap address")) + peers + .iter() + .map(|s| s.parse().expect("Invalid bootstrap address")) + } } #[cfg(test)] @@ -105,32 +107,32 @@ mod tests { #[test] fn test_network_genesis() { - let mainnet = network_genesis(Network::Mainnet); + let mainnet = Network::Mainnet.genesis(); assert!(mainnet.is_some()); - let arabica = network_genesis(Network::Arabica); + let arabica = Network::Arabica.genesis(); assert!(arabica.is_some()); - let mocha = network_genesis(Network::Mocha); + let mocha = Network::Mocha.genesis(); assert!(mocha.is_some()); - let private = network_genesis(Network::Private); + let private = Network::Private.genesis(); assert!(private.is_none()); } #[test] fn test_canonical_network_bootnodes() { // canonical_network_bootnodes works on const data, test it doesn't panic and the data is there - let mainnet = canonical_network_bootnodes(Network::Mainnet); + let mainnet = Network::Mainnet.canonical_bootnodes(); assert_ne!(mainnet.count(), 0); - let arabica = canonical_network_bootnodes(Network::Arabica); + let arabica = Network::Arabica.canonical_bootnodes(); assert_ne!(arabica.count(), 0); - let mocha = canonical_network_bootnodes(Network::Mocha); + let mocha = Network::Mocha.canonical_bootnodes(); assert_ne!(mocha.count(), 0); - let private = canonical_network_bootnodes(Network::Private); + let private = Network::Private.canonical_bootnodes(); assert_eq!(private.count(), 0); } } diff --git a/node/src/node.rs b/node/src/node.rs index 549f0e73..114914fb 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -7,7 +7,6 @@ use std::ops::RangeBounds; use std::sync::Arc; -use blockstore::Blockstore; use celestia_types::hash::Hash; use celestia_types::namespaced_data::NamespacedData; use celestia_types::nmt::Namespace; @@ -15,17 +14,18 @@ use celestia_types::row::Row; use celestia_types::sample::Sample; use celestia_types::ExtendedHeader; use cid::Cid; -use libp2p::identity::Keypair; use libp2p::swarm::NetworkInfo; use libp2p::{Multiaddr, PeerId}; -use crate::p2p::{P2p, P2pArgs, P2pError}; +use crate::p2p::{P2p, P2pError}; use crate::peer_tracker::PeerTrackerInfo; use crate::store::{Store, StoreError}; -use crate::syncer::{Syncer, SyncerArgs, SyncerError, SyncingInfo}; +use crate::syncer::{Syncer, SyncerError, SyncingInfo}; type Result = std::result::Result; +pub use crate::node_builder::NodeBuilder; + /// Representation of all the errors that can occur when interacting with the [`Node`]. #[derive(Debug, thiserror::Error)] pub enum NodeError { @@ -42,36 +42,14 @@ pub enum NodeError { Store(#[from] StoreError), } -/// Node conifguration. -pub struct NodeConfig -where - B: Blockstore, - S: Store, -{ - /// An id of the network to connect to. - pub network_id: String, - /// The hash of the genesis block in network. - pub genesis_hash: Option, - /// The keypair to be used as [`Node`]s identity. - pub p2p_local_keypair: Keypair, - /// List of bootstrap nodes to connect to and trust. - pub p2p_bootnodes: Vec, - /// List of the addresses where [`Node`] will listen for incoming connections. - pub p2p_listen_on: Vec, - /// The blockstore for bitswap. - pub blockstore: B, - /// The store for headers. - pub store: S, -} - /// Celestia node. pub struct Node where S: Store + 'static, { p2p: Arc, - store: Arc, syncer: Arc>, + store: Arc, } impl Node @@ -79,28 +57,8 @@ where S: Store, { /// Creates and starts a new celestia node with a given config. - pub async fn new(config: NodeConfig) -> Result - where - B: Blockstore + 'static, - { - let store = Arc::new(config.store); - - let p2p = Arc::new(P2p::start(P2pArgs { - network_id: config.network_id, - local_keypair: config.p2p_local_keypair, - bootnodes: config.p2p_bootnodes, - listen_on: config.p2p_listen_on, - blockstore: config.blockstore, - store: store.clone(), - })?); - - let syncer = Arc::new(Syncer::start(SyncerArgs { - genesis_hash: config.genesis_hash, - store: store.clone(), - p2p: p2p.clone(), - })?); - - Ok(Node { p2p, store, syncer }) + pub(crate) fn new(p2p: Arc, syncer: Arc>, store: Arc) -> Self { + Node { p2p, store, syncer } } /// Get node's local peer ID. diff --git a/node/src/node_builder.rs b/node/src/node_builder.rs new file mode 100644 index 00000000..54cf9f7a --- /dev/null +++ b/node/src/node_builder.rs @@ -0,0 +1,295 @@ +use std::io; +use std::sync::Arc; + +use blockstore::{Blockstore, BlockstoreError}; +use celestia_types::hash::Hash; +use libp2p::identity::Keypair; +use libp2p::Multiaddr; + +use crate::network::Network; +use crate::node::Node; +use crate::p2p::{P2p, P2pArgs, P2pError}; +use crate::store::{Store, StoreError}; +use crate::syncer::{Syncer, SyncerArgs, SyncerError}; + +type Result = std::result::Result; + +/// Representation of all the errors that can occur when interacting with the [`NodeBuilder`]. +#[derive(Debug, thiserror::Error)] +pub enum NodeBuilderError { + /// An error propagated from the [`P2p`] module. + #[error(transparent)] + P2p(#[from] P2pError), + + /// An error propagated from the [`Syncer`] module. + #[error(transparent)] + Syncer(#[from] SyncerError), + + /// An error propagated from the [`Blockstore`] module. + #[error(transparent)] + BlockstoreError(#[from] BlockstoreError), + + /// An error propagated from the [`Store`] module. + #[error(transparent)] + StoreError(#[from] StoreError), + + /// An error propagated from the IO operation. + #[error("Received io error from persistent storage: {0}")] + IoError(#[from] io::Error), + + /// A required setting wasn't provided. + #[error("Required setting not provided: {0}")] + SettingMissing(String), +} + +/// Node conifguration. +pub struct NodeBuilder +where + B: Blockstore, + S: Store, +{ + /// An id of the network to connect to. + network: Option, + /// The hash of the genesis block in network. + genesis_hash: Option, + /// The keypair to be used as [`Node`]s identity. + p2p_local_keypair: Option, + /// List of bootstrap nodes to connect to and trust. + p2p_bootnodes: Vec, + /// List of the addresses where [`Node`] will listen for incoming connections. + p2p_listen_on: Vec, + /// The blockstore for bitswap. + blockstore: Option, + /// The store for headers. + store: Option, + /// A handle for the [`sled::Db`] to initialize default sled store and blockstore + /// within the same instance. + #[cfg(not(target_arch = "wasm32"))] + sled_db: Option, +} + +impl Default for NodeBuilder +where + B: Blockstore, + S: Store, +{ + fn default() -> Self { + Self { + network: None, + genesis_hash: None, + p2p_local_keypair: None, + p2p_bootnodes: vec![], + p2p_listen_on: vec![], + blockstore: None, + store: None, + #[cfg(not(target_arch = "wasm32"))] + sled_db: None, + } + } +} + +impl NodeBuilder +where + B: Blockstore + 'static, + S: Store, +{ + pub fn new() -> Self { + Self::default() + } + + pub fn with_network(mut self, network: Network) -> Self { + self.network = Some(network); + self + } + + pub fn with_genesis(mut self, hash: Option) -> Self { + self.genesis_hash = hash; + self + } + + pub fn with_p2p_keypair(mut self, keypair: Keypair) -> Self { + self.p2p_local_keypair = Some(keypair); + self + } + + pub fn with_listeners(mut self, listeners: Vec) -> Self { + self.p2p_listen_on = listeners; + self + } + + pub fn with_bootnodes(mut self, bootnodes: Vec) -> Self { + self.p2p_bootnodes = bootnodes; + self + } + + pub fn with_blockstore(mut self, blockstore: B) -> Self { + self.blockstore = Some(blockstore); + self + } + + pub fn with_store(mut self, store: S) -> Self { + self.store = Some(store); + self + } + + pub async fn build(self) -> Result> { + let network = self + .network + .ok_or_else(|| NodeBuilderError::SettingMissing("network".into()))?; + let local_keypair = self + .p2p_local_keypair + .ok_or_else(|| NodeBuilderError::SettingMissing("p2p_local_keypair".into()))?; + let blockstore = self + .blockstore + .ok_or_else(|| NodeBuilderError::SettingMissing("blockstore".into()))?; + let store = self + .store + .ok_or_else(|| NodeBuilderError::SettingMissing("store".into()))?; + + let store = Arc::new(store); + + let p2p = Arc::new(P2p::start(P2pArgs { + network, + local_keypair, + bootnodes: self.p2p_bootnodes, + listen_on: self.p2p_listen_on, + blockstore, + store: store.clone(), + })?); + + let syncer = Arc::new(Syncer::start(SyncerArgs { + genesis_hash: self.genesis_hash, + store: store.clone(), + p2p: p2p.clone(), + })?); + + Ok(Node::new(p2p, syncer, store)) + } +} + +#[cfg(not(target_arch = "wasm32"))] +mod native { + use std::path::Path; + + use directories::ProjectDirs; + use tokio::{fs, task::spawn_blocking}; + use tracing::warn; + + use crate::{blockstore::SledBlockstore, store::SledStore, utils}; + + use super::*; + + impl NodeBuilder + where + B: Blockstore, + S: Store, + { + async fn get_sled_db(&mut self) -> Result { + if let Some(db) = self.sled_db.clone() { + return Ok(db); + } + + let network_id = self + .network + .ok_or_else(|| NodeBuilderError::SettingMissing("network".into()))? + .id(); + let mut data_dir = utils::data_dir() + .ok_or_else(|| StoreError::OpenFailed("Can't find home of current user".into()))?; + + // TODO(02.2024): remove in 3 months or after few releases + migrate_from_old_cache_dir(&data_dir).await?; + + data_dir.push(network_id); + data_dir.push("db"); + + let db = spawn_blocking(|| sled::open(data_dir)) + .await + .map_err(io::Error::from)? + .map_err(|e| StoreError::OpenFailed(e.to_string()))?; + + self.sled_db = Some(db.clone()); + + Ok(db) + } + } + + impl NodeBuilder + where + S: Store, + { + pub async fn with_default_blockstore(mut self) -> Result { + let db = self.get_sled_db().await?; + self.blockstore = Some(SledBlockstore::new(db).await?); + Ok(self) + } + } + + impl NodeBuilder + where + B: Blockstore, + { + pub async fn with_default_store(mut self) -> Result { + let db = self.get_sled_db().await?; + self.store = Some(SledStore::new(db).await?); + Ok(self) + } + } + + // TODO(02.2024): remove in 3 months or after few releases + // Previously we used `.cache/celestia/{network_id}` (or os equivalent) for + // sled's persistent storage. This function will migrate it to a new lumina + // data dir. If a new storage is found too, migration will not overwrite it. + async fn migrate_from_old_cache_dir(data_dir: &Path) -> Result<()> { + let old_cache_dir = ProjectDirs::from("co", "eiger", "celestia") + .expect("Must succeed after data_dir is known") + .cache_dir() + .to_owned(); + + // already migrated or fresh usage + if !old_cache_dir.exists() { + return Ok(()); + } + + // we won't migrate old data if user already have a new persistent storage. + if data_dir.exists() { + warn!( + "Found both old and new Lumina storages. {} can be deleted.", + old_cache_dir.display() + ); + return Ok(()); + } + + warn!( + "Migrating Lumina storage to a new location: {} -> {}", + old_cache_dir.display(), + data_dir.display() + ); + + // migrate data for each network + for network in [ + Network::Arabica, + Network::Mocha, + Network::Mainnet, + Network::Private, + ] { + let net_id = network.id(); + let old = old_cache_dir.join(net_id); + let new = data_dir.join(net_id); + + if old.exists() { + fs::create_dir_all(&new).await?; + fs::rename(old, new.join("db")).await?; + } + } + + if old_cache_dir.read_dir()?.count() > 0 { + warn!("Old Lumina storage not empty after successful migration."); + warn!( + "Inspect and remove it manually: {}", + old_cache_dir.display() + ); + } + + Ok(()) + } +} diff --git a/node/src/p2p.rs b/node/src/p2p.rs index 9f149fb0..09d083e7 100644 --- a/node/src/p2p.rs +++ b/node/src/p2p.rs @@ -49,6 +49,7 @@ mod shwap; mod swarm; use crate::executor::{spawn, Interval}; +use crate::network::Network; use crate::p2p::header_ex::{HeaderExBehaviour, HeaderExConfig}; use crate::p2p::header_session::HeaderSession; use crate::p2p::shwap::{namespaced_data_cid, row_cid, sample_cid, ShwapMultihasher}; @@ -159,7 +160,7 @@ where S: Store, { /// An id of the network to connect to. - pub network_id: String, + pub network: Network, /// The keypair to be used as the identity. pub local_keypair: Keypair, /// List of bootstrap nodes to connect to and trust. @@ -511,14 +512,14 @@ where args.local_keypair.public(), )); - let header_sub_topic = gossipsub_ident_topic(&args.network_id, "/header-sub/v0.0.1"); + let header_sub_topic = gossipsub_ident_topic(args.network.id(), "/header-sub/v0.0.1"); let gossipsub = init_gossipsub(&args, [&header_sub_topic])?; let kademlia = init_kademlia(&args)?; - let bitswap = init_bitswap(args.blockstore, &args.network_id)?; + let bitswap = init_bitswap(args.blockstore, args.network)?; let header_ex = HeaderExBehaviour::new(HeaderExConfig { - network_id: &args.network_id, + network: args.network, peer_tracker: peer_tracker.clone(), header_store: args.store.clone(), }); @@ -914,7 +915,7 @@ where let local_peer_id = PeerId::from(args.local_keypair.public()); let mut config = kad::Config::default(); - let protocol_id = celestia_protocol_id(&args.network_id, "/kad/1.0.0"); + let protocol_id = celestia_protocol_id(args.network.id(), "/kad/1.0.0"); config.set_protocol_names(vec![protocol_id]); @@ -934,11 +935,11 @@ where Ok(kademlia) } -fn init_bitswap(blockstore: B, network_id: &str) -> Result> +fn init_bitswap(blockstore: B, network: Network) -> Result> where B: Blockstore, { - let protocol_prefix = format!("/celestia/{}", network_id); + let protocol_prefix = format!("/celestia/{}", network.id()); Ok(beetswap::Behaviour::builder(blockstore) .protocol_prefix(&protocol_prefix)? diff --git a/node/src/p2p/header_ex.rs b/node/src/p2p/header_ex.rs index 758b5396..dff229a2 100644 --- a/node/src/p2p/header_ex.rs +++ b/node/src/p2p/header_ex.rs @@ -25,6 +25,7 @@ mod server; pub(crate) mod utils; use crate::executor::timeout; +use crate::network::Network; use crate::p2p::header_ex::client::HeaderExClientHandler; use crate::p2p::header_ex::server::HeaderExServerHandler; use crate::p2p::P2pError; @@ -59,8 +60,8 @@ where server_handler: HeaderExServerHandler, } -pub(crate) struct HeaderExConfig<'a, S> { - pub network_id: &'a str, +pub(crate) struct HeaderExConfig { + pub network: Network, pub peer_tracker: Arc, pub header_store: Arc, } @@ -93,11 +94,11 @@ impl HeaderExBehaviour where S: Store + 'static, { - pub(crate) fn new(config: HeaderExConfig<'_, S>) -> Self { + pub(crate) fn new(config: HeaderExConfig) -> Self { HeaderExBehaviour { req_resp: ReqRespBehaviour::new( [( - protocol_id(config.network_id, "/header-ex/v0.0.3"), + protocol_id(config.network.id(), "/header-ex/v0.0.3"), ProtocolSupport::Full, )], request_response::Config::default(), diff --git a/node/src/test_utils.rs b/node/src/test_utils.rs index 1a72a187..cd36e11c 100644 --- a/node/src/test_utils.rs +++ b/node/src/test_utils.rs @@ -10,7 +10,8 @@ use tokio::sync::{mpsc, watch}; use crate::{ blockstore::InMemoryBlockstore, executor::timeout, - node::NodeConfig, + network::Network, + node::NodeBuilder, p2p::{P2pCmd, P2pError}, peer_tracker::PeerTrackerInfo, store::InMemoryStore, @@ -32,38 +33,28 @@ pub fn gen_filled_store(amount: u64) -> (InMemoryStore, ExtendedHeaderGenerator) (s, gen) } -/// [`NodeConfig`] with default values for the usage in tests. +/// [`NodeBuilder`] with default values for the usage in tests. /// /// Can be used to fill the missing fields with `..test_node_config()` syntax. -pub fn test_node_config() -> NodeConfig { +pub fn test_node_builder() -> NodeBuilder { let node_keypair = identity::Keypair::generate_ed25519(); - NodeConfig { - network_id: "private".to_string(), - genesis_hash: None, - p2p_local_keypair: node_keypair, - p2p_bootnodes: vec![], - p2p_listen_on: vec![], - blockstore: InMemoryBlockstore::new(), - store: InMemoryStore::new(), - } + NodeBuilder::new() + .with_network(Network::Private) + .with_p2p_keypair(node_keypair) + .with_blockstore(InMemoryBlockstore::new()) + .with_store(InMemoryStore::new()) } -/// [`NodeConfig`] with listen address and default values for the usage in tests. -pub fn listening_test_node_config() -> NodeConfig { - NodeConfig { - p2p_listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], - ..test_node_config() - } +/// [`NodeBuilder`] with listen address and default values for the usage in tests. +pub fn listening_test_node_config() -> NodeBuilder { + test_node_builder().with_listeners(vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()]) } -/// [`NodeConfig`] with given keypair and default values for the usage in tests. +/// [`NodeBuilder`] with given keypair and default values for the usage in tests. pub fn test_node_config_with_keypair( keypair: Keypair, -) -> NodeConfig { - NodeConfig { - p2p_local_keypair: keypair, - ..test_node_config() - } +) -> NodeBuilder { + test_node_builder().with_p2p_keypair(keypair) } /// A handle to the mocked [`P2p`] component. diff --git a/node/src/utils.rs b/node/src/utils.rs index 873706b0..ec16088a 100644 --- a/node/src/utils.rs +++ b/node/src/utils.rs @@ -1,3 +1,5 @@ +//! Utilities and helpers for running the node. + use celestia_types::ExtendedHeader; use libp2p::gossipsub::IdentTopic; use libp2p::multiaddr::{Multiaddr, Protocol}; @@ -6,6 +8,11 @@ use tokio::sync::oneshot; use crate::executor::yield_now; +#[cfg(not(target_arch = "wasm32"))] +pub fn data_dir() -> Option { + directories::ProjectDirs::from("co", "eiger", "lumina").map(|dirs| dirs.cache_dir().to_owned()) +} + pub(crate) const VALIDATIONS_PER_YIELD: usize = 4; pub(crate) fn protocol_id(network: &str, protocol: &str) -> StreamProtocol { From a2ba86a66a9b6445ae48acbabf72225ecaaceb83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Zwoli=C5=84ski?= Date: Wed, 14 Feb 2024 17:14:27 +0100 Subject: [PATCH 2/6] wip --- cli/src/native.rs | 66 +++++++++------------------------------- node/src/node.rs | 16 ++++++++++ node/src/node_builder.rs | 25 ++++++++++++--- node/src/test_utils.rs | 4 +-- 4 files changed, 54 insertions(+), 57 deletions(-) diff --git a/cli/src/native.rs b/cli/src/native.rs index aa417dd9..ffe59135 100644 --- a/cli/src/native.rs +++ b/cli/src/native.rs @@ -6,11 +6,11 @@ use anyhow::{bail, Context, Result}; use celestia_rpc::prelude::*; use celestia_rpc::Client; use clap::Parser; -use directories::ProjectDirs; use libp2p::{identity, multiaddr::Protocol, Multiaddr}; use lumina_node::blockstore::SledBlockstore; -use lumina_node::network::{canonical_network_bootnodes, network_genesis, network_id, Network}; -use lumina_node::node::{Node, NodeConfig}; +use lumina_node::network::Network; +use lumina_node::node::Node; +use lumina_node::node::NodeBuilder; use lumina_node::store::{SledStore, Store}; use sled::Db; use tokio::fs; @@ -44,22 +44,17 @@ pub(crate) struct Params { pub(crate) async fn run(args: Params) -> Result<()> { let network = args.network.into(); - let p2p_local_keypair = identity::Keypair::generate_ed25519(); - let p2p_bootnodes = if args.bootnodes.is_empty() { + let bootnodes = if args.bootnodes.is_empty() { match network { Network::Private => fetch_bridge_multiaddrs(CELESTIA_LOCAL_BRIDGE_RPC_ADDR).await?, - network => canonical_network_bootnodes(network).collect(), + network => network.canonical_bootnodes().collect(), } } else { args.bootnodes }; - let network_id = network_id(network).to_owned(); - let genesis_hash = network_genesis(network); - - info!("Initializing store"); - let db = open_db(args.store, &network_id).await?; + let db = open_db(args.store.unwrap()).await?; let store = SledStore::new(db.clone()).await?; let blockstore = SledBlockstore::new(db).await?; @@ -68,17 +63,12 @@ pub(crate) async fn run(args: Params) -> Result<()> { Err(_) => info!("Initialised new store"), } - let node = Node::new(NodeConfig { - network_id, - genesis_hash, - p2p_local_keypair, - p2p_bootnodes, - p2p_listen_on: args.listen_addrs, - blockstore, - store, - }) - .await - .context("Failed to start node")?; + let node = NodeBuilder::from_network_with_defaults(network) + .await? + .with_bootnodes(bootnodes) + .build() + .await + .context("Failed to start node")?; node.wait_connected_trusted().await?; @@ -88,35 +78,9 @@ pub(crate) async fn run(args: Params) -> Result<()> { } } -async fn open_db(path: Option, network_id: &str) -> Result { - if let Some(path) = path { - let db = spawn_blocking(|| sled::open(path)).await??; - return Ok(db); - } - - let cache_dir = - ProjectDirs::from("co", "eiger", "lumina").context("Couldn't find lumina's cache dir")?; - let mut cache_dir = cache_dir.cache_dir().to_owned(); - - // TODO: remove it in 2 months or after a few releases - // If we find an old ('celestia') cache dir, move it to the new one. - if let Some(old_cache_dir) = ProjectDirs::from("co", "eiger", "celestia") { - let old_cache_dir = old_cache_dir.cache_dir(); - if old_cache_dir.exists() && !cache_dir.exists() { - warn!( - "Migrating old cache dir to a new location: {} -> {}", - old_cache_dir.display(), - cache_dir.display() - ); - fs::rename(old_cache_dir, &cache_dir).await?; - } - } - - cache_dir.push(network_id); - // TODO: should we create there also a subdirectory for the 'db' - // in case we want to put there some other stuff too? - let db = spawn_blocking(|| sled::open(cache_dir)).await??; - Ok(db) +async fn open_db(path: PathBuf) -> Result { + let db = spawn_blocking(|| sled::open(path)).await??; + return Ok(db); } /// Get the address of the local bridge node diff --git a/node/src/node.rs b/node/src/node.rs index 114914fb..9834ddb6 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -7,6 +7,7 @@ use std::ops::RangeBounds; use std::sync::Arc; +use blockstore::Blockstore; use celestia_types::hash::Hash; use celestia_types::namespaced_data::NamespacedData; use celestia_types::nmt::Namespace; @@ -17,6 +18,7 @@ use cid::Cid; use libp2p::swarm::NetworkInfo; use libp2p::{Multiaddr, PeerId}; +use crate::network::Network; use crate::p2p::{P2p, P2pError}; use crate::peer_tracker::PeerTrackerInfo; use crate::store::{Store, StoreError}; @@ -61,6 +63,20 @@ where Node { p2p, store, syncer } } + pub fn builder() -> NodeBuilder + where + B: Blockstore + 'static, + { + NodeBuilder::new() + } + + pub fn from_network(network: Network) -> NodeBuilder + where + B: Blockstore + 'static, + { + NodeBuilder::from_network(network) + } + /// Get node's local peer ID. pub fn local_peer_id(&self) -> &PeerId { self.p2p.local_peer_id() diff --git a/node/src/node_builder.rs b/node/src/node_builder.rs index 54cf9f7a..a1510a7f 100644 --- a/node/src/node_builder.rs +++ b/node/src/node_builder.rs @@ -132,21 +132,28 @@ where self } + pub fn from_network(network: Network) -> Self { + Self::new() + .with_network(network) + .with_genesis(network.genesis()) + .with_bootnodes(network.canonical_bootnodes().collect()) + } + pub async fn build(self) -> Result> { let network = self .network .ok_or_else(|| NodeBuilderError::SettingMissing("network".into()))?; - let local_keypair = self - .p2p_local_keypair - .ok_or_else(|| NodeBuilderError::SettingMissing("p2p_local_keypair".into()))?; let blockstore = self .blockstore .ok_or_else(|| NodeBuilderError::SettingMissing("blockstore".into()))?; let store = self .store + .map(Arc::new) .ok_or_else(|| NodeBuilderError::SettingMissing("store".into()))?; - let store = Arc::new(store); + let local_keypair = self + .p2p_local_keypair + .unwrap_or_else(Keypair::generate_ed25519); let p2p = Arc::new(P2p::start(P2pArgs { network, @@ -235,6 +242,16 @@ mod native { } } + impl NodeBuilder { + pub async fn from_network_with_defaults(network: Network) -> Result { + Self::from_network(network) + .with_default_blockstore() + .await? + .with_default_store() + .await + } + } + // TODO(02.2024): remove in 3 months or after few releases // Previously we used `.cache/celestia/{network_id}` (or os equivalent) for // sled's persistent storage. This function will migrate it to a new lumina diff --git a/node/src/test_utils.rs b/node/src/test_utils.rs index cd36e11c..51389598 100644 --- a/node/src/test_utils.rs +++ b/node/src/test_utils.rs @@ -11,7 +11,7 @@ use crate::{ blockstore::InMemoryBlockstore, executor::timeout, network::Network, - node::NodeBuilder, + node::{Node, NodeBuilder}, p2p::{P2pCmd, P2pError}, peer_tracker::PeerTrackerInfo, store::InMemoryStore, @@ -38,7 +38,7 @@ pub fn gen_filled_store(amount: u64) -> (InMemoryStore, ExtendedHeaderGenerator) /// Can be used to fill the missing fields with `..test_node_config()` syntax. pub fn test_node_builder() -> NodeBuilder { let node_keypair = identity::Keypair::generate_ed25519(); - NodeBuilder::new() + Node::builder() .with_network(Network::Private) .with_p2p_keypair(node_keypair) .with_blockstore(InMemoryBlockstore::new()) From ccd690e1b22444cbb9d128d7d373ad3082f0ce7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Zwoli=C5=84ski?= Date: Wed, 14 Feb 2024 17:17:58 +0100 Subject: [PATCH 3/6] wip2 --- node/src/node_builder.rs | 86 +++++++++++++++++----------------------- 1 file changed, 37 insertions(+), 49 deletions(-) diff --git a/node/src/node_builder.rs b/node/src/node_builder.rs index a1510a7f..466a428a 100644 --- a/node/src/node_builder.rs +++ b/node/src/node_builder.rs @@ -37,9 +37,9 @@ pub enum NodeBuilderError { #[error("Received io error from persistent storage: {0}")] IoError(#[from] io::Error), - /// A required setting wasn't provided. - #[error("Required setting not provided: {0}")] - SettingMissing(String), + /// Network was required but not provided. + #[error("Network not provided. Consider calling `.with_network`")] + NetworkMissing, } /// Node conifguration. @@ -62,10 +62,6 @@ where blockstore: Option, /// The store for headers. store: Option, - /// A handle for the [`sled::Db`] to initialize default sled store and blockstore - /// within the same instance. - #[cfg(not(target_arch = "wasm32"))] - sled_db: Option, } impl Default for NodeBuilder @@ -82,8 +78,6 @@ where p2p_listen_on: vec![], blockstore: None, store: None, - #[cfg(not(target_arch = "wasm32"))] - sled_db: None, } } } @@ -140,21 +134,29 @@ where } pub async fn build(self) -> Result> { - let network = self - .network - .ok_or_else(|| NodeBuilderError::SettingMissing("network".into()))?; - let blockstore = self - .blockstore - .ok_or_else(|| NodeBuilderError::SettingMissing("blockstore".into()))?; - let store = self - .store - .map(Arc::new) - .ok_or_else(|| NodeBuilderError::SettingMissing("store".into()))?; - + let network = self.network.ok_or(NodeBuilderError::NetworkMissing)?; let local_keypair = self .p2p_local_keypair .unwrap_or_else(Keypair::generate_ed25519); + if self.blockstore.is_none() || self.store.is_none() { + #[cfg(not(target_arch = "wasm32"))] + { + let db = native::default_sled_db(network).await?; + if self.blockstore.is_none() { + let blockstore = crate::blockstore::SledBlockstore::new(db.clone()).await?; + self.blockstore = Some(blockstore); + } + if self.store.is_none() { + let store = crate::store::SledStore::new(db).await?; + self.store = Some(store); + } + } + } + + let blockstore = self.blockstore.unwrap(); + let store = Arc::new(self.store.unwrap()); + let p2p = Arc::new(P2p::start(P2pArgs { network, local_keypair, @@ -186,42 +188,28 @@ mod native { use super::*; - impl NodeBuilder - where - B: Blockstore, - S: Store, - { - async fn get_sled_db(&mut self) -> Result { - if let Some(db) = self.sled_db.clone() { - return Ok(db); - } + pub(super) async fn default_sled_db(network: Network) -> Result { + let network_id = network.id(); + let mut data_dir = utils::data_dir() + .ok_or_else(|| StoreError::OpenFailed("Can't find home of current user".into()))?; - let network_id = self - .network - .ok_or_else(|| NodeBuilderError::SettingMissing("network".into()))? - .id(); - let mut data_dir = utils::data_dir() - .ok_or_else(|| StoreError::OpenFailed("Can't find home of current user".into()))?; + // TODO(02.2024): remove in 3 months or after few releases + migrate_from_old_cache_dir(&data_dir).await?; - // TODO(02.2024): remove in 3 months or after few releases - migrate_from_old_cache_dir(&data_dir).await?; + data_dir.push(network_id); + data_dir.push("db"); - data_dir.push(network_id); - data_dir.push("db"); - - let db = spawn_blocking(|| sled::open(data_dir)) - .await - .map_err(io::Error::from)? - .map_err(|e| StoreError::OpenFailed(e.to_string()))?; + let db = spawn_blocking(|| sled::open(data_dir)) + .await + .map_err(io::Error::from)? + .map_err(|e| StoreError::OpenFailed(e.to_string()))?; - self.sled_db = Some(db.clone()); - - Ok(db) - } + Ok(db) } - impl NodeBuilder + impl NodeBuilder where + B: Blockstore, S: Store, { pub async fn with_default_blockstore(mut self) -> Result { From 7086e6bdb360bb2246d3210333c7522961ca87f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Zwoli=C5=84ski?= Date: Thu, 15 Feb 2024 19:15:22 +0100 Subject: [PATCH 4/6] wip --- node/src/node.rs | 24 +++++++------------ node/src/node_builder.rs | 52 +++++++--------------------------------- node/src/store.rs | 51 +++++++++++++++++++++++++++++++++++++++ node/src/syncer.rs | 41 ++++++++----------------------- node/src/test_utils.rs | 8 +++---- 5 files changed, 81 insertions(+), 95 deletions(-) diff --git a/node/src/node.rs b/node/src/node.rs index 9834ddb6..14345c44 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -45,34 +45,28 @@ pub enum NodeError { } /// Celestia node. -pub struct Node -where - S: Store + 'static, -{ +pub struct Node { p2p: Arc, - syncer: Arc>, - store: Arc, + syncer: Arc, + store: Arc, } -impl Node -where - S: Store, -{ +impl Node { /// Creates and starts a new celestia node with a given config. - pub(crate) fn new(p2p: Arc, syncer: Arc>, store: Arc) -> Self { + pub(crate) fn new(p2p: Arc, syncer: Arc, store: Arc) -> Self { Node { p2p, store, syncer } } - pub fn builder() -> NodeBuilder + pub fn builder() -> NodeBuilder where - B: Blockstore + 'static, + B: Blockstore, { NodeBuilder::new() } - pub fn from_network(network: Network) -> NodeBuilder + pub fn from_network(network: Network) -> NodeBuilder where - B: Blockstore + 'static, + B: Blockstore, { NodeBuilder::from_network(network) } diff --git a/node/src/node_builder.rs b/node/src/node_builder.rs index 466a428a..c3f9aa5d 100644 --- a/node/src/node_builder.rs +++ b/node/src/node_builder.rs @@ -43,10 +43,9 @@ pub enum NodeBuilderError { } /// Node conifguration. -pub struct NodeBuilder +pub struct NodeBuilder where B: Blockstore, - S: Store, { /// An id of the network to connect to. network: Option, @@ -61,13 +60,12 @@ where /// The blockstore for bitswap. blockstore: Option, /// The store for headers. - store: Option, + store: Option>, } -impl Default for NodeBuilder +impl Default for NodeBuilder where B: Blockstore, - S: Store, { fn default() -> Self { Self { @@ -82,10 +80,9 @@ where } } -impl NodeBuilder +impl NodeBuilder where B: Blockstore + 'static, - S: Store, { pub fn new() -> Self { Self::default() @@ -121,8 +118,8 @@ where self } - pub fn with_store(mut self, store: S) -> Self { - self.store = Some(store); + pub fn with_store(mut self, store: S) -> Self { + self.store = Some(Arc::new(store)); self } @@ -133,7 +130,7 @@ where .with_bootnodes(network.canonical_bootnodes().collect()) } - pub async fn build(self) -> Result> { + pub async fn build(self) -> Result { let network = self.network.ok_or(NodeBuilderError::NetworkMissing)?; let local_keypair = self .p2p_local_keypair @@ -155,7 +152,7 @@ where } let blockstore = self.blockstore.unwrap(); - let store = Arc::new(self.store.unwrap()); + let store = self.store.unwrap(); let p2p = Arc::new(P2p::start(P2pArgs { network, @@ -207,39 +204,6 @@ mod native { Ok(db) } - impl NodeBuilder - where - B: Blockstore, - S: Store, - { - pub async fn with_default_blockstore(mut self) -> Result { - let db = self.get_sled_db().await?; - self.blockstore = Some(SledBlockstore::new(db).await?); - Ok(self) - } - } - - impl NodeBuilder - where - B: Blockstore, - { - pub async fn with_default_store(mut self) -> Result { - let db = self.get_sled_db().await?; - self.store = Some(SledStore::new(db).await?); - Ok(self) - } - } - - impl NodeBuilder { - pub async fn from_network_with_defaults(network: Network) -> Result { - Self::from_network(network) - .with_default_blockstore() - .await? - .with_default_store() - .await - } - } - // TODO(02.2024): remove in 3 months or after few releases // Previously we used `.cache/celestia/{network_id}` (or os equivalent) for // sled's persistent storage. This function will migrate it to a new lumina diff --git a/node/src/store.rs b/node/src/store.rs index 5df11f5c..ab934f18 100644 --- a/node/src/store.rs +++ b/node/src/store.rs @@ -3,6 +3,7 @@ use std::fmt::Debug; use std::io::{self, Cursor}; use std::ops::{Bound, RangeBounds, RangeInclusive}; +use std::sync::Arc; use async_trait::async_trait; use celestia_tendermint_proto::Protobuf; @@ -236,6 +237,56 @@ pub enum StoreError { InvalidHeadersRange, } +// impl Store for Arc +// where +// S: Store + ?Sized, +// { +// async fn get_head(&self) -> Result { +// self.get_head() +// } + +// async fn get_by_hash(&self, hash: &Hash) -> Result { +// self.get_by_hash(hash) +// } + +// async fn get_by_height(&self, height: u64) -> Result { +// self.get_by_height(height) +// } + +// async fn head_height(&self) -> Result { +// self.head_height() +// } + +// async fn has(&self, hash: &Hash) -> bool { +// self.has(hash) +// } + +// async fn has_at(&self, height: u64) -> bool { +// self.has_at(height) +// } + +// async fn append_single_unchecked(&self, header: ExtendedHeader) -> Result<()> { +// self.append_single_unchecked(header) +// } + +// async fn next_unsampled_height(&self) -> Result { +// self.next_unsampled_height() +// } + +// async fn update_sampling_metadata( +// &self, +// height: u64, +// accepted: bool, +// cids: Vec, +// ) -> Result { +// self.update_sampling_metadata(height, accepted, cids) +// } + +// async fn get_sampling_metadata(&self, height: u64) -> Result> { +// self.get_sampling_metadata(height) +// } +// } + #[derive(Message)] struct RawSamplingMetadata { #[prost(bool, tag = "1")] diff --git a/node/src/syncer.rs b/node/src/syncer.rs index c2327c97..2fb75b27 100644 --- a/node/src/syncer.rs +++ b/node/src/syncer.rs @@ -9,7 +9,6 @@ //! headers announced on the `header-sub` p2p protocol to keep the `subjective_head` as close //! to the `network_head` as possible. -use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; @@ -66,26 +65,19 @@ impl From for SyncerError { /// Component responsible for synchronizing block headers from the network. #[derive(Debug)] -pub struct Syncer -where - S: Store + 'static, -{ +pub struct Syncer { cmd_tx: mpsc::Sender, cancellation_token: CancellationToken, - _store: PhantomData, } /// Arguments used to configure the [`Syncer`]. -pub struct SyncerArgs -where - S: Store + 'static, -{ +pub struct SyncerArgs { /// Hash of the genesis block. pub genesis_hash: Option, /// Handler for the peer to peer messaging. pub p2p: Arc, /// Headers storage. - pub store: Arc, + pub store: Arc, } #[derive(Debug)] @@ -104,12 +96,9 @@ pub struct SyncingInfo { pub subjective_head: u64, } -impl Syncer -where - S: Store, -{ +impl Syncer { /// Create and start the [`Syncer`]. - pub fn start(args: SyncerArgs) -> Result { + pub fn start(args: SyncerArgs) -> Result { let cancellation_token = CancellationToken::new(); let (cmd_tx, cmd_rx) = mpsc::channel(16); let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?; @@ -121,7 +110,6 @@ where Ok(Syncer { cancellation_token, cmd_tx, - _store: PhantomData, }) } @@ -154,23 +142,17 @@ where } } -impl Drop for Syncer -where - S: Store, -{ +impl Drop for Syncer { fn drop(&mut self) { self.cancellation_token.cancel(); } } -struct Worker -where - S: Store + 'static, -{ +struct Worker { cancellation_token: CancellationToken, cmd_rx: mpsc::Receiver, p2p: Arc, - store: Arc, + store: Arc, header_sub_watcher: watch::Receiver>, genesis_hash: Option, subjective_head_height: Option, @@ -185,12 +167,9 @@ struct Ongoing { cancellation_token: CancellationToken, } -impl Worker -where - S: Store, -{ +impl Worker { fn new( - args: SyncerArgs, + args: SyncerArgs, cancellation_token: CancellationToken, cmd_rx: mpsc::Receiver, ) -> Result { diff --git a/node/src/test_utils.rs b/node/src/test_utils.rs index 51389598..9cd746ad 100644 --- a/node/src/test_utils.rs +++ b/node/src/test_utils.rs @@ -36,7 +36,7 @@ pub fn gen_filled_store(amount: u64) -> (InMemoryStore, ExtendedHeaderGenerator) /// [`NodeBuilder`] with default values for the usage in tests. /// /// Can be used to fill the missing fields with `..test_node_config()` syntax. -pub fn test_node_builder() -> NodeBuilder { +pub fn test_node_builder() -> NodeBuilder { let node_keypair = identity::Keypair::generate_ed25519(); Node::builder() .with_network(Network::Private) @@ -46,14 +46,12 @@ pub fn test_node_builder() -> NodeBuilder { } /// [`NodeBuilder`] with listen address and default values for the usage in tests. -pub fn listening_test_node_config() -> NodeBuilder { +pub fn listening_test_node_config() -> NodeBuilder { test_node_builder().with_listeners(vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()]) } /// [`NodeBuilder`] with given keypair and default values for the usage in tests. -pub fn test_node_config_with_keypair( - keypair: Keypair, -) -> NodeBuilder { +pub fn test_node_config_with_keypair(keypair: Keypair) -> NodeBuilder { test_node_builder().with_p2p_keypair(keypair) } From b4d0f15dedef0307c80d67102e7aa5614c3b42a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Zwoli=C5=84ski?= Date: Mon, 19 Feb 2024 11:50:16 +0100 Subject: [PATCH 5/6] wip --- cli/src/native.rs | 4 +- node/src/node.rs | 16 +-- node/src/node_builder.rs | 129 ++++++++++++------- node/src/p2p.rs | 33 ++--- node/src/p2p/header_ex.rs | 23 +--- node/src/p2p/header_ex/server.rs | 20 +-- node/src/store.rs | 215 +++---------------------------- node/src/syncer.rs | 11 +- 8 files changed, 138 insertions(+), 313 deletions(-) diff --git a/cli/src/native.rs b/cli/src/native.rs index ffe59135..e2ef49d9 100644 --- a/cli/src/native.rs +++ b/cli/src/native.rs @@ -63,9 +63,9 @@ pub(crate) async fn run(args: Params) -> Result<()> { Err(_) => info!("Initialised new store"), } - let node = NodeBuilder::from_network_with_defaults(network) - .await? + let node = Node::from_network(network) .with_bootnodes(bootnodes) + .with_default_blockstore() .build() .await .context("Failed to start node")?; diff --git a/node/src/node.rs b/node/src/node.rs index 14345c44..8e679ac0 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -4,7 +4,6 @@ //! [`Store`]: crate::store::Store //! [`Syncer`]: crate::syncer::Syncer -use std::ops::RangeBounds; use std::sync::Arc; use blockstore::Blockstore; @@ -59,14 +58,14 @@ impl Node { pub fn builder() -> NodeBuilder where - B: Blockstore, + B: Blockstore + 'static, { NodeBuilder::new() } pub fn from_network(network: Network) -> NodeBuilder where - B: Blockstore, + B: Blockstore + 'static, { NodeBuilder::from_network(network) } @@ -213,10 +212,11 @@ impl Node { /// /// If range contains a height of a header that is not found in the store or [`RangeBounds`] /// cannot be converted to a valid range. - pub async fn get_headers(&self, range: R) -> Result> - where - R: RangeBounds + Send, - { - Ok(self.store.get_range(range).await?) + pub async fn get_headers( + &self, + from: Option, + to: Option, + ) -> Result> { + Ok(self.store.get_range(from, to).await?) } } diff --git a/node/src/node_builder.rs b/node/src/node_builder.rs index c3f9aa5d..3c6216b2 100644 --- a/node/src/node_builder.rs +++ b/node/src/node_builder.rs @@ -45,7 +45,7 @@ pub enum NodeBuilderError { /// Node conifguration. pub struct NodeBuilder where - B: Blockstore, + B: Blockstore + 'static, { /// An id of the network to connect to. network: Option, @@ -58,11 +58,28 @@ where /// List of the addresses where [`Node`] will listen for incoming connections. p2p_listen_on: Vec, /// The blockstore for bitswap. - blockstore: Option, + blockstore: BlockstoreChoice, /// The store for headers. store: Option>, } +enum BlockstoreChoice +where + B: Blockstore + 'static, +{ + Default, + Custom(B), +} + +impl BlockstoreChoice +where + B: Blockstore, +{ + fn is_default(&self) -> bool { + matches!(self, BlockstoreChoice::Default) + } +} + impl Default for NodeBuilder where B: Blockstore, @@ -74,7 +91,7 @@ where p2p_local_keypair: None, p2p_bootnodes: vec![], p2p_listen_on: vec![], - blockstore: None, + blockstore: BlockstoreChoice::Default, store: None, } } @@ -114,11 +131,11 @@ where } pub fn with_blockstore(mut self, blockstore: B) -> Self { - self.blockstore = Some(blockstore); + self.blockstore = BlockstoreChoice::Custom(blockstore); self } - pub fn with_store(mut self, store: S) -> Self { + pub fn with_store(mut self, store: S) -> Self { self.store = Some(Arc::new(store)); self } @@ -129,48 +146,6 @@ where .with_genesis(network.genesis()) .with_bootnodes(network.canonical_bootnodes().collect()) } - - pub async fn build(self) -> Result { - let network = self.network.ok_or(NodeBuilderError::NetworkMissing)?; - let local_keypair = self - .p2p_local_keypair - .unwrap_or_else(Keypair::generate_ed25519); - - if self.blockstore.is_none() || self.store.is_none() { - #[cfg(not(target_arch = "wasm32"))] - { - let db = native::default_sled_db(network).await?; - if self.blockstore.is_none() { - let blockstore = crate::blockstore::SledBlockstore::new(db.clone()).await?; - self.blockstore = Some(blockstore); - } - if self.store.is_none() { - let store = crate::store::SledStore::new(db).await?; - self.store = Some(store); - } - } - } - - let blockstore = self.blockstore.unwrap(); - let store = self.store.unwrap(); - - let p2p = Arc::new(P2p::start(P2pArgs { - network, - local_keypair, - bootnodes: self.p2p_bootnodes, - listen_on: self.p2p_listen_on, - blockstore, - store: store.clone(), - })?); - - let syncer = Arc::new(Syncer::start(SyncerArgs { - genesis_hash: self.genesis_hash, - store: store.clone(), - p2p: p2p.clone(), - })?); - - Ok(Node::new(p2p, syncer, store)) - } } #[cfg(not(target_arch = "wasm32"))] @@ -185,7 +160,65 @@ mod native { use super::*; - pub(super) async fn default_sled_db(network: Network) -> Result { + impl NodeBuilder { + pub fn with_default_blockstore(mut self) -> Self { + self.blockstore = BlockstoreChoice::Default; + self + } + } + + impl NodeBuilder + where + B: Blockstore + 'static, + { + pub async fn build(self) -> Result { + let network = self.network.ok_or(NodeBuilderError::NetworkMissing)?; + let local_keypair = self + .p2p_local_keypair + .unwrap_or_else(Keypair::generate_ed25519); + + let default_db = if self.blockstore.is_default() || self.store.is_none() { + Some(default_sled_db(network).await?) + } else { + None + }; + let store = if let Some(store) = self.store { + store + } else { + Arc::new(SledStore::new(default_db.clone().unwrap()).await?) + }; + + let p2p = if let BlockstoreChoice::Custom(blockstore) = self.blockstore { + Arc::new(P2p::start(P2pArgs { + network, + local_keypair, + bootnodes: self.p2p_bootnodes, + listen_on: self.p2p_listen_on, + blockstore, + store: store.clone(), + })?) + } else { + Arc::new(P2p::start(P2pArgs { + network, + local_keypair, + bootnodes: self.p2p_bootnodes, + listen_on: self.p2p_listen_on, + blockstore: SledBlockstore::new(default_db.unwrap()).await?, + store: store.clone(), + })?) + }; + + let syncer = Arc::new(Syncer::start(SyncerArgs { + genesis_hash: self.genesis_hash, + store: store.clone(), + p2p: p2p.clone(), + })?); + + Ok(Node::new(p2p, syncer, store)) + } + } + + async fn default_sled_db(network: Network) -> Result { let network_id = network.id(); let mut data_dir = utils::data_dir() .ok_or_else(|| StoreError::OpenFailed("Can't find home of current user".into()))?; diff --git a/node/src/p2p.rs b/node/src/p2p.rs index 09d083e7..e2f589cd 100644 --- a/node/src/p2p.rs +++ b/node/src/p2p.rs @@ -154,10 +154,9 @@ pub struct P2p { } /// Arguments used to configure the [`P2p`]. -pub struct P2pArgs +pub struct P2pArgs where B: Blockstore, - S: Store, { /// An id of the network to connect to. pub network: Network, @@ -170,7 +169,7 @@ where /// The store for headers. pub blockstore: B, /// The store for headers. - pub store: Arc, + pub store: Arc, } #[derive(Debug)] @@ -203,10 +202,9 @@ pub(crate) enum P2pCmd { impl P2p { /// Creates and starts a new p2p handler. - pub fn start(args: P2pArgs) -> Result + pub fn start(args: P2pArgs) -> Result where B: Blockstore + 'static, - S: Store + 'static, { validate_bootnode_addrs(&args.bootnodes)?; @@ -464,26 +462,24 @@ impl P2p { /// Our network behaviour. #[derive(NetworkBehaviour)] -struct Behaviour +struct Behaviour where B: Blockstore + 'static, - S: Store + 'static, { autonat: autonat::Behaviour, bitswap: beetswap::Behaviour, ping: ping::Behaviour, identify: identify::Behaviour, - header_ex: HeaderExBehaviour, + header_ex: HeaderExBehaviour, gossipsub: gossipsub::Behaviour, kademlia: kad::Behaviour, } -struct Worker +struct Worker where B: Blockstore + 'static, - S: Store + 'static, { - swarm: Swarm>, + swarm: Swarm>, header_sub_topic_hash: TopicHash, cmd_rx: mpsc::Receiver, peer_tracker: Arc, @@ -491,13 +487,12 @@ where bitswap_queries: HashMap, beetswap::Error>>, } -impl Worker +impl Worker where B: Blockstore, - S: Store, { fn new( - args: P2pArgs, + args: P2pArgs, cmd_rx: mpsc::Receiver, header_sub_watcher: watch::Sender>, peer_tracker: Arc, @@ -594,7 +589,7 @@ where } } - async fn on_swarm_event(&mut self, ev: SwarmEvent>) -> Result<()> { + async fn on_swarm_event(&mut self, ev: SwarmEvent>) -> Result<()> { match ev { SwarmEvent::Behaviour(ev) => match ev { BehaviourEvent::Identify(ev) => self.on_identify_event(ev).await?, @@ -877,13 +872,12 @@ fn validate_bootnode_addrs(addrs: &[Multiaddr]) -> Result<(), P2pError> { } } -fn init_gossipsub<'a, B, S>( - args: &'a P2pArgs, +fn init_gossipsub<'a, B>( + args: &'a P2pArgs, topics: impl IntoIterator, ) -> Result where B: Blockstore, - S: Store, { // Set the message authenticity - How we expect to publish messages // Here we expect the publisher to sign the message with their key. @@ -907,10 +901,9 @@ where Ok(gossipsub) } -fn init_kademlia(args: &P2pArgs) -> Result> +fn init_kademlia(args: &P2pArgs) -> Result> where B: Blockstore, - S: Store, { let local_peer_id = PeerId::from(args.local_keypair.public()); let mut config = kad::Config::default(); diff --git a/node/src/p2p/header_ex.rs b/node/src/p2p/header_ex.rs index dff229a2..cd63c9cb 100644 --- a/node/src/p2p/header_ex.rs +++ b/node/src/p2p/header_ex.rs @@ -51,19 +51,16 @@ type ReqRespEvent = request_response::Event; type ReqRespMessage = request_response::Message; type ReqRespConnectionHandler = ::ConnectionHandler; -pub(crate) struct HeaderExBehaviour -where - S: Store + 'static, -{ +pub(crate) struct HeaderExBehaviour { req_resp: ReqRespBehaviour, client_handler: HeaderExClientHandler, - server_handler: HeaderExServerHandler, + server_handler: HeaderExServerHandler, } -pub(crate) struct HeaderExConfig { +pub(crate) struct HeaderExConfig { pub network: Network, pub peer_tracker: Arc, - pub header_store: Arc, + pub header_store: Arc, } /// Representation of all the errors that can occur when interacting with the header-ex. @@ -90,11 +87,8 @@ pub enum HeaderExError { OutboundFailure(OutboundFailure), } -impl HeaderExBehaviour -where - S: Store + 'static, -{ - pub(crate) fn new(config: HeaderExConfig) -> Self { +impl HeaderExBehaviour { + pub(crate) fn new(config: HeaderExConfig) -> Self { HeaderExBehaviour { req_resp: ReqRespBehaviour::new( [( @@ -187,10 +181,7 @@ where } } -impl NetworkBehaviour for HeaderExBehaviour -where - S: Store + 'static, -{ +impl NetworkBehaviour for HeaderExBehaviour { type ConnectionHandler = ConnHandler; type ToSwarm = (); diff --git a/node/src/p2p/header_ex/server.rs b/node/src/p2p/header_ex/server.rs index c9358d02..43fbd23a 100644 --- a/node/src/p2p/header_ex/server.rs +++ b/node/src/p2p/header_ex/server.rs @@ -18,13 +18,11 @@ use crate::store::Store; const MAX_HEADERS_AMOUNT_RESPONSE: u64 = 512; -pub(super) struct HeaderExServerHandler +pub(super) struct HeaderExServerHandler where - S: Store, R: ResponseSender, { - store: Arc, - + store: Arc, rx: mpsc::Receiver<(R::Channel, ResponseType)>, tx: mpsc::Sender<(R::Channel, ResponseType)>, } @@ -45,12 +43,11 @@ impl ResponseSender for ReqRespBehaviour { } } -impl HeaderExServerHandler +impl HeaderExServerHandler where - S: Store + 'static, R: ResponseSender, { - pub(super) fn new(store: Arc) -> Self { + pub(super) fn new(store: Arc) -> Self { let (tx, rx) = mpsc::channel(32); HeaderExServerHandler { store, rx, tx } } @@ -400,12 +397,9 @@ mod tests { // helper which waits for result over the test channel, while continously polling the handler // needed because `HeaderExServerHandler::poll` never returns `Ready` - async fn poll_handler_for_result( - handler: &mut HeaderExServerHandler, - ) -> Vec - where - S: Store + 'static, - { + async fn poll_handler_for_result( + handler: &mut HeaderExServerHandler, + ) -> Vec { let (tx, receiver) = oneshot::channel(); let mut sender = TestResponseSender(Some(tx)); diff --git a/node/src/store.rs b/node/src/store.rs index ab934f18..d52936e8 100644 --- a/node/src/store.rs +++ b/node/src/store.rs @@ -2,8 +2,6 @@ use std::fmt::Debug; use std::io::{self, Cursor}; -use std::ops::{Bound, RangeBounds, RangeInclusive}; -use std::sync::Arc; use async_trait::async_trait; use celestia_tendermint_proto::Protobuf; @@ -58,23 +56,29 @@ pub trait Store: Send + Sync + Debug { /// Returns the header of a specific height. async fn get_by_height(&self, height: u64) -> Result; - /// Returns the headers from the given heights range. + /// Returns the headers from the given heights range `from..=to`. /// - /// If start of the range is unbounded, the first returned header will be of height 1. - /// If end of the range is unbounded, the last returned header will be the last header in the + /// If `from` is `None` the first returned header will be of height 1. + /// If `to` is `None` the last returned header will be the last header in the /// store. + /// If `from > to` the range is considered empty and no header is returned. /// /// # Errors /// - /// If range contains a height of a header that is not found in the store or [`RangeBounds`] - /// cannot be converted to a valid range. - async fn get_range(&self, range: R) -> Result> - where - R: RangeBounds + Send, - { + /// - If either `from` or `to` is out of bounds of stored headers. + /// - If some header is not found. + /// - If the amount of headers to return is bigger than `usize` capacity. + async fn get_range(&self, from: Option, to: Option) -> Result> { let head_height = self.head_height().await?; - let range = to_headers_range(range, head_height)?; + let from = from.unwrap_or(1); + let to = to.unwrap_or(head_height); + + if from < 1 || to < 1 || from > head_height || to > head_height { + return Err(StoreError::NotFound); + } + + let range = from..=to; let amount = if range.is_empty() { 0 } else { @@ -237,56 +241,6 @@ pub enum StoreError { InvalidHeadersRange, } -// impl Store for Arc -// where -// S: Store + ?Sized, -// { -// async fn get_head(&self) -> Result { -// self.get_head() -// } - -// async fn get_by_hash(&self, hash: &Hash) -> Result { -// self.get_by_hash(hash) -// } - -// async fn get_by_height(&self, height: u64) -> Result { -// self.get_by_height(height) -// } - -// async fn head_height(&self) -> Result { -// self.head_height() -// } - -// async fn has(&self, hash: &Hash) -> bool { -// self.has(hash) -// } - -// async fn has_at(&self, height: u64) -> bool { -// self.has_at(height) -// } - -// async fn append_single_unchecked(&self, header: ExtendedHeader) -> Result<()> { -// self.append_single_unchecked(header) -// } - -// async fn next_unsampled_height(&self) -> Result { -// self.next_unsampled_height() -// } - -// async fn update_sampling_metadata( -// &self, -// height: u64, -// accepted: bool, -// cids: Vec, -// ) -> Result { -// self.update_sampling_metadata(height, accepted, cids) -// } - -// async fn get_sampling_metadata(&self, height: u64) -> Result> { -// self.get_sampling_metadata(height) -// } -// } - #[derive(Message)] struct RawSamplingMetadata { #[prost(bool, tag = "1")] @@ -328,140 +282,3 @@ impl From for RawSamplingMetadata { } } } - -/// a helper function to convert any kind of range to the inclusive range of header heights. -fn to_headers_range(bounds: impl RangeBounds, last_index: u64) -> Result> { - let start = match bounds.start_bound() { - // in case of unbounded, default to the first height - Bound::Unbounded => 1, - // range starts after the last index or before first height - Bound::Included(&x) if x > last_index || x == 0 => return Err(StoreError::NotFound), - Bound::Excluded(&x) if x >= last_index => return Err(StoreError::NotFound), - // valid start indexes - Bound::Included(&x) => x, - Bound::Excluded(&x) => x + 1, // can't overflow thanks to last_index check - }; - let end = match bounds.end_bound() { - // in case of unbounded, default to the last index - Bound::Unbounded => last_index, - // range ends after the last index - Bound::Included(&x) if x > last_index => return Err(StoreError::NotFound), - Bound::Excluded(&x) if x > last_index + 1 => return Err(StoreError::NotFound), - // prevent the underflow later on - Bound::Excluded(&0) => 0, - // valid end indexes - Bound::Included(&x) => x, - Bound::Excluded(&x) => x - 1, - }; - - Ok(start..=end) -} - -#[cfg(test)] -mod tests { - use std::ops::Bound; - - use super::to_headers_range; - - #[test] - fn converts_bounded_ranges() { - assert_eq!(1..=15, to_headers_range(1..16, 100).unwrap()); - assert_eq!(1..=15, to_headers_range(1..=15, 100).unwrap()); - assert_eq!(300..=400, to_headers_range(300..401, 500).unwrap()); - assert_eq!(300..=400, to_headers_range(300..=400, 500).unwrap()); - } - - #[test] - fn starts_from_one_when_unbounded_start() { - assert_eq!(&1, to_headers_range(..=10, 100).unwrap().start()); - assert_eq!(&1, to_headers_range(..10, 100).unwrap().start()); - assert_eq!(&1, to_headers_range(.., 100).unwrap().start()); - } - - #[test] - fn ends_on_last_index_when_unbounded_end() { - assert_eq!(&10, to_headers_range(1.., 10).unwrap().end()); - assert_eq!(&11, to_headers_range(1.., 11).unwrap().end()); - assert_eq!(&10, to_headers_range(.., 10).unwrap().end()); - } - - #[test] - fn handle_ranges_ending_precisely_at_last_index() { - let last_index = 10; - - let bounds_ending_at_last_index = [ - (Bound::Unbounded, Bound::Included(last_index)), - (Bound::Unbounded, Bound::Excluded(last_index + 1)), - ]; - - for bound in bounds_ending_at_last_index { - let range = to_headers_range(bound, last_index).unwrap(); - assert_eq!(*range.end(), last_index); - } - } - - #[test] - fn handle_ranges_ending_after_last_index() { - let last_index = 10; - - let bounds_ending_after_last_index = [ - (Bound::Unbounded, Bound::Included(last_index + 1)), - (Bound::Unbounded, Bound::Excluded(last_index + 2)), - ]; - - for bound in bounds_ending_after_last_index { - to_headers_range(bound, last_index).unwrap_err(); - } - } - - #[test] - fn errors_if_zero_heigth_is_included() { - let includes_zero_height = 0..5; - to_headers_range(includes_zero_height, 10).unwrap_err(); - } - - #[test] - fn handle_ranges_starting_precisely_at_last_index() { - let last_index = 10; - - let bounds_starting_at_last_index = [ - (Bound::Included(last_index), Bound::Unbounded), - (Bound::Excluded(last_index - 1), Bound::Unbounded), - ]; - - for bound in bounds_starting_at_last_index { - let range = to_headers_range(bound, last_index).unwrap(); - assert_eq!(*range.start(), last_index); - } - } - - #[test] - fn handle_ranges_starting_after_last_index() { - let last_index = 10; - - let bounds_starting_after_last_index = [ - (Bound::Included(last_index + 1), Bound::Unbounded), - (Bound::Excluded(last_index), Bound::Unbounded), - ]; - - for bound in bounds_starting_after_last_index { - to_headers_range(bound, last_index).unwrap_err(); - } - } - - #[test] - fn handle_ranges_that_lead_to_empty_ranges() { - let last_index = 10; - - let bounds_leading_to_empty_range = [ - (Bound::Unbounded, Bound::Excluded(0)), - (Bound::Included(3), Bound::Excluded(3)), - (Bound::Included(3), Bound::Included(2)), - (Bound::Excluded(2), Bound::Included(2)), - ]; - - for bound in bounds_leading_to_empty_range { - assert!(to_headers_range(bound, last_index).unwrap().is_empty()); - } - } -} diff --git a/node/src/syncer.rs b/node/src/syncer.rs index 2fb75b27..84b9127e 100644 --- a/node/src/syncer.rs +++ b/node/src/syncer.rs @@ -328,7 +328,7 @@ impl Worker { .build(); loop { - match try_init(&p2p, &*store, genesis_hash).await { + match try_init(&p2p, &store, genesis_hash).await { Ok(network_height) => { tx.maybe_send(network_height); break; @@ -475,10 +475,7 @@ impl Worker { } } -async fn try_init(p2p: &P2p, store: &S, genesis_hash: Option) -> Result -where - S: Store, -{ +async fn try_init(p2p: &P2p, store: &Arc, genesis_hash: Option) -> Result { p2p.wait_connected_trusted().await?; // IF store is empty, intialize it with genesis @@ -807,7 +804,7 @@ mod tests { } async fn assert_syncing( - syncer: &Syncer, + syncer: &Syncer, store: &InMemoryStore, expected_local_head: u64, expected_subjective_head: u64, @@ -827,7 +824,7 @@ mod tests { async fn initialized_syncer( genesis: ExtendedHeader, head: ExtendedHeader, - ) -> (Syncer, Arc, MockP2pHandle) { + ) -> (Syncer, Arc, MockP2pHandle) { let (mock, mut handle) = P2p::mocked(); let store = Arc::new(InMemoryStore::new()); From b14996297860165840b871f6d44568ca4f7a70e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Zwoli=C5=84ski?= Date: Mon, 19 Feb 2024 16:49:36 +0100 Subject: [PATCH 6/6] wip --- cli/src/native.rs | 47 +++++++++++++++++++++++++---------------------- cli/src/server.rs | 9 +++++---- 2 files changed, 30 insertions(+), 26 deletions(-) diff --git a/cli/src/native.rs b/cli/src/native.rs index e2ef49d9..c328057c 100644 --- a/cli/src/native.rs +++ b/cli/src/native.rs @@ -1,4 +1,5 @@ use std::env; +use std::path::Path; use std::path::PathBuf; use std::time::Duration; @@ -6,18 +7,15 @@ use anyhow::{bail, Context, Result}; use celestia_rpc::prelude::*; use celestia_rpc::Client; use clap::Parser; -use libp2p::{identity, multiaddr::Protocol, Multiaddr}; +use libp2p::{multiaddr::Protocol, Multiaddr}; use lumina_node::blockstore::SledBlockstore; use lumina_node::network::Network; use lumina_node::node::Node; -use lumina_node::node::NodeBuilder; -use lumina_node::store::{SledStore, Store}; +use lumina_node::store::SledStore; use sled::Db; -use tokio::fs; use tokio::task::spawn_blocking; use tokio::time::sleep; use tracing::info; -use tracing::warn; use crate::common::ArgNetwork; @@ -54,21 +52,25 @@ pub(crate) async fn run(args: Params) -> Result<()> { args.bootnodes }; - let db = open_db(args.store.unwrap()).await?; - let store = SledStore::new(db.clone()).await?; - let blockstore = SledBlockstore::new(db).await?; - - match store.head_height().await { - Ok(height) => info!("Initialised store with head height: {height}"), - Err(_) => info!("Initialised new store"), - } - - let node = Node::from_network(network) - .with_bootnodes(bootnodes) - .with_default_blockstore() - .build() - .await - .context("Failed to start node")?; + let node = if let Some(path) = args.store { + let db = open_db(path).await?; + let store = SledStore::new(db.clone()).await?; + let blockstore = SledBlockstore::new(db).await?; + Node::from_network(network) + .with_bootnodes(bootnodes) + .with_store(store) + .with_blockstore(blockstore) + .build() + .await + .context("Failed to start node")? + } else { + Node::from_network(network) + .with_bootnodes(bootnodes) + .with_default_blockstore() + .build() + .await + .context("Failed to start node")? + }; node.wait_connected_trusted().await?; @@ -78,9 +80,10 @@ pub(crate) async fn run(args: Params) -> Result<()> { } } -async fn open_db(path: PathBuf) -> Result { +async fn open_db(path: impl AsRef) -> Result { + let path = path.as_ref().to_owned(); let db = spawn_blocking(|| sled::open(path)).await??; - return Ok(db); + Ok(db) } /// Get the address of the local bridge node diff --git a/cli/src/server.rs b/cli/src/server.rs index eae78138..989c1ded 100644 --- a/cli/src/server.rs +++ b/cli/src/server.rs @@ -10,7 +10,7 @@ use celestia_types::hash::Hash; use clap::Args; use libp2p::multiaddr::Protocol; use libp2p::Multiaddr; -use lumina_node::network::{canonical_network_bootnodes, network_genesis}; +use lumina_node::network::Network; use rust_embed::RustEmbed; use serde::Serialize; use tracing::info; @@ -50,10 +50,11 @@ pub(crate) struct Params { } pub(crate) async fn run(args: Params) -> Result<()> { - let network = args.network.into(); - let genesis_hash = network_genesis(network); + let network = Network::from(args.network); + let genesis_hash = network.genesis(); let bootnodes = if args.bootnodes.is_empty() { - canonical_network_bootnodes(network) + network + .canonical_bootnodes() .filter(|addr| addr.iter().any(|proto| proto == Protocol::WebTransport)) .collect() } else {