Skip to content

Commit

Permalink
Add csv loading benchmarks.
Browse files Browse the repository at this point in the history
  • Loading branch information
dhegberg committed Nov 24, 2024
1 parent 2482ff4 commit 3d6abbc
Show file tree
Hide file tree
Showing 9 changed files with 297 additions and 7 deletions.
12 changes: 12 additions & 0 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ main() {
run_tpch_mem "1"
run_tpch "10"
run_tpch_mem "10"
run_csv
run_parquet
run_sort
run_clickbench_1
Expand All @@ -231,6 +232,9 @@ main() {
tpch_mem10)
run_tpch_mem "10"
;;
csv)
run_csv
;;
parquet)
run_parquet
;;
Expand Down Expand Up @@ -369,6 +373,14 @@ run_parquet() {
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o "${RESULTS_FILE}"
}

# Runs the csv benchmark
run_csv() {
RESULTS_FILE="${RESULTS_DIR}/csv.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running csv benchmark..."
$CARGO_COMMAND --bin csv -- load --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o "${RESULTS_FILE}"
}

# Runs the sort benchmark
run_sort() {
RESULTS_FILE="${RESULTS_DIR}/sort.json"
Expand Down
43 changes: 43 additions & 0 deletions benchmarks/src/bin/csv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::common::Result;

use datafusion_benchmarks::csv;
use structopt::StructOpt;

#[cfg(feature = "snmalloc")]
#[global_allocator]
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;

#[derive(Debug, Clone, StructOpt)]
#[structopt(name = "Benchmarks", about = "Apache DataFusion Rust Benchmarks.")]
enum CsvBenchCmd {
/// Benchmark for loading csv files
Load(csv::RunOpt),
}

#[tokio::main]
async fn main() -> Result<()> {
let cmd = CsvBenchCmd::from_args();
match cmd {
CsvBenchCmd::Load(opt) => {
println!("Running csv load benchmarks.");
opt.run().await
}
}
}
52 changes: 52 additions & 0 deletions benchmarks/src/csv/data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Benchmark data generation

use datafusion::common::Result;
use datafusion::test_util::csv::TestCsvFile;
use std::path::PathBuf;
use structopt::StructOpt;
use test_utils::AccessLogGenerator;

// Options and builder for making a csv test file
// Note don't use docstring or else it ends up in help
#[derive(Debug, StructOpt, Clone)]
pub struct DataOpt {
/// Path to folder where the csv file will be generated
#[structopt(parse(from_os_str), required = true, short = "p", long = "path")]
path: PathBuf,

/// Total size of generated dataset. The default scale factor of 1.0 will generate a roughly 3GB csv file
#[structopt(long = "scale-factor", default_value = "1.0")]
scale_factor: f32,
}

impl DataOpt {
/// Create the csv and return the file.
///
/// See [`TestCsvFile`] for more details
pub fn build(self) -> Result<TestCsvFile> {
let path = self.path.join("logs.csv");

let generator = AccessLogGenerator::new().with_include_nulls(true);

let num_batches = 100_f32 * self.scale_factor;

TestCsvFile::try_new(path, generator.take(num_batches as usize))
}
}
70 changes: 70 additions & 0 deletions benchmarks/src/csv/load.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::path::PathBuf;

use crate::util::{BenchmarkRun, CommonOpt};
use datafusion::{common::Result, prelude::{CsvReadOptions, SessionContext}};

use datafusion_common::instant::Instant;
use structopt::StructOpt;

use super::data::DataOpt;


#[derive(Debug, StructOpt, Clone)]
#[structopt(verbatim_doc_comment)]
pub struct RunOpt {
/// Common options
#[structopt(flatten)]
common: CommonOpt,

/// Create data files
#[structopt(flatten)]
data: DataOpt,

/// Path to machine readable output file
#[structopt(parse(from_os_str), short = "o", long = "output")]
output_path: Option<PathBuf>,
}

impl RunOpt {
pub async fn run(self) -> Result<()> {
let test_file = self.data.build()?;
let mut rundata = BenchmarkRun::new();

let title = "CSV Load Speed Test.";
println!("Executing '{title}'");
rundata.start_new_case(title);
for i in 0..self.common.iterations {

let start = Instant::now();
let ctx = SessionContext::new();
let data_frame = ctx.read_csv(test_file.path().to_str().unwrap(), CsvReadOptions::default())
.await
.unwrap();
let elapsed = start.elapsed();
let ms = elapsed.as_secs_f64() * 1000.0;
println!("Iteration {i} finished in {ms} ms.");
rundata.write_iter(elapsed, data_frame.count().await.unwrap());
}
if let Some(path) = &self.output_path {
std::fs::write(path, rundata.to_json())?;
}
Ok(())
}
}
22 changes: 22 additions & 0 deletions benchmarks/src/csv/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.


mod load;
pub use load::RunOpt;

mod data;
1 change: 1 addition & 0 deletions benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! DataFusion benchmark runner
pub mod clickbench;
pub mod csv;
pub mod imdb;
pub mod parquet_filter;
pub mod sort;
Expand Down
70 changes: 70 additions & 0 deletions datafusion/core/src/test_util/csv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Helpers for writing csv files and reading them back

use std::fs::File;
use std::path::PathBuf;

use crate::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use crate::error::Result;

use arrow::csv::WriterBuilder;

/// a CSV file that has been created for testing.
pub struct TestCsvFile {
path: PathBuf,
schema: SchemaRef,
}

impl TestCsvFile {
/// Creates a new csv file at the specified location
pub fn try_new(
path: PathBuf,
batches: impl IntoIterator<Item = RecordBatch>,
) -> Result<Self> {
let file = File::create(&path).unwrap();
let builder = WriterBuilder::new().with_header(true);
let mut writer = builder.build(file);

let mut batches = batches.into_iter();
let first_batch = batches.next().expect("need at least one record batch");
let schema = first_batch.schema();

let mut num_rows = 0;
for batch in batches {
writer.write(&batch)?;
num_rows += batch.num_rows();
}

println!("Generated test dataset with {num_rows} rows");

Ok(Self {
path,
schema,
})
}

pub fn schema(&self) -> SchemaRef {
self.schema.clone()
}

/// The path to the csv file
pub fn path(&self) -> &std::path::Path {
self.path.as_path()
}
}
2 changes: 2 additions & 0 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#[cfg(feature = "parquet")]
pub mod parquet;

pub mod csv;

use std::any::Any;
use std::collections::HashMap;
use std::fs::File;
Expand Down
32 changes: 25 additions & 7 deletions test-utils/src/data_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct GeneratorOptions {
pods_per_host: Range<usize>,
containers_per_pod: Range<usize>,
entries_per_container: Range<usize>,
include_nulls: bool,
}

impl Default for GeneratorOptions {
Expand All @@ -42,6 +43,7 @@ impl Default for GeneratorOptions {
pods_per_host: 1..15,
containers_per_pod: 1..3,
entries_per_container: 1024..8192,
include_nulls: false,
}
}
}
Expand Down Expand Up @@ -149,13 +151,23 @@ impl BatchBuilder {
self.image.append(image).unwrap();
self.time.append_value(time);

self.client_addr.append_value(format!(
"{}.{}.{}.{}",
rng.gen::<u8>(),
rng.gen::<u8>(),
rng.gen::<u8>(),
rng.gen::<u8>()
));
if self.options.include_nulls {
// Append a null value if the option is set
// Use both "NULL" as a string and a null value
if rng.gen_bool(0.5) {
self.client_addr.append_null();
} else {
self.client_addr.append_value("NULL");
}
} else {
self.client_addr.append_value(format!(
"{}.{}.{}.{}",
rng.gen::<u8>(),
rng.gen::<u8>(),
rng.gen::<u8>(),
rng.gen::<u8>()
));
}
self.request_duration.append_value(rng.gen());
self.request_user_agent
.append_value(random_string(rng, 20..100));
Expand Down Expand Up @@ -317,6 +329,12 @@ impl AccessLogGenerator {
self.options.entries_per_container = range;
self
}

// Set the condition for null values in the generated data
pub fn with_include_nulls(mut self, include_nulls: bool) -> Self {
self.options.include_nulls = include_nulls;
self
}
}

impl Iterator for AccessLogGenerator {
Expand Down

0 comments on commit 3d6abbc

Please sign in to comment.