From f75c7f16806ba8aade542647df06155d324beef1 Mon Sep 17 00:00:00 2001 From: Sebastian Galkin Date: Sat, 23 Nov 2024 23:13:45 -0300 Subject: [PATCH] Document code --- icechunk/src/repository.rs | 147 +++++++++++++++++++++++++++++++++++-- 1 file changed, 141 insertions(+), 6 deletions(-) diff --git a/icechunk/src/repository.rs b/icechunk/src/repository.rs index 9c6bb0bb..1e20a3bb 100644 --- a/icechunk/src/repository.rs +++ b/icechunk/src/repository.rs @@ -182,8 +182,8 @@ pub enum RepositoryError { pub type RepositoryResult = Result; -/// FIXME: what do we want to do with implicit groups? -/// +// FIXME: what do we want to do with implicit groups? +// impl Repository { pub fn update( storage: Arc, @@ -734,6 +734,69 @@ impl Repository { } } + /// Detect and optionally fix conflicts between the current [`ChangeSet`] (or session) and + /// the tip of the branch. + /// + /// When the [`Repository::commit`] method is called, the system validates that the tip of the + /// passed branch is exactly the same as the `snapshot_id` for the current session. If that + /// is not the case, the commit operation fails with [`RepositoryError::Conflict`]. + /// + /// In that situation, the user has two options: + /// 1. Abort the session and start a new one using the new branch tip as a parent. + /// 2. Use [`Repository::rebase`] to try to "fast-forward" the session through the new + /// commits. + /// + /// The issue with option 1 is that all the writes that have been done in the session, + /// including the chunks, will be lost and they need to be written again. But, restarting + /// the session is always the safest option. It's the only way to guarantee that + /// any reads done during the session were actually reading the latest data. + /// + /// Users who understand the tradeoffs can use option 2. This is useful, for example, + /// when different "jobs" modify different arrays, or different parts of an array. 
+ /// In situations like that, "merging" the two changes is pretty trivial. But what + /// happens when there are conflicts? For example, what happens when the current session + /// and a new commit both wrote to the same chunk, or both updated user attributes for + /// the same group? + /// + /// This is what [`Repository::rebase`] helps with. It can detect conflicts to let + /// the user fix them manually, or it can attempt to fix conflicts based on a policy. + /// + /// Example: + /// ```ignore + /// let repo = ... + /// let payload = repo.get_chunk_writer()(Bytes::copy_from_slice(b"foo")).await?; + /// repo.set_chunk_ref(array_path, ChunkIndices(vec![0]), Some(payload)).await?; + /// + /// // the commit fails with a conflict because some other writer committed once or more before us + /// let error = repo.commit("main", "wrote a chunk").await.unwrap_err(); + /// + /// // let's inspect what the conflicts are + /// if let Err(RebaseFailed {conflicts, ..}) = repo.rebase(&ConflictDetector, "main").await { + /// // inspect the list of conflicts and fix them manually + /// // ... + /// + /// // once fixed we can commit again + /// + /// repo.commit("main", "wrote a chunk").await?; + /// } + /// ``` + /// + /// Instead of fixing the conflicts manually, the user can try rebasing with an automated + /// policy, configured to their needs: + /// + /// ```ignore + /// let solver = BasicConflictSolver { + /// on_chunk_conflict: VersionSelection::UseOurs, + /// ..Default::default() + /// }; + /// repo.rebase(&solver, "main").await? + /// ``` + /// + /// When there is more than one commit between the parent snapshot and the tip of + /// the branch, `rebase` iterates over all of them, oldest first, trying to fast-forward. + /// If at some point it finds a conflict it cannot recover from, `rebase` leaves the + /// `Repository` in a consistent state, that would successfully commit on top + /// of the latest successfully fast-forwarded commit. 
pub async fn rebase( &mut self, solver: &dyn ConflictSolver, @@ -2237,6 +2300,10 @@ mod tests { Ok(()) } + /// Construct two repos on the same storage, with a commit, a group and an array + /// + /// Group: /foo/bar + /// Array: /foo/bar/some-array async fn get_repos_for_conflict() -> Result<(Repository, Repository), Box> { let storage: Arc = @@ -2276,6 +2343,10 @@ mod tests { } #[tokio::test()] + /// Test conflict detection + /// + /// This session: add array + /// Previous commit: add group on same path async fn test_conflict_detection_node_conflict_with_existing_node( ) -> Result<(), Box> { let (mut repo1, mut repo2) = get_repos_for_conflict().await?; @@ -2294,6 +2365,10 @@ mod tests { } #[tokio::test()] + /// Test conflict detection + /// + /// This session: add array + /// Previous commit: add array in implicit path to the session array async fn test_conflict_detection_node_conflict_in_path() -> Result<(), Box> { let (mut repo1, mut repo2) = get_repos_for_conflict().await?; @@ -2313,6 +2388,10 @@ mod tests { } #[tokio::test()] + /// Test conflict detection + /// + /// This session: update array metadata + /// Previous commit: update array metadata async fn test_conflict_detection_double_zarr_metadata_edit( ) -> Result<(), Box> { let (mut repo1, mut repo2) = get_repos_for_conflict().await?; @@ -2331,6 +2410,10 @@ mod tests { } #[tokio::test()] + /// Test conflict detection + /// + /// This session: delete array + /// Previous commit: update same array metadata async fn test_conflict_detection_metadata_edit_of_deleted( ) -> Result<(), Box> { let (mut repo1, mut repo2) = get_repos_for_conflict().await?; @@ -2349,6 +2432,10 @@ mod tests { } #[tokio::test()] + /// Test conflict detection + /// + /// This session: update user attributes + /// Previous commit: update user attributes async fn test_conflict_detection_double_user_atts_edit() -> Result<(), Box> { let (mut repo1, mut repo2) = get_repos_for_conflict().await?; @@ -2378,6 +2465,10 @@ mod tests { } 
#[tokio::test()] + /// Test conflict detection + /// + /// This session: update user attributes + /// Previous commit: delete same array async fn test_conflict_detection_user_atts_edit_of_deleted( ) -> Result<(), Box> { let (mut repo1, mut repo2) = get_repos_for_conflict().await?; @@ -2401,6 +2492,10 @@ mod tests { } #[tokio::test()] + /// Test conflict detection + /// + /// This session: delete array + /// Previous commit: update same array metadata async fn test_conflict_detection_delete_when_array_metadata_updated( ) -> Result<(), Box> { let (mut repo1, mut repo2) = get_repos_for_conflict().await?; @@ -2419,6 +2514,10 @@ mod tests { } #[tokio::test()] + /// Test conflict detection + /// + /// This session: delete array + /// Previous commit: update same array user attributes async fn test_conflict_detection_delete_when_array_user_atts_updated( ) -> Result<(), Box> { let (mut repo1, mut repo2) = get_repos_for_conflict().await?; @@ -2442,6 +2541,10 @@ mod tests { } #[tokio::test()] + /// Test conflict detection + /// + /// This session: delete array + /// Previous commit: update same array chunks async fn test_conflict_detection_delete_when_chunks_updated( ) -> Result<(), Box> { let (mut repo1, mut repo2) = get_repos_for_conflict().await?; @@ -2466,6 +2569,10 @@ mod tests { } #[tokio::test()] + /// Test conflict detection + /// + /// This session: delete group + /// Previous commit: update same group user attributes async fn test_conflict_detection_delete_when_group_user_atts_updated( ) -> Result<(), Box> { let (mut repo1, mut repo2) = get_repos_for_conflict().await?; @@ -2510,6 +2617,9 @@ mod tests { repo.add_array(new_array_path.clone(), zarr_meta.clone()).await?; repo.commit(Ref::DEFAULT_BRANCH, "create array", None).await?; + // one writer sets chunks + // other writer sets the same chunks, generating a conflict + let mut repo1 = Repository::from_branch_tip(Arc::clone(&storage), "main").await?.build(); let mut repo2 = @@ -2539,10 +2649,14 @@ mod tests { 
Some(ChunkPayload::Inline("hello".into())), ) .await?; + + // verify we cannot commit if let Err(RepositoryError::Conflict { .. }) = repo2.commit("main", "write one chunk with repo2", None).await { + // detect conflicts using rebase let result = repo2.rebase(&ConflictDetector, "main").await; + // assert the conflict is double chunk update assert!(matches!( result, Err(RepositoryError::RebaseFailed { snapshot, conflicts, }) @@ -2762,6 +2876,10 @@ mod tests { } #[tokio::test] + /// Test conflict resolution with rebase + /// + /// Two sessions write user attributes to the same array + /// We attempt to recover using [`VersionSelection::UseOurs`] policy async fn test_conflict_resolution_double_user_atts_edit_with_ours( ) -> Result<(), Box> { let (mut repo1, mut repo2) = get_repos_for_conflict().await?; @@ -2802,6 +2920,10 @@ mod tests { } #[tokio::test] + /// Test conflict resolution with rebase + /// + /// Two sessions write user attributes to the same array + /// We attempt to recover using [`VersionSelection::UseTheirs`] policy async fn test_conflict_resolution_double_user_atts_edit_with_theirs( ) -> Result<(), Box> { let (mut repo1, mut repo2) = get_repos_for_conflict().await?; @@ -2848,6 +2970,11 @@ mod tests { } #[tokio::test] + /// Test conflict resolution with rebase + /// + /// One session deletes an array, the other updates its metadata. + /// We attempt to recover using the default [`BasicConflictSolver`] + /// Array should still be deleted async fn test_conflict_resolution_delete_of_updated_array( ) -> Result<(), Box> { let (mut repo1, mut repo2) = get_repos_for_conflict().await?; @@ -2871,11 +2998,17 @@ mod tests { } #[tokio::test] + /// Test conflict resolution with rebase + /// + /// Verify we can rebase over multiple commits if they are all fast-forwardable. + /// We have multiple commits with chunk writes, and then a session has to rebase + /// writing to the same chunks. 
async fn test_conflict_resolution_success_through_multiple_commits( ) -> Result<(), Box> { let (mut repo1, mut repo2) = get_repos_for_conflict().await?; let path: Path = "/foo/bar/some-array".try_into().unwrap(); + // write chunks with repo 1 for coord in [0u32, 1, 2] { repo1 .set_chunk_ref( @@ -2893,6 +3026,7 @@ mod tests { .await?; } + // write the same chunks with repo 2 for coord in [0u32, 1, 2] { repo2 .set_chunk_ref( @@ -2923,6 +3057,9 @@ mod tests { } #[tokio::test] + /// Rebase over multiple commits with partial failure + /// + /// We verify that we can partially fast forward, stopping at the first unrecoverable commit async fn test_conflict_resolution_failure_in_multiple_commits( ) -> Result<(), Box> { let (mut repo1, mut repo2) = get_repos_for_conflict().await?; @@ -2957,12 +3094,13 @@ mod tests { .await?; repo2.commit(Ref::DEFAULT_BRANCH, "update chunk ref", None).await.unwrap_err(); + // we set up a [`ConflictSolver`] that can recover from the first but not the second + // conflict let solver = BasicConflictSolver { on_chunk_conflict: VersionSelection::Fail, ..Default::default() }; - dbg!(repo2.snapshot_id()); let err = repo2.rebase(&solver, "main").await.unwrap_err(); assert!(matches!( @@ -2974,9 +3112,6 @@ mod tests { if path == path && chunk_coordinates == &[ChunkIndices(vec![0])].into()) )); - dbg!(repo2.snapshot_id()); - dbg!(&non_conflicting_snap); - dbg!(&conflicting_snap); // we were able to rebase one commit but not the second one, // so now the parent is the first commit assert_eq!(repo2.snapshot_id(), &non_conflicting_snap);