Skip to content

Commit

Permalink
remove excess collections
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Coats committed Nov 8, 2023
1 parent 2b4d544 commit 39e838e
Show file tree
Hide file tree
Showing 22 changed files with 185 additions and 351 deletions.
23 changes: 7 additions & 16 deletions src/analytics/ledger/address_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@

use std::collections::HashMap;

use iota_sdk::types::block::{
address::{Bech32Address, ToBech32Ext},
protocol::ProtocolParameters,
};
use iota_sdk::types::block::address::Address;
use serde::{Deserialize, Serialize};

use crate::{
Expand All @@ -32,20 +29,16 @@ pub(crate) struct DistributionStat {
/// Computes the number of addresses the currently hold a balance.
#[derive(Serialize, Deserialize)]
pub(crate) struct AddressBalancesAnalytics {
balances: HashMap<Bech32Address, u64>,
balances: HashMap<Address, u64>,
}

impl AddressBalancesAnalytics {
/// Initialize the analytics by reading the current ledger state.
pub(crate) fn init<'a>(
unspent_outputs: impl IntoIterator<Item = &'a LedgerOutput>,
protocol_params: &ProtocolParameters,
) -> Self {
let hrp = protocol_params.bech32_hrp();
pub(crate) fn init<'a>(unspent_outputs: impl IntoIterator<Item = &'a LedgerOutput>) -> Self {
let mut balances = HashMap::new();
for output in unspent_outputs {
if let Some(a) = output.address() {
*balances.entry(a.clone().to_bech32(hrp)).or_default() += output.amount();
*balances.entry(a.clone()).or_default() += output.amount();
}
}
Self { balances }
Expand All @@ -56,15 +49,13 @@ impl Analytics for AddressBalancesAnalytics {
type Measurement = AddressBalanceMeasurement;

fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], ctx: &dyn AnalyticsContext) {

Check warning on line 51 in src/analytics/ledger/address_balance.rs

View workflow job for this annotation

GitHub Actions / check and test / ubuntu-latest, stable

unused variable: `ctx`
let hrp = ctx.protocol_params().bech32_hrp();
for output in consumed {
if let Some(a) = output.address() {
let a = a.clone().to_bech32(hrp);
// All inputs should be present in `addresses`. If not, we skip it's value.
if let Some(amount) = self.balances.get_mut(&a) {
if let Some(amount) = self.balances.get_mut(a) {
*amount -= output.amount();
if *amount == 0 {
self.balances.remove(&a);
self.balances.remove(a);
}
}
}
Expand All @@ -73,7 +64,7 @@ impl Analytics for AddressBalancesAnalytics {
for output in created {
if let Some(a) = output.address() {
// All inputs should be present in `addresses`. If not, we skip it's value.
*self.balances.entry(a.clone().to_bech32(hrp)).or_default() += output.amount();
*self.balances.entry(a.clone()).or_default() += output.amount();
}
}
}
Expand Down
8 changes: 3 additions & 5 deletions src/analytics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub trait AnalyticsContext: Send + Sync {

impl<'a, I: InputSource> AnalyticsContext for Slot<'a, I> {
fn protocol_params(&self) -> &ProtocolParameters {
&self.protocol_params.parameters
&self.protocol_parameters
}

fn slot_index(&self) -> SlotIndex {
Expand Down Expand Up @@ -158,9 +158,7 @@ impl Analytic {
unspent_outputs: impl IntoIterator<Item = &'a LedgerOutput>,
) -> Self {
Self(match choice {
AnalyticsChoice::AddressBalance => {
Box::new(AddressBalancesAnalytics::init(unspent_outputs, &protocol_params)) as _
}
AnalyticsChoice::AddressBalance => Box::new(AddressBalancesAnalytics::init(unspent_outputs)) as _,
AnalyticsChoice::BaseTokenActivity => Box::<BaseTokenActivityMeasurement>::default() as _,
AnalyticsChoice::BlockActivity => Box::<BlockActivityMeasurement>::default() as _,
AnalyticsChoice::ActiveAddresses => Box::<AddressActivityAnalytics>::default() as _,
Expand Down Expand Up @@ -446,7 +444,7 @@ mod test {
) -> Self {
Self {
active_addresses: Default::default(),
address_balance: AddressBalancesAnalytics::init(unspent_outputs, &protocol_params),
address_balance: AddressBalancesAnalytics::init(unspent_outputs),
base_tokens: Default::default(),
ledger_outputs: LedgerOutputMeasurement::init(unspent_outputs),
ledger_size: LedgerSizeAnalytics::init(protocol_params, unspent_outputs),
Expand Down
29 changes: 12 additions & 17 deletions src/bin/inx-chronicle/api/core/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ use axum::{
use chronicle::{
db::{
mongodb::collections::{
BlockCollection, CommittedSlotCollection, ConfigurationUpdateCollection, OutputCollection, OutputMetadata,
OutputWithMetadataResult, ProtocolUpdateCollection, UtxoChangesResult,
ApplicationStateCollection, BlockCollection, CommittedSlotCollection, OutputCollection, OutputMetadata,
OutputWithMetadataResult, UtxoChangesResult,
},
MongoDb,
},
model::block_metadata::BlockMetadata,
};
use futures::TryStreamExt;
use iota_sdk::types::{
api::core::{
BaseTokenResponse, BlockMetadataResponse, OutputWithMetadataResponse, ProtocolParametersResponse,
Expand Down Expand Up @@ -88,30 +87,26 @@ pub fn routes() -> Router {
}

pub async fn info(database: Extension<MongoDb>) -> ApiResult<InfoResponse> {
let protocol_parameters = database
.collection::<ProtocolUpdateCollection>()
.get_all_protocol_parameters()
let node_config = database
.collection::<ApplicationStateCollection>()
.get_node_config()
.await?
.map_ok(|doc| ProtocolParametersResponse {
.ok_or(CorruptStateError::NodeConfig)?;
let protocol_parameters = node_config
.protocol_parameters
.into_iter()
.map(|doc| ProtocolParametersResponse {
parameters: doc.parameters,
start_epoch: doc.start_epoch,
})
.try_collect::<Vec<_>>()
.await
.map_err(|_| CorruptStateError::ProtocolParams)?;
.collect::<Vec<_>>();

let is_healthy = is_healthy(&database).await.unwrap_or_else(|ApiError { error, .. }| {
tracing::error!("An error occured during health check: {error}");
false
});

let base_token = database
.collection::<ConfigurationUpdateCollection>()
.get_latest_node_configuration()
.await?
.ok_or(CorruptStateError::NodeConfig)?
.config
.base_token;
let base_token = node_config.base_token;

let latest_commitment_id = database
.collection::<CommittedSlotCollection>()
Expand Down
12 changes: 5 additions & 7 deletions src/bin/inx-chronicle/api/explorer/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use axum::{extract::Path, routing::get, Extension};
use chronicle::db::{
mongodb::collections::{
BlockCollection, CommittedSlotCollection, LedgerUpdateCollection, OutputCollection, ProtocolUpdateCollection,
ApplicationStateCollection, BlockCollection, CommittedSlotCollection, LedgerUpdateCollection, OutputCollection,
},
MongoDb,
};
Expand Down Expand Up @@ -105,11 +105,10 @@ async fn ledger_updates_by_slot(
LedgerUpdatesBySlotPagination { page_size, cursor }: LedgerUpdatesBySlotPagination,
) -> ApiResult<LedgerUpdatesBySlotResponse> {
let hrp = database
.collection::<ProtocolUpdateCollection>()
.get_latest_protocol_parameters()
.collection::<ApplicationStateCollection>()
.get_protocol_parameters()
.await?
.ok_or(CorruptStateError::ProtocolParams)?
.parameters
.bech32_hrp();

let mut record_stream = database
Expand Down Expand Up @@ -298,11 +297,10 @@ async fn richest_addresses_ledger_analytics(
.await?;

let hrp = database
.collection::<ProtocolUpdateCollection>()
.get_latest_protocol_parameters()
.collection::<ApplicationStateCollection>()
.get_protocol_parameters()
.await?
.ok_or(CorruptStateError::ProtocolParams)?
.parameters
.bech32_hrp();

Ok(RichestAddressesResponse {
Expand Down
2 changes: 1 addition & 1 deletion src/bin/inx-chronicle/api/poi/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use axum::{
Extension,
};
use chronicle::db::{
mongodb::collections::{BlockCollection, CommittedSlotCollection, ConfigurationUpdateCollection},
mongodb::collections::{BlockCollection, CommittedSlotCollection},
MongoDb,
};
use iota_sdk::types::{api::core::BlockState, block::BlockId, TryFromDto};
Expand Down
7 changes: 3 additions & 4 deletions src/bin/inx-chronicle/api/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use axum::{
Extension, Json, TypedHeader,
};
use chronicle::db::{
mongodb::collections::{CommittedSlotCollection, ProtocolUpdateCollection},
mongodb::collections::{ApplicationStateCollection, CommittedSlotCollection},
MongoDb,
};
use hyper::StatusCode;
Expand Down Expand Up @@ -145,10 +145,9 @@ pub async fn is_healthy(database: &MongoDb) -> ApiResult<bool> {
.await?
{
if let Some(protocol_params) = database
.collection::<ProtocolUpdateCollection>()
.get_latest_protocol_parameters()
.collection::<ApplicationStateCollection>()
.get_protocol_parameters()
.await?
.map(|p| p.parameters)
{
if is_new_enough(newest_slot.slot_index.to_timestamp(
protocol_params.genesis_unix_timestamp(),
Expand Down
15 changes: 7 additions & 8 deletions src/bin/inx-chronicle/cli/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use chronicle::{
config::{all_analytics, all_interval_analytics, IntervalAnalyticsChoice},
AnalyticsChoice, InfluxDb,
},
mongodb::collections::{OutputCollection, ProtocolUpdateCollection},
mongodb::collections::{ApplicationStateCollection, OutputCollection},
MongoDb,
},
tangle::{InputSource, Tangle},
Expand Down Expand Up @@ -91,11 +91,10 @@ impl FillAnalyticsCommand {
tracing::info!("Connecting to database using hosts: `{}`.", config.mongodb.hosts_str()?);
let db = MongoDb::connect(&config.mongodb).await?;
let protocol_params = db
.collection::<ProtocolUpdateCollection>()
.get_latest_protocol_parameters()
.collection::<ApplicationStateCollection>()
.get_protocol_parameters()
.await?
.ok_or_else(|| eyre::eyre!("No protocol parameters in database."))?
.parameters;
.ok_or_else(|| eyre::eyre!("No protocol parameters in database."))?;
let start_index = if let Some(index) = start_index {
*index
} else if let Some(start_date) = start_date {
Expand Down Expand Up @@ -234,7 +233,7 @@ pub async fn fill_analytics<I: 'static + InputSource + Clone>(

if let Some(slot) = slot_stream.try_next().await? {
// Check if the protocol params changed (or we just started)
if !matches!(&state, Some(state) if state.prev_protocol_params == slot.protocol_params.parameters) {
if !matches!(&state, Some(state) if state.prev_protocol_params == slot.protocol_parameters) {
// Only get the ledger state for slots after the genesis since it requires
// getting the previous slot data.
let ledger_state = if slot.slot_index().0 > 0 {
Expand All @@ -249,11 +248,11 @@ pub async fn fill_analytics<I: 'static + InputSource + Clone>(

let analytics = analytics_choices
.iter()
.map(|choice| Analytic::init(choice, &slot.protocol_params.parameters, &ledger_state))
.map(|choice| Analytic::init(choice, &slot.protocol_parameters, &ledger_state))
.collect::<Vec<_>>();
state = Some(AnalyticsState {
analytics,
prev_protocol_params: slot.protocol_params.parameters.clone(),
prev_protocol_params: slot.protocol_parameters.clone(),
});
}

Expand Down
6 changes: 3 additions & 3 deletions src/bin/inx-chronicle/inx/influx/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl InxWorker {
if let (Some(influx_db), analytics_choices) = (&self.influx_db, analytics_choices) {
if influx_db.config().analytics_enabled {
// Check if the protocol params changed (or we just started)
if !matches!(&state, Some(state) if state.prev_protocol_params == slot.protocol_params.parameters) {
if !matches!(&state, Some(state) if state.prev_protocol_params == slot.protocol_parameters) {
let ledger_state = self
.db
.collection::<OutputCollection>()
Expand All @@ -71,11 +71,11 @@ impl InxWorker {

let analytics = analytics_choices
.iter()
.map(|choice| Analytic::init(choice, &slot.protocol_params.parameters, &ledger_state))
.map(|choice| Analytic::init(choice, &slot.protocol_parameters, &ledger_state))
.collect::<Vec<_>>();
*state = Some(AnalyticsState {
analytics,
prev_protocol_params: slot.protocol_params.parameters.clone(),
prev_protocol_params: slot.protocol_parameters.clone(),
});
}

Expand Down
Loading

0 comments on commit 39e838e

Please sign in to comment.