Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: correctly recognize existing delta tables using the transaction log #3005

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ pub trait LogStore: Sync + Send {
let mut stream = object_store.list(Some(self.log_path()));
if let Some(res) = stream.next().await {
match res {
Ok(meta) => Ok(meta.location.is_commit_file()),
Ok(meta) => {
Ok(meta.location.is_commit_file() || meta.location.is_checkpoint_file())
}
Err(ObjectStoreError::NotFound { .. }) => Ok(false),
Err(err) => Err(err)?,
}
Expand Down Expand Up @@ -590,6 +592,66 @@ mod tests {
.await
.expect("Failed to look at table"));
}

/// Checks that a location is reported as a Delta table once a commit file
/// (`_delta_log/0.json`) is present, and is not reported as one beforehand.
#[tokio::test]
async fn test_is_location_a_table_commit() {
    use object_store::path::Path;
    use object_store::{PutOptions, PutPayload};

    let table_url = Url::parse("memory://table").unwrap();
    let log_store =
        logstore_for(table_url, HashMap::default(), None).expect("Failed to get logstore");

    // Nothing has been written yet, so the location must not look like a table.
    let is_table_before = log_store
        .is_delta_table_location()
        .await
        .expect("Failed to identify table");
    assert!(!is_table_before);

    // Write a single commit entry into the transaction log directory. The
    // payload contents are arbitrary; only the file name matters here.
    let commit_path = Path::from("_delta_log/0.json");
    let _written = log_store
        .object_store()
        .put_opts(&commit_path, PutPayload::from_static(b"test"), PutOptions::default())
        .await
        .expect("Failed to put");

    // With a commit file in place the location should be recognized as a table.
    let is_table_after = log_store
        .is_delta_table_location()
        .await
        .expect("Failed to identify table");
    assert!(is_table_after);
}

/// Checks that a location is reported as a Delta table when the log directory
/// holds only a checkpoint file (`_delta_log/0.checkpoint.parquet`) and no
/// commit file.
#[tokio::test]
async fn test_is_location_a_table_checkpoint() {
    use object_store::path::Path;
    use object_store::{PutOptions, PutPayload};

    let table_url = Url::parse("memory://table").unwrap();
    let log_store =
        logstore_for(table_url, HashMap::default(), None).expect("Failed to get logstore");

    // Before anything is stored, the location must not be identified as a table.
    let recognized_when_empty = log_store
        .is_delta_table_location()
        .await
        .expect("Failed to identify table");
    assert!(!recognized_when_empty);

    // Store a stand-in "checkpoint" object in the transaction log directory.
    // It is not valid parquet; the test only relies on the file name.
    let checkpoint_path = Path::from("_delta_log/0.checkpoint.parquet");
    let _written = log_store
        .object_store()
        .put_opts(&checkpoint_path, PutPayload::from_static(b"test"), PutOptions::default())
        .await
        .expect("Failed to put");

    // A checkpoint file alone should be enough to treat the location as a table.
    let recognized_with_checkpoint = log_store
        .is_delta_table_location()
        .await
        .expect("Failed to identify table");
    assert!(recognized_with_checkpoint);
}
}

#[cfg(feature = "datafusion")]
Expand Down
Loading