diff --git a/src/analytics/ledger/address_balance.rs b/src/analytics/ledger/address_balance.rs index 5d3cc0cc3..dfe7281a9 100644 --- a/src/analytics/ledger/address_balance.rs +++ b/src/analytics/ledger/address_balance.rs @@ -3,10 +3,7 @@ use std::collections::HashMap; -use iota_sdk::types::block::{ - address::{Bech32Address, ToBech32Ext}, - protocol::ProtocolParameters, -}; +use iota_sdk::types::block::address::Address; use serde::{Deserialize, Serialize}; use crate::{ @@ -32,20 +29,16 @@ pub(crate) struct DistributionStat { /// Computes the number of addresses the currently hold a balance. #[derive(Serialize, Deserialize)] pub(crate) struct AddressBalancesAnalytics { - balances: HashMap, + balances: HashMap, } impl AddressBalancesAnalytics { /// Initialize the analytics by reading the current ledger state. - pub(crate) fn init<'a>( - unspent_outputs: impl IntoIterator, - protocol_params: &ProtocolParameters, - ) -> Self { - let hrp = protocol_params.bech32_hrp(); + pub(crate) fn init<'a>(unspent_outputs: impl IntoIterator) -> Self { let mut balances = HashMap::new(); for output in unspent_outputs { if let Some(a) = output.address() { - *balances.entry(a.clone().to_bech32(hrp)).or_default() += output.amount(); + *balances.entry(a.clone()).or_default() += output.amount(); } } Self { balances } @@ -56,15 +49,13 @@ impl Analytics for AddressBalancesAnalytics { type Measurement = AddressBalanceMeasurement; fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], ctx: &dyn AnalyticsContext) { - let hrp = ctx.protocol_params().bech32_hrp(); for output in consumed { if let Some(a) = output.address() { - let a = a.clone().to_bech32(hrp); // All inputs should be present in `addresses`. If not, we skip it's value. - if let Some(amount) = self.balances.get_mut(&a) { + if let Some(amount) = self.balances.get_mut(a) { *amount -= output.amount(); if *amount == 0 { - self.balances.remove(&a); + self.balances.remove(a); } } } @@ -73,7 +64,7 @@ impl Analytics for AddressBalancesAnalytics { for output in created { if let Some(a) = output.address() { // All inputs should be present in `addresses`. If not, we skip it's value. - *self.balances.entry(a.clone().to_bech32(hrp)).or_default() += output.amount(); + *self.balances.entry(a.clone()).or_default() += output.amount(); } } } diff --git a/src/analytics/mod.rs b/src/analytics/mod.rs index 99ca1e2a6..28a4cf04b 100644 --- a/src/analytics/mod.rs +++ b/src/analytics/mod.rs @@ -45,7 +45,7 @@ pub trait AnalyticsContext: Send + Sync { impl<'a, I: InputSource> AnalyticsContext for Slot<'a, I> { fn protocol_params(&self) -> &ProtocolParameters { - &self.protocol_params.parameters + &self.protocol_parameters } fn slot_index(&self) -> SlotIndex { @@ -158,9 +158,7 @@ impl Analytic { unspent_outputs: impl IntoIterator, ) -> Self { Self(match choice { - AnalyticsChoice::AddressBalance => { - Box::new(AddressBalancesAnalytics::init(unspent_outputs, &protocol_params)) as _ - } + AnalyticsChoice::AddressBalance => Box::new(AddressBalancesAnalytics::init(unspent_outputs)) as _, AnalyticsChoice::BaseTokenActivity => Box::::default() as _, AnalyticsChoice::BlockActivity => Box::::default() as _, AnalyticsChoice::ActiveAddresses => Box::::default() as _, @@ -446,7 +444,7 @@ mod test { ) -> Self { Self { active_addresses: Default::default(), - address_balance: AddressBalancesAnalytics::init(unspent_outputs, &protocol_params), + address_balance: AddressBalancesAnalytics::init(unspent_outputs), base_tokens: Default::default(), ledger_outputs: LedgerOutputMeasurement::init(unspent_outputs), ledger_size: LedgerSizeAnalytics::init(protocol_params, unspent_outputs), diff --git a/src/bin/inx-chronicle/api/core/routes.rs b/src/bin/inx-chronicle/api/core/routes.rs index ec0759fe0..edd18f11e 100644 --- a/src/bin/inx-chronicle/api/core/routes.rs +++ b/src/bin/inx-chronicle/api/core/routes.rs @@ -12,14 +12,13 @@ use axum::{ use chronicle::{ db::{ mongodb::collections::{ - BlockCollection, CommittedSlotCollection, ConfigurationUpdateCollection, OutputCollection, OutputMetadata, - OutputWithMetadataResult, ProtocolUpdateCollection, UtxoChangesResult, + ApplicationStateCollection, BlockCollection, CommittedSlotCollection, OutputCollection, OutputMetadata, + OutputWithMetadataResult, UtxoChangesResult, }, MongoDb, }, model::block_metadata::BlockMetadata, }; -use futures::TryStreamExt; use iota_sdk::types::{ api::core::{ BaseTokenResponse, BlockMetadataResponse, OutputWithMetadataResponse, ProtocolParametersResponse, @@ -88,30 +87,26 @@ pub fn routes() -> Router { } pub async fn info(database: Extension) -> ApiResult { - let protocol_parameters = database - .collection::() - .get_all_protocol_parameters() + let node_config = database + .collection::() + .get_node_config() .await? - .map_ok(|doc| ProtocolParametersResponse { + .ok_or(CorruptStateError::NodeConfig)?; + let protocol_parameters = node_config + .protocol_parameters + .into_iter() + .map(|doc| ProtocolParametersResponse { parameters: doc.parameters, start_epoch: doc.start_epoch, }) - .try_collect::>() - .await - .map_err(|_| CorruptStateError::ProtocolParams)?; + .collect::>(); let is_healthy = is_healthy(&database).await.unwrap_or_else(|ApiError { error, .. }| { tracing::error!("An error occured during health check: {error}"); false }); - let base_token = database - .collection::() - .get_latest_node_configuration() - .await? - .ok_or(CorruptStateError::NodeConfig)? - .config - .base_token; + let base_token = node_config.base_token; let latest_commitment_id = database .collection::() diff --git a/src/bin/inx-chronicle/api/explorer/routes.rs b/src/bin/inx-chronicle/api/explorer/routes.rs index 4f84d0a54..f844fe6b4 100644 --- a/src/bin/inx-chronicle/api/explorer/routes.rs +++ b/src/bin/inx-chronicle/api/explorer/routes.rs @@ -4,7 +4,7 @@ use axum::{extract::Path, routing::get, Extension}; use chronicle::db::{ mongodb::collections::{ - BlockCollection, CommittedSlotCollection, LedgerUpdateCollection, OutputCollection, ProtocolUpdateCollection, + ApplicationStateCollection, BlockCollection, CommittedSlotCollection, LedgerUpdateCollection, OutputCollection, }, MongoDb, }; @@ -105,11 +105,10 @@ async fn ledger_updates_by_slot( LedgerUpdatesBySlotPagination { page_size, cursor }: LedgerUpdatesBySlotPagination, ) -> ApiResult { let hrp = database - .collection::() - .get_latest_protocol_parameters() + .collection::() + .get_protocol_parameters() .await? .ok_or(CorruptStateError::ProtocolParams)? - .parameters .bech32_hrp(); let mut record_stream = database @@ -298,11 +297,10 @@ async fn richest_addresses_ledger_analytics( .await?; let hrp = database - .collection::() - .get_latest_protocol_parameters() + .collection::() + .get_protocol_parameters() .await? .ok_or(CorruptStateError::ProtocolParams)? - .parameters .bech32_hrp(); Ok(RichestAddressesResponse { diff --git a/src/bin/inx-chronicle/api/poi/routes.rs b/src/bin/inx-chronicle/api/poi/routes.rs index aa5b374b3..802054534 100644 --- a/src/bin/inx-chronicle/api/poi/routes.rs +++ b/src/bin/inx-chronicle/api/poi/routes.rs @@ -9,7 +9,7 @@ use axum::{ Extension, }; use chronicle::db::{ - mongodb::collections::{BlockCollection, CommittedSlotCollection, ConfigurationUpdateCollection}, + mongodb::collections::{BlockCollection, CommittedSlotCollection}, MongoDb, }; use iota_sdk::types::{api::core::BlockState, block::BlockId, TryFromDto}; diff --git a/src/bin/inx-chronicle/api/routes.rs b/src/bin/inx-chronicle/api/routes.rs index fca6645ff..51144a777 100644 --- a/src/bin/inx-chronicle/api/routes.rs +++ b/src/bin/inx-chronicle/api/routes.rs @@ -11,7 +11,7 @@ use axum::{ Extension, Json, TypedHeader, }; use chronicle::db::{ - mongodb::collections::{CommittedSlotCollection, ProtocolUpdateCollection}, + mongodb::collections::{ApplicationStateCollection, CommittedSlotCollection}, MongoDb, }; use hyper::StatusCode; @@ -145,10 +145,9 @@ pub async fn is_healthy(database: &MongoDb) -> ApiResult { .await? { if let Some(protocol_params) = database - .collection::() - .get_latest_protocol_parameters() + .collection::() + .get_protocol_parameters() .await? - .map(|p| p.parameters) { if is_new_enough(newest_slot.slot_index.to_timestamp( protocol_params.genesis_unix_timestamp(), diff --git a/src/bin/inx-chronicle/cli/analytics.rs b/src/bin/inx-chronicle/cli/analytics.rs index 6def8c9d3..b182f340a 100644 --- a/src/bin/inx-chronicle/cli/analytics.rs +++ b/src/bin/inx-chronicle/cli/analytics.rs @@ -10,7 +10,7 @@ use chronicle::{ config::{all_analytics, all_interval_analytics, IntervalAnalyticsChoice}, AnalyticsChoice, InfluxDb, }, - mongodb::collections::{OutputCollection, ProtocolUpdateCollection}, + mongodb::collections::{ApplicationStateCollection, OutputCollection}, MongoDb, }, tangle::{InputSource, Tangle}, @@ -91,11 +91,10 @@ impl FillAnalyticsCommand { tracing::info!("Connecting to database using hosts: `{}`.", config.mongodb.hosts_str()?); let db = MongoDb::connect(&config.mongodb).await?; let protocol_params = db - .collection::() - .get_latest_protocol_parameters() + .collection::() + .get_protocol_parameters() .await? - .ok_or_else(|| eyre::eyre!("No protocol parameters in database."))? - .parameters; + .ok_or_else(|| eyre::eyre!("No protocol parameters in database."))?; let start_index = if let Some(index) = start_index { *index } else if let Some(start_date) = start_date { @@ -234,7 +233,7 @@ pub async fn fill_analytics( if let Some(slot) = slot_stream.try_next().await? { // Check if the protocol params changed (or we just started) - if !matches!(&state, Some(state) if state.prev_protocol_params == slot.protocol_params.parameters) { + if !matches!(&state, Some(state) if state.prev_protocol_params == slot.protocol_parameters) { // Only get the ledger state for slots after the genesis since it requires // getting the previous slot data. let ledger_state = if slot.slot_index().0 > 0 { @@ -249,11 +248,11 @@ pub async fn fill_analytics( let analytics = analytics_choices .iter() - .map(|choice| Analytic::init(choice, &slot.protocol_params.parameters, &ledger_state)) + .map(|choice| Analytic::init(choice, &slot.protocol_parameters, &ledger_state)) .collect::>(); state = Some(AnalyticsState { analytics, - prev_protocol_params: slot.protocol_params.parameters.clone(), + prev_protocol_params: slot.protocol_parameters.clone(), }); } diff --git a/src/bin/inx-chronicle/inx/influx/analytics.rs b/src/bin/inx-chronicle/inx/influx/analytics.rs index 7206cbe11..55a0a58e8 100644 --- a/src/bin/inx-chronicle/inx/influx/analytics.rs +++ b/src/bin/inx-chronicle/inx/influx/analytics.rs @@ -60,7 +60,7 @@ impl InxWorker { if let (Some(influx_db), analytics_choices) = (&self.influx_db, analytics_choices) { if influx_db.config().analytics_enabled { // Check if the protocol params changed (or we just started) - if !matches!(&state, Some(state) if state.prev_protocol_params == slot.protocol_params.parameters) { + if !matches!(&state, Some(state) if state.prev_protocol_params == slot.protocol_parameters) { let ledger_state = self .db .collection::() @@ -71,11 +71,11 @@ impl InxWorker { let analytics = analytics_choices .iter() - .map(|choice| Analytic::init(choice, &slot.protocol_params.parameters, &ledger_state)) + .map(|choice| Analytic::init(choice, &slot.protocol_parameters, &ledger_state)) .collect::>(); *state = Some(AnalyticsState { analytics, - prev_protocol_params: slot.protocol_params.parameters.clone(), + prev_protocol_params: slot.protocol_parameters.clone(), }); } diff --git a/src/bin/inx-chronicle/inx/mod.rs b/src/bin/inx-chronicle/inx/mod.rs index 0908d55fc..9ed21662d 100644 --- a/src/bin/inx-chronicle/inx/mod.rs +++ b/src/bin/inx-chronicle/inx/mod.rs @@ -11,8 +11,8 @@ use std::time::Duration; use chronicle::{ db::{ mongodb::collections::{ - ApplicationStateCollection, BlockCollection, CommittedSlotCollection, ConfigurationUpdateCollection, - LedgerUpdateCollection, OutputCollection, ProtocolUpdateCollection, + ApplicationStateCollection, BlockCollection, CommittedSlotCollection, LedgerUpdateCollection, + OutputCollection, }, MongoDb, }, @@ -126,34 +126,34 @@ impl InxWorker { let node_configuration = inx.get_node_configuration().await?; - let protocol_parameters = node_configuration.protocol_parameters.last().unwrap(); - debug!( "Connected to network `{}` with base token `{}[{}]`.", - protocol_parameters.parameters.network_name(), + node_configuration + .protocol_parameters + .last() + .unwrap() + .parameters + .network_name(), node_configuration.base_token.name, node_configuration.base_token.ticker_symbol ); - if let Some(latest) = self + if let Some(db_node_config) = self .db - .collection::() - .get_latest_protocol_parameters() + .collection::() + .get_node_config() .await? { - if latest.parameters.network_name() != protocol_parameters.parameters.network_name() { - bail!(InxWorkerError::NetworkChanged { - old: latest.parameters.network_name().to_owned(), - new: protocol_parameters.parameters.network_name().to_owned(), - }); - } - debug!("Found matching network in the database."); - if latest.parameters != protocol_parameters.parameters { - debug!("Updating protocol parameters."); - self.db - .collection::() - .upsert_protocol_parameters(protocol_parameters.start_epoch, protocol_parameters.parameters.clone()) - .await?; + if db_node_config != node_configuration { + if db_node_config.latest_parameters().network_name() + != node_configuration.latest_parameters().network_name() + { + bail!(InxWorkerError::NetworkChanged { + old: db_node_config.latest_parameters().network_name().to_owned(), + new: node_configuration.latest_parameters().network_name().to_owned(), + }); + } + // TODO: Maybe we need to do some additional checking? } } else { self.db.clear().await?; @@ -210,10 +210,12 @@ impl InxWorker { let starting_index = starting_index.unwrap_or(SlotIndex(0)); + let protocol_params = node_configuration.latest_parameters(); + // Get the timestamp for the starting index let slot_timestamp = starting_index.to_timestamp( - protocol_parameters.parameters.genesis_unix_timestamp(), - protocol_parameters.parameters.slot_duration_in_seconds(), + protocol_params.genesis_unix_timestamp(), + protocol_params.slot_duration_in_seconds(), ); info!( @@ -231,15 +233,16 @@ impl InxWorker { info!( "Linking database `{}` to network `{}`.", self.db.name(), - protocol_parameters.parameters.network_name() + protocol_params.network_name() ); - - self.db - .collection::() - .upsert_protocol_parameters(protocol_parameters.start_epoch, protocol_parameters.parameters.clone()) - .await?; } + debug!("Updating node configuration."); + self.db + .collection::() + .set_node_config(node_configuration) + .await?; + Ok((start_index, inx)) } @@ -276,18 +279,6 @@ impl InxWorker { tracing::Span::current().record("consumed", slot.ledger_updates().consumed_outputs().len()); self.handle_accepted_blocks(&slot).await?; - self.db - .collection::() - .upsert_protocol_parameters( - slot.index() - .to_epoch_index(slot.protocol_params.parameters.slots_per_epoch_exponent()), - slot.protocol_params.parameters.clone(), - ) - .await?; - self.db - .collection::() - .upsert_node_configuration(slot.index(), slot.node_config.clone()) - .await?; #[cfg(feature = "influx")] self.update_influx( diff --git a/src/db/mongodb/collections/application_state.rs b/src/db/mongodb/collections/application_state.rs index 3209b6832..0846b29b4 100644 --- a/src/db/mongodb/collections/application_state.rs +++ b/src/db/mongodb/collections/application_state.rs @@ -1,13 +1,17 @@ // Copyright 2023 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use iota_sdk::types::block::slot::SlotIndex; +use futures::TryStreamExt; +use iota_sdk::types::block::{protocol::ProtocolParameters, slot::SlotIndex}; use mongodb::{bson::doc, options::UpdateOptions}; use serde::{Deserialize, Serialize}; -use crate::db::{ - mongodb::{DbError, MongoDbCollection, MongoDbCollectionExt}, - MongoDb, +use crate::{ + db::{ + mongodb::{DbError, MongoDbCollection, MongoDbCollectionExt}, + MongoDb, + }, + model::{node::NodeConfiguration, SerializeToBson}, }; /// The MongoDb document representation of singleton Application State. @@ -15,6 +19,7 @@ use crate::db::{ pub struct ApplicationStateDocument { pub starting_slot: Option, pub last_migration: Option, + pub node_config: Option, } /// The migration version and associated metadata. @@ -85,11 +90,45 @@ impl ApplicationStateCollection { self.update_one( doc! {}, doc! { - "$set": { "last_migration": mongodb::bson::to_bson(&last_migration)? } + "$set": { "last_migration": last_migration.to_bson() } }, UpdateOptions::builder().upsert(true).build(), ) .await?; Ok(()) } + + /// Gets the node config. + pub async fn get_node_config(&self) -> Result, DbError> { + Ok(self + .find_one::(doc! {}, None) + .await? + .and_then(|doc| doc.node_config)) + } + + /// Set the node_config in the singleton application state. + pub async fn set_node_config(&self, node_config: NodeConfiguration) -> Result<(), DbError> { + self.update_one( + doc! {}, + doc! { + "$set": { "node_config": node_config.to_bson() } + }, + UpdateOptions::builder().upsert(true).build(), + ) + .await?; + Ok(()) + } + + /// Gets the protocol parameters. + pub async fn get_protocol_parameters(&self) -> Result, DbError> { + Ok(self + .aggregate::( + [doc! { "$replaceWith": { "$last": "$node_config.protocol_parameters" } }], + None, + ) + .await? + .try_next() + .await? + .map(|p| p.parameters)) + } } diff --git a/src/db/mongodb/collections/configuration_update.rs b/src/db/mongodb/collections/configuration_update.rs deleted file mode 100644 index 8953d63c7..000000000 --- a/src/db/mongodb/collections/configuration_update.rs +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright 2023 IOTA Stiftung -// SPDX-License-Identifier: Apache-2.0 - -use iota_sdk::types::block::slot::SlotIndex; -use mongodb::{ - bson::doc, - options::{FindOneOptions, UpdateOptions}, -}; -use serde::{Deserialize, Serialize}; - -use crate::{ - db::{ - mongodb::{DbError, MongoDbCollection, MongoDbCollectionExt}, - MongoDb, - }, - model::{node::NodeConfiguration, SerializeToBson}, -}; - -/// The corresponding MongoDb document representation to store [`NodeConfiguration`]s. -#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -pub struct ConfigurationUpdateDocument { - #[serde(rename = "_id")] - pub slot_index: SlotIndex, - #[serde(flatten)] - pub config: NodeConfiguration, -} - -/// A collection to store [`NodeConfiguration`]s. -pub struct ConfigurationUpdateCollection { - collection: mongodb::Collection, -} - -impl MongoDbCollection for ConfigurationUpdateCollection { - const NAME: &'static str = "iota_configuration_updates"; - type Document = ConfigurationUpdateDocument; - - fn instantiate(_db: &MongoDb, collection: mongodb::Collection) -> Self { - Self { collection } - } - - fn collection(&self) -> &mongodb::Collection { - &self.collection - } -} - -impl ConfigurationUpdateCollection { - /// Gets the latest node configuration. - pub async fn get_latest_node_configuration(&self) -> Result, DbError> { - Ok(self - .find_one(doc! {}, FindOneOptions::builder().sort(doc! { "_id": -1 }).build()) - .await?) - } - - /// Gets the node configuration that was valid for the given slot index. - pub async fn get_node_configuration_for_slot_index( - &self, - slot_index: SlotIndex, - ) -> Result, DbError> { - Ok(self - .find_one( - doc! { "_id": { "$lte": slot_index.0 } }, - FindOneOptions::builder().sort(doc! { "_id": -1 }).build(), - ) - .await?) - } - - /// Inserts or updates a node configuration for a given slot index. - pub async fn upsert_node_configuration( - &self, - slot_index: SlotIndex, - config: NodeConfiguration, - ) -> Result<(), DbError> { - let node_config = self.get_node_configuration_for_slot_index(slot_index).await?; - if !matches!(node_config, Some(node_config) if node_config.config == config) { - self.update_one( - doc! { "_id": slot_index.0 }, - doc! { "$set": config.to_bson() }, - UpdateOptions::builder().upsert(true).build(), - ) - .await?; - } - Ok(()) - } -} diff --git a/src/db/mongodb/collections/mod.rs b/src/db/mongodb/collections/mod.rs index 5474761a4..017acea67 100644 --- a/src/db/mongodb/collections/mod.rs +++ b/src/db/mongodb/collections/mod.rs @@ -6,14 +6,10 @@ mod application_state; mod block; /// Module containing the committed slot collection. mod committed_slot; -/// Module containing the node configuration collection. -mod configuration_update; /// Module containing the ledger update collection. mod ledger_update; /// Module containing the outputs collection. mod outputs; -/// Module containing the protocol parameters collection. -mod protocol_update; use std::str::FromStr; @@ -26,14 +22,12 @@ pub use self::{ application_state::{ApplicationStateCollection, MigrationVersion}, block::BlockCollection, committed_slot::CommittedSlotCollection, - configuration_update::ConfigurationUpdateCollection, ledger_update::{LedgerUpdateByAddressRecord, LedgerUpdateBySlotRecord, LedgerUpdateCollection}, outputs::{ AccountOutputsQuery, AddressStat, AnchorOutputsQuery, BasicOutputsQuery, DelegationOutputsQuery, DistributionStat, FoundryOutputsQuery, IndexedId, NftOutputsQuery, OutputCollection, OutputMetadata, OutputMetadataResult, OutputWithMetadataResult, OutputsResult, UtxoChangesResult, }, - protocol_update::ProtocolUpdateCollection, }; /// Helper to specify a kind for an output type. diff --git a/src/db/mongodb/collections/outputs/mod.rs b/src/db/mongodb/collections/outputs/mod.rs index 4a0316d71..2c36c5430 100644 --- a/src/db/mongodb/collections/outputs/mod.rs +++ b/src/db/mongodb/collections/outputs/mod.rs @@ -32,7 +32,7 @@ use super::ledger_update::{LedgerOutputRecord, LedgerSpentRecord}; use crate::{ db::{ mongodb::{ - collections::ProtocolUpdateCollection, DbError, InsertIgnoreDuplicatesExt, MongoDbCollection, + collections::ApplicationStateCollection, DbError, InsertIgnoreDuplicatesExt, MongoDbCollection, MongoDbCollectionExt, }, MongoDb, @@ -85,7 +85,7 @@ pub struct SpentMetadata { pub struct OutputCollection { db: mongodb::Database, collection: mongodb::Collection, - protocol_updates: ProtocolUpdateCollection, + app_state: ApplicationStateCollection, } #[async_trait::async_trait] @@ -97,7 +97,7 @@ impl MongoDbCollection for OutputCollection { Self { db: db.db(), collection, - protocol_updates: db.collection(), + app_state: db.collection(), } } @@ -684,11 +684,10 @@ impl OutputCollection { // TODO: handle missing params let protocol_params = self - .protocol_updates - .get_latest_protocol_parameters() + .app_state + .get_protocol_parameters() .await? - .expect("missing protocol parameters") - .parameters; + .expect("missing protocol parameters"); let (start_slot, end_slot) = ( protocol_params.slot_index(start_date.midnight().assume_utc().unix_timestamp() as _), diff --git a/src/db/mongodb/collections/protocol_update.rs b/src/db/mongodb/collections/protocol_update.rs deleted file mode 100644 index 3ab3170c9..000000000 --- a/src/db/mongodb/collections/protocol_update.rs +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright 2023 IOTA Stiftung -// SPDX-License-Identifier: Apache-2.0 - -use futures::{Stream, TryStreamExt}; -use iota_sdk::types::block::{protocol::ProtocolParameters, slot::EpochIndex}; -use mongodb::{ - bson::doc, - options::{FindOneOptions, FindOptions, UpdateOptions}, -}; -use serde::{Deserialize, Serialize}; - -use crate::{ - db::{ - mongodb::{DbError, MongoDbCollection, MongoDbCollectionExt}, - MongoDb, - }, - model::SerializeToBson, -}; - -/// A protocol update document. -#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -pub struct ProtocolUpdateDocument { - #[serde(rename = "_id")] - pub start_epoch: EpochIndex, - pub parameters: ProtocolParameters, -} - -/// The iota protocol parameters collection. -pub struct ProtocolUpdateCollection { - collection: mongodb::Collection, -} - -impl MongoDbCollection for ProtocolUpdateCollection { - const NAME: &'static str = "iota_protocol_updates"; - type Document = ProtocolUpdateDocument; - - fn instantiate(_db: &MongoDb, collection: mongodb::Collection) -> Self { - Self { collection } - } - - fn collection(&self) -> &mongodb::Collection { - &self.collection - } -} - -impl ProtocolUpdateCollection { - /// Gets the latest protocol parameters. - pub async fn get_latest_protocol_parameters(&self) -> Result, DbError> { - Ok(self - .find_one(doc! {}, FindOneOptions::builder().sort(doc! { "_id": -1 }).build()) - .await?) - } - - /// Gets the protocol parameters that are valid for the given ledger index. - pub async fn get_protocol_parameters_for_epoch_index( - &self, - epoch_index: EpochIndex, - ) -> Result, DbError> { - Ok(self - .find_one( - doc! { "_id": { "$lte": epoch_index.0 } }, - FindOneOptions::builder().sort(doc! { "_id": -1 }).build(), - ) - .await?) - } - - /// Gets the protocol parameters for a given protocol version. - pub async fn get_protocol_parameters_for_version( - &self, - version: u8, - ) -> Result, DbError> { - Ok(self - .find_one(doc! { "parameters.version": version as i32 }, None) - .await?) - } - - /// Gets all protocol parameters by their start epoch. - pub async fn get_all_protocol_parameters( - &self, - ) -> Result>, DbError> { - Ok(self - .find(None, FindOptions::builder().sort(doc! { "_id": -1 }).build()) - .await? - .map_err(Into::into)) - } - - /// Add the protocol parameters to the list if the protocol parameters have changed. - pub async fn upsert_protocol_parameters( - &self, - epoch_index: EpochIndex, - parameters: ProtocolParameters, - ) -> Result<(), DbError> { - let params = self.get_protocol_parameters_for_epoch_index(epoch_index).await?; - if !matches!(params, Some(params) if params.parameters == parameters) { - self.update_one( - doc! { "_id": epoch_index.0 }, - doc! { "$set": { - "parameters": parameters.to_bson() - } }, - UpdateOptions::builder().upsert(true).build(), - ) - .await?; - } - Ok(()) - } -} diff --git a/src/inx/client.rs b/src/inx/client.rs index e8c262524..d3397cac5 100644 --- a/src/inx/client.rs +++ b/src/inx/client.rs @@ -39,20 +39,14 @@ impl Inx { Ok(self.inx.read_node_status(proto::NoParams {}).await?.try_convert()?) } - /// Stream status updates from the node. - pub async fn get_node_status_updates( - &mut self, - cooldown_in_milliseconds: u32, - ) -> Result>, InxError> { - Ok(self - .inx - .listen_to_node_status(proto::NodeStatusRequest { - cooldown_in_milliseconds, - }) - .await? - .into_inner() - .map(|msg| TryConvertTo::try_convert(msg?))) - } + // /// Stream status updates from the node. + // pub async fn get_node_status_updates( + // &mut self, + // cooldown_in_milliseconds: u32, + // ) -> Result>, InxError> { Ok(self .inx + // .listen_to_node_status(proto::NodeStatusRequest { cooldown_in_milliseconds, }) .await? .into_inner() .map(|msg| + // TryConvertTo::try_convert(msg?))) + // } /// Get the configuration of the node. pub async fn get_node_configuration(&mut self) -> Result { diff --git a/src/model/node.rs b/src/model/node.rs index 8f6ed612d..f83deb599 100644 --- a/src/model/node.rs +++ b/src/model/node.rs @@ -23,6 +23,12 @@ pub struct NodeConfiguration { pub protocol_parameters: Vec, } +impl NodeConfiguration { + pub fn latest_parameters(&self) -> &iota_sdk::types::block::protocol::ProtocolParameters { + &self.protocol_parameters.last().unwrap().parameters + } +} + pub struct NodeStatus { pub is_healthy: bool, pub accepted_tangle_time: Option, diff --git a/src/tangle/mod.rs b/src/tangle/mod.rs index 98783ee31..48009db08 100644 --- a/src/tangle/mod.rs +++ b/src/tangle/mod.rs @@ -50,8 +50,9 @@ impl Tangle { .ledger_updates(data.commitment.commitment_id.slot_index()) .await?, source, - protocol_params: data.node_config.protocol_parameters.last().unwrap().clone(), - node_config: data.node_config, + protocol_parameters: source + .protocol_parameters(data.commitment.commitment_id.slot_index()) + .await?, commitment: data.commitment, }) } diff --git a/src/tangle/slot_stream.rs b/src/tangle/slot_stream.rs index 4722cc07b..853439e70 100644 --- a/src/tangle/slot_stream.rs +++ b/src/tangle/slot_stream.rs @@ -7,20 +7,19 @@ use std::{ }; use futures::{stream::BoxStream, Stream}; -use iota_sdk::types::block::slot::{SlotCommitment, SlotCommitmentId, SlotIndex}; +use iota_sdk::types::block::{ + protocol::ProtocolParameters, + slot::{SlotCommitment, SlotCommitmentId, SlotIndex}, +}; use super::InputSource; -use crate::model::{ - block_metadata::BlockWithMetadata, ledger::LedgerUpdateStore, node::NodeConfiguration, - protocol::ProtocolParameters, raw::Raw, slot::Commitment, -}; +use crate::model::{block_metadata::BlockWithMetadata, ledger::LedgerUpdateStore, raw::Raw, slot::Commitment}; #[allow(missing_docs)] pub struct Slot<'a, I: InputSource> { pub(super) source: &'a I, pub commitment: Commitment, - pub protocol_params: ProtocolParameters, - pub node_config: NodeConfiguration, + pub protocol_parameters: ProtocolParameters, pub ledger_updates: LedgerUpdateStore, } diff --git a/src/tangle/sources/inx.rs b/src/tangle/sources/inx.rs index 535ae3586..eae78660c 100644 --- a/src/tangle/sources/inx.rs +++ b/src/tangle/sources/inx.rs @@ -5,7 +5,7 @@ use core::ops::RangeBounds; use async_trait::async_trait; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; -use iota_sdk::types::block::slot::SlotIndex; +use iota_sdk::types::block::{protocol::ProtocolParameters, slot::SlotIndex}; use thiserror::Error; use super::{InputSource, SlotData}; @@ -39,13 +39,7 @@ impl InputSource for Inx { .map_err(Self::Error::from) .and_then(move |commitment| { let mut inx = inx.clone(); - async move { - let node_config = inx.get_node_configuration().await?.into(); - Ok(SlotData { - commitment, - node_config, - }) - } + async move { Ok(SlotData { commitment }) } }), )) } @@ -91,4 +85,18 @@ impl InputSource for Inx { Ok(LedgerUpdateStore::init(consumed, created)) } + + async fn protocol_parameters(&self, _index: SlotIndex) -> Result { + let mut inx = self.clone(); + // TODO: eventually we'll have to do this right + Ok(inx + .get_node_configuration() + .await? + .protocol_parameters + .into_iter() + .rev() + .next() + .unwrap() + .parameters) + } } diff --git a/src/tangle/sources/memory.rs b/src/tangle/sources/memory.rs index 7cdac6146..97c413bd7 100644 --- a/src/tangle/sources/memory.rs +++ b/src/tangle/sources/memory.rs @@ -6,7 +6,7 @@ use std::collections::BTreeMap; use async_trait::async_trait; use futures::stream::BoxStream; -use iota_sdk::types::block::{slot::SlotIndex, BlockId}; +use iota_sdk::types::block::{protocol::ProtocolParameters, slot::SlotIndex, BlockId}; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -57,4 +57,8 @@ impl InputSource for BTreeMap { .ledger_updates .clone()) } + + async fn protocol_parameters(&self, index: SlotIndex) -> Result { + todo!() + } } diff --git a/src/tangle/sources/mod.rs b/src/tangle/sources/mod.rs index eefd6c359..b2f2f648b 100644 --- a/src/tangle/sources/mod.rs +++ b/src/tangle/sources/mod.rs @@ -10,18 +10,15 @@ use core::ops::RangeBounds; use async_trait::async_trait; use futures::stream::BoxStream; -use iota_sdk::types::block::slot::SlotIndex; +use iota_sdk::types::block::{protocol::ProtocolParameters, slot::SlotIndex}; use serde::{Deserialize, Serialize}; -use crate::model::{ - block_metadata::BlockWithMetadata, ledger::LedgerUpdateStore, node::NodeConfiguration, slot::Commitment, -}; +use crate::model::{block_metadata::BlockWithMetadata, ledger::LedgerUpdateStore, slot::Commitment}; #[derive(Clone, Debug, Serialize, Deserialize)] #[allow(missing_docs)] pub struct SlotData { pub commitment: Commitment, - pub node_config: NodeConfiguration, } /// Defines a type as a source for block and ledger update data. @@ -44,4 +41,7 @@ pub trait InputSource: Send + Sync { /// Retrieves the updates to the ledger for a given range of slots. async fn ledger_updates(&self, index: SlotIndex) -> Result; + + /// Retrieves the protocol parameters for the given slot index. + async fn protocol_parameters(&self, index: SlotIndex) -> Result; } diff --git a/src/tangle/sources/mongodb.rs b/src/tangle/sources/mongodb.rs index bbf691216..aabb2ae8f 100644 --- a/src/tangle/sources/mongodb.rs +++ b/src/tangle/sources/mongodb.rs @@ -5,13 +5,16 @@ use core::ops::RangeBounds; use async_trait::async_trait; use futures::{stream::BoxStream, TryStreamExt}; -use iota_sdk::types::block::slot::SlotIndex; +use iota_sdk::types::block::{protocol::ProtocolParameters, slot::SlotIndex}; use thiserror::Error; use super::{InputSource, SlotData}; use crate::{ db::{ - mongodb::{collections::OutputCollection, DbError}, + mongodb::{ + collections::{ApplicationStateCollection, OutputCollection}, + DbError, + }, MongoDb, }, model::{block_metadata::BlockWithMetadata, ledger::LedgerUpdateStore}, @@ -19,8 +22,6 @@ use crate::{ #[derive(Debug, Error)] pub enum MongoDbInputSourceError { - #[error("missing node config for ledger index {0}")] - MissingNodeConfig(SlotIndex), #[error("missing protocol params for ledger index {0}")] MissingProtocolParams(SlotIndex), #[error(transparent)] @@ -91,4 +92,12 @@ impl InputSource for MongoDb { Ok(LedgerUpdateStore::init(consumed, created)) } + + async fn protocol_parameters(&self, index: SlotIndex) -> Result { + Ok(self + .collection::() + .get_protocol_parameters() + .await? + .ok_or_else(|| MongoDbInputSourceError::MissingProtocolParams(index))?) + } }