Fix variable resolution in vectorized aggregation planning #7415

Open
wants to merge 19 commits into base: main
2 changes: 2 additions & 0 deletions .unreleased/resolve-vars
@@ -0,0 +1,2 @@
Fixes: #7410 "aggregated compressed column not found" error on aggregation query.
Thanks: @uasiddiqi for reporting the "aggregated compressed column not found" error.
17 changes: 10 additions & 7 deletions tsl/src/nodes/vector_agg/exec.c
@@ -25,16 +25,19 @@ get_input_offset(DecompressChunkState *decompress_state, Var *var)
{
DecompressContext *dcontext = &decompress_state->decompress_context;

/*
* All variable references in the vectorized aggregation node were
* translated to uncompressed chunk variables when it was created.
*/
CustomScan *cscan = castNode(CustomScan, decompress_state->csstate.ss.ps.plan);
Ensure((Index) var->varno == (Index) cscan->scan.scanrelid,
"got vector varno %d expected %d",
var->varno,
cscan->scan.scanrelid);

CompressionColumnDescription *value_column_description = NULL;
for (int i = 0; i < dcontext->num_data_columns; i++)
{
/*
* See the column lookup in compute_plain_qual() for a discussion of
* which attribute numbers occur where. At the moment it is
* uncompressed_scan_attno here, but that may be an oversight: perhaps
* the references should have been rewritten into INDEX_VAR (or
* OUTER_VAR?) when the VectorAgg node was created.
*/
CompressionColumnDescription *current_column = &dcontext->compressed_chunk_columns[i];
if (current_column->uncompressed_chunk_attno == var->varattno)
{
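The diff is truncated here. For orientation, below is a minimal sketch of how this lookup plausibly completes, using the names visible above; the match branch and the final Ensure are assumptions rather than the PR's verbatim code (the error string is the one from the changelog entry):

```c
/* Sketch only -- the diff truncates the loop body above. Assumes the loop
 * records the matching column and that a missing match raises the
 * "aggregated compressed column not found" error mentioned in the changelog. */
CompressionColumnDescription *value_column_description = NULL;
for (int i = 0; i < dcontext->num_data_columns; i++)
{
	CompressionColumnDescription *current_column =
		&dcontext->compressed_chunk_columns[i];
	if (current_column->uncompressed_chunk_attno == var->varattno)
	{
		value_column_description = current_column;
		break;
	}
}
Ensure(value_column_description != NULL, "aggregated compressed column not found");
```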
156 changes: 108 additions & 48 deletions tsl/src/nodes/vector_agg/plan.c
@@ -74,29 +74,44 @@
return expression_tree_mutator(node, resolve_outer_special_vars_mutator, context);
}

Var *aggregated_var = castNode(Var, node);
Ensure(aggregated_var->varno == OUTER_VAR,
"encountered unexpected varno %d as an aggregate argument",
aggregated_var->varno);

Var *var = castNode(Var, node);
CustomScan *custom = castNode(CustomScan, context);
TargetEntry *decompress_chunk_tentry =
castNode(TargetEntry, list_nth(custom->scan.plan.targetlist, aggregated_var->varattno - 1));
Var *decompressed_var = castNode(Var, decompress_chunk_tentry->expr);
if (decompressed_var->varno == INDEX_VAR)
if ((Index) var->varno == (Index) custom->scan.scanrelid)
{
/*
* This is already the uncompressed chunk var. We can see it referenced
* by expressions in the output targetlist of the DecompressChunk node.
*/
return (Node *) copyObject(var);
}

if (var->varno == OUTER_VAR)
{
/*
* Reference into the output targetlist of the DecompressChunk node.
*/
TargetEntry *decompress_chunk_tentry =
castNode(TargetEntry, list_nth(custom->scan.plan.targetlist, var->varattno - 1));

return resolve_outer_special_vars_mutator((Node *) decompress_chunk_tentry->expr, context);
}

if (var->varno == INDEX_VAR)
{
/*
* This is a reference into the custom scan targetlist; we have to resolve
* it as well.
*/
decompressed_var =
castNode(Var,
castNode(TargetEntry,
list_nth(custom->custom_scan_tlist, decompressed_var->varattno - 1))
->expr);
}
Assert(decompressed_var->varno > 0);
return (Node *) copyObject(decompressed_var);
var = castNode(Var,
castNode(TargetEntry, list_nth(custom->custom_scan_tlist, var->varattno - 1))
->expr);
Assert(var->varno > 0);

return (Node *) copyObject(var);
Contributor:
Why do we need a copyObject() here but not in the other return cases? Or, to ask it differently, should we do copyObject() also in the other return of var?

Member Author:
This has no practical consequences here, but it is idiomatic for expression tree mutators to return a copy. I added copyObject() into the second place as well.

}

Ensure(false, "encountered unexpected varno %d as an aggregate argument", var->varno);

[Codecov / codecov/patch check warning: added line tsl/src/nodes/vector_agg/plan.c#L113 was not covered by tests]
return node;
}
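For readers unfamiliar with the idiom discussed in the comment thread above: PostgreSQL expression-tree mutators conventionally recurse into child expressions via expression_tree_mutator() and return a copy of any node they rewrite. A minimal self-contained sketch of the pattern, not the PR's exact code; the rewrite body is elided:

```c
#include "postgres.h"
#include "nodes/nodeFuncs.h"

static Node *
example_var_mutator(Node *node, void *context)
{
	if (node == NULL)
		return NULL;

	if (IsA(node, Var))
	{
		Var *var = castNode(Var, node);
		/* ... resolve var->varno / var->varattno here ... */

		/* Mutators idiomatically return a copy, not the original node. */
		return (Node *) copyObject(var);
	}

	/* Everything else: recurse into child expressions. */
	return expression_tree_mutator(node, example_var_mutator, context);
}
```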

/*
@@ -115,20 +130,20 @@
* node.
*/
static Plan *
vector_agg_plan_create(Agg *agg, CustomScan *decompress_chunk)
vector_agg_plan_create(Agg *agg, CustomScan *decompress_chunk, List *resolved_targetlist)
{
CustomScan *vector_agg = (CustomScan *) makeNode(CustomScan);
vector_agg->custom_plans = list_make1(decompress_chunk);
vector_agg->methods = &scan_methods;

vector_agg->custom_scan_tlist = resolved_targetlist;

/*
* Note that this is being called from the post-planning hook, and therefore
* after set_plan_refs(). The meaning of output targetlists is different from
* the previous planning stages, and they contain special varnos referencing
* the scan targetlists.
*/
vector_agg->custom_scan_tlist =
resolve_outer_special_vars(agg->plan.targetlist, decompress_chunk);
vector_agg->scan.plan.targetlist =
build_trivial_custom_output_targetlist(vector_agg->custom_scan_tlist);
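For context, a "trivial" output targetlist pairs each custom_scan_tlist entry with an INDEX_VAR reference to the same position, so the node projects the scan tuple through unchanged. A hedged sketch of what such a helper could look like; the actual build_trivial_custom_output_targetlist in the tree may differ:

```c
#include "postgres.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"

/* Hypothetical sketch: entry i of the output targetlist is just an
 * INDEX_VAR pointing at entry i of the custom scan targetlist. */
static List *
trivial_output_targetlist(List *custom_scan_tlist)
{
	List	   *result = NIL;
	ListCell   *lc;

	foreach (lc, custom_scan_tlist)
	{
		TargetEntry *scan_entry = lfirst_node(TargetEntry, lc);
		Var		   *var = makeVar(INDEX_VAR,
								  scan_entry->resno,
								  exprType((Node *) scan_entry->expr),
								  exprTypmod((Node *) scan_entry->expr),
								  exprCollation((Node *) scan_entry->expr),
								  /* varlevelsup = */ 0);

		result = lappend(result,
						 makeTargetEntry((Expr *) var,
										 scan_entry->resno,
										 scan_entry->resname,
										 scan_entry->resjunk));
	}

	return result;
}
```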

@@ -179,44 +194,64 @@
return false;
}

Var *aggregated_var = castNode(Var, expr);
Var *decompressed_var = castNode(Var, expr);

/*
* Check if this particular column is a segmentby or has bulk decompression
* enabled. This hook is called after set_plan_refs, and at this stage the
* output targetlist of the aggregation node uses OUTER_VAR references into
* the child scan targetlist, so first we have to translate this.
* This must be called after resolve_outer_special_vars(), so we should only
* see the uncompressed chunk variables here.
*/
Assert(aggregated_var->varno == OUTER_VAR);
TargetEntry *decompressed_target_entry =
list_nth(custom->scan.plan.targetlist, AttrNumberGetAttrOffset(aggregated_var->varattno));
Ensure((Index) decompressed_var->varno == (Index) custom->scan.scanrelid,
"expected scan varno %d got %d",
custom->scan.scanrelid,
decompressed_var->varno);

if (!IsA(decompressed_target_entry->expr, Var))
if (decompressed_var->varattno <= 0)
{
/*
* Can only aggregate the plain Vars. Not sure if this is redundant with
* the similar check above.
*/
/* Can't work with special attributes like tableoid. */
if (out_is_segmentby)
{
*out_is_segmentby = false;
}
return false;
}
Var *decompressed_var = castNode(Var, decompressed_target_entry->expr);

/*
* Now, we have to translate the decompressed varno into the compressed
* column index, to check if the column supports bulk decompression.
*/
List *decompression_map = list_nth(custom->custom_private, DCP_DecompressionMap);
List *is_segmentby_column = list_nth(custom->custom_private, DCP_IsSegmentbyColumn);
List *bulk_decompression_column = list_nth(custom->custom_private, DCP_BulkDecompressionColumn);
int compressed_column_index = 0;
for (; compressed_column_index < list_length(decompression_map); compressed_column_index++)
{
if (list_nth_int(decompression_map, compressed_column_index) == decompressed_var->varattno)
const int custom_scan_attno = list_nth_int(decompression_map, compressed_column_index);
if (custom_scan_attno <= 0)
{
continue;
}

int uncompressed_chunk_attno = 0;
if (custom->custom_scan_tlist == NIL)
{
uncompressed_chunk_attno = custom_scan_attno;
}
else
{
Var *var = castNode(Var,
castNode(TargetEntry,
list_nth(custom->custom_scan_tlist,
AttrNumberGetAttrOffset(custom_scan_attno)))
->expr);
uncompressed_chunk_attno = var->varattno;
}

if (uncompressed_chunk_attno == decompressed_var->varattno)
{
break;
}
}
Ensure(compressed_column_index < list_length(decompression_map), "compressed column not found");

List *bulk_decompression_column = list_nth(custom->custom_private, DCP_BulkDecompressionColumn);
Assert(list_length(decompression_map) == list_length(bulk_decompression_column));
const bool bulk_decompression_enabled_for_column =
list_nth_int(bulk_decompression_column, compressed_column_index);
@@ -233,6 +268,8 @@
/*
* Check if this column is a segmentby.
*/
List *is_segmentby_column = list_nth(custom->custom_private, DCP_IsSegmentbyColumn);
Assert(list_length(is_segmentby_column) == list_length(decompression_map));
const bool is_segmentby = list_nth_int(is_segmentby_column, compressed_column_index);
if (out_is_segmentby)
{
@@ -317,7 +354,7 @@
* Currently supports either no grouping or grouping by segmentby columns.
*/
static bool
can_vectorize_grouping(Agg *agg, CustomScan *custom)
can_vectorize_grouping(Agg *agg, CustomScan *custom, List *resolved_targetlist)
{
if (agg->numCols == 0)
{
@@ -327,7 +364,7 @@
for (int i = 0; i < agg->numCols; i++)
{
int offset = AttrNumberGetAttrOffset(agg->grpColIdx[i]);
TargetEntry *entry = list_nth(agg->plan.targetlist, offset);
TargetEntry *entry = list_nth_node(TargetEntry, resolved_targetlist, offset);

bool is_segmentby = false;
if (!is_vector_var(custom, entry->expr, &is_segmentby))
@@ -509,25 +546,48 @@
return plan;
}

if (!can_vectorize_grouping(agg, custom))
/*
* To make it easier to examine the variables participating in the aggregation,
* the subsequent checks are performed on the aggregated targetlist with
* all variables resolved to uncompressed chunk variables.
*/
List *resolved_targetlist = resolve_outer_special_vars(agg->plan.targetlist, custom);

if (!can_vectorize_grouping(agg, custom, resolved_targetlist))
{
/* No GROUP BY support for now. */
return plan;
}

/* Now check the aggregate functions themselves. */
/* Now check the output targetlist. */
ListCell *lc;
foreach (lc, agg->plan.targetlist)
foreach (lc, resolved_targetlist)
{
TargetEntry *target_entry = castNode(TargetEntry, lfirst(lc));
if (!IsA(target_entry->expr, Aggref))
if (IsA(target_entry->expr, Aggref))
{
continue;
Aggref *aggref = castNode(Aggref, target_entry->expr);
if (!can_vectorize_aggref(aggref, custom))
{
/* Aggregate function not vectorizable. */
return plan;
}
}

Aggref *aggref = castNode(Aggref, target_entry->expr);
if (!can_vectorize_aggref(aggref, custom))
else if (IsA(target_entry->expr, Var))
{
if (!is_vector_var(custom, target_entry->expr, NULL))
{
/* Variable not vectorizable. */
return plan;

[Codecov / codecov/patch check warning: added line tsl/src/nodes/vector_agg/plan.c#L581 was not covered by tests]
}
}
else
{
/*
* Sometimes the plan can require this node to perform a projection,
* e.g. we can see a nested loop param in its output targetlist. We
* can't handle this case currently.
*/
return plan;
}
}
@@ -536,5 +596,5 @@
* Finally, all requirements are satisfied and we can vectorize this partial
* aggregation node.
*/
return vector_agg_plan_create(agg, custom);
return vector_agg_plan_create(agg, custom, resolved_targetlist);
}
87 changes: 75 additions & 12 deletions tsl/test/expected/vector_agg_param.out
@@ -21,23 +21,42 @@ select count(compress_chunk(x)) from show_chunks('pvagg') x;
(1 row)

analyze pvagg;
explain (costs off)
-- The reference for this test is generated using the standard Postgres
-- aggregation. When you change this test, recheck the results against the
-- Postgres aggregation by uncommenting the GUC below.
-- set timescaledb.enable_vectorized_aggregation to off;
explain (verbose, costs off)
select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx;
QUERY PLAN
---------------------------------------------------------------------------
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Nested Loop
-> Function Scan on unnest x
Output: x.x, (sum(pvagg.a))
-> Function Scan on pg_catalog.unnest x
Output: x.x
Function Call: unnest('{0,1,2}'::integer[])
-> Finalize Aggregate
-> Custom Scan (ChunkAppend) on pvagg
Output: sum(pvagg.a)
-> Custom Scan (ChunkAppend) on public.pvagg
Output: (PARTIAL sum(pvagg.a))
Startup Exclusion: false
Runtime Exclusion: true
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_1_1_chunk
-> Seq Scan on compress_hyper_2_3_chunk
Filter: (s = x.x)
Output: (PARTIAL sum(_hyper_1_1_chunk.a))
Grouping Policy: all compressed batches
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk
Output: _hyper_1_1_chunk.a
-> Seq Scan on _timescaledb_internal.compress_hyper_2_3_chunk
Output: compress_hyper_2_3_chunk._ts_meta_count, compress_hyper_2_3_chunk.s, compress_hyper_2_3_chunk._ts_meta_min_1, compress_hyper_2_3_chunk._ts_meta_max_1, compress_hyper_2_3_chunk.a
Filter: (compress_hyper_2_3_chunk.s = x.x)
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_1_2_chunk
-> Seq Scan on compress_hyper_2_4_chunk
Filter: (s = x.x)
(12 rows)
Output: (PARTIAL sum(_hyper_1_2_chunk.a))
Grouping Policy: all compressed batches
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_2_chunk
Output: _hyper_1_2_chunk.a
-> Seq Scan on _timescaledb_internal.compress_hyper_2_4_chunk
Output: compress_hyper_2_4_chunk._ts_meta_count, compress_hyper_2_4_chunk.s, compress_hyper_2_4_chunk._ts_meta_min_1, compress_hyper_2_4_chunk._ts_meta_max_1, compress_hyper_2_4_chunk.a
Filter: (compress_hyper_2_4_chunk.s = x.x)
(27 rows)

select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx;
x | sum
@@ -47,4 +66,48 @@ select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg
2 | 1498500
(3 rows)

explain (verbose, costs off)
select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a + x) from pvagg) xx;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Nested Loop
Output: x.x, (sum((_hyper_1_1_chunk.a + x.x)))
-> Function Scan on pg_catalog.unnest x
Output: x.x
Function Call: unnest('{0,1,2}'::integer[])
-> Finalize Aggregate
Output: sum((_hyper_1_1_chunk.a + x.x))
-> Append
-> Partial Aggregate
Output: PARTIAL sum((_hyper_1_1_chunk.a + x.x))
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk
Output: _hyper_1_1_chunk.a
-> Seq Scan on _timescaledb_internal.compress_hyper_2_3_chunk
Output: compress_hyper_2_3_chunk._ts_meta_count, compress_hyper_2_3_chunk.s, compress_hyper_2_3_chunk._ts_meta_min_1, compress_hyper_2_3_chunk._ts_meta_max_1, compress_hyper_2_3_chunk.a
-> Partial Aggregate
Output: PARTIAL sum((_hyper_1_2_chunk.a + x.x))
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_2_chunk
Output: _hyper_1_2_chunk.a
-> Seq Scan on _timescaledb_internal.compress_hyper_2_4_chunk
Output: compress_hyper_2_4_chunk._ts_meta_count, compress_hyper_2_4_chunk.s, compress_hyper_2_4_chunk._ts_meta_min_1, compress_hyper_2_4_chunk._ts_meta_max_1, compress_hyper_2_4_chunk.a
(20 rows)

select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a + x) from pvagg) xx;
Contributor:
Not sure what this is testing. Is it just checking that the query doesn't fail (if it did fail prior to this fix)? Or is it testing that it gives correct output?

How can I know that the output (sum) is correct? Is there a non-compressed (regular) table I can compare with?

Member Author:

This is testing an aggregate function whose argument expression references a nested loop parameter.

I generated a reference by running the same query with vectorized aggregation disabled.

Contributor:

Shouldn't the reference output be part of the test? Right now, the set command for turning off vector agg is commented out, so I cannot see the reference or verify that this is correct.

Member Author:

You uncomment this line, and it generates the reference in the output file: all queries run with normal Postgres aggregation instead of vectorized aggregation. Then you comment it back, run the test again, and check that nothing else changes. You do this once when changing the test. I have already done this, so the test output has the correct results generated by the standard Postgres plan. This is the approach I use for some other tests as well.
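To make that workflow concrete, here is a sketch in SQL; the GUC is the one already shown (commented out) in the test file, and the query is one of the test queries above — treat this as an illustration, not the exact test script:

```sql
-- Regenerate the reference: force the standard Postgres aggregation path.
set timescaledb.enable_vectorized_aggregation to off;

-- Re-run the test queries and capture their output as the reference.
select * from unnest(array[0, 1, 2]::int[]) x,
    lateral (select sum(a + x) from pvagg) xx;

-- Then re-enable vectorized aggregation and verify the output is unchanged.
reset timescaledb.enable_vectorized_aggregation;
```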

Contributor (@erimatnor, Nov 18, 2024):

The pattern we have for this is to generate two output files and do a diff between them in the test. There are examples in other tests of how to do this.

Having this comparison of the outputs is good because it also easily captures future errors and regressions.

Member Author:

> The pattern we have for this is to generate two output files and do a diff between them in the test. There are examples in other tests of how to do this.

Yeah, I know, I don't like to use it because:

1. The test runs more than twice as slow.
2. It's inconvenient to view the entire test reference; you have to do extra steps to open another file.
3. The PG version is not always the same as our version, e.g. some vectorized functions have better numeric stability.

This is the stuff that I remember off the top of my head; there are probably more reasons. Why is it a problem?

Member Author:

Oh, you also have to put the actual test queries into a separate file and run it with psql, so editing a test is also more complicated.

Member Author:
> Having this comparison of the outputs is good because it also easily captures future errors and regressions.

The test I wrote also compares the outputs; it's just that the PG output is fixed at test-editing time.

When you generate the output each time, you effectively compare the four supported PG versions against each other. Not sure what the benefit is; probably you'll just run into some numeric stability change in PG and will have to painfully work around it.

Contributor:

> The pattern we have for this is to generate two output files and do a diff between them in the test. There are examples in other tests of how to do this.
>
> Yeah, I know, I don't like to use it because:
>
> 1. The test runs more than twice as slow.
> 2. It's inconvenient to view the entire test reference; you have to do extra steps to open another file.
> 3. The PG version is not always the same as our version, e.g. some vectorized functions have better numeric stability.
>
> This is the stuff that I remember off the top of my head; there are probably more reasons. Why is it a problem?

I am merely giving feedback on things I think would improve the test and avoid regressions, as well as for my own understanding, so that I don't just approve without understanding what is going on. Only now, after I asked, is it clear that the test output is in fact different from regular PG aggregates, as you admit. Even if it is not strictly wrong, I cannot verify this in the review. It gives me pause because it was neither documented nor clear from the test. At the very least, this would have been good information to provide in the test. Having different aggregate output also means we can't easily capture regressions, and it requires someone to know that they need to manually enable and inspect the output when something changes, which I think you are currently the only person who knows how to do easily.

Ideally, our tests should be easy for others to understand and maintain as well; that is the perspective I have. Is there some way we can improve the test to make the aspects above easier for others to understand?

Member Author:

> Only now, after I asked, is it clear that the test output is in fact different from regular PG aggregates, as you admit.

That's not in this test, that's in different ones where I also use this pattern. What regressions do you want to avoid? This is the usual "golden test": it runs some queries and compares their output against the one captured in the reference. Most of our tests are like that. Here we also have the possibility to compare the reference against the analogous PG output by uncommenting a single line in this test. What should be improved here?

x | sum
---+---------
0 | 1998000
1 | 1999998
2 | 2001996
(3 rows)

-- The plan for this query differs after PG16: x is not used as a grouping key but
-- is just added to the output targetlist of the partial aggregation nodes.
select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg group by x) xx;
x | sum
---+---------
0 | 1998000
1 | 1998000
2 | 1998000
(3 rows)

drop table pvagg;