Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement initial architecture of node crate #42

Merged
merged 6 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,457 changes: 2,268 additions & 189 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
[workspace]
resolver = "2"
members = ["proto", "types", "rpc"]
members = ["celestia", "node", "proto", "rpc", "types"]

[workspace.dependencies]
celestia-node = { version = "0.1.0", path = "node" }
celestia-proto = { version = "0.1.0", path = "proto" }
celestia-rpc = { version = "0.1.0", path = "rpc" }
celestia-types = { version = "0.1.0", path = "types" }
nmt-rs = { git = "https://github.com/eigerco/nmt-rs.git", rev = "5146800" }
tendermint = { git = "https://github.com/eigerco/celestia-tendermint-rs.git", rev = "dbb4434" }
tendermint-proto = { git = "https://github.com/eigerco/celestia-tendermint-rs.git", rev = "dbb4434" }
tendermint = { git = "https://github.com/eigerco/celestia-tendermint-rs.git", rev = "19dc3da" }
tendermint-proto = { git = "https://github.com/eigerco/celestia-tendermint-rs.git", rev = "19dc3da" }

[patch.'https://github.com/eigerco/celestia-tendermint-rs.git']
# Uncomment to apply local changes
Expand Down
19 changes: 19 additions & 0 deletions celestia/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "celestia"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"

[dependencies]
celestia-node = { workspace = true }
celestia-rpc = { workspace = true }

anyhow = "1.0.71"
dotenvy = "0.15.7"
libp2p = { version = "0.52.3", features = [
"ed25519",
"noise",
"tcp",
"yamux",
] }
tokio = { version = "1.29.0", features = ["macros", "rt-multi-thread"] }
48 changes: 48 additions & 0 deletions celestia/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::env;

use anyhow::{Context, Result};
use celestia_node::node::{Node, NodeConfig};
use celestia_rpc::prelude::*;
use libp2p::{core::upgrade::Version, identity, noise, tcp, yamux, Multiaddr, Transport};

const WS_URL: &str = "ws://localhost:26658";

#[tokio::main]
async fn main() -> Result<()> {
let _ = dotenvy::dotenv();

// Get the address of the local bridge node
let auth_token = env::var("CELESTIA_NODE_AUTH_TOKEN_ADMIN")?;
let client = celestia_rpc::client::new_websocket(WS_URL, Some(&auth_token)).await?;
let bridge_info = client.p2p_info().await?;
let bridge_maddrs: Vec<Multiaddr> = bridge_info
.addrs
.into_iter()
.map(|addr| addr.parse().context("Parsing addr failed"))
.collect::<Result<_>>()?;
println!("bridge id: {:?}", bridge_info.id);
println!("bridge listens on: {bridge_maddrs:?}");

let bridge_ma = bridge_maddrs
.into_iter()
.find(|ma| ma.protocol_stack().any(|protocol| protocol == "tcp"))
.context("Bridge doesn't listen on tcp")?;

let local_keypair = identity::Keypair::generate_ed25519();

let transport = tcp::tokio::Transport::default()
.upgrade(Version::V1Lazy)
.authenticate(noise::Config::new(&local_keypair)?)
.multiplex(yamux::Config::default())
.boxed();

let _node = Node::new(NodeConfig {
transport,
network_id: "private".to_string(),
local_keypair,
bootstrap_peers: vec![bridge_ma],
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse()?],
});

Ok(())
}
27 changes: 27 additions & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "celestia-node"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"

[dependencies]
celestia-proto = { workspace = true }
celestia-types = { workspace = true }
tendermint-proto = { workspace = true }

async-trait = "0.1.73"
futures = "0.3.28"
libp2p = { version = "0.52.3", features = [
"gossipsub",
"identify",
"macros",
"request-response",
] }
log = "0.4.20"
prost = "0.12.0"
thiserror = "1.0.48"
tokio = { version = "1.32.0", features = ["macros", "sync"] }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
libp2p = { version = "0.52.3", features = ["tokio"] }
tokio = { version = "1.32.0", features = ["rt-multi-thread"] }
97 changes: 97 additions & 0 deletions node/src/exchange.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use std::io;

use async_trait::async_trait;
use celestia_proto::p2p::pb::{HeaderRequest, HeaderResponse};
use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use libp2p::request_response::{self, Codec, ProtocolSupport};
use libp2p::StreamProtocol;
use prost::Message;

