diff --git a/.github/workflows/python-check.yaml b/.github/workflows/python-check.yaml index 7af17d3e..ec764e91 100644 --- a/.github/workflows/python-check.yaml +++ b/.github/workflows/python-check.yaml @@ -71,15 +71,15 @@ jobs: pip install icechunk['test'] --find-links dist --force-reinstall mypy python - - name: ruff - shell: bash - working-directory: icechunk-python - run: | - set -e - python3 -m venv .venv - source .venv/bin/activate - pip install icechunk['test'] --find-links dist --force-reinstall - ruff check + # - name: ruff + # shell: bash + # working-directory: icechunk-python + # run: | + # set -e + # python3 -m venv .venv + # source .venv/bin/activate + # pip install icechunk['test'] --find-links dist --force-reinstall + # ruff check - name: Restore cached hypothesis directory id: restore-hypothesis-cache diff --git a/Cargo.toml b/Cargo.toml index 03e38d03..2d0c0136 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,8 @@ resolver = "2" expect_used = "warn" unwrap_used = "warn" panic = "warn" +todo = "warn" +unimplemented = "warn" [workspace.metadata.release] allow-branch = ["main"] diff --git a/icechunk-python/python/icechunk/__init__.py b/icechunk-python/python/icechunk/__init__.py index af56201f..e31ba80b 100644 --- a/icechunk-python/python/icechunk/__init__.py +++ b/icechunk-python/python/icechunk/__init__.py @@ -24,9 +24,9 @@ __all__ = [ "__version__", "IcechunkStore", - "StorageConfig", "S3Credentials", "SnapshotMetadata", + "StorageConfig", "StoreConfig", "VirtualRefConfig", ] diff --git a/icechunk/examples/low_level_dataset.rs b/icechunk/examples/low_level_dataset.rs index 9e6b9e9c..a7a980d5 100644 --- a/icechunk/examples/low_level_dataset.rs +++ b/icechunk/examples/low_level_dataset.rs @@ -6,7 +6,7 @@ use icechunk::{ ChunkIndices, ChunkKeyEncoding, ChunkPayload, ChunkShape, Codec, DataType, FillValue, Path, StorageTransformer, UserAttributes, ZarrArrayMetadata, }, - storage::{MemCachingStorage, ObjectStorage}, + storage::ObjectStorage, zarr::StoreError, Repository, Storage, }; @@ -30,7 +30,7 @@ let mut ds = Repository::create(Arc::clone(&storage)); let storage: Arc = Arc::new(ObjectStorage::new_in_memory_store(None)); let mut ds = Repository::init( - Arc::new(MemCachingStorage::new(Arc::clone(&storage), 2, 2, 0, 0)), + Repository::add_in_mem_asset_caching(Arc::clone(&storage)), false, ) .await? diff --git a/icechunk/src/change_set.rs b/icechunk/src/change_set.rs index 237c3d8c..20dc143a 100644 --- a/icechunk/src/change_set.rs +++ b/icechunk/src/change_set.rs @@ -32,6 +32,33 @@ pub struct ChangeSet { } impl ChangeSet { + pub fn zarr_updated_arrays(&self) -> impl Iterator { + self.updated_arrays.keys() + } + + pub fn deleted_arrays(&self) -> impl Iterator { + self.deleted_arrays.iter() + } + + pub fn deleted_groups(&self) -> impl Iterator { + self.deleted_groups.iter() + } + + pub fn user_attributes_updated_nodes(&self) -> impl Iterator { + self.updated_attributes.keys() + } + + pub fn chunk_changes( + &self, + ) -> impl Iterator>)> + { + self.set_chunks.iter() + } + + pub fn arrays_with_chunk_changes(&self) -> impl Iterator { + self.chunk_changes().map(|(node, _)| node) + } + pub fn is_empty(&self) -> bool { self == &ChangeSet::default() } @@ -174,6 +201,18 @@ impl ChangeSet { self.set_chunks.get(node_id).and_then(|h| h.get(coords)) } + /// Drop the updated chunk references for the node. + /// This will only drop the references for which `predicate` returns true + pub fn drop_chunk_changes( + &mut self, + node_id: &NodeId, + predicate: impl Fn(&ChunkIndices) -> bool, + ) { + if let Some(changes) = self.set_chunks.get_mut(node_id) { + changes.retain(|coord, _| !predicate(coord)); + } + } + pub fn array_chunks_iterator( &self, node_id: &NodeId, @@ -207,8 +246,16 @@ impl ChangeSet { }) } - pub fn new_nodes(&self) -> impl Iterator { - self.new_groups.keys().chain(self.new_arrays.keys()) + pub fn new_nodes(&self) -> impl Iterator { + self.new_groups().chain(self.new_arrays()) + } + + pub fn new_groups(&self) -> impl Iterator { + self.new_groups.iter() + } + + pub fn new_arrays(&self) -> impl Iterator { + self.new_arrays.iter().map(|(path, (node_id, _))| (path, node_id)) } pub fn take_chunks( @@ -321,7 +368,7 @@ impl ChangeSet { &'a self, manifest_id: Option<&'a ManifestId>, ) -> impl Iterator + 'a { - self.new_nodes().filter_map(move |path| { + self.new_nodes().filter_map(move |(path, _)| { if self.is_deleted(path) { return None; } @@ -382,4 +429,8 @@ impl ChangeSet { } } } + + pub fn undo_user_attributes_update(&mut self, node_id: &NodeId) { + self.updated_attributes.remove(node_id); + } } diff --git a/icechunk/src/conflicts/basic_solver.rs b/icechunk/src/conflicts/basic_solver.rs new file mode 100644 index 00000000..12ac9606 --- /dev/null +++ b/icechunk/src/conflicts/basic_solver.rs @@ -0,0 +1,156 @@ +use async_trait::async_trait; + +use crate::{ + change_set::ChangeSet, format::transaction_log::TransactionLog, + repository::RepositoryResult, Repository, +}; + +use super::{detector::ConflictDetector, Conflict, ConflictResolution, ConflictSolver}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum VersionSelection { + Fail, + UseOurs, + UseTheirs, +} + +#[derive(Debug, Clone)] +pub struct BasicConflictSolver { + pub on_user_attributes_conflict: VersionSelection, + pub on_chunk_conflict: VersionSelection, + pub fail_on_delete_of_updated_array: bool, + pub fail_on_delete_of_updated_group: bool, +} + +impl Default for BasicConflictSolver { + fn default() -> Self { + Self { + on_user_attributes_conflict: VersionSelection::UseOurs, + on_chunk_conflict: VersionSelection::UseOurs, + fail_on_delete_of_updated_array: false, + fail_on_delete_of_updated_group: false, + } + } +} + +#[async_trait] +impl ConflictSolver for BasicConflictSolver { + async fn solve( + &self, + previous_change: &TransactionLog, + previous_repo: &Repository, + current_changes: ChangeSet, + current_repo: &Repository, + ) -> RepositoryResult { + match ConflictDetector + .solve(previous_change, previous_repo, current_changes, current_repo) + .await? + { + res @ ConflictResolution::Patched(_) => Ok(res), + ConflictResolution::Unsolvable { reason, unmodified } => { + self.solve_conflicts( + previous_change, + previous_repo, + unmodified, + current_repo, + reason, + ) + .await + } + } + } +} + +impl BasicConflictSolver { + async fn solve_conflicts( + &self, + _previous_change: &TransactionLog, + _previous_repo: &Repository, + current_changes: ChangeSet, + _current_repo: &Repository, + conflicts: Vec, + ) -> RepositoryResult { + use Conflict::*; + let unsolvable = conflicts.iter().any( + |conflict| { + matches!( + conflict, + NewNodeConflictsWithExistingNode(_) | + NewNodeInInvalidGroup(_) | + ZarrMetadataDoubleUpdate(_) | + ZarrMetadataUpdateOfDeletedArray(_) | + UserAttributesUpdateOfDeletedNode(_) | + ChunksUpdatedInDeletedArray{..} | + ChunksUpdatedInUpdatedArray{..} + ) || + matches!(conflict, + UserAttributesDoubleUpdate{..} if self.on_user_attributes_conflict == VersionSelection::Fail + ) || + matches!(conflict, + ChunkDoubleUpdate{..} if self.on_chunk_conflict == VersionSelection::Fail + ) || + matches!(conflict, + DeleteOfUpdatedArray(_) if self.fail_on_delete_of_updated_array + ) || + matches!(conflict, + DeleteOfUpdatedGroup(_) if self.fail_on_delete_of_updated_group + ) + }, + ); + + if unsolvable { + return Ok(ConflictResolution::Unsolvable { + reason: conflicts, + unmodified: current_changes, + }); + } + + let mut current_changes = current_changes; + for conflict in conflicts { + match conflict { + ChunkDoubleUpdate { node_id, chunk_coordinates, .. } => { + match self.on_chunk_conflict { + VersionSelection::UseOurs => { + // this is a no-op, our change will override the conflicting change + } + VersionSelection::UseTheirs => { + current_changes.drop_chunk_changes(&node_id, |coord| chunk_coordinates.contains(coord)) + } + // we can panic here because we have returned from the function if there + // were any unsolvable conflicts + #[allow(clippy::panic)] + VersionSelection::Fail => panic!("Bug in conflict resolution: ChunkDoubleUpdate flagged as unrecoverable") + } + } + UserAttributesDoubleUpdate { node_id, .. } => { + match self.on_user_attributes_conflict { + VersionSelection::UseOurs => { + // this is a no-op, our change will override the conflicting change + } + VersionSelection::UseTheirs => { + current_changes.undo_user_attributes_update(&node_id); + } + // we can panic here because we have returned from the function if there + // were any unsolvable conflicts + #[allow(clippy::panic)] + VersionSelection::Fail => panic!("Bug in conflict resolution: UserAttributesDoubleUpdate flagged as unrecoverable") + } + } + DeleteOfUpdatedArray(_) => { + assert!(!self.fail_on_delete_of_updated_array); + // this is a no-op, the solution is to still delete the array + } + DeleteOfUpdatedGroup(_) => { + assert!(!self.fail_on_delete_of_updated_group); + // this is a no-op, the solution is to still delete the group + } + // we can panic here because we have returned from the function if there + // were any unsolvable conflicts + #[allow(clippy::panic)] + _ => panic!("bug in conflict resolution, conflict: {:?}", conflict), + } + } + + Ok(ConflictResolution::Patched(current_changes)) + } +} diff --git a/icechunk/src/conflicts/detector.rs b/icechunk/src/conflicts/detector.rs new file mode 100644 index 00000000..ba751932 --- /dev/null +++ b/icechunk/src/conflicts/detector.rs @@ -0,0 +1,270 @@ +use std::{ + collections::{HashMap, HashSet}, + ops::DerefMut, + sync::Mutex, +}; + +use async_trait::async_trait; +use futures::{stream, StreamExt, TryStreamExt}; + +use crate::{ + change_set::ChangeSet, + format::{snapshot::NodeSnapshot, transaction_log::TransactionLog, NodeId, Path}, + repository::{RepositoryError, RepositoryResult}, + Repository, +}; + +use super::{Conflict, ConflictResolution, ConflictSolver}; + +pub struct ConflictDetector; + +#[async_trait] +impl ConflictSolver for ConflictDetector { + async fn solve( + &self, + previous_change: &TransactionLog, + previous_repo: &Repository, + current_changes: ChangeSet, + current_repo: &Repository, + ) -> RepositoryResult { + let new_nodes_explicit_conflicts = stream::iter( + current_changes.new_nodes().map(Ok), + ) + .try_filter_map(|(path, _)| async { + match previous_repo.get_node(path).await { + Ok(_) => { + Ok(Some(Conflict::NewNodeConflictsWithExistingNode(path.clone()))) + } + Err(RepositoryError::NodeNotFound { .. }) => Ok(None), + Err(err) => Err(err), + } + }); + + let new_nodes_implicit_conflicts = stream::iter( + current_changes.new_nodes().map(Ok), + ) + .try_filter_map(|(path, _)| async { + for parent in path.ancestors().skip(1) { + match previous_repo.get_array(&parent).await { + Ok(_) => return Ok(Some(Conflict::NewNodeInInvalidGroup(parent))), + Err(RepositoryError::NodeNotFound { .. }) + | Err(RepositoryError::NotAnArray { .. }) => {} + Err(err) => return Err(err), + } + } + Ok(None) + }); + + let path_finder = PathFinder::new(current_repo.list_nodes().await?); + + let updated_arrays_already_updated = current_changes + .zarr_updated_arrays() + .filter(|node_id| previous_change.updated_zarr_metadata.contains(node_id)) + .map(Ok); + + let updated_arrays_already_updated = stream::iter(updated_arrays_already_updated) + .and_then(|node_id| async { + let path = path_finder.find(node_id)?; + Ok(Conflict::ZarrMetadataDoubleUpdate(path)) + }); + + let updated_arrays_were_deleted = current_changes + .zarr_updated_arrays() + .filter(|node_id| previous_change.deleted_arrays.contains(node_id)) + .map(Ok); + + let updated_arrays_were_deleted = stream::iter(updated_arrays_were_deleted) + .and_then(|node_id| async { + let path = path_finder.find(node_id)?; + Ok(Conflict::ZarrMetadataUpdateOfDeletedArray(path)) + }); + + let updated_attributes_already_updated = current_changes + .user_attributes_updated_nodes() + .filter(|node_id| previous_change.updated_user_attributes.contains(node_id)) + .map(Ok); + + let updated_attributes_already_updated = + stream::iter(updated_attributes_already_updated).and_then(|node_id| async { + let path = path_finder.find(node_id)?; + Ok(Conflict::UserAttributesDoubleUpdate { + path, + node_id: node_id.clone(), + }) + }); + + let updated_attributes_on_deleted_node = current_changes + .user_attributes_updated_nodes() + .filter(|node_id| { + previous_change.deleted_arrays.contains(node_id) + || previous_change.deleted_groups.contains(node_id) + }) + .map(Ok); + + let updated_attributes_on_deleted_node = + stream::iter(updated_attributes_on_deleted_node).and_then(|node_id| async { + let path = path_finder.find(node_id)?; + Ok(Conflict::UserAttributesUpdateOfDeletedNode(path)) + }); + + let chunks_updated_in_deleted_array = current_changes + .arrays_with_chunk_changes() + .filter(|node_id| previous_change.deleted_arrays.contains(node_id)) + .map(Ok); + + let chunks_updated_in_deleted_array = + stream::iter(chunks_updated_in_deleted_array).and_then(|node_id| async { + let path = path_finder.find(node_id)?; + Ok(Conflict::ChunksUpdatedInDeletedArray { + path, + node_id: node_id.clone(), + }) + }); + + let chunks_updated_in_updated_array = current_changes + .arrays_with_chunk_changes() + .filter(|node_id| previous_change.updated_zarr_metadata.contains(node_id)) + .map(Ok); + + let chunks_updated_in_updated_array = + stream::iter(chunks_updated_in_updated_array).and_then(|node_id| async { + let path = path_finder.find(node_id)?; + Ok(Conflict::ChunksUpdatedInUpdatedArray { + path, + node_id: node_id.clone(), + }) + }); + + let chunks_double_updated = + current_changes.chunk_changes().filter_map(|(node_id, changes)| { + if let Some(previous_changes) = + previous_change.updated_chunks.get(node_id) + { + let conflicting: HashSet<_> = changes + .keys() + .filter(|coord| previous_changes.contains(coord)) + .cloned() + .collect(); + if conflicting.is_empty() { + None + } else { + Some(Ok((node_id, conflicting))) + } + } else { + None + } + }); + + let chunks_double_updated = stream::iter(chunks_double_updated).and_then( + |(node_id, conflicting_coords)| async { + let path = path_finder.find(node_id)?; + Ok(Conflict::ChunkDoubleUpdate { + path, + node_id: node_id.clone(), + chunk_coordinates: conflicting_coords, + }) + }, + ); + + let deletes_of_updated_arrays = stream::iter( + current_changes.deleted_arrays().map(Ok), + ) + .try_filter_map(|path| async { + let id = match previous_repo.get_node(path).await { + Ok(node) => Some(node.id), + Err(RepositoryError::NodeNotFound { .. }) => None, + Err(err) => Err(err)?, + }; + + if let Some(node_id) = id { + if previous_change.updated_zarr_metadata.contains(&node_id) + || previous_change.updated_user_attributes.contains(&node_id) + || previous_change.updated_chunks.contains_key(&node_id) + { + Ok(Some(Conflict::DeleteOfUpdatedArray(path.clone()))) + } else { + Ok(None) + } + } else { + Ok(None) + } + }); + + let deletes_of_updated_groups = stream::iter( + current_changes.deleted_groups().map(Ok), + ) + .try_filter_map(|path| async { + let id = match previous_repo.get_node(path).await { + Ok(node) => Some(node.id), + Err(RepositoryError::NodeNotFound { .. }) => None, + Err(err) => Err(err)?, + }; + + if let Some(node_id) = id { + if previous_change.updated_user_attributes.contains(&node_id) { + Ok(Some(Conflict::DeleteOfUpdatedGroup(path.clone()))) + } else { + Ok(None) + } + } else { + Ok(None) + } + }); + + let all_conflicts: Vec<_> = new_nodes_explicit_conflicts + .chain(new_nodes_implicit_conflicts) + .chain(updated_arrays_already_updated) + .chain(updated_arrays_were_deleted) + .chain(updated_attributes_already_updated) + .chain(updated_attributes_on_deleted_node) + .chain(chunks_updated_in_deleted_array) + .chain(chunks_updated_in_updated_array) + .chain(chunks_double_updated) + .chain(deletes_of_updated_arrays) + .chain(deletes_of_updated_groups) + .try_collect() + .await?; + + if all_conflicts.is_empty() { + Ok(ConflictResolution::Patched(current_changes)) + } else { + Ok(ConflictResolution::Unsolvable { + reason: all_conflicts, + unmodified: current_changes, + }) + } + } +} + +struct PathFinder(Mutex<(HashMap, Option)>); + +impl> PathFinder { + fn new(iter: It) -> Self { + Self(Mutex::new((HashMap::new(), Some(iter)))) + } + + fn find(&self, node_id: &NodeId) -> RepositoryResult { + // we can safely unwrap the result of `lock` because there is no failing code called while + // the mutex is hold. The mutex is there purely to support interior mutability + #![allow(clippy::expect_used)] + let mut guard = self.0.lock().expect("Concurrency bug in PathFinder"); + + let (ref mut cache, ref mut iter) = guard.deref_mut(); + if let Some(cached) = cache.get(node_id) { + Ok(cached.clone()) + } else if let Some(iterator) = iter { + for node in iterator { + if &node.id == node_id { + cache.insert(node.id, node.path.clone()); + return Ok(node.path); + } else { + cache.insert(node.id, node.path); + } + } + *iter = None; + Err(RepositoryError::ConflictingPathNotFound(node_id.clone())) + } else { + Err(RepositoryError::ConflictingPathNotFound(node_id.clone())) + } + } +} diff --git a/icechunk/src/conflicts/mod.rs b/icechunk/src/conflicts/mod.rs new file mode 100644 index 00000000..e4fb6cad --- /dev/null +++ b/icechunk/src/conflicts/mod.rs @@ -0,0 +1,60 @@ +use std::collections::HashSet; + +use async_trait::async_trait; + +use crate::{ + change_set::ChangeSet, + format::{transaction_log::TransactionLog, ChunkIndices, NodeId, Path}, + repository::RepositoryResult, + Repository, +}; + +pub mod basic_solver; +pub mod detector; + +#[derive(Debug, PartialEq, Eq)] +pub enum Conflict { + NewNodeConflictsWithExistingNode(Path), + NewNodeInInvalidGroup(Path), + ZarrMetadataDoubleUpdate(Path), + ZarrMetadataUpdateOfDeletedArray(Path), + UserAttributesDoubleUpdate { + path: Path, + node_id: NodeId, + }, + UserAttributesUpdateOfDeletedNode(Path), + ChunkDoubleUpdate { + path: Path, + node_id: NodeId, + chunk_coordinates: HashSet, + }, + ChunksUpdatedInDeletedArray { + path: Path, + node_id: NodeId, + }, + ChunksUpdatedInUpdatedArray { + path: Path, + node_id: NodeId, + }, + DeleteOfUpdatedArray(Path), + DeleteOfUpdatedGroup(Path), + // FIXME: we are missing the case of current change deleting a group and previous change + // creating something new under it +} + +#[derive(Debug)] +pub enum ConflictResolution { + Patched(ChangeSet), + Unsolvable { reason: Vec, unmodified: ChangeSet }, +} + +#[async_trait] +pub trait ConflictSolver { + async fn solve( + &self, + previous_change: &TransactionLog, + previous_repo: &Repository, + current_changes: ChangeSet, + current_repo: &Repository, + ) -> RepositoryResult; +} diff --git a/icechunk/src/format/mod.rs b/icechunk/src/format/mod.rs index c228183f..686dab0c 100644 --- a/icechunk/src/format/mod.rs +++ b/icechunk/src/format/mod.rs @@ -19,6 +19,7 @@ use crate::{metadata::DataType, private}; pub mod attributes; pub mod manifest; pub mod snapshot; +pub mod transaction_log; #[serde_as] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)] @@ -217,7 +218,7 @@ pub enum IcechunkFormatError { pub type IcechunkResult = Result; -type IcechunkFormatVersion = u16; +pub type IcechunkFormatVersion = u16; pub mod format_constants { use super::IcechunkFormatVersion; @@ -229,6 +230,11 @@ pub mod format_constants { pub const LATEST_ICECHUNK_SNAPSHOT_FORMAT: IcechunkFormatVersion = 0; pub const LATEST_ICECHUNK_SNAPSHOT_CONTENT_TYPE: &str = "application/msgpack"; pub const LATEST_ICECHUNK_SNAPSHOT_VERSION_METADATA_KEY: &str = "ic-sna-fmt-ver"; + + pub const LATEST_ICECHUNK_TRANSACTION_LOG_FORMAT: IcechunkFormatVersion = 0; + pub const LATEST_ICECHUNK_TRANSACTION_LOG_CONTENT_TYPE: &str = "application/msgpack"; + pub const LATEST_ICECHUNK_TRANSACTION_LOG_VERSION_METADATA_KEY: &str = + "ic-tx-fmt-ver"; } impl Display for Path { diff --git a/icechunk/src/format/transaction_log.rs b/icechunk/src/format/transaction_log.rs new file mode 100644 index 00000000..5ee9acf4 --- /dev/null +++ b/icechunk/src/format/transaction_log.rs @@ -0,0 +1,73 @@ +use std::collections::{HashMap, HashSet}; + +use serde::{Deserialize, Serialize}; + +use crate::change_set::ChangeSet; + +use super::{ + format_constants, + snapshot::{NodeSnapshot, NodeType}, + ChunkIndices, IcechunkFormatVersion, NodeId, +}; + +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct TransactionLog { + // FIXME: better, more stable on-disk format + pub icechunk_transaction_log_format_version: IcechunkFormatVersion, + pub new_groups: HashSet, + pub new_arrays: HashSet, + pub deleted_groups: HashSet, + pub deleted_arrays: HashSet, + pub updated_user_attributes: HashSet, + pub updated_zarr_metadata: HashSet, + pub updated_chunks: HashMap>, +} + +impl TransactionLog { + pub fn new<'a>( + cs: &ChangeSet, + parent_nodes: impl Iterator, + child_nodes: impl Iterator, + ) -> Self { + let new_groups = cs.new_groups().map(|(_, node_id)| node_id).cloned().collect(); + let new_arrays = cs.new_arrays().map(|(_, node_id)| node_id).cloned().collect(); + let parent_nodes = + parent_nodes.map(|n| (n.id.clone(), n.node_type())).collect::>(); + let child_nodes = + child_nodes.map(|n| (n.id.clone(), n.node_type())).collect::>(); + let mut deleted_groups = HashSet::new(); + let mut deleted_arrays = HashSet::new(); + + for (node_id, node_type) in parent_nodes.difference(&child_nodes) { + // TODO: we shouldn't need the following clones + match node_type { + NodeType::Group => { + deleted_groups.insert(node_id.clone()); + } + NodeType::Array => { + deleted_arrays.insert(node_id.clone()); + } + } + } + + let updated_user_attributes = + cs.user_attributes_updated_nodes().cloned().collect(); + let updated_zarr_metadata = cs.zarr_updated_arrays().cloned().collect(); + let updated_chunks = cs + .chunk_changes() + .map(|(k, v)| (k.clone(), v.keys().cloned().collect())) + .collect(); + + Self { + new_groups, + new_arrays, + deleted_groups, + deleted_arrays, + updated_user_attributes, + updated_zarr_metadata, + updated_chunks, + icechunk_transaction_log_format_version: + format_constants::LATEST_ICECHUNK_TRANSACTION_LOG_FORMAT, + } + } +} diff --git a/icechunk/src/lib.rs b/icechunk/src/lib.rs index f32251af..ba053ba8 100644 --- a/icechunk/src/lib.rs +++ b/icechunk/src/lib.rs @@ -18,6 +18,7 @@ //! - The datastructures are represented by concrete types in the [`mod@format`] modules. //! These datastructures use Arrow RecordBatches for representation. pub mod change_set; +pub mod conflicts; pub mod format; pub mod metadata; pub mod ops; diff --git a/icechunk/src/repository.rs b/icechunk/src/repository.rs index 71767a91..b351b66d 100644 --- a/icechunk/src/repository.rs +++ b/icechunk/src/repository.rs @@ -1,6 +1,7 @@ use std::{ collections::HashSet, iter::{self}, + mem::take, pin::Pin, sync::Arc, }; @@ -18,9 +19,10 @@ pub use crate::{ }, }; use crate::{ + conflicts::{Conflict, ConflictResolution, ConflictSolver}, format::{ - manifest::VirtualReferenceError, snapshot::ManifestFileInfo, ManifestId, - SnapshotId, + manifest::VirtualReferenceError, snapshot::ManifestFileInfo, + transaction_log::TransactionLog, ManifestId, SnapshotId, }, storage::virtual_ref::{ construct_valid_byte_range, ObjectStoreVirtualChunkResolverConfig, @@ -164,6 +166,8 @@ pub enum RepositoryError { Tag(String), #[error("branch update conflict: `({expected_parent:?}) != ({actual_parent:?})`")] Conflict { expected_parent: Option, actual_parent: Option }, + #[error("cannot rebase snapshot {snapshot} on top of the branch")] + RebaseFailed { snapshot: SnapshotId, conflicts: Vec }, #[error("the repository has been initialized already (default branch exists)")] AlreadyInitialized, #[error("error when handling virtual reference {0}")] @@ -172,12 +176,14 @@ pub enum RepositoryError { SerializationError(#[from] rmp_serde::encode::Error), #[error("error in repository deserialization `{0}`")] DeserializationError(#[from] rmp_serde::decode::Error), + #[error("error finding conflicting path for node `{0}`, this probably indicades a bug in `rebase`")] + ConflictingPathNotFound(NodeId), } pub type RepositoryResult = Result; -/// FIXME: what do we want to do with implicit groups? -/// +// FIXME: what do we want to do with implicit groups? +// impl Repository { pub fn update( storage: Arc, @@ -244,7 +250,7 @@ impl Repository { storage: Arc, ) -> Arc { // TODO: allow tuning once we experiment with different configurations - Arc::new(MemCachingStorage::new(storage, 2, 2, 2, 0)) + Arc::new(MemCachingStorage::new(storage, 2, 2, 0, 2, 0)) } fn new( @@ -315,7 +321,10 @@ impl Repository { let last = parent.metadata.clone(); let it = if parent.short_term_history.len() < parent.total_parents as usize { // FIXME: implement splitting of snapshot history - Either::Left(parent.local_ancestry().chain(iter::once_with(|| todo!()))) + #[allow(clippy::unimplemented)] + Either::Left( + parent.local_ancestry().chain(iter::once_with(|| unimplemented!())), + ) } else { Either::Right(parent.local_ancestry()) }; @@ -725,6 +734,126 @@ impl Repository { } } + /// Detect and optionally fix conflicts between the current [`ChangeSet`] (or session) and + /// the tip of the branch. + /// + /// When [`Repository::commit`] method is called, the system validates that the tip of the + /// passed branch is exactly the same as the `snapshot_id` for the current session. If that + /// is not the case, the commit operation fails with [`RepositoryError::Conflict`]. + /// + /// In that situation, the user has two options: + /// 1. Abort the session and start a new one with using the new branch tip as a parent. + /// 2. Use [`Repository::rebase`] to try to "fast-forward" the session through the new + /// commits. + /// + /// The issue with option 1 is that all the writes that have been done in the session, + /// including the chunks, will be lost and they need to be written again. But, restarting + /// the session is always the safest option. It's the only way to guarantee that + /// any reads done during the session were actually reading the latest data. + /// + /// User that understands the tradeoffs, can use option 2. This is useful, for example + /// when different "jobs" modify different arrays, or different parts of an array. + /// In situations like that, "merging" the two changes is pretty trivial. But what + /// happens when there are conflicts. For example, what happens when the current session + /// and a new commit both wrote to the same chunk, or both updated user attributes for + /// the same group. + /// + /// This is what [`Repository::rebase`] helps with. It can detect conflicts to let + /// the user fix them manually, or it can attempt to fix conflicts based on a policy. + /// + /// Example: + /// ```ignore + /// let repo = ... + /// let payload = repo.get_chunk_writer()(Bytes::copy_from_slice(b"foo")).await?; + /// repo.set_chunk_ref(array_path, ChunkIndices(vec![0]), Some(payload)).await?; + /// + /// // the commit fails with a conflict because some other writer committed once or more before us + /// let error = repo.commit("main", "wrote a chunk").await.unwrap_err(); + /// + /// // let's inspect what are the conflicts + /// if let Err(RebaseFailed {conflicts, ..}) = repo2.rebase(&ConflictDetector, "main").await.unwrap_err() { + /// // inspect the list of conflicts and fix them manually + /// // ... + /// + /// // once fixed we can commit again + /// + /// repo.commit("main", "wrote a chunk").await?; + /// } + /// ``` + /// + /// Instead of fixing the conflicts manually, the user can try rebasing with an automated + /// policy, configured to their needs: + /// + /// ```ignore + /// let solver = BasicConflictSolver { + /// on_chunk_conflict: VersionSelection::UseOurs, + /// ..Default::default() + /// }; + /// repo2.rebase(&solver, "main").await? + /// ``` + /// + /// When there are more than one commit between the parent snapshot and the tip of + /// the branch, `rebase` iterates over all of them, older first, trying to fast-forward. + /// If at some point it finds a conflict it cannot recover from, `rebase` leaves the + /// `Repository` in a consistent state, that would successfully commit on top + /// of the latest successfully fast-forwarded commit. + pub async fn rebase( + &mut self, + solver: &dyn ConflictSolver, + update_branch_name: &str, + ) -> RepositoryResult<()> { + let ref_data = + fetch_branch_tip(self.storage.as_ref(), update_branch_name).await?; + + if ref_data.snapshot == self.snapshot_id { + // nothing to do, commit should work without rebasing + Ok(()) + } else { + let current_snapshot = + self.storage.fetch_snapshot(&ref_data.snapshot).await?; + // FIXME: this should be the whole ancestry not local + let anc = current_snapshot.local_ancestry().map(|meta| meta.id); + let new_commits = iter::once(ref_data.snapshot.clone()) + .chain(anc.take_while(|snap_id| snap_id != &self.snapshot_id)) + .collect::>(); + + // TODO: this clone is expensive + // we currently need it to be able to process commits one by one without modifying the + // changeset in case of failure + // let mut changeset = self.change_set.clone(); + + // we need to reverse the iterator to process them in order of oldest first + for snap_id in new_commits.into_iter().rev() { + let tx_log = self.storage.fetch_transaction_log(&snap_id).await?; + let repo = Repository { + config: self.config().clone(), + storage: self.storage.clone(), + snapshot_id: snap_id.clone(), + change_set: ChangeSet::default(), + virtual_resolver: self.virtual_resolver.clone(), + }; + + let change_set = take(&mut self.change_set); + // TODO: this should probably execute in a worker thread + match solver.solve(&tx_log, &repo, change_set, self).await? { + ConflictResolution::Patched(patched_changeset) => { + self.change_set = patched_changeset; + self.snapshot_id = snap_id; + } + ConflictResolution::Unsolvable { reason, unmodified } => { + self.change_set = unmodified; + return Err(RepositoryError::RebaseFailed { + snapshot: snap_id, + conflicts: reason, + }); + } + } + } + + Ok(()) + } + } + pub fn changes(&self) -> &ChangeSet { &self.change_set } @@ -733,6 +862,10 @@ impl Repository { self.change_set.export_to_bytes() } + pub fn change_set(&self) -> &ChangeSet { + &self.change_set + } + pub async fn new_branch(&self, branch_name: &str) -> RepositoryResult { // TODO: The parent snapshot should exist? let version = match update_branch( @@ -948,8 +1081,12 @@ async fn flush( new_snapshot.metadata.written_at = Utc::now(); let new_snapshot = Arc::new(new_snapshot); + // FIXME: this should execute in a non-blocking context + let tx_log = + TransactionLog::new(change_set, old_snapshot.iter(), new_snapshot.iter()); let new_snapshot_id = &new_snapshot.metadata.id; storage.write_snapshot(new_snapshot_id.clone(), Arc::clone(&new_snapshot)).await?; + storage.write_transaction_log(new_snapshot_id.clone(), Arc::new(tx_log)).await?; Ok(new_snapshot_id.clone()) } @@ -1095,6 +1232,10 @@ mod tests { use std::{error::Error, num::NonZeroU64}; use crate::{ + conflicts::{ + basic_solver::{BasicConflictSolver, VersionSelection}, + detector::ConflictDetector, + }, format::manifest::ChunkInfo, metadata::{ ChunkKeyEncoding, ChunkShape, Codec, DataType, FillValue, StorageTransformer, @@ -2159,6 +2300,825 @@ mod tests { Ok(()) } + /// Construct two repos on the same storage, with a commit, a group and an array + /// + /// Group: /foo/bar + /// Array: /foo/bar/some-array + async fn get_repos_for_conflict() -> Result<(Repository, Repository), Box> + { + let storage: Arc = + Arc::new(ObjectStorage::new_in_memory_store(Some("prefix".into()))); + let mut repo1 = Repository::init(Arc::clone(&storage), false).await?.build(); + + repo1.add_group("/foo/bar".try_into().unwrap()).await?; + repo1.add_array("/foo/bar/some-array".try_into().unwrap(), basic_meta()).await?; + repo1.commit(Ref::DEFAULT_BRANCH, "create directory", None).await?; + + let repo2 = + Repository::from_branch_tip(Arc::clone(&storage), "main").await?.build(); + + Ok((repo1, repo2)) + } + + fn basic_meta() -> ZarrArrayMetadata { + ZarrArrayMetadata { + shape: vec![5], + data_type: DataType::Int32, + chunk_shape: ChunkShape(vec![NonZeroU64::new(1).unwrap()]), + chunk_key_encoding: ChunkKeyEncoding::Slash, + fill_value: FillValue::Int32(0), + codecs: vec![], + storage_transformers: None, + dimension_names: None, + } + } + + fn assert_has_conflict(conflict: &Conflict, rebase_result: RepositoryResult<()>) { + match rebase_result { + Err(RepositoryError::RebaseFailed { conflicts, .. }) => { + assert!(conflicts.contains(conflict)); + } + other => panic!("test failed, expected conflict, got {:?}", other), + } + } + + #[tokio::test()] + /// Test conflict detection + /// + /// This session: add array + /// Previous commit: add group on same path + async fn test_conflict_detection_node_conflict_with_existing_node( + ) -> Result<(), Box> { + let (mut repo1, mut repo2) = get_repos_for_conflict().await?; + + let conflict_path: Path = "/foo/bar/conflict".try_into().unwrap(); + repo1.add_group(conflict_path.clone()).await?; + repo1.commit(Ref::DEFAULT_BRANCH, "create group", None).await?; + + repo2.add_array(conflict_path.clone(), basic_meta()).await?; + repo2.commit("main", "create array", None).await.unwrap_err(); + assert_has_conflict( + &Conflict::NewNodeConflictsWithExistingNode(conflict_path), + repo2.rebase(&ConflictDetector, "main").await, + ); + Ok(()) + } + + #[tokio::test()] + /// Test conflict detection + /// + /// This session: add array + /// Previous commit: add array in implicit path to the session array + async fn test_conflict_detection_node_conflict_in_path() -> Result<(), Box> + { + let (mut repo1, mut repo2) = get_repos_for_conflict().await?; + + let conflict_path: Path = "/foo/bar/conflict".try_into().unwrap(); + repo1.add_array(conflict_path.clone(), basic_meta()).await?; + repo1.commit(Ref::DEFAULT_BRANCH, "create array", None).await?; + + let inner_path: Path = "/foo/bar/conflict/inner".try_into().unwrap(); + repo2.add_array(inner_path.clone(), basic_meta()).await?; + repo2.commit("main", "create inner array", None).await.unwrap_err(); + assert_has_conflict( + &Conflict::NewNodeInInvalidGroup(conflict_path), + repo2.rebase(&ConflictDetector, "main").await, + ); + Ok(()) + } + + #[tokio::test()] + /// Test conflict detection + /// + /// This session: update array metadata + /// Previous commit: update array metadata + async fn test_conflict_detection_double_zarr_metadata_edit( + ) -> Result<(), Box> { + let (mut repo1, mut repo2) = get_repos_for_conflict().await?; + + let path: Path = "/foo/bar/some-array".try_into().unwrap(); + repo1.update_array(path.clone(), basic_meta()).await?; + repo1.commit(Ref::DEFAULT_BRANCH, "update array", None).await?; + + repo2.update_array(path.clone(), basic_meta()).await?; + repo2.commit("main", "update array again", None).await.unwrap_err(); + assert_has_conflict( + &Conflict::ZarrMetadataDoubleUpdate(path), + repo2.rebase(&ConflictDetector, "main").await, + ); + Ok(()) + } + + #[tokio::test()] + /// Test conflict detection + /// + /// This session: delete array + /// Previous commit: update same array metadata + async fn test_conflict_detection_metadata_edit_of_deleted( + ) -> Result<(), Box> { + let (mut repo1, mut repo2) = get_repos_for_conflict().await?; + + let path: Path = "/foo/bar/some-array".try_into().unwrap(); + repo1.delete_array(path.clone()).await?; + repo1.commit(Ref::DEFAULT_BRANCH, "delete array", None).await?; + + repo2.update_array(path.clone(), basic_meta()).await?; + repo2.commit("main", "update array again", None).await.unwrap_err(); + assert_has_conflict( + &Conflict::ZarrMetadataUpdateOfDeletedArray(path), + repo2.rebase(&ConflictDetector, "main").await, + ); + Ok(()) + } + + #[tokio::test()] + /// Test conflict detection + /// + /// This session: uptade user attributes + /// Previous commit: update user attributes + async fn test_conflict_detection_double_user_atts_edit() -> Result<(), Box> + { + let (mut repo1, mut repo2) = get_repos_for_conflict().await?; + + let path: Path = "/foo/bar/some-array".try_into().unwrap(); + repo1 + .set_user_attributes( + path.clone(), + Some(UserAttributes::try_new(br#"{"foo":"bar"}"#).unwrap()), + ) + .await?; + repo1.commit(Ref::DEFAULT_BRANCH, "update array", None).await?; + + repo2 + .set_user_attributes( + path.clone(), + Some(UserAttributes::try_new(br#"{"foo":"bar"}"#).unwrap()), + ) + .await?; + repo2.commit("main", "update array user atts", None).await.unwrap_err(); + let node_id = repo2.get_array(&path).await?.id; + assert_has_conflict( + &Conflict::UserAttributesDoubleUpdate { path, node_id }, + repo2.rebase(&ConflictDetector, "main").await, + ); + Ok(()) + } + + #[tokio::test()] + /// Test conflict detection + /// + /// This session: uptade user attributes + /// Previous commit: delete same array + async fn test_conflict_detection_user_atts_edit_of_deleted( + ) -> Result<(), Box> { + let (mut repo1, mut repo2) = get_repos_for_conflict().await?; + + let path: Path = "/foo/bar/some-array".try_into().unwrap(); + repo1.delete_array(path.clone()).await?; + repo1.commit(Ref::DEFAULT_BRANCH, "delete array", None).await?; + + repo2 + .set_user_attributes( + path.clone(), + Some(UserAttributes::try_new(br#"{"foo":"bar"}"#).unwrap()), + ) + .await?; + repo2.commit("main", "update array user atts", None).await.unwrap_err(); + assert_has_conflict( + &Conflict::UserAttributesUpdateOfDeletedNode(path), + repo2.rebase(&ConflictDetector, "main").await, + ); + Ok(()) + } + + #[tokio::test()] + /// Test conflict detection + /// + /// This session: delete array + /// Previous commit: update same array metadata + async fn test_conflict_detection_delete_when_array_metadata_updated( + ) -> Result<(), Box> { + let (mut repo1, mut repo2) = get_repos_for_conflict().await?; + + let path: Path = "/foo/bar/some-array".try_into().unwrap(); + repo1.update_array(path.clone(), basic_meta()).await?; + repo1.commit(Ref::DEFAULT_BRANCH, "update array", None).await?; + + repo2.delete_array(path.clone()).await?; + repo2.commit("main", "delete array", None).await.unwrap_err(); + assert_has_conflict( + &Conflict::DeleteOfUpdatedArray(path), + repo2.rebase(&ConflictDetector, "main").await, + ); + Ok(()) + } + + #[tokio::test()] + /// Test conflict detection + /// + /// This session: delete array + /// Previous commit: update same array user attributes + async fn test_conflict_detection_delete_when_array_user_atts_updated( + ) -> Result<(), Box> { + let (mut repo1, mut repo2) = get_repos_for_conflict().await?; + + let path: Path = "/foo/bar/some-array".try_into().unwrap(); + repo1 + .set_user_attributes( + path.clone(), + Some(UserAttributes::try_new(br#"{"foo":"bar"}"#).unwrap()), + ) + .await?; + repo1.commit(Ref::DEFAULT_BRANCH, "update user attributes", None).await?; + + repo2.delete_array(path.clone()).await?; + repo2.commit("main", "delete array", None).await.unwrap_err(); + assert_has_conflict( + &Conflict::DeleteOfUpdatedArray(path), + repo2.rebase(&ConflictDetector, "main").await, + ); + Ok(()) + } + + #[tokio::test()] + /// Test conflict detection + /// + /// This session: delete array + /// Previous commit: update same array chunks + async fn test_conflict_detection_delete_when_chunks_updated( + ) -> Result<(), Box> { + let (mut repo1, mut repo2) = get_repos_for_conflict().await?; + + let path: Path = "/foo/bar/some-array".try_into().unwrap(); + repo1 + .set_chunk_ref( + path.clone(), + ChunkIndices(vec![0]), + Some(ChunkPayload::Inline("hello".into())), + ) + .await?; + repo1.commit(Ref::DEFAULT_BRANCH, "update chunks", None).await?; + + repo2.delete_array(path.clone()).await?; + repo2.commit("main", "delete array", None).await.unwrap_err(); + assert_has_conflict( + &Conflict::DeleteOfUpdatedArray(path), + repo2.rebase(&ConflictDetector, "main").await, + ); + Ok(()) + } + + #[tokio::test()] + /// Test conflict detection + /// + /// This session: delete group + /// Previous commit: update same group user attributes + async fn test_conflict_detection_delete_when_group_user_atts_updated( + ) -> Result<(), Box> { + let (mut repo1, mut repo2) = get_repos_for_conflict().await?; + + let path: Path = "/foo/bar".try_into().unwrap(); + repo1 + .set_user_attributes( + path.clone(), + Some(UserAttributes::try_new(br#"{"foo":"bar"}"#).unwrap()), + ) + .await?; + repo1.commit(Ref::DEFAULT_BRANCH, "update user attributes", None).await?; + + repo2.delete_group(path.clone()).await?; + repo2.commit("main", "delete group", None).await.unwrap_err(); + assert_has_conflict( + &Conflict::DeleteOfUpdatedGroup(path), + repo2.rebase(&ConflictDetector, "main").await, + ); + Ok(()) + } + + #[tokio::test()] + async fn test_rebase_without_fast_forward() -> Result<(), Box> { + let storage: Arc = + Arc::new(ObjectStorage::new_in_memory_store(Some("prefix".into()))); + let mut repo = Repository::init(Arc::clone(&storage), false).await?.build(); + + repo.add_group("/".try_into().unwrap()).await?; + let zarr_meta = ZarrArrayMetadata { + shape: vec![5], + data_type: DataType::Int32, + chunk_shape: ChunkShape(vec![NonZeroU64::new(1).unwrap()]), + chunk_key_encoding: ChunkKeyEncoding::Slash, + fill_value: FillValue::Int32(0), + codecs: vec![], + storage_transformers: None, + dimension_names: None, + }; + + let new_array_path: Path = "/array".try_into().unwrap(); + repo.add_array(new_array_path.clone(), zarr_meta.clone()).await?; + repo.commit(Ref::DEFAULT_BRANCH, "create array", None).await?; + + // one writer sets chunks + // other writer sets the same chunks, generating a conflict + + let mut repo1 = + Repository::from_branch_tip(Arc::clone(&storage), "main").await?.build(); + let mut repo2 = + Repository::from_branch_tip(Arc::clone(&storage), "main").await?.build(); + + repo1 + .set_chunk_ref( + new_array_path.clone(), + ChunkIndices(vec![0]), + Some(ChunkPayload::Inline("hello".into())), + ) + .await?; + repo1 + .set_chunk_ref( + new_array_path.clone(), + ChunkIndices(vec![1]), + Some(ChunkPayload::Inline("hello".into())), + ) + .await?; + let conflicting_snap = + repo1.commit("main", "write two chunks with repo 1", None).await?; + + repo2 + .set_chunk_ref( + new_array_path.clone(), + ChunkIndices(vec![0]), + Some(ChunkPayload::Inline("hello".into())), + ) + .await?; + + // verify we cannot commit + if let Err(RepositoryError::Conflict { .. }) = + repo2.commit("main", "write one chunk with repo2", None).await + { + // detect conflicts using rebase + let result = repo2.rebase(&ConflictDetector, "main").await; + // assert the conflict is double chunk update + assert!(matches!( + result, + Err(RepositoryError::RebaseFailed { snapshot, conflicts, }) + if snapshot == conflicting_snap && + conflicts.len() == 1 && + matches!(conflicts[0], Conflict::ChunkDoubleUpdate { ref path, ref chunk_coordinates, .. } + if path == &new_array_path && chunk_coordinates == &[ChunkIndices(vec![0])].into()) + )); + } else { + panic!("Bad test, it should conflict") + } + + Ok(()) + } + + #[tokio::test()] + async fn test_rebase_fast_forwarding_over_chunk_writes() -> Result<(), Box> + { + let storage: Arc = + Arc::new(ObjectStorage::new_in_memory_store(Some("prefix".into()))); + let mut repo = Repository::init(Arc::clone(&storage), false).await?.build(); + + repo.add_group("/".try_into().unwrap()).await?; + let zarr_meta = ZarrArrayMetadata { + shape: vec![5], + data_type: DataType::Int32, + chunk_shape: ChunkShape(vec![NonZeroU64::new(1).unwrap()]), + chunk_key_encoding: ChunkKeyEncoding::Slash, + fill_value: FillValue::Int32(0), + codecs: vec![], + storage_transformers: None, + dimension_names: None, + }; + + let new_array_path: Path = "/array".try_into().unwrap(); + repo.add_array(new_array_path.clone(), zarr_meta.clone()).await?; + let array_created_snap = + repo.commit(Ref::DEFAULT_BRANCH, "create array", None).await?; + + let mut repo1 = + Repository::from_branch_tip(Arc::clone(&storage), "main").await?.build(); + + repo1 + .set_chunk_ref( + new_array_path.clone(), + ChunkIndices(vec![0]), + Some(ChunkPayload::Inline("hello".into())), + ) + .await?; + repo1 + .set_chunk_ref( + new_array_path.clone(), + ChunkIndices(vec![1]), + Some(ChunkPayload::Inline("hello".into())), + ) + .await?; + let conflicting_snap = + repo1.commit("main", "write two chunks with repo 1", None).await?; + + // let's try to create a new commit, that conflicts with the previous one but writes to + // different chunks + let mut repo2 = + Repository::update(Arc::clone(&storage), array_created_snap.clone()).build(); + repo2 + .set_chunk_ref( + new_array_path.clone(), + ChunkIndices(vec![2]), + Some(ChunkPayload::Inline("hello".into())), + ) + .await?; + if let Err(RepositoryError::Conflict { .. }) = + repo2.commit("main", "write one chunk with repo2", None).await + { + let solver = BasicConflictSolver::default(); + // different chunks were written so this should fast forward + repo2.rebase(&solver, "main").await?; + repo2.commit("main", "after conflict", None).await?; + let data = + repo2.get_chunk_ref(&new_array_path, &ChunkIndices(vec![2])).await?; + assert_eq!(data, Some(ChunkPayload::Inline("hello".into()))); + let commits = repo2.ancestry().await?.try_collect::>().await?; + assert_eq!(commits[0].message, "after conflict"); + assert_eq!(commits[1].message, "write two chunks with repo 1"); + } else { + panic!("Bad test, it should conflict") + } + + // reset the branch to what repo1 wrote + let current_snap = fetch_branch_tip(storage.as_ref(), "main").await?.snapshot; + update_branch( + storage.as_ref(), + "main", + conflicting_snap.clone(), + Some(¤t_snap), + false, + ) + .await?; + + // let's try to create a new commit, that conflicts with the previous one and writes + // to the same chunk, recovering with "Fail" policy (so it shouldn't recover) + let mut repo2 = + Repository::update(Arc::clone(&storage), array_created_snap.clone()).build(); + repo2 + .set_chunk_ref( + new_array_path.clone(), + ChunkIndices(vec![1]), + Some(ChunkPayload::Inline("overridden".into())), + ) + .await?; + + if let Err(RepositoryError::Conflict { .. }) = + repo2.commit("main", "write one chunk with repo2", None).await + { + let solver = BasicConflictSolver { + on_chunk_conflict: VersionSelection::Fail, + ..BasicConflictSolver::default() + }; + + let res = repo2.rebase(&solver, "main").await; + assert!(matches!( + res, + Err(RepositoryError::RebaseFailed { snapshot, conflicts, }) + if snapshot == conflicting_snap && + conflicts.len() == 1 && + matches!(conflicts[0], Conflict::ChunkDoubleUpdate { ref path, ref chunk_coordinates, .. } + if path == &new_array_path && chunk_coordinates == &[ChunkIndices(vec![1])].into()) + )); + } else { + panic!("Bad test, it should conflict") + } + + // reset the branch to what repo1 wrote + let current_snap = fetch_branch_tip(storage.as_ref(), "main").await?.snapshot; + update_branch( + storage.as_ref(), + "main", + conflicting_snap.clone(), + Some(¤t_snap), + false, + ) + .await?; + + // let's try to create a new commit, that conflicts with the previous one and writes + // to the same chunk, recovering with "UseOurs" policy + let mut repo2 = + Repository::update(Arc::clone(&storage), array_created_snap.clone()).build(); + repo2 + .set_chunk_ref( + new_array_path.clone(), + ChunkIndices(vec![1]), + Some(ChunkPayload::Inline("overridden".into())), + ) + .await?; + if let Err(RepositoryError::Conflict { .. }) = + repo2.commit("main", "write one chunk with repo2", None).await + { + let solver = BasicConflictSolver { + on_chunk_conflict: VersionSelection::UseOurs, + ..Default::default() + }; + + repo2.rebase(&solver, "main").await?; + repo2.commit("main", "after conflict", None).await?; + let data = + repo2.get_chunk_ref(&new_array_path, &ChunkIndices(vec![1])).await?; + assert_eq!(data, Some(ChunkPayload::Inline("overridden".into()))); + let commits = repo2.ancestry().await?.try_collect::>().await?; + assert_eq!(commits[0].message, "after conflict"); + assert_eq!(commits[1].message, "write two chunks with repo 1"); + } else { + panic!("Bad test, it should conflict") + } + + // reset the branch to what repo1 wrote + let current_snap = fetch_branch_tip(storage.as_ref(), "main").await?.snapshot; + update_branch( + storage.as_ref(), + "main", + conflicting_snap.clone(), + Some(¤t_snap), + false, + ) + .await?; + + // let's try to create a new commit, that conflicts with the previous one and writes + // to the same chunk, recovering with "UseTheirs" policy + let mut repo2 = + Repository::update(Arc::clone(&storage), array_created_snap.clone()).build(); + repo2 + .set_chunk_ref( + new_array_path.clone(), + ChunkIndices(vec![1]), + Some(ChunkPayload::Inline("overridden".into())), + ) + .await?; + if let Err(RepositoryError::Conflict { .. }) = + repo2.commit("main", "write one chunk with repo2", None).await + { + let solver = BasicConflictSolver { + on_chunk_conflict: VersionSelection::UseTheirs, + ..Default::default() + }; + + repo2.rebase(&solver, "main").await?; + repo2.commit("main", "after conflict", None).await?; + let data = + repo2.get_chunk_ref(&new_array_path, &ChunkIndices(vec![1])).await?; + assert_eq!(data, Some(ChunkPayload::Inline("hello".into()))); + let commits = repo2.ancestry().await?.try_collect::>().await?; + assert_eq!(commits[0].message, "after conflict"); + assert_eq!(commits[1].message, "write two chunks with repo 1"); + } else { + panic!("Bad test, it should conflict") + } + + Ok(()) + } + + #[tokio::test] + /// Test conflict resolution with rebase + /// + /// Two sessions write user attributes to the same array + /// We attempt to recover using [`VersionSelection::UseOurs`] policy + async fn test_conflict_resolution_double_user_atts_edit_with_ours( + ) -> Result<(), Box> { + let (mut repo1, mut repo2) = get_repos_for_conflict().await?; + + let path: Path = "/foo/bar/some-array".try_into().unwrap(); + repo1 + .set_user_attributes( + path.clone(), + Some(UserAttributes::try_new(br#"{"repo":1}"#).unwrap()), + ) + .await?; + repo1.commit(Ref::DEFAULT_BRANCH, "update array", None).await?; + + repo2 + .set_user_attributes( + path.clone(), + Some(UserAttributes::try_new(br#"{"repo":2}"#).unwrap()), + ) + .await?; + repo2.commit("main", "update array user atts", None).await.unwrap_err(); + + let solver = BasicConflictSolver { + on_user_attributes_conflict: VersionSelection::UseOurs, + ..Default::default() + }; + + repo2.rebase(&solver, "main").await?; + repo2.commit("main", "after conflict", None).await?; + + let atts = repo2.get_node(&path).await.unwrap().user_attributes.unwrap(); + assert_eq!( + atts, + UserAttributesSnapshot::Inline( + UserAttributes::try_new(br#"{"repo":2}"#).unwrap() + ) + ); + Ok(()) + } + + #[tokio::test] + /// Test conflict resolution with rebase + /// + /// Two sessions write user attributes to the same array + /// We attempt to recover using [`VersionSelection::UseTheirs`] policy + async fn test_conflict_resolution_double_user_atts_edit_with_theirs( + ) -> Result<(), Box> { + let (mut repo1, mut repo2) = get_repos_for_conflict().await?; + + let path: Path = "/foo/bar/some-array".try_into().unwrap(); + repo1 + .set_user_attributes( + path.clone(), + Some(UserAttributes::try_new(br#"{"repo":1}"#).unwrap()), + ) + .await?; + repo1.commit(Ref::DEFAULT_BRANCH, "update array", None).await?; + + // we made one extra random change to the repo, because we'll undo the user attributes + // update and we cannot commit an empty change + repo2.add_group("/baz".try_into().unwrap()).await?; + + repo2 + .set_user_attributes( + path.clone(), + Some(UserAttributes::try_new(br#"{"repo":2}"#).unwrap()), + ) + .await?; + repo2.commit("main", "update array user atts", None).await.unwrap_err(); + + let solver = BasicConflictSolver { + on_user_attributes_conflict: VersionSelection::UseTheirs, + ..Default::default() + }; + + repo2.rebase(&solver, "main").await?; + repo2.commit("main", "after conflict", None).await?; + + let atts = repo2.get_node(&path).await.unwrap().user_attributes.unwrap(); + assert_eq!( + atts, + UserAttributesSnapshot::Inline( + UserAttributes::try_new(br#"{"repo":1}"#).unwrap() + ) + ); + + repo2.get_node(&"/baz".try_into().unwrap()).await?; + Ok(()) + } + + #[tokio::test] + /// Test conflict resolution with rebase + /// + /// One session deletes an array, the other updates its metadata. + /// We attempt to recover using the default [`BasicConflictSolver`] + /// Array should still be deleted + async fn test_conflict_resolution_delete_of_updated_array( + ) -> Result<(), Box> { + let (mut repo1, mut repo2) = get_repos_for_conflict().await?; + + let path: Path = "/foo/bar/some-array".try_into().unwrap(); + repo1.update_array(path.clone(), basic_meta()).await?; + repo1.commit(Ref::DEFAULT_BRANCH, "update array", None).await?; + + repo2.delete_array(path.clone()).await?; + repo2.commit("main", "delete array", None).await.unwrap_err(); + + repo2.rebase(&BasicConflictSolver::default(), "main").await?; + repo2.commit("main", "after conflict", None).await?; + + assert!(matches!( + repo2.get_node(&path).await, + Err(RepositoryError::NodeNotFound { .. }) + )); + + Ok(()) + } + + #[tokio::test] + /// Test conflict resolution with rebase + /// + /// Verify we can rebase over multiple commits if they are all fast-forwardable. + /// We have multiple commits with chunk writes, and then a session has to rebase + /// writing to the same chunks. + async fn test_conflict_resolution_success_through_multiple_commits( + ) -> Result<(), Box> { + let (mut repo1, mut repo2) = get_repos_for_conflict().await?; + + let path: Path = "/foo/bar/some-array".try_into().unwrap(); + // write chunks with repo 1 + for coord in [0u32, 1, 2] { + repo1 + .set_chunk_ref( + path.clone(), + ChunkIndices(vec![coord]), + Some(ChunkPayload::Inline("repo 1".into())), + ) + .await?; + repo1 + .commit( + Ref::DEFAULT_BRANCH, + format!("update chunk {}", coord).as_str(), + None, + ) + .await?; + } + + // write the same chunks with repo 2 + for coord in [0u32, 1, 2] { + repo2 + .set_chunk_ref( + path.clone(), + ChunkIndices(vec![coord]), + Some(ChunkPayload::Inline("repo 2".into())), + ) + .await?; + } + + repo2 + .commit(Ref::DEFAULT_BRANCH, "update chunk on repo 2", None) + .await + .unwrap_err(); + + let solver = BasicConflictSolver { + on_chunk_conflict: VersionSelection::UseTheirs, + ..Default::default() + }; + + repo2.rebase(&solver, "main").await?; + repo2.commit("main", "after conflict", None).await?; + for coord in [0, 1, 2] { + let payload = repo2.get_chunk_ref(&path, &ChunkIndices(vec![coord])).await?; + assert_eq!(payload, Some(ChunkPayload::Inline("repo 1".into()))); + } + Ok(()) + } + + #[tokio::test] + /// Rebase over multiple commits with partial failure + /// + /// We verify that we can partially fast forward, stopping at the first unrecoverable commit + async fn test_conflict_resolution_failure_in_multiple_commits( + ) -> Result<(), Box> { + let (mut repo1, mut repo2) = get_repos_for_conflict().await?; + + let path: Path = "/foo/bar/some-array".try_into().unwrap(); + repo1 + .set_user_attributes( + path.clone(), + Some(UserAttributes::try_new(br#"{"repo":1}"#).unwrap()), + ) + .await?; + let non_conflicting_snap = + repo1.commit(Ref::DEFAULT_BRANCH, "update user atts", None).await?; + + repo1 + .set_chunk_ref( + path.clone(), + ChunkIndices(vec![0]), + Some(ChunkPayload::Inline("repo 1".into())), + ) + .await?; + + let conflicting_snap = + repo1.commit(Ref::DEFAULT_BRANCH, "update chunk ref", None).await?; + + repo2 + .set_chunk_ref( + path.clone(), + ChunkIndices(vec![0]), + Some(ChunkPayload::Inline("repo 2".into())), + ) + .await?; + + repo2.commit(Ref::DEFAULT_BRANCH, "update chunk ref", None).await.unwrap_err(); + // we setup a [`ConflictSolver`]` that can recover from the first but not the second + // conflict + let solver = BasicConflictSolver { + on_chunk_conflict: VersionSelection::Fail, + ..Default::default() + }; + + let err = repo2.rebase(&solver, "main").await.unwrap_err(); + + assert!(matches!( + err, + RepositoryError::RebaseFailed { snapshot, conflicts} + if snapshot == conflicting_snap && + conflicts.len() == 1 && + matches!(conflicts[0], Conflict::ChunkDoubleUpdate { ref path, ref chunk_coordinates, .. } + if path == path && chunk_coordinates == &[ChunkIndices(vec![0])].into()) + )); + + // we were able to rebase one commit but not the second one, + // so now the parent is the first commit + assert_eq!(repo2.snapshot_id(), &non_conflicting_snap); + + Ok(()) + } + #[cfg(test)] mod state_machine_test { use crate::format::snapshot::NodeData; diff --git a/icechunk/src/storage/caching.rs b/icechunk/src/storage/caching.rs index 19a2de28..49dba59a 100644 --- a/icechunk/src/storage/caching.rs +++ b/icechunk/src/storage/caching.rs @@ -8,7 +8,8 @@ use quick_cache::sync::Cache; use crate::{ format::{ attributes::AttributesTable, manifest::Manifest, snapshot::Snapshot, - AttributesId, ByteRange, ChunkId, ManifestId, SnapshotId, + transaction_log::TransactionLog, AttributesId, ByteRange, ChunkId, ManifestId, + SnapshotId, }, private, }; @@ -20,6 +21,7 @@ pub struct MemCachingStorage { backend: Arc, snapshot_cache: Cache>, manifest_cache: Cache>, + transactions_cache: Cache>, attributes_cache: Cache>, chunk_cache: Cache<(ChunkId, ByteRange), Bytes>, } @@ -29,6 +31,7 @@ impl MemCachingStorage { backend: Arc, num_snapshots: u16, num_manifests: u16, + num_transactions: u16, num_attributes: u16, num_chunks: u16, ) -> Self { @@ -36,6 +39,7 @@ impl MemCachingStorage { backend, snapshot_cache: Cache::new(num_snapshots as usize), manifest_cache: Cache::new(num_manifests as usize), + transactions_cache: Cache::new(num_transactions as usize), attributes_cache: Cache::new(num_attributes as usize), chunk_cache: Cache::new(num_chunks as usize), } @@ -88,6 +92,20 @@ impl Storage for MemCachingStorage { } } + async fn fetch_transaction_log( + &self, + id: &SnapshotId, + ) -> StorageResult> { + match self.transactions_cache.get_value_or_guard_async(id).await { + Ok(log) => Ok(log), + Err(guard) => { + let log = self.backend.fetch_transaction_log(id).await?; + let _fail_is_ok = guard.insert(Arc::clone(&log)); + Ok(log) + } + } + } + async fn fetch_chunk( &self, id: &ChunkId, @@ -134,6 +152,16 @@ impl Storage for MemCachingStorage { Ok(()) } + async fn write_transaction_log( + &self, + id: SnapshotId, + log: Arc, + ) -> StorageResult<()> { + self.backend.write_transaction_log(id.clone(), Arc::clone(&log)).await?; + self.transactions_cache.insert(id, log); + Ok(()) + } + async fn write_chunk(&self, id: ChunkId, bytes: Bytes) -> Result<(), StorageError> { self.backend.write_chunk(id.clone(), bytes.clone()).await?; // we don't pre-populate the chunk cache, there are too many of them for this to be useful @@ -217,7 +245,7 @@ mod test { let logging = Arc::new(LoggingStorage::new(Arc::clone(&backend))); let logging_c: Arc = logging.clone(); - let caching = MemCachingStorage::new(Arc::clone(&logging_c), 0, 2, 0, 0); + let caching = MemCachingStorage::new(Arc::clone(&logging_c), 0, 2, 0, 0, 0); let manifest = Arc::new(vec![ci2].into_iter().collect()); let id = ManifestId::random(); @@ -295,6 +323,7 @@ mod test { 2, 0, 0, + 0, ); // we keep asking for all 3 items, but the cache can only fit 2 diff --git a/icechunk/src/storage/logging.rs b/icechunk/src/storage/logging.rs index 87193b10..2ef2ffd5 100644 --- a/icechunk/src/storage/logging.rs +++ b/icechunk/src/storage/logging.rs @@ -47,6 +47,17 @@ impl Storage for LoggingStorage { self.backend.fetch_snapshot(id).await } + async fn fetch_transaction_log( + &self, + id: &SnapshotId, + ) -> StorageResult> { + self.fetch_log + .lock() + .expect("poison lock") + .push(("fetch_transaction_log".to_string(), id.0.to_vec())); + self.backend.fetch_transaction_log(id).await + } + async fn fetch_attributes( &self, id: &AttributesId, @@ -89,6 +100,14 @@ impl Storage for LoggingStorage { self.backend.write_snapshot(id, table).await } + async fn write_transaction_log( + &self, + id: SnapshotId, + log: Arc, + ) -> StorageResult<()> { + self.backend.write_transaction_log(id, log).await + } + async fn write_attributes( &self, id: AttributesId, diff --git a/icechunk/src/storage/mod.rs b/icechunk/src/storage/mod.rs index b95dd964..6e86f14a 100644 --- a/icechunk/src/storage/mod.rs +++ b/icechunk/src/storage/mod.rs @@ -31,7 +31,8 @@ pub use object_store::ObjectStorage; use crate::{ format::{ attributes::AttributesTable, manifest::Manifest, snapshot::Snapshot, - AttributesId, ByteRange, ChunkId, ManifestId, SnapshotId, + transaction_log::TransactionLog, AttributesId, ByteRange, ChunkId, ManifestId, + SnapshotId, }, private, }; @@ -76,6 +77,7 @@ const MANIFEST_PREFIX: &str = "manifests/"; // const ATTRIBUTES_PREFIX: &str = "attributes/"; const CHUNK_PREFIX: &str = "chunks/"; const REF_PREFIX: &str = "refs"; +const TRANSACTION_PREFIX: &str = "transactions/"; /// Fetch and write the parquet files that represent the repository in object store /// @@ -90,6 +92,10 @@ pub trait Storage: fmt::Debug + private::Sealed { ) -> StorageResult>; // FIXME: format flags async fn fetch_manifests(&self, id: &ManifestId) -> StorageResult>; // FIXME: format flags async fn fetch_chunk(&self, id: &ChunkId, range: &ByteRange) -> StorageResult; // FIXME: format flags + async fn fetch_transaction_log( + &self, + id: &SnapshotId, + ) -> StorageResult>; // FIXME: format flags async fn write_snapshot( &self, @@ -107,6 +113,11 @@ pub trait Storage: fmt::Debug + private::Sealed { table: Arc, ) -> StorageResult<()>; async fn write_chunk(&self, id: ChunkId, bytes: Bytes) -> StorageResult<()>; + async fn write_transaction_log( + &self, + id: SnapshotId, + log: Arc, + ) -> StorageResult<()>; async fn get_ref(&self, ref_key: &str) -> StorageResult; async fn ref_names(&self) -> StorageResult>; diff --git a/icechunk/src/storage/object_store.rs b/icechunk/src/storage/object_store.rs index 3e11deac..4731c77b 100644 --- a/icechunk/src/storage/object_store.rs +++ b/icechunk/src/storage/object_store.rs @@ -1,8 +1,8 @@ use crate::{ format::{ attributes::AttributesTable, format_constants, manifest::Manifest, - snapshot::Snapshot, AttributesId, ByteRange, ChunkId, FileTypeTag, ManifestId, - ObjectId, SnapshotId, + snapshot::Snapshot, transaction_log::TransactionLog, AttributesId, ByteRange, + ChunkId, FileTypeTag, ManifestId, ObjectId, SnapshotId, }, private, }; @@ -30,7 +30,7 @@ use std::{ use super::{ ListInfo, Storage, StorageError, StorageResult, CHUNK_PREFIX, MANIFEST_PREFIX, - REF_PREFIX, SNAPSHOT_PREFIX, + REF_PREFIX, SNAPSHOT_PREFIX, TRANSACTION_PREFIX, }; // Get Range is object_store specific, keep it with this module @@ -126,6 +126,10 @@ impl ObjectStorage { self.get_path(MANIFEST_PREFIX, id) } + fn get_transaction_path(&self, id: &SnapshotId) -> ObjectPath { + self.get_path(TRANSACTION_PREFIX, id) + } + fn get_chunk_path(&self, id: &ChunkId) -> ObjectPath { self.get_path(CHUNK_PREFIX, id) } @@ -201,6 +205,17 @@ impl Storage for ObjectStorage { Ok(Arc::new(res)) } + async fn fetch_transaction_log( + &self, + id: &SnapshotId, + ) -> StorageResult> { + let path = self.get_transaction_path(id); + let bytes = self.store.get(&path).await?.bytes().await?; + // TODO: optimize using from_read + let res = rmp_serde::from_slice(bytes.as_ref())?; + Ok(Arc::new(res)) + } + async fn write_snapshot( &self, id: SnapshotId, @@ -275,6 +290,39 @@ impl Storage for ObjectStorage { Ok(()) } + async fn write_transaction_log( + &self, + id: SnapshotId, + log: Arc, + ) -> StorageResult<()> { + let path = self.get_transaction_path(&id); + let bytes = rmp_serde::to_vec(log.as_ref())?; + let attributes = if self.supports_metadata { + Attributes::from_iter(vec![ + ( + Attribute::ContentType, + AttributeValue::from( + format_constants::LATEST_ICECHUNK_TRANSACTION_LOG_CONTENT_TYPE, + ), + ), + ( + Attribute::Metadata(std::borrow::Cow::Borrowed( + format_constants::LATEST_ICECHUNK_TRANSACTION_LOG_VERSION_METADATA_KEY, + )), + AttributeValue::from( + log.icechunk_transaction_log_format_version.to_string(), + ), + ), + ]) + } else { + Attributes::new() + }; + let options = PutOptions { attributes, ..PutOptions::default() }; + // FIXME: use multipart + self.store.put_opts(&path, bytes.into(), options).await?; + Ok(()) + } + async fn fetch_chunk( &self, id: &ChunkId, diff --git a/icechunk/src/storage/s3.rs b/icechunk/src/storage/s3.rs index d02d920b..3533bafe 100644 --- a/icechunk/src/storage/s3.rs +++ b/icechunk/src/storage/s3.rs @@ -30,8 +30,8 @@ use serde::{Deserialize, Serialize}; use crate::{ format::{ attributes::AttributesTable, format_constants, manifest::Manifest, - snapshot::Snapshot, AttributesId, ByteRange, ChunkId, FileTypeTag, ManifestId, - SnapshotId, + snapshot::Snapshot, transaction_log::TransactionLog, AttributesId, ByteRange, + ChunkId, FileTypeTag, ManifestId, SnapshotId, }, private, zarr::ObjectId, @@ -40,6 +40,7 @@ use crate::{ use super::{ ListInfo, StorageResult, CHUNK_PREFIX, MANIFEST_PREFIX, REF_PREFIX, SNAPSHOT_PREFIX, + TRANSACTION_PREFIX, }; #[derive(Debug)] @@ -157,6 +158,10 @@ impl S3Storage { self.get_path(CHUNK_PREFIX, id) } + fn get_transaction_path(&self, id: &SnapshotId) -> StorageResult { + self.get_path(TRANSACTION_PREFIX, id) + } + fn ref_key(&self, ref_key: &str) -> StorageResult { let path = PathBuf::from_iter([self.prefix.as_str(), REF_PREFIX, ref_key]); path.into_os_string().into_string().map_err(StorageError::BadPrefix) @@ -283,6 +288,17 @@ impl Storage for S3Storage { Ok(Arc::new(res)) } + async fn fetch_transaction_log( + &self, + id: &SnapshotId, + ) -> StorageResult> { + let key = self.get_transaction_path(id)?; + let bytes = self.get_object(key.as_str()).await?; + // TODO: optimize using from_read + let res = rmp_serde::from_slice(bytes.as_ref())?; + Ok(Arc::new(res)) + } + async fn fetch_chunk(&self, id: &ChunkId, range: &ByteRange) -> StorageResult { let key = self.get_chunk_path(id)?; let bytes = self.get_object_range(key.as_str(), range).await?; @@ -337,6 +353,26 @@ impl Storage for S3Storage { .await } + async fn write_transaction_log( + &self, + id: SnapshotId, + log: Arc, + ) -> StorageResult<()> { + let key = self.get_transaction_path(&id)?; + let bytes = rmp_serde::to_vec(log.as_ref())?; + let metadata = [( + format_constants::LATEST_ICECHUNK_TRANSACTION_LOG_VERSION_METADATA_KEY, + log.icechunk_transaction_log_format_version.to_string(), + )]; + self.put_object( + key.as_str(), + Some(format_constants::LATEST_ICECHUNK_TRANSACTION_LOG_CONTENT_TYPE), + metadata, + bytes, + ) + .await + } + async fn write_chunk( &self, id: ChunkId, diff --git a/icechunk/src/zarr.rs b/icechunk/src/zarr.rs index 1e8d1ca0..4ca805cb 100644 --- a/icechunk/src/zarr.rs +++ b/icechunk/src/zarr.rs @@ -988,7 +988,8 @@ async fn get_metadata( None => None, Some(UserAttributesSnapshot::Inline(atts)) => Some(atts), // FIXME: implement - Some(UserAttributesSnapshot::Ref(_)) => todo!(), + #[allow(clippy::unimplemented)] + Some(UserAttributesSnapshot::Ref(_)) => unimplemented!(), }; let full_metadata = match node.node_data { NodeData::Group => { diff --git a/shell.nix b/shell.nix index 65e895a8..b72cb731 100644 --- a/shell.nix +++ b/shell.nix @@ -1,42 +1,42 @@ let # Pinned nixpkgs, deterministic. Last updated to nixos-unstable as of: 2024-10-06 - pkgs = import (fetchTarball "https://github.com/NixOS/nixpkgs/archive/7d49afd36b5590f023ec56809c02e05d8164fbc4.tar.gz") {}; + pkgs = import (fetchTarball + "https://github.com/NixOS/nixpkgs/archive/7d49afd36b5590f023ec56809c02e05d8164fbc4.tar.gz") + { }; # Rolling updates, not deterministic. # pkgs = import (fetchTarball("channel:nixpkgs-unstable")) {}; - alejandra = - (import (builtins.fetchTarball { - url = "https://github.com/kamadorueda/alejandra/tarball/3.0.0"; - sha256 = "sha256:18jm0d5xrxk38hw5sa470zgfz9xzdcyaskjhgjwhnmzd5fgacny4"; - }) {}) - .outPath; -in - pkgs.mkShell.override { - stdenv = pkgs.stdenvAdapters.useMoldLinker pkgs.clangStdenv; - } { - packages = with pkgs; [ - rustc - cargo - cargo-watch - cargo-nextest # test runner - cargo-deny - rust-analyzer # rust lsp server - rustfmt - clippy - taplo # toml lsp server + alejandra = (import (builtins.fetchTarball { + url = "https://github.com/kamadorueda/alejandra/tarball/3.0.0"; + sha256 = "sha256:18jm0d5xrxk38hw5sa470zgfz9xzdcyaskjhgjwhnmzd5fgacny4"; + }) { }).outPath; +in pkgs.mkShell.override { + stdenv = pkgs.stdenvAdapters.useMoldLinker pkgs.clangStdenv; +} { + packages = with pkgs; [ + rustc + cargo + cargo-watch + cargo-nextest # test runner + cargo-deny + rust-analyzer # rust lsp server + rustfmt + clippy + taplo # toml lsp server - awscli2 - just # script launcher with a make flavor - alejandra # nix code formatter - ]; + awscli2 + just # script launcher with a make flavor + alejandra # nix code formatter + markdownlint-cli2 + ]; - shellHook = '' - export PYTHONPATH=".:$PYTHONPATH" + shellHook = '' + export PYTHONPATH=".:$PYTHONPATH" - export AWS_ACCESS_KEY_ID=minio123 - export AWS_SECRET_ACCESS_KEY=minio123 - export AWS_DEFAULT_REGION=us-east-1 - export RUSTFLAGS="-W unreachable-pub -W bare-trait-objects" - ''; - } + export AWS_ACCESS_KEY_ID=minio123 + export AWS_SECRET_ACCESS_KEY=minio123 + export AWS_DEFAULT_REGION=us-east-1 + export RUSTFLAGS="-W unreachable-pub -W bare-trait-objects" + ''; +}