Skip to content

Commit

Permalink
fix: inconsistent types in array_agg_distinct merge_batch
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Aug 22, 2023
1 parent 6aa423b commit d518789
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
20 changes: 14 additions & 6 deletions datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ use std::collections::HashSet;
use crate::aggregate::utils::down_cast_any_ref;
use crate::expressions::format_state_name;
use crate::{AggregateExpr, PhysicalExpr};
use datafusion_common::Result;
use datafusion_common::ScalarValue;
use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::Accumulator;

/// Expression for a ARRAY_AGG(DISTINCT) aggregation.
Expand Down Expand Up @@ -147,11 +146,20 @@ impl Accumulator for DistinctArrayAggAccumulator {
return Ok(());
}

for array in states {
for j in 0..array.len() {
self.values.insert(ScalarValue::try_from_array(array, j)?);
assert!(
states.len() == 1,
"array_agg_distinct states must contain single array"
);
let state = &states[0];
(0..state.len()).try_for_each(|i| {
let scalar = ScalarValue::try_from_array(state, i)?;
if let ScalarValue::List(Some(values), _) = scalar {
self.values.extend(values);
Ok(())
} else {
internal_err!("array_agg_distinct state must be list")
}
}
})?;

Ok(())
}
Expand Down
4 changes: 4 additions & 0 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1280,6 +1280,10 @@ NULL NULL 781 7.81 125 -117 100
# ----
# [4, 2, 3, 5, 1]

# additional count(1) forces array_agg_distinct instead of array_agg over aggregated by c2 data
statement ok
SELECT array_agg(distinct c2), count(1) FROM aggregate_test_100

# aggregate_time_min_and_max
query TT
select min(t), max(t) from (select '00:00:00' as t union select '00:00:01' union select '00:00:02')
Expand Down

0 comments on commit d518789

Please sign in to comment.