From 875f4e4ffedcca99b4119815a030157c9caec74a Mon Sep 17 00:00:00 2001 From: Stephen Carman Date: Tue, 22 Oct 2024 10:57:24 -0400 Subject: [PATCH 1/3] chore: fix many warnings across the codebase, removed several non-public methods that weren't being used anywhere Signed-off-by: Stephen Carman --- crates/aws/src/constants.rs | 12 +- crates/aws/src/credentials.rs | 33 +- crates/aws/src/lib.rs | 44 +-- crates/aws/src/storage.rs | 98 +++--- crates/core/Cargo.toml | 1 - .../delta_datafusion/find_files/logical.rs | 107 ------- .../src/delta_datafusion/find_files/mod.rs | 285 ------------------ .../delta_datafusion/find_files/physical.rs | 158 ---------- crates/core/src/delta_datafusion/mod.rs | 3 - crates/core/src/kernel/arrow/json.rs | 4 +- .../core/src/kernel/snapshot/log_segment.rs | 5 - crates/core/src/lib.rs | 1 + crates/core/src/storage/mod.rs | 2 + crates/core/src/table/config.rs | 3 +- crates/core/src/table/mod.rs | 2 +- crates/core/src/table/state.rs | 4 +- crates/core/tests/checkpoint_writer.rs | 23 +- crates/core/tests/time_travel.rs | 13 +- crates/gcp/tests/context.rs | 23 +- crates/sql/src/logical_plan.rs | 4 +- 20 files changed, 144 insertions(+), 681 deletions(-) delete mode 100644 crates/core/src/delta_datafusion/find_files/logical.rs delete mode 100644 crates/core/src/delta_datafusion/find_files/mod.rs delete mode 100644 crates/core/src/delta_datafusion/find_files/physical.rs diff --git a/crates/aws/src/constants.rs b/crates/aws/src/constants.rs index 90c23ff572..00b3b3dbac 100644 --- a/crates/aws/src/constants.rs +++ b/crates/aws/src/constants.rs @@ -8,6 +8,7 @@ use std::time::Duration; /// Custom S3 endpoint. pub const AWS_ENDPOINT_URL: &str = "AWS_ENDPOINT_URL"; /// Custom DynamoDB endpoint. +/// /// If DynamoDB endpoint is not supplied, will use S3 endpoint (AWS_ENDPOINT_URL) /// If it is supplied, this endpoint takes precedence over the global endpoint set in AWS_ENDPOINT_URL for DynamoDB pub const AWS_ENDPOINT_URL_DYNAMODB: &str = "AWS_ENDPOINT_URL_DYNAMODB"; @@ -41,7 +42,9 @@ pub const AWS_IAM_ROLE_SESSION_NAME: &str = "AWS_IAM_ROLE_SESSION_NAME"; note = "Please use AWS_IAM_ROLE_SESSION_NAME instead" )] pub const AWS_S3_ROLE_SESSION_NAME: &str = "AWS_S3_ROLE_SESSION_NAME"; -/// The `pool_idle_timeout` option of aws http client. Has to be lower than 20 seconds, which is +/// The `pool_idle_timeout` option of aws http client. +/// +/// Has to be lower than 20 seconds, which is /// default S3 server timeout . /// However, since rusoto uses hyper as a client, its default timeout is 90 seconds /// . @@ -55,16 +58,19 @@ pub const AWS_STS_POOL_IDLE_TIMEOUT_SECONDS: &str = "AWS_STS_POOL_IDLE_TIMEOUT_S pub const AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES: &str = "AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES"; /// The web identity token file to use when using a web identity provider. +/// /// NOTE: web identity related options are set in the environment when /// creating an instance of [crate::storage::s3::S3StorageOptions]. /// See also . pub const AWS_WEB_IDENTITY_TOKEN_FILE: &str = "AWS_WEB_IDENTITY_TOKEN_FILE"; /// The role name to use for web identity. +/// /// NOTE: web identity related options are set in the environment when /// creating an instance of [crate::storage::s3::S3StorageOptions]. /// See also . pub const AWS_ROLE_ARN: &str = "AWS_ROLE_ARN"; /// The role session name to use for web identity. +/// /// NOTE: web identity related options are set in the environment when /// creating an instance of [crate::storage::s3::S3StorageOptions]. /// See also . 
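The doc-comment edits in the hunks above all apply the same rustdoc convention: the first `///` line becomes the item's one-line summary, and a bare `///` separates it from the longer body. A minimal sketch of the pattern, using a hypothetical constant rather than one of the keys in this file:

```rust
/// One-line summary shown in rustdoc listings and IDE hovers.
///
/// Longer explanation that only appears on the item's own documentation
/// page. Without the bare `///` separator above, rustdoc would fold this
/// paragraph into the summary line.
pub const EXAMPLE_OPTION: &str = "EXAMPLE_OPTION";
```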
@@ -99,8 +105,8 @@ pub const S3_OPTS: &[&str] = &[ AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, AWS_S3_LOCKING_PROVIDER, - AWS_S3_ASSUME_ROLE_ARN, - AWS_S3_ROLE_SESSION_NAME, + AWS_IAM_ROLE_ARN, + AWS_IAM_ROLE_SESSION_NAME, AWS_WEB_IDENTITY_TOKEN_FILE, AWS_ROLE_ARN, AWS_ROLE_SESSION_NAME, diff --git a/crates/aws/src/credentials.rs b/crates/aws/src/credentials.rs index 43c9d88a31..27f4491923 100644 --- a/crates/aws/src/credentials.rs +++ b/crates/aws/src/credentials.rs @@ -21,7 +21,7 @@ use deltalake_core::storage::StorageOptions; use deltalake_core::DeltaResult; use tracing::log::*; -use crate::constants::{self, AWS_ENDPOINT_URL}; +use crate::constants; /// An [object_store::CredentialProvider] which handles converting a populated [SdkConfig] /// into a necessary [AwsCredential] type for configuring [object_store::aws::AmazonS3] @@ -183,19 +183,26 @@ fn assume_role_arn(options: &StorageOptions) -> Option { options .0 .get(constants::AWS_IAM_ROLE_ARN) - .or(options.0.get(constants::AWS_S3_ASSUME_ROLE_ARN)) + .or( + #[allow(deprecated)] + options.0.get(constants::AWS_S3_ASSUME_ROLE_ARN), + ) .or(std::env::var_os(constants::AWS_IAM_ROLE_ARN) .map(|o| { o.into_string() .expect("Failed to unwrap AWS_IAM_ROLE_ARN which may have invalid data") }) .as_ref()) - .or(std::env::var_os(constants::AWS_S3_ASSUME_ROLE_ARN) - .map(|o| { - o.into_string() - .expect("Failed to unwrap AWS_S3_ASSUME_ROLE_ARN which may have invalid data") - }) - .as_ref()) + .or( + #[allow(deprecated)] + std::env::var_os(constants::AWS_S3_ASSUME_ROLE_ARN) + .map(|o| { + o.into_string().expect( + "Failed to unwrap AWS_S3_ASSUME_ROLE_ARN which may have invalid data", + ) + }) + .as_ref(), + ) .cloned() } @@ -204,13 +211,13 @@ fn assume_session_name(options: &StorageOptions) -> String { let assume_session = options .0 .get(constants::AWS_IAM_ROLE_SESSION_NAME) - .or(options.0.get(constants::AWS_S3_ROLE_SESSION_NAME)) + .or( + #[allow(deprecated)] + options.0.get(constants::AWS_S3_ROLE_SESSION_NAME), + ) .cloned(); - match assume_session { - Some(s) => s, - None => assume_role_sessio_name(), - } + assume_session.unwrap_or_else(assume_role_sessio_name) } /// Take a set of [StorageOptions] and produce an appropriate AWS SDK [SdkConfig] diff --git a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs index ddb768bdd9..2f2bc1b472 100644 --- a/crates/aws/src/lib.rs +++ b/crates/aws/src/lib.rs @@ -57,7 +57,7 @@ impl LogStoreFactory for S3LogStoreFactory { // With conditional put in S3-like API we can use the deltalake default logstore which use PutIfAbsent if options.0.keys().any(|key| { let key = key.to_ascii_lowercase(); - vec![ + [ AmazonS3ConfigKey::ConditionalPut.as_ref(), "conditional_put", ] @@ -69,7 +69,7 @@ impl LogStoreFactory for S3LogStoreFactory { if options.0.keys().any(|key| { let key = key.to_ascii_lowercase(); - vec![ + [ AmazonS3ConfigKey::CopyIfNotExists.as_ref(), "copy_if_not_exists", ] @@ -306,9 +306,11 @@ impl DynamoDbLockClient { .send() .await }, - |err| match err.as_service_error() { - Some(GetItemError::ProvisionedThroughputExceededException(_)) => true, - _ => false, + |err| { + matches!( + err.as_service_error(), + Some(GetItemError::ProvisionedThroughputExceededException(_)) + ) }, ) .await @@ -340,9 +342,11 @@ impl DynamoDbLockClient { .await?; Ok(()) }, - |err: &SdkError<_, _>| match err.as_service_error() { - Some(PutItemError::ProvisionedThroughputExceededException(_)) => true, - _ => false, + |err: &SdkError<_, _>| { + matches!( + err.as_service_error(), + 
Some(PutItemError::ProvisionedThroughputExceededException(_)) + ) }, ) .await @@ -395,9 +399,11 @@ impl DynamoDbLockClient { .send() .await }, - |err: &SdkError<_, _>| match err.as_service_error() { - Some(QueryError::ProvisionedThroughputExceededException(_)) => true, - _ => false, + |err: &SdkError<_, _>| { + matches!( + err.as_service_error(), + Some(QueryError::ProvisionedThroughputExceededException(_)) + ) }, ) .await @@ -446,9 +452,11 @@ impl DynamoDbLockClient { .await?; Ok(()) }, - |err: &SdkError<_, _>| match err.as_service_error() { - Some(UpdateItemError::ProvisionedThroughputExceededException(_)) => true, - _ => false, + |err: &SdkError<_, _>| { + matches!( + err.as_service_error(), + Some(UpdateItemError::ProvisionedThroughputExceededException(_)) + ) }, ) .await; @@ -488,9 +496,11 @@ impl DynamoDbLockClient { .await?; Ok(()) }, - |err: &SdkError<_, _>| match err.as_service_error() { - Some(DeleteItemError::ProvisionedThroughputExceededException(_)) => true, - _ => false, + |err: &SdkError<_, _>| { + matches!( + err.as_service_error(), + Some(DeleteItemError::ProvisionedThroughputExceededException(_)) + ) }, ) .await diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index a6735b1c0f..400f9710c8 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -4,9 +4,8 @@ use aws_config::{Region, SdkConfig}; use bytes::Bytes; use deltalake_core::storage::object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey}; use deltalake_core::storage::object_store::{ - parse_url_opts, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, - ObjectStoreScheme, PutMultipartOpts, PutOptions, PutPayload, PutResult, - Result as ObjectStoreResult, + GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, ObjectStoreScheme, + PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult, }; use deltalake_core::storage::{ limit_store_handler, str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions, @@ -14,7 +13,6 @@ use deltalake_core::storage::{ use deltalake_core::{DeltaResult, DeltaTableError, ObjectStoreError, Path}; use futures::stream::BoxStream; use futures::Future; -use object_store::aws::S3CopyIfNotExists; use std::collections::HashMap; use std::fmt::Debug; use std::ops::Range; @@ -135,27 +133,21 @@ fn aws_storage_handler( // Determine whether this crate is being configured for use with native AWS S3 or an S3-alike // -// This function will rteturn true in the default case since it's most likely that the absence of +// This function will return true in the default case since it's most likely that the absence of // options will mean default/S3 configuration fn is_aws(options: &StorageOptions) -> bool { - if options - .0 - .contains_key(crate::constants::AWS_FORCE_CREDENTIAL_LOAD) - { + if options.0.contains_key(constants::AWS_FORCE_CREDENTIAL_LOAD) { return true; } - if options - .0 - .contains_key(crate::constants::AWS_S3_LOCKING_PROVIDER) - { + if options.0.contains_key(constants::AWS_S3_LOCKING_PROVIDER) { return true; } - !options.0.contains_key(crate::constants::AWS_ENDPOINT_URL) + !options.0.contains_key(constants::AWS_ENDPOINT_URL) } /// Options used to configure the [S3StorageBackend]. /// -/// Available options are described in [s3_constants]. +/// Available options are described in [constants]. 
 #[derive(Clone, Debug)]
 #[allow(missing_docs)]
 pub struct S3StorageOptions {
@@ -190,7 +182,7 @@ impl S3StorageOptions {
     pub fn from_map(options: &HashMap<String, String>) -> DeltaResult<S3StorageOptions> {
         let extra_opts: HashMap<String, String> = options
             .iter()
-            .filter(|(k, _)| !s3_constants::S3_OPTS.contains(&k.as_str()))
+            .filter(|(k, _)| !constants::S3_OPTS.contains(&k.as_str()))
             .map(|(k, v)| (k.to_owned(), v.to_owned()))
             .collect();
         // Copy web identity values provided in options but not the environment into the environment
@@ -215,7 +207,7 @@ impl S3StorageOptions {
         ) as usize;
 
         let virtual_hosted_style_request: bool =
-            str_option(options, s3_constants::AWS_S3_ADDRESSING_STYLE)
+            str_option(options, constants::AWS_S3_ADDRESSING_STYLE)
                 .map(|addressing_style| addressing_style == "virtual")
                 .unwrap_or(false);
 
@@ -250,12 +242,12 @@ impl S3StorageOptions {
 
     /// Return the configured endpoint URL for S3 operations
     pub fn endpoint_url(&self) -> Option<&str> {
-        self.sdk_config.as_ref().map(|v| v.endpoint_url()).flatten()
+        self.sdk_config.as_ref().and_then(|v| v.endpoint_url())
     }
 
     /// Return the configured region used for S3 operations
     pub fn region(&self) -> Option<&Region> {
-        self.sdk_config.as_ref().map(|v| v.region()).flatten()
+        self.sdk_config.as_ref().and_then(|v| v.region())
     }
 
     fn u64_or_default(map: &HashMap<String, String>, key: &str, default: u64) -> u64 {
@@ -332,7 +324,7 @@ impl std::fmt::Display for S3StorageBackend {
 impl S3StorageBackend {
     /// Creates a new S3StorageBackend.
     ///
-    /// Options are described in [s3_constants].
+    /// Options are described in [constants].
     pub fn try_new(storage: ObjectStoreRef, allow_unsafe_rename: bool) -> ObjectStoreResult<Self> {
         Ok(Self {
             inner: storage,
@@ -341,7 +333,7 @@ impl S3StorageBackend {
     }
 }
 
-impl std::fmt::Debug for S3StorageBackend {
+impl Debug for S3StorageBackend {
     fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
         write!(
             fmt,
@@ -366,6 +358,18 @@ impl ObjectStore for S3StorageBackend {
         self.inner.put_opts(location, bytes, options).await
     }
 
+    async fn put_multipart(&self, location: &Path) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
+        self.inner.put_multipart(location).await
+    }
+
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        options: PutMultipartOpts,
+    ) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
+        self.inner.put_multipart_opts(location, options).await
+    }
+
     async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
         self.inner.get(location).await
     }
@@ -420,21 +424,10 @@ impl ObjectStore for S3StorageBackend {
             })
         }
     }
-
-    async fn put_multipart(&self, location: &Path) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
-        self.inner.put_multipart(location).await
-    }
-
-    async fn put_multipart_opts(
-        &self,
-        location: &Path,
-        options: PutMultipartOpts,
-    ) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
-        self.inner.put_multipart_opts(location, options).await
-    }
 }
 
 /// Storage option keys to use when creating [crate::storage::s3::S3StorageOptions].
+///
 /// The same key should be used whether passing a key in the hashmap or setting it as an environment variable.
 /// Provided keys may include configuration for the S3 backend and also the optional DynamoDb lock used for atomic rename.
#[deprecated( @@ -479,15 +472,6 @@ mod tests { let _env_scope = Self::new(); f() } - - pub async fn run_async(future: F) -> F::Output - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let _env_scope = Self::new(); - future.await - } } impl Drop for ScopedEnv { @@ -538,10 +522,10 @@ mod tests { std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "default_secret_key"); std::env::set_var(constants::AWS_S3_LOCKING_PROVIDER, "dynamodb"); std::env::set_var( - constants::AWS_S3_ASSUME_ROLE_ARN, + constants::AWS_IAM_ROLE_ARN, "arn:aws:iam::123456789012:role/some_role", ); - std::env::set_var(constants::AWS_S3_ROLE_SESSION_NAME, "session_name"); + std::env::set_var(constants::AWS_IAM_ROLE_SESSION_NAME, "session_name"); std::env::set_var(constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file"); let options = S3StorageOptions::try_default().unwrap(); @@ -572,11 +556,11 @@ mod tests { fn storage_options_with_only_region_and_credentials() { ScopedEnv::run(|| { clear_env_of_aws_keys(); - std::env::remove_var(s3_constants::AWS_ENDPOINT_URL); + std::env::remove_var(constants::AWS_ENDPOINT_URL); let options = S3StorageOptions::from_map(&hashmap! { - s3_constants::AWS_REGION.to_string() => "eu-west-1".to_string(), - s3_constants::AWS_ACCESS_KEY_ID.to_string() => "test".to_string(), - s3_constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), + constants::AWS_REGION.to_string() => "eu-west-1".to_string(), + constants::AWS_ACCESS_KEY_ID.to_string() => "test".to_string(), + constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), }) .unwrap(); @@ -601,8 +585,8 @@ mod tests { constants::AWS_PROFILE.to_string() => "default".to_string(), constants::AWS_S3_ADDRESSING_STYLE.to_string() => "virtual".to_string(), constants::AWS_S3_LOCKING_PROVIDER.to_string() => "another_locking_provider".to_string(), - constants::AWS_S3_ASSUME_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/another_role".to_string(), - constants::AWS_S3_ROLE_SESSION_NAME.to_string() => "another_session_name".to_string(), + constants::AWS_IAM_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/another_role".to_string(), + constants::AWS_IAM_ROLE_SESSION_NAME.to_string() => "another_session_name".to_string(), constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string() => "another_token_file".to_string(), constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "1".to_string(), constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "2".to_string(), @@ -641,8 +625,8 @@ mod tests { constants::AWS_PROFILE.to_string() => "default".to_string(), constants::AWS_S3_ADDRESSING_STYLE.to_string() => "virtual".to_string(), constants::AWS_S3_LOCKING_PROVIDER.to_string() => "another_locking_provider".to_string(), - constants::AWS_S3_ASSUME_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/another_role".to_string(), - constants::AWS_S3_ROLE_SESSION_NAME.to_string() => "another_session_name".to_string(), + constants::AWS_IAM_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/another_role".to_string(), + constants::AWS_IAM_ROLE_SESSION_NAME.to_string() => "another_session_name".to_string(), constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string() => "another_token_file".to_string(), constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "1".to_string(), constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "2".to_string(), @@ -674,10 +658,10 @@ mod tests { std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "wrong_secret_key"); std::env::set_var(constants::AWS_S3_LOCKING_PROVIDER, 
"dynamodb"); std::env::set_var( - constants::AWS_S3_ASSUME_ROLE_ARN, + constants::AWS_IAM_ROLE_ARN, "arn:aws:iam::123456789012:role/some_role", ); - std::env::set_var(constants::AWS_S3_ROLE_SESSION_NAME, "session_name"); + std::env::set_var(constants::AWS_IAM_ROLE_SESSION_NAME, "session_name"); std::env::set_var(constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file"); std::env::set_var(constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, "1"); @@ -799,14 +783,14 @@ mod tests { assert!(is_aws(&options)); let minio: HashMap = hashmap! { - crate::constants::AWS_ENDPOINT_URL.to_string() => "http://minio:8080".to_string(), + constants::AWS_ENDPOINT_URL.to_string() => "http://minio:8080".to_string(), }; let options = StorageOptions::from(minio); assert!(!is_aws(&options)); let localstack: HashMap = hashmap! { - crate::constants::AWS_FORCE_CREDENTIAL_LOAD.to_string() => "true".to_string(), - crate::constants::AWS_ENDPOINT_URL.to_string() => "http://minio:8080".to_string(), + constants::AWS_FORCE_CREDENTIAL_LOAD.to_string() => "true".to_string(), + constants::AWS_ENDPOINT_URL.to_string() => "http://minio:8080".to_string(), }; let options = StorageOptions::from(localstack); assert!(is_aws(&options)); diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index d150b5b0d6..7d5bfbaf10 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -119,7 +119,6 @@ rand = "0.8" serial_test = "3" tempfile = "3" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } -utime = "0.3" [features] cdf = [] diff --git a/crates/core/src/delta_datafusion/find_files/logical.rs b/crates/core/src/delta_datafusion/find_files/logical.rs deleted file mode 100644 index 4dd4a3b5da..0000000000 --- a/crates/core/src/delta_datafusion/find_files/logical.rs +++ /dev/null @@ -1,107 +0,0 @@ -use std::collections::HashSet; -use std::hash::{Hash, Hasher}; - -use datafusion_common::DFSchemaRef; -use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; - -use crate::delta_datafusion::find_files::ONLY_FILES_DF_SCHEMA; -use crate::logstore::LogStoreRef; -use crate::table::state::DeltaTableState; - -#[derive(Debug, Clone)] -pub struct FindFilesNode { - id: String, - predicate: Expr, - table_state: DeltaTableState, - log_store: LogStoreRef, - version: i64, -} - -impl FindFilesNode { - pub fn new( - id: String, - table_state: DeltaTableState, - log_store: LogStoreRef, - predicate: Expr, - ) -> datafusion_common::Result { - let version = table_state.version(); - Ok(Self { - id, - predicate, - log_store, - table_state, - - version, - }) - } - - pub fn predicate(&self) -> Expr { - self.predicate.clone() - } - - pub fn state(&self) -> DeltaTableState { - self.table_state.clone() - } - - pub fn log_store(&self) -> LogStoreRef { - self.log_store.clone() - } -} - -impl Eq for FindFilesNode {} - -impl PartialEq for FindFilesNode { - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} - -impl Hash for FindFilesNode { - fn hash(&self, state: &mut H) { - state.write(self.id.as_bytes()); - state.finish(); - } -} - -impl UserDefinedLogicalNodeCore for FindFilesNode { - fn name(&self) -> &str { - "FindFiles" - } - - fn inputs(&self) -> Vec<&LogicalPlan> { - vec![] - } - - fn schema(&self) -> &DFSchemaRef { - &ONLY_FILES_DF_SCHEMA - } - - fn expressions(&self) -> Vec { - vec![] - } - - fn prevent_predicate_push_down_columns(&self) -> HashSet { - HashSet::new() - } - - fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!( - f, - "FindFiles[id={}, predicate=\"{}\", version={}]", - 
&self.id, self.predicate, self.version, - ) - } - - fn from_template(&self, exprs: &[Expr], inputs: &[LogicalPlan]) -> Self { - self.with_exprs_and_inputs(exprs.to_vec(), inputs.to_vec()) - .unwrap() - } - - fn with_exprs_and_inputs( - &self, - _exprs: Vec, - _inputs: Vec, - ) -> datafusion_common::Result { - Ok(self.clone()) - } -} diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs deleted file mode 100644 index 0c235242c2..0000000000 --- a/crates/core/src/delta_datafusion/find_files/mod.rs +++ /dev/null @@ -1,285 +0,0 @@ -use std::sync::Arc; - -use arrow_array::cast::AsArray; -use arrow_array::types::UInt16Type; -use arrow_array::RecordBatch; -use arrow_schema::SchemaBuilder; -use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef}; -use arrow_select::concat::concat_batches; -use async_trait::async_trait; -use datafusion::datasource::MemTable; -use datafusion::execution::context::{QueryPlanner, SessionState}; -use datafusion::execution::TaskContext; -use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner}; -use datafusion::prelude::SessionContext; -use datafusion_common::{DFSchemaRef, Result, ToDFSchema}; -use datafusion_expr::{col, Expr, LogicalPlan, UserDefinedLogicalNode}; -use datafusion_physical_plan::filter::FilterExec; -use datafusion_physical_plan::limit::LocalLimitExec; -use datafusion_physical_plan::ExecutionPlan; -use lazy_static::lazy_static; - -use crate::delta_datafusion::find_files::logical::FindFilesNode; -use crate::delta_datafusion::find_files::physical::FindFilesExec; -use crate::delta_datafusion::{ - df_logical_schema, register_store, DeltaScanBuilder, DeltaScanConfigBuilder, PATH_COLUMN, -}; -use crate::logstore::LogStoreRef; -use crate::table::state::DeltaTableState; -use crate::DeltaTableError; - -pub mod logical; -pub mod physical; - -lazy_static! 
{ - static ref ONLY_FILES_SCHEMA: Arc = { - let mut builder = SchemaBuilder::new(); - builder.push(Field::new(PATH_COLUMN, DataType::Utf8, false)); - Arc::new(builder.finish()) - }; - static ref ONLY_FILES_DF_SCHEMA: DFSchemaRef = - ONLY_FILES_SCHEMA.clone().to_dfschema_ref().unwrap(); -} - -#[derive(Default)] -struct FindFilesPlannerExtension {} - -#[derive(Default)] -struct FindFilesPlanner {} - -#[async_trait] -impl ExtensionPlanner for FindFilesPlannerExtension { - async fn plan_extension( - &self, - _planner: &dyn PhysicalPlanner, - node: &dyn UserDefinedLogicalNode, - _logical_inputs: &[&LogicalPlan], - _physical_inputs: &[Arc], - _session_state: &SessionState, - ) -> Result>> { - if let Some(find_files_node) = node.as_any().downcast_ref::() { - return Ok(Some(Arc::new(FindFilesExec::new( - find_files_node.state(), - find_files_node.log_store(), - find_files_node.predicate(), - )?))); - } - Ok(None) - } -} - -#[async_trait] -impl QueryPlanner for FindFilesPlanner { - async fn create_physical_plan( - &self, - logical_plan: &LogicalPlan, - session_state: &SessionState, - ) -> Result> { - let planner = Arc::new(Box::new(DefaultPhysicalPlanner::with_extension_planners( - vec![Arc::new(FindFilesPlannerExtension {})], - ))); - planner - .create_physical_plan(logical_plan, session_state) - .await - } -} - -async fn scan_table_by_partitions(batch: RecordBatch, predicate: Expr) -> Result { - let mut arrays = Vec::new(); - let mut fields = Vec::new(); - - let schema = batch.schema(); - - arrays.push( - batch - .column_by_name("path") - .ok_or(DeltaTableError::Generic( - "Column with name `path` does not exist".to_owned(), - ))? - .to_owned(), - ); - fields.push(Field::new(PATH_COLUMN, DataType::Utf8, false)); - - for field in schema.fields() { - if field.name().starts_with("partition.") { - let name = field.name().strip_prefix("partition.").unwrap(); - - arrays.push(batch.column_by_name(field.name()).unwrap().to_owned()); - fields.push(Field::new( - name, - field.data_type().to_owned(), - field.is_nullable(), - )); - } - } - - let schema = Arc::new(Schema::new(fields)); - let batch = RecordBatch::try_new(schema, arrays)?; - let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?; - - let ctx = SessionContext::new(); - let mut df = ctx.read_table(Arc::new(mem_table))?; - df = df - .filter(predicate.to_owned())? - .select(vec![col(PATH_COLUMN)])?; - let df_schema = df.schema().clone(); - let batches = df.collect().await?; - Ok(concat_batches(&SchemaRef::from(df_schema), &batches)?) 
-} - -async fn scan_table_by_files( - snapshot: DeltaTableState, - log_store: LogStoreRef, - state: SessionState, - expression: Expr, -) -> Result { - register_store(log_store.clone(), state.runtime_env().clone()); - let scan_config = DeltaScanConfigBuilder::new() - .wrap_partition_values(true) - .with_file_column(true) - .build(&snapshot)?; - - let logical_schema = df_logical_schema(&snapshot, &scan_config.file_column_name, None)?; - - // Identify which columns we need to project - let mut used_columns = expression - .column_refs() - .into_iter() - .map(|column| logical_schema.index_of(&column.name)) - .collect::, ArrowError>>()?; - // Add path column - used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?); - - let scan = DeltaScanBuilder::new(&snapshot, log_store, &state) - .with_filter(Some(expression.clone())) - .with_projection(Some(&used_columns)) - .with_scan_config(scan_config) - .build() - .await?; - - let scan = Arc::new(scan); - let input_schema = scan.logical_schema.as_ref().to_owned(); - let input_dfschema = input_schema.clone().try_into()?; - - let predicate_expr = - state.create_physical_expr(Expr::IsTrue(Box::new(expression.clone())), &input_dfschema)?; - - let filter: Arc = - Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?); - let limit: Arc = Arc::new(LocalLimitExec::new(filter, 1)); - let field_idx = input_schema.index_of(PATH_COLUMN)?; - let task_ctx = Arc::new(TaskContext::from(&state)); - let path_batches: Vec = datafusion::physical_plan::collect(limit, task_ctx) - .await? - .into_iter() - .map(|batch| { - let col = batch - .column(field_idx) - .as_dictionary::() - .values(); - RecordBatch::try_from_iter(vec![(PATH_COLUMN, col.clone())]).unwrap() - }) - .collect(); - - let result_batches = concat_batches(&ONLY_FILES_SCHEMA.clone(), &path_batches)?; - - Ok(result_batches) -} - -#[cfg(test)] -pub mod tests { - use std::sync::Arc; - - use datafusion::execution::session_state::SessionStateBuilder; - use datafusion::prelude::{DataFrame, SessionContext}; - use datafusion_common::{assert_batches_eq, assert_batches_sorted_eq}; - use datafusion_expr::{col, lit, Expr, Extension, LogicalPlan}; - - use crate::delta_datafusion::find_files::logical::FindFilesNode; - use crate::delta_datafusion::find_files::FindFilesPlanner; - use crate::operations::collect_sendable_stream; - use crate::{DeltaResult, DeltaTable, DeltaTableError}; - - pub async fn test_plan<'a>( - table: DeltaTable, - expr: Expr, - ) -> Result, DeltaTableError> { - let ctx = SessionContext::new(); - let state = SessionStateBuilder::new_from_existing(ctx.state()) - .with_query_planner(Arc::new(FindFilesPlanner::default())) - .build(); - let find_files_node = LogicalPlan::Extension(Extension { - node: Arc::new(FindFilesNode::new( - "my_cool_plan".into(), - table.snapshot()?.clone(), - table.log_store().clone(), - expr, - )?), - }); - let df = DataFrame::new(state.clone(), find_files_node); - - let p = state - .clone() - .create_physical_plan(df.logical_plan()) - .await?; - - let e = p.execute(0, state.task_ctx())?; - collect_sendable_stream(e).await.map_err(Into::into) - } - - #[tokio::test] - pub async fn test_find_files_partitioned() -> DeltaResult<()> { - let table = crate::open_table("../test/tests/data/delta-0.8.0-partitioned").await?; - let expr: Expr = col("year").eq(lit(2020)); - let s = test_plan(table, expr).await?; - - assert_batches_eq! 
{ - ["+---------------------------------------------------------------------------------------------+", - "| __delta_rs_path |", - "+---------------------------------------------------------------------------------------------+", - "| year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet |", - "| year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet |", - "| year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet |", - "+---------------------------------------------------------------------------------------------+"], - &s - } - Ok(()) - } - - #[tokio::test] - pub async fn test_find_files_unpartitioned() -> DeltaResult<()> { - let table = crate::open_table("../test/tests/data/simple_table").await?; - let expr: Expr = col("id").in_list(vec![lit(9i64), lit(7i64)], false); - let s = test_plan(table, expr).await?; - - assert_batches_sorted_eq! { - ["+---------------------------------------------------------------------+", - "| __delta_rs_path |", - "+---------------------------------------------------------------------+", - "| part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet |", - "| part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet |", - "+---------------------------------------------------------------------+"], - &s - } - Ok(()) - } - - #[tokio::test] - pub async fn test_find_files_unpartitioned2() -> DeltaResult<()> { - let table = crate::open_table("../test/tests/data/simple_table").await?; - let expr: Expr = col("id").is_not_null(); - let s = test_plan(table, expr).await?; - - assert_batches_sorted_eq! { - ["+---------------------------------------------------------------------+", - "| __delta_rs_path |", - "+---------------------------------------------------------------------+", - "| part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet |", - "| part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet |", - "| part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet |", - "+---------------------------------------------------------------------+"], - &s - } - Ok(()) - } -} diff --git a/crates/core/src/delta_datafusion/find_files/physical.rs b/crates/core/src/delta_datafusion/find_files/physical.rs deleted file mode 100644 index 508d1f672e..0000000000 --- a/crates/core/src/delta_datafusion/find_files/physical.rs +++ /dev/null @@ -1,158 +0,0 @@ -use std::any::Any; -use std::fmt::{Debug, Formatter}; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use arrow_array::RecordBatch; -use arrow_schema::SchemaRef; -use datafusion::error::Result; -use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; -use datafusion::prelude::SessionContext; -use datafusion_common::tree_node::TreeNode; -use datafusion_expr::Expr; -use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; -use datafusion_physical_plan::memory::MemoryStream; -use datafusion_physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties, -}; -use futures::stream::BoxStream; -use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; - -use crate::delta_datafusion::find_files::{ - scan_table_by_files, scan_table_by_partitions, ONLY_FILES_SCHEMA, -}; -use crate::delta_datafusion::FindFilesExprProperties; -use crate::logstore::LogStoreRef; -use crate::table::state::DeltaTableState; - -pub struct FindFilesExec { - predicate: Expr, - state: DeltaTableState, - 
log_store: LogStoreRef, - plan_properties: PlanProperties, -} - -impl FindFilesExec { - pub fn new(state: DeltaTableState, log_store: LogStoreRef, predicate: Expr) -> Result { - Ok(Self { - predicate, - log_store, - state, - plan_properties: PlanProperties::new( - EquivalenceProperties::new(ONLY_FILES_SCHEMA.clone()), - Partitioning::RoundRobinBatch(num_cpus::get()), - ExecutionMode::Bounded, - ), - }) - } -} - -struct FindFilesStream<'a> { - mem_stream: BoxStream<'a, Result>, -} - -impl<'a> FindFilesStream<'a> { - pub fn new(mem_stream: BoxStream<'a, Result>) -> Result { - Ok(Self { mem_stream }) - } -} - -impl<'a> RecordBatchStream for FindFilesStream<'a> { - fn schema(&self) -> SchemaRef { - ONLY_FILES_SCHEMA.clone() - } -} - -impl<'a> Stream for FindFilesStream<'a> { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.as_mut().mem_stream.poll_next_unpin(cx) - } -} - -impl Debug for FindFilesExec { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "FindFilesExec[predicate=\"{}\"]", self.predicate) - } -} - -impl DisplayAs for FindFilesExec { - fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { - write!(f, "FindFilesExec[predicate=\"{}\"]", self.predicate) - } -} - -impl ExecutionPlan for FindFilesExec { - fn name(&self) -> &str { - Self::static_name() - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - ONLY_FILES_SCHEMA.clone() - } - - fn properties(&self) -> &PlanProperties { - &self.plan_properties - } - - fn children(&self) -> Vec<&Arc> { - vec![] - } - - fn with_new_children( - self: Arc, - children: Vec>, - ) -> Result> { - if !children.is_empty() { - return Err(datafusion::error::DataFusionError::Plan( - "Children cannot be replaced in FindFilesExec".to_string(), - )); - } - - Ok(self) - } - - fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> Result { - let current_metadata = self.state.metadata(); - let mut expr_properties = FindFilesExprProperties { - partition_only: true, - partition_columns: current_metadata.partition_columns.clone(), - result: Ok(()), - }; - - TreeNode::visit(&self.predicate, &mut expr_properties)?; - expr_properties.result?; - - if expr_properties.partition_only { - let actions_table = self.state.add_actions_table(true)?; - let predicate = self.predicate.clone(); - let schema = actions_table.schema(); - let mem_stream = - MemoryStream::try_new(vec![actions_table.clone()], schema.clone(), None)? 
-                    .and_then(move |batch| scan_table_by_partitions(batch, predicate.clone()))
-                    .boxed();
-
-            Ok(Box::pin(FindFilesStream::new(mem_stream)?))
-        } else {
-            let ctx = SessionContext::new();
-            let state = ctx.state();
-            let table_state = self.state.clone();
-            let predicate = self.predicate.clone();
-            let output_files =
-                scan_table_by_files(table_state, self.log_store.clone(), state, predicate);
-
-            let mem_stream = output_files.into_stream().boxed();
-            Ok(Box::pin(FindFilesStream::new(mem_stream)?))
-        }
-    }
-}
diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 50983d8400..5fba1bd608 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -91,7 +91,6 @@ pub mod logical;
 pub mod physical;
 pub mod planner;
 
-mod find_files;
 mod schema_adapter;
 
 impl From<DeltaTableError> for DataFusionError {
@@ -1772,7 +1771,6 @@ impl From for DeltaColumn {
 
 #[cfg(test)]
 mod tests {
-    use crate::operations::create::CreateBuilder;
     use crate::operations::write::SchemaMode;
     use crate::writer::test_utils::get_delta_schema;
     use arrow::array::StructArray;
@@ -1785,7 +1783,6 @@ mod tests {
     use datafusion_expr::lit;
     use datafusion_proto::physical_plan::AsExecutionPlan;
     use datafusion_proto::protobuf;
-    use delta_kernel::schema::StructField;
     use object_store::path::Path;
     use serde_json::json;
     use std::ops::Deref;
diff --git a/crates/core/src/kernel/arrow/json.rs b/crates/core/src/kernel/arrow/json.rs
index ed31a7b64e..fc085b4381 100644
--- a/crates/core/src/kernel/arrow/json.rs
+++ b/crates/core/src/kernel/arrow/json.rs
@@ -139,11 +139,11 @@ pub(crate) fn decode_stream<S: Stream<Item = DeltaResult<Bytes>> + Unpin>(
     })
 }
 
-/// Decode data prvided by a reader into an iterator of record batches.
+/// Decode data provided by a reader into an iterator of record batches.
 pub(crate) fn decode_reader<'a, R: BufRead + 'a>(
     decoder: &'a mut Decoder,
     mut reader: R,
-) -> impl Iterator<Item = Result<RecordBatch, ArrowError>> + '_ {
+) -> impl Iterator<Item = Result<RecordBatch, ArrowError>> + 'a {
     let mut next = move || {
         loop {
             let buf = reader.fill_buf()?;
diff --git a/crates/core/src/kernel/snapshot/log_segment.rs b/crates/core/src/kernel/snapshot/log_segment.rs
index 596304e003..2dc1d62b31 100644
--- a/crates/core/src/kernel/snapshot/log_segment.rs
+++ b/crates/core/src/kernel/snapshot/log_segment.rs
@@ -38,7 +38,6 @@ lazy_static! {
 /// parse the version number from a log file path
 // TODO handle compaction files
 pub(crate) trait PathExt {
-    fn child(&self, path: impl AsRef<str>) -> DeltaResult<Path>;
     /// Returns the last path segment if not terminated with a "/"
     fn filename(&self) -> Option<&str>;
 
@@ -65,10 +64,6 @@ pub(crate) trait PathExt {
 }
 
 impl PathExt for Path {
-    fn child(&self, path: impl AsRef<str>) -> DeltaResult<Path> {
-        Ok(self.child(path.as_ref()))
-    }
-
     fn filename(&self) -> Option<&str> {
         self.filename()
     }
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 0498e4493c..fef4fce183 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -145,6 +145,7 @@ pub async fn open_table_with_version(
 }
 
 /// Creates a DeltaTable from the given path.
+///
 /// Loads metadata from the version appropriate to the given ISO-8601/RFC-3339 timestamp.
 /// Infers the storage backend to use from the scheme in the given table path.
 ///
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 99d3b8c587..d4b3988dbb 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -50,6 +50,7 @@ fn io_rt(config: Option<&RuntimeConfig>) -> &Runtime {
         RuntimeBuilder::new_current_thread()
     };
     let builder = builder.worker_threads(config.worker_threads);
+    #[allow(unused_mut)]
     let mut builder = if config.enable_io && config.enable_time {
         builder.enable_all()
     } else if !config.enable_io && config.enable_time {
@@ -480,6 +481,7 @@ pub fn limit_store_handler<T: ObjectStore>(store: T, options: &StorageOptions) ->
 }
 
 /// Storage option keys to use when creating [ObjectStore].
+///
 /// The same key should be used whether passing a key in the hashmap or setting it as an environment variable.
 /// Must be implemented for a given storage provider
 pub mod storage_constants {
diff --git a/crates/core/src/table/config.rs b/crates/core/src/table/config.rs
index bc04ec6e91..df28f11aae 100644
--- a/crates/core/src/table/config.rs
+++ b/crates/core/src/table/config.rs
@@ -10,6 +10,7 @@ use super::Constraint;
 use crate::errors::DeltaTableError;
 
 /// Typed property keys that can be defined on a delta table
+///
 ///
 ///
 #[derive(PartialEq, Eq, Hash)]
@@ -213,7 +214,7 @@ pub const DEFAULT_NUM_INDEX_COLS: i32 = 32;
 /// Default target file size
 pub const DEFAULT_TARGET_FILE_SIZE: i64 = 104857600;
 
-impl<'a> TableConfig<'a> {
+impl TableConfig<'_> {
     table_config!(
         (
             "true for this Delta table to be append-only",
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 9b73745a20..20d04836ce 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -418,7 +418,7 @@ impl DeltaTable {
     pub fn get_active_add_actions_by_partitions<'a>(
         &'a self,
         filters: &'a [PartitionFilter],
-    ) -> Result<impl Iterator<Item = DeltaResult<LogicalFile<'a>>> + '_, DeltaTableError> {
+    ) -> Result<impl Iterator<Item = DeltaResult<LogicalFile<'a>>>, DeltaTableError> {
         self.state
             .as_ref()
             .ok_or(DeltaTableError::NoMetadata)?
diff --git a/crates/core/src/table/state.rs b/crates/core/src/table/state.rs
index 0876dc9e79..2a25399c42 100644
--- a/crates/core/src/table/state.rs
+++ b/crates/core/src/table/state.rs
@@ -183,7 +183,7 @@ impl DeltaTableState {
 
     /// Get the table config which is loaded with the snapshot
     pub fn load_config(&self) -> &DeltaTableConfig {
-        &self.snapshot.load_config()
+        self.snapshot.load_config()
     }
 
     /// Well known table configuration
@@ -210,7 +210,7 @@ impl DeltaTableState {
     pub fn get_active_add_actions_by_partitions<'a>(
         &'a self,
         filters: &'a [PartitionFilter],
-    ) -> Result<impl Iterator<Item = DeltaResult<LogicalFile<'a>>> + '_, DeltaTableError> {
+    ) -> Result<impl Iterator<Item = DeltaResult<LogicalFile<'a>>>, DeltaTableError> {
         let current_metadata = self.metadata();
 
         let nonpartitioned_columns: Vec<String> = filters
diff --git a/crates/core/tests/checkpoint_writer.rs b/crates/core/tests/checkpoint_writer.rs
index 1be439f9e5..1f1cc701e4 100644
--- a/crates/core/tests/checkpoint_writer.rs
+++ b/crates/core/tests/checkpoint_writer.rs
@@ -84,9 +84,11 @@ mod simple_checkpoint {
 
 mod delete_expired_delta_log_in_checkpoint {
     use super::*;
 
+    use std::fs::{FileTimes, OpenOptions};
+    use std::ops::Sub;
+    use std::time::{Duration, SystemTime};
     use ::object_store::path::Path as ObjectStorePath;
-    use chrono::Utc;
     use deltalake_core::table::config::TableProperty;
     use deltalake_core::*;
     use maplit::hashmap;
@@ -103,10 +105,14 @@ mod delete_expired_delta_log_in_checkpoint {
         .await;
 
         let table_path = table.table_uri();
-        let set_file_last_modified = |version: usize, last_modified_millis: i64| {
-            let last_modified_secs = last_modified_millis / 1000;
-            let path = format!("{}/_delta_log/{:020}.json", &table_path, version);
-            utime::set_file_times(path, last_modified_secs, last_modified_secs).unwrap();
+        let set_file_last_modified = |version: usize, last_modified_millis: u64| {
+            let path = format!("{}_delta_log/{:020}.json", &table_path, version);
+            let file = OpenOptions::new().write(true).open(path).unwrap();
+            let last_modified = SystemTime::now().sub(Duration::from_millis(last_modified_millis));
+            let times = FileTimes::new()
+                .set_modified(last_modified)
+                .set_accessed(last_modified);
+            file.set_times(times).unwrap();
         };
 
         // create 2 commits
@@ -116,10 +122,9 @@ mod delete_expired_delta_log_in_checkpoint {
         assert_eq!(2, fs_common::commit_add(&mut table, &a2).await);
 
         // set last_modified
-        let now = Utc::now().timestamp_millis();
-        set_file_last_modified(0, now - 25 * 60 * 1000); // 25 mins ago, should be deleted
-        set_file_last_modified(1, now - 15 * 60 * 1000); // 25 mins ago, should be deleted
-        set_file_last_modified(2, now - 5 * 60 * 1000); // 25 mins ago, should be kept
+        set_file_last_modified(0, 25 * 60 * 1000); // 25 mins ago, should be deleted
+        set_file_last_modified(1, 15 * 60 * 1000); // 15 mins ago, should be deleted
+        set_file_last_modified(2, 5 * 60 * 1000); // 5 mins ago, should be kept
 
         table.load_version(0).await.expect("Cannot load version 0");
         table.load_version(1).await.expect("Cannot load version 1");
diff --git a/crates/core/tests/time_travel.rs b/crates/core/tests/time_travel.rs
index d0ec869e98..3277a41961 100644
--- a/crates/core/tests/time_travel.rs
+++ b/crates/core/tests/time_travel.rs
@@ -1,5 +1,7 @@
 use chrono::{DateTime, FixedOffset, Utc};
+use std::fs::{FileTimes, OpenOptions};
 use std::path::Path;
+use std::time::SystemTime;
 
 #[tokio::test]
 async fn time_travel_by_ds() {
@@ -13,8 +15,11 @@ async fn time_travel_by_ds() {
         ("00000000000000000004.json", "2020-05-05T22:47:31-07:00"),
     ];
     for (fname, ds) in log_mtime_pair {
-        let ts = ds_to_ts(ds);
-        utime::set_file_times(Path::new(log_dir).join(fname), ts, ts).unwrap();
+        let ts: SystemTime = ds_to_ts(ds).into();
+        let full_path = Path::new(log_dir).join(fname);
+        let file = OpenOptions::new().write(true).open(full_path).unwrap();
+        let times = FileTimes::new().set_accessed(ts).set_modified(ts);
+        file.set_times(times).unwrap()
     }
 
     let mut table = deltalake_core::open_table_with_ds(
@@ -83,7 +88,7 @@ async fn time_travel_by_ds() {
     assert_eq!(table.version(), 4);
 }
 
-fn ds_to_ts(ds: &str) -> i64 {
+fn ds_to_ts(ds: &str) -> DateTime<Utc> {
     let fixed_dt = DateTime::<FixedOffset>::parse_from_rfc3339(ds).unwrap();
-    DateTime::<Utc>::from(fixed_dt).timestamp()
+    DateTime::<Utc>::from(fixed_dt)
 }
diff --git a/crates/gcp/tests/context.rs b/crates/gcp/tests/context.rs
index 5419075f68..4bcc2c1b3b 100644
--- a/crates/gcp/tests/context.rs
+++ b/crates/gcp/tests/context.rs
@@ -9,6 +9,7 @@ use std::collections::HashMap;
 use std::process::ExitStatus;
 use std::sync::Arc;
 use tempfile::TempDir;
+use tracing::info;
 
 /// Kinds of storage integration
 #[derive(Debug)]
@@ -64,12 +65,16 @@ pub async fn copy_table(
 }
 
 impl StorageIntegration for GcpIntegration {
+    fn create_bucket(&self) -> std::io::Result<ExitStatus> {
+        gs_cli::create_bucket(self.bucket_name())
+    }
+
     fn prepare_env(&self) {
         gs_cli::prepare_env();
         let base_url = std::env::var("GOOGLE_BASE_URL").unwrap();
         let token = serde_json::json!({"gcs_base_url": base_url, "disable_oauth": true, "client_email": "", "private_key": "", "private_key_id": ""});
         let account_path = self.temp_dir.path().join("gcs.json");
-        println!("accoutn_path: {account_path:?}");
+        info!("account_path: {account_path:?}");
         std::fs::write(&account_path, serde_json::to_vec(&token).unwrap()).unwrap();
         std::env::set_var(
             "GOOGLE_SERVICE_ACCOUNT",
@@ -77,10 +82,6 @@ impl StorageIntegration for GcpIntegration {
         );
     }
 
-    fn create_bucket(&self) -> std::io::Result<ExitStatus> {
-        gs_cli::create_bucket(self.bucket_name())
-    }
-
     fn bucket_name(&self) -> String {
         self.bucket_name.clone()
     }
@@ -113,7 +114,7 @@ pub mod gs_cli {
         let endpoint = std::env::var("GOOGLE_ENDPOINT_URL")
             .expect("variable GOOGLE_ENDPOINT_URL must be set to connect to GCS Emulator");
         let payload = serde_json::json!({ "name": container_name.as_ref() });
-        let mut child = Command::new("curl")
+        Command::new("curl")
             .args([
                 "--insecure",
                 "-v",
@@ -126,15 +127,15 @@ pub mod gs_cli {
                 &endpoint,
             ])
             .spawn()
-            .expect("curl command is installed");
-        child.wait()
+            .expect("curl command is installed")
+            .wait()
     }
 
     pub fn delete_bucket(container_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
         let endpoint = std::env::var("GOOGLE_ENDPOINT_URL")
             .expect("variable GOOGLE_ENDPOINT_URL must be set to connect to GCS Emulator");
         let payload = serde_json::json!({ "name": container_name.as_ref() });
-        let mut child = Command::new("curl")
+        Command::new("curl")
            .args([
                 "--insecure",
                 "-v",
@@ -147,8 +148,8 @@ pub mod gs_cli {
                 &endpoint,
             ])
             .spawn()
-            .expect("curl command is installed");
-        child.wait()
+            .expect("curl command is installed")
+            .wait()
     }
 
     /// prepare_env
diff --git a/crates/sql/src/logical_plan.rs b/crates/sql/src/logical_plan.rs
index 6e3c7d5dbc..8ff7b90b9e 100644
--- a/crates/sql/src/logical_plan.rs
+++ b/crates/sql/src/logical_plan.rs
@@ -24,12 +24,12 @@ impl Debug for DeltaStatement {
 }
 
 impl DeltaStatement {
-    /// Return a `format`able structure with the a human readable
+    /// Return a `format`able structure with a human-readable
     /// description of this LogicalPlan node per node, not including
     /// children.
pub fn display(&self) -> impl fmt::Display + '_ { struct Wrapper<'a>(&'a DeltaStatement); - impl<'a> Display for Wrapper<'a> { + impl Display for Wrapper<'_> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self.0 { DeltaStatement::Vacuum(Vacuum { From 84cf6231da7f2a8194484ada0e4ad639226cdb6f Mon Sep 17 00:00:00 2001 From: Stephen Carman Date: Tue, 22 Oct 2024 11:08:27 -0400 Subject: [PATCH 2/3] chore: fix 1 typo and sign off on my commit Signed-off-by: Stephen Carman --- crates/core/src/kernel/snapshot/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 0df62c867b..dbd6662f2f 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -8,7 +8,7 @@ //! bare minimum - [`Protocol`] and [`Metadata`] - is cached in memory. //! - [`EagerSnapshot`] is a snapshot where much more log data is eagerly loaded into memory. //! -//! The sub modules provide structures and methods that aid in generating +//! The submodules provide structures and methods that aid in generating //! and consuming snapshots. //! //! ## Reading the log From 6974931af0b6c542ac5bab9fd128131abfe5fce4 Mon Sep 17 00:00:00 2001 From: Stephen Carman Date: Fri, 25 Oct 2024 10:45:20 -0400 Subject: [PATCH 3/3] chore: revert S3 key changes for deprecated keys Signed-off-by: Stephen Carman --- crates/aws/src/constants.rs | 2 ++ crates/aws/src/storage.rs | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/crates/aws/src/constants.rs b/crates/aws/src/constants.rs index 00b3b3dbac..cfc5559518 100644 --- a/crates/aws/src/constants.rs +++ b/crates/aws/src/constants.rs @@ -107,6 +107,8 @@ pub const S3_OPTS: &[&str] = &[ AWS_S3_LOCKING_PROVIDER, AWS_IAM_ROLE_ARN, AWS_IAM_ROLE_SESSION_NAME, + AWS_S3_ASSUME_ROLE_ARN, + AWS_S3_ROLE_SESSION_NAME, AWS_WEB_IDENTITY_TOKEN_FILE, AWS_ROLE_ARN, AWS_ROLE_SESSION_NAME, diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index 400f9710c8..bfa44c3eac 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -526,6 +526,11 @@ mod tests { "arn:aws:iam::123456789012:role/some_role", ); std::env::set_var(constants::AWS_IAM_ROLE_SESSION_NAME, "session_name"); + std::env::set_var( + constants::AWS_S3_ASSUME_ROLE_ARN, + "arn:aws:iam::123456789012:role/some_role", + ); + std::env::set_var(constants::AWS_S3_ROLE_SESSION_NAME, "session_name"); std::env::set_var(constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file"); let options = S3StorageOptions::try_default().unwrap();
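A note on the `utime` → `std::fs::FileTimes` migration in the test changes of patch 1: `FileTimes` and `File::set_times` are standard-library APIs (stable since Rust 1.75), which is what allows the `utime = "0.3"` dev-dependency to be dropped from `crates/core/Cargo.toml`. A minimal standalone sketch of the API (hypothetical file path):

```rust
use std::fs::{FileTimes, OpenOptions};
use std::time::{Duration, SystemTime};

fn main() -> std::io::Result<()> {
    // set_times needs a handle opened with write access.
    let file = OpenOptions::new()
        .write(true)
        .open("00000000000000000000.json")?; // hypothetical path
    // Back-date both timestamps by 25 minutes, as the checkpoint test does.
    let then = SystemTime::now() - Duration::from_secs(25 * 60);
    file.set_times(FileTimes::new().set_accessed(then).set_modified(then))?;
    Ok(())
}
```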
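Similarly, the retry-predicate rewrites in `crates/aws/src/lib.rs` are all instances of the `matches!` idiom, which collapses a two-arm boolean `match` into a single expression and is what clippy's `match_like_matches_macro` lint suggests. A reduced sketch with a stand-in error type (hypothetical enum, not the AWS SDK's):

```rust
enum LockClientError {
    ProvisionedThroughputExceeded,
    Other(String),
}

// Equivalent to:
// match err { LockClientError::ProvisionedThroughputExceeded => true, _ => false }
fn is_retryable(err: &LockClientError) -> bool {
    matches!(err, LockClientError::ProvisionedThroughputExceeded)
}

fn main() {
    assert!(is_retryable(&LockClientError::ProvisionedThroughputExceeded));
    assert!(!is_retryable(&LockClientError::Other("validation".into())));
}
```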