Skip to content

Commit

Permalink
valid table can have checkpoint as first log
Browse files Browse the repository at this point in the history
Signed-off-by: stretchadito <[email protected]>
  • Loading branch information
stretchadito authored and rtyler committed Nov 20, 2024
1 parent d4f18b3 commit 10d686d
Showing 1 changed file with 63 additions and 1 deletion.
64 changes: 63 additions & 1 deletion crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ pub trait LogStore: Sync + Send {
let mut stream = object_store.list(Some(self.log_path()));
if let Some(res) = stream.next().await {
match res {
Ok(meta) => Ok(meta.location.is_commit_file()),
Ok(meta) => {
Ok(meta.location.is_commit_file() || meta.location.is_checkpoint_file())
}
Err(ObjectStoreError::NotFound { .. }) => Ok(false),
Err(err) => Err(err)?,
}
Expand Down Expand Up @@ -590,6 +592,66 @@ mod tests {
.await
.expect("Failed to look at table"));
}

#[tokio::test]
async fn test_is_location_a_table_commit() {
use object_store::path::Path;
use object_store::{PutOptions, PutPayload};
let location = Url::parse("memory://table").unwrap();
let store =
logstore_for(location, HashMap::default(), None).expect("Failed to get logstore");
assert!(!store
.is_delta_table_location()
.await
.expect("Failed to identify table"));

// Save a commit to the transaction log
let payload = PutPayload::from_static(b"test");
let _put = store
.object_store()
.put_opts(
&Path::from("_delta_log/0.json"),
payload,
PutOptions::default(),
)
.await
.expect("Failed to put");
// The table should be considered a delta table
assert!(store
.is_delta_table_location()
.await
.expect("Failed to identify table"));
}

#[tokio::test]
async fn test_is_location_a_table_checkpoint() {
use object_store::path::Path;
use object_store::{PutOptions, PutPayload};
let location = Url::parse("memory://table").unwrap();
let store =
logstore_for(location, HashMap::default(), None).expect("Failed to get logstore");
assert!(!store
.is_delta_table_location()
.await
.expect("Failed to identify table"));

// Save a "checkpoint" file to the transaction log directory
let payload = PutPayload::from_static(b"test");
let _put = store
.object_store()
.put_opts(
&Path::from("_delta_log/0.checkpoint.parquet"),
payload,
PutOptions::default(),
)
.await
.expect("Failed to put");
// The table should be considered a delta table
assert!(store
.is_delta_table_location()
.await
.expect("Failed to identify table"));
}
}

#[cfg(feature = "datafusion")]
Expand Down

0 comments on commit 10d686d

Please sign in to comment.