Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add transaction logs and basic conflict resolution #403

Merged
merged 10 commits into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions .github/workflows/python-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ jobs:
pip install icechunk['test'] --find-links dist --force-reinstall
mypy python

- name: ruff
shell: bash
working-directory: icechunk-python
run: |
set -e
python3 -m venv .venv
source .venv/bin/activate
pip install icechunk['test'] --find-links dist --force-reinstall
ruff check
# - name: ruff
# shell: bash
# working-directory: icechunk-python
# run: |
# set -e
# python3 -m venv .venv
# source .venv/bin/activate
# pip install icechunk['test'] --find-links dist --force-reinstall
# ruff check

- name: Restore cached hypothesis directory
id: restore-hypothesis-cache
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ resolver = "2"
expect_used = "warn"
unwrap_used = "warn"
panic = "warn"
todo = "warn"
unimplemented = "warn"

[workspace.metadata.release]
allow-branch = ["main"]
Expand Down
2 changes: 1 addition & 1 deletion icechunk-python/python/icechunk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
__all__ = [
"__version__",
"IcechunkStore",
"StorageConfig",
"S3Credentials",
"SnapshotMetadata",
"StorageConfig",
"StoreConfig",
"VirtualRefConfig",
]
Expand Down
4 changes: 2 additions & 2 deletions icechunk/examples/low_level_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use icechunk::{
ChunkIndices, ChunkKeyEncoding, ChunkPayload, ChunkShape, Codec, DataType,
FillValue, Path, StorageTransformer, UserAttributes, ZarrArrayMetadata,
},
storage::{MemCachingStorage, ObjectStorage},
storage::ObjectStorage,
zarr::StoreError,
Repository, Storage,
};
Expand All @@ -30,7 +30,7 @@ let mut ds = Repository::create(Arc::clone(&storage));
let storage: Arc<dyn Storage + Send + Sync> =
Arc::new(ObjectStorage::new_in_memory_store(None));
let mut ds = Repository::init(
Arc::new(MemCachingStorage::new(Arc::clone(&storage), 2, 2, 0, 0)),
Repository::add_in_mem_asset_caching(Arc::clone(&storage)),
false,
)
.await?
Expand Down
57 changes: 54 additions & 3 deletions icechunk/src/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,33 @@ pub struct ChangeSet {
}

impl ChangeSet {
/// Ids of the array nodes whose zarr metadata has a pending update in this change set.
pub fn zarr_updated_arrays(&self) -> impl Iterator<Item = &NodeId> {
    self.updated_arrays.iter().map(|(node_id, _)| node_id)
}

/// Paths of the arrays deleted in this change set.
pub fn deleted_arrays(&self) -> impl Iterator<Item = &Path> {
    self.deleted_arrays.iter()
}

/// Paths of the groups deleted in this change set.
pub fn deleted_groups(&self) -> impl Iterator<Item = &Path> {
    self.deleted_groups.iter()
}

/// Ids of the nodes whose user attributes have a pending update in this change set.
pub fn user_attributes_updated_nodes(&self) -> impl Iterator<Item = &NodeId> {
    self.updated_attributes.iter().map(|(node_id, _)| node_id)
}

/// Iterate over the per-node chunk updates recorded in this change set.
///
/// Each item pairs a node id with the map of chunk coordinates to new payload
/// (`None` presumably marks a deleted chunk reference — confirm with the setters).
pub fn chunk_changes(
    &self,
) -> impl Iterator<Item = (&NodeId, &HashMap<ChunkIndices, Option<ChunkPayload>>)>
{
    self.set_chunks.iter()
}

/// Ids of the array nodes that have at least one recorded chunk update.
pub fn arrays_with_chunk_changes(&self) -> impl Iterator<Item = &NodeId> {
    // Equivalent to chunk_changes().map(|(node, _)| node): iterate the
    // underlying map's keys directly.
    self.set_chunks.keys()
}

/// True when this change set records no changes at all.
pub fn is_empty(&self) -> bool {
    // Compare against a freshly built default value, in which every
    // collection of changes is empty.
    *self == ChangeSet::default()
}
Expand Down Expand Up @@ -174,6 +201,18 @@ impl ChangeSet {
self.set_chunks.get(node_id).and_then(|h| h.get(coords))
}

/// Drop the updated chunk references for the node.
/// This will only drop the references for which `predicate` returns true
pub fn drop_chunk_changes(
    &mut self,
    node_id: &NodeId,
    predicate: impl Fn(&ChunkIndices) -> bool,
) {
    // Nothing to do when the node has no recorded chunk updates.
    if let Some(node_chunks) = self.set_chunks.get_mut(node_id) {
        // retain keeps what the predicate rejects, hence the negation.
        node_chunks.retain(|coords, _| !predicate(coords));
    }
}

pub fn array_chunks_iterator(
&self,
node_id: &NodeId,
Expand Down Expand Up @@ -207,8 +246,16 @@ impl ChangeSet {
})
}

