Skip to content

Commit d2ee2fc

Browse files
committed
orca: implement intra-segment parallel table scan support
Add comprehensive parallel table scan capability to GPORCA optimizer, enabling worker-level parallelism within segments for improved query performance on large table scans. Key components: - New CPhysicalParallelTableScan operator and CDistributionSpecWorkerRandom distribution specification for worker-level data distribution - CXformGet2ParallelTableScan transformation with parallel safety checks (excludes CTEs, dynamic scans, foreign tables, replicated tables, etc.) - Cost model integration with parallel_setup_cost and efficiency degradation scaling (logarithmic based on worker count) - DXL serialization/deserialization for CDXLPhysicalParallelTableScan - Plan translation to PostgreSQL SeqScan nodes with parallel_aware=true - Rewindability constraints (parallel scans are non-rewindable) - GUC integration: max_parallel_workers_per_gather controls worker count
1 parent ce84219 commit d2ee2fc

85 files changed

Lines changed: 2323 additions & 48 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/build-cloudberry.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,9 @@ jobs:
342342
},
343343
{"test":"ic-cbdb-parallel",
344344
"make_configs":["src/test/regress:installcheck-cbdb-parallel"]
345+
},
346+
{"test":"ic-orca-parallel",
347+
"make_configs":["src/test/regress:installcheck-orca-parallel"]
345348
}
346349
]
347350
}'

src/backend/gpopt/config/CConfigParamMapping.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,8 @@ CConfigParamMapping::PackConfigParamInBitset(
454454
// disable table scan if the corresponding GUC is turned off
455455
traceflag_bitset->ExchangeSet(
456456
GPOPT_DISABLE_XFORM_TF(CXform::ExfGet2TableScan));
457+
traceflag_bitset->ExchangeSet(
458+
GPOPT_DISABLE_XFORM_TF(CXform::ExfGet2ParallelTableScan));
457459
}
458460

459461
if (!optimizer_enable_push_join_below_union_all)

src/backend/gpopt/gpdbwrappers.cpp

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
#include <limits> // std::numeric_limits
2626

2727
#include "gpos/base.h"
28+
#include "gpopt/base/COptCtxt.h"
29+
#include "gpopt/optimizer/COptimizerConfig.h"
2830
#include "gpos/error/CAutoExceptionStack.h"
2931
#include "gpos/error/CException.h"
3032

@@ -36,8 +38,10 @@ extern "C" {
3638
#include "access/amapi.h"
3739
#include "access/external.h"
3840
#include "access/genam.h"
41+
#include "access/parallel.h"
3942
#include "catalog/pg_aggregate.h"
4043
#include "catalog/pg_inherits.h"
44+
#include "cdb/cdbvars.h"
4145
#include "foreign/fdwapi.h"
4246
#include "nodes/nodeFuncs.h"
4347
#include "optimizer/clauses.h"
@@ -52,6 +56,9 @@ extern "C" {
5256
#include "utils/lsyscache.h"
5357
#include "utils/memutils.h"
5458
#include "utils/partcache.h"
59+
60+
extern bool enable_parallel;
61+
extern int max_parallel_workers_per_gather;
5562
}
5663
#define GP_WRAP_START \
5764
sigjmp_buf local_sigjmp_buf; \
@@ -2576,6 +2583,19 @@ gpdb::GetForeignServerId(Oid reloid)
25762583
return 0;
25772584
}
25782585

2586+
int16
2587+
gpdb::GetAppendOnlySegmentFilesCount(Relation rel)
2588+
{
2589+
GP_WRAP_START;
2590+
{
2591+
FormData_pg_appendonly aoFormData;
2592+
GetAppendOnlyEntry(rel, &aoFormData);
2593+
return aoFormData.segfilecount;
2594+
}
2595+
GP_WRAP_END;
2596+
return -1;
2597+
}
2598+
25792599
// Locks on partition leafs and indexes are held during optimizer (after
25802600
// parse-analyze stage). ORCA need this function to lock relation. Here
25812601
// we do not need to consider lock-upgrade issue, reasons are:
@@ -2734,4 +2754,36 @@ gpdb::TestexprIsHashable(Node *testexpr, List *param_ids)
27342754
return false;
27352755
}
27362756

2757+
// check if parallel mode is OK (comprehensive check)
2758+
bool
2759+
gpdb::IsParallelModeOK(void)
2760+
{
2761+
GP_WRAP_START;
2762+
{
2763+
if (!enable_parallel)
2764+
return false;
2765+
2766+
if (IS_SINGLENODE())
2767+
return false;
2768+
2769+
if (max_parallel_workers_per_gather <= 0)
2770+
return false;
2771+
2772+
// Check if parallel plans are enabled in current optimizer context
2773+
gpopt::COptCtxt *poctxt = gpopt::COptCtxt::PoctxtFromTLS();
2774+
if (nullptr != poctxt)
2775+
{
2776+
gpopt::COptimizerConfig *optimizer_config = poctxt->GetOptimizerConfig();
2777+
if (nullptr != optimizer_config)
2778+
{
2779+
if (!optimizer_config->CreateParallelPlan())
2780+
return false;
2781+
}
2782+
}
2783+
return true;
2784+
}
2785+
GP_WRAP_END;
2786+
return false; // default to disabled if no context
2787+
}
2788+
27372789
// EOF

