Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl checkpoint for SnapShot #219

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,19 @@ crc32fast = "1"
crossbeam-skiplist = "0.1"
datafusion = { version = "42", optional = true }
flume = { version = "0.11", features = ["async"] }
fusio = { package = "fusio", version = "0.3.3", features = [
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "a86969d0564f010f4f1ab136deae4cbe638d8f37", package = "fusio", version = "0.3.3", features = [
"aws",
"dyn",
"fs",
"object_store",
"tokio",
"tokio-http",
] }
fusio-dispatch = { package = "fusio-dispatch", version = "0.2.1", features = [
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "a86969d0564f010f4f1ab136deae4cbe638d8f37", package = "fusio-dispatch", version = "0.2.1", features = [
"aws",
"tokio",
] }
fusio-parquet = { package = "fusio-parquet", version = "0.2.1" }
fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "a86969d0564f010f4f1ab136deae4cbe638d8f37", package = "fusio-parquet", version = "0.2.1" }
futures-core = "0.3"
futures-io = "0.3"
futures-util = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ crate-type = ["cdylib"]
[workspace]

[dependencies]
fusio = { package = "fusio", version = "0.3.1", features = ["aws", "tokio"] }
fusio-dispatch = { package = "fusio-dispatch", version = "0.2.0", features = [
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "a86969d0564f010f4f1ab136deae4cbe638d8f37", package = "fusio", version = "0.3.1", features = ["aws", "tokio"] }
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "a86969d0564f010f4f1ab136deae4cbe638d8f37", package = "fusio-dispatch", version = "0.2.0", features = [
"aws",
"tokio",
] }
Expand Down
2 changes: 1 addition & 1 deletion src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ pub(crate) mod tests {
where
R: Record + Send,
{
let trigger = Arc::new(TriggerFactory::create(option.trigger_type));
let trigger = TriggerFactory::create(option.trigger_type);

let mutable: Mutable<R> = Mutable::new(option, trigger, fs).await?;

Expand Down
19 changes: 18 additions & 1 deletion src/fs/manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::HashMap, sync::Arc};

use fusio::{dynamic::DynFs, path::Path, Error};
use fusio::{dynamic, dynamic::DynFs, path::Path, Error};
use fusio_dispatch::FsOptions;

pub struct StoreManager {
Expand Down Expand Up @@ -32,4 +32,21 @@ impl StoreManager {
}
}

pub async fn copy(
from_fs: &Arc<dyn DynFs>,
from_path: &Path,
to_fs: &Arc<dyn DynFs>,
to_path: &Path,
) -> Result<(), Error> {
if from_fs.file_system() == to_fs.file_system() {
match from_fs.link(from_path, to_path).await {
Err(Error::Unsupported { .. }) => (),
result => return result,
}
}
dynamic::fs::copy(from_fs, from_path, to_fs, to_path).await?;

Ok(())
}

// TODO: TestCases
2 changes: 1 addition & 1 deletion src/inmem/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
timestamp::{Timestamp, Timestamped, TimestampedRef, EPOCH},
};

pub trait ArrowArrays: Sized + Sync {
pub trait ArrowArrays: Sized + Send + Sync {
type Record: Record;

type Builder: Builder<Self>;
Expand Down
18 changes: 13 additions & 5 deletions src/inmem/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ where
{
pub(crate) data: SkipMap<Timestamped<R::Key>, Option<R>>,
wal: Option<Mutex<WalFile<Box<dyn DynWrite>, R>>>,
pub(crate) trigger: Arc<Box<dyn Trigger<R> + Send + Sync>>,
pub(crate) trigger: Arc<dyn Trigger<R> + Send + Sync>,
}

impl<R> Mutable<R>
Expand All @@ -47,7 +47,7 @@ where
{
pub async fn new(
option: &DbOption<R>,
trigger: Arc<Box<dyn Trigger<R> + Send + Sync>>,
trigger: Arc<dyn Trigger<R> + Send + Sync>,
fs: &Arc<dyn DynFs>,
) -> Result<Self, fusio::Error> {
let mut wal = None;
Expand Down Expand Up @@ -193,6 +193,14 @@ where
}
Ok(())
}

pub(crate) async fn wal_id(&self) -> Option<FileId> {
if let Some(wal) = self.wal.as_ref() {
let wal_guard = wal.lock().await;
return Some(wal_guard.file_id());
}
None
}
}

impl<R> Mutable<R>
Expand Down Expand Up @@ -231,7 +239,7 @@ mod tests {
let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap());
fs.create_dir_all(&option.wal_dir_path()).await.unwrap();

let trigger = Arc::new(TriggerFactory::create(option.trigger_type));
let trigger = TriggerFactory::create(option.trigger_type);
let mem_table = Mutable::<Test>::new(&option, trigger, &fs).await.unwrap();

mem_table
Expand Down Expand Up @@ -279,7 +287,7 @@ mod tests {
let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap());
fs.create_dir_all(&option.wal_dir_path()).await.unwrap();

let trigger = Arc::new(TriggerFactory::create(option.trigger_type));
let trigger = TriggerFactory::create(option.trigger_type);

let mutable = Mutable::<String>::new(&option, trigger, &fs).await.unwrap();

Expand Down Expand Up @@ -367,7 +375,7 @@ mod tests {
let fs = Arc::new(TokioFs) as Arc<dyn DynFs>;
fs.create_dir_all(&option.wal_dir_path()).await.unwrap();

let trigger = Arc::new(TriggerFactory::create(option.trigger_type));
let trigger = TriggerFactory::create(option.trigger_type);

let mutable = Mutable::<DynRecord>::new(&option, trigger, &fs)
.await
Expand Down
Loading
Loading