diff --git a/storage/aptosdb/src/lib.rs b/storage/aptosdb/src/lib.rs index 9779a105d0cc5..16163df127e71 100644 --- a/storage/aptosdb/src/lib.rs +++ b/storage/aptosdb/src/lib.rs @@ -170,8 +170,8 @@ fn error_if_too_many_requested(num_requested: u64, max_allowed: u64) -> Result<( } } -fn error_if_version_is_pruned( - pruner: &(dyn PrunerManager), +fn error_if_version_is_pruned( + pruner: &P, data_type: &str, version: Version, ) -> Result<()> { diff --git a/storage/aptosdb/src/pruner/ledger_pruner_manager.rs b/storage/aptosdb/src/pruner/ledger_pruner_manager.rs index 70bf7f3cb6e68..5493f25ec3393 100644 --- a/storage/aptosdb/src/pruner/ledger_pruner_manager.rs +++ b/storage/aptosdb/src/pruner/ledger_pruner_manager.rs @@ -43,6 +43,12 @@ pub(crate) struct LedgerPrunerManager { } impl PrunerManager for LedgerPrunerManager { + type Pruner = LedgerPruner; + + fn pruner(&self) -> &Self::Pruner { + &self.pruner + } + fn is_pruner_enabled(&self) -> bool { self.pruner_enabled } @@ -90,40 +96,6 @@ impl PrunerManager for LedgerPrunerManager { .as_ref() .set_target_db_version(latest_version.saturating_sub(self.prune_window)); } - - #[cfg(test)] - fn wake_and_wait_pruner(&self, latest_version: Version) -> anyhow::Result<()> { - use std::{ - thread::sleep, - time::{Duration, Instant}, - }; - - *self.latest_version.lock() = latest_version; - - if latest_version - >= *self.last_version_sent_to_pruner.as_ref().lock() + self.pruning_batch_size as u64 - { - self.set_pruner_target_db_version(latest_version); - *self.last_version_sent_to_pruner.as_ref().lock() = latest_version; - } - - if self.pruner_enabled && latest_version > self.prune_window { - let min_readable_ledger_version = latest_version - self.prune_window; - - // Assuming no big pruning chunks will be issued by a test. - const TIMEOUT: Duration = Duration::from_secs(10); - let end = Instant::now() + TIMEOUT; - - while Instant::now() < end { - if self.get_min_readable_version() >= min_readable_ledger_version { - return Ok(()); - } - sleep(Duration::from_millis(1)); - } - anyhow::bail!("Timeout waiting for pruner worker."); - } - Ok(()) - } } impl LedgerPrunerManager { diff --git a/storage/aptosdb/src/pruner/pruner_manager.rs b/storage/aptosdb/src/pruner/pruner_manager.rs index a31e81b2cc42b..0f92b3967fd4d 100644 --- a/storage/aptosdb/src/pruner/pruner_manager.rs +++ b/storage/aptosdb/src/pruner/pruner_manager.rs @@ -1,6 +1,7 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 +use crate::pruner::db_pruner::DBPruner; use aptos_types::transaction::Version; use std::fmt::Debug; @@ -13,6 +14,8 @@ use std::fmt::Debug; /// It creates a worker thread on construction and joins it on destruction. When destructed, it /// quits the worker thread eagerly without waiting for all pending work to be done. pub trait PrunerManager: Debug + Sync { + type Pruner: DBPruner; + fn is_pruner_enabled(&self) -> bool; fn get_pruner_window(&self) -> Version; @@ -26,8 +29,33 @@ pub trait PrunerManager: Debug + Sync { fn set_pruner_target_db_version(&self, latest_version: Version); + fn pruner(&self) -> &Self::Pruner; + /// (For tests only.) Notifies the worker thread and waits for it to finish its job by polling /// an internal counter. #[cfg(test)] - fn wake_and_wait_pruner(&self, latest_version: Version) -> anyhow::Result<()>; + fn wake_and_wait_pruner(&self, latest_version: Version) -> anyhow::Result<()> { + use std::{ + thread::sleep, + time::{Duration, Instant}, + }; + + if !self.is_pruner_enabled() { + return Ok(()); + } + + self.maybe_set_pruner_target_db_version(latest_version); + + // Assuming no big pruning chunks will be issued by a test. + const TIMEOUT: Duration = Duration::from_secs(10); + let end = Instant::now() + TIMEOUT; + + while Instant::now() < end { + if !self.pruner().is_pruning_pending() { + return Ok(()); + } + sleep(Duration::from_millis(1)); + } + anyhow::bail!("Timeout waiting for pruner worker."); + } } diff --git a/storage/aptosdb/src/pruner/state_pruner_manager.rs b/storage/aptosdb/src/pruner/state_pruner_manager.rs index f8ee473062034..2c9700d78b2fd 100644 --- a/storage/aptosdb/src/pruner/state_pruner_manager.rs +++ b/storage/aptosdb/src/pruner/state_pruner_manager.rs @@ -50,6 +50,12 @@ pub struct StatePrunerManager { } impl PrunerManager for StatePrunerManager { + type Pruner = StateMerklePruner; + + fn pruner(&self) -> &Self::Pruner { + &self.pruner + } + fn is_pruner_enabled(&self) -> bool { self.pruner_enabled } @@ -92,36 +98,6 @@ impl PrunerManager for StatePrunerManager { .as_ref() .set_target_db_version(latest_version.saturating_sub(self.prune_window)); } - - /// (For tests only.) Notifies the worker thread and waits for it to finish its job by polling - /// an internal counter. - #[cfg(test)] - fn wake_and_wait_pruner(&self, latest_version: Version) -> anyhow::Result<()> { - use std::{ - thread::sleep, - time::{Duration, Instant}, - }; - - *self.latest_version.lock() = latest_version; - self.set_pruner_target_db_version(latest_version); - - if self.pruner_enabled && latest_version > self.prune_window { - let min_readable_state_store_version = latest_version - self.prune_window; - - // Assuming no big pruning chunks will be issued by a test. - const TIMEOUT: Duration = Duration::from_secs(10); - let end = Instant::now() + TIMEOUT; - - while Instant::now() < end { - if self.get_min_readable_version() >= min_readable_state_store_version { - return Ok(()); - } - sleep(Duration::from_millis(1)); - } - anyhow::bail!("Timeout waiting for pruner worker."); - } - Ok(()) - } } impl StatePrunerManager {