Skip to content

Commit

Permalink
[storage] simplify wake_and_wait_pruner
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse committed Aug 22, 2022
1 parent 205d2ef commit 7351edc
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 67 deletions.
4 changes: 2 additions & 2 deletions storage/aptosdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ fn error_if_too_many_requested(num_requested: u64, max_allowed: u64) -> Result<(
}
}

fn error_if_version_is_pruned(
pruner: &(dyn PrunerManager),
fn error_if_version_is_pruned<P: PrunerManager>(
pruner: &P,
data_type: &str,
version: Version,
) -> Result<()> {
Expand Down
40 changes: 6 additions & 34 deletions storage/aptosdb/src/pruner/ledger_pruner_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ pub(crate) struct LedgerPrunerManager {
}

impl PrunerManager for LedgerPrunerManager {
type Pruner = LedgerPruner;

fn pruner(&self) -> &Self::Pruner {
&self.pruner
}

fn is_pruner_enabled(&self) -> bool {
self.pruner_enabled
}
Expand Down Expand Up @@ -90,40 +96,6 @@ impl PrunerManager for LedgerPrunerManager {
.as_ref()
.set_target_db_version(latest_version.saturating_sub(self.prune_window));
}

#[cfg(test)]
fn wake_and_wait_pruner(&self, latest_version: Version) -> anyhow::Result<()> {
use std::{
thread::sleep,
time::{Duration, Instant},
};

*self.latest_version.lock() = latest_version;

if latest_version
>= *self.last_version_sent_to_pruner.as_ref().lock() + self.pruning_batch_size as u64
{
self.set_pruner_target_db_version(latest_version);
*self.last_version_sent_to_pruner.as_ref().lock() = latest_version;
}

if self.pruner_enabled && latest_version > self.prune_window {
let min_readable_ledger_version = latest_version - self.prune_window;

// Assuming no big pruning chunks will be issued by a test.
const TIMEOUT: Duration = Duration::from_secs(10);
let end = Instant::now() + TIMEOUT;

while Instant::now() < end {
if self.get_min_readable_version() >= min_readable_ledger_version {
return Ok(());
}
sleep(Duration::from_millis(1));
}
anyhow::bail!("Timeout waiting for pruner worker.");
}
Ok(())
}
}

impl LedgerPrunerManager {
Expand Down
30 changes: 29 additions & 1 deletion storage/aptosdb/src/pruner/pruner_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

use crate::pruner::db_pruner::DBPruner;
use aptos_types::transaction::Version;
use std::fmt::Debug;

Expand All @@ -13,6 +14,8 @@ use std::fmt::Debug;
/// It creates a worker thread on construction and joins it on destruction. When destructed, it
/// quits the worker thread eagerly without waiting for all pending work to be done.
pub trait PrunerManager: Debug + Sync {
type Pruner: DBPruner;

fn is_pruner_enabled(&self) -> bool;

fn get_pruner_window(&self) -> Version;
Expand All @@ -26,8 +29,33 @@ pub trait PrunerManager: Debug + Sync {

fn set_pruner_target_db_version(&self, latest_version: Version);

fn pruner(&self) -> &Self::Pruner;

/// (For tests only.) Notifies the worker thread and waits for it to finish its job by polling
/// an internal counter.
#[cfg(test)]
fn wake_and_wait_pruner(&self, latest_version: Version) -> anyhow::Result<()>;
fn wake_and_wait_pruner(&self, latest_version: Version) -> anyhow::Result<()> {
use std::{
thread::sleep,
time::{Duration, Instant},
};

if !self.is_pruner_enabled() {
return Ok(());
}

self.maybe_set_pruner_target_db_version(latest_version);

// Assuming no big pruning chunks will be issued by a test.
const TIMEOUT: Duration = Duration::from_secs(10);
let end = Instant::now() + TIMEOUT;

while Instant::now() < end {
if !self.pruner().is_pruning_pending() {
return Ok(());
}
sleep(Duration::from_millis(1));
}
anyhow::bail!("Timeout waiting for pruner worker.");
}
}
36 changes: 6 additions & 30 deletions storage/aptosdb/src/pruner/state_pruner_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ pub struct StatePrunerManager {
}

impl PrunerManager for StatePrunerManager {
type Pruner = StateMerklePruner;

fn pruner(&self) -> &Self::Pruner {
&self.pruner
}

fn is_pruner_enabled(&self) -> bool {
self.pruner_enabled
}
Expand Down Expand Up @@ -92,36 +98,6 @@ impl PrunerManager for StatePrunerManager {
.as_ref()
.set_target_db_version(latest_version.saturating_sub(self.prune_window));
}

/// (For tests only.) Notifies the worker thread and waits for it to finish its job by polling
/// an internal counter.
#[cfg(test)]
fn wake_and_wait_pruner(&self, latest_version: Version) -> anyhow::Result<()> {
use std::{
thread::sleep,
time::{Duration, Instant},
};

*self.latest_version.lock() = latest_version;
self.set_pruner_target_db_version(latest_version);

if self.pruner_enabled && latest_version > self.prune_window {
let min_readable_state_store_version = latest_version - self.prune_window;

// Assuming no big pruning chunks will be issued by a test.
const TIMEOUT: Duration = Duration::from_secs(10);
let end = Instant::now() + TIMEOUT;

while Instant::now() < end {
if self.get_min_readable_version() >= min_readable_state_store_version {
return Ok(());
}
sleep(Duration::from_millis(1));
}
anyhow::bail!("Timeout waiting for pruner worker.");
}
Ok(())
}
}

impl StatePrunerManager {
Expand Down

0 comments on commit 7351edc

Please sign in to comment.