Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into hash_join_batch_size
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Nov 7, 2023
2 parents 02651f6 + 0506a5c commit bc26d47
Show file tree
Hide file tree
Showing 19 changed files with 495 additions and 94 deletions.
14 changes: 1 addition & 13 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,7 @@ jobs:
# test datafusion-sql examples
cargo run --example sql
# test datafusion-examples
cargo run --example avro_sql --features=datafusion/avro
cargo run --example csv_sql
cargo run --example custom_datasource
cargo run --example dataframe
cargo run --example dataframe_in_memory
cargo run --example deserialize_to_struct
cargo run --example expr_api
cargo run --example parquet_sql
cargo run --example parquet_sql_multiple_files
cargo run --example memtable
cargo run --example rewrite_expr
cargo run --example simple_udf
cargo run --example simple_udaf
ci/scripts/rust_example.sh
- name: Verify Working Directory Clean
run: git diff --exit-code

Expand Down
35 changes: 35 additions & 0 deletions ci/scripts/rust_example.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

set -ex
cd datafusion-examples/examples/
cargo fmt --all -- --check

files=$(ls .)
for filename in $files
do
example_name=`basename $filename ".rs"`
# Skip tests that rely on external storage and flight
# todo: Currently, catalog.rs is placed in the external-dependence directory because there is a problem parsing
# the parquet file of the external parquet-test that it currently relies on.
# We will wait for this issue[https://github.com/apache/arrow-datafusion/issues/8041] to be resolved.
if [ ! -d $filename ]; then
cargo run --example $example_name
fi
done
8 changes: 5 additions & 3 deletions datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,18 @@ cargo run --example csv_sql

