diff --git a/Cargo.lock b/Cargo.lock index b9646cc55..6717fd655 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7646,6 +7646,7 @@ name = "serai-docker-tests" version = "0.1.0" dependencies = [ "chrono", + "tokio", ] [[package]] @@ -7665,6 +7666,7 @@ dependencies = [ "rand_core", "serai-client", "serai-coordinator-tests", + "serai-docker-tests", "serai-message-queue-tests", "serai-processor", "serai-processor-tests", diff --git a/processor/src/tests/literal/mod.rs b/processor/src/tests/literal/mod.rs index f7cd72fb7..aa5fd8020 100644 --- a/processor/src/tests/literal/mod.rs +++ b/processor/src/tests/literal/mod.rs @@ -19,8 +19,8 @@ mod bitcoin { check::= bitcoin_serai::wallet::DUST }>>(); } - fn spawn_bitcoin() -> DockerTest { - serai_docker_tests::build("bitcoin".to_string()); + async fn spawn_bitcoin() -> DockerTest { + serai_docker_tests::build("bitcoin".to_string()).await; let composition = TestBodySpecification::with_image( Image::with_repository("serai-dev-bitcoin").pull_policy(PullPolicy::Never), @@ -73,8 +73,8 @@ mod monero { use super::*; use crate::networks::{Network, Monero}; - fn spawn_monero() -> DockerTest { - serai_docker_tests::build("monero".to_string()); + async fn spawn_monero() -> DockerTest { + serai_docker_tests::build("monero".to_string()).await; let composition = TestBodySpecification::with_image( Image::with_repository("serai-dev-monero").pull_policy(PullPolicy::Never), diff --git a/processor/src/tests/mod.rs b/processor/src/tests/mod.rs index 9a68b7bc5..4473abbbd 100644 --- a/processor/src/tests/mod.rs +++ b/processor/src/tests/mod.rs @@ -44,49 +44,59 @@ macro_rules! test_network { test_key_gen::<$N>().await; } - #[test] - fn $scanner() { + #[tokio::test] + async fn $scanner() { *INIT_LOGGER; - let docker = $docker(); - docker.run(|ops| async move { - test_scanner($network(&ops).await).await; - }); + let docker = $docker().await; + docker + .run_async(|ops| async move { + test_scanner($network(&ops).await).await; + }) + .await; } - #[test] - fn $signer() { + #[tokio::test] + async fn $signer() { *INIT_LOGGER; - let docker = $docker(); - docker.run(|ops| async move { - test_signer($network(&ops).await).await; - }); + let docker = $docker().await; + docker + .run_async(|ops| async move { + test_signer($network(&ops).await).await; + }) + .await; } - #[test] - fn $wallet() { + #[tokio::test] + async fn $wallet() { *INIT_LOGGER; - let docker = $docker(); - docker.run(|ops| async move { - test_wallet($network(&ops).await).await; - }); + let docker = $docker().await; + docker + .run_async(|ops| async move { + test_wallet($network(&ops).await).await; + }) + .await; } - #[test] - fn $addresses() { + #[tokio::test] + async fn $addresses() { *INIT_LOGGER; - let docker = $docker(); - docker.run(|ops| async move { - test_addresses($network(&ops).await).await; - }); + let docker = $docker().await; + docker + .run_async(|ops| async move { + test_addresses($network(&ops).await).await; + }) + .await; } - #[test] - fn $no_deadlock_in_multisig_completed() { + #[tokio::test] + async fn $no_deadlock_in_multisig_completed() { *INIT_LOGGER; - let docker = $docker(); - docker.run(|ops| async move { - test_no_deadlock_in_multisig_completed($network(&ops).await).await; - }); + let docker = $docker().await; + docker + .run_async(|ops| async move { + test_no_deadlock_in_multisig_completed($network(&ops).await).await; + }) + .await; } }; } diff --git a/substrate/client/tests/common/mod.rs b/substrate/client/tests/common/mod.rs index 0624ca0c0..abe4ed33d 100644 --- a/substrate/client/tests/common/mod.rs +++ b/substrate/client/tests/common/mod.rs @@ -14,7 +14,7 @@ macro_rules! serai_test { TestBodySpecification, DockerTest, }; - serai_docker_tests::build("serai".to_string()); + serai_docker_tests::build("serai".to_string()).await; let handle = concat!("serai_client-serai_node-", stringify!($name)); diff --git a/tests/coordinator/src/lib.rs b/tests/coordinator/src/lib.rs index d26000928..201241f18 100644 --- a/tests/coordinator/src/lib.rs +++ b/tests/coordinator/src/lib.rs @@ -37,14 +37,18 @@ mod tests; static UNIQUE_ID: OnceLock> = OnceLock::new(); -pub fn coordinator_instance( +pub fn coordinator_docker_name() -> String { + "serai-dev-coordinator".to_string() +} + +pub async fn coordinator_instance( name: &str, message_queue_key: ::F, ) -> TestBodySpecification { - serai_docker_tests::build("coordinator".to_string()); + serai_docker_tests::build("coordinator".to_string()).await; TestBodySpecification::with_image( - Image::with_repository("serai-dev-coordinator").pull_policy(PullPolicy::Never), + Image::with_repository(coordinator_docker_name()).pull_policy(PullPolicy::Never), ) .replace_env( [ @@ -63,11 +67,15 @@ pub fn coordinator_instance( ) } -pub fn serai_composition(name: &str) -> TestBodySpecification { - serai_docker_tests::build("serai".to_string()); +pub fn serai_docker_name() -> String { + "serai-dev-serai".to_string() +} + +pub async fn serai_composition(name: &str) -> TestBodySpecification { + serai_docker_tests::build("serai".to_string()).await; TestBodySpecification::with_image( - Image::with_repository("serai-dev-serai").pull_policy(PullPolicy::Never), + Image::with_repository(serai_docker_name()).pull_policy(PullPolicy::Never), ) .replace_cmd(vec![ "serai-node".to_string(), @@ -82,15 +90,22 @@ pub fn serai_composition(name: &str) -> TestBodySpecification { } pub type Handles = (String, String, String); -pub fn coordinator_stack( +pub async fn coordinator_stack( name: &str, ) -> (Handles, ::F, Vec) { - let serai_composition = serai_composition(name); + serai_docker_tests::build_batch(vec![ + serai_docker_name(), + serai_message_queue_tests::docker_name(), + coordinator_docker_name(), + ]) + .await; + + let serai_composition = serai_composition(name).await; let (coord_key, message_queue_keys, message_queue_composition) = - serai_message_queue_tests::instance(); + serai_message_queue_tests::instance().await; - let coordinator_composition = coordinator_instance(name, coord_key); + let coordinator_composition = coordinator_instance(name, coord_key).await; // Give every item in this stack a unique ID // Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits diff --git a/tests/coordinator/src/tests/batch.rs b/tests/coordinator/src/tests/batch.rs index a3024c590..9f172a513 100644 --- a/tests/coordinator/src/tests/batch.rs +++ b/tests/coordinator/src/tests/batch.rs @@ -253,7 +253,7 @@ pub async fn batch( #[tokio::test] async fn batch_test() { let _one_at_a_time = ONE_AT_A_TIME.get_or_init(|| Mutex::new(())).lock(); - let (processors, test) = new_test(); + let (processors, test) = new_test().await; test .run_async(|ops| async move { diff --git a/tests/coordinator/src/tests/key_gen.rs b/tests/coordinator/src/tests/key_gen.rs index 59b1fbf51..633659f3a 100644 --- a/tests/coordinator/src/tests/key_gen.rs +++ b/tests/coordinator/src/tests/key_gen.rs @@ -222,7 +222,7 @@ pub async fn key_gen( #[tokio::test] async fn key_gen_test() { let _one_at_a_time = ONE_AT_A_TIME.get_or_init(|| Mutex::new(())).lock(); - let (processors, test) = new_test(); + let (processors, test) = new_test().await; test .run_async(|ops| async move { diff --git a/tests/coordinator/src/tests/mod.rs b/tests/coordinator/src/tests/mod.rs index 0e84ec667..36b1eb88b 100644 --- a/tests/coordinator/src/tests/mod.rs +++ b/tests/coordinator/src/tests/mod.rs @@ -21,7 +21,7 @@ pub(crate) const THRESHOLD: usize = ((COORDINATORS * 2) / 3) + 1; pub(crate) static ONE_AT_A_TIME: OnceLock> = OnceLock::new(); -pub(crate) fn new_test() -> (Vec<(Handles, ::F)>, DockerTest) { +pub(crate) async fn new_test() -> (Vec<(Handles, ::F)>, DockerTest) { let mut coordinators = vec![]; let mut test = DockerTest::new().with_network(dockertest::Network::Isolated); for i in 0 .. COORDINATORS { @@ -33,7 +33,8 @@ pub(crate) fn new_test() -> (Vec<(Handles, ::F)>, Dock 4 => "Eve", 5 => "Ferdie", _ => panic!("needed a 7th name for a serai node"), - }); + }) + .await; coordinators.push((handles, coord_key)); for composition in compositions { test.provide_container(composition); diff --git a/tests/coordinator/src/tests/sign.rs b/tests/coordinator/src/tests/sign.rs index ce4b09189..ac97e284a 100644 --- a/tests/coordinator/src/tests/sign.rs +++ b/tests/coordinator/src/tests/sign.rs @@ -170,7 +170,7 @@ pub async fn sign( #[tokio::test] async fn sign_test() { let _one_at_a_time = ONE_AT_A_TIME.get_or_init(|| Mutex::new(())).lock(); - let (processors, test) = new_test(); + let (processors, test) = new_test().await; test .run_async(|ops| async move { diff --git a/tests/docker/Cargo.toml b/tests/docker/Cargo.toml index 8db507730..edf71b85e 100644 --- a/tests/docker/Cargo.toml +++ b/tests/docker/Cargo.toml @@ -15,3 +15,5 @@ rustdoc-args = ["--cfg", "docsrs"] [dependencies] chrono = "0.4" + +tokio = { version = "1", default-features = false, features = ["sync"] } diff --git a/tests/docker/src/lib.rs b/tests/docker/src/lib.rs index 572df4563..48dfcffdb 100644 --- a/tests/docker/src/lib.rs +++ b/tests/docker/src/lib.rs @@ -1,19 +1,27 @@ use std::{ - sync::{Mutex, OnceLock}, + sync::{OnceLock, Arc}, collections::{HashSet, HashMap}, time::SystemTime, path::PathBuf, fs, env, - process::Command, }; -static BUILT: OnceLock>> = OnceLock::new(); -pub fn build(name: String) { +use tokio::{sync::Mutex, process::Command}; + +static BUILT: OnceLock>>>> = OnceLock::new(); +async fn build_inner(name: String) { let built = BUILT.get_or_init(|| Mutex::new(HashMap::new())); // Only one call to build will acquire this lock - let mut built_lock = built.lock().unwrap(); - if built_lock.contains_key(&name) { - // If it was built, return + let mut built_lock = built.lock().await; + if !built_lock.contains_key(&name) { + built_lock.insert(name.clone(), Arc::new(Mutex::new(false))); + } + let this_lock = built_lock[&name].clone(); + drop(built_lock); + + let mut built_lock = this_lock.lock().await; + // Already built + if *built_lock { return; } @@ -30,14 +38,17 @@ pub fn build(name: String) { let mut orchestration_path = repo_path.clone(); orchestration_path.push("orchestration"); + let name_without_serai_dev = name.split("serai-dev-").nth(1).unwrap_or(&name); + // If this Docker image was created after this repo was last edited, return here // This should have better performance than Docker and allows running while offline if let Ok(res) = Command::new("docker") .arg("inspect") .arg("-f") .arg("{{ .Metadata.LastTagTime }}") - .arg(format!("serai-dev-{name}")) + .arg(name.clone()) .output() + .await { let last_tag_time_buf = String::from_utf8(res.stdout).expect("docker had non-utf8 output"); let last_tag_time = last_tag_time_buf.trim(); @@ -51,16 +62,19 @@ pub fn build(name: String) { ); let mut dockerfile_path = orchestration_path.clone(); - if HashSet::from(["bitcoin", "ethereum", "monero"]).contains(name.as_str()) { - dockerfile_path = dockerfile_path.join("coins"); - } - if name.contains("-processor") { - dockerfile_path = dockerfile_path - .join("processor") - .join(name.split('-').next().unwrap()) - .join("Dockerfile"); - } else { - dockerfile_path = dockerfile_path.join(&name).join("Dockerfile"); + { + let name = name_without_serai_dev; + if HashSet::from(["bitcoin", "ethereum", "monero"]).contains(&name) { + dockerfile_path = dockerfile_path.join("coins"); + } + if name.contains("-processor") { + dockerfile_path = dockerfile_path + .join("processor") + .join(name.split('-').next().unwrap()) + .join("Dockerfile"); + } else { + dockerfile_path = dockerfile_path.join(name).join("Dockerfile"); + } } // For all services, if the Dockerfile was edited after the image was built we should rebuild @@ -69,7 +83,7 @@ pub fn build(name: String) { // Check any additionally specified paths let meta = |path: PathBuf| (path.clone(), fs::metadata(path)); - let mut metadatas = match name.as_str() { + let mut metadatas = match name_without_serai_dev { "bitcoin" => vec![], "monero" => vec![], "message-queue" => vec![ @@ -133,7 +147,7 @@ pub fn build(name: String) { if let Some(last_modified) = last_modified { if last_modified < created_time { println!("{} was built after the most recent source code edits, assuming built.", name); - built_lock.insert(name, true); + *built_lock = true; return; } } @@ -143,6 +157,7 @@ pub fn build(name: String) { println!("Building {}...", &name); // Version which always prints + /* if !Command::new("docker") .current_dir(orchestration_path) .arg("compose") @@ -151,20 +166,22 @@ pub fn build(name: String) { .spawn() .unwrap() .wait() + .await .unwrap() .success() { panic!("failed to build {name}"); } + */ // Version which only prints on error - /* let res = Command::new("docker") .current_dir(orchestration_path) .arg("compose") .arg("build") - .arg(&name) + .arg(name_without_serai_dev) .output() + .await .unwrap(); if !res.status.success() { println!("failed to build {name}\n"); @@ -182,10 +199,14 @@ pub fn build(name: String) { ); panic!("failed to build {name}"); } - */ println!("Built!"); + // Set built + *built_lock = true; +} + +async fn clear_cache_if_github() { if std::env::var("GITHUB_CI").is_ok() { println!("In CI, so clearing cache to prevent hitting the storage limits."); if !Command::new("docker") @@ -194,14 +215,28 @@ pub fn build(name: String) { .arg("--all") .arg("--force") .output() + .await .unwrap() .status .success() { - println!("failed to clear cache after building {name}\n"); + println!("failed to clear cache\n"); } } +} - // Set built - built_lock.insert(name, true); +pub async fn build(name: String) { + build_inner(name).await; + clear_cache_if_github().await; +} + +pub async fn build_batch(names: Vec) { + let mut handles = vec![]; + for name in names.into_iter().collect::>() { + handles.push(tokio::spawn(build_inner(name))); + } + for handle in handles { + handle.await.unwrap(); + } + clear_cache_if_github().await; } diff --git a/tests/full-stack/Cargo.toml b/tests/full-stack/Cargo.toml index aa7fc11a9..2f5883661 100644 --- a/tests/full-stack/Cargo.toml +++ b/tests/full-stack/Cargo.toml @@ -35,6 +35,7 @@ serai-client = { path = "../../substrate/client", features = ["serai"] } tokio = { version = "1", features = ["time"] } dockertest = "0.4" +serai-docker-tests = { path = "../docker" } serai-message-queue-tests = { path = "../message-queue" } serai-processor-tests = { path = "../processor" } serai-coordinator-tests = { path = "../coordinator" } diff --git a/tests/full-stack/src/lib.rs b/tests/full-stack/src/lib.rs index 7a9bbe8a6..d00d057ca 100644 --- a/tests/full-stack/src/lib.rs +++ b/tests/full-stack/src/lib.rs @@ -31,19 +31,38 @@ pub struct Handles { serai: String, } -pub fn full_stack(name: &str) -> (Handles, Vec) { - let (coord_key, message_queue_keys, message_queue_composition) = message_queue_instance(); +pub async fn full_stack(name: &str) -> (Handles, Vec) { + let mut processor_docker_names = serai_processor_tests::docker_names(NetworkId::Bitcoin); + processor_docker_names.extend(serai_processor_tests::docker_names(NetworkId::Monero)); + + let mut docker_names = vec![ + serai_message_queue_tests::docker_name(), + serai_coordinator_tests::serai_docker_name(), + serai_coordinator_tests::coordinator_docker_name(), + ]; + + // If this is in the GH CI, build in two stages so we don't hit storage limits + if std::env::var("GITHUB_CI").is_ok() { + serai_docker_tests::build_batch(processor_docker_names).await; + } else { + docker_names.extend(processor_docker_names); + } + serai_docker_tests::build_batch(docker_names).await; + + let (coord_key, message_queue_keys, message_queue_composition) = message_queue_instance().await; - let (bitcoin_composition, bitcoin_port) = network_instance(NetworkId::Bitcoin); + let (bitcoin_composition, bitcoin_port) = network_instance(NetworkId::Bitcoin).await; let bitcoin_processor_composition = - processor_instance(NetworkId::Bitcoin, bitcoin_port, message_queue_keys[&NetworkId::Bitcoin]); + processor_instance(NetworkId::Bitcoin, bitcoin_port, message_queue_keys[&NetworkId::Bitcoin]) + .await; - let (monero_composition, monero_port) = network_instance(NetworkId::Monero); + let (monero_composition, monero_port) = network_instance(NetworkId::Monero).await; let monero_processor_composition = - processor_instance(NetworkId::Monero, monero_port, message_queue_keys[&NetworkId::Monero]); + processor_instance(NetworkId::Monero, monero_port, message_queue_keys[&NetworkId::Monero]) + .await; - let coordinator_composition = coordinator_instance(name, coord_key); - let serai_composition = serai_composition(name); + let coordinator_composition = coordinator_instance(name, coord_key).await; + let serai_composition = serai_composition(name).await; // Give every item in this stack a unique ID // Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits diff --git a/tests/full-stack/src/tests/mint_and_burn.rs b/tests/full-stack/src/tests/mint_and_burn.rs index 3a5252f54..a4dd65866 100644 --- a/tests/full-stack/src/tests/mint_and_burn.rs +++ b/tests/full-stack/src/tests/mint_and_burn.rs @@ -26,7 +26,7 @@ use crate::tests::*; #[tokio::test] async fn mint_and_burn_test() { let _one_at_a_time = ONE_AT_A_TIME.get_or_init(|| Mutex::new(())).lock(); - let (handles, test) = new_test(); + let (handles, test) = new_test().await; test .run_async(|ops| async move { diff --git a/tests/full-stack/src/tests/mod.rs b/tests/full-stack/src/tests/mod.rs index 2c6603933..507181b23 100644 --- a/tests/full-stack/src/tests/mod.rs +++ b/tests/full-stack/src/tests/mod.rs @@ -11,7 +11,7 @@ pub(crate) const VALIDATORS: usize = 4; pub(crate) static ONE_AT_A_TIME: OnceLock> = OnceLock::new(); -pub(crate) fn new_test() -> (Vec, DockerTest) { +pub(crate) async fn new_test() -> (Vec, DockerTest) { let mut validators = vec![]; let mut test = DockerTest::new().with_network(dockertest::Network::Isolated); for i in 0 .. VALIDATORS { @@ -23,7 +23,8 @@ pub(crate) fn new_test() -> (Vec, DockerTest) { 4 => "Eve", 5 => "Ferdie", _ => panic!("needed a 7th name for a serai node"), - }); + }) + .await; validators.push(handles); for composition in compositions { test.provide_container(composition); diff --git a/tests/message-queue/src/lib.rs b/tests/message-queue/src/lib.rs index e2bfd3a75..68c706a9b 100644 --- a/tests/message-queue/src/lib.rs +++ b/tests/message-queue/src/lib.rs @@ -13,10 +13,14 @@ use dockertest::{ PullPolicy, Image, LogAction, LogPolicy, LogSource, LogOptions, TestBodySpecification, }; +pub fn docker_name() -> String { + "serai-dev-message-queue".to_string() +} + pub type MessageQueuePrivateKey = ::F; -pub fn instance( +pub async fn instance( ) -> (MessageQueuePrivateKey, HashMap, TestBodySpecification) { - serai_docker_tests::build("message-queue".to_string()); + serai_docker_tests::build("message-queue".to_string()).await; let coord_key = ::F::random(&mut OsRng); let priv_keys = HashMap::from([ @@ -26,7 +30,7 @@ pub fn instance( ]); let composition = TestBodySpecification::with_image( - Image::with_repository("serai-dev-message-queue").pull_policy(PullPolicy::Never), + Image::with_repository(docker_name()).pull_policy(PullPolicy::Never), ) .set_log_options(Some(LogOptions { action: LogAction::Forward, @@ -58,8 +62,8 @@ pub fn instance( (coord_key, priv_keys, composition) } -#[test] -fn basic_functionality() { +#[tokio::test] +async fn basic_functionality() { use zeroize::Zeroizing; use dockertest::DockerTest; @@ -67,99 +71,107 @@ fn basic_functionality() { use serai_message_queue::{Service, Metadata, client::MessageQueue}; let mut test = DockerTest::new().with_network(dockertest::Network::Isolated); - let (coord_key, priv_keys, composition) = instance(); + let (coord_key, priv_keys, composition) = instance().await; test.provide_container(composition); - test.run(|ops| async move { - tokio::time::timeout(core::time::Duration::from_secs(60), async move { - // Sleep for a second for the message-queue to boot - // It isn't an error to start immediately, it just silences an error - tokio::time::sleep(core::time::Duration::from_secs(1)).await; - - let rpc = ops.handle("serai-dev-message-queue").host_port(2287).unwrap(); - let rpc = rpc.0.to_string() + ":" + &rpc.1.to_string(); - - // Queue some messages - let coordinator = - MessageQueue::new(Service::Coordinator, rpc.clone(), Zeroizing::new(coord_key)); - coordinator - .queue( - Metadata { - from: Service::Coordinator, - to: Service::Processor(NetworkId::Bitcoin), - intent: b"intent".to_vec(), - }, - b"Hello, World!".to_vec(), - ) - .await; - - // Queue this twice, which message-queue should de-duplicate - for _ in 0 .. 2 { + test + .run_async(|ops| async move { + tokio::time::timeout(core::time::Duration::from_secs(60), async move { + // Sleep for a second for the message-queue to boot + // It isn't an error to start immediately, it just silences an error + tokio::time::sleep(core::time::Duration::from_secs(1)).await; + + let rpc = ops.handle("serai-dev-message-queue").host_port(2287).unwrap(); + let rpc = rpc.0.to_string() + ":" + &rpc.1.to_string(); + + // Queue some messages + let coordinator = + MessageQueue::new(Service::Coordinator, rpc.clone(), Zeroizing::new(coord_key)); coordinator .queue( Metadata { from: Service::Coordinator, to: Service::Processor(NetworkId::Bitcoin), - intent: b"intent 2".to_vec(), + intent: b"intent".to_vec(), }, - b"Hello, World, again!".to_vec(), + b"Hello, World!".to_vec(), ) .await; - } - - // Successfully get it - let bitcoin = MessageQueue::new( - Service::Processor(NetworkId::Bitcoin), - rpc.clone(), - Zeroizing::new(priv_keys[&NetworkId::Bitcoin]), - ); - let msg = bitcoin.next(Service::Coordinator).await; - assert_eq!(msg.from, Service::Coordinator); - assert_eq!(msg.id, 0); - assert_eq!(&msg.msg, b"Hello, World!"); - - // If we don't ack it, it should continue to be returned - assert_eq!(msg, bitcoin.next(Service::Coordinator).await); - - // Acknowledging it should yield the next message - bitcoin.ack(Service::Coordinator, 0).await; - - let next_msg = bitcoin.next(Service::Coordinator).await; - assert!(msg != next_msg); - assert_eq!(next_msg.from, Service::Coordinator); - assert_eq!(next_msg.id, 1); - assert_eq!(&next_msg.msg, b"Hello, World, again!"); - bitcoin.ack(Service::Coordinator, 1).await; - - // No further messages should be available - tokio::time::timeout(core::time::Duration::from_secs(10), bitcoin.next(Service::Coordinator)) + + // Queue this twice, which message-queue should de-duplicate + for _ in 0 .. 2 { + coordinator + .queue( + Metadata { + from: Service::Coordinator, + to: Service::Processor(NetworkId::Bitcoin), + intent: b"intent 2".to_vec(), + }, + b"Hello, World, again!".to_vec(), + ) + .await; + } + + // Successfully get it + let bitcoin = MessageQueue::new( + Service::Processor(NetworkId::Bitcoin), + rpc.clone(), + Zeroizing::new(priv_keys[&NetworkId::Bitcoin]), + ); + let msg = bitcoin.next(Service::Coordinator).await; + assert_eq!(msg.from, Service::Coordinator); + assert_eq!(msg.id, 0); + assert_eq!(&msg.msg, b"Hello, World!"); + + // If we don't ack it, it should continue to be returned + assert_eq!(msg, bitcoin.next(Service::Coordinator).await); + + // Acknowledging it should yield the next message + bitcoin.ack(Service::Coordinator, 0).await; + + let next_msg = bitcoin.next(Service::Coordinator).await; + assert!(msg != next_msg); + assert_eq!(next_msg.from, Service::Coordinator); + assert_eq!(next_msg.id, 1); + assert_eq!(&next_msg.msg, b"Hello, World, again!"); + bitcoin.ack(Service::Coordinator, 1).await; + + // No further messages should be available + tokio::time::timeout( + core::time::Duration::from_secs(10), + bitcoin.next(Service::Coordinator), + ) .await .unwrap_err(); - // Queueing to a distinct processor should work, with a unique ID - coordinator - .queue( - Metadata { - from: Service::Coordinator, - to: Service::Processor(NetworkId::Monero), - // Intents should be per-from-to, making this valid - intent: b"intent".to_vec(), - }, - b"Hello, World!".to_vec(), + // Queueing to a distinct processor should work, with a unique ID + coordinator + .queue( + Metadata { + from: Service::Coordinator, + to: Service::Processor(NetworkId::Monero), + // Intents should be per-from-to, making this valid + intent: b"intent".to_vec(), + }, + b"Hello, World!".to_vec(), + ) + .await; + + let monero = MessageQueue::new( + Service::Processor(NetworkId::Monero), + rpc, + Zeroizing::new(priv_keys[&NetworkId::Monero]), + ); + assert_eq!(monero.next(Service::Coordinator).await.id, 0); + monero.ack(Service::Coordinator, 0).await; + tokio::time::timeout( + core::time::Duration::from_secs(10), + monero.next(Service::Coordinator), ) - .await; - - let monero = MessageQueue::new( - Service::Processor(NetworkId::Monero), - rpc, - Zeroizing::new(priv_keys[&NetworkId::Monero]), - ); - assert_eq!(monero.next(Service::Coordinator).await.id, 0); - monero.ack(Service::Coordinator, 0).await; - tokio::time::timeout(core::time::Duration::from_secs(10), monero.next(Service::Coordinator)) .await .unwrap_err(); + }) + .await + .unwrap(); }) - .await - .unwrap(); - }); + .await; } diff --git a/tests/processor/src/lib.rs b/tests/processor/src/lib.rs index a318851eb..01983e18d 100644 --- a/tests/processor/src/lib.rs +++ b/tests/processor/src/lib.rs @@ -24,7 +24,20 @@ mod tests; static UNIQUE_ID: OnceLock> = OnceLock::new(); -pub fn processor_instance( +fn network_str(network: NetworkId) -> &'static str { + match network { + NetworkId::Serai => panic!("starting a processor for Serai"), + NetworkId::Bitcoin => "bitcoin", + NetworkId::Ethereum => "ethereum", + NetworkId::Monero => "monero", + } +} + +pub fn processor_docker_name(network: NetworkId) -> String { + format!("{}-processor", network_str(network)) +} + +pub async fn processor_instance( network: NetworkId, port: u32, message_queue_key: ::F, @@ -32,17 +45,12 @@ pub fn processor_instance( let mut entropy = [0; 32]; OsRng.fill_bytes(&mut entropy); - let network_str = match network { - NetworkId::Serai => panic!("starting a processor for Serai"), - NetworkId::Bitcoin => "bitcoin", - NetworkId::Ethereum => "ethereum", - NetworkId::Monero => "monero", - }; - let image = format!("{network_str}-processor"); - serai_docker_tests::build(image.clone()); + let network_str = network_str(network); + serai_docker_tests::build(processor_docker_name(network)).await; TestBodySpecification::with_image( - Image::with_repository(format!("serai-dev-{image}")).pull_policy(PullPolicy::Never), + Image::with_repository(format!("serai-dev-{}", processor_docker_name(network))) + .pull_policy(PullPolicy::Never), ) .replace_env( [ @@ -58,17 +66,23 @@ pub fn processor_instance( ) } +pub fn docker_names(network: NetworkId) -> Vec { + vec![network_docker_name(network), processor_docker_name(network)] +} + pub type Handles = (String, String, String); -pub fn processor_stack( +pub async fn processor_stack( network: NetworkId, ) -> (Handles, ::F, Vec) { - let (network_composition, network_rpc_port) = network_instance(network); + serai_docker_tests::build_batch(docker_names(network)).await; + + let (network_composition, network_rpc_port) = network_instance(network).await; let (coord_key, message_queue_keys, message_queue_composition) = - serai_message_queue_tests::instance(); + serai_message_queue_tests::instance().await; let processor_composition = - processor_instance(network, network_rpc_port, message_queue_keys[&network]); + processor_instance(network, network_rpc_port, message_queue_keys[&network]).await; // Give every item in this stack a unique ID // Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits diff --git a/tests/processor/src/networks.rs b/tests/processor/src/networks.rs index 140f096e0..233545b06 100644 --- a/tests/processor/src/networks.rs +++ b/tests/processor/src/networks.rs @@ -21,8 +21,19 @@ pub const RPC_PASS: &str = "seraidex"; pub const BTC_PORT: u32 = 8332; pub const XMR_PORT: u32 = 18081; -pub fn bitcoin_instance() -> (TestBodySpecification, u32) { - serai_docker_tests::build("bitcoin".to_string()); +pub fn network_docker_name(network: NetworkId) -> String { + match network { + NetworkId::Serai => { + panic!("asking for docker name for external network Serai, which isn't external") + } + NetworkId::Bitcoin => "bitcoin".to_string(), + NetworkId::Ethereum => todo!(), + NetworkId::Monero => "monero".to_string(), + } +} + +pub async fn bitcoin_instance() -> (TestBodySpecification, u32) { + serai_docker_tests::build(network_docker_name(NetworkId::Bitcoin)).await; let composition = TestBodySpecification::with_image( Image::with_repository("serai-dev-bitcoin").pull_policy(PullPolicy::Never), @@ -41,8 +52,8 @@ pub fn bitcoin_instance() -> (TestBodySpecification, u32) { (composition, BTC_PORT) } -pub fn monero_instance() -> (TestBodySpecification, u32) { - serai_docker_tests::build("monero".to_string()); +pub async fn monero_instance() -> (TestBodySpecification, u32) { + serai_docker_tests::build(network_docker_name(NetworkId::Monero)).await; let composition = TestBodySpecification::with_image( Image::with_repository("serai-dev-monero").pull_policy(PullPolicy::Never), @@ -63,11 +74,11 @@ pub fn monero_instance() -> (TestBodySpecification, u32) { (composition, XMR_PORT) } -pub fn network_instance(network: NetworkId) -> (TestBodySpecification, u32) { +pub async fn network_instance(network: NetworkId) -> (TestBodySpecification, u32) { match network { - NetworkId::Bitcoin => bitcoin_instance(), + NetworkId::Bitcoin => bitcoin_instance().await, NetworkId::Ethereum => todo!(), - NetworkId::Monero => monero_instance(), + NetworkId::Monero => monero_instance().await, NetworkId::Serai => { panic!("Serai is not a valid network to spawn an instance of for a processor") } diff --git a/tests/processor/src/tests/batch.rs b/tests/processor/src/tests/batch.rs index 9c678b983..3ead6efc4 100644 --- a/tests/processor/src/tests/batch.rs +++ b/tests/processor/src/tests/batch.rs @@ -191,164 +191,167 @@ pub(crate) async fn substrate_block( } } -#[test] -fn batch_test() { +#[tokio::test] +async fn batch_test() { for network in [NetworkId::Bitcoin, NetworkId::Monero] { - let (coordinators, test) = new_test(network); + let (coordinators, test) = new_test(network).await; - test.run(|ops| async move { - tokio::time::sleep(Duration::from_secs(1)).await; + test + .run_async(|ops| async move { + tokio::time::sleep(Duration::from_secs(1)).await; - let mut coordinators = coordinators - .into_iter() - .map(|(handles, key)| Coordinator::new(network, &ops, handles, key)) - .collect::>(); + let mut coordinators = coordinators + .into_iter() + .map(|(handles, key)| Coordinator::new(network, &ops, handles, key)) + .collect::>(); - // Create a wallet before we start generating keys - let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await; - coordinators[0].sync(&ops, &coordinators[1 ..]).await; + // Create a wallet before we start generating keys + let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await; + coordinators[0].sync(&ops, &coordinators[1 ..]).await; - // Generate keys - let key_pair = key_gen(&mut coordinators).await; + // Generate keys + let key_pair = key_gen(&mut coordinators).await; - // Now we we have to mine blocks to activate the key - // (the first key is activated when the network's time as of a block exceeds the Serai time - // it was confirmed at) - // Mine multiple sets of medians to ensure the median is sufficiently advanced - for _ in 0 .. (10 * confirmations(network)) { - coordinators[0].add_block(&ops).await; - tokio::time::sleep(Duration::from_secs(1)).await; - } - coordinators[0].sync(&ops, &coordinators[1 ..]).await; - - // Run twice, once with an instruction and once without - let substrate_block_num = (OsRng.next_u64() % 4_000_000_000u64) + 1; - for i in 0 .. 2 { - let mut serai_address = [0; 32]; - OsRng.fill_bytes(&mut serai_address); - let instruction = - if i == 0 { Some(InInstruction::Transfer(SeraiAddress(serai_address))) } else { None }; - - // Send into the processor's wallet - let (tx, balance_sent) = - wallet.send_to_address(&ops, &key_pair.1, instruction.clone()).await; - for coordinator in &mut coordinators { - coordinator.publish_transacton(&ops, &tx).await; + // Now we we have to mine blocks to activate the key + // (the first key is activated when the network's time as of a block exceeds the Serai time + // it was confirmed at) + // Mine multiple sets of medians to ensure the median is sufficiently advanced + for _ in 0 .. (10 * confirmations(network)) { + coordinators[0].add_block(&ops).await; + tokio::time::sleep(Duration::from_secs(1)).await; } + coordinators[0].sync(&ops, &coordinators[1 ..]).await; - // Put the TX past the confirmation depth - let mut block_with_tx = None; - for _ in 0 .. confirmations(network) { - let (hash, _) = coordinators[0].add_block(&ops).await; - if block_with_tx.is_none() { - block_with_tx = Some(hash); + // Run twice, once with an instruction and once without + let substrate_block_num = (OsRng.next_u64() % 4_000_000_000u64) + 1; + for i in 0 .. 2 { + let mut serai_address = [0; 32]; + OsRng.fill_bytes(&mut serai_address); + let instruction = + if i == 0 { Some(InInstruction::Transfer(SeraiAddress(serai_address))) } else { None }; + + // Send into the processor's wallet + let (tx, balance_sent) = + wallet.send_to_address(&ops, &key_pair.1, instruction.clone()).await; + for coordinator in &mut coordinators { + coordinator.publish_transacton(&ops, &tx).await; } - } - coordinators[0].sync(&ops, &coordinators[1 ..]).await; - // Sleep for 10s - // The scanner works on a 5s interval, so this leaves a few s for any processing/latency - tokio::time::sleep(Duration::from_secs(10)).await; - - let expected_batch = Batch { - network, - id: i, - block: BlockHash(block_with_tx.unwrap()), - instructions: if let Some(instruction) = &instruction { - vec![InInstructionWithBalance { - instruction: instruction.clone(), - balance: Balance { - coin: balance_sent.coin, - amount: Amount( - balance_sent.amount.0 - - (2 * if network == NetworkId::Bitcoin { - Bitcoin::COST_TO_AGGREGATE - } else { - Monero::COST_TO_AGGREGATE - }), - ), - }, - }] - } else { - // This shouldn't have an instruction as we didn't add any data into the TX we sent - // Empty batches remain valuable as they let us achieve consensus on the block and spend - // contained outputs - vec![] - }, - }; - - // Make sure the processors picked it up by checking they're trying to sign a batch for it - let (mut id, mut preprocesses) = - recv_batch_preprocesses(&mut coordinators, Session(0), &expected_batch, 0).await; - // Trigger a random amount of re-attempts - for attempt in 1 ..= u32::try_from(OsRng.next_u64() % 4).unwrap() { - // TODO: Double check how the processor handles this ID field - // It should be able to assert its perfectly sequential - id.attempt = attempt; - for coordinator in coordinators.iter_mut() { - coordinator - .send_message(messages::coordinator::CoordinatorMessage::BatchReattempt { - id: id.clone(), - }) - .await; + // Put the TX past the confirmation depth + let mut block_with_tx = None; + for _ in 0 .. confirmations(network) { + let (hash, _) = coordinators[0].add_block(&ops).await; + if block_with_tx.is_none() { + block_with_tx = Some(hash); + } + } + coordinators[0].sync(&ops, &coordinators[1 ..]).await; + + // Sleep for 10s + // The scanner works on a 5s interval, so this leaves a few s for any processing/latency + tokio::time::sleep(Duration::from_secs(10)).await; + + let expected_batch = Batch { + network, + id: i, + block: BlockHash(block_with_tx.unwrap()), + instructions: if let Some(instruction) = &instruction { + vec![InInstructionWithBalance { + instruction: instruction.clone(), + balance: Balance { + coin: balance_sent.coin, + amount: Amount( + balance_sent.amount.0 - + (2 * if network == NetworkId::Bitcoin { + Bitcoin::COST_TO_AGGREGATE + } else { + Monero::COST_TO_AGGREGATE + }), + ), + }, + }] + } else { + // This shouldn't have an instruction as we didn't add any data into the TX we sent + // Empty batches remain valuable as they let us achieve consensus on the block and + // spend contained outputs + vec![] + }, + }; + + // Make sure the processors picked it up by checking they're trying to sign a batch for it + let (mut id, mut preprocesses) = + recv_batch_preprocesses(&mut coordinators, Session(0), &expected_batch, 0).await; + // Trigger a random amount of re-attempts + for attempt in 1 ..= u32::try_from(OsRng.next_u64() % 4).unwrap() { + // TODO: Double check how the processor handles this ID field + // It should be able to assert its perfectly sequential + id.attempt = attempt; + for coordinator in coordinators.iter_mut() { + coordinator + .send_message(messages::coordinator::CoordinatorMessage::BatchReattempt { + id: id.clone(), + }) + .await; + } + (id, preprocesses) = + recv_batch_preprocesses(&mut coordinators, Session(0), &expected_batch, attempt) + .await; } - (id, preprocesses) = - recv_batch_preprocesses(&mut coordinators, Session(0), &expected_batch, attempt).await; - } - // Continue with signing the batch - let batch = sign_batch(&mut coordinators, key_pair.0 .0, id, preprocesses).await; - - // Check it - assert_eq!(batch.batch, expected_batch); - - // Fire a SubstrateBlock - let serai_time = - SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); - for coordinator in &mut coordinators { - let plans = substrate_block( - coordinator, - messages::substrate::CoordinatorMessage::SubstrateBlock { - context: SubstrateContext { - serai_time, - network_latest_finalized_block: batch.batch.block, + // Continue with signing the batch + let batch = sign_batch(&mut coordinators, key_pair.0 .0, id, preprocesses).await; + + // Check it + assert_eq!(batch.batch, expected_batch); + + // Fire a SubstrateBlock + let serai_time = + SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + for coordinator in &mut coordinators { + let plans = substrate_block( + coordinator, + messages::substrate::CoordinatorMessage::SubstrateBlock { + context: SubstrateContext { + serai_time, + network_latest_finalized_block: batch.batch.block, + }, + block: substrate_block_num + u64::from(i), + burns: vec![], + batches: vec![batch.batch.id], }, - block: substrate_block_num + u64::from(i), - burns: vec![], - batches: vec![batch.batch.id], - }, - ) - .await; - if instruction.is_some() || (instruction.is_none() && (network == NetworkId::Monero)) { - assert!(plans.is_empty()); - } else { - // If no instruction was used, and the processor csn presume the origin, it'd have - // created a refund Plan - assert_eq!(plans.len(), 1); + ) + .await; + if instruction.is_some() || (instruction.is_none() && (network == NetworkId::Monero)) { + assert!(plans.is_empty()); + } else { + // If no instruction was used, and the processor csn presume the origin, it'd have + // created a refund Plan + assert_eq!(plans.len(), 1); + } } } - } - // With the latter InInstruction not existing, we should've triggered a refund if the origin - // was detectable - // Check this is trying to sign a Plan - if network != NetworkId::Monero { - let mut refund_id = None; - for coordinator in &mut coordinators { - match coordinator.recv_message().await { - messages::ProcessorMessage::Sign(messages::sign::ProcessorMessage::Preprocess { - id, - .. - }) => { - if refund_id.is_none() { - refund_id = Some(id.clone()); + // With the latter InInstruction not existing, we should've triggered a refund if the origin + // was detectable + // Check this is trying to sign a Plan + if network != NetworkId::Monero { + let mut refund_id = None; + for coordinator in &mut coordinators { + match coordinator.recv_message().await { + messages::ProcessorMessage::Sign(messages::sign::ProcessorMessage::Preprocess { + id, + .. + }) => { + if refund_id.is_none() { + refund_id = Some(id.clone()); + } + assert_eq!(refund_id.as_ref().unwrap(), &id); } - assert_eq!(refund_id.as_ref().unwrap(), &id); + _ => panic!("processor didn't send preprocess for expected refund transaction"), } - _ => panic!("processor didn't send preprocess for expected refund transaction"), } } - } - }); + }) + .await; } } diff --git a/tests/processor/src/tests/key_gen.rs b/tests/processor/src/tests/key_gen.rs index b98ec04eb..72f2bd2ee 100644 --- a/tests/processor/src/tests/key_gen.rs +++ b/tests/processor/src/tests/key_gen.rs @@ -142,23 +142,25 @@ pub(crate) async fn key_gen(coordinators: &mut [Coordinator]) -> KeyPair { key_pair } -#[test] -fn key_gen_test() { +#[tokio::test] +async fn key_gen_test() { for network in [NetworkId::Bitcoin, NetworkId::Monero] { - let (coordinators, test) = new_test(network); - - test.run(|ops| async move { - // Sleep for a second for the message-queue to boot - // It isn't an error to start immediately, it just silences an error - tokio::time::sleep(core::time::Duration::from_secs(1)).await; - - // Connect to the Message Queues as the coordinator - let mut coordinators = coordinators - .into_iter() - .map(|(handles, key)| Coordinator::new(network, &ops, handles, key)) - .collect::>(); - - key_gen(&mut coordinators).await; - }); + let (coordinators, test) = new_test(network).await; + + test + .run_async(|ops| async move { + // Sleep for a second for the message-queue to boot + // It isn't an error to start immediately, it just silences an error + tokio::time::sleep(core::time::Duration::from_secs(1)).await; + + // Connect to the Message Queues as the coordinator + let mut coordinators = coordinators + .into_iter() + .map(|(handles, key)| Coordinator::new(network, &ops, handles, key)) + .collect::>(); + + key_gen(&mut coordinators).await; + }) + .await; } } diff --git a/tests/processor/src/tests/mod.rs b/tests/processor/src/tests/mod.rs index 54a17020f..ddad8c9de 100644 --- a/tests/processor/src/tests/mod.rs +++ b/tests/processor/src/tests/mod.rs @@ -17,11 +17,13 @@ mod send; pub(crate) const COORDINATORS: usize = 4; pub(crate) const THRESHOLD: usize = ((COORDINATORS * 2) / 3) + 1; -fn new_test(network: NetworkId) -> (Vec<(Handles, ::F)>, DockerTest) { +pub(crate) async fn new_test( + network: NetworkId, +) -> (Vec<(Handles, ::F)>, DockerTest) { let mut coordinators = vec![]; let mut test = DockerTest::new().with_network(dockertest::Network::Isolated); for _ in 0 .. COORDINATORS { - let (handles, coord_key, compositions) = processor_stack(network); + let (handles, coord_key, compositions) = processor_stack(network).await; coordinators.push((handles, coord_key)); for composition in compositions { test.provide_container(composition); diff --git a/tests/processor/src/tests/send.rs b/tests/processor/src/tests/send.rs index 986671c16..b01c4bd2d 100644 --- a/tests/processor/src/tests/send.rs +++ b/tests/processor/src/tests/send.rs @@ -142,163 +142,166 @@ pub(crate) async fn sign_tx( tx.unwrap() } -#[test] -fn send_test() { +#[tokio::test] +async fn send_test() { for network in [NetworkId::Bitcoin, NetworkId::Monero] { - let (coordinators, test) = new_test(network); + let (coordinators, test) = new_test(network).await; - test.run(|ops| async move { - tokio::time::sleep(Duration::from_secs(1)).await; - - let mut coordinators = coordinators - .into_iter() - .map(|(handles, key)| Coordinator::new(network, &ops, handles, key)) - .collect::>(); - - // Create a wallet before we start generating keys - let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await; - coordinators[0].sync(&ops, &coordinators[1 ..]).await; - - // Generate keys - let key_pair = key_gen(&mut coordinators).await; - - // Now we we have to mine blocks to activate the key - // (the first key is activated when the network's time as of a block exceeds the Serai time - // it was confirmed at) - // Mine multiple sets of medians to ensure the median is sufficiently advanced - for _ in 0 .. (10 * confirmations(network)) { - coordinators[0].add_block(&ops).await; + test + .run_async(|ops| async move { tokio::time::sleep(Duration::from_secs(1)).await; - } - coordinators[0].sync(&ops, &coordinators[1 ..]).await; - - // Send into the processor's wallet - let (tx, balance_sent) = wallet.send_to_address(&ops, &key_pair.1, None).await; - for coordinator in &mut coordinators { - coordinator.publish_transacton(&ops, &tx).await; - } - // Put the TX past the confirmation depth - let mut block_with_tx = None; - for _ in 0 .. confirmations(network) { - let (hash, _) = coordinators[0].add_block(&ops).await; - if block_with_tx.is_none() { - block_with_tx = Some(hash); + let mut coordinators = coordinators + .into_iter() + .map(|(handles, key)| Coordinator::new(network, &ops, handles, key)) + .collect::>(); + + // Create a wallet before we start generating keys + let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await; + coordinators[0].sync(&ops, &coordinators[1 ..]).await; + + // Generate keys + let key_pair = key_gen(&mut coordinators).await; + + // Now we we have to mine blocks to activate the key + // (the first key is activated when the network's time as of a block exceeds the Serai time + // it was confirmed at) + // Mine multiple sets of medians to ensure the median is sufficiently advanced + for _ in 0 .. (10 * confirmations(network)) { + coordinators[0].add_block(&ops).await; + tokio::time::sleep(Duration::from_secs(1)).await; } - } - coordinators[0].sync(&ops, &coordinators[1 ..]).await; - - // Sleep for 10s - // The scanner works on a 5s interval, so this leaves a few s for any processing/latency - tokio::time::sleep(Duration::from_secs(10)).await; - - let expected_batch = - Batch { network, id: 0, block: BlockHash(block_with_tx.unwrap()), instructions: vec![] }; - - // Make sure the proceessors picked it up by checking they're trying to sign a batch for it - let (id, preprocesses) = - recv_batch_preprocesses(&mut coordinators, Session(0), &expected_batch, 0).await; - - // Continue with signing the batch - let batch = sign_batch(&mut coordinators, key_pair.0 .0, id, preprocesses).await; - - // Check it - assert_eq!(batch.batch, expected_batch); + coordinators[0].sync(&ops, &coordinators[1 ..]).await; - // Fire a SubstrateBlock with a burn - let substrate_block_num = (OsRng.next_u64() % 4_000_000_000u64) + 1; - let serai_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + // Send into the processor's wallet + let (tx, balance_sent) = wallet.send_to_address(&ops, &key_pair.1, None).await; + for coordinator in &mut coordinators { + coordinator.publish_transacton(&ops, &tx).await; + } - let mut plans = vec![]; - for coordinator in &mut coordinators { - let these_plans = substrate_block( - coordinator, - messages::substrate::CoordinatorMessage::SubstrateBlock { - context: SubstrateContext { - serai_time, - network_latest_finalized_block: batch.batch.block, + // Put the TX past the confirmation depth + let mut block_with_tx = None; + for _ in 0 .. confirmations(network) { + let (hash, _) = coordinators[0].add_block(&ops).await; + if block_with_tx.is_none() { + block_with_tx = Some(hash); + } + } + coordinators[0].sync(&ops, &coordinators[1 ..]).await; + + // Sleep for 10s + // The scanner works on a 5s interval, so this leaves a few s for any processing/latency + tokio::time::sleep(Duration::from_secs(10)).await; + + let expected_batch = + Batch { network, id: 0, block: BlockHash(block_with_tx.unwrap()), instructions: vec![] }; + + // Make sure the proceessors picked it up by checking they're trying to sign a batch for it + let (id, preprocesses) = + recv_batch_preprocesses(&mut coordinators, Session(0), &expected_batch, 0).await; + + // Continue with signing the batch + let batch = sign_batch(&mut coordinators, key_pair.0 .0, id, preprocesses).await; + + // Check it + assert_eq!(batch.batch, expected_batch); + + // Fire a SubstrateBlock with a burn + let substrate_block_num = (OsRng.next_u64() % 4_000_000_000u64) + 1; + let serai_time = + SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + + let mut plans = vec![]; + for coordinator in &mut coordinators { + let these_plans = substrate_block( + coordinator, + messages::substrate::CoordinatorMessage::SubstrateBlock { + context: SubstrateContext { + serai_time, + network_latest_finalized_block: batch.batch.block, + }, + block: substrate_block_num, + burns: vec![OutInstructionWithBalance { + instruction: OutInstruction { address: wallet.address(), data: None }, + balance: balance_sent, + }], + batches: vec![batch.batch.id], }, - block: substrate_block_num, - burns: vec![OutInstructionWithBalance { - instruction: OutInstruction { address: wallet.address(), data: None }, - balance: balance_sent, - }], - batches: vec![batch.batch.id], - }, - ) - .await; + ) + .await; - if plans.is_empty() { - plans = these_plans; - } else { - assert_eq!(plans, these_plans); + if plans.is_empty() { + plans = these_plans; + } else { + assert_eq!(plans, these_plans); + } } - } - assert_eq!(plans.len(), 1); - - // Start signing the TX - let (mut id, mut preprocesses) = - recv_sign_preprocesses(&mut coordinators, Session(0), 0).await; - assert_eq!(id, SignId { session: Session(0), id: plans[0].id, attempt: 0 }); - - // Trigger a random amount of re-attempts - for attempt in 1 ..= u32::try_from(OsRng.next_u64() % 4).unwrap() { - // TODO: Double check how the processor handles this ID field - // It should be able to assert its perfectly sequential - id.attempt = attempt; - for coordinator in coordinators.iter_mut() { - coordinator - .send_message(messages::sign::CoordinatorMessage::Reattempt { id: id.clone() }) - .await; + assert_eq!(plans.len(), 1); + + // Start signing the TX + let (mut id, mut preprocesses) = + recv_sign_preprocesses(&mut coordinators, Session(0), 0).await; + assert_eq!(id, SignId { session: Session(0), id: plans[0].id, attempt: 0 }); + + // Trigger a random amount of re-attempts + for attempt in 1 ..= u32::try_from(OsRng.next_u64() % 4).unwrap() { + // TODO: Double check how the processor handles this ID field + // It should be able to assert its perfectly sequential + id.attempt = attempt; + for coordinator in coordinators.iter_mut() { + coordinator + .send_message(messages::sign::CoordinatorMessage::Reattempt { id: id.clone() }) + .await; + } + (id, preprocesses) = recv_sign_preprocesses(&mut coordinators, Session(0), attempt).await; } - (id, preprocesses) = recv_sign_preprocesses(&mut coordinators, Session(0), attempt).await; - } - let participating = preprocesses.keys().cloned().collect::>(); + let participating = preprocesses.keys().cloned().collect::>(); - let tx_id = sign_tx(&mut coordinators, Session(0), id.clone(), preprocesses).await; + let tx_id = sign_tx(&mut coordinators, Session(0), id.clone(), preprocesses).await; - // Make sure all participating nodes published the TX - let participating = - participating.iter().map(|p| usize::from(u16::from(*p) - 1)).collect::>(); - for participant in &participating { - assert!(coordinators[*participant].get_transaction(&ops, &tx_id).await.is_some()); - } + // Make sure all participating nodes published the TX + let participating = + participating.iter().map(|p| usize::from(u16::from(*p) - 1)).collect::>(); + for participant in &participating { + assert!(coordinators[*participant].get_transaction(&ops, &tx_id).await.is_some()); + } - // Publish this transaction to the left out nodes - let tx = coordinators[*participating.iter().next().unwrap()] - .get_transaction(&ops, &tx_id) - .await - .unwrap(); - for (i, coordinator) in coordinators.iter_mut().enumerate() { - if !participating.contains(&i) { - coordinator.publish_transacton(&ops, &tx).await; - // Tell them of it as a completion of the relevant signing nodess - coordinator - .send_message(messages::sign::CoordinatorMessage::Completed { - session: Session(0), - id: id.id, - tx: tx_id.clone(), - }) - .await; - // Verify they send Completed back - match coordinator.recv_message().await { - messages::ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed { - session, - id: this_id, - tx: this_tx, - }) => { - assert_eq!(session, Session(0)); - assert_eq!(&this_id, &id.id); - assert_eq!(this_tx, tx_id); + // Publish this transaction to the left out nodes + let tx = coordinators[*participating.iter().next().unwrap()] + .get_transaction(&ops, &tx_id) + .await + .unwrap(); + for (i, coordinator) in coordinators.iter_mut().enumerate() { + if !participating.contains(&i) { + coordinator.publish_transacton(&ops, &tx).await; + // Tell them of it as a completion of the relevant signing nodess + coordinator + .send_message(messages::sign::CoordinatorMessage::Completed { + session: Session(0), + id: id.id, + tx: tx_id.clone(), + }) + .await; + // Verify they send Completed back + match coordinator.recv_message().await { + messages::ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed { + session, + id: this_id, + tx: this_tx, + }) => { + assert_eq!(session, Session(0)); + assert_eq!(&this_id, &id.id); + assert_eq!(this_tx, tx_id); + } + _ => panic!("processor didn't send Completed"), } - _ => panic!("processor didn't send Completed"), } } - } - // TODO: Test the Eventuality from the blockchain, instead of from the coordinator - // TODO: Test what happenns when Completed is sent with a non-existent TX ID - // TODO: Test what happenns when Completed is sent with a non-completing TX ID - }); + // TODO: Test the Eventuality from the blockchain, instead of from the coordinator + // TODO: Test what happenns when Completed is sent with a non-existent TX ID + // TODO: Test what happenns when Completed is sent with a non-completing TX ID + }) + .await; } } diff --git a/tests/reproducible-runtime/src/lib.rs b/tests/reproducible-runtime/src/lib.rs index 2a7f7f51d..6f2a54549 100644 --- a/tests/reproducible-runtime/src/lib.rs +++ b/tests/reproducible-runtime/src/lib.rs @@ -1,5 +1,5 @@ -#[test] -pub fn reproducibly_builds() { +#[tokio::test] +pub async fn reproducibly_builds() { use std::{collections::HashSet, process::Command}; use rand_core::{RngCore, OsRng}; @@ -9,7 +9,7 @@ pub fn reproducibly_builds() { const RUNS: usize = 3; const TIMEOUT: u16 = 180 * 60; // 3 hours - serai_docker_tests::build("runtime".to_string()); + serai_docker_tests::build("runtime".to_string()).await; let mut ids = vec![[0; 8]; RUNS]; for id in &mut ids { @@ -38,64 +38,66 @@ pub fn reproducibly_builds() { ); } - test.run(|_| async { - let ids = ids; - let mut containers = vec![]; - for container in String::from_utf8( - Command::new("docker").arg("ps").arg("--format").arg("{{.Names}}").output().unwrap().stdout, - ) - .expect("output wasn't utf-8") - .lines() - { - for id in &ids { - if container.contains(&hex::encode(id)) { - containers.push(container.trim().to_string()); + test + .run_async(|_| async { + let ids = ids; + let mut containers = vec![]; + for container in String::from_utf8( + Command::new("docker").arg("ps").arg("--format").arg("{{.Names}}").output().unwrap().stdout, + ) + .expect("output wasn't utf-8") + .lines() + { + for id in &ids { + if container.contains(&hex::encode(id)) { + containers.push(container.trim().to_string()); + } } } - } - assert_eq!(containers.len(), RUNS, "couldn't find all containers"); + assert_eq!(containers.len(), RUNS, "couldn't find all containers"); - let mut res = vec![None; RUNS]; - 'attempt: for _ in 0 .. (TIMEOUT / 10) { - tokio::time::sleep(core::time::Duration::from_secs(10)).await; + let mut res = vec![None; RUNS]; + 'attempt: for _ in 0 .. (TIMEOUT / 10) { + tokio::time::sleep(core::time::Duration::from_secs(10)).await; - 'runner: for (i, container) in containers.iter().enumerate() { - if res[i].is_some() { - continue; - } + 'runner: for (i, container) in containers.iter().enumerate() { + if res[i].is_some() { + continue; + } - let logs = Command::new("docker").arg("logs").arg(container).output().unwrap(); - let Some(last_log) = - std::str::from_utf8(&logs.stdout).expect("output wasn't utf-8").lines().last() - else { - continue 'runner; - }; + let logs = Command::new("docker").arg("logs").arg(container).output().unwrap(); + let Some(last_log) = + std::str::from_utf8(&logs.stdout).expect("output wasn't utf-8").lines().last() + else { + continue 'runner; + }; - let split = last_log.split("Runtime hash: ").collect::>(); - if split.len() == 2 { - res[i] = Some(split[1].to_string()); - continue 'runner; + let split = last_log.split("Runtime hash: ").collect::>(); + if split.len() == 2 { + res[i] = Some(split[1].to_string()); + continue 'runner; + } } + + for item in &res { + if item.is_none() { + continue 'attempt; + } + } + break; } + // If we didn't get results from all runners, panic for item in &res { if item.is_none() { - continue 'attempt; + panic!("couldn't get runtime hashes within allowed time"); } } - break; - } - - // If we didn't get results from all runners, panic - for item in &res { - if item.is_none() { - panic!("couldn't get runtime hashes within allowed time"); + let mut identical = HashSet::new(); + for res in res.clone() { + identical.insert(res.unwrap()); } - } - let mut identical = HashSet::new(); - for res in res.clone() { - identical.insert(res.unwrap()); - } - assert_eq!(identical.len(), 1, "got different runtime hashes {:?}", res); - }); + assert_eq!(identical.len(), 1, "got different runtime hashes {:?}", res); + }) + .await; }