From 4dfa5c7d760a3e8b065ed8395c1d63867a0f17e9 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Tue, 5 Nov 2024 13:41:16 +0100 Subject: [PATCH 01/15] Fix variable resolution in vectorized aggregation planning We didn't properly resolve INDEX_VARs in the output targetlist of DecompressChunk nodes, which are present when it uses a custom scan targetlist. Fix this by always working with the targetlist where these variables are resolved to uncompressed chunk variables, like we do during execution. --- tsl/src/nodes/vector_agg/exec.c | 17 +++-- tsl/src/nodes/vector_agg/plan.c | 112 +++++++++++++++++++------------- 2 files changed, 76 insertions(+), 53 deletions(-) diff --git a/tsl/src/nodes/vector_agg/exec.c b/tsl/src/nodes/vector_agg/exec.c index 7e8e59de1cd..230ed3c8ffe 100644 --- a/tsl/src/nodes/vector_agg/exec.c +++ b/tsl/src/nodes/vector_agg/exec.c @@ -25,16 +25,19 @@ get_input_offset(DecompressChunkState *decompress_state, Var *var) { DecompressContext *dcontext = &decompress_state->decompress_context; + /* + * All variable references in the tectorized aggregation node were + * translated to uncompressed chunk variables when it was created. + */ + CustomScan *cscan = castNode(CustomScan, decompress_state->csstate.ss.ps.plan); + Ensure((Index) var->varno == (Index) cscan->scan.scanrelid, + "got vector varno %d expected %d", + var->varno, + cscan->scan.scanrelid); + CompressionColumnDescription *value_column_description = NULL; for (int i = 0; i < dcontext->num_data_columns; i++) { - /* - * See the column lookup in compute_plain_qual() for the discussion of - * which attribute numbers occur where. At the moment here it is - * uncompressed_scan_attno, but it might be an oversight of not rewriting - * the references into INDEX_VAR (or OUTER_VAR...?) when we create the - * VectorAgg node. - */ CompressionColumnDescription *current_column = &dcontext->compressed_chunk_columns[i]; if (current_column->uncompressed_chunk_attno == var->varattno) { diff --git a/tsl/src/nodes/vector_agg/plan.c b/tsl/src/nodes/vector_agg/plan.c index ac150b7ea99..47eb1258b33 100644 --- a/tsl/src/nodes/vector_agg/plan.c +++ b/tsl/src/nodes/vector_agg/plan.c @@ -115,11 +115,13 @@ resolve_outer_special_vars(List *agg_tlist, CustomScan *custom) * node. */ static Plan * -vector_agg_plan_create(Agg *agg, CustomScan *decompress_chunk) +vector_agg_plan_create(Agg *agg, CustomScan *decompress_chunk, List *resolved_targetlist) { - CustomScan *custom = (CustomScan *) makeNode(CustomScan); - custom->custom_plans = list_make1(decompress_chunk); - custom->methods = &scan_methods; + CustomScan *vector_agg = (CustomScan *) makeNode(CustomScan); + vector_agg->custom_plans = list_make1(decompress_chunk); + vector_agg->methods = &scan_methods; + + vector_agg->custom_scan_tlist = resolved_targetlist; /* * Note that this is being called from the post-planning hook, and therefore @@ -127,32 +129,31 @@ vector_agg_plan_create(Agg *agg, CustomScan *decompress_chunk) * the previous planning stages, and they contain special varnos referencing * the scan targetlists. */ - custom->custom_scan_tlist = resolve_outer_special_vars(agg->plan.targetlist, decompress_chunk); - custom->scan.plan.targetlist = - build_trivial_custom_output_targetlist(custom->custom_scan_tlist); + vector_agg->scan.plan.targetlist = + build_trivial_custom_output_targetlist(vector_agg->custom_scan_tlist); /* * Copy the costs from the normal aggregation node, so that they show up in * the EXPLAIN output. 
They are not used for any other purposes, because * this hook is called after the planning is finished. */ - custom->scan.plan.plan_rows = agg->plan.plan_rows; - custom->scan.plan.plan_width = agg->plan.plan_width; - custom->scan.plan.startup_cost = agg->plan.startup_cost; - custom->scan.plan.total_cost = agg->plan.total_cost; + vector_agg->scan.plan.plan_rows = agg->plan.plan_rows; + vector_agg->scan.plan.plan_width = agg->plan.plan_width; + vector_agg->scan.plan.startup_cost = agg->plan.startup_cost; + vector_agg->scan.plan.total_cost = agg->plan.total_cost; - custom->scan.plan.parallel_aware = false; - custom->scan.plan.parallel_safe = decompress_chunk->scan.plan.parallel_safe; - custom->scan.plan.async_capable = false; + vector_agg->scan.plan.parallel_aware = false; + vector_agg->scan.plan.parallel_safe = decompress_chunk->scan.plan.parallel_safe; + vector_agg->scan.plan.async_capable = false; - custom->scan.plan.plan_node_id = agg->plan.plan_node_id; + vector_agg->scan.plan.plan_node_id = agg->plan.plan_node_id; Assert(agg->plan.qual == NIL); - custom->scan.plan.initPlan = agg->plan.initPlan; + vector_agg->scan.plan.initPlan = agg->plan.initPlan; - custom->scan.plan.extParam = bms_copy(agg->plan.extParam); - custom->scan.plan.allParam = bms_copy(agg->plan.allParam); + vector_agg->scan.plan.extParam = bms_copy(agg->plan.extParam); + vector_agg->scan.plan.allParam = bms_copy(agg->plan.allParam); List *grouping_col_offsets = NIL; for (int i = 0; i < agg->numCols; i++) @@ -160,9 +161,9 @@ vector_agg_plan_create(Agg *agg, CustomScan *decompress_chunk) grouping_col_offsets = lappend_int(grouping_col_offsets, AttrNumberGetAttrOffset(agg->grpColIdx[i])); } - custom->custom_private = list_make1(grouping_col_offsets); + vector_agg->custom_private = list_make1(grouping_col_offsets); - return (Plan *) custom; + return (Plan *) vector_agg; } /* @@ -178,44 +179,54 @@ is_vector_var(CustomScan *custom, Expr *expr, bool *out_is_segmentby) return false; } - Var *aggregated_var = castNode(Var, expr); + Var *decompressed_var = castNode(Var, expr); /* - * Check if this particular column is a segmentby or has bulk decompression - * enabled. This hook is called after set_plan_refs, and at this stage the - * output targetlist of the aggregation node uses OUTER_VAR references into - * the child scan targetlist, so first we have to translate this. + * This must be called after resolve_outer_special_vars(), so we should only + * see the uncompressed chunk variables here. */ - Assert(aggregated_var->varno == OUTER_VAR); - TargetEntry *decompressed_target_entry = - list_nth(custom->scan.plan.targetlist, AttrNumberGetAttrOffset(aggregated_var->varattno)); - - if (!IsA(decompressed_target_entry->expr, Var)) - { - /* - * Can only aggregate the plain Vars. Not sure if this is redundant with - * the similar check above. - */ - return false; - } - Var *decompressed_var = castNode(Var, decompressed_target_entry->expr); + Ensure((Index) decompressed_var->varno == (Index) custom->scan.scanrelid, + "expected scan varno %d got %d", + custom->scan.scanrelid, + decompressed_var->varno); /* * Now, we have to translate the decompressed varno into the compressed * column index, to check if the column supports bulk decompression. 
*/ List *decompression_map = list_nth(custom->custom_private, DCP_DecompressionMap); - List *is_segmentby_column = list_nth(custom->custom_private, DCP_IsSegmentbyColumn); - List *bulk_decompression_column = list_nth(custom->custom_private, DCP_BulkDecompressionColumn); int compressed_column_index = 0; for (; compressed_column_index < list_length(decompression_map); compressed_column_index++) { - if (list_nth_int(decompression_map, compressed_column_index) == decompressed_var->varattno) + const int custom_scan_attno = list_nth_int(decompression_map, compressed_column_index); + if (custom_scan_attno <= 0) + { + continue; + } + + int uncompressed_chunk_attno = 0; + if (custom->custom_scan_tlist == NIL) + { + uncompressed_chunk_attno = custom_scan_attno; + } + else + { + Var *var = castNode(Var, + castNode(TargetEntry, + list_nth(custom->custom_scan_tlist, + AttrNumberGetAttrOffset(custom_scan_attno))) + ->expr); + uncompressed_chunk_attno = var->varattno; + } + + if (uncompressed_chunk_attno == decompressed_var->varattno) { break; } } Ensure(compressed_column_index < list_length(decompression_map), "compressed column not found"); + + List *bulk_decompression_column = list_nth(custom->custom_private, DCP_BulkDecompressionColumn); Assert(list_length(decompression_map) == list_length(bulk_decompression_column)); const bool bulk_decompression_enabled_for_column = list_nth_int(bulk_decompression_column, compressed_column_index); @@ -232,6 +243,8 @@ is_vector_var(CustomScan *custom, Expr *expr, bool *out_is_segmentby) /* * Check if this column is a segmentby. */ + List *is_segmentby_column = list_nth(custom->custom_private, DCP_IsSegmentbyColumn); + Assert(list_length(is_segmentby_column) == list_length(decompression_map)); const bool is_segmentby = list_nth_int(is_segmentby_column, compressed_column_index); if (out_is_segmentby) { @@ -316,7 +329,7 @@ can_vectorize_aggref(Aggref *aggref, CustomScan *custom) * Currently supports either no grouping or grouping by segmentby columns. */ static bool -can_vectorize_grouping(Agg *agg, CustomScan *custom) +can_vectorize_grouping(Agg *agg, CustomScan *custom, List *resolved_targetlist) { if (agg->numCols == 0) { @@ -326,7 +339,7 @@ can_vectorize_grouping(Agg *agg, CustomScan *custom) for (int i = 0; i < agg->numCols; i++) { int offset = AttrNumberGetAttrOffset(agg->grpColIdx[i]); - TargetEntry *entry = list_nth(agg->plan.targetlist, offset); + TargetEntry *entry = list_nth(resolved_targetlist, offset); bool is_segmentby = false; if (!is_vector_var(custom, entry->expr, &is_segmentby)) @@ -508,7 +521,14 @@ try_insert_vector_agg_node(Plan *plan) return plan; } - if (!can_vectorize_grouping(agg, custom)) + /* + * To make it easier to examine the variables participating in the aggregation, + * the subsequent checks are performed on the aggregated targetlist with + * all variables resolved to uncompressed chunk variables. + */ + List *resolved_targetlist = resolve_outer_special_vars(agg->plan.targetlist, custom); + + if (!can_vectorize_grouping(agg, custom, resolved_targetlist)) { /* No GROUP BY support for now. */ return plan; @@ -516,7 +536,7 @@ try_insert_vector_agg_node(Plan *plan) /* Now check the aggregate functions themselves. 
*/ ListCell *lc; - foreach (lc, agg->plan.targetlist) + foreach (lc, resolved_targetlist) { TargetEntry *target_entry = castNode(TargetEntry, lfirst(lc)); if (!IsA(target_entry->expr, Aggref)) @@ -535,5 +555,5 @@ try_insert_vector_agg_node(Plan *plan) * Finally, all requirements are satisfied and we can vectorize this partial * aggregation node. */ - return vector_agg_plan_create(agg, custom); + return vector_agg_plan_create(agg, custom, resolved_targetlist); } From bc30ab8fb4fee0522ce5940a59a16689170f8596 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Tue, 5 Nov 2024 13:49:59 +0100 Subject: [PATCH 02/15] changelog --- .unreleased/resolve-vars | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .unreleased/resolve-vars diff --git a/.unreleased/resolve-vars b/.unreleased/resolve-vars new file mode 100644 index 00000000000..51f42a33713 --- /dev/null +++ b/.unreleased/resolve-vars @@ -0,0 +1,2 @@ +Fixes: #7410 "aggregated compressed column not found" error on aggregation query. +Thanks: @uasiddiqi for reporting the "aggregated compressed column not found" error. From 69ed75df7e7ebd7faf7dc348ea604f6a70f52c7c Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Tue, 5 Nov 2024 13:51:16 +0100 Subject: [PATCH 03/15] typo --- tsl/src/nodes/vector_agg/exec.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsl/src/nodes/vector_agg/exec.c b/tsl/src/nodes/vector_agg/exec.c index 230ed3c8ffe..f5019a23eb0 100644 --- a/tsl/src/nodes/vector_agg/exec.c +++ b/tsl/src/nodes/vector_agg/exec.c @@ -26,7 +26,7 @@ get_input_offset(DecompressChunkState *decompress_state, Var *var) DecompressContext *dcontext = &decompress_state->decompress_context; /* - * All variable references in the tectorized aggregation node were + * All variable references in the vectorized aggregation node were * translated to uncompressed chunk variables when it was created. */ CustomScan *cscan = castNode(CustomScan, decompress_state->csstate.ss.ps.plan); From da0e3de0cda2b5e0229e04ad14e770efc377be0e Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Tue, 5 Nov 2024 23:14:05 +0100 Subject: [PATCH 04/15] fix --- tsl/src/nodes/vector_agg/plan.c | 48 +++++++++++++++++++++------------ 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/tsl/src/nodes/vector_agg/plan.c b/tsl/src/nodes/vector_agg/plan.c index 47eb1258b33..f9b1d5527a6 100644 --- a/tsl/src/nodes/vector_agg/plan.c +++ b/tsl/src/nodes/vector_agg/plan.c @@ -74,29 +74,43 @@ resolve_outer_special_vars_mutator(Node *node, void *context) return expression_tree_mutator(node, resolve_outer_special_vars_mutator, context); } - Var *aggregated_var = castNode(Var, node); - Ensure(aggregated_var->varno == OUTER_VAR, - "encountered unexpected varno %d as an aggregate argument", - aggregated_var->varno); - + Var *var = castNode(Var, node); CustomScan *custom = castNode(CustomScan, context); - TargetEntry *decompress_chunk_tentry = - castNode(TargetEntry, list_nth(custom->scan.plan.targetlist, aggregated_var->varattno - 1)); - Var *decompressed_var = castNode(Var, decompress_chunk_tentry->expr); - if (decompressed_var->varno == INDEX_VAR) + if ((Index) var->varno == (Index) custom->scan.scanrelid) + { + /* + * This is already the ucompressed chunk var. We can see it referenced + * by expressions in the output targetlist of DecompressChunk node. 
+ */ + return (Node *) var; + } + + if (var->varno == OUTER_VAR) + { + /* + * Reference into the output targetlist of the DecompressChunk node. + */ + TargetEntry *decompress_chunk_tentry = + castNode(TargetEntry, list_nth(custom->scan.plan.targetlist, var->varattno - 1)); + + return resolve_outer_special_vars_mutator((Node *) decompress_chunk_tentry->expr, context); + } + + if (var->varno == INDEX_VAR) { /* * This is a reference into the custom scan targetlist, we have to resolve * it as well. */ - decompressed_var = - castNode(Var, - castNode(TargetEntry, - list_nth(custom->custom_scan_tlist, decompressed_var->varattno - 1)) - ->expr); - } - Assert(decompressed_var->varno > 0); - return (Node *) copyObject(decompressed_var); + var = castNode(Var, + castNode(TargetEntry, list_nth(custom->custom_scan_tlist, var->varattno - 1)) + ->expr); + Assert(var->varno > 0); + + return (Node *) copyObject(var); + } + + Ensure(false, "encountered unexpected varno %d as an aggregate argument", var->varno); } /* From eecd1ddfb56c49c02636a9508846ebf193a2e517 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Tue, 5 Nov 2024 23:21:47 +0100 Subject: [PATCH 05/15] silence the warning --- tsl/src/nodes/vector_agg/plan.c | 1 + 1 file changed, 1 insertion(+) diff --git a/tsl/src/nodes/vector_agg/plan.c b/tsl/src/nodes/vector_agg/plan.c index f9b1d5527a6..fcd9c3840c1 100644 --- a/tsl/src/nodes/vector_agg/plan.c +++ b/tsl/src/nodes/vector_agg/plan.c @@ -111,6 +111,7 @@ resolve_outer_special_vars_mutator(Node *node, void *context) } Ensure(false, "encountered unexpected varno %d as an aggregate argument", var->varno); + return node; } /* From edaa7cb05b1ea510bda100ddb7f49f6141816ee7 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Wed, 6 Nov 2024 10:53:23 +0100 Subject: [PATCH 06/15] test --- tsl/test/expected/vector_agg_param.out | 108 ++++++++++++++++++++++--- tsl/test/sql/vector_agg_param.sql | 12 ++- 2 files changed, 107 insertions(+), 13 deletions(-) diff --git a/tsl/test/expected/vector_agg_param.out b/tsl/test/expected/vector_agg_param.out index b481d9c8a97..6835cc971e3 100644 --- a/tsl/test/expected/vector_agg_param.out +++ b/tsl/test/expected/vector_agg_param.out @@ -21,23 +21,36 @@ select count(compress_chunk(x)) from show_chunks('pvagg') x; (1 row) analyze pvagg; -explain (costs off) +explain (verbose, costs off) select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx; - QUERY PLAN ---------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Nested Loop - -> Function Scan on unnest x + Output: x.x, (sum(pvagg.a)) + -> Function Scan on pg_catalog.unnest x + Output: x.x + Function Call: unnest('{0,1,2}'::integer[]) -> Finalize Aggregate - -> Custom Scan (ChunkAppend) on pvagg + Output: sum(pvagg.a) + -> Custom Scan (ChunkAppend) on public.pvagg + Output: (PARTIAL sum(pvagg.a)) + Startup Exclusion: false + Runtime Exclusion: true -> Custom Scan (VectorAgg) - -> Custom Scan (DecompressChunk) on _hyper_1_1_chunk - -> Seq Scan on compress_hyper_2_3_chunk - Filter: (s = x.x) + Output: (PARTIAL sum(_hyper_1_1_chunk.a)) + -> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk + Output: _hyper_1_1_chunk.a + -> 
Seq Scan on _timescaledb_internal.compress_hyper_2_3_chunk + Output: compress_hyper_2_3_chunk._ts_meta_count, compress_hyper_2_3_chunk.s, compress_hyper_2_3_chunk._ts_meta_min_1, compress_hyper_2_3_chunk._ts_meta_max_1, compress_hyper_2_3_chunk.a + Filter: (compress_hyper_2_3_chunk.s = x.x) -> Custom Scan (VectorAgg) - -> Custom Scan (DecompressChunk) on _hyper_1_2_chunk - -> Seq Scan on compress_hyper_2_4_chunk - Filter: (s = x.x) -(12 rows) + Output: (PARTIAL sum(_hyper_1_2_chunk.a)) + -> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_2_chunk + Output: _hyper_1_2_chunk.a + -> Seq Scan on _timescaledb_internal.compress_hyper_2_4_chunk + Output: compress_hyper_2_4_chunk._ts_meta_count, compress_hyper_2_4_chunk.s, compress_hyper_2_4_chunk._ts_meta_min_1, compress_hyper_2_4_chunk._ts_meta_max_1, compress_hyper_2_4_chunk.a + Filter: (compress_hyper_2_4_chunk.s = x.x) +(25 rows) select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx; x | sum @@ -47,4 +60,75 @@ select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg 2 | 1498500 (3 rows) +explain (verbose, costs off) +select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a + x) from pvagg) xx; + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Nested Loop + Output: x.x, (sum((_hyper_1_1_chunk.a + x.x))) + -> Function Scan on pg_catalog.unnest x + Output: x.x + Function Call: unnest('{0,1,2}'::integer[]) + -> Finalize Aggregate + Output: sum((_hyper_1_1_chunk.a + x.x)) + -> Append + -> Partial Aggregate + Output: PARTIAL sum((_hyper_1_1_chunk.a + x.x)) + -> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk + Output: _hyper_1_1_chunk.a + -> Seq Scan on _timescaledb_internal.compress_hyper_2_3_chunk + Output: compress_hyper_2_3_chunk._ts_meta_count, compress_hyper_2_3_chunk.s, compress_hyper_2_3_chunk._ts_meta_min_1, compress_hyper_2_3_chunk._ts_meta_max_1, compress_hyper_2_3_chunk.a + -> Partial Aggregate + Output: PARTIAL sum((_hyper_1_2_chunk.a + x.x)) + -> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_2_chunk + Output: _hyper_1_2_chunk.a + -> Seq Scan on _timescaledb_internal.compress_hyper_2_4_chunk + Output: compress_hyper_2_4_chunk._ts_meta_count, compress_hyper_2_4_chunk.s, compress_hyper_2_4_chunk._ts_meta_min_1, compress_hyper_2_4_chunk._ts_meta_max_1, compress_hyper_2_4_chunk.a +(20 rows) + +select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a + x) from pvagg) xx; + x | sum +---+--------- + 0 | 1998000 + 1 | 1999998 + 2 | 2001996 +(3 rows) + +explain (verbose, costs off) +select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg group by x) xx; + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Nested Loop + Output: x.x, (sum(_hyper_1_1_chunk.a)) + -> Function Scan on pg_catalog.unnest x + Output: x.x + Function Call: unnest('{0,1,2}'::integer[]) + -> Finalize GroupAggregate + Output: sum(_hyper_1_1_chunk.a), (x.x) + Group Key: (x.x) + -> Append + -> Partial GroupAggregate + Output: (x.x), PARTIAL sum(_hyper_1_1_chunk.a) + Group Key: x.x + -> Custom Scan (DecompressChunk) on 
_timescaledb_internal._hyper_1_1_chunk + Output: x.x, _hyper_1_1_chunk.a + -> Seq Scan on _timescaledb_internal.compress_hyper_2_3_chunk + Output: compress_hyper_2_3_chunk._ts_meta_count, compress_hyper_2_3_chunk.s, compress_hyper_2_3_chunk._ts_meta_min_1, compress_hyper_2_3_chunk._ts_meta_max_1, compress_hyper_2_3_chunk.a + -> Partial GroupAggregate + Output: (x.x), PARTIAL sum(_hyper_1_2_chunk.a) + Group Key: x.x + -> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_2_chunk + Output: x.x, _hyper_1_2_chunk.a + -> Seq Scan on _timescaledb_internal.compress_hyper_2_4_chunk + Output: compress_hyper_2_4_chunk._ts_meta_count, compress_hyper_2_4_chunk.s, compress_hyper_2_4_chunk._ts_meta_min_1, compress_hyper_2_4_chunk._ts_meta_max_1, compress_hyper_2_4_chunk.a +(23 rows) + +select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg group by x) xx; + x | sum +---+--------- + 0 | 1998000 + 1 | 1998000 + 2 | 1998000 +(3 rows) + drop table pvagg; diff --git a/tsl/test/sql/vector_agg_param.sql b/tsl/test/sql/vector_agg_param.sql index 491a877556d..3c31b8ccf0a 100644 --- a/tsl/test/sql/vector_agg_param.sql +++ b/tsl/test/sql/vector_agg_param.sql @@ -19,10 +19,20 @@ select count(compress_chunk(x)) from show_chunks('pvagg') x; analyze pvagg; -explain (costs off) +explain (verbose, costs off) select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx; select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx; +explain (verbose, costs off) +select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a + x) from pvagg) xx; + +select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a + x) from pvagg) xx; + +explain (verbose, costs off) +select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg group by x) xx; + +select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg group by x) xx; + drop table pvagg; From d11fec5b2c7fc06d832fb96e48bae517cd22acea Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Thu, 7 Nov 2024 16:31:49 +0100 Subject: [PATCH 07/15] fix --- scripts/upload_ci_stats.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/upload_ci_stats.sh b/scripts/upload_ci_stats.sh index 086f7648cf3..c3b938ee392 100755 --- a/scripts/upload_ci_stats.sh +++ b/scripts/upload_ci_stats.sh @@ -94,8 +94,8 @@ then match($0, /^(test| ) ([^ ]+)[ ]+\.\.\.[ ]+([^ ]+) (|\(.*\))[ ]+([0-9]+) ms$/, a) { print ENVIRON["JOB_DATE"], a[2], tolower(a[3] (a[4] ? 
(" " a[4]) : "")), a[5]; } - match($0, /^([^0-9]+) [0-9]+ +- ([^ ]+) +([0-9]+) ms/, a) { - print ENVIRON["JOB_DATE"], a[2], a[1], a[3]; + match($0, /^([^0-9]+) [0-9]+ +[-+] ([^ ]+) +([0-9]+) ms/, a) { + print ENVIRON["JOB_DATE"], a[2], a[1], a[3]; } ' installcheck.log > tests.tsv From 92c6fd369e1bced64816a44dd81beddc0182a455 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Tue, 12 Nov 2024 10:44:03 +0100 Subject: [PATCH 08/15] Update tsl/src/nodes/vector_agg/plan.c MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Erik Nordström <819732+erimatnor@users.noreply.github.com> Signed-off-by: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> --- tsl/src/nodes/vector_agg/plan.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsl/src/nodes/vector_agg/plan.c b/tsl/src/nodes/vector_agg/plan.c index fcd9c3840c1..07200d786e6 100644 --- a/tsl/src/nodes/vector_agg/plan.c +++ b/tsl/src/nodes/vector_agg/plan.c @@ -79,7 +79,7 @@ resolve_outer_special_vars_mutator(Node *node, void *context) if ((Index) var->varno == (Index) custom->scan.scanrelid) { /* - * This is already the ucompressed chunk var. We can see it referenced + * This is already the uncompressed chunk var. We can see it referenced * by expressions in the output targetlist of DecompressChunk node. */ return (Node *) var; From d5ad761d2904a287214173fbd0ea3f2a91bb89af Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Wed, 13 Nov 2024 22:52:54 +0100 Subject: [PATCH 09/15] fixes --- tsl/src/nodes/vector_agg/plan.c | 10 ++++++++++ tsl/test/expected/vector_agg_param.out | 2 ++ tsl/test/expected/vectorized_aggregation.out | 7 +++++++ tsl/test/sql/vector_agg_param.sql | 3 +++ tsl/test/sql/vectorized_aggregation.sql | 4 ++++ 5 files changed, 26 insertions(+) diff --git a/tsl/src/nodes/vector_agg/plan.c b/tsl/src/nodes/vector_agg/plan.c index 07200d786e6..74a89953cab 100644 --- a/tsl/src/nodes/vector_agg/plan.c +++ b/tsl/src/nodes/vector_agg/plan.c @@ -205,6 +205,16 @@ is_vector_var(CustomScan *custom, Expr *expr, bool *out_is_segmentby) custom->scan.scanrelid, decompressed_var->varno); + if (decompressed_var->varattno <= 0) + { + /* Can't work with special attributes like tableoid. */ + if (out_is_segmentby) + { + *out_is_segmentby = false; + } + return false; + } + /* * Now, we have to translate the decompressed varno into the compressed * column index, to check if the column supports bulk decompression. 
diff --git a/tsl/test/expected/vector_agg_param.out b/tsl/test/expected/vector_agg_param.out index 6835cc971e3..2f230b6cff4 100644 --- a/tsl/test/expected/vector_agg_param.out +++ b/tsl/test/expected/vector_agg_param.out @@ -21,6 +21,8 @@ select count(compress_chunk(x)) from show_chunks('pvagg') x; (1 row) analyze pvagg; +-- Uncomment to generate reference +set timescaledb.enable_vectorized_aggregation to off; explain (verbose, costs off) select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx; QUERY PLAN diff --git a/tsl/test/expected/vectorized_aggregation.out b/tsl/test/expected/vectorized_aggregation.out index 12ed14e3e84..10896f9a4b9 100644 --- a/tsl/test/expected/vectorized_aggregation.out +++ b/tsl/test/expected/vectorized_aggregation.out @@ -3292,3 +3292,10 @@ SELECT sum(segment_by_value1) FROM testtable2 WHERE segment_by_value1 > 1000 AND (75 rows) RESET max_parallel_workers_per_gather; +-- Can't group by a system column +SELECT sum(float_value) FROM testtable2 GROUP BY tableoid ORDER BY 1 LIMIT 1; + sum +------- + 82620 +(1 row) + diff --git a/tsl/test/sql/vector_agg_param.sql b/tsl/test/sql/vector_agg_param.sql index 3c31b8ccf0a..1be5bd71495 100644 --- a/tsl/test/sql/vector_agg_param.sql +++ b/tsl/test/sql/vector_agg_param.sql @@ -18,6 +18,9 @@ select count(compress_chunk(x)) from show_chunks('pvagg') x; analyze pvagg; +-- Uncomment to generate reference +--set timescaledb.enable_vectorized_aggregation to off; + explain (verbose, costs off) select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx; diff --git a/tsl/test/sql/vectorized_aggregation.sql b/tsl/test/sql/vectorized_aggregation.sql index c8844932a93..bafecd6b544 100644 --- a/tsl/test/sql/vectorized_aggregation.sql +++ b/tsl/test/sql/vectorized_aggregation.sql @@ -403,3 +403,7 @@ SET max_parallel_workers_per_gather = 0; SELECT sum(segment_by_value1) FROM testtable2 WHERE segment_by_value1 > 1000 AND int_value > 1000; RESET max_parallel_workers_per_gather; + + +-- Can't group by a system column +SELECT sum(float_value) FROM testtable2 GROUP BY tableoid ORDER BY 1 LIMIT 1; From c6eb880bd0033dcd8602baedfcef1abbc29d9f60 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Wed, 13 Nov 2024 22:56:06 +0100 Subject: [PATCH 10/15] copy --- tsl/src/nodes/vector_agg/plan.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsl/src/nodes/vector_agg/plan.c b/tsl/src/nodes/vector_agg/plan.c index 74a89953cab..44d9631d7d5 100644 --- a/tsl/src/nodes/vector_agg/plan.c +++ b/tsl/src/nodes/vector_agg/plan.c @@ -82,7 +82,7 @@ resolve_outer_special_vars_mutator(Node *node, void *context) * This is already the uncompressed chunk var. We can see it referenced * by expressions in the output targetlist of DecompressChunk node. 
*/ - return (Node *) var; + return (Node *) copyObject(var); } if (var->varno == OUTER_VAR) From 0c59d9ab78d7059e8e264ea00745b6002c3b3086 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Wed, 13 Nov 2024 23:01:18 +0100 Subject: [PATCH 11/15] fix --- tsl/test/expected/vector_agg_param.out | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsl/test/expected/vector_agg_param.out b/tsl/test/expected/vector_agg_param.out index 2f230b6cff4..a73bea2b5d3 100644 --- a/tsl/test/expected/vector_agg_param.out +++ b/tsl/test/expected/vector_agg_param.out @@ -22,7 +22,7 @@ select count(compress_chunk(x)) from show_chunks('pvagg') x; analyze pvagg; -- Uncomment to generate reference -set timescaledb.enable_vectorized_aggregation to off; +--set timescaledb.enable_vectorized_aggregation to off; explain (verbose, costs off) select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx; QUERY PLAN From e71a0154d04215a5f3f8fe6f613c788adedb3e30 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Thu, 14 Nov 2024 11:49:17 +0100 Subject: [PATCH 12/15] fixes for pg16 --- tsl/src/nodes/vector_agg/plan.c | 30 +++++++++++++++++++------ tsl/test/expected/vector_agg_param.out | 31 ++------------------------ tsl/test/sql/vector_agg_param.sql | 5 ++--- 3 files changed, 27 insertions(+), 39 deletions(-) diff --git a/tsl/src/nodes/vector_agg/plan.c b/tsl/src/nodes/vector_agg/plan.c index 44d9631d7d5..2dc0d167a5b 100644 --- a/tsl/src/nodes/vector_agg/plan.c +++ b/tsl/src/nodes/vector_agg/plan.c @@ -364,7 +364,7 @@ can_vectorize_grouping(Agg *agg, CustomScan *custom, List *resolved_targetlist) for (int i = 0; i < agg->numCols; i++) { int offset = AttrNumberGetAttrOffset(agg->grpColIdx[i]); - TargetEntry *entry = list_nth(resolved_targetlist, offset); + TargetEntry *entry = list_nth_node(TargetEntry, resolved_targetlist, offset); bool is_segmentby = false; if (!is_vector_var(custom, entry->expr, &is_segmentby)) @@ -559,19 +559,35 @@ try_insert_vector_agg_node(Plan *plan) return plan; } - /* Now check the aggregate functions themselves. */ + /* Now check the output targetlist. */ ListCell *lc; foreach (lc, resolved_targetlist) { TargetEntry *target_entry = castNode(TargetEntry, lfirst(lc)); - if (!IsA(target_entry->expr, Aggref)) + if (IsA(target_entry->expr, Aggref)) { - continue; + Aggref *aggref = castNode(Aggref, target_entry->expr); + if (!can_vectorize_aggref(aggref, custom)) + { + /* Aggregate function not vectorizable. */ + return plan; + } } - - Aggref *aggref = castNode(Aggref, target_entry->expr); - if (!can_vectorize_aggref(aggref, custom)) + else if (IsA(target_entry->expr, Var)) + { + if (!is_vector_var(custom, target_entry->expr, NULL)) + { + /* Variable not vectorizable. */ + return plan; + } + } + else { + /* + * Sometimes the plan can require this node to perform a projection, + * e.g. we can see a nested loop param in its output targetlist. We + * can't handle this case currently. 
+ */ return plan; } } diff --git a/tsl/test/expected/vector_agg_param.out b/tsl/test/expected/vector_agg_param.out index a73bea2b5d3..1ef408e32e5 100644 --- a/tsl/test/expected/vector_agg_param.out +++ b/tsl/test/expected/vector_agg_param.out @@ -96,35 +96,8 @@ select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a + x) from p 2 | 2001996 (3 rows) -explain (verbose, costs off) -select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg group by x) xx; - QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Nested Loop - Output: x.x, (sum(_hyper_1_1_chunk.a)) - -> Function Scan on pg_catalog.unnest x - Output: x.x - Function Call: unnest('{0,1,2}'::integer[]) - -> Finalize GroupAggregate - Output: sum(_hyper_1_1_chunk.a), (x.x) - Group Key: (x.x) - -> Append - -> Partial GroupAggregate - Output: (x.x), PARTIAL sum(_hyper_1_1_chunk.a) - Group Key: x.x - -> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk - Output: x.x, _hyper_1_1_chunk.a - -> Seq Scan on _timescaledb_internal.compress_hyper_2_3_chunk - Output: compress_hyper_2_3_chunk._ts_meta_count, compress_hyper_2_3_chunk.s, compress_hyper_2_3_chunk._ts_meta_min_1, compress_hyper_2_3_chunk._ts_meta_max_1, compress_hyper_2_3_chunk.a - -> Partial GroupAggregate - Output: (x.x), PARTIAL sum(_hyper_1_2_chunk.a) - Group Key: x.x - -> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_2_chunk - Output: x.x, _hyper_1_2_chunk.a - -> Seq Scan on _timescaledb_internal.compress_hyper_2_4_chunk - Output: compress_hyper_2_4_chunk._ts_meta_count, compress_hyper_2_4_chunk.s, compress_hyper_2_4_chunk._ts_meta_min_1, compress_hyper_2_4_chunk._ts_meta_max_1, compress_hyper_2_4_chunk.a -(23 rows) - +-- The plan for this query differs after PG16, x is not used as grouping key but +-- just added into the output targetlist of partial aggregation nodes. select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg group by x) xx; x | sum ---+--------- diff --git a/tsl/test/sql/vector_agg_param.sql b/tsl/test/sql/vector_agg_param.sql index 1be5bd71495..244718ef27f 100644 --- a/tsl/test/sql/vector_agg_param.sql +++ b/tsl/test/sql/vector_agg_param.sql @@ -32,9 +32,8 @@ select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a + x) from p select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a + x) from pvagg) xx; -explain (verbose, costs off) -select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg group by x) xx; - +-- The plan for this query differs after PG16, x is not used as grouping key but +-- just added into the output targetlist of partial aggregation nodes. 
select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg group by x) xx; From d4c97f097d5eecfb6eb69cca72fa250cf2ebd8cb Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Mon, 18 Nov 2024 13:24:02 +0100 Subject: [PATCH 13/15] remove accidental change --- tsl/src/nodes/vector_agg/plan.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tsl/src/nodes/vector_agg/plan.c b/tsl/src/nodes/vector_agg/plan.c index 2dc0d167a5b..1445b8008d1 100644 --- a/tsl/src/nodes/vector_agg/plan.c +++ b/tsl/src/nodes/vector_agg/plan.c @@ -170,13 +170,13 @@ vector_agg_plan_create(Agg *agg, CustomScan *decompress_chunk, List *resolved_ta vector_agg->scan.plan.extParam = bms_copy(agg->plan.extParam); vector_agg->scan.plan.allParam = bms_copy(agg->plan.allParam); - List *grouping_col_offsets = NIL; + List *grouping_child_output_offsets = NIL; for (int i = 0; i < agg->numCols; i++) { - grouping_col_offsets = - lappend_int(grouping_col_offsets, AttrNumberGetAttrOffset(agg->grpColIdx[i])); + grouping_child_output_offsets = + lappend_int(grouping_child_output_offsets, AttrNumberGetAttrOffset(agg->grpColIdx[i])); } - vector_agg->custom_private = list_make1(grouping_col_offsets); + vector_agg->custom_private = list_make1(grouping_child_output_offsets); return (Plan *) vector_agg; } From fa9c11f1d655f064b4540b6bae04afe1bc6ee98e Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Mon, 18 Nov 2024 14:12:29 +0100 Subject: [PATCH 14/15] update test ref --- tsl/test/expected/vector_agg_param.out | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tsl/test/expected/vector_agg_param.out b/tsl/test/expected/vector_agg_param.out index 1ef408e32e5..c04e7b682b2 100644 --- a/tsl/test/expected/vector_agg_param.out +++ b/tsl/test/expected/vector_agg_param.out @@ -40,6 +40,7 @@ select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg Runtime Exclusion: true -> Custom Scan (VectorAgg) Output: (PARTIAL sum(_hyper_1_1_chunk.a)) + Grouping Policy: all compressed batches -> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk Output: _hyper_1_1_chunk.a -> Seq Scan on _timescaledb_internal.compress_hyper_2_3_chunk @@ -47,12 +48,13 @@ select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg Filter: (compress_hyper_2_3_chunk.s = x.x) -> Custom Scan (VectorAgg) Output: (PARTIAL sum(_hyper_1_2_chunk.a)) + Grouping Policy: all compressed batches -> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_2_chunk Output: _hyper_1_2_chunk.a -> Seq Scan on _timescaledb_internal.compress_hyper_2_4_chunk Output: compress_hyper_2_4_chunk._ts_meta_count, compress_hyper_2_4_chunk.s, compress_hyper_2_4_chunk._ts_meta_min_1, compress_hyper_2_4_chunk._ts_meta_max_1, compress_hyper_2_4_chunk.a Filter: (compress_hyper_2_4_chunk.s = x.x) -(25 rows) +(27 rows) select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx; x | sum From 106a68f2466ec8b6a034cf34e2d1f0c44cf73c9d Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Tue, 19 Nov 2024 17:40:10 +0100 Subject: [PATCH 15/15] update the comment --- tsl/test/expected/vector_agg_param.out | 6 ++++-- tsl/test/sql/vector_agg_param.sql | 7 ++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/tsl/test/expected/vector_agg_param.out b/tsl/test/expected/vector_agg_param.out index 
c04e7b682b2..3d717b10d2a 100644 --- a/tsl/test/expected/vector_agg_param.out +++ b/tsl/test/expected/vector_agg_param.out @@ -21,8 +21,10 @@ select count(compress_chunk(x)) from show_chunks('pvagg') x; (1 row) analyze pvagg; --- Uncomment to generate reference ---set timescaledb.enable_vectorized_aggregation to off; +-- The reference for this test is generated using the standard Postgres +-- aggregation. When you change this test, recheck the results against the +-- Postgres aggregation by uncommenting the below GUC. +-- set timescaledb.enable_vectorized_aggregation to off; explain (verbose, costs off) select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx; QUERY PLAN diff --git a/tsl/test/sql/vector_agg_param.sql b/tsl/test/sql/vector_agg_param.sql index 244718ef27f..d695b839376 100644 --- a/tsl/test/sql/vector_agg_param.sql +++ b/tsl/test/sql/vector_agg_param.sql @@ -18,9 +18,10 @@ select count(compress_chunk(x)) from show_chunks('pvagg') x; analyze pvagg; --- Uncomment to generate reference ---set timescaledb.enable_vectorized_aggregation to off; - +-- The reference for this test is generated using the standard Postgres +-- aggregation. When you change this test, recheck the results against the +-- Postgres aggregation by uncommenting the below GUC. +-- set timescaledb.enable_vectorized_aggregation to off; explain (verbose, costs off) select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx;
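--
A minimal verification sketch, reusing the queries from the vector_agg_param
test added in this series; it assumes the pvagg table is created, populated
and compressed as in tsl/test/sql/vector_agg_param.sql (the setup lines fall
outside the hunk context shown above).

    -- A vectorizable aggregate under a lateral join. For this query shape the
    -- DecompressChunk node can use a custom scan targetlist, which is where the
    -- previously unresolved INDEX_VAR references came from.
    explain (verbose, costs off)
    select * from unnest(array[0, 1, 2]::int[]) x, lateral (select sum(a) from pvagg where s = x) xx;

    -- Cross-check the results against the standard Postgres aggregation:
    set timescaledb.enable_vectorized_aggregation to off;

Before the fix, queries of this shape could fail with the "aggregated
compressed column not found" error reported in #7410.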