diff --git a/src/backend/distributed/deparser/citus_ruleutils.c b/src/backend/distributed/deparser/citus_ruleutils.c index f99462058d9..b31c1b11c7d 100644 --- a/src/backend/distributed/deparser/citus_ruleutils.c +++ b/src/backend/distributed/deparser/citus_ruleutils.c @@ -41,6 +41,7 @@ #include "commands/sequence.h" #include "foreign/foreign.h" #include "lib/stringinfo.h" +#include "miscadmin.h" #include "nodes/nodeFuncs.h" #include "nodes/nodes.h" #include "nodes/parsenodes.h" @@ -1638,3 +1639,200 @@ RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier) } } } + +/* Function to extract paramid from a FuncExpr node */ +static AttrNumber extract_paramid_from_funcexpr(FuncExpr *func) +{ + AttrNumber targetAttnum = InvalidAttrNumber; + ListCell *lc; + + /* Iterate through the arguments of the FuncExpr */ + foreach(lc, func->args) + { + Node *arg = (Node *) lfirst(lc); + + /* Check if the argument is a PARAM node */ + if (IsA(arg, Param)) + { + Param *param = (Param *) arg; + targetAttnum = param->paramid; + + break; // Exit loop once we find the PARAM node + } + } + + return targetAttnum; +} + +/* + * processTargetsIndirection - reorder targets list (from indirection) + * + * We don't change anything but the order of the target list. + * The purpose here is to be able to deparse a query tree as if it was + * provided by the PostgreSQL parser, not the rewriter (which is the one + * received by the planner hook). + * + * It's required only for UPDATE SET (MULTIEXPR) queries at the moment, other + * candidates are not supported by Citus. + */ +static void +processTargetsIndirection(List **targetList) +{ + int nAssignableCols; + int targetListPosition; + bool sawJunk = false; + List *newTargetList = NIL; + ListCell *lc; + + /* Count non-junk columns and ensure they precede junk columns */ + nAssignableCols = 0; + foreach(lc, *targetList) + { + TargetEntry *tle = lfirst_node(TargetEntry, lc); + + if (tle->resjunk) + { + sawJunk = true; + } + else + { + if (sawJunk) + elog(ERROR, "Subplan target list is out of order"); + + nAssignableCols++; + } + } + + /* If no assignable columns, return the original target list */ + if (nAssignableCols == 0) + return; + + /* Reorder the target list */ + /* we start from 1 */ + targetListPosition = 1; + while (nAssignableCols > 0) + { + nAssignableCols--; + + foreach(lc, *targetList) + { + TargetEntry *tle = lfirst_node(TargetEntry, lc); + + if (IsA(tle->expr, FuncExpr)) + { + FuncExpr *funcexpr = (FuncExpr *) tle->expr; + AttrNumber attnum = extract_paramid_from_funcexpr(funcexpr); + + if (attnum == targetListPosition) + { + ereport(DEBUG1, (errmsg("Adding FuncExpr resno: %d", tle->resno))); + newTargetList = lappend(newTargetList, tle); + targetListPosition++; + break; + } + } + else if (IsA(tle->expr, Param)) + { + Param *param = (Param *) tle->expr; + AttrNumber attnum = param->paramid; + + if (attnum == targetListPosition) + { + newTargetList = lappend(newTargetList, tle); + targetListPosition++; + break; + } + } + } + } + + /* Append any remaining junk columns */ + foreach(lc, *targetList) + { + TargetEntry *tle = lfirst_node(TargetEntry, lc); + if (tle->resjunk) + newTargetList = lappend(newTargetList, tle); + } + *targetList = newTargetList; +} + +/* + * helper function to evaluate if we are in an SET (...) + * Caller is responsible to check the command type (UPDATE) + */ +static inline bool +is_update_set_multiexpr(Query *query) +{ + ListCell *lc; + /* we're only interested by UPDATE */ + if (query->commandType != CMD_UPDATE) + return false; + + /* + * Then foreach target entry, check if one of the node or it's descendant + * is a PARAM_MULTIEXPR (i.e. a SET (a, b) = (...)) + */ + foreach(lc, query->targetList) { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + Node *expr; + + if (tle->resjunk) + continue; + + expr = strip_implicit_coercions((Node *) tle->expr); + + if (expr && IsA(expr, Param) && + ((Param *) expr)->paramkind == PARAM_MULTIEXPR) + { + return true; + } + } + + /* No multi-column set expression found */ + return false; +} + +/* + * helper function to evaluate if we are in SELECT with CTE. + */ +static inline bool +is_select_cte(Query *query) +{ + /* we are only looking for a SELECT query */ + if (query->commandType != CMD_SELECT) + return false; + /* and it must contain CTE */ + if (query->cteList == NIL) + return false; + return true; +} + +/* + * We may need to reorder parts of the planner tree we are receiving here. + * We expect to produce an SQL query text but our tree has been optimized by + * PostgreSL rewriter already... + */ +void +RebuildParserTreeFromPlannerTree(Query *query) +{ + /* Guard against excessively long or deeply-nested queries */ + CHECK_FOR_INTERRUPTS(); + + /* prevent unloyal defeat */ + check_stack_depth(); + + if (is_update_set_multiexpr(query)) + { + processTargetsIndirection(&query->targetList); + } + /* also match UPDATE in CTE, this one is recursive */ + else if (is_select_cte(query)) + { + ListCell *lc; + foreach(lc, query->cteList) + { + CommonTableExpr *cte = (CommonTableExpr *) lfirst(lc); + RebuildParserTreeFromPlannerTree((Query *) cte->ctequery); + } + } +} diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 1d6550afdb5..1df88f32fb0 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -217,6 +217,13 @@ distributed_planner(Query *parse, planContext.originalQuery = copyObject(parse); + /* + * We may need to reorder parts of the planner tree we are receiving here. + * We expect to produce an SQL query text but our tree has been optimized by + * PostgreSL rewriter already... + * FIXME is there conditions to reduce the number of calls ? + */ + RebuildParserTreeFromPlannerTree(planContext.originalQuery); if (!fastPathRouterQuery) { diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index 3a9c364824f..729d0b57940 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -60,5 +60,7 @@ extern char * generate_operator_name(Oid operid, Oid arg1, Oid arg2); extern List * getOwnedSequences_internal(Oid relid, AttrNumber attnum, char deptype); extern void AppendOptionListToString(StringInfo stringData, List *options); +extern void RebuildParserTreeFromPlannerTree(Query *query); + #endif /* CITUS_RULEUTILS_H */