From 560a1d4609f903b0d1441e908eae1627a973ce4a Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Sun, 28 Sep 2025 12:13:30 +0200 Subject: [PATCH 1/4] Statistics on dirtied and allocated local buffers. It is a preliminary commit that tracks the state of temp buffers. The main goal of this statistic is to provide the optimiser with numbers to compute the cost estimation of flushing temporary buffers. Such a flush may be necessary if the optimiser decides to build a plan that includes a parallel section of the query, which involves scanning a temporary table. --- src/backend/storage/buffer/localbuf.c | 20 ++++++++++++++++++++ src/include/storage/bufmgr.h | 5 +++++ 2 files changed, 25 insertions(+) diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 15aac7d1c9fe4..dc84e7665c3b3 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -44,6 +44,10 @@ typedef struct int NLocBuffer = 0; /* until buffers are initialized */ + +int allocated_localbufs = 0; +int dirtied_localbufs = 0; + BufferDesc *LocalBufferDescriptors = NULL; Block *LocalBufferBlockPointers = NULL; int32 *LocalRefCount = NULL; @@ -509,7 +513,10 @@ MarkLocalBufferDirty(Buffer buffer) buf_state = pg_atomic_read_u32(&bufHdr->state); if (!(buf_state & BM_DIRTY)) + { pgBufferUsage.local_blks_dirtied++; + dirtied_localbufs++; + } buf_state |= BM_DIRTY; @@ -570,6 +577,12 @@ TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bit /* Clear earlier errors, if this IO failed, it'll be marked again */ buf_state &= ~BM_IO_ERROR; + if (buf_state & BM_DIRTY) + { + Assert(dirtied_localbufs > 0); + dirtied_localbufs--; + } + if (clear_dirty) buf_state &= ~BM_DIRTY; @@ -609,6 +622,12 @@ InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced) uint32 buf_state; LocalBufferLookupEnt *hresult; + if (pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY) + { + Assert(dirtied_localbufs > 0); + 
dirtied_localbufs--; + } + /* * It's possible that we started IO on this buffer before e.g. aborting * the transaction that created a table. We need to wait for that IO to @@ -947,6 +966,7 @@ GetLocalBufferStorage(void) this_buf = cur_block + next_buf_in_block * BLCKSZ; next_buf_in_block++; total_bufs_allocated++; + allocated_localbufs++; /* * Caller's PinLocalBuffer() was too early for Valgrind updates, so do it diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 97c1124c12acf..87591edd18fbd 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -188,6 +188,11 @@ extern PGDLLIMPORT char *BufferBlocks; /* in localbuf.c */ extern PGDLLIMPORT int NLocBuffer; + +/* Local buffer statistics */ +extern PGDLLIMPORT int allocated_localbufs; +extern PGDLLIMPORT int dirtied_localbufs; + extern PGDLLIMPORT Block *LocalBufferBlockPointers; extern PGDLLIMPORT int32 *LocalRefCount; From 17628d823c7f8ca410fbeaccba553de9abdbac19 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Sun, 28 Sep 2025 12:35:27 +0200 Subject: [PATCH 2/4] Make the parallel_safe flag smarter. Change the concept of parallel safety slightly: a query subtree may be parallel-safe even if it includes a temporary table scan, provided that each buffer of this temporary table is flushed to disk. In this case, minor changes within the planner and executor may allow scanning the temporary table in parallel. With this commit, the optimiser uses the 'parallel_safe' flag to indicate that the subtree refers to a source with temporary storage. Path's parallel_safe field may be used in cost-based optimisation; Plan's parallel_safe field indicates whether a Gather or GatherMerge node should flush all temporary buffers before launching any parallel worker. We don't make this flag very selective. If different paths of the same RelOptInfo have different targets, we indicate that each path requires buffer flushing, even if only one of them actually needs it.
--- src/backend/optimizer/path/allpaths.c | 30 +-- src/backend/optimizer/path/costsize.c | 2 + src/backend/optimizer/path/equivclass.c | 9 +- src/backend/optimizer/plan/createplan.c | 20 +- src/backend/optimizer/plan/planmain.c | 2 +- src/backend/optimizer/plan/planner.c | 52 +++-- src/backend/optimizer/plan/setrefs.c | 2 +- src/backend/optimizer/plan/subselect.c | 4 +- src/backend/optimizer/util/clauses.c | 24 ++- src/backend/optimizer/util/pathnode.c | 187 +++++++++++------- src/backend/optimizer/util/relnode.c | 10 +- src/backend/utils/misc/guc_parameters.dat | 7 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/nodes/execnodes.h | 2 + src/include/nodes/pathnodes.h | 6 +- src/include/nodes/plannodes.h | 7 +- src/include/nodes/primnodes.h | 14 +- src/include/optimizer/clauses.h | 2 +- src/include/optimizer/optimizer.h | 6 + src/include/optimizer/planmain.h | 2 +- 20 files changed, 265 insertions(+), 124 deletions(-) diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 4c43fd0b19b23..58e64e2063137 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -655,23 +655,25 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, /* This should only be called for baserels and appendrel children. */ Assert(IS_SIMPLE_REL(rel)); + /* Set if the data source refers to temporary storage somehow */ + rel->needs_temp_safety = false; + /* Assorted checks based on rtekind. */ switch (rte->rtekind) { case RTE_RELATION: /* - * Currently, parallel workers can't access the leader's temporary - * tables. We could possibly relax this if we wrote all of its - * local buffers at the start of the query and made no changes - * thereafter (maybe we could allow hint bit changes), and if we - * taught the workers to read them. Writing a large number of - * temporary buffers could be expensive, though, and we don't have - * the rest of the necessary infrastructure right now anyway.
So - * for now, bail out if we see a temporary table. + * It is not free to process objects with a temporary storage in + * parallel because we need to flush temporary buffers beforehand. + * So, hide this feature under a GUC. */ if (get_rel_persistence(rte->relid) == RELPERSISTENCE_TEMP) - return; + { + if (!extended_parallel_processing) + return; + rel->needs_temp_safety = true; + } /* * Table sampling can be pushed down to workers if the sample @@ -683,7 +685,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, if (proparallel != PROPARALLEL_SAFE) return; - if (!is_parallel_safe(root, (Node *) rte->tablesample->args)) + if (!is_parallel_safe(root, (Node *) rte->tablesample->args, &rel->needs_temp_safety)) return; } @@ -749,7 +751,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, case RTE_FUNCTION: /* Check for parallel-restricted functions. */ - if (!is_parallel_safe(root, (Node *) rte->functions)) + if (!is_parallel_safe(root, (Node *) rte->functions, &rel->needs_temp_safety)) return; break; @@ -759,7 +761,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, case RTE_VALUES: /* Check for parallel-restricted functions. */ - if (!is_parallel_safe(root, (Node *) rte->values_lists)) + if (!is_parallel_safe(root, (Node *) rte->values_lists, &rel->needs_temp_safety)) return; break; @@ -800,14 +802,14 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, * outer join clauses work correctly. It would likely break equivalence * classes, too. */ - if (!is_parallel_safe(root, (Node *) rel->baserestrictinfo)) + if (!is_parallel_safe(root, (Node *) rel->baserestrictinfo, &rel->needs_temp_safety)) return; /* * Likewise, if the relation's outputs are not parallel-safe, give up. * (Usually, they're just Vars, but sometimes they're not.) */ - if (!is_parallel_safe(root, (Node *) rel->reltarget->exprs)) + if (!is_parallel_safe(root, (Node *) rel->reltarget->exprs, &rel->needs_temp_safety)) return; /* We have a winner. 
*/ diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index a39cc793b4d82..35a45c50f3fe3 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -164,6 +164,8 @@ bool enable_partition_pruning = true; bool enable_presorted_aggregate = true; bool enable_async_append = true; +bool extended_parallel_processing = true; + typedef struct { PlannerInfo *root; diff --git a/src/backend/optimizer/path/equivclass.c b/src/backend/optimizer/path/equivclass.c index 441f12f6c50cf..1573ffc5ce0b2 100644 --- a/src/backend/optimizer/path/equivclass.c +++ b/src/backend/optimizer/path/equivclass.c @@ -1015,6 +1015,7 @@ find_computable_ec_member(PlannerInfo *root, { List *emvars; ListCell *lc2; + bool needs_temp_flush = false; /* * We shouldn't be trying to sort by an equivalence class that @@ -1049,9 +1050,11 @@ find_computable_ec_member(PlannerInfo *root, /* * If requested, reject expressions that are not parallel-safe. We * check this last because it's a rather expensive test. + * TODO: Not sure if it is really necessary. */ if (require_parallel_safe && - !is_parallel_safe(root, (Node *) em->em_expr)) + (!is_parallel_safe(root, (Node *) em->em_expr, &needs_temp_flush) || + needs_temp_flush)) continue; return em; /* found usable expression */ @@ -1093,6 +1096,7 @@ relation_can_be_sorted_early(PlannerInfo *root, RelOptInfo *rel, foreach(lc, target->exprs) { Expr *targetexpr = (Expr *) lfirst(lc); + bool needs_temp_flush = false; em = find_ec_member_matching_expr(ec, targetexpr, rel->relids); if (!em) @@ -1112,7 +1116,8 @@ relation_can_be_sorted_early(PlannerInfo *root, RelOptInfo *rel, * check this last because it's a rather expensive test. 
*/ if (require_parallel_safe && - !is_parallel_safe(root, (Node *) em->em_expr)) + (!is_parallel_safe(root, (Node *) em->em_expr, &needs_temp_flush) || + needs_temp_flush)) continue; return true; diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index bc417f9384018..40e5468960035 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -100,7 +100,7 @@ static Plan *create_projection_plan(PlannerInfo *root, ProjectionPath *best_path, int flags); static Plan *inject_projection_plan(Plan *subplan, List *tlist, - bool parallel_safe); + ParallelSafe parallel_safe); static Sort *create_sort_plan(PlannerInfo *root, SortPath *best_path, int flags); static IncrementalSort *create_incrementalsort_plan(PlannerInfo *root, IncrementalSortPath *best_path, int flags); @@ -1956,7 +1956,7 @@ create_projection_plan(PlannerInfo *root, ProjectionPath *best_path, int flags) * to apply (since the tlist might be unsafe even if the child plan is safe). */ static Plan * -inject_projection_plan(Plan *subplan, List *tlist, bool parallel_safe) +inject_projection_plan(Plan *subplan, List *tlist, ParallelSafe parallel_safe) { Plan *plan; @@ -1988,7 +1988,7 @@ inject_projection_plan(Plan *subplan, List *tlist, bool parallel_safe) * flag of the FDW's own Path node. 
*/ Plan * -change_plan_targetlist(Plan *subplan, List *tlist, bool tlist_parallel_safe) +change_plan_targetlist(Plan *subplan, List *tlist, ParallelSafe tlist_parallel_safe) { /* * If the top plan node can't do projections and its existing target list @@ -2004,7 +2004,7 @@ change_plan_targetlist(Plan *subplan, List *tlist, bool tlist_parallel_safe) { /* Else we can just replace the plan node's tlist */ subplan->targetlist = tlist; - subplan->parallel_safe &= tlist_parallel_safe; + subplan->parallel_safe = tlist_parallel_safe; } return subplan; } @@ -4195,7 +4195,8 @@ create_nestloop_plan(PlannerInfo *root, List *otherclauses; List *nestParams; List *outer_tlist; - bool outer_parallel_safe; + ParallelSafe outer_parallel_safe; + bool needs_temp_flush = false; Relids saveOuterRels = root->curOuterRels; ListCell *lc; @@ -4311,8 +4312,13 @@ create_nestloop_plan(PlannerInfo *root, true); outer_tlist = lappend(outer_tlist, tle); /* ... and track whether tlist is (still) parallel-safe */ - if (outer_parallel_safe) - outer_parallel_safe = is_parallel_safe(root, (Node *) phv); + if (outer_parallel_safe > PARALLEL_UNSAFE) + { + if (!is_parallel_safe(root, (Node *) phv, &needs_temp_flush)) + outer_parallel_safe = PARALLEL_UNSAFE; + else if (needs_temp_flush) + outer_parallel_safe = NEEDS_TEMP_FLUSH; + } } if (outer_tlist != outer_plan->targetlist) outer_plan = change_plan_targetlist(outer_plan, outer_tlist, diff --git a/src/backend/optimizer/plan/planmain.c b/src/backend/optimizer/plan/planmain.c index eefc486a5666c..af9492e20fa96 100644 --- a/src/backend/optimizer/plan/planmain.c +++ b/src/backend/optimizer/plan/planmain.c @@ -126,7 +126,7 @@ query_planner(PlannerInfo *root, (root->query_level > 1 || debug_parallel_query != DEBUG_PARALLEL_OFF)) final_rel->consider_parallel = - is_parallel_safe(root, parse->jointree->quals); + is_parallel_safe(root, parse->jointree->quals, &final_rel->needs_temp_safety); /* * The only path for it is a trivial Result path. 
We cheat a diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 1268ea92b6f0d..0fffc455e776d 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -536,7 +536,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, gather->plan.plan_rows = top_plan->plan_rows; gather->plan.plan_width = top_plan->plan_width; gather->plan.parallel_aware = false; - gather->plan.parallel_safe = false; + gather->plan.parallel_safe = PARALLEL_UNSAFE; /* * Delete the initplans' cost from top_plan. We needn't add it to the @@ -1473,6 +1473,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, List *final_targets; List *final_targets_contain_srfs; bool final_target_parallel_safe; + bool needs_temp_flush = false; RelOptInfo *current_rel; RelOptInfo *final_rel; FinalPathExtraData extra; @@ -1524,7 +1525,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, /* And check whether it's parallel safe */ final_target_parallel_safe = - is_parallel_safe(root, (Node *) final_target->exprs); + is_parallel_safe(root, (Node *) final_target->exprs, &needs_temp_flush); /* The setop result tlist couldn't contain any SRFs */ Assert(!parse->hasTargetSRFs); @@ -1694,7 +1695,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, */ final_target = create_pathtarget(root, root->processed_tlist); final_target_parallel_safe = - is_parallel_safe(root, (Node *) final_target->exprs); + is_parallel_safe(root, (Node *) final_target->exprs, &needs_temp_flush); /* * If ORDER BY was given, consider whether we should use a post-sort @@ -1707,7 +1708,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, final_target, &have_postponed_srfs); sort_input_target_parallel_safe = - is_parallel_safe(root, (Node *) sort_input_target->exprs); + is_parallel_safe(root, (Node *) sort_input_target->exprs, &needs_temp_flush); } else { @@ -1726,7 +1727,7 @@ grouping_planner(PlannerInfo *root, 
double tuple_fraction, final_target, activeWindows); grouping_target_parallel_safe = - is_parallel_safe(root, (Node *) grouping_target->exprs); + is_parallel_safe(root, (Node *) grouping_target->exprs, &needs_temp_flush); } else { @@ -1745,7 +1746,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, { scanjoin_target = make_group_input_target(root, final_target); scanjoin_target_parallel_safe = - is_parallel_safe(root, (Node *) scanjoin_target->exprs); + is_parallel_safe(root, (Node *) scanjoin_target->exprs, &needs_temp_flush); } else { @@ -1797,6 +1798,18 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, scanjoin_targets_contain_srfs = NIL; } + /* + * Each path may have its own target, which may or may not reference + * relations with temporary storage. Tracking this per target was + * tried, but it ends up with a new Target::needs_temp_flush field that + * seems too invasive for this first attempt. + * So, just mark current_rel as needing a temp buffer flush and + * let Gather do the job, even if earlier than strictly necessary. + * XXX: we need to be sure that no new path has been created with any of + * these target lists before this point. + */ + current_rel->needs_temp_safety |= needs_temp_flush; + /* Apply scan/join target. */ scanjoin_target_same_exprs = list_length(scanjoin_targets) == 1 && equal(scanjoin_target->exprs, current_rel->reltarget->exprs); @@ -1905,9 +1918,13 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, * query. */ if (current_rel->consider_parallel && - is_parallel_safe(root, parse->limitOffset) && - is_parallel_safe(root, parse->limitCount)) + is_parallel_safe(root, parse->limitOffset, &needs_temp_flush) && + is_parallel_safe(root, parse->limitCount, &needs_temp_flush)) + { final_rel->consider_parallel = true; + final_rel->needs_temp_safety |= + current_rel->needs_temp_safety | needs_temp_flush; + } /* * If the current_rel belongs to a single FDW, so does the final_rel.
@@ -3950,8 +3967,11 @@ make_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, * target list and HAVING quals are parallel-safe. */ if (input_rel->consider_parallel && target_parallel_safe && - is_parallel_safe(root, havingQual)) + is_parallel_safe(root, (Node *) havingQual, &grouped_rel->needs_temp_safety)) + { grouped_rel->consider_parallel = true; + grouped_rel->needs_temp_safety |= input_rel->needs_temp_safety; + } /* * If the input rel belongs to a single FDW, so does the grouped rel. @@ -4570,8 +4590,11 @@ create_window_paths(PlannerInfo *root, * target list and active windows for non-parallel-safe constructs. */ if (input_rel->consider_parallel && output_target_parallel_safe && - is_parallel_safe(root, (Node *) activeWindows)) + is_parallel_safe(root, (Node *) activeWindows, &window_rel->needs_temp_safety)) + { window_rel->consider_parallel = true; + window_rel->needs_temp_safety |= input_rel->needs_temp_safety; + } /* * If the input rel belongs to a single FDW, so does the window rel. @@ -7033,10 +7056,12 @@ plan_create_index_workers(Oid tableOid, Oid indexOid) * Currently, parallel workers can't access the leader's temporary tables. * Furthermore, any index predicate or index expressions must be parallel * safe. + * TODO: Is this hard to enable? */ if (heap->rd_rel->relpersistence == RELPERSISTENCE_TEMP || - !is_parallel_safe(root, (Node *) RelationGetIndexExpressions(index)) || - !is_parallel_safe(root, (Node *) RelationGetIndexPredicate(index))) + !is_parallel_safe(root, (Node *) RelationGetIndexExpressions(index), &rel->needs_temp_safety) || + !is_parallel_safe(root, (Node *) RelationGetIndexPredicate(index), &rel->needs_temp_safety) || + rel->needs_temp_safety) { parallel_workers = 0; goto done; @@ -8822,7 +8847,8 @@ create_partial_unique_paths(PlannerInfo *root, RelOptInfo *input_rel, * nothing to do if there's anything in the targetlist that's * parallel-restricted. 
*/ - if (!is_parallel_safe(root, (Node *) unique_rel->reltarget->exprs)) + if (!is_parallel_safe(root, (Node *) unique_rel->reltarget->exprs, + &unique_rel->needs_temp_safety)) return; cheapest_partial_path = linitial(input_rel->partial_pathlist); diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index cd7ea1e6b5873..72f5efd747600 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -1582,7 +1582,7 @@ clean_up_removed_plan_level(Plan *parent, Plan *child) child->startup_cost += initplan_cost; child->total_cost += initplan_cost; if (unsafe_initplans) - child->parallel_safe = false; + child->parallel_safe = PARALLEL_UNSAFE; /* * Attach plans this way so that parent's initplans are processed diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index ff63d20f8d536..cd1061e339d43 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -1009,7 +1009,7 @@ SS_process_ctes(PlannerInfo *root) * CTE scans are not considered for parallelism (cf * set_rel_consider_parallel). 
*/ - splan->parallel_safe = false; + splan->parallel_safe = PARALLEL_UNSAFE; splan->setParam = NIL; splan->parParam = NIL; splan->args = NIL; @@ -2308,7 +2308,7 @@ SS_charge_for_initplans(PlannerInfo *root, RelOptInfo *final_rel) path->startup_cost += initplan_cost; path->total_cost += initplan_cost; if (unsafe_initplans) - path->parallel_safe = false; + path->parallel_safe = PARALLEL_UNSAFE; } /* diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index 67b7de16fc5c3..9f82b5189da31 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -95,6 +95,7 @@ typedef struct char max_hazard; /* worst proparallel hazard found so far */ char max_interesting; /* worst proparallel hazard of interest */ List *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */ + bool hasTempObject; } max_parallel_hazard_context; static bool contain_agg_clause_walker(Node *node, void *context); @@ -760,13 +761,17 @@ max_parallel_hazard(Query *parse) * * root->glob->maxParallelHazard must previously have been set to the * result of max_parallel_hazard() on the whole query. + * + * Expression may contain a reference to subplan that employs temporary + * relations. That's why the flag needs_temp_flush is introduced. */ bool -is_parallel_safe(PlannerInfo *root, Node *node) +is_parallel_safe(PlannerInfo *root, Node *node, bool *needs_temp_flush) { max_parallel_hazard_context context; PlannerInfo *proot; ListCell *l; + bool is_safe; /* * Even if the original querytree contained nothing unsafe, we need to @@ -798,7 +803,20 @@ is_parallel_safe(PlannerInfo *root, Node *node) } } - return !max_parallel_hazard_walker(node, &context); + is_safe = !max_parallel_hazard_walker(node, &context); + + /* + * If the expression is parallel-safe, detect if it needs temp buffers + * flushing before the execution start. Don't care changing it if + * the expression is unsafe - it can't be executed by parallel workers + * anyway. 
+ * In some cases the caller is interested only in a negative result to test an idea. + * So, if the incoming pointer is NULL, skip this step. + */ + if (needs_temp_flush && is_safe && context.hasTempObject) + *needs_temp_flush = NEEDS_TEMP_FLUSH; + + return is_safe; } /* core logic for all parallel-hazard checks */ @@ -920,6 +938,8 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context)) return true; save_safe_param_ids = context->safe_param_ids; + context->hasTempObject = + context->hasTempObject || (subplan->parallel_safe == NEEDS_TEMP_FLUSH); context->safe_param_ids = list_concat_copy(context->safe_param_ids, subplan->paramIds); if (max_parallel_hazard_walker(subplan->testexpr, context)) diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index b6be4ddbd01b2..37de58140e8cb 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -969,6 +969,17 @@ add_partial_path_precheck(RelOptInfo *parent_rel, int disabled_nodes, return true; } +static inline ParallelSafe +parallel_safety(RelOptInfo *rel) +{ + if (!rel->consider_parallel) + return PARALLEL_UNSAFE; + + if (rel->needs_temp_safety) + return NEEDS_TEMP_FLUSH; + + return PARALLEL_SAFE; +} /***************************************************************************** * PATH NODE CREATION ROUTINES @@ -991,7 +1002,7 @@ create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = (parallel_workers > 0); - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = parallel_workers; pathnode->pathkeys = NIL; /* seqscan has unordered result */ @@ -1015,7 +1026,7 @@ create_samplescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer pathnode->param_info = get_baserel_parampathinfo(root, rel,
required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* samplescan has unordered result */ @@ -1067,7 +1078,7 @@ create_index_path(PlannerInfo *root, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = pathkeys; @@ -1110,7 +1121,7 @@ create_bitmap_heap_path(PlannerInfo *root, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = (parallel_degree > 0); - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = parallel_degree; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1162,7 +1173,7 @@ create_bitmap_and_path(PlannerInfo *root, * without actually iterating over the list of children. */ pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1214,7 +1225,7 @@ create_bitmap_or_path(PlannerInfo *root, * without actually iterating over the list of children. 
*/ pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1243,7 +1254,7 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1273,7 +1284,7 @@ create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = (parallel_workers > 0); - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = parallel_workers; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1334,7 +1345,7 @@ create_append_path(PlannerInfo *root, required_outer); pathnode->path.parallel_aware = parallel_aware; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = parallel_workers; pathnode->path.pathkeys = pathkeys; @@ -1375,8 +1386,8 @@ create_append_path(PlannerInfo *root, { Path *subpath = (Path *) lfirst(l); - pathnode->path.parallel_safe = pathnode->path.parallel_safe && - subpath->parallel_safe; + pathnode->path.parallel_safe = Min(pathnode->path.parallel_safe, + subpath->parallel_safe); /* All child paths must have same parameterization */ Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer)); @@ -1492,7 +1503,7 @@ create_merge_append_path(PlannerInfo *root, pathnode->path.pathtarget = rel->reltarget; pathnode->path.param_info = NULL; pathnode->path.parallel_aware = 
false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = pathkeys; pathnode->subpaths = subpaths; @@ -1524,8 +1535,8 @@ create_merge_append_path(PlannerInfo *root, Assert(bms_is_empty(PATH_REQ_OUTER(subpath))); pathnode->path.rows += subpath->rows; - pathnode->path.parallel_safe = pathnode->path.parallel_safe && - subpath->parallel_safe; + pathnode->path.parallel_safe = Min(pathnode->path.parallel_safe, + subpath->parallel_safe); if (!pathkeys_count_contained_in(pathkeys, subpath->pathkeys, &presorted_keys)) @@ -1617,7 +1628,7 @@ create_group_result_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.pathtarget = target; pathnode->path.param_info = NULL; /* there are no other rels... */ pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; pathnode->quals = havingqual; @@ -1666,8 +1677,7 @@ create_material_path(RelOptInfo *rel, Path *subpath) pathnode->path.pathtarget = rel->reltarget; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = Min(parallel_safety(rel), subpath->parallel_safe); pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.pathkeys = subpath->pathkeys; @@ -1701,8 +1711,8 @@ create_memoize_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, pathnode->path.pathtarget = rel->reltarget; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = Min(parallel_safety(rel), + subpath->parallel_safe); pathnode->path.parallel_workers = 
subpath->parallel_workers; pathnode->path.pathkeys = subpath->pathkeys; @@ -1813,7 +1823,7 @@ create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = false; + pathnode->path.parallel_safe = PARALLEL_UNSAFE; pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; /* Gather has unordered result */ @@ -1856,8 +1866,8 @@ create_subqueryscan_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = Min(parallel_safety(rel), + subpath->parallel_safe); pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.pathkeys = pathkeys; pathnode->subpath = subpath; @@ -1885,7 +1895,7 @@ create_functionscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = pathkeys; @@ -1911,7 +1921,7 @@ create_tablefuncscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* result is always unordered */ @@ -1937,7 +1947,7 @@ create_valuesscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + 
pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* result is always unordered */ @@ -1963,7 +1973,7 @@ create_ctescan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = pathkeys; @@ -1989,7 +1999,7 @@ create_namedtuplestorescan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* result is always unordered */ @@ -2015,7 +2025,7 @@ create_resultscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* result is always unordered */ @@ -2041,7 +2051,7 @@ create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* result is always unordered */ @@ -2084,7 +2094,7 @@ create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); 
pathnode->path.parallel_workers = 0; pathnode->path.rows = rows; pathnode->path.disabled_nodes = disabled_nodes; @@ -2138,7 +2148,7 @@ create_foreign_join_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.pathtarget = target ? target : rel->reltarget; pathnode->path.param_info = NULL; /* XXX see above */ pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.rows = rows; pathnode->path.disabled_nodes = disabled_nodes; @@ -2187,7 +2197,7 @@ create_foreign_upper_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.pathtarget = target ? target : rel->reltarget; pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.rows = rows; pathnode->path.disabled_nodes = disabled_nodes; @@ -2351,8 +2361,9 @@ create_nestloop_path(PlannerInfo *root, required_outer, &restrict_clauses); pathnode->jpath.path.parallel_aware = false; - pathnode->jpath.path.parallel_safe = joinrel->consider_parallel && - outer_path->parallel_safe && inner_path->parallel_safe; + pathnode->jpath.path.parallel_safe = Min(Min(parallel_safety(joinrel), + outer_path->parallel_safe), + inner_path->parallel_safe); /* This is a foolish way to estimate parallel_workers, but for now... 
*/ pathnode->jpath.path.parallel_workers = outer_path->parallel_workers; pathnode->jpath.path.pathkeys = pathkeys; @@ -2417,8 +2428,9 @@ create_mergejoin_path(PlannerInfo *root, required_outer, &restrict_clauses); pathnode->jpath.path.parallel_aware = false; - pathnode->jpath.path.parallel_safe = joinrel->consider_parallel && - outer_path->parallel_safe && inner_path->parallel_safe; + pathnode->jpath.path.parallel_safe = Min(Min(parallel_safety(joinrel), + outer_path->parallel_safe), + inner_path->parallel_safe); /* This is a foolish way to estimate parallel_workers, but for now... */ pathnode->jpath.path.parallel_workers = outer_path->parallel_workers; pathnode->jpath.path.pathkeys = pathkeys; @@ -2483,8 +2495,9 @@ create_hashjoin_path(PlannerInfo *root, &restrict_clauses); pathnode->jpath.path.parallel_aware = joinrel->consider_parallel && parallel_hash; - pathnode->jpath.path.parallel_safe = joinrel->consider_parallel && - outer_path->parallel_safe && inner_path->parallel_safe; + pathnode->jpath.path.parallel_safe = Min(Min(parallel_safety(joinrel), + outer_path->parallel_safe), + inner_path->parallel_safe); /* This is a foolish way to estimate parallel_workers, but for now... 
*/ pathnode->jpath.path.parallel_workers = outer_path->parallel_workers; @@ -2513,6 +2526,33 @@ create_hashjoin_path(PlannerInfo *root, return pathnode; } +static inline ParallelSafe +compute_parallel_safety(PlannerInfo *root, RelOptInfo *rel, + PathTarget *target, Path *subpath) +{ + ParallelSafe level = PARALLEL_SAFE; + bool needs_temp_flush = false; + + if (!rel->consider_parallel) + return PARALLEL_UNSAFE; + + if (rel->needs_temp_safety) + level = NEEDS_TEMP_FLUSH; + + if (subpath) + level = Min(level, subpath->parallel_safe); + + if (target) + { + if (!is_parallel_safe(root, (Node *) target->exprs, &needs_temp_flush)) + return PARALLEL_UNSAFE; + + if (needs_temp_flush) + level = Min(level, NEEDS_TEMP_FLUSH); + } + return level; +} + /* * create_projection_path * Creates a pathnode that represents performing a projection. @@ -2551,9 +2591,9 @@ create_projection_path(PlannerInfo *root, pathnode->path.pathtarget = target; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe && - is_parallel_safe(root, (Node *) target->exprs); + + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, target, subpath); + pathnode->path.parallel_workers = subpath->parallel_workers; /* Projection does not change the sort order */ pathnode->path.pathkeys = subpath->pathkeys; @@ -2661,9 +2701,12 @@ apply_projection_to_path(PlannerInfo *root, * arrange for the subpath to return the required target list so that * workers can help project. But if there is something that is not * parallel-safe in the target expressions, then we can't. + * + * XXX: don't need flag here because create_projection_path will check the + * target safety anyway. 
*/ if ((IsA(path, GatherPath) || IsA(path, GatherMergePath)) && - is_parallel_safe(root, (Node *) target->exprs)) + is_parallel_safe(root, (Node *) target->exprs, NULL)) { /* * We always use create_projection_path here, even if the subpath is @@ -2697,14 +2740,14 @@ apply_projection_to_path(PlannerInfo *root, } } else if (path->parallel_safe && - !is_parallel_safe(root, (Node *) target->exprs)) + !is_parallel_safe(root, (Node *) target->exprs, NULL)) { /* * We're inserting a parallel-restricted target list into a path * currently marked parallel-safe, so we have to mark it as no longer * safe. */ - path->parallel_safe = false; + path->parallel_safe = PARALLEL_UNSAFE; } return path; @@ -2735,9 +2778,7 @@ create_set_projection_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe && - is_parallel_safe(root, (Node *) target->exprs); + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, target, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; /* Projection does not change the sort order XXX? 
*/ pathnode->path.pathkeys = subpath->pathkeys; @@ -2806,8 +2847,7 @@ create_incremental_sort_path(PlannerInfo *root, pathnode->path.pathtarget = subpath->pathtarget; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.pathkeys = pathkeys; @@ -2853,8 +2893,7 @@ create_sort_path(PlannerInfo *root, pathnode->path.pathtarget = subpath->pathtarget; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.pathkeys = pathkeys; @@ -2899,8 +2938,7 @@ create_group_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; /* Group doesn't change sort ordering */ pathnode->path.pathkeys = subpath->pathkeys; @@ -2954,8 +2992,7 @@ create_unique_path(PlannerInfo *root, pathnode->path.pathtarget = subpath->pathtarget; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; /* Unique doesn't change the input ordering */ pathnode->path.pathkeys = 
subpath->pathkeys; @@ -3010,8 +3047,7 @@ create_agg_path(PlannerInfo *root, pathnode->path.pathtarget = target; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; if (aggstrategy == AGG_SORTED) @@ -3094,8 +3130,7 @@ create_groupingsets_path(PlannerInfo *root, pathnode->path.pathtarget = target; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->subpath = subpath; @@ -3255,7 +3290,7 @@ create_minmaxagg_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = true; /* might change below */ + pathnode->path.parallel_safe = PARALLEL_SAFE; /* might change below */ pathnode->path.parallel_workers = 0; /* Result is one unordered row */ pathnode->path.rows = 1; @@ -3273,7 +3308,7 @@ create_minmaxagg_path(PlannerInfo *root, initplan_disabled_nodes += mminfo->path->disabled_nodes; initplan_cost += mminfo->pathcost; if (!mminfo->path->parallel_safe) - pathnode->path.parallel_safe = false; + pathnode->path.parallel_safe = PARALLEL_UNSAFE; } /* add tlist eval cost for each output row, plus cpu_tuple_cost */ @@ -3301,10 +3336,16 @@ create_minmaxagg_path(PlannerInfo *root, * we are in a subquery then it can be useful for the outer query to know * that this one is parallel-safe.) 
*/ - if (pathnode->path.parallel_safe) - pathnode->path.parallel_safe = - is_parallel_safe(root, (Node *) target->exprs) && - is_parallel_safe(root, (Node *) quals); + if (pathnode->path.parallel_safe > PARALLEL_UNSAFE) + { + bool needs_temp_flush = false; + + if (!is_parallel_safe(root, (Node *) target->exprs, &needs_temp_flush) || + !is_parallel_safe(root, (Node *) quals, &needs_temp_flush)) + pathnode->path.parallel_safe = PARALLEL_UNSAFE; + else if (needs_temp_flush) + pathnode->path.parallel_safe = NEEDS_TEMP_FLUSH; + } return pathnode; } @@ -3349,8 +3390,7 @@ create_windowagg_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; /* WindowAgg preserves the input sort order */ pathnode->path.pathkeys = subpath->pathkeys; @@ -3419,8 +3459,7 @@ create_setop_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - leftpath->parallel_safe && rightpath->parallel_safe; + pathnode->path.parallel_safe = Min(compute_parallel_safety(root, rel, NULL, leftpath), rightpath->parallel_safe); pathnode->path.parallel_workers = leftpath->parallel_workers + rightpath->parallel_workers; /* SetOp preserves the input sort order if in sort mode */ @@ -3537,8 +3576,7 @@ create_recursiveunion_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - leftpath->parallel_safe && rightpath->parallel_safe; + 
pathnode->path.parallel_safe = Min(compute_parallel_safety(root, rel, NULL, leftpath), rightpath->parallel_safe); /* Foolish, but we'll do it like joins for now: */ pathnode->path.parallel_workers = leftpath->parallel_workers; /* RecursiveUnion result is always unsorted */ @@ -3577,7 +3615,7 @@ create_lockrows_path(PlannerInfo *root, RelOptInfo *rel, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = false; + pathnode->path.parallel_safe = PARALLEL_UNSAFE; pathnode->path.parallel_workers = 0; pathnode->path.rows = subpath->rows; @@ -3656,7 +3694,7 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = false; + pathnode->path.parallel_safe = PARALLEL_UNSAFE; pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; @@ -3742,8 +3780,7 @@ create_limit_path(PlannerInfo *root, RelOptInfo *rel, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.rows = subpath->rows; pathnode->path.disabled_nodes = subpath->disabled_nodes; diff --git a/src/backend/optimizer/util/relnode.c b/src/backend/optimizer/util/relnode.c index 405f4dae1092a..5514c1ba73542 100644 --- a/src/backend/optimizer/util/relnode.c +++ b/src/backend/optimizer/util/relnode.c @@ -225,6 +225,7 @@ build_simple_rel(PlannerInfo *root, int relid, RelOptInfo *parent) rel->consider_startup = (root->tuple_fraction > 0); rel->consider_param_startup = false; /* might get changed 
later */ rel->consider_parallel = false; /* might get changed later */ + rel->needs_temp_safety = false; /* might get changed later */ rel->reltarget = create_empty_pathtarget(); rel->pathlist = NIL; rel->ppilist = NIL; @@ -822,6 +823,7 @@ build_join_rel(PlannerInfo *root, joinrel->consider_startup = (root->tuple_fraction > 0); joinrel->consider_param_startup = false; joinrel->consider_parallel = false; + joinrel->needs_temp_safety = false; joinrel->reltarget = create_empty_pathtarget(); joinrel->pathlist = NIL; joinrel->ppilist = NIL; @@ -959,9 +961,13 @@ build_join_rel(PlannerInfo *root, * here. */ if (inner_rel->consider_parallel && outer_rel->consider_parallel && - is_parallel_safe(root, (Node *) restrictlist) && - is_parallel_safe(root, (Node *) joinrel->reltarget->exprs)) + is_parallel_safe(root, (Node *) restrictlist, &joinrel->needs_temp_safety) && + is_parallel_safe(root, (Node *) joinrel->reltarget->exprs, &joinrel->needs_temp_safety)) + { joinrel->consider_parallel = true; + joinrel->needs_temp_safety |= + (inner_rel->needs_temp_safety | outer_rel->needs_temp_safety); + } /* Add the joinrel to the PlannerInfo. 
*/ add_join_rel(root, joinrel); diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index ac0c7c36c5617..9099013e975d6 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -1024,6 +1024,13 @@ boot_val => '"$system"', }, +{ name => 'extended_parallel_processing', type => 'bool', context => 'PGC_BACKEND', group => 'QUERY_TUNING_METHOD', + short_desc => 'Enable extra features of parallel processing.', + flags => 'GUC_EXPLAIN', + variable => 'extended_parallel_processing', + boot_val => 'true', +}, + { name => 'external_pid_file', type => 'string', context => 'PGC_POSTMASTER', group => 'FILE_LOCATIONS', short_desc => 'Writes the postmaster PID to the specified file.', flags => 'GUC_SUPERUSER_ONLY', diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index dc9e2255f8a7f..5f18eae807650 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -429,6 +429,7 @@ #enable_distinct_reordering = on #enable_self_join_elimination = on #enable_eager_aggregate = on +#extended_parallel_processing = on # - Planner Cost Constants - diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 3968429f99194..aecf631bd456f 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -743,6 +743,8 @@ typedef struct EState bool es_use_parallel_mode; /* can we use parallel workers? */ + bool es_need_temptables_flush; /* Do we still need to flush dirty temp pages? */ + int es_parallel_workers_to_launch; /* number of workers to * launch.
*/ int es_parallel_workers_launched; /* number of workers actually diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index b5ff456ef7fab..a01e72cf5effe 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -941,6 +941,10 @@ typedef struct RelOptInfo bool consider_param_startup; /* consider parallel paths? */ bool consider_parallel; + /* If the rel is allowed to be processed in parallel, does it need to flush + * temporary buffers? + */ + bool needs_temp_safety; /* * default result targetlist for Paths scanning this relation; list of @@ -1899,7 +1903,7 @@ typedef struct Path /* engage parallel-aware logic? */ bool parallel_aware; /* OK to use as part of parallel plan? */ - bool parallel_safe; + ParallelSafe parallel_safe; /* desired # of workers; 0 = not parallel */ int parallel_workers; diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index c4393a9432116..c9989a862cca7 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -212,7 +212,7 @@ typedef struct Plan /* engage parallel-aware logic? */ bool parallel_aware; /* OK to use as part of parallel plan? */ - bool parallel_safe; + ParallelSafe parallel_safe; /* * information needed for asynchronous execution @@ -1343,6 +1343,8 @@ typedef struct Gather bool single_copy; /* suppress EXPLAIN display (for testing)? 
*/ bool invisible; + /* Signal if any object with temporary storage is scanned in this subtree */ + bool process_temp_tables; /* * param id's of initplans which are referred at gather or one of its @@ -1382,6 +1384,9 @@ typedef struct GatherMerge /* NULLS FIRST/LAST directions */ bool *nullsFirst pg_node_attr(array_size(numCols)); + /* Signal if any objects with temporary storage are scanned in this subtree */ + bool process_temp_tables; + /* * param id's of initplans which are referred at gather merge or one of * its child nodes diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index 1b4436f2ff6d4..d20b83cfb0647 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -1050,6 +1050,18 @@ typedef struct SubLink ParseLoc location; /* token location, or -1 if unknown */ } SubLink; +/* + * Start from zero and put NEEDS_TEMP_FLUSH as a first positive value. + * In this case if someone still uses true/false values for this type it just + * causes more temp buffers flushes without an error. + */ +typedef enum ParallelSafe +{ + PARALLEL_UNSAFE = 0, + NEEDS_TEMP_FLUSH, + PARALLEL_SAFE, +} ParallelSafe; + /* * SubPlan - executable expression node for a subplan (sub-SELECT) * @@ -1114,7 +1126,7 @@ typedef struct SubPlan bool unknownEqFalse; /* true if it's okay to return FALSE when the * spec result is UNKNOWN; this allows much * simpler handling of null values */ - bool parallel_safe; /* is the subplan parallel-safe? */ + ParallelSafe parallel_safe; /* is the subplan parallel-safe? 
*/ /* Note: parallel_safe does not consider contents of testexpr or args */ /* Information for passing params into and out of the subselect: */ /* setParam and parParam are lists of integers (param IDs) */ diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h index fc38eae5c5a0f..e9f72dcd9cc95 100644 --- a/src/include/optimizer/clauses.h +++ b/src/include/optimizer/clauses.h @@ -33,7 +33,7 @@ extern double expression_returns_set_rows(PlannerInfo *root, Node *clause); extern bool contain_subplans(Node *clause); extern char max_parallel_hazard(Query *parse); -extern bool is_parallel_safe(PlannerInfo *root, Node *node); +extern bool is_parallel_safe(PlannerInfo *root, Node *node, bool *needs_temp_flush); extern bool contain_nonstrict_functions(Node *clause); extern bool contain_exec_param(Node *clause, List *param_ids); extern bool contain_leaked_vars(Node *clause); diff --git a/src/include/optimizer/optimizer.h b/src/include/optimizer/optimizer.h index 44ec5296a183f..22e8b3d998cc0 100644 --- a/src/include/optimizer/optimizer.h +++ b/src/include/optimizer/optimizer.h @@ -80,6 +80,12 @@ extern PGDLLIMPORT double parallel_setup_cost; extern PGDLLIMPORT double recursive_worktable_factor; extern PGDLLIMPORT int effective_cache_size; +/* + * Enable extended feature of parallel query processing such as parallel + * temporary tables scan. 
+ */ +extern PGDLLIMPORT bool extended_parallel_processing; + extern double clamp_row_est(double nrows); extern int32 clamp_width_est(int64 tuple_width); diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h index 00addf1599250..0eb417883acdd 100644 --- a/src/include/optimizer/planmain.h +++ b/src/include/optimizer/planmain.h @@ -45,7 +45,7 @@ extern ForeignScan *make_foreignscan(List *qptlist, List *qpqual, List *fdw_scan_tlist, List *fdw_recheck_quals, Plan *outer_plan); extern Plan *change_plan_targetlist(Plan *subplan, List *tlist, - bool tlist_parallel_safe); + ParallelSafe tlist_parallel_safe); extern Plan *materialize_finished_plan(Plan *subplan); extern bool is_projection_capable_path(Path *path); extern bool is_projection_capable_plan(Plan *plan); From 4259f9edfbc30505f746e69c74107d3c9a8fd2e2 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Sun, 28 Sep 2025 13:11:25 +0200 Subject: [PATCH 3/4] Enable parallel scan of temp tables. This commit adds a flag to Gather and GatherMerge that indicates whether the subtree contains temporary tables. Additionally, to prevent multiple flush attempts, EState has a flag that indicates whether temporary buffers have already been written to disk. Employing these two flags, Gather flushes temporary buffers before launching any parallel worker. Add some checks to detect accidental scanning of a temp table whose buffers have not been flushed yet.
--- src/backend/executor/execParallel.c | 4 ++ src/backend/executor/execUtils.c | 1 + src/backend/executor/nodeGather.c | 12 ++++++ src/backend/executor/nodeGatherMerge.c | 8 ++++ src/backend/optimizer/plan/createplan.c | 11 +++-- src/backend/optimizer/plan/planner.c | 1 + src/backend/storage/buffer/bufmgr.c | 6 +++ src/backend/storage/buffer/localbuf.c | 52 ++++++++++++++++++------ src/backend/utils/cache/relcache.c | 8 ++++ src/include/nodes/execnodes.h | 2 +- src/include/storage/buf_internals.h | 1 + src/include/storage/bufmgr.h | 1 + src/test/regress/expected/temp.out | 54 +++++++++++++++++++++++++ src/test/regress/sql/temp.sql | 39 ++++++++++++++++++ 14 files changed, 183 insertions(+), 17 deletions(-) diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 26200c5a3d6e5..40d8fa44c19cf 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -45,6 +45,7 @@ #include "jit/jit.h" #include "nodes/nodeFuncs.h" #include "pgstat.h" +#include "storage/bufmgr.h" #include "tcop/tcopprot.h" #include "utils/datum.h" #include "utils/dsa.h" @@ -78,6 +79,7 @@ typedef struct FixedParallelExecutorState dsa_pointer param_exec; int eflags; int jit_flags; + int dirtied_localbufs; /* Just for debugging purposes */ } FixedParallelExecutorState; /* @@ -768,6 +770,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, fpes->param_exec = InvalidDsaPointer; fpes->eflags = estate->es_top_eflags; fpes->jit_flags = estate->es_jit_flags; + fpes->dirtied_localbufs = dirtied_localbufs; shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes); /* Store query string */ @@ -1464,6 +1467,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Get fixed-size state. */ fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false); + dirtied_localbufs = fpes->dirtied_localbufs; /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. 
*/ receiver = ExecParallelGetReceiver(seg, toc); diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c index fdc65c2b42b33..09acdb18652d9 100644 --- a/src/backend/executor/execUtils.c +++ b/src/backend/executor/execUtils.c @@ -161,6 +161,7 @@ CreateExecutorState(void) estate->es_use_parallel_mode = false; estate->es_parallel_workers_to_launch = 0; estate->es_parallel_workers_launched = 0; + estate->es_tempbufs_flushed = false; /* Were the backend's temp buffers flushed? */ estate->es_jit_flags = 0; estate->es_jit = NULL; diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index dc7d1830259f5..572a54df6add2 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -36,6 +36,7 @@ #include "executor/tqueue.h" #include "miscadmin.h" #include "optimizer/optimizer.h" +#include "storage/bufmgr.h" #include "utils/wait_event.h" @@ -161,6 +162,17 @@ ExecGather(PlanState *pstate) { ParallelContext *pcxt; + /* + * Flush temporary buffers if this parallel section contains + * any objects with temporary storage type. Don't bother to do it + * more than once per query execution. + */ + if (gather->process_temp_tables && !estate->es_tempbufs_flushed) + { + FlushAllBuffers(); + estate->es_tempbufs_flushed = true; + } + /* Initialize, or re-initialize, shared state needed by workers.
*/ if (!node->pei) node->pei = ExecInitParallelPlan(outerPlanState(node), diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index c04522fea4d9e..4232a5a3a0bb4 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -22,6 +22,7 @@ #include "lib/binaryheap.h" #include "miscadmin.h" #include "optimizer/optimizer.h" +#include "storage/bufmgr.h" /* * When we read tuples from workers, it's a good idea to read several at once @@ -205,6 +206,13 @@ ExecGatherMerge(PlanState *pstate) { ParallelContext *pcxt; + /* The same as in the ExecGather */ + if (gm->process_temp_tables && !estate->es_tempbufs_flushed) + { + FlushAllBuffers(); + estate->es_tempbufs_flushed = true; + } + /* Initialize, or re-initialize, shared state needed by workers. */ if (!node->pei) node->pei = ExecInitParallelPlan(outerPlanState(node), diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 40e5468960035..4997cd2722d5a 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -298,7 +298,8 @@ static Unique *make_unique_from_pathkeys(Plan *lefttree, List *pathkeys, int numCols, Relids relids); static Gather *make_gather(List *qptlist, List *qpqual, - int nworkers, int rescan_param, bool single_copy, Plan *subplan); + int nworkers, int rescan_param, bool single_copy, + Plan *subplan, bool process_temp_tables); static SetOp *make_setop(SetOpCmd cmd, SetOpStrategy strategy, List *tlist, Plan *lefttree, Plan *righttree, List *groupList, Cardinality numGroups); @@ -1777,12 +1778,14 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path) tlist = build_path_tlist(root, &best_path->path); + Assert(best_path->subpath->parallel_safe > PARALLEL_UNSAFE); gather_plan = make_gather(tlist, NIL, best_path->num_workers, assign_special_exec_param(root), best_path->single_copy, - subplan); + subplan, + best_path->subpath->parallel_safe == 
NEEDS_TEMP_FLUSH); copy_generic_path_info(&gather_plan->plan, &best_path->path); @@ -6790,7 +6793,8 @@ make_gather(List *qptlist, int nworkers, int rescan_param, bool single_copy, - Plan *subplan) + Plan *subplan, + bool process_temp_tables) { Gather *node = makeNode(Gather); Plan *plan = &node->plan; @@ -6804,6 +6808,7 @@ make_gather(List *qptlist, node->single_copy = single_copy; node->invisible = false; node->initParam = NULL; + node->process_temp_tables = process_temp_tables; return node; } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 0fffc455e776d..4f2bec2f5cd2c 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -514,6 +514,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, gather->num_workers = 1; gather->single_copy = true; gather->invisible = (debug_parallel_query == DEBUG_PARALLEL_REGRESS); + gather->process_temp_tables = (best_path->parallel_safe == NEEDS_TEMP_FLUSH); /* Transfer any initPlans to the new top node */ gather->plan.initPlan = top_plan->initPlan; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index eb55102b0d7fe..301b2ecd8828d 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -5112,6 +5112,12 @@ FlushRelationBuffers(Relation rel) } } +void +FlushAllBuffers(void) +{ + FlushAllLocalBuffers(); +} + /* --------------------------------------------------------------------- * FlushRelationsAllBuffers * diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index dc84e7665c3b3..1bde8738a2d1b 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -189,6 +189,12 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln) instr_time io_start; Page localpage = (char *) LocalBufHdrGetBlock(bufHdr); + /* + * Parallel temp table scan allows an access to temp tables. 
So, to be + * paranoid enough we should check it each time, flushing local buffer. + */ + Assert(!IsParallelWorker()); + Assert(LocalRefCount[-BufferDescriptorGetBuffer(bufHdr) - 1] > 0); /* @@ -750,19 +756,6 @@ InitLocalBuffers(void) HASHCTL info; int i; - /* - * Parallel workers can't access data in temporary tables, because they - * have no visibility into the local buffers of their leader. This is a - * convenient, low-cost place to provide a backstop check for that. Note - * that we don't wish to prevent a parallel worker from accessing catalog - * metadata about a temp table, so checks at higher levels would be - * inappropriate. - */ - if (IsParallelWorker()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot access temporary tables during a parallel operation"))); - /* Allocate and zero buffer headers and auxiliary arrays */ LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc)); LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block)); @@ -1040,3 +1033,36 @@ AtProcExit_LocalBuffers(void) */ CheckForLocalBufferLeaks(); } + +/* + * Flush each temporary buffer page to the disk. + * + * It is costly operation needed solely to let temporary tables, indexes and + * 'toasts' participate in a parallel query plan. + */ +void +FlushAllLocalBuffers(void) +{ + int i; + + for (i = 0; i < NLocBuffer; i++) + { + BufferDesc *bufHdr = GetLocalBufferDescriptor(i); + uint32 buf_state; + + if (LocalBufHdrGetBlock(bufHdr) == NULL) + continue; + + buf_state = pg_atomic_read_u32(&bufHdr->state); + + /* XXX only valid dirty pages need to be flushed? 
*/ + if ((buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) + { + PinLocalBuffer(bufHdr, false); + FlushLocalBuffer(bufHdr, NULL); + UnpinLocalBuffer(BufferDescriptorGetBuffer(bufHdr)); + } + } + + Assert(dirtied_localbufs == 0); +} diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 2d0cb7bcfd4a6..32d548677e2b8 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -2132,6 +2132,10 @@ RelationIdGetRelation(Oid relationId) Assert(rd->rd_isvalid || (rd->rd_isnailed && !criticalRelcachesBuilt)); } + + /* Consistency check to be paranoid introducing parallel temp scan. */ + Assert(!(rd != NULL && RelationUsesLocalBuffers(rd) && IsParallelWorker() && dirtied_localbufs != 0)); + return rd; } @@ -2142,6 +2146,10 @@ RelationIdGetRelation(Oid relationId) rd = RelationBuildDesc(relationId, true); if (RelationIsValid(rd)) RelationIncrementReferenceCount(rd); + + /* Consistency check to be paranoid introducing parallel temp scan. */ + Assert(!(rd != NULL && RelationUsesLocalBuffers(rd) && IsParallelWorker() && dirtied_localbufs != 0)); + return rd; } diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index aecf631bd456f..8c2c9727aaaff 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -743,7 +743,7 @@ typedef struct EState bool es_use_parallel_mode; /* can we use parallel workers? */ - bool es_need_temptables_flush; /* Do we still need to flush dirty temp pages? */ + bool es_tempbufs_flushed; /* Have dirty temp buffers already been flushed? */ int es_parallel_workers_to_launch; /* number of workers to * launch.
*/ diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 5400c56a965f0..0dd00415b44c6 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -542,6 +542,7 @@ extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits, bool release_aio); extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait); extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln); +extern void FlushAllLocalBuffers(void); extern void InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced); extern void DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber *forkNum, int nforks, diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 87591edd18fbd..1084c1e115a8d 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -284,6 +284,7 @@ extern BlockNumber RelationGetNumberOfBlocksInFork(Relation relation, ForkNumber forkNum); extern void FlushOneBuffer(Buffer buffer); extern void FlushRelationBuffers(Relation rel); +extern void FlushAllBuffers(void); extern void FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels); extern void CreateAndCopyRelationData(RelFileLocator src_rlocator, RelFileLocator dst_rlocator, diff --git a/src/test/regress/expected/temp.out b/src/test/regress/expected/temp.out index a50c7ae88a9c8..c7ccc71ca7e47 100644 --- a/src/test/regress/expected/temp.out +++ b/src/test/regress/expected/temp.out @@ -566,3 +566,57 @@ SELECT count(*), max(a) max_a, min(a) min_a, max(cnt) max_cnt FROM test_temp; -- cleanup DROP FUNCTION test_temp_pin(int, int); +-- Test visibility of a temporary table tuples in parallel workers +-- Although explain prints the number of workers planned and launched, In this +-- case it shouldn't cause test results float because the debugging +-- option force usage of at least single worker (normally no one is needed here +-- and we don't expect more than one 
worker. +CREATE TEMP TABLE test AS (SELECT x FROM generate_series(1,100) AS x); +VACUUM ANALYZE test; +SET max_parallel_workers_per_gather = 1; +SET parallel_setup_cost = 0; +SET parallel_tuple_cost = 0; +SET min_parallel_table_scan_size = 0; +SET min_parallel_index_scan_size = 0; +SET debug_parallel_query = 'on'; +-- Temp buffers will not be seen without flushing dirty buffers +EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) +SELECT * FROM test; + QUERY PLAN +------------------------------------------------------------- + Gather (actual rows=100.00 loops=1) + Workers Planned: 1 + Workers Launched: 1 + -> Parallel Seq Scan on test (actual rows=50.00 loops=2) +(4 rows) + +-- Check temporary indexes too +CREATE INDEX idx1 ON test(x); +SET enable_seqscan = 'off'; +EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) +SELECT * FROM test; + QUERY PLAN +------------------------------------------------------------------------------- + Gather (actual rows=100.00 loops=1) + Workers Planned: 1 + Workers Launched: 1 + -> Parallel Index Only Scan using idx1 on test (actual rows=50.00 loops=2) + Heap Fetches: 0 + Index Searches: 1 +(6 rows) + +RESET enable_seqscan; +-- a view doesn't have a storage - it shouldn't cause issues. 
+CREATE TEMP TABLE table_a (id integer); +CREATE TEMP VIEW view_a AS SELECT * FROM table_a; +SELECT view_a FROM view_a; + view_a +-------- +(0 rows) + +RESET debug_parallel_query; +RESET min_parallel_index_scan_size; +RESET min_parallel_table_scan_size; +RESET max_parallel_workers_per_gather; +RESET parallel_setup_cost; +RESET parallel_tuple_cost; diff --git a/src/test/regress/sql/temp.sql b/src/test/regress/sql/temp.sql index d50472ddced89..65c519bdabdec 100644 --- a/src/test/regress/sql/temp.sql +++ b/src/test/regress/sql/temp.sql @@ -418,3 +418,42 @@ SELECT count(*), max(a) max_a, min(a) min_a, max(cnt) max_cnt FROM test_temp; -- cleanup DROP FUNCTION test_temp_pin(int, int); + +-- Test visibility of a temporary table tuples in parallel workers +-- Although explain prints the number of workers planned and launched, In this +-- case it shouldn't cause test results float because the debugging +-- option force usage of at least single worker (normally no one is needed here +-- and we don't expect more than one worker. +CREATE TEMP TABLE test AS (SELECT x FROM generate_series(1,100) AS x); +VACUUM ANALYZE test; + +SET max_parallel_workers_per_gather = 1; +SET parallel_setup_cost = 0; +SET parallel_tuple_cost = 0; +SET min_parallel_table_scan_size = 0; +SET min_parallel_index_scan_size = 0; +SET debug_parallel_query = 'on'; + +-- Temp buffers will not be seen without flushing dirty buffers +EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) +SELECT * FROM test; + +-- Check temporary indexes too +CREATE INDEX idx1 ON test(x); +SET enable_seqscan = 'off'; +EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) +SELECT * FROM test; + +RESET enable_seqscan; + +-- a view doesn't have a storage - it shouldn't cause issues. 
+CREATE TEMP TABLE table_a (id integer); +CREATE TEMP VIEW view_a AS SELECT * FROM table_a; +SELECT view_a FROM view_a; + +RESET debug_parallel_query; +RESET min_parallel_index_scan_size; +RESET min_parallel_table_scan_size; +RESET max_parallel_workers_per_gather; +RESET parallel_setup_cost; +RESET parallel_tuple_cost; From 3dd4f3ace2924ba33dc2c4175fd87f354867dc89 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Sun, 28 Sep 2025 13:17:53 +0200 Subject: [PATCH 4/4] Make the optimiser aware of the parallel temp scan Consider the extra cost of flushing temporary tables in partial path comparisons. With this commit, the optimiser gains a rationale for cost-based decision on enabling the parallel scan of subtrees that include temporary tables. It is achieved by adding to the path comparison routine an extra 'flush buffers' weighting factor. It is trivial to calculate the cost by tracking the number of dirtied temporary buffers and multiplying it by the write_page_cost parameter. The functions compare_path_costs and compare_fractional_path_costs were modified to account for this additional factor. 
--- src/backend/optimizer/path/costsize.c | 24 ++++++++++++ src/backend/optimizer/util/pathnode.c | 37 +++++++++++++++---- src/backend/utils/misc/guc_parameters.dat | 9 +++++ src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/optimizer/cost.h | 2 + src/include/optimizer/optimizer.h | 1 + 6 files changed, 66 insertions(+), 8 deletions(-) diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 35a45c50f3fe3..205baa0dd5cf6 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -104,6 +104,7 @@ #include "optimizer/plancat.h" #include "optimizer/restrictinfo.h" #include "parser/parsetree.h" +#include "storage/bufmgr.h" #include "utils/lsyscache.h" #include "utils/selfuncs.h" #include "utils/spccache.h" @@ -129,6 +130,7 @@ double seq_page_cost = DEFAULT_SEQ_PAGE_COST; double random_page_cost = DEFAULT_RANDOM_PAGE_COST; +double write_page_cost = DEFAULT_WRITE_PAGE_COST; double cpu_tuple_cost = DEFAULT_CPU_TUPLE_COST; double cpu_index_tuple_cost = DEFAULT_CPU_INDEX_TUPLE_COST; double cpu_operator_cost = DEFAULT_CPU_OPERATOR_COST; @@ -6655,3 +6657,25 @@ compute_gather_rows(Path *path) return clamp_row_est(path->rows * get_parallel_divisor(path)); } + +/* + * Before launching parallel workers in a SELECT query, the leader process must + * flush all dirty pages in temp buffers to guarantee equal access to the data + * in each parallel worker. + * It seems difficult to calculate the specific set of tables, indexes and toasts + * that may be touched inside the subtree. Moreover, stored procedures may also + * scan temporary tables. So, it makes sense to flush all temporary buffers. + * Here we calculate the cost of such an operation so that small queries do not + * activate an expensive parallel scan over temp resources.
+ */ +Cost +tempbuf_flush_extra_cost(void) +{ + if (!extended_parallel_processing) + /* Fast exit if feature is disabled */ + return 0.0; + + /* Rely on the tracked number of currently dirtied local buffers */ + Assert(dirtied_localbufs >= 0); + return write_page_cost * dirtied_localbufs; +} diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 37de58140e8cb..233495c219e67 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -69,6 +69,12 @@ static bool pathlist_is_reparameterizable_by_child(List *pathlist, int compare_path_costs(Path *path1, Path *path2, CostSelector criterion) { + Cost startup_cost1 = path1->startup_cost; + Cost startup_cost2 = path2->startup_cost; + Cost total_cost1 = path1->total_cost; + Cost total_cost2 = path2->total_cost; + Cost extra_cost = tempbuf_flush_extra_cost(); + /* Number of disabled nodes, if different, trumps all else. */ if (unlikely(path1->disabled_nodes != path2->disabled_nodes)) { @@ -78,35 +84,50 @@ compare_path_costs(Path *path1, Path *path2, CostSelector criterion) return +1; } + /* + * Add the extra cost of flushing temporary buffers for the duration + * of this comparison only. + */ + if (path1->parallel_safe == NEEDS_TEMP_FLUSH) + { + startup_cost1 += extra_cost; + total_cost1 += extra_cost; + } + if (path2->parallel_safe == NEEDS_TEMP_FLUSH) + { + startup_cost2 += extra_cost; + total_cost2 += extra_cost; + } + if (criterion == STARTUP_COST) { - if (path1->startup_cost < path2->startup_cost) + if (startup_cost1 < startup_cost2) return -1; - if (path1->startup_cost > path2->startup_cost) + if (startup_cost1 > startup_cost2) return +1; /* * If paths have the same startup cost (not at all unlikely), order * them by total cost.
*/ - if (path1->total_cost < path2->total_cost) + if (total_cost1 < total_cost2) return -1; - if (path1->total_cost > path2->total_cost) + if (total_cost1 > total_cost2) return +1; } else { - if (path1->total_cost < path2->total_cost) + if (total_cost1 < total_cost2) return -1; - if (path1->total_cost > path2->total_cost) + if (total_cost1 > total_cost2) return +1; /* * If paths have the same total cost, order them by startup cost. */ - if (path1->startup_cost < path2->startup_cost) + if (startup_cost1 < startup_cost2) return -1; - if (path1->startup_cost > path2->startup_cost) + if (startup_cost1 > startup_cost2) return +1; } return 0; diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index 9099013e975d6..f9e37f8b7c2c3 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -3502,6 +3502,15 @@ max => 'MAX_KILOBYTES', }, + +{ name => 'write_page_cost', type => 'real', context => 'PGC_USERSET', group => 'QUERY_TUNING_COST', + short_desc => 'Sets the planner\'s estimate of the cost of a disk page flushing.', + flags => 'GUC_EXPLAIN', + variable => 'write_page_cost', + boot_val => 'DEFAULT_WRITE_PAGE_COST', + min => '0', + max => 'DBL_MAX', +}, { name => 'xmlbinary', type => 'enum', context => 'PGC_USERSET', group => 'CLIENT_CONN_STATEMENT', short_desc => 'Sets how binary values are to be encoded in XML.', variable => 'xmlbinary', diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 5f18eae807650..a3cd0d03eb3b0 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -435,6 +435,7 @@ #seq_page_cost = 1.0 # measured on an arbitrary scale #random_page_cost = 4.0 # same scale as above +#write_page_cost = 5.0 # same scale as above #cpu_tuple_cost = 0.01 # same scale as above #cpu_index_tuple_cost = 0.005 # same scale as above #cpu_operator_cost = 0.0025 # same 
scale as above diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index b523bcda8f3d0..d68d8b8b77470 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -23,6 +23,7 @@ /* If you change these, update backend/utils/misc/postgresql.conf.sample */ #define DEFAULT_SEQ_PAGE_COST 1.0 #define DEFAULT_RANDOM_PAGE_COST 4.0 +#define DEFAULT_WRITE_PAGE_COST 5.0 /* Make it a little more than random read. */ #define DEFAULT_CPU_TUPLE_COST 0.01 #define DEFAULT_CPU_INDEX_TUPLE_COST 0.005 #define DEFAULT_CPU_OPERATOR_COST 0.0025 @@ -222,5 +223,6 @@ extern double compute_bitmap_pages(PlannerInfo *root, RelOptInfo *baserel, Path *bitmapqual, double loop_count, Cost *cost_p, double *tuples_p); extern double compute_gather_rows(Path *path); +extern Cost tempbuf_flush_extra_cost(void); #endif /* COST_H */ diff --git a/src/include/optimizer/optimizer.h b/src/include/optimizer/optimizer.h index 22e8b3d998cc0..bd264d3ebe13e 100644 --- a/src/include/optimizer/optimizer.h +++ b/src/include/optimizer/optimizer.h @@ -72,6 +72,7 @@ extern Selectivity clauselist_selectivity_ext(PlannerInfo *root, /* widely used cost parameters */ extern PGDLLIMPORT double seq_page_cost; extern PGDLLIMPORT double random_page_cost; +extern PGDLLIMPORT double write_page_cost; extern PGDLLIMPORT double cpu_tuple_cost; extern PGDLLIMPORT double cpu_index_tuple_cost; extern PGDLLIMPORT double cpu_operator_cost;