Skip to content

Commit

Permalink
Merge branch 'main' into sketch-blocked-aggr-state-management
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Sep 17, 2024
2 parents 4a48d3a + a08f923 commit 7b61328
Show file tree
Hide file tree
Showing 164 changed files with 23,989 additions and 19,176 deletions.
50 changes: 25 additions & 25 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/apache/datafusion"
rust-version = "1.76"
version = "41.0.0"
version = "42.0.0"

[workspace.dependencies]
# We turn off default-features for some dependencies here so the workspaces which inherit them can
Expand Down Expand Up @@ -88,31 +88,31 @@ arrow-string = { version = "53.0.0", default-features = false }
async-trait = "0.1.73"
bigdecimal = "=0.4.1"
bytes = "1.4"
chrono = { version = "0.4.34", default-features = false }
chrono = { version = "0.4.38", default-features = false }
ctor = "0.2.0"
dashmap = "6.0.1"
datafusion = { path = "datafusion/core", version = "41.0.0", default-features = false }
datafusion-catalog = { path = "datafusion/catalog", version = "41.0.0" }
datafusion-common = { path = "datafusion/common", version = "41.0.0", default-features = false }
datafusion-common-runtime = { path = "datafusion/common-runtime", version = "41.0.0" }
datafusion-execution = { path = "datafusion/execution", version = "41.0.0" }
datafusion-expr = { path = "datafusion/expr", version = "41.0.0" }
datafusion-expr-common = { path = "datafusion/expr-common", version = "41.0.0" }
datafusion-functions = { path = "datafusion/functions", version = "41.0.0" }
datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "41.0.0" }
datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "41.0.0" }
datafusion-functions-nested = { path = "datafusion/functions-nested", version = "41.0.0" }
datafusion-functions-window = { path = "datafusion/functions-window", version = "41.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "41.0.0", default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "41.0.0", default-features = false }
datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "41.0.0", default-features = false }
datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", version = "41.0.0" }
datafusion-physical-plan = { path = "datafusion/physical-plan", version = "41.0.0" }
datafusion-proto = { path = "datafusion/proto", version = "41.0.0" }
datafusion-proto-common = { path = "datafusion/proto-common", version = "41.0.0" }
datafusion-sql = { path = "datafusion/sql", version = "41.0.0" }
datafusion-sqllogictest = { path = "datafusion/sqllogictest", version = "41.0.0" }
datafusion-substrait = { path = "datafusion/substrait", version = "41.0.0" }
datafusion = { path = "datafusion/core", version = "42.0.0", default-features = false }
datafusion-catalog = { path = "datafusion/catalog", version = "42.0.0" }
datafusion-common = { path = "datafusion/common", version = "42.0.0", default-features = false }
datafusion-common-runtime = { path = "datafusion/common-runtime", version = "42.0.0" }
datafusion-execution = { path = "datafusion/execution", version = "42.0.0" }
datafusion-expr = { path = "datafusion/expr", version = "42.0.0" }
datafusion-expr-common = { path = "datafusion/expr-common", version = "42.0.0" }
datafusion-functions = { path = "datafusion/functions", version = "42.0.0" }
datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "42.0.0" }
datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "42.0.0" }
datafusion-functions-nested = { path = "datafusion/functions-nested", version = "42.0.0" }
datafusion-functions-window = { path = "datafusion/functions-window", version = "42.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "42.0.0", default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "42.0.0", default-features = false }
datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "42.0.0", default-features = false }
datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", version = "42.0.0" }
datafusion-physical-plan = { path = "datafusion/physical-plan", version = "42.0.0" }
datafusion-proto = { path = "datafusion/proto", version = "42.0.0" }
datafusion-proto-common = { path = "datafusion/proto-common", version = "42.0.0" }
datafusion-sql = { path = "datafusion/sql", version = "42.0.0" }
datafusion-sqllogictest = { path = "datafusion/sqllogictest", version = "42.0.0" }
datafusion-substrait = { path = "datafusion/substrait", version = "42.0.0" }
doc-comment = "0.3"
env_logger = "0.11"
futures = "0.3"
Expand All @@ -137,7 +137,7 @@ rand = "0.8"
regex = "1.8"
rstest = "0.22.0"
serde_json = "1"
sqlparser = { version = "0.50.0", features = ["visitor"] }
sqlparser = { version = "0.51.0", features = ["visitor"] }
tempfile = "3"
thiserror = "1.0.44"
tokio = { version = "1.36", features = ["macros", "rt", "sync"] }
Expand Down
44 changes: 33 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,36 @@
[discord-badge]: https://img.shields.io/discord/885562378132000778.svg?logo=discord&style=flat-square
[discord-url]: https://discord.com/invite/Qw5gKqHxUM

