diff --git a/crates/aws/src/constants.rs b/crates/aws/src/constants.rs
index 90c23ff572..cfc5559518 100644
--- a/crates/aws/src/constants.rs
+++ b/crates/aws/src/constants.rs
@@ -8,6 +8,7 @@ use std::time::Duration;
/// Custom S3 endpoint.
pub const AWS_ENDPOINT_URL: &str = "AWS_ENDPOINT_URL";
/// Custom DynamoDB endpoint.
+///
/// If DynamoDB endpoint is not supplied, will use S3 endpoint (AWS_ENDPOINT_URL)
/// If it is supplied, this endpoint takes precedence over the global endpoint set in AWS_ENDPOINT_URL for DynamoDB
pub const AWS_ENDPOINT_URL_DYNAMODB: &str = "AWS_ENDPOINT_URL_DYNAMODB";
@@ -41,7 +42,9 @@ pub const AWS_IAM_ROLE_SESSION_NAME: &str = "AWS_IAM_ROLE_SESSION_NAME";
note = "Please use AWS_IAM_ROLE_SESSION_NAME instead"
)]
pub const AWS_S3_ROLE_SESSION_NAME: &str = "AWS_S3_ROLE_SESSION_NAME";
-/// The `pool_idle_timeout` option of aws http client. Has to be lower than 20 seconds, which is
+/// The `pool_idle_timeout` option of aws http client.
+///
+/// Has to be lower than 20 seconds, which is
/// default S3 server timeout .
/// However, since rusoto uses hyper as a client, its default timeout is 90 seconds
/// .
@@ -55,16 +58,19 @@ pub const AWS_STS_POOL_IDLE_TIMEOUT_SECONDS: &str = "AWS_STS_POOL_IDLE_TIMEOUT_S
pub const AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES: &str =
"AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES";
/// The web identity token file to use when using a web identity provider.
+///
/// NOTE: web identity related options are set in the environment when
/// creating an instance of [crate::storage::s3::S3StorageOptions].
/// See also .
pub const AWS_WEB_IDENTITY_TOKEN_FILE: &str = "AWS_WEB_IDENTITY_TOKEN_FILE";
/// The role name to use for web identity.
+///
/// NOTE: web identity related options are set in the environment when
/// creating an instance of [crate::storage::s3::S3StorageOptions].
/// See also .
pub const AWS_ROLE_ARN: &str = "AWS_ROLE_ARN";
/// The role session name to use for web identity.
+///
/// NOTE: web identity related options are set in the environment when
/// creating an instance of [crate::storage::s3::S3StorageOptions].
/// See also .
@@ -99,6 +105,8 @@ pub const S3_OPTS: &[&str] = &[
AWS_SECRET_ACCESS_KEY,
AWS_SESSION_TOKEN,
AWS_S3_LOCKING_PROVIDER,
+ AWS_IAM_ROLE_ARN,
+ AWS_IAM_ROLE_SESSION_NAME,
AWS_S3_ASSUME_ROLE_ARN,
AWS_S3_ROLE_SESSION_NAME,
AWS_WEB_IDENTITY_TOKEN_FILE,
diff --git a/crates/aws/src/credentials.rs b/crates/aws/src/credentials.rs
index 43c9d88a31..27f4491923 100644
--- a/crates/aws/src/credentials.rs
+++ b/crates/aws/src/credentials.rs
@@ -21,7 +21,7 @@ use deltalake_core::storage::StorageOptions;
use deltalake_core::DeltaResult;
use tracing::log::*;
-use crate::constants::{self, AWS_ENDPOINT_URL};
+use crate::constants;
/// An [object_store::CredentialProvider] which handles converting a populated [SdkConfig]
/// into a necessary [AwsCredential] type for configuring [object_store::aws::AmazonS3]
@@ -183,19 +183,26 @@ fn assume_role_arn(options: &StorageOptions) -> Option {
options
.0
.get(constants::AWS_IAM_ROLE_ARN)
- .or(options.0.get(constants::AWS_S3_ASSUME_ROLE_ARN))
+ .or(
+ #[allow(deprecated)]
+ options.0.get(constants::AWS_S3_ASSUME_ROLE_ARN),
+ )
.or(std::env::var_os(constants::AWS_IAM_ROLE_ARN)
.map(|o| {
o.into_string()
.expect("Failed to unwrap AWS_IAM_ROLE_ARN which may have invalid data")
})
.as_ref())
- .or(std::env::var_os(constants::AWS_S3_ASSUME_ROLE_ARN)
- .map(|o| {
- o.into_string()
- .expect("Failed to unwrap AWS_S3_ASSUME_ROLE_ARN which may have invalid data")
- })
- .as_ref())
+ .or(
+ #[allow(deprecated)]
+ std::env::var_os(constants::AWS_S3_ASSUME_ROLE_ARN)
+ .map(|o| {
+ o.into_string().expect(
+ "Failed to unwrap AWS_S3_ASSUME_ROLE_ARN which may have invalid data",
+ )
+ })
+ .as_ref(),
+ )
.cloned()
}
@@ -204,13 +211,13 @@ fn assume_session_name(options: &StorageOptions) -> String {
let assume_session = options
.0
.get(constants::AWS_IAM_ROLE_SESSION_NAME)
- .or(options.0.get(constants::AWS_S3_ROLE_SESSION_NAME))
+ .or(
+ #[allow(deprecated)]
+ options.0.get(constants::AWS_S3_ROLE_SESSION_NAME),
+ )
.cloned();
- match assume_session {
- Some(s) => s,
- None => assume_role_sessio_name(),
- }
+ assume_session.unwrap_or_else(assume_role_sessio_name)
}
/// Take a set of [StorageOptions] and produce an appropriate AWS SDK [SdkConfig]
diff --git a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs
index ddb768bdd9..2f2bc1b472 100644
--- a/crates/aws/src/lib.rs
+++ b/crates/aws/src/lib.rs
@@ -57,7 +57,7 @@ impl LogStoreFactory for S3LogStoreFactory {
// With conditional put in S3-like API we can use the deltalake default logstore which use PutIfAbsent
if options.0.keys().any(|key| {
let key = key.to_ascii_lowercase();
- vec![
+ [
AmazonS3ConfigKey::ConditionalPut.as_ref(),
"conditional_put",
]
@@ -69,7 +69,7 @@ impl LogStoreFactory for S3LogStoreFactory {
if options.0.keys().any(|key| {
let key = key.to_ascii_lowercase();
- vec![
+ [
AmazonS3ConfigKey::CopyIfNotExists.as_ref(),
"copy_if_not_exists",
]
@@ -306,9 +306,11 @@ impl DynamoDbLockClient {
.send()
.await
},
- |err| match err.as_service_error() {
- Some(GetItemError::ProvisionedThroughputExceededException(_)) => true,
- _ => false,
+ |err| {
+ matches!(
+ err.as_service_error(),
+ Some(GetItemError::ProvisionedThroughputExceededException(_))
+ )
},
)
.await
@@ -340,9 +342,11 @@ impl DynamoDbLockClient {
.await?;
Ok(())
},
- |err: &SdkError<_, _>| match err.as_service_error() {
- Some(PutItemError::ProvisionedThroughputExceededException(_)) => true,
- _ => false,
+ |err: &SdkError<_, _>| {
+ matches!(
+ err.as_service_error(),
+ Some(PutItemError::ProvisionedThroughputExceededException(_))
+ )
},
)
.await
@@ -395,9 +399,11 @@ impl DynamoDbLockClient {
.send()
.await
},
- |err: &SdkError<_, _>| match err.as_service_error() {
- Some(QueryError::ProvisionedThroughputExceededException(_)) => true,
- _ => false,
+ |err: &SdkError<_, _>| {
+ matches!(
+ err.as_service_error(),
+ Some(QueryError::ProvisionedThroughputExceededException(_))
+ )
},
)
.await
@@ -446,9 +452,11 @@ impl DynamoDbLockClient {
.await?;
Ok(())
},
- |err: &SdkError<_, _>| match err.as_service_error() {
- Some(UpdateItemError::ProvisionedThroughputExceededException(_)) => true,
- _ => false,
+ |err: &SdkError<_, _>| {
+ matches!(
+ err.as_service_error(),
+ Some(UpdateItemError::ProvisionedThroughputExceededException(_))
+ )
},
)
.await;
@@ -488,9 +496,11 @@ impl DynamoDbLockClient {
.await?;
Ok(())
},
- |err: &SdkError<_, _>| match err.as_service_error() {
- Some(DeleteItemError::ProvisionedThroughputExceededException(_)) => true,
- _ => false,
+ |err: &SdkError<_, _>| {
+ matches!(
+ err.as_service_error(),
+ Some(DeleteItemError::ProvisionedThroughputExceededException(_))
+ )
},
)
.await
diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs
index a6735b1c0f..bfa44c3eac 100644
--- a/crates/aws/src/storage.rs
+++ b/crates/aws/src/storage.rs
@@ -4,9 +4,8 @@ use aws_config::{Region, SdkConfig};
use bytes::Bytes;
use deltalake_core::storage::object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};
use deltalake_core::storage::object_store::{
- parse_url_opts, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
- ObjectStoreScheme, PutMultipartOpts, PutOptions, PutPayload, PutResult,
- Result as ObjectStoreResult,
+ GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, ObjectStoreScheme,
+ PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult,
};
use deltalake_core::storage::{
limit_store_handler, str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
@@ -14,7 +13,6 @@ use deltalake_core::storage::{
use deltalake_core::{DeltaResult, DeltaTableError, ObjectStoreError, Path};
use futures::stream::BoxStream;
use futures::Future;
-use object_store::aws::S3CopyIfNotExists;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Range;
@@ -135,27 +133,21 @@ fn aws_storage_handler(
// Determine whether this crate is being configured for use with native AWS S3 or an S3-alike
//
-// This function will rteturn true in the default case since it's most likely that the absence of
+// This function will return true in the default case since it's most likely that the absence of
// options will mean default/S3 configuration
fn is_aws(options: &StorageOptions) -> bool {
- if options
- .0
- .contains_key(crate::constants::AWS_FORCE_CREDENTIAL_LOAD)
- {
+ if options.0.contains_key(constants::AWS_FORCE_CREDENTIAL_LOAD) {
return true;
}
- if options
- .0
- .contains_key(crate::constants::AWS_S3_LOCKING_PROVIDER)
- {
+ if options.0.contains_key(constants::AWS_S3_LOCKING_PROVIDER) {
return true;
}
- !options.0.contains_key(crate::constants::AWS_ENDPOINT_URL)
+ !options.0.contains_key(constants::AWS_ENDPOINT_URL)
}
/// Options used to configure the [S3StorageBackend].
///
-/// Available options are described in [s3_constants].
+/// Available options are described in [constants].
#[derive(Clone, Debug)]
#[allow(missing_docs)]
pub struct S3StorageOptions {
@@ -190,7 +182,7 @@ impl S3StorageOptions {
pub fn from_map(options: &HashMap) -> DeltaResult {
let extra_opts: HashMap = options
.iter()
- .filter(|(k, _)| !s3_constants::S3_OPTS.contains(&k.as_str()))
+ .filter(|(k, _)| !constants::S3_OPTS.contains(&k.as_str()))
.map(|(k, v)| (k.to_owned(), v.to_owned()))
.collect();
// Copy web identity values provided in options but not the environment into the environment
@@ -215,7 +207,7 @@ impl S3StorageOptions {
) as usize;
let virtual_hosted_style_request: bool =
- str_option(options, s3_constants::AWS_S3_ADDRESSING_STYLE)
+ str_option(options, constants::AWS_S3_ADDRESSING_STYLE)
.map(|addressing_style| addressing_style == "virtual")
.unwrap_or(false);
@@ -250,12 +242,12 @@ impl S3StorageOptions {
/// Return the configured endpoint URL for S3 operations
pub fn endpoint_url(&self) -> Option<&str> {
- self.sdk_config.as_ref().map(|v| v.endpoint_url()).flatten()
+ self.sdk_config.as_ref().and_then(|v| v.endpoint_url())
}
/// Return the configured region used for S3 operations
pub fn region(&self) -> Option<&Region> {
- self.sdk_config.as_ref().map(|v| v.region()).flatten()
+ self.sdk_config.as_ref().and_then(|v| v.region())
}
fn u64_or_default(map: &HashMap, key: &str, default: u64) -> u64 {
@@ -332,7 +324,7 @@ impl std::fmt::Display for S3StorageBackend {
impl S3StorageBackend {
/// Creates a new S3StorageBackend.
///
- /// Options are described in [s3_constants].
+ /// Options are described in [constants].
pub fn try_new(storage: ObjectStoreRef, allow_unsafe_rename: bool) -> ObjectStoreResult {
Ok(Self {
inner: storage,
@@ -341,7 +333,7 @@ impl S3StorageBackend {
}
}
-impl std::fmt::Debug for S3StorageBackend {
+impl Debug for S3StorageBackend {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
write!(
fmt,
@@ -366,6 +358,18 @@ impl ObjectStore for S3StorageBackend {
self.inner.put_opts(location, bytes, options).await
}
+ async fn put_multipart(&self, location: &Path) -> ObjectStoreResult> {
+ self.inner.put_multipart(location).await
+ }
+
+ async fn put_multipart_opts(
+ &self,
+ location: &Path,
+ options: PutMultipartOpts,
+ ) -> ObjectStoreResult> {
+ self.inner.put_multipart_opts(location, options).await
+ }
+
async fn get(&self, location: &Path) -> ObjectStoreResult {
self.inner.get(location).await
}
@@ -420,21 +424,10 @@ impl ObjectStore for S3StorageBackend {
})
}
}
-
- async fn put_multipart(&self, location: &Path) -> ObjectStoreResult> {
- self.inner.put_multipart(location).await
- }
-
- async fn put_multipart_opts(
- &self,
- location: &Path,
- options: PutMultipartOpts,
- ) -> ObjectStoreResult> {
- self.inner.put_multipart_opts(location, options).await
- }
}
/// Storage option keys to use when creating [crate::storage::s3::S3StorageOptions].
+///
/// The same key should be used whether passing a key in the hashmap or setting it as an environment variable.
/// Provided keys may include configuration for the S3 backend and also the optional DynamoDb lock used for atomic rename.
#[deprecated(
@@ -479,15 +472,6 @@ mod tests {
let _env_scope = Self::new();
f()
}
-
- pub async fn run_async(future: F) -> F::Output
- where
- F: Future + Send + 'static,
- F::Output: Send + 'static,
- {
- let _env_scope = Self::new();
- future.await
- }
}
impl Drop for ScopedEnv {
@@ -537,6 +521,11 @@ mod tests {
std::env::set_var(constants::AWS_ACCESS_KEY_ID, "default_key_id");
std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "default_secret_key");
std::env::set_var(constants::AWS_S3_LOCKING_PROVIDER, "dynamodb");
+ std::env::set_var(
+ constants::AWS_IAM_ROLE_ARN,
+ "arn:aws:iam::123456789012:role/some_role",
+ );
+ std::env::set_var(constants::AWS_IAM_ROLE_SESSION_NAME, "session_name");
std::env::set_var(
constants::AWS_S3_ASSUME_ROLE_ARN,
"arn:aws:iam::123456789012:role/some_role",
@@ -572,11 +561,11 @@ mod tests {
fn storage_options_with_only_region_and_credentials() {
ScopedEnv::run(|| {
clear_env_of_aws_keys();
- std::env::remove_var(s3_constants::AWS_ENDPOINT_URL);
+ std::env::remove_var(constants::AWS_ENDPOINT_URL);
let options = S3StorageOptions::from_map(&hashmap! {
- s3_constants::AWS_REGION.to_string() => "eu-west-1".to_string(),
- s3_constants::AWS_ACCESS_KEY_ID.to_string() => "test".to_string(),
- s3_constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(),
+ constants::AWS_REGION.to_string() => "eu-west-1".to_string(),
+ constants::AWS_ACCESS_KEY_ID.to_string() => "test".to_string(),
+ constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(),
})
.unwrap();
@@ -601,8 +590,8 @@ mod tests {
constants::AWS_PROFILE.to_string() => "default".to_string(),
constants::AWS_S3_ADDRESSING_STYLE.to_string() => "virtual".to_string(),
constants::AWS_S3_LOCKING_PROVIDER.to_string() => "another_locking_provider".to_string(),
- constants::AWS_S3_ASSUME_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/another_role".to_string(),
- constants::AWS_S3_ROLE_SESSION_NAME.to_string() => "another_session_name".to_string(),
+ constants::AWS_IAM_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/another_role".to_string(),
+ constants::AWS_IAM_ROLE_SESSION_NAME.to_string() => "another_session_name".to_string(),
constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string() => "another_token_file".to_string(),
constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "1".to_string(),
constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "2".to_string(),
@@ -641,8 +630,8 @@ mod tests {
constants::AWS_PROFILE.to_string() => "default".to_string(),
constants::AWS_S3_ADDRESSING_STYLE.to_string() => "virtual".to_string(),
constants::AWS_S3_LOCKING_PROVIDER.to_string() => "another_locking_provider".to_string(),
- constants::AWS_S3_ASSUME_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/another_role".to_string(),
- constants::AWS_S3_ROLE_SESSION_NAME.to_string() => "another_session_name".to_string(),
+ constants::AWS_IAM_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/another_role".to_string(),
+ constants::AWS_IAM_ROLE_SESSION_NAME.to_string() => "another_session_name".to_string(),
constants::AWS_WEB_IDENTITY_TOKEN_FILE.to_string() => "another_token_file".to_string(),
constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "1".to_string(),
constants::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "2".to_string(),
@@ -674,10 +663,10 @@ mod tests {
std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "wrong_secret_key");
std::env::set_var(constants::AWS_S3_LOCKING_PROVIDER, "dynamodb");
std::env::set_var(
- constants::AWS_S3_ASSUME_ROLE_ARN,
+ constants::AWS_IAM_ROLE_ARN,
"arn:aws:iam::123456789012:role/some_role",
);
- std::env::set_var(constants::AWS_S3_ROLE_SESSION_NAME, "session_name");
+ std::env::set_var(constants::AWS_IAM_ROLE_SESSION_NAME, "session_name");
std::env::set_var(constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file");
std::env::set_var(constants::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, "1");
@@ -799,14 +788,14 @@ mod tests {
assert!(is_aws(&options));
let minio: HashMap = hashmap! {
- crate::constants::AWS_ENDPOINT_URL.to_string() => "http://minio:8080".to_string(),
+ constants::AWS_ENDPOINT_URL.to_string() => "http://minio:8080".to_string(),
};
let options = StorageOptions::from(minio);
assert!(!is_aws(&options));
let localstack: HashMap = hashmap! {
- crate::constants::AWS_FORCE_CREDENTIAL_LOAD.to_string() => "true".to_string(),
- crate::constants::AWS_ENDPOINT_URL.to_string() => "http://minio:8080".to_string(),
+ constants::AWS_FORCE_CREDENTIAL_LOAD.to_string() => "true".to_string(),
+ constants::AWS_ENDPOINT_URL.to_string() => "http://minio:8080".to_string(),
};
let options = StorageOptions::from(localstack);
assert!(is_aws(&options));
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index d150b5b0d6..7d5bfbaf10 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -119,7 +119,6 @@ rand = "0.8"
serial_test = "3"
tempfile = "3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
-utime = "0.3"
[features]
cdf = []
diff --git a/crates/core/src/delta_datafusion/find_files/logical.rs b/crates/core/src/delta_datafusion/find_files/logical.rs
deleted file mode 100644
index 4dd4a3b5da..0000000000
--- a/crates/core/src/delta_datafusion/find_files/logical.rs
+++ /dev/null
@@ -1,107 +0,0 @@
-use std::collections::HashSet;
-use std::hash::{Hash, Hasher};
-
-use datafusion_common::DFSchemaRef;
-use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
-
-use crate::delta_datafusion::find_files::ONLY_FILES_DF_SCHEMA;
-use crate::logstore::LogStoreRef;
-use crate::table::state::DeltaTableState;
-
-#[derive(Debug, Clone)]
-pub struct FindFilesNode {
- id: String,
- predicate: Expr,
- table_state: DeltaTableState,
- log_store: LogStoreRef,
- version: i64,
-}
-
-impl FindFilesNode {
- pub fn new(
- id: String,
- table_state: DeltaTableState,
- log_store: LogStoreRef,
- predicate: Expr,
- ) -> datafusion_common::Result {
- let version = table_state.version();
- Ok(Self {
- id,
- predicate,
- log_store,
- table_state,
-
- version,
- })
- }
-
- pub fn predicate(&self) -> Expr {
- self.predicate.clone()
- }
-
- pub fn state(&self) -> DeltaTableState {
- self.table_state.clone()
- }
-
- pub fn log_store(&self) -> LogStoreRef {
- self.log_store.clone()
- }
-}
-
-impl Eq for FindFilesNode {}
-
-impl PartialEq for FindFilesNode {
- fn eq(&self, other: &Self) -> bool {
- self.id == other.id
- }
-}
-
-impl Hash for FindFilesNode {
- fn hash(&self, state: &mut H) {
- state.write(self.id.as_bytes());
- state.finish();
- }
-}
-
-impl UserDefinedLogicalNodeCore for FindFilesNode {
- fn name(&self) -> &str {
- "FindFiles"
- }
-
- fn inputs(&self) -> Vec<&LogicalPlan> {
- vec![]
- }
-
- fn schema(&self) -> &DFSchemaRef {
- &ONLY_FILES_DF_SCHEMA
- }
-
- fn expressions(&self) -> Vec {
- vec![]
- }
-
- fn prevent_predicate_push_down_columns(&self) -> HashSet {
- HashSet::new()
- }
-
- fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(
- f,
- "FindFiles[id={}, predicate=\"{}\", version={}]",
- &self.id, self.predicate, self.version,
- )
- }
-
- fn from_template(&self, exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
- self.with_exprs_and_inputs(exprs.to_vec(), inputs.to_vec())
- .unwrap()
- }
-
- fn with_exprs_and_inputs(
- &self,
- _exprs: Vec,
- _inputs: Vec,
- ) -> datafusion_common::Result {
- Ok(self.clone())
- }
-}
diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs
deleted file mode 100644
index 0c235242c2..0000000000
--- a/crates/core/src/delta_datafusion/find_files/mod.rs
+++ /dev/null
@@ -1,285 +0,0 @@
-use std::sync::Arc;
-
-use arrow_array::cast::AsArray;
-use arrow_array::types::UInt16Type;
-use arrow_array::RecordBatch;
-use arrow_schema::SchemaBuilder;
-use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
-use arrow_select::concat::concat_batches;
-use async_trait::async_trait;
-use datafusion::datasource::MemTable;
-use datafusion::execution::context::{QueryPlanner, SessionState};
-use datafusion::execution::TaskContext;
-use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
-use datafusion::prelude::SessionContext;
-use datafusion_common::{DFSchemaRef, Result, ToDFSchema};
-use datafusion_expr::{col, Expr, LogicalPlan, UserDefinedLogicalNode};
-use datafusion_physical_plan::filter::FilterExec;
-use datafusion_physical_plan::limit::LocalLimitExec;
-use datafusion_physical_plan::ExecutionPlan;
-use lazy_static::lazy_static;
-
-use crate::delta_datafusion::find_files::logical::FindFilesNode;
-use crate::delta_datafusion::find_files::physical::FindFilesExec;
-use crate::delta_datafusion::{
- df_logical_schema, register_store, DeltaScanBuilder, DeltaScanConfigBuilder, PATH_COLUMN,
-};
-use crate::logstore::LogStoreRef;
-use crate::table::state::DeltaTableState;
-use crate::DeltaTableError;
-
-pub mod logical;
-pub mod physical;
-
-lazy_static! {
- static ref ONLY_FILES_SCHEMA: Arc = {
- let mut builder = SchemaBuilder::new();
- builder.push(Field::new(PATH_COLUMN, DataType::Utf8, false));
- Arc::new(builder.finish())
- };
- static ref ONLY_FILES_DF_SCHEMA: DFSchemaRef =
- ONLY_FILES_SCHEMA.clone().to_dfschema_ref().unwrap();
-}
-
-#[derive(Default)]
-struct FindFilesPlannerExtension {}
-
-#[derive(Default)]
-struct FindFilesPlanner {}
-
-#[async_trait]
-impl ExtensionPlanner for FindFilesPlannerExtension {
- async fn plan_extension(
- &self,
- _planner: &dyn PhysicalPlanner,
- node: &dyn UserDefinedLogicalNode,
- _logical_inputs: &[&LogicalPlan],
- _physical_inputs: &[Arc],
- _session_state: &SessionState,
- ) -> Result