Skip to content

Commit

Permalink
Document code
Browse files Browse the repository at this point in the history
  • Loading branch information
paraseba committed Nov 24, 2024
1 parent 61d447c commit 94e9c64
Showing 1 changed file with 141 additions and 6 deletions.
147 changes: 141 additions & 6 deletions icechunk/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ pub enum RepositoryError {

pub type RepositoryResult<T> = Result<T, RepositoryError>;

/// FIXME: what do we want to do with implicit groups?
///
// FIXME: what do we want to do with implicit groups?
//
impl Repository {
pub fn update(
storage: Arc<dyn Storage + Send + Sync>,
Expand Down Expand Up @@ -734,6 +734,69 @@ impl Repository {
}
}

/// Detect and optionally fix conflicts between the current [`ChangeSet`] (or session) and
/// the tip of the branch.
///
/// When [`Repository::commit`] method is called, the system validates that the tip of the
/// passed branch is exactly the same as the `snapshot_id` for the current session. If that
/// is not the case, the commit operation fails with [`RepositoryError::Conflict`].
///
/// In that situation, the user has two options:
/// 1. Abort the session and start a new one with using the new branch tip as a parent.
/// 2. Use [`Repository::rebase`] to try to "fast-forward" the session through the new
/// commits.
///
/// The issue with option 1 is that all the writes that have been done in the session,
/// including the chunks, will be lost and they need to be written again. But, restarting
/// the session is always the safest option. It's the only way to guarantee that
/// any reads done during the session were actually reading the latest data.
///
/// User that understands the tradeoffs, can use option 2. This is useful, for example
/// when different "jobs" modify different arrays, or different parts of an array.
/// In situations like that, "merging" the two changes is pretty trivial. But what
/// happens when there are conflicts. For example, what happens when the current session
/// and a new commit both wrote to the same chunk, or both updated user attributes for
/// the same group.
///
/// This is what [`Repository::rebase`] helps with. It can detect conflicts to let
/// the user fix them manually, or it can attempt to fix conflicts based on a policy.
///
/// Example:
/// ```ignore
/// let repo = ...
/// let payload = repo.get_chunk_writer()(Bytes::copy_from_slice(b"foo")).await?;
/// repo.set_chunk_ref(array_path, ChunkIndices(vec![0]), Some(payload)).await?;
///
/// // the commit fails with a conflict because some other writer committed once or more before us
/// let error = repo.commit("main", "wrote a chunk").await.unwrap_err();
///
/// // let's inspect what are the conflicts
/// if let Err(RebaseFailed {conflicts, ..}) = repo2.rebase(&ConflictDetector, "main").await.unwrap_err() {
/// // inspect the list of conflicts and fix them manually
/// // ...
///
/// // once fixed we can commit again
///
/// repo.commit("main", "wrote a chunk").await?;
/// }
/// ```
///
/// Instead of fixing the conflicts manually, the user can try rebasing with an automated
/// policy, configured to their needs:
///
/// ```ignore
/// let solver = BasicConflictSolver {
/// on_chunk_conflict: VersionSelection::UseOurs,
/// ..Default::default()
/// };
/// repo2.rebase(&solver, "main").await?
/// ```
///
/// When there are more than one commit between the parent snapshot and the tip of
/// the branch, `rebase` iterates over all of them, older first, trying to fast-forward.
/// If at some point it finds a conflict it cannot recover from, `rebase` leaves the
/// `Repository` in a consistent state, that would successfully commit on top
/// of the latest successfully fast-forwarded commit.
pub async fn rebase(
&mut self,
solver: &dyn ConflictSolver,
Expand Down Expand Up @@ -2237,6 +2300,10 @@ mod tests {
Ok(())
}

/// Construct two repos on the same storage, with a commit, a group and an array
///
/// Group: /foo/bar
/// Array: /foo/bar/some-array
async fn get_repos_for_conflict() -> Result<(Repository, Repository), Box<dyn Error>>
{
let storage: Arc<dyn Storage + Send + Sync> =
Expand Down Expand Up @@ -2276,6 +2343,10 @@ mod tests {
}

#[tokio::test()]
/// Test conflict detection
///
/// This session: add array
/// Previous commit: add group on same path
async fn test_conflict_detection_node_conflict_with_existing_node(
) -> Result<(), Box<dyn Error>> {
let (mut repo1, mut repo2) = get_repos_for_conflict().await?;
Expand All @@ -2294,6 +2365,10 @@ mod tests {
}

#[tokio::test()]
/// Test conflict detection
///
/// This session: add array
/// Previous commit: add array in implicit path to the session array
async fn test_conflict_detection_node_conflict_in_path() -> Result<(), Box<dyn Error>>
{
let (mut repo1, mut repo2) = get_repos_for_conflict().await?;
Expand All @@ -2313,6 +2388,10 @@ mod tests {
}

#[tokio::test()]
/// Test conflict detection
///
/// This session: update array metadata
/// Previous commit: update array metadata
async fn test_conflict_detection_double_zarr_metadata_edit(
) -> Result<(), Box<dyn Error>> {
let (mut repo1, mut repo2) = get_repos_for_conflict().await?;
Expand All @@ -2331,6 +2410,10 @@ mod tests {
}

#[tokio::test()]
/// Test conflict detection
///
/// This session: delete array
/// Previous commit: update same array metadata
async fn test_conflict_detection_metadata_edit_of_deleted(
) -> Result<(), Box<dyn Error>> {
let (mut repo1, mut repo2) = get_repos_for_conflict().await?;
Expand All @@ -2349,6 +2432,10 @@ mod tests {
}

#[tokio::test()]
/// Test conflict detection
///
/// This session: uptade user attributes
/// Previous commit: update user attributes
async fn test_conflict_detection_double_user_atts_edit() -> Result<(), Box<dyn Error>>
{
let (mut repo1, mut repo2) = get_repos_for_conflict().await?;
Expand Down Expand Up @@ -2378,6 +2465,10 @@ mod tests {
}

#[tokio::test()]
/// Test conflict detection
///
/// This session: uptade user attributes
/// Previous commit: delete same array
async fn test_conflict_detection_user_atts_edit_of_deleted(
) -> Result<(), Box<dyn Error>> {
let (mut repo1, mut repo2) = get_repos_for_conflict().await?;
Expand All @@ -2401,6 +2492,10 @@ mod tests {
}

#[tokio::test()]
/// Test conflict detection
///
/// This session: delete array
/// Previous commit: update same array metadata
async fn test_conflict_detection_delete_when_array_metadata_updated(
) -> Result<(), Box<dyn Error>> {
let (mut repo1, mut repo2) = get_repos_for_conflict().await?;
Expand All @@ -2419,6 +2514,10 @@ mod tests {
}

#[tokio::test()]
/// Test conflict detection
///
/// This session: delete array
/// Previous commit: update same array user attributes
async fn test_conflict_detection_delete_when_array_user_atts_updated(
) -> Result<(), Box<dyn Error>> {
let (mut repo1, mut repo2) = get_repos_for_conflict().await?;
Expand All @@ -2442,6 +2541,10 @@ mod tests {
}

#[tokio::test()]
/// Test conflict detection
///
/// This session: delete array
/// Previous commit: update same array chunks
async fn test_conflict_detection_delete_when_chunks_updated(
) -> Result<(), Box<dyn Error>> {
let (mut repo1, mut repo2) = get_repos_for_conflict().await?;
Expand All @@ -2466,6 +2569,10 @@ mod tests {
}

#[tokio::test()]
/// Test conflict detection
///
/// This session: delete group
/// Previous commit: update same group user attributes
async fn test_conflict_detection_delete_when_group_user_atts_updated(
) -> Result<(), Box<dyn Error>> {
let (mut repo1, mut repo2) = get_repos_for_conflict().await?;
Expand Down Expand Up @@ -2510,6 +2617,9 @@ mod tests {
repo.add_array(new_array_path.clone(), zarr_meta.clone()).await?;
repo.commit(Ref::DEFAULT_BRANCH, "create array", None).await?;

// one writer sets chunks
// other writer sets the same chunks, generating a conflict

let mut repo1 =
Repository::from_branch_tip(Arc::clone(&storage), "main").await?.build();
let mut repo2 =
Expand Down Expand Up @@ -2539,10 +2649,14 @@ mod tests {
Some(ChunkPayload::Inline("hello".into())),
)
.await?;

// verify we cannot commit
if let Err(RepositoryError::Conflict { .. }) =
repo2.commit("main", "write one chunk with repo2", None).await
{
// detect conflicts using rebase
let result = repo2.rebase(&ConflictDetector, "main").await;
// assert the conflict is double chunk update
assert!(matches!(
result,
Err(RepositoryError::RebaseFailed { snapshot, conflicts, })
Expand Down Expand Up @@ -2762,6 +2876,10 @@ mod tests {
}

#[tokio::test]
/// Test conflict resolution with rebase
///
/// Two sessions write user attributes to the same array
/// We attempt to recover using [`VersionSelection::UseOurs`] policy
async fn test_conflict_resolution_double_user_atts_edit_with_ours(
) -> Result<(), Box<dyn Error>> {
let (mut repo1, mut repo2) = get_repos_for_conflict().await?;
Expand Down Expand Up @@ -2802,6 +2920,10 @@ mod tests {
}

#[tokio::test]
/// Test conflict resolution with rebase
///
/// Two sessions write user attributes to the same array
/// We attempt to recover using [`VersionSelection::UseTheirs`] policy
async fn test_conflict_resolution_double_user_atts_edit_with_theirs(
) -> Result<(), Box<dyn Error>> {
let (mut repo1, mut repo2) = get_repos_for_conflict().await?;
Expand Down Expand Up @@ -2848,6 +2970,11 @@ mod tests {
}

#[tokio::test]
/// Test conflict resolution with rebase
///
/// One session deletes an array, the other updates its metadata.
/// We attempt to recover using the default [`BasicConflictSolver`]
/// Array should still be deleted
async fn test_conflict_resolution_delete_of_updated_array(
) -> Result<(), Box<dyn Error>> {
let (mut repo1, mut repo2) = get_repos_for_conflict().await?;
Expand All @@ -2871,11 +2998,17 @@ mod tests {
}

#[tokio::test]
/// Test conflict resolution with rebase
///
/// Verify we can rebase over multiple commits if they are all fast-forwardable.
/// We have multiple commits with chunk writes, and then a session has to rebase
/// writing to the same chunks.
async fn test_conflict_resolution_success_through_multiple_commits(
) -> Result<(), Box<dyn Error>> {
let (mut repo1, mut repo2) = get_repos_for_conflict().await?;

let path: Path = "/foo/bar/some-array".try_into().unwrap();
// write chunks with repo 1
for coord in [0u32, 1, 2] {
repo1
.set_chunk_ref(
Expand All @@ -2893,6 +3026,7 @@ mod tests {
.await?;
}

// write the same chunks with repo 2
for coord in [0u32, 1, 2] {
repo2
.set_chunk_ref(
Expand Down Expand Up @@ -2923,6 +3057,9 @@ mod tests {
}

#[tokio::test]
/// Rebase over multiple commits with partial failure
///
/// We verify that we can partially fast forward, stopping at the first unrecoverable commit
async fn test_conflict_resolution_failure_in_multiple_commits(
) -> Result<(), Box<dyn Error>> {
let (mut repo1, mut repo2) = get_repos_for_conflict().await?;
Expand Down Expand Up @@ -2957,12 +3094,13 @@ mod tests {
.await?;

repo2.commit(Ref::DEFAULT_BRANCH, "update chunk ref", None).await.unwrap_err();
// we setup a [`ConflictSolver`]` that can recover from the first but not the second
// conflict
let solver = BasicConflictSolver {
on_chunk_conflict: VersionSelection::Fail,
..Default::default()
};

dbg!(repo2.snapshot_id());
let err = repo2.rebase(&solver, "main").await.unwrap_err();

assert!(matches!(
Expand All @@ -2974,9 +3112,6 @@ mod tests {
if path == path && chunk_coordinates == &[ChunkIndices(vec![0])].into())
));

dbg!(repo2.snapshot_id());
dbg!(&non_conflicting_snap);
dbg!(&conflicting_snap);
// we were able to rebase one commit but not the second one,
// so now the parent is the first commit
assert_eq!(repo2.snapshot_id(), &non_conflicting_snap);
Expand Down

0 comments on commit 94e9c64

Please sign in to comment.