use crate::utils::stream_protocol_id;

/// Max request size in bytes
const REQUEST_SIZE_MAXIMUM: u64 = 1024;
/// Max response size in bytes
const RESPONSE_SIZE_MAXIMUM: u64 = 10 * 1024 * 1024;

pub type Behaviour = request_response::Behaviour<HeaderCodec>;
pub type Event = request_response::Event<HeaderRequest, HeaderResponse>;

/// Create a new [`Behaviour`]
pub fn new_behaviour(network: &str) -> Behaviour {
Behaviour::new(
[(
stream_protocol_id(network, "/header-ex/v0.0.3"),
ProtocolSupport::Full,
)],
request_response::Config::default(),
)
}

#[derive(Clone, Copy, Debug, Default)]
pub struct HeaderCodec;

#[async_trait]
impl Codec for HeaderCodec {
type Protocol = StreamProtocol;
type Request = HeaderRequest;
type Response = HeaderResponse;

async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send,
{
let mut vec = Vec::new();

io.take(REQUEST_SIZE_MAXIMUM).read_to_end(&mut vec).await?;
zvolin marked this conversation as resolved.
Show resolved Hide resolved

Ok(HeaderRequest::decode_length_delimited(&vec[..])?)
}

async fn read_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
) -> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send,
{
let mut vec = Vec::new();

io.take(RESPONSE_SIZE_MAXIMUM).read_to_end(&mut vec).await?;

Ok(HeaderResponse::decode_length_delimited(&vec[..])?)
}

async fn write_request<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
req: Self::Request,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
let data = req.encode_length_delimited_to_vec();

io.write_all(data.as_ref()).await?;

Ok(())
}

async fn write_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
resp: Self::Response,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
let data = resp.encode_length_delimited_to_vec();

io.write_all(data.as_ref()).await?;

Ok(())
}
}
6 changes: 6 additions & 0 deletions node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
mod exchange;
pub mod node;
pub mod p2p;
pub mod store;
pub mod syncer;
mod utils;
86 changes: 86 additions & 0 deletions node/src/node.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//! High-level integration of [`P2p`], [`Store`], [`Syncer`].
//!
//! [`P2p`]: crate::p2p::P2p
//! [`Store`]: crate::store::Store
//! [`Syncer`]: crate::syncer::Syncer

use std::sync::Arc;

use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::Boxed;
use libp2p::identity::Keypair;
use libp2p::{Multiaddr, PeerId};
use tokio::select;
use tokio::sync::RwLock;

use crate::p2p::{P2p, P2pConfig, P2pError};
use crate::store::Store;
use crate::syncer::Syncer;

pub struct Node {
// TODO
}

pub struct NodeConfig {
pub transport: Boxed<(PeerId, StreamMuxerBox)>,
pub network_id: String,
pub local_keypair: Keypair,
pub bootstrap_peers: Vec<Multiaddr>,
pub listen_on: Vec<Multiaddr>,
}

#[derive(Debug, thiserror::Error)]
pub enum NodeError {
#[error("P2p: {0}")]
P2p(#[from] P2pError),
}

type Result<T, E = NodeError> = std::result::Result<T, E>;

#[allow(unused)]
struct Worker {
store: Arc<RwLock<Store>>,
syncer: Syncer,
p2p: P2p,
}

#[allow(unused)]
enum NodeCmd {}

#[allow(unused)]
enum NodeEvent {}

impl Node {
pub fn new(config: NodeConfig) -> Result<Self> {
let store = Arc::new(RwLock::new(Store::new()));
let syncer = Syncer::new(store.clone());

let p2p = P2p::new(P2pConfig {
transport: config.transport,
store: store.clone(),
network_id: config.network_id,
local_keypair: config.local_keypair,
bootstrap_peers: config.bootstrap_peers,
listen_on: config.listen_on,
})?;

tokio::spawn(async move {
Worker { store, syncer, p2p }.run().await;
});

Ok(Node {})
}
}

impl Worker {
async fn run(&mut self) {
loop {
select! {
Some(_ev) = self.p2p.next_event() => {
// TODO: feed it to syncer
}
// TODO: receive command from `Node` and handle it
}
}
}
}
Loading
Loading