src/backend/gpopt/translate/CTranslatorDXLToPlStmt.cpp

Lines changed: 217 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ extern "C" {
3030
#include "partitioning/partdesc.h"
3131
#include "storage/lmgr.h"
3232
#include "utils/guc.h"
33+
#include "optimizer/cost.h"
3334
#include "utils/lsyscache.h"
3435
#include "utils/partcache.h"
3536
#include "utils/rel.h"
@@ -83,6 +84,7 @@ extern "C" {
8384
#include "naucrates/dxl/operators/CDXLPhysicalSplit.h"
8485
#include "naucrates/dxl/operators/CDXLPhysicalTVF.h"
8586
#include "naucrates/dxl/operators/CDXLPhysicalTableScan.h"
87+
#include "naucrates/dxl/operators/CDXLPhysicalParallelTableScan.h"
8688
#include "naucrates/dxl/operators/CDXLPhysicalValuesScan.h"
8789
#include "naucrates/dxl/operators/CDXLPhysicalWindow.h"
8890
#include "naucrates/dxl/operators/CDXLScalarBitmapBoolOp.h"
@@ -348,6 +350,12 @@ CTranslatorDXLToPlStmt::TranslateDXLOperatorToPlan(
348350
ctxt_translation_prev_siblings);
349351
break;
350352
}
353+
case EdxlopPhysicalParallelTableScan:
354+
{
355+
plan = TranslateDXLParallelTblScan(dxlnode, output_context,
356+
ctxt_translation_prev_siblings);
357+
break;
358+
}
351359
case EdxlopPhysicalIndexScan:
352360
{
353361
plan = TranslateDXLIndexScan(dxlnode, output_context,
@@ -712,14 +720,118 @@ CTranslatorDXLToPlStmt::TranslateDXLTblScan(
712720
}
713721

714722

723+
//---------------------------------------------------------------------------
724+
// @function:
725+
// CTranslatorDXLToPlStmt::TranslateDXLParallelTblScan
726+
//
727+
// @doc:
728+
// Translates a DXL parallel table scan node into a parallel SeqScan node
729+
Plan *
730+
CTranslatorDXLToPlStmt::TranslateDXLParallelTblScan(
731+
const CDXLNode *tbl_scan_dxlnode, CDXLTranslateContext *output_context,
732+
CDXLTranslationContextArray * /*ctxt_translation_prev_siblings*/)
733+
{
734+
// translate table descriptor into a range table entry
735+
CDXLPhysicalParallelTableScan *phy_parallel_tbl_scan_dxlop =
736+
CDXLPhysicalParallelTableScan::Cast(tbl_scan_dxlnode->GetOperator());
737+
738+
ULONG parallel_workers = phy_parallel_tbl_scan_dxlop->UlParallelWorkers();
739+
740+
// translation context for column mappings in the base relation
741+
CDXLTranslateContextBaseTable base_table_context(m_mp);
742+
743+
const CDXLTableDescr *dxl_table_descr =
744+
phy_parallel_tbl_scan_dxlop->GetDXLTableDescr();
745+
const IMDRelation *md_rel =
746+
m_md_accessor->RetrieveRel(dxl_table_descr->MDId());
747+
748+
// Lock any table we are to scan, since it may not have been properly locked
749+
// by the parser (e.g in case of generated scans for partitioned tables)
750+
OID oidRel = CMDIdGPDB::CastMdid(md_rel->MDId())->Oid();
751+
GPOS_ASSERT(dxl_table_descr->LockMode() != -1);
752+
gpdb::GPDBLockRelationOid(oidRel, dxl_table_descr->LockMode());
753+
754+
Index index = ProcessDXLTblDescr(dxl_table_descr, &base_table_context);
755+
756+
// a table scan node must have 2 children: projection list and filter
757+
GPOS_ASSERT(2 == tbl_scan_dxlnode->Arity());
758+
759+
// translate proj list and filter
760+
CDXLNode *project_list_dxlnode = (*tbl_scan_dxlnode)[EdxltsIndexProjList];
761+
CDXLNode *filter_dxlnode = (*tbl_scan_dxlnode)[EdxltsIndexFilter];
762+
763+
List *targetlist = NIL;
764+
765+
// List to hold the quals after translating filter_dxlnode node.
766+
List *query_quals = NIL;
767+
768+
TranslateProjListAndFilter(
769+
project_list_dxlnode, filter_dxlnode,
770+
&base_table_context, // translate context for the base table
771+
nullptr, // translate_ctxt_left and pdxltrctxRight,
772+
&targetlist, &query_quals, output_context);
773+
774+
Plan *plan = nullptr;
775+
Plan *plan_return = nullptr;
776+
777+
// Parallel table scans are always sequential scans (not foreign scans)
778+
SeqScan *seq_scan = MakeNode(SeqScan);
779+
seq_scan->scanrelid = index;
780+
plan = &(seq_scan->plan);
781+
plan_return = (Plan *) seq_scan;
782+
783+
// Set parallel execution flags
784+
plan->parallel_aware = true;
785+
plan->parallel_safe = true;
786+
plan->parallel = (int) parallel_workers;
787+
788+
plan->targetlist = targetlist;
789+
790+
// List to hold the quals which contain both security quals and query
791+
// quals.
792+
List *security_query_quals = NIL;
793+
794+
// Fetching the RTE of the relation from the rewritten parse tree
795+
// based on the oidRel and adding the security quals of the RTE in
796+
// the security_query_quals list.
797+
AddSecurityQuals(oidRel, &security_query_quals, &index);
798+
799+
// The security quals should always be executed first when
800+
// compared to other quals. So appending query quals to the
801+
// security_query_quals list after the security quals.
802+
security_query_quals =
803+
gpdb::ListConcat(security_query_quals, query_quals);
804+
plan->qual = security_query_quals;
805+
806+
if (md_rel->IsNonBlockTable())
807+
{
808+
CheckSafeTargetListForAOTables(plan->targetlist);
809+
}
810+
811+
plan->plan_node_id = m_dxl_to_plstmt_context->GetNextPlanId();
812+
813+
// translate operator costs
814+
TranslatePlanCosts(tbl_scan_dxlnode, plan);
815+
816+
// Adjust row count to per-worker statistics
817+
if (parallel_workers > 1)
818+
{
819+
plan->plan_rows = ceil(plan->plan_rows / parallel_workers);
820+
}
821+
822+
SetParamIds(plan);
823+
824+
return plan_return;
825+
}
826+
827+
715828
//---------------------------------------------------------------------------
716829
// @function:
717830
// CTranslatorDXLToPlStmt::SetIndexVarAttnoWalker
718831
//
719832
// @doc:
720833
// Walker to set index var attno's,
721834
// attnos of index vars are set to their relative positions in index keys,
722-
// skip any outer references while walking the expression tree
723835
//
724836
//---------------------------------------------------------------------------
725837
BOOL
@@ -2415,15 +2527,34 @@ CTranslatorDXLToPlStmt::TranslateDXLMotion(
24152527
sendslice->directDispatch.contentIds = NIL;
24162528
sendslice->directDispatch.haveProcessedAnyCalculations = false;
24172529

2530+
// set parallel workers if needed
2531+
ULONG child_index = motion_dxlop->GetRelationChildIdx();
2532+
CDXLNode *child_dxlnode = (*motion_dxlnode)[child_index];
2533+
ULONG child_parallel_workers = ExtractParallelWorkersFromDXL(child_dxlnode);
2534+
if (child_parallel_workers > 1)
2535+
{
2536+
// Determine parallel workers based on enable_parallel and gang type
2537+
bool supports_parallel = (sendslice->gangType == GANGTYPE_PRIMARY_READER ||
2538+
sendslice->gangType == GANGTYPE_PRIMARY_WRITER);
2539+
2540+
if (supports_parallel)
2541+
{
2542+
sendslice->parallel_workers = child_parallel_workers;
2543+
}
2544+
else
2545+
{
2546+
// Disable parallel for: non-PRIMARY gang types
2547+
// (SINGLETON_READER, ENTRYDB_READER, UNALLOCATED)
2548+
sendslice->parallel_workers = 0;
2549+
}
2550+
}
2551+
24182552
motion->motionID = sendslice->sliceIndex;
24192553

24202554
// translate motion child
24212555
// child node is in the same position in broadcast and gather motion nodes
24222556
// but different in redistribute motion nodes
2423-
2424-
ULONG child_index = motion_dxlop->GetRelationChildIdx();
2425-
2426-
CDXLNode *child_dxlnode = (*motion_dxlnode)[child_index];
2557+
// Note: child_index and child_dxlnode already defined above
24272558

24282559
CDXLTranslateContext child_context(m_mp, false,
24292560
output_context->GetColIdToParamIdMap());
@@ -2576,6 +2707,16 @@ CTranslatorDXLToPlStmt::TranslateDXLMotion(
25762707
return nullptr;
25772708
}
25782709

2710+
// Adjust row count for parallel execution in the sending slice
2711+
// The Motion node receives rows from all parallel workers, so we need to
2712+
// account for the fact that each worker processes a fraction of the rows.
2713+
// TranslatePlanCosts() already divided by numsegments, but if we have
2714+
// parallel workers, each segment is further subdivided among workers.
2715+
if (sendslice->parallel_workers > 1)
2716+
{
2717+
plan->plan_rows = ceil(plan->plan_rows / sendslice->parallel_workers);
2718+
}
2719+
25792720
SetParamIds(plan);
25802721

25812722
return (Plan *) motion;
@@ -7282,4 +7423,75 @@ CTranslatorDXLToPlStmt::IsIndexForOrderBy(
72827423
}
72837424
return false;
72847425
}
7426+
7427+
//---------------------------------------------------------------------------
7428+
// @function:
7429+
// CTranslatorDXLToPlStmt::ExtractParallelWorkersFromDXL
7430+
//
7431+
// @doc:
7432+
// Extract parallel workers count from DXL node tree recursively.
7433+
// Since parallel degree is uniform across all parallel scans in a query,
7434+
// returns the first parallel degree found from any CDXLPhysicalParallelTableScan,
7435+
// or 1 if no parallel scan exists.
7436+
//
7437+
//---------------------------------------------------------------------------
7438+
ULONG
7439+
CTranslatorDXLToPlStmt::ExtractParallelWorkersFromDXL(const CDXLNode *dxlnode)
7440+
{
7441+
if (nullptr == dxlnode)
7442+
{
7443+
return 1;
7444+
}
7445+
7446+
CDXLOperator *dxlop = dxlnode->GetOperator();
7447+
if (EdxlopPhysicalParallelTableScan == dxlop->GetDXLOperator())
7448+
{
7449+
// Return parallel workers from the parallel table scan operator
7450+
// All parallel scans in the query share the same parallel degree
7451+
CDXLPhysicalParallelTableScan *parallel_scan_dxlop =
7452+
CDXLPhysicalParallelTableScan::Cast(dxlop);
7453+
return parallel_scan_dxlop->UlParallelWorkers();
7454+
}
7455+
else if (EdxlopPhysicalTableScan == dxlop->GetDXLOperator() ||
7456+
EdxlopPhysicalDynamicTableScan == dxlop->GetDXLOperator() ||
7457+
EdxlopPhysicalIndexScan == dxlop->GetDXLOperator() ||
7458+
EdxlopPhysicalIndexOnlyScan == dxlop->GetDXLOperator() ||
7459+
EdxlopPhysicalBitmapTableScan == dxlop->GetDXLOperator() ||
7460+
EdxlopPhysicalDynamicBitmapTableScan == dxlop->GetDXLOperator() ||
7461+
EdxlopPhysicalForeignScan == dxlop->GetDXLOperator() ||
7462+
EdxlopPhysicalDynamicForeignScan == dxlop->GetDXLOperator() ||
7463+
EdxlopPhysicalDynamicIndexScan == dxlop->GetDXLOperator() ||
7464+
EdxlopPhysicalDynamicIndexOnlyScan == dxlop->GetDXLOperator() ||
7465+
EdxlopPhysicalValuesScan == dxlop->GetDXLOperator())
7466+
{
7467+
// Non-parallel scans (table, index, bitmap, foreign, values)
7468+
// These are leaf nodes in terms of parallel worker extraction
7469+
// Return 1 to indicate no parallel workers
7470+
return 1;
7471+
}
7472+
else if (EdxlopPhysicalMotionGather == dxlop->GetDXLOperator() ||
7473+
EdxlopPhysicalMotionBroadcast == dxlop->GetDXLOperator() ||
7474+
EdxlopPhysicalMotionRedistribute == dxlop->GetDXLOperator() ||
7475+
EdxlopPhysicalMotionRandom == dxlop->GetDXLOperator() ||
7476+
EdxlopPhysicalMotionRoutedDistribute == dxlop->GetDXLOperator())
7477+
{
7478+
// Motion node creates a slice boundary - do not recurse into child
7479+
// The child's parallel workers belong to the sending slice, not receiving slice
7480+
// Return 0 to indicate the receiving slice (current slice) has no parallel workers
7481+
return 1;
7482+
}
7483+
7484+
// Recursively check child nodes, return early when first parallel scan is found
7485+
for (ULONG ul = 0; ul < dxlnode->Arity(); ul++)
7486+
{
7487+
ULONG child_parallel_workers = ExtractParallelWorkersFromDXL((*dxlnode)[ul]);
7488+
if (child_parallel_workers > 1)
7489+
{
7490+
return child_parallel_workers;
7491+
}
7492+
}
7493+
7494+
return 1;
7495+
}
7496+
72857497
// EOF

0 commit comments

Comments
 (0)