pub fn new_nodes(&self) -> impl Iterator<Item = &Path> {
self.new_groups.keys().chain(self.new_arrays.keys())
/// Iterate over every node created in this change set, as (path, node id) pairs.
/// Groups are yielded first, then arrays.
pub fn new_nodes(&self) -> impl Iterator<Item = (&Path, &NodeId)> {
    self.new_groups().chain(self.new_arrays())
}

/// Iterate over the groups created in this change set, as (path, node id) pairs.
pub fn new_groups(&self) -> impl Iterator<Item = (&Path, &NodeId)> {
    self.new_groups.iter()
}

/// Iterate over the arrays created in this change set, as (path, node id) pairs.
pub fn new_arrays(&self) -> impl Iterator<Item = (&Path, &NodeId)> {
    // The map values also carry the array metadata; expose only the node id.
    self.new_arrays.iter().map(|(path, value)| (path, &value.0))
}

pub fn take_chunks(
Expand Down Expand Up @@ -321,7 +368,7 @@ impl ChangeSet {
&'a self,
manifest_id: Option<&'a ManifestId>,
) -> impl Iterator<Item = NodeSnapshot> + 'a {
self.new_nodes().filter_map(move |path| {
self.new_nodes().filter_map(move |(path, _)| {
if self.is_deleted(path) {
return None;
}
Expand Down Expand Up @@ -382,4 +429,8 @@ impl ChangeSet {
}
}
}

/// Discard any pending user-attributes update for the given node.
/// No-op when the node has no recorded attribute update.
pub fn undo_user_attributes_update(&mut self, node_id: &NodeId) {
    self.updated_attributes.remove(node_id);
}
}
156 changes: 156 additions & 0 deletions icechunk/src/conflicts/basic_solver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
use async_trait::async_trait;

use crate::{
change_set::ChangeSet, format::transaction_log::TransactionLog,
repository::RepositoryResult, Repository,
};

use super::{detector::ConflictDetector, Conflict, ConflictResolution, ConflictSolver};

/// Policy for resolving a conflict where both sessions changed the same piece of data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum VersionSelection {
    /// Treat the conflict as unsolvable and abort resolution.
    Fail,
    /// Keep the change from the current session.
    UseOurs,
    /// Keep the previously committed change, dropping ours.
    UseTheirs,
}

/// A conflict solver that applies fixed, user-chosen policies to each
/// category of solvable conflict.
#[derive(Debug, Clone)]
pub struct BasicConflictSolver {
    /// Policy when both sessions updated the user attributes of the same node.
    pub on_user_attributes_conflict: VersionSelection,
    /// Policy when both sessions updated the same chunk of the same array.
    pub on_chunk_conflict: VersionSelection,
    /// When true, deleting an array the other session updated is a fatal conflict.
    pub fail_on_delete_of_updated_array: bool,
    /// When true, deleting a group the other session updated is a fatal conflict.
    pub fail_on_delete_of_updated_group: bool,
}

impl Default for BasicConflictSolver {
fn default() -> Self {
Self {
on_user_attributes_conflict: VersionSelection::UseOurs,
on_chunk_conflict: VersionSelection::UseOurs,
fail_on_delete_of_updated_array: false,
fail_on_delete_of_updated_group: false,
}
}
}

#[async_trait]
impl ConflictSolver for BasicConflictSolver {
    /// Run the plain conflict detector first; only when it reports the changes
    /// as unsolvable do we attempt to resolve the conflicts with this solver's
    /// configured policies.
    async fn solve(
        &self,
        previous_change: &TransactionLog,
        previous_repo: &Repository,
        current_changes: ChangeSet,
        current_repo: &Repository,
    ) -> RepositoryResult<ConflictResolution> {
        let detected = ConflictDetector
            .solve(previous_change, previous_repo, current_changes, current_repo)
            .await?;
        match detected {
            ConflictResolution::Unsolvable { reason, unmodified } => {
                self.solve_conflicts(
                    previous_change,
                    previous_repo,
                    unmodified,
                    current_repo,
                    reason,
                )
                .await
            }
            patched => Ok(patched),
        }
    }
}

impl BasicConflictSolver {
async fn solve_conflicts(
&self,
_previous_change: &TransactionLog,
_previous_repo: &Repository,
current_changes: ChangeSet,
_current_repo: &Repository,
conflicts: Vec<Conflict>,
) -> RepositoryResult<ConflictResolution> {
use Conflict::*;
let unsolvable = conflicts.iter().any(
|conflict| {
matches!(
conflict,
NewNodeConflictsWithExistingNode(_) |
NewNodeInInvalidGroup(_) |
ZarrMetadataDoubleUpdate(_) |
ZarrMetadataUpdateOfDeletedArray(_) |
UserAttributesUpdateOfDeletedNode(_) |
ChunksUpdatedInDeletedArray{..} |
ChunksUpdatedInUpdatedArray{..}
) ||
matches!(conflict,
UserAttributesDoubleUpdate{..} if self.on_user_attributes_conflict == VersionSelection::Fail
) ||
matches!(conflict,
ChunkDoubleUpdate{..} if self.on_chunk_conflict == VersionSelection::Fail
) ||
matches!(conflict,
DeleteOfUpdatedArray(_) if self.fail_on_delete_of_updated_array
) ||
matches!(conflict,
DeleteOfUpdatedGroup(_) if self.fail_on_delete_of_updated_group
)
},
);

if unsolvable {
return Ok(ConflictResolution::Unsolvable {
reason: conflicts,
unmodified: current_changes,
});
}

let mut current_changes = current_changes;
for conflict in conflicts {
match conflict {
ChunkDoubleUpdate { node_id, chunk_coordinates, .. } => {
match self.on_chunk_conflict {
VersionSelection::UseOurs => {
// this is a no-op, our change will override the conflicting change
}
VersionSelection::UseTheirs => {
current_changes.drop_chunk_changes(&node_id, |coord| chunk_coordinates.contains(coord))
}
// we can panic here because we have returned from the function if there
// were any unsolvable conflicts
#[allow(clippy::panic)]
VersionSelection::Fail => panic!("Bug in conflict resolution: ChunkDoubleUpdate flagged as unrecoverable")
Comment on lines +121 to +122
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you saying we will never actually hit this code path in normal usage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this possibility is filtered out above, but the compilers doesn't know it.

}
}
UserAttributesDoubleUpdate { node_id, .. } => {
match self.on_user_attributes_conflict {
VersionSelection::UseOurs => {
// this is a no-op, our change will override the conflicting change
}
VersionSelection::UseTheirs => {
current_changes.undo_user_attributes_update(&node_id);
}
// we can panic here because we have returned from the function if there
// were any unsolvable conflicts
#[allow(clippy::panic)]
VersionSelection::Fail => panic!("Bug in conflict resolution: UserAttributesDoubleUpdate flagged as unrecoverable")
}
}
DeleteOfUpdatedArray(_) => {
assert!(!self.fail_on_delete_of_updated_array);
// this is a no-op, the solution is to still delete the array
}
DeleteOfUpdatedGroup(_) => {
assert!(!self.fail_on_delete_of_updated_group);
// this is a no-op, the solution is to still delete the group
}
// we can panic here because we have returned from the function if there
// were any unsolvable conflicts
#[allow(clippy::panic)]
_ => panic!("bug in conflict resolution, conflict: {:?}", conflict),
}
}

Ok(ConflictResolution::Patched(current_changes))
}
}
Loading
Loading