- [`avro_sql.rs`](examples/avro_sql.rs): Build and run a query plan from a SQL statement against a local AVRO file
- [`csv_sql.rs`](examples/csv_sql.rs): Build and run a query plan from a SQL statement against a local CSV file
- [`catalog.rs`](examples/external_dependency/catalog.rs): Register the table into a custom catalog
- [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame against a local parquet file
- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3
- [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory
- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
- [`expr_api.rs`](examples/expr_api.rs): Use the `Expr` construction and simplification API
- [`flight_sql_server.rs`](examples/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
- [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
- [`memtable.rs`](examples/memtable.rs): Create an query data in memory using SQL and `RecordBatch`es
- [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file
- [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files
- [`query-aws-s3.rs`](examples/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
- [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
- [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files vi HTTP
- [`rewrite_expr.rs`](examples/rewrite_expr.rs): Define and invoke a custom Query Optimizer pass
- [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF)
Expand All @@ -62,4 +64,4 @@ cargo run --example csv_sql

## Distributed

- [`flight_client.rs`](examples/flight_client.rs) and [`flight_server.rs`](examples/flight_server.rs): Run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol.
- [`flight_client.rs`](examples/flight/flight_client.rs) and [`flight_server.rs`](examples/flight/flight_server.rs): Run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol.
File renamed without changes.
2 changes: 1 addition & 1 deletion datafusion-examples/examples/simple_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn create_context() -> Result<SessionContext> {

// declare a table in memory. In spark API, this corresponds to createDataFrame(...).
println!("pwd: {}", std::env::current_dir().unwrap().display());
let csv_path = "datafusion/core/tests/data/cars.csv".to_string();
let csv_path = "../../datafusion/core/tests/data/cars.csv".to_string();
let read_options = CsvReadOptions::default().has_header(true);

ctx.register_csv("cars", &csv_path, read_options).await?;
Expand Down
17 changes: 17 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,23 @@ impl SessionContext {
let table_paths = table_paths.to_urls()?;
let session_config = self.copied_config();
let listing_options = options.to_listing_options(&session_config);

let option_extension = listing_options.file_extension.clone();

if table_paths.is_empty() {
return exec_err!("No table paths were provided");
}

// check if the file extension matches the expected extension
for path in &table_paths {
let file_name = path.prefix().filename().unwrap_or_default();
if !path.as_str().ends_with(&option_extension) && file_name.contains('.') {
return exec_err!(
"File '{file_name}' does not match the expected extension '{option_extension}'"
);
}
}

let resolved_schema = options
.get_resolved_schema(&session_config, self.state(), table_paths[0].clone())
.await?;
Expand Down
123 changes: 123 additions & 0 deletions datafusion/core/src/execution/context/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ impl SessionContext {
mod tests {
use async_trait::async_trait;

use crate::arrow::array::{Float32Array, Int32Array};
use crate::arrow::datatypes::{DataType, Field, Schema};
use crate::arrow::record_batch::RecordBatch;
use crate::dataframe::DataFrameWriteOptions;
use crate::parquet::basic::Compression;
use crate::test_util::parquet_test_data;

use super::*;
Expand Down Expand Up @@ -132,6 +137,124 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn read_from_different_file_extension() -> Result<()> {
let ctx = SessionContext::new();

// Make up a new dataframe.
let write_df = ctx.read_batch(RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("purchase_id", DataType::Int32, false),
Field::new("price", DataType::Float32, false),
Field::new("quantity", DataType::Int32, false),
])),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
Arc::new(Float32Array::from(vec![1.12, 3.40, 2.33, 9.10, 6.66])),
Arc::new(Int32Array::from(vec![1, 3, 2, 4, 3])),
],
)?)?;

// Write the dataframe to a parquet file named 'output1.parquet'
write_df
.clone()
.write_parquet(
"output1.parquet",
DataFrameWriteOptions::new().with_single_file_output(true),
Some(
WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build(),
),
)
.await?;

// Write the dataframe to a parquet file named 'output2.parquet.snappy'
write_df
.clone()
.write_parquet(
"output2.parquet.snappy",
DataFrameWriteOptions::new().with_single_file_output(true),
Some(
WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build(),
),
)
.await?;

// Write the dataframe to a parquet file named 'output3.parquet.snappy.parquet'
write_df
.write_parquet(
"output3.parquet.snappy.parquet",
DataFrameWriteOptions::new().with_single_file_output(true),
Some(
WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build(),
),
)
.await?;

// Read the dataframe from 'output1.parquet' with the default file extension.
let read_df = ctx
.read_parquet(
"output1.parquet",
ParquetReadOptions {
..Default::default()
},
)
.await?;

let results = read_df.collect().await?;
let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(total_rows, 5);

// Read the dataframe from 'output2.parquet.snappy' with the correct file extension.
let read_df = ctx
.read_parquet(
"output2.parquet.snappy",
ParquetReadOptions {
file_extension: "snappy",
..Default::default()
},
)
.await?;
let results = read_df.collect().await?;
let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(total_rows, 5);

// Read the dataframe from 'output3.parquet.snappy.parquet' with the wrong file extension.
let read_df = ctx
.read_parquet(
"output2.parquet.snappy",
ParquetReadOptions {
..Default::default()
},
)
.await;

assert_eq!(
read_df.unwrap_err().strip_backtrace(),
"Execution error: File 'output2.parquet.snappy' does not match the expected extension '.parquet'"
);

// Read the dataframe from 'output3.parquet.snappy.parquet' with the correct file extension.
let read_df = ctx
.read_parquet(
"output3.parquet.snappy.parquet",
ParquetReadOptions {
..Default::default()
},
)
.await?;

let results = read_df.collect().await?;
let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(total_rows, 5);
Ok(())
}

// Test for compilation error when calling read_* functions from an #[async_trait] function.
// See https://github.com/apache/arrow-datafusion/issues/1154
#[async_trait]
Expand Down
8 changes: 6 additions & 2 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,7 @@ mod tests {
AggregateExpr, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
};

use datafusion_execution::memory_pool::FairSpillPool;
use futures::{FutureExt, Stream};

// Generate a schema which consists of 5 columns (a, b, c, d, e)
Expand Down Expand Up @@ -1271,8 +1272,11 @@ mod tests {
fn new_spill_ctx(batch_size: usize, max_memory: usize) -> Arc<TaskContext> {
let session_config = SessionConfig::new().with_batch_size(batch_size);
let runtime = Arc::new(
RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(max_memory, 1.0))
.unwrap(),
RuntimeEnv::new(
RuntimeConfig::default()
.with_memory_pool(Arc::new(FairSpillPool::new(max_memory))),
)
.unwrap(),
);
let task_ctx = TaskContext::default()
.with_session_config(session_config)
Expand Down
Loading

0 comments on commit bc26d47

Please sign in to comment.