diff --git a/Cargo.toml b/Cargo.toml index c9dfa9dc..2c2d747b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,7 +58,7 @@ crc32fast = "1" crossbeam-skiplist = "0.1" datafusion = { version = "42", optional = true } flume = { version = "0.11", features = ["async"] } -fusio = { package = "fusio", version = "0.3.3", features = [ +fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "a86969d0564f010f4f1ab136deae4cbe638d8f37", package = "fusio", version = "0.3.3", features = [ "aws", "dyn", "fs", @@ -66,11 +66,11 @@ fusio = { package = "fusio", version = "0.3.3", features = [ "tokio", "tokio-http", ] } -fusio-dispatch = { package = "fusio-dispatch", version = "0.2.1", features = [ +fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "a86969d0564f010f4f1ab136deae4cbe638d8f37", package = "fusio-dispatch", version = "0.2.1", features = [ "aws", "tokio", ] } -fusio-parquet = { package = "fusio-parquet", version = "0.2.1" } +fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "a86969d0564f010f4f1ab136deae4cbe638d8f37", package = "fusio-parquet", version = "0.2.1" } futures-core = "0.3" futures-io = "0.3" futures-util = "0.3" diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 21b170ca..0aa43fe1 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -9,8 +9,8 @@ crate-type = ["cdylib"] [workspace] [dependencies] -fusio = { package = "fusio", version = "0.3.1", features = ["aws", "tokio"] } -fusio-dispatch = { package = "fusio-dispatch", version = "0.2.0", features = [ +fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "a86969d0564f010f4f1ab136deae4cbe638d8f37", package = "fusio", version = "0.3.1", features = ["aws", "tokio"] } +fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "a86969d0564f010f4f1ab136deae4cbe638d8f37", package = "fusio-dispatch", version = "0.2.0", features = [ "aws", "tokio", ] } diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index eac33d95..a494015e 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -539,7 +539,7 @@ pub(crate) mod tests { where R: Record + Send, { - let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); + let trigger = TriggerFactory::create(option.trigger_type); let mutable: Mutable = Mutable::new(option, trigger, fs).await?; diff --git a/src/fs/manager.rs b/src/fs/manager.rs index 6ae83720..b325e590 100644 --- a/src/fs/manager.rs +++ b/src/fs/manager.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, sync::Arc}; -use fusio::{dynamic::DynFs, path::Path, Error}; +use fusio::{dynamic, dynamic::DynFs, path::Path, Error}; use fusio_dispatch::FsOptions; pub struct StoreManager { @@ -32,4 +32,21 @@ impl StoreManager { } } +pub async fn copy( + from_fs: &Arc, + from_path: &Path, + to_fs: &Arc, + to_path: &Path, +) -> Result<(), Error> { + if from_fs.file_system() == to_fs.file_system() { + match from_fs.link(from_path, to_path).await { + Err(Error::Unsupported { .. }) => (), + result => return result, + } + } + dynamic::fs::copy(from_fs, from_path, to_fs, to_path).await?; + + Ok(()) +} + // TODO: TestCases diff --git a/src/inmem/immutable.rs b/src/inmem/immutable.rs index aad9f7ec..56c30de5 100644 --- a/src/inmem/immutable.rs +++ b/src/inmem/immutable.rs @@ -15,7 +15,7 @@ use crate::{ timestamp::{Timestamp, Timestamped, TimestampedRef, EPOCH}, }; -pub trait ArrowArrays: Sized + Sync { +pub trait ArrowArrays: Sized + Send + Sync { type Record: Record; type Builder: Builder; diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index 230baffa..4494031b 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -38,7 +38,7 @@ where { pub(crate) data: SkipMap, Option>, wal: Option, R>>>, - pub(crate) trigger: Arc + Send + Sync>>, + pub(crate) trigger: Arc + Send + Sync>, } impl Mutable @@ -47,7 +47,7 @@ where { pub async fn new( option: &DbOption, - trigger: Arc + Send + Sync>>, + trigger: Arc + Send + Sync>, fs: &Arc, ) -> Result { let mut wal = None; @@ -193,6 +193,14 @@ where } Ok(()) } + + pub(crate) async fn wal_id(&self) -> Option { + if let Some(wal) = self.wal.as_ref() { + let wal_guard = wal.lock().await; + return Some(wal_guard.file_id()); + } + None + } } impl Mutable @@ -231,7 +239,7 @@ mod tests { let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); - let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); + let trigger = TriggerFactory::create(option.trigger_type); let mem_table = Mutable::::new(&option, trigger, &fs).await.unwrap(); mem_table @@ -279,7 +287,7 @@ mod tests { let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()); fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); - let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); + let trigger = TriggerFactory::create(option.trigger_type); let mutable = Mutable::::new(&option, trigger, &fs).await.unwrap(); @@ -367,7 +375,7 @@ mod tests { let fs = Arc::new(TokioFs) as Arc; fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); - let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); + let trigger = TriggerFactory::create(option.trigger_type); let mutable = Mutable::::new(&option, trigger, &fs) .await diff --git a/src/lib.rs b/src/lib.rs index a45f508f..964b716a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -137,6 +137,7 @@ pub use arrow; use async_lock::RwLock; use async_stream::stream; use flume::{bounded, Sender}; +use fusio::{path::Path, DynFs}; use futures_core::Stream; use futures_util::StreamExt; use inmem::{immutable::Immutable, mutable::Mutable}; @@ -159,7 +160,10 @@ pub use crate::option::*; use crate::{ compaction::{CompactTask, CompactionError, Compactor}, executor::Executor, - fs::{manager::StoreManager, parse_file_id, FileId, FileType}, + fs::{ + manager::{copy, StoreManager}, + parse_file_id, FileId, FileType, + }, serdes::Decode, snapshot::Snapshot, stream::{ @@ -167,7 +171,7 @@ use crate::{ ScanStream, }, timestamp::Timestamped, - trigger::{Trigger, TriggerFactory}, + trigger::{LengthTrigger, Trigger, TriggerFactory}, version::{cleaner::Cleaner, set::VersionSet, TransactionTs, Version, VersionError}, wal::{log::LogType, RecoverError, WalFile}, }; @@ -246,7 +250,14 @@ where let version_set = VersionSet::new(clean_sender, option.clone(), manager.clone()).await?; let schema = Arc::new(RwLock::new( - Schema::new(option.clone(), task_tx, &version_set, instance, &manager).await?, + Schema::new( + option.clone(), + Some(task_tx), + || version_set.increase_ts(), + instance, + &manager, + ) + .await?, )); let mut compactor = Compactor::::new( schema.clone(), @@ -296,6 +307,7 @@ where Snapshot::new( self.schema.read().await, self.version_set.current().await, + self.version_set.options().clone(), self.manager.clone(), ) } @@ -327,10 +339,11 @@ where pub async fn flush(&self) -> Result<(), CommitError> { let (tx, rx) = oneshot::channel(); - let compaction_tx = { self.schema.read().await.compaction_tx.clone() }; - compaction_tx - .send_async(CompactTask::Flush(Some(tx))) - .await?; + if let Some(compaction_tx) = self.schema.read().await.compaction_tx.clone() { + compaction_tx + .send_async(CompactTask::Flush(Some(tx))) + .await?; + } rx.await.map_err(|_| CommitError::ChannelClose)?; @@ -393,7 +406,9 @@ where let schema = self.schema.read().await; if schema.write(LogType::Full, record, ts).await? { - let _ = schema.compaction_tx.try_send(CompactTask::Freeze); + if let Some(compaction_tx) = &schema.compaction_tx { + let _ = compaction_tx.try_send(CompactTask::Freeze); + } } Ok(()) @@ -421,8 +436,8 @@ where } else { schema.write(LogType::Full, first, ts).await? }; - if is_excess { - let _ = schema.compaction_tx.try_send(CompactTask::Freeze); + if let (Some(compaction_tx), true) = (&schema.compaction_tx, is_excess) { + let _ = compaction_tx.try_send(CompactTask::Freeze); } }; @@ -441,9 +456,9 @@ where { pub mutable: Mutable, pub immutables: Vec<(Option, Immutable)>, - compaction_tx: Sender, + compaction_tx: Option>, recover_wal_ids: Option>, - trigger: Arc + Send + Sync>>, + trigger: Arc + Send + Sync>, record_instance: RecordInstance, } @@ -451,14 +466,14 @@ impl Schema where R: Record + Send, { - async fn new( + async fn new Timestamp>( option: Arc>, - compaction_tx: Sender, - version_set: &VersionSet, + compaction_tx: Option>, + fn_increase_ts: F, record_instance: RecordInstance, manager: &StoreManager, ) -> Result> { - let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); + let trigger = TriggerFactory::create(option.trigger_type); let mut schema = Schema { mutable: Mutable::new(&option, trigger.clone(), manager.base_fs()).await?, immutables: Default::default(), @@ -468,14 +483,25 @@ where record_instance, }; - let base_fs = manager.base_fs(); + schema.recover_wal_ids = + Some(Self::recover(option, fn_increase_ts, manager.base_fs(), &mut schema).await?); + + Ok(schema) + } + + async fn recover Timestamp>( + option: Arc>, + fn_increase_ts: F, + fs: &Arc, + schema: &mut Schema, + ) -> Result, DbError> { let wal_dir_path = option.wal_dir_path(); let mut transaction_map = HashMap::new(); let mut wal_ids = Vec::new(); let wal_metas = { let mut wal_metas = Vec::new(); - let mut wal_stream = base_fs.list(&wal_dir_path).await?; + let mut wal_stream = fs.list(&wal_dir_path).await?; while let Some(file_meta) = wal_stream.next().await { wal_metas.push(file_meta?); @@ -487,7 +513,7 @@ where for wal_meta in wal_metas { let wal_path = wal_meta.path; - let file = base_fs + let file = fs .open_options(&wal_path, FileType::Wal.open_options(false)) .await?; // SAFETY: wal_stream return only file name @@ -502,7 +528,7 @@ where let is_excess = match log_type { LogType::Full => { schema - .recover_append(key, version_set.increase_ts(), value_option) + .recover_append(key, fn_increase_ts(), value_option) .await? } LogType::First => { @@ -521,7 +547,7 @@ where let mut records = transaction_map.remove(&ts).unwrap(); records.push((key, value_option)); - let ts = version_set.increase_ts(); + let ts = fn_increase_ts(); for (key, value_option) in records { is_excess = schema.recover_append(key, ts, value_option).await?; } @@ -529,13 +555,13 @@ where } }; if is_excess { - let _ = schema.compaction_tx.try_send(CompactTask::Freeze); + if let (Some(compaction_tx), true) = (&schema.compaction_tx, is_excess) { + let _ = compaction_tx.try_send(CompactTask::Freeze); + } } } } - schema.recover_wal_ids = Some(wal_ids); - - Ok(schema) + Ok(wal_ids) } async fn write(&self, log_ty: LogType, record: R, ts: Timestamp) -> Result> { @@ -614,6 +640,59 @@ where self.mutable.flush_wal().await?; Ok(()) } + + pub(crate) async fn checkpoint Path>( + &self, + from_fs: &Arc, + to_fs: &Arc, + option: &DbOption, + fn_copy: F, + ) -> Result<(), DbError> { + if let Some(wal_id) = self.mutable.wal_id().await { + Self::copy_wal(from_fs, to_fs, option, &fn_copy, &wal_id).await?; + } + for wal_id in self.immutables.iter().flat_map(|(wal_id, _)| wal_id) { + Self::copy_wal(from_fs, to_fs, option, &fn_copy, wal_id).await?; + } + + Ok(()) + } + + pub(crate) async fn from_checkpoint Timestamp>( + option: Arc>, + fs: &Arc, + fn_increase_ts: F, + record_instance: RecordInstance, + ) -> Result, DbError> { + // The Schema restored from checkpoint does not trigger compaction. + let trigger = Arc::new(LengthTrigger::new(usize::MAX)); + let mut schema = Schema { + mutable: Mutable::new(&option, trigger.clone(), fs).await?, + immutables: Default::default(), + compaction_tx: None, + recover_wal_ids: None, + trigger, + record_instance, + }; + let _ = Self::recover(option, fn_increase_ts, fs, &mut schema).await?; + + Ok(schema) + } + + async fn copy_wal Path>( + from_fs: &Arc, + to_fs: &Arc, + option: &DbOption, + fn_copy: &F, + wal_id: &FileId, + ) -> Result<(), DbError> { + let from_path = option.wal_path(wal_id); + let to_path = fn_copy(wal_id); + + copy(from_fs, &from_path, to_fs, &to_path).await?; + + Ok(()) + } } /// scan configuration intermediate structure @@ -858,7 +937,7 @@ pub(crate) mod tests { use crate::{ compaction::{CompactTask, CompactionError, Compactor}, executor::{tokio::TokioExecutor, Executor}, - fs::{manager::StoreManager, FileId}, + fs::manager::StoreManager, inmem::{immutable::tests::TestImmutableArrays, mutable::Mutable}, record::{ internal::InternalRecordRef, @@ -1132,7 +1211,7 @@ pub(crate) mod tests { option: Arc>, fs: &Arc, ) -> Result<(crate::Schema, Receiver), fusio::Error> { - let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); + let trigger = TriggerFactory::create(option.trigger_type); let mutable = Mutable::new(&option, trigger.clone(), fs).await?; @@ -1174,7 +1253,7 @@ pub(crate) mod tests { .unwrap(); let immutables = { - let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); + let trigger = TriggerFactory::create(option.trigger_type); let mutable: Mutable = Mutable::new(&option, trigger.clone(), fs).await?; @@ -1215,11 +1294,13 @@ pub(crate) mod tests { .await .unwrap(); + mutable.flush_wal().await.unwrap(); vec![( - Some(FileId::new()), + mutable.wal_id().await, Immutable::from((mutable.data, &RecordInstance::Normal)), )] }; + mutable.flush_wal().await.unwrap(); let (compaction_tx, compaction_rx) = bounded(1); @@ -1227,7 +1308,7 @@ pub(crate) mod tests { crate::Schema { mutable, immutables, - compaction_tx, + compaction_tx: Some(compaction_tx), recover_wal_ids: None, trigger, record_instance: RecordInstance::Normal, @@ -1593,11 +1674,11 @@ pub(crate) mod tests { let (task_tx, _task_rx) = bounded(1); - let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); + let trigger = TriggerFactory::create(option.trigger_type); let schema: crate::Schema = crate::Schema { mutable: Mutable::new(&option, trigger.clone(), &fs).await.unwrap(), immutables: Default::default(), - compaction_tx: task_tx.clone(), + compaction_tx: Some(task_tx.clone()), recover_wal_ids: None, trigger, record_instance: RecordInstance::Normal, @@ -1658,13 +1739,13 @@ pub(crate) mod tests { let (task_tx, _task_rx) = bounded(1); - let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); + let trigger = TriggerFactory::create(option.trigger_type); let schema: crate::Schema = crate::Schema { mutable: Mutable::new(&option, trigger.clone(), manager.base_fs()) .await .unwrap(), immutables: Default::default(), - compaction_tx: task_tx.clone(), + compaction_tx: Some(task_tx.clone()), recover_wal_ids: None, trigger, record_instance: RecordInstance::Normal, diff --git a/src/snapshot.rs b/src/snapshot.rs index ef28823c..d2462352 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -1,22 +1,39 @@ -use std::{collections::Bound, sync::Arc}; +use std::{collections::Bound, ops::Deref, sync::Arc}; use async_lock::RwLockReadGuard; use parquet::arrow::ProjectionMask; use crate::{ fs::manager::StoreManager, - record::Record, + record::{Record, RecordInstance}, stream, stream::ScanStream, timestamp::Timestamp, - version::{TransactionTs, VersionRef}, - DbError, Projection, Scan, Schema, + version::{set::VersionSet, TransactionTs, VersionRef}, + DbError, DbOption, Projection, Scan, Schema, }; +pub(crate) enum Share<'s, R: Record> { + ReadLock(RwLockReadGuard<'s, Schema>), + Arc(Arc>), +} + +impl<'s, R: Record> Deref for Share<'s, R> { + type Target = Schema; + + fn deref(&self) -> &Self::Target { + match self { + Share::ReadLock(schema) => schema.deref(), + Share::Arc(schema) => schema, + } + } +} + pub struct Snapshot<'s, R: Record> { ts: Timestamp, - share: RwLockReadGuard<'s, Schema>, + share: Share<'s, R>, version: VersionRef, + option: Arc>, manager: Arc, } @@ -53,15 +70,89 @@ impl<'s, R: Record> Snapshot<'s, R> { ) } + #[allow(dead_code)] + pub async fn checkpoint(&self, to_option: &Arc>) -> Result<(), DbError> { + let to_manager = + StoreManager::new(to_option.base_fs.clone(), to_option.level_paths.clone())?; + { + self.manager + .base_fs() + .create_dir_all(&to_option.wal_dir_path()) + .await + .map_err(DbError::Fusio)?; + self.manager + .base_fs() + .create_dir_all(&to_option.version_log_dir_path()) + .await + .map_err(DbError::Fusio)?; + } + + self.share + .checkpoint( + self.manager.base_fs(), + to_manager.base_fs(), + &self.option, + |wal_id| to_option.wal_path(wal_id), + ) + .await?; + self.version + .checkpoint(&self.manager, &self.option, &to_manager, to_option) + .await?; + + Ok(()) + } + + #[allow(dead_code)] + pub(crate) async fn from_checkpoint(option: Arc>) -> Result> { + let manager = Arc::new(StoreManager::new( + option.base_fs.clone(), + option.level_paths.clone(), + )?); + { + manager + .base_fs() + .create_dir_all(&option.wal_dir_path()) + .await + .map_err(DbError::Fusio)?; + manager + .base_fs() + .create_dir_all(&option.version_log_dir_path()) + .await + .map_err(DbError::Fusio)?; + } + + // FIXME: move to Version + let version_set = VersionSet::from_checkpoint(option.clone(), manager.clone()).await?; + let schema = Schema::from_checkpoint( + option.clone(), + manager.base_fs(), + || version_set.increase_ts(), + RecordInstance::Normal, + ) + .await?; + let version = version_set.current().await; + + let ts = version.load_ts(); + Ok(Snapshot { + ts, + share: Share::Arc(Arc::new(schema)), + version, + option, + manager, + }) + } + pub(crate) fn new( share: RwLockReadGuard<'s, Schema>, version: VersionRef, + option: Arc>, manager: Arc, ) -> Self { Self { ts: version.load_ts(), - share, + share: Share::ReadLock(share), version, + option, manager, } } @@ -109,7 +200,8 @@ mod tests { compaction::tests::build_version, executor::tokio::TokioExecutor, fs::manager::StoreManager, - tests::{build_db, build_schema}, + snapshot::Snapshot, + tests::{build_db, build_schema, Test}, version::TransactionTs, DbOption, }; @@ -152,8 +244,12 @@ mod tests { // to increase timestamps to 1 because the data ts built in advance is 1 db.version_set.increase_ts(); } - let mut snapshot = db.snapshot().await; + let snapshot = db.snapshot().await; + + test_snapshot_scan(snapshot).await; + } + async fn test_snapshot_scan<'a>(snapshot: Snapshot<'a, Test>) { let mut stream = snapshot .scan((Bound::Unbounded, Bound::Unbounded)) .projection(vec![1]) @@ -201,4 +297,55 @@ mod tests { let entry_14 = stream.next().await.unwrap().unwrap(); assert_eq!(entry_14.key().value, "funk"); } + + #[tokio::test] + async fn snapshot_checkpoint() { + let source_dir = TempDir::new().unwrap(); + let checkpoint_dir = TempDir::new().unwrap(); + let checkpoint_option = Arc::new(DbOption::from( + Path::from_filesystem_path(checkpoint_dir.path()).unwrap(), + )); + { + let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap()); + let option = Arc::new(DbOption::from( + Path::from_filesystem_path(source_dir.path()).unwrap(), + )); + + manager + .base_fs() + .create_dir_all(&option.version_log_dir_path()) + .await + .unwrap(); + manager + .base_fs() + .create_dir_all(&option.wal_dir_path()) + .await + .unwrap(); + + let (_, version) = build_version(&option, &manager).await; + let (schema, compaction_rx) = build_schema(option.clone(), manager.base_fs()) + .await + .unwrap(); + let db = build_db( + option, + compaction_rx, + TokioExecutor::new(), + schema, + version, + manager, + ) + .await + .unwrap(); + + let snapshot = db.snapshot().await; + snapshot.checkpoint(&checkpoint_option).await.unwrap(); + } + let mut snapshot = Snapshot::from_checkpoint(checkpoint_option).await.unwrap(); + { + // to increase timestamps to 1 because the data ts built in advance is 1 + snapshot.ts = 6.into(); + } + + test_snapshot_scan(snapshot).await; + } } diff --git a/src/stream/mem_projection.rs b/src/stream/mem_projection.rs index 6671e285..2c23305f 100644 --- a/src/stream/mem_projection.rs +++ b/src/stream/mem_projection.rs @@ -75,7 +75,7 @@ mod tests { fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); - let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); + let trigger = TriggerFactory::create(option.trigger_type); let mutable = Mutable::::new(&option, trigger, &fs).await.unwrap(); diff --git a/src/stream/merge.rs b/src/stream/merge.rs index f58583c6..0d05a028 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -175,7 +175,7 @@ mod tests { fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); - let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); + let trigger = TriggerFactory::create(option.trigger_type); let m1 = Mutable::::new(&option, trigger, &fs).await.unwrap(); @@ -189,7 +189,7 @@ mod tests { .await .unwrap(); - let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); + let trigger = TriggerFactory::create(option.trigger_type); let m2 = Mutable::::new(&option, trigger, &fs).await.unwrap(); m2.insert(LogType::Full, "a".into(), 1.into()) @@ -202,7 +202,7 @@ mod tests { .await .unwrap(); - let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); + let trigger = TriggerFactory::create(option.trigger_type); let m3 = Mutable::::new(&option, trigger, &fs).await.unwrap(); m3.insert(LogType::Full, "e".into(), 4.into()) @@ -269,7 +269,7 @@ mod tests { fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); - let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); + let trigger = TriggerFactory::create(option.trigger_type); let m1 = Mutable::::new(&option, trigger, &fs).await.unwrap(); m1.insert(LogType::Full, "1".into(), 0_u32.into()) @@ -355,7 +355,7 @@ mod tests { fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); - let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); + let trigger = TriggerFactory::create(option.trigger_type); let m1 = Mutable::::new(&option, trigger, &fs).await.unwrap(); m1.insert(LogType::Full, "1".into(), 0_u32.into()) diff --git a/src/stream/package.rs b/src/stream/package.rs index c8493e8f..f8b337b9 100644 --- a/src/stream/package.rs +++ b/src/stream/package.rs @@ -107,7 +107,7 @@ mod tests { fs.create_dir_all(&option.wal_dir_path()).await.unwrap(); - let trigger = Arc::new(TriggerFactory::create(option.trigger_type)); + let trigger = TriggerFactory::create(option.trigger_type); let m1 = Mutable::::new(&option, trigger, &fs).await.unwrap(); m1.insert( diff --git a/src/transaction.rs b/src/transaction.rs index 1bada127..fc71d4af 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -168,12 +168,8 @@ where Self::append(self.snapshot.schema(), LogType::Last, key, record, new_ts).await? } }; - if is_excess { - let _ = self - .snapshot - .schema() - .compaction_tx - .try_send(CompactTask::Freeze); + if let (Some(compaction_tx), true) = (&self.snapshot.schema().compaction_tx, is_excess) { + let _ = compaction_tx.try_send(CompactTask::Freeze); } Ok(()) } diff --git a/src/trigger.rs b/src/trigger.rs index 34d71de7..17ba660f 100644 --- a/src/trigger.rs +++ b/src/trigger.rs @@ -2,7 +2,10 @@ use std::{ fmt, fmt::Debug, marker::PhantomData, - sync::atomic::{AtomicUsize, Ordering}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, }; use crate::record::Record; @@ -78,10 +81,10 @@ pub(crate) struct TriggerFactory { } impl TriggerFactory { - pub fn create(trigger_type: TriggerType) -> Box + Send + Sync> { + pub fn create(trigger_type: TriggerType) -> Arc + Send + Sync> { match trigger_type { - TriggerType::SizeOfMem(threshold) => Box::new(SizeOfMemTrigger::new(threshold)), - TriggerType::Length(threshold) => Box::new(LengthTrigger::new(threshold)), + TriggerType::SizeOfMem(threshold) => Arc::new(SizeOfMemTrigger::new(threshold)), + TriggerType::Length(threshold) => Arc::new(LengthTrigger::new(threshold)), } } } diff --git a/src/version/mod.rs b/src/version/mod.rs index 37d15c65..fee8c7ad 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -11,7 +11,7 @@ use std::{ }; use flume::{SendError, Sender}; -use fusio::DynFs; +use fusio::{dynamic::fs::copy, DynFs, Write}; use parquet::arrow::ProjectionMask; use thiserror::Error; use tracing::error; @@ -25,7 +25,7 @@ use crate::{ stream::{level::LevelStream, record_batch::RecordBatchEntry, ScanStream}, timestamp::{Timestamp, TimestampedRef}, version::{cleaner::CleanTag, edit::VersionEdit}, - DbOption, + DbError, DbOption, }; pub(crate) const MAX_LEVEL: usize = 7; @@ -45,7 +45,7 @@ where { ts: Timestamp, pub(crate) level_slice: [Vec>; MAX_LEVEL], - clean_sender: Sender, + clean_sender: Option>, option: Arc>, timestamp: Arc, log_length: u32, @@ -64,13 +64,24 @@ where Version { ts: Timestamp::from(0), level_slice: [const { Vec::new() }; MAX_LEVEL], - clean_sender, + clean_sender: Some(clean_sender), option: option.clone(), timestamp, log_length: 0, } } + pub(crate) fn empty(option: Arc>) -> Self { + Version { + ts: Timestamp::from(0), + level_slice: [const { Vec::new() }; MAX_LEVEL], + clean_sender: None, + option: option.clone(), + timestamp: Default::default(), + log_length: 0, + } + } + pub(crate) fn option(&self) -> &Arc> { &self.option } @@ -292,6 +303,55 @@ where edits.push(VersionEdit::NewLogLength { len: 0 }); edits } + + pub(crate) async fn checkpoint( + &self, + from_manager: &StoreManager, + from_option: &DbOption, + to_manager: &StoreManager, + to_option: &DbOption, + ) -> Result<(), DbError> { + fn level_fs<'a, R: Record>( + store_manager: &'a StoreManager, + option: &DbOption, + level: usize, + ) -> &'a Arc { + option + .level_fs_path(level) + .map(|path| store_manager.get_fs(path)) + .unwrap_or(store_manager.base_fs()) + } + + for level in 0..MAX_LEVEL { + let from_level_fs = level_fs(from_manager, from_option, level); + let to_level_fs = level_fs(to_manager, to_option, level); + + for scope in self.level_slice[level].iter() { + let gen = &scope.gen; + let from_path = from_option.table_path(gen, level); + let to_path = to_option.table_path(gen, level); + + copy(from_level_fs, &from_path, to_level_fs, &to_path).await?; + } + } + let mut new_log = from_manager + .base_fs() + .open_options( + &to_option.version_log_path(&FileId::new()), + FileType::Log.open_options(false), + ) + .await?; + for new_edit in self.to_edits() { + new_edit + .encode(&mut new_log) + .await + .map_err(VersionError::Encode)?; + } + new_log.flush().await?; + new_log.close().await?; + + Ok(()) + } } impl Drop for Version @@ -299,8 +359,10 @@ where R: Record, { fn drop(&mut self) { - if let Err(err) = self.clean_sender.send(CleanTag::Clean { ts: self.ts }) { - error!("[Version Drop Error]: {}", err) + if let Some(clean_sender) = &self.clean_sender { + if let Err(err) = clean_sender.send(CleanTag::Clean { ts: self.ts }) { + error!("[Version Drop Error]: {}", err) + } } } } @@ -322,4 +384,6 @@ where UlidDecode(#[from] ulid::DecodeError), #[error("version send error: {0}")] Send(#[from] SendError), + #[error("checkpoint not found")] + CheckpointNotFound, } diff --git a/src/version/set.rs b/src/version/set.rs index 586ca89e..074ee1d2 100644 --- a/src/version/set.rs +++ b/src/version/set.rs @@ -58,7 +58,7 @@ where R: Record, { inner: Arc>>, - clean_sender: Sender, + clean_sender: Option>, timestamp: Arc, option: Arc>, manager: Arc, @@ -153,14 +153,14 @@ where current: Arc::new(Version:: { ts: Timestamp::from(0), level_slice: [const { Vec::new() }; MAX_LEVEL], - clean_sender: clean_sender.clone(), + clean_sender: Some(clean_sender.clone()), option: option.clone(), timestamp: timestamp.clone(), log_length: 0, }), log_with_id: (log, log_id), })), - clean_sender, + clean_sender: Some(clean_sender), timestamp, option, manager, @@ -174,6 +174,50 @@ where self.inner.read().await.current.clone() } + pub(crate) fn options(&self) -> &Arc> { + &self.option + } + + pub(crate) async fn from_checkpoint( + option: Arc>, + manager: Arc, + ) -> Result, VersionError> { + let fs = manager.base_fs(); + let version_dir = option.version_log_dir_path(); + let mut log_stream = fs.list(&version_dir).await?; + let log_id = log_stream + .next() + .await + .transpose()? + .map(|meta| parse_file_id(&meta.path, FileType::Log)) + .transpose()? + .flatten() + .ok_or(VersionError::CheckpointNotFound)?; + drop(log_stream); + let mut log = manager + .base_fs() + .open_options( + &option.version_log_path(&log_id), + FileType::Log.open_options(false), + ) + .await?; + + let edits = VersionEdit::recover(&mut Cursor::new(&mut log)).await; + let set = VersionSet:: { + inner: Arc::new(RwLock::new(VersionSetInner { + current: Arc::new(Version::::empty(option.clone())), + log_with_id: (log, log_id), + })), + clean_sender: None, + timestamp: Arc::new(AtomicU32::default()), + option, + manager, + }; + set.apply_edits(edits, None, true).await?; + + Ok(set) + } + pub(crate) async fn apply_edits( &self, mut version_edits: Vec>, @@ -229,14 +273,15 @@ where } if is_recover { // issue: https://github.com/tonbo-io/tonbo/issues/123 - new_version - .clean_sender - .send_async(CleanTag::RecoverClean { - wal_id: gen, - level: level as usize, - }) - .await - .map_err(VersionError::Send)?; + if let Some(clean_sender) = &new_version.clean_sender { + clean_sender + .send_async(CleanTag::RecoverClean { + wal_id: gen, + level: level as usize, + }) + .await + .map_err(VersionError::Send)?; + } } } VersionEdit::LatestTimeStamp { ts } => { @@ -251,14 +296,15 @@ where } } if let Some(delete_gens) = delete_gens { - new_version - .clean_sender - .send_async(CleanTag::Add { - ts: new_version.ts, - gens: delete_gens, - }) - .await - .map_err(VersionError::Send)?; + if let Some(clean_sender) = &new_version.clean_sender { + clean_sender + .send_async(CleanTag::Add { + ts: new_version.ts, + gens: delete_gens, + }) + .await + .map_err(VersionError::Send)?; + } } log.close().await?; if edit_len >= option.version_log_snapshot_threshold { @@ -332,7 +378,7 @@ pub(crate) mod tests { current: Arc::new(version), log_with_id: (log, log_id), })), - clean_sender, + clean_sender: Some(clean_sender), timestamp, option, manager,