Vectorized aggregation with grouping by one fixed-size column #7341
base: main
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files:

```
@@           Coverage Diff            @@
##             main    #7341    +/-   ##
========================================
+ Coverage   80.06%   82.59%   +2.52%
========================================
  Files         190      231      +41
  Lines       37181    43037    +5856
  Branches     9450    10802    +1352
========================================
+ Hits        29770    35547    +5777
- Misses       2997     3190     +193
+ Partials     4414     4300     -114
```

☔ View full report in Codecov by Sentry.
This is really good functionality, but there's a lot of code to review, and it is a bit hard to understand and follow, so it takes time.
I think there are some things that can be done to improve the code and its readability so that it is easier for others to understand.
I've made some suggestions, and probably not all of them are valid or good ideas. But at least the places I've made suggestions for so far are areas that are particularly hard to follow.
I will submit what I have so far, and will try to do the rest later.
```c
	create_grouping_policy_batch(vector_agg_state->agg_defs,
								 vector_agg_state->output_grouping_columns,
								 /* partial_per_batch = */ grouping_column_offsets != NIL);

	if (list_length(vector_agg_state->output_grouping_columns) == 1)
```
It would be nice to have a comment before this check letting the reader know that we want to try optimizing the 1-column case in a special way, and that we later fall back to the "regular" per-batch grouping if the optimization wasn't possible.
```c
							 MemoryContext agg_extra_mctx)
{
	CountState *states = (CountState *) agg_states;
	for (int row = start_row; row < end_row; row++)
```
Just double-checking that it is really correct to be non-inclusive with the `end_row` here... `end_row` sounds like it is a "valid" row index, as opposed to using something like `num_rows` in a zero-indexed series. If `end_row` is not a valid index it should probably be called something else.
I guess that's the C++ habit of mine, where `end` is idiomatically a past-the-end invalid iterator. In general I think ranges with an exclusive right end are very common, like `[begin, end)`. Do you have a better name for this? Sometimes I write `past_the_end_row` to make it absolutely clear, but this feels a little too long for common usage...
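To illustrate the convention under discussion, here is a minimal, self-contained C sketch (names are illustrative, not the actual code): with a half-open range `[start_row, end_row)`, `end_row` is one past the last valid row, so adjacent sub-ranges chain together without overlap or off-by-one gaps.

```c
#include <assert.h>

/* Half-open range sketch: end_row is one past the last valid row,
 * like a C++ past-the-end iterator. The loop body runs exactly
 * (end_row - start_row) times. */
static int
count_rows(int start_row, int end_row)
{
	int n = 0;
	for (int row = start_row; row < end_row; row++)
		n++;
	return n;
}
```

One nice property: splitting `[0, 10)` into `[0, 4)` and `[4, 10)` covers every row exactly once, which is why half-open ranges compose well when a batch is processed in chunks.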
```c
						  const ArrowArray *vector, MemoryContext agg_extra_mctx)
{
	const uint64 *valid = vector->buffers[0];
	for (int row = start_row; row < end_row; row++)
```
Same here as above w.r.t. `end_row`.
```c
	 * A memory context for aggregate functions to allocate additional data,
	 * i.e. if they store strings or float8 datum on 32-bit systems. Valid until
	 * the grouping policy is reset.
```
Suggested change:

```diff
- * A memory context for aggregate functions to allocate additional data,
- * i.e. if they store strings or float8 datum on 32-bit systems. Valid until
+ * A memory context for aggregate functions to store varlen type values,
+ * e.g., strings or float8 datums on 32-bit systems. Valid until
  * the grouping policy is reset.
```
I was thinking more about statistical sketches or an exact count of unique values; these functions have to allocate generic variable-length data somewhere, and that's the purpose of this context.
```c
}

static pg_attribute_always_inline void
get_key_arrow_fixed_2(CompressedColumnValues column, int row, Datum *restrict key,
```
Looks like the following functions are never executed in tests. Would be good to have test coverage of these.
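For context, a hypothetical sketch of what such a fixed-width key reader does: take the row's 2-byte value from the Arrow values buffer and widen it into a Datum-sized integer key. The function name and zero-extension behavior here are illustrative assumptions, not the actual API.

```c
#include <assert.h>
#include <stdint.h>

/* Illustrative 2-byte key reader: read values[row] and widen it to a
 * 64-bit key (zero-extended in this sketch). */
static uint64_t
get_key_fixed_2_sketch(const int16_t *values, int row)
{
	return (uint64_t) (uint16_t) values[row];
}
```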
```c
	/*
	 * Temporary storage of aggregate state offsets for a given batch. We keep
	 * it in the policy because it is potentially too big to keep on stack, and
	 * we don't want to reallocate it each batch.
	 */
```
Took me a while to figure out what these offsets are for. Could we do a better description here?
For example (if I got things right):
The agg_state_offsets array contains an offset for each "row" in an ArrowArray that points to the aggregate state for that row. Which agg state it points to is decided by the value (key) in the grouping column. Thus, the number of agg states is equal to the number of unique values in the grouping column's ArrowArray.
Yeah I'll try to describe it better. They are basically unique indexes of the grouping keys, and the array is indexed by the row index of the input batch.
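A simplified, self-contained illustration of that description (a linear scan stands in for the hash table; `fill_offsets_sketch` is an invented name): each row is assigned the unique index of its grouping key, so for keys `{5, 7, 5, 9, 7}` the offsets come out as `{0, 1, 0, 2, 1}` and three aggregate states are needed.

```c
#include <assert.h>
#include <stdint.h>

/* Map each row to the unique index of its grouping key. Returns the
 * number of distinct keys, i.e. the number of aggregate states needed.
 * The offsets array is indexed by the row index of the input batch. */
static int
fill_offsets_sketch(const int64_t *keys, int n_rows, uint32_t *offsets)
{
	int64_t unique[64]; /* assume few distinct keys for the sketch */
	int n_unique = 0;
	for (int row = 0; row < n_rows; row++)
	{
		int idx = -1;
		for (int u = 0; u < n_unique; u++)
			if (unique[u] == keys[row])
				idx = u;
		if (idx < 0)
		{
			unique[n_unique] = keys[row];
			idx = n_unique++;
		}
		offsets[row] = (uint32_t) idx;
	}
	return n_unique;
}
```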
```c
{
	Datum key;
	uint32 status;
	uint32 agg_state_index;
```
What about having the Agg state in the hash table entry instead? What are the pros/cons of having it here compared to in an outside array?
The hash table should be as small as possible, because we're accessing its memory randomly, and the speed of this depends on whether it fits into the caches.
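A sketch of that trade-off (field layout here is illustrative, not the exact struct): the hash entry stays small — just the key plus an index — while the per-aggregate states live in separate dense arrays addressed by `agg_state_index`. A smaller entry means more entries fit per cache line during the random probing the hash lookups do.

```c
#include <assert.h>
#include <stdint.h>

/* Compact hash entry: key + bookkeeping + index into the state arrays.
 * The (potentially much larger) aggregate states live outside the table. */
typedef struct HashEntrySketch
{
	uint64_t key;             /* Datum-sized grouping key */
	uint32_t status;          /* simplehash-style bookkeeping */
	uint32_t agg_state_index; /* index into each aggregate's state array */
} HashEntrySketch;

/* Example aggregate state that stays out of the hash table. */
typedef struct AvgStateSketch
{
	double sum;
	uint64_t count;
} AvgStateSketch;
```

If the states were embedded in the entry, every probe would drag the full state payload through the cache even for rows that only need the index.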
```c
	HashEntry *restrict entry = h_insert(table, key, &found);
	if (!found)
	{
		entry->agg_state_index = next_unused_state_index++;
```
I am wondering why we're not initializing the aggregate states here instead of in an additional loop after the fill_offsets?
Just following a common rule of thumb for high-throughput data processing: colocate similar items in memory, do the same operation on them in bulk.
In this grouping algorithm we have two major stages:
1. Match each input row to a unique key index (`offsets`). This uses the hash table and the key column data.
2. Calculate the aggregate functions. This uses their input columns and their states.
These stages are done separately, and the data required for them are grouped together. Following the same principle, each aggregate function is processed for the entire batch, rather than processing all aggregate functions for one row.
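The second stage can be sketched as one tight loop per aggregate over the whole batch, using the offsets produced by the first stage. This is a simplified, self-contained illustration (types and names are assumptions, not the real code):

```c
#include <assert.h>
#include <stdint.h>

/* Stage 2 for one hypothetical SUM(float8) aggregate: a single pass over
 * the batch, touching only this aggregate's input column and its dense
 * state array. offsets[row] is the unique key index from stage 1. */
static void
sum_add_batch(double *states, const uint32_t *offsets,
			  const double *values, int n_rows)
{
	for (int row = 0; row < n_rows; row++)
		states[offsets[row]] += values[row];
}
```

Initializing all the new states in one loop before this, rather than inside the hash-insert path, keeps each loop doing one kind of work on one kind of data.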
```c
		VectorAggDef *agg_def = lfirst(aggdeflc);
		lfirst(aggstatelc) =
			repalloc(lfirst(aggstatelc),
					 policy->allocated_aggstate_rows * agg_def->func.state_bytes);
```
Wouldn't it be nicer if `agg_def->func` had an allocation function instead? Then you'd just do `agg_def->func.alloc_states(num_states)`.
I think it could be slightly confusing. The aggregate functions can't decide to allocate their memory in a different way, so this is not a part of their interface. No need to make a method that has the same single-line implementation for every aggregate.
```c
	 * it in the policy because it is potentially too big to keep on stack, and
	 * we don't want to reallocate it each batch.
	 */
	uint32 *offsets;
```
Another question: what if we made this an array of pointers to the agg states instead of an array of offsets?
I want to colocate the states of one aggregate function together because they are processed together, and to use the same offsets (unique indexes of grouping key, basically) to access the states of all aggregate functions. With pointers this would be less convenient, and also they take up more memory.
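A self-contained sketch of that design point (invented names; simplified types): a single `uint32` offsets array is shared by all aggregate functions, each of which keeps its states in its own dense array. Offsets are also half the size of 64-bit pointers, so the per-batch array is smaller, and a pointer could only address one aggregate's states anyway.

```c
#include <assert.h>
#include <stdint.h>

/* Two hypothetical aggregates (SUM and MAX over float8 columns) reusing
 * the same offsets array; each keeps its own colocated state array. */
static void
update_two_aggs(const uint32_t *offsets, int n_rows,
				const double *col_a, double *sum_states,
				const double *col_b, double *max_states)
{
	for (int row = 0; row < n_rows; row++)
		sum_states[offsets[row]] += col_a[row];
	for (int row = 0; row < n_rows; row++)
		if (col_b[row] > max_states[offsets[row]])
			max_states[offsets[row]] = col_b[row];
}
```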
This PR prepares for #7341. It has various assorted refactorings and cosmetic changes:
* Various cosmetic things I don't know where to put.
* The definitions of aggregate functions and grouping columns in the vector agg node are now typed arrays and not lists.
* The aggregate function implementations always work with at most one filter bitmap. This reduces the amount of code and will help to support the aggregate FILTER clauses.
* Parts of the aggregate function implementations are restructured and renamed in a way that will make it easier to support hash grouping.
* EXPLAIN output is added for the vector agg node that mentions the grouping policy being used.
No functional changes are expected except for the EXPLAIN output.
Disable-check: force-changelog-file
---------
Signed-off-by: Alexander Kuzmenkov <[email protected]>
Co-authored-by: Erik Nordström <[email protected]>
This is a simplified implementation that uses the Postgres simplehash hash table with a generic Datum key for by-value fixed-size compressed columns.
The biggest improvement on a "sensible" query is about 90%, and a couple of queries show bigger improvements but these are very synthetic cases that don't make much sense:
https://grafana.ops.savannah-dev.timescale.com/d/fasYic_4z/compare-akuzm?orgId=1&var-branch=All&var-run1=3815&var-run2=3816&var-threshold=0.02&var-use_historical_thresholds=true&var-threshold_expression=2%20%2A%20percentile_cont%280.90%29&var-exact_suite_version=false&from=now-2d&to=now