[Website](https://github.com/apache/datafusion) |
[Guides](https://github.com/apache/datafusion/tree/main/docs) |
[Website](https://datafusion.apache.org/) |
[API Docs](https://docs.rs/datafusion/latest/datafusion/) |
[Chat](https://discord.com/channels/885562378132000778/885562378132000781)

<img src="./docs/source/_static/images/2x_bgwhite_original.png" width="512" alt="logo"/>

Apache DataFusion is a very fast, extensible query engine for building high-quality data-centric systems in
[Rust](http://rustlang.org), using the [Apache Arrow](https://arrow.apache.org)
in-memory format. [Python Bindings](https://github.com/apache/datafusion-python) are also available. DataFusion offers SQL and Dataframe APIs, excellent [performance](https://benchmark.clickhouse.com/), built-in support for CSV, Parquet, JSON, and Avro, extensive customization, and a great community.
<a href="https://datafusion.apache.org/">
<img src="./docs/source/_static/images/2x_bgwhite_original.png" width="512" alt="logo"/>
</a>

DataFusion is an extensible query engine written in [Rust] that
uses [Apache Arrow] as its in-memory format. DataFusion's target users are
developers building fast and feature rich database and analytic systems,
customized to particular workloads. See [use cases] for examples.

"Out of the box," DataFusion offers [SQL] and [`Dataframe`] APIs,
excellent [performance], built-in support for CSV, Parquet, JSON, and Avro,
extensive customization, and a great community.
[Python Bindings] are also available.

DataFusion features a full query planner, a columnar, streaming, multi-threaded,
vectorized execution engine, and partitioned data sources. You can
customize DataFusion at almost all points including additional data sources,
query languages, functions, custom operators and more.
See the [Architecture] section for more details.

[rust]: http://rustlang.org
[apache arrow]: https://arrow.apache.org
[use cases]: https://datafusion.apache.org/user-guide/introduction.html#use-cases
[python bindings]: https://github.com/apache/datafusion-python
[performance]: https://benchmark.clickhouse.com/
[architecture]: https://datafusion.apache.org/contributor-guide/architecture.html

Here are links to some important information

Expand Down Expand Up @@ -97,9 +117,11 @@ Optional features:

## Rust Version Compatibility Policy

DataFusion's Minimum Required Stable Rust Version (MSRV) policy is to support
each stable Rust version for 6 months after it is
[released](https://github.com/rust-lang/rust/blob/master/RELEASES.md). This
generally translates to support for the most recent 3 to 4 stable Rust versions.
DataFusion's Minimum Required Stable Rust Version (MSRV) policy is to support stable [4 latest
Rust versions](https://releases.rs) OR the stable minor Rust version as of 4 months, whichever is lower.

For example, given the releases `1.78.0`, `1.79.0`, `1.80.0`, `1.80.1` and `1.81.0` DataFusion will support 1.78.0, which is 3 minor versions prior to the most minor recent `1.81`.

If a hotfix is released for the minimum supported Rust version (MSRV), the MSRV will be the minor version with all hotfixes, even if it surpasses the four-month window.

We enforce this policy using a [MSRV CI Check](https://github.com/search?q=repo%3Aapache%2Fdatafusion+rust-version+language%3ATOML+path%3A%2F%5ECargo.toml%2F&type=code)
58 changes: 57 additions & 1 deletion benchmarks/queries/clickbench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ LIMIT 10;

### Q3: What is the income distribution for users in specific regions

**Question**: "What regions and social networks have the highest variance of parameter price
**Question**: "What regions and social networks have the highest variance of parameter price?"

**Important Query Properties**: STDDEV and VAR aggregation functions, GROUP BY multiple small ints

Expand All @@ -73,6 +73,62 @@ ORDER BY s DESC
LIMIT 10;
```

### Q4: Response start time distribution analysis (median)

**Question**: Find the WatchIDs with the highest median "ResponseStartTiming" without Java enabled

**Important Query Properties**: MEDIAN, functions, high cardinality grouping that skips intermediate aggregation

Note this query is somewhat synthetic as "WatchID" is almost unique (there are a few duplicates)

```sql
SELECT "ClientIP", "WatchID", COUNT(*) c, MIN("ResponseStartTiming") tmin, MEDIAN("ResponseStartTiming") tmed, MAX("ResponseStartTiming") tmax
FROM 'hits.parquet'
WHERE "JavaEnable" = 0 -- filters to 32M of 100M rows
GROUP BY "ClientIP", "WatchID"
HAVING c > 1
ORDER BY tmed DESC
LIMIT 10;
```

Results look like

+-------------+---------------------+---+------+------+------+
| ClientIP | WatchID | c | tmin | tmed | tmax |
+-------------+---------------------+---+------+------+------+
| 1611957945 | 6655575552203051303 | 2 | 0 | 0 | 0 |
| -1402644643 | 8566928176839891583 | 2 | 0 | 0 | 0 |
+-------------+---------------------+---+------+------+------+


### Q5: Response start time distribution analysis (p95)

**Question**: Find the WatchIDs with the highest p95 "ResponseStartTiming" without Java enabled

**Important Query Properties**: APPROX_PERCENTILE_CONT, functions, high cardinality grouping that skips intermediate aggregation

Note this query is somewhat synthetic as "WatchID" is almost unique (there are a few duplicates)

```sql
SELECT "ClientIP", "WatchID", COUNT(*) c, MIN("ResponseStartTiming") tmin, APPROX_PERCENTILE_CONT("ResponseStartTiming", 0.95) tp95, MAX("ResponseStartTiming") tmax
FROM 'hits.parquet'
WHERE "JavaEnable" = 0 -- filters to 32M of 100M rows
GROUP BY "ClientIP", "WatchID"
HAVING c > 1
ORDER BY tp95 DESC
LIMIT 10;
```

Results look like

+-------------+---------------------+---+------+------+------+
| ClientIP | WatchID | c | tmin | tp95 | tmax |
+-------------+---------------------+---+------+------+------+
| 1611957945 | 6655575552203051303 | 2 | 0 | 0 | 0 |
| -1402644643 | 8566928176839891583 | 2 | 0 | 0 | 0 |
+-------------+---------------------+---+------+------+------+


## Data Notes

Here are some interesting statistics about the data used in the queries
Expand Down
2 changes: 2 additions & 0 deletions benchmarks/queries/clickbench/extended.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ SELECT COUNT(DISTINCT "SearchPhrase"), COUNT(DISTINCT "MobilePhone"), COUNT(DIST
SELECT COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserCountry"), COUNT(DISTINCT "BrowserLanguage") FROM hits;
SELECT "BrowserCountry", COUNT(DISTINCT "SocialNetwork"), COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserLanguage"), COUNT(DISTINCT "SocialAction") FROM hits GROUP BY 1 ORDER BY 2 DESC LIMIT 10;
SELECT "SocialSourceNetworkID", "RegionID", COUNT(*), AVG("Age"), AVG("ParamPrice"), STDDEV("ParamPrice") as s, VAR("ParamPrice") FROM hits GROUP BY "SocialSourceNetworkID", "RegionID" HAVING s IS NOT NULL ORDER BY s DESC LIMIT 10;
SELECT "ClientIP", "WatchID", COUNT(*) c, MIN("ResponseStartTiming") tmin, MEDIAN("ResponseStartTiming") tmed, MAX("ResponseStartTiming") tmax FROM hits WHERE "JavaEnable" = 0 GROUP BY "ClientIP", "WatchID" HAVING c > 1 ORDER BY tmed DESC LIMIT 10;
SELECT "ClientIP", "WatchID", COUNT(*) c, MIN("ResponseStartTiming") tmin, APPROX_PERCENTILE_CONT("ResponseStartTiming", 0.95) tp95, MAX("ResponseStartTiming") tmax FROM 'hits' WHERE "JavaEnable" = 0 GROUP BY "ClientIP", "WatchID" HAVING c > 1 ORDER BY tp95 DESC LIMIT 10;
2 changes: 1 addition & 1 deletion benchmarks/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl RunOpt {
.options_mut()
.execution
.parquet
.schema_force_string_view = self.common.string_view;
.schema_force_view_types = self.common.force_view_types;

let ctx = SessionContext::new_with_config(config);
self.register_hits(&ctx).await?;
Expand Down
9 changes: 5 additions & 4 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl RunOpt {
.options_mut()
.execution
.parquet
.schema_force_string_view = self.common.string_view;
.schema_force_view_types = self.common.force_view_types;
let ctx = SessionContext::new_with_config(config);

// register tables
Expand Down Expand Up @@ -268,7 +268,8 @@ impl RunOpt {
}
"parquet" => {
let path = format!("{path}/{table}");
let format = ParquetFormat::default().with_enable_pruning(true);
let format = ParquetFormat::default()
.with_options(ctx.state().table_options().parquet.clone());

(Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
}
Expand Down Expand Up @@ -344,7 +345,7 @@ mod tests {
partitions: Some(2),
batch_size: 8192,
debug: false,
string_view: false,
force_view_types: false,
};
let opt = RunOpt {
query: Some(query),
Expand Down Expand Up @@ -378,7 +379,7 @@ mod tests {
partitions: Some(2),
batch_size: 8192,
debug: false,
string_view: false,
force_view_types: false,
};
let opt = RunOpt {
query: Some(query),
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/util/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct CommonOpt {
/// If true, will use StringView/BinaryViewArray instead of String/BinaryArray
/// when reading ParquetFiles
#[structopt(long)]
pub string_view: bool,
pub force_view_types: bool,
}

impl CommonOpt {
Expand Down
Loading

0 comments on commit 7b61328

Please sign in to comment.