Skip to content

Commit b51a8fc

Browse files
committed
orca: fallback to Postgres optimizer on cross-slice replicated CTE Consumer.
Inspired by greengage 51fe92e: before Expr->DXL translation, walk the physical tree and track which slice each CTE Producer and Consumer lives on. If a Consumer is on a different slice than its Producer and the Producer's distribution is replicated, force a fallback to the Postgres optimizer. The replicated filter is essential: ordinary cross-slice CTE plans (non-replicated Producer with Gather/Redistribute Consumer) are a normal ORCA pattern and must not trigger fallback. 51fe92e doesn't trigger when a CTE over a replicated table is referenced from a scalar subquery, so the query hangs. This commit replaces the single-point check with a whole-tree walker that catches both cases. Tests: shared_scan adds a scalar-subquery reproducer guarded by statement_timeout. qp_orca_fallback adds two cases over a replicated CTE: a scalar-subquery form that triggers the walker (the hang case 51fe92e missed -- fallback to Postgres), and the original 51fe92e JOIN form where ORCA emits a safe plan with a One-Time Filter (gp_execution_segment() = N) and the walker correctly stays silent (guards against false positives). (cherry picked from commit open-gpdb/gpdb@3a9aebf)
1 parent 980ed21 commit b51a8fc

9 files changed

Lines changed: 515 additions & 1 deletion

File tree

src/backend/gporca/libgpopt/include/gpopt/base/CUtils.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1026,6 +1026,21 @@ class CUtils
10261026
static CTableDescriptorHashSet *RemoveDuplicateMdids(
10271027
CMemoryPool *mp, CTableDescriptorHashSet *tabdescs);
10281028

1029+
// hash set from CTE ids
1030+
typedef CHashSet<ULONG, gpos::HashValue<ULONG>, gpos::Equals<ULONG>,
1031+
CleanupDelete<ULONG> >
1032+
UlongCteIdHashSet;
1033+
1034+
static void CollectConsumersAndProducers(CMemoryPool *mp,
1035+
CExpression *pexpr,
1036+
ULongPtrArray *cteConsumers,
1037+
UlongCteIdHashSet *cteProducerSet);
1038+
1039+
static BOOL hasUnpairedCTEConsumer(CMemoryPool *mp, CExpression *pexpr);
1040+
1041+
static BOOL FHasCrossSliceReplicatedCTEConsumer(CMemoryPool *mp,
1042+
CExpression *pexpr);
1043+
10291044
static CExpression *ReplaceColrefWithProjectExpr(CMemoryPool *mp,
10301045
CExpression *pexpr,
10311046
CColRef *pcolref,

src/backend/gporca/libgpopt/src/base/CUtils.cpp

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -978,6 +978,180 @@ CUtils::FHasCTEAnchor(CExpression *pexpr)
978978
return false;
979979
}
980980

981+
// return CTEConsumers' and a set of CTEProducers' CTE ids in the given subtree
982+
void
983+
CUtils::CollectConsumersAndProducers(CMemoryPool *mp, CExpression *pexpr,
984+
ULongPtrArray *cteConsumers,
985+
UlongCteIdHashSet *cteProducerSet)
986+
{
987+
COperator *pop = pexpr->Pop();
988+
989+
if (COperator::EopPhysicalCTEConsumer == pop->Eopid())
990+
{
991+
cteConsumers->Append(GPOS_NEW(mp) ULONG(
992+
CPhysicalCTEConsumer::PopConvert(pop)->UlCTEId()));
993+
}
994+
else if (COperator::EopPhysicalCTEProducer == pop->Eopid())
995+
{
996+
cteProducerSet->Insert(GPOS_NEW(mp) ULONG(
997+
CPhysicalCTEProducer::PopConvert(pop)->UlCTEId()));
998+
}
999+
1000+
for (ULONG ul = 0; ul < pexpr->Arity(); ul++)
1001+
{
1002+
CExpression *pexprChild = (*pexpr)[ul];
1003+
1004+
if (!pexprChild->Pop()->FScalar())
1005+
{
1006+
CollectConsumersAndProducers(mp, pexprChild, cteConsumers,
1007+
cteProducerSet);
1008+
}
1009+
}
1010+
}
1011+
1012+
BOOL
1013+
CUtils::hasUnpairedCTEConsumer(CMemoryPool *mp, CExpression *pexpr)
1014+
{
1015+
BOOL hasUnpairedConsumer = false;
1016+
1017+
ULongPtrArray *cteConsumers = GPOS_NEW(mp) ULongPtrArray(mp);
1018+
UlongCteIdHashSet *cteProducerSet = GPOS_NEW(mp) UlongCteIdHashSet(mp);
1019+
1020+
CollectConsumersAndProducers(mp, pexpr, cteConsumers, cteProducerSet);
1021+
1022+
// check if every consumer's producer is in ProducerSet
1023+
for (ULONG ul = 0; ul < cteConsumers->Size(); ul++)
1024+
{
1025+
if (!cteProducerSet->Contains((*cteConsumers)[ul]))
1026+
{
1027+
hasUnpairedConsumer = true;
1028+
break;
1029+
}
1030+
}
1031+
cteConsumers->Release();
1032+
cteProducerSet->Release();
1033+
1034+
return hasUnpairedConsumer;
1035+
}
1036+
1037+
// True if the distribution is replicated-like.
1038+
static BOOL
1039+
FReplicatedLikeDistribution(CDistributionSpec::EDistributionType edt)
1040+
{
1041+
return (CDistributionSpec::EdtStrictReplicated == edt ||
1042+
CDistributionSpec::EdtTaintedReplicated == edt ||
1043+
CDistributionSpec::EdtUniversal == edt);
1044+
}
1045+
1046+
struct SCTEInfo
1047+
{
1048+
ULONG cteId;
1049+
ULONG sliceId;
1050+
1051+
SCTEInfo(ULONG cte_id, ULONG slice_id) : cteId(cte_id), sliceId(slice_id)
1052+
{
1053+
}
1054+
};
1055+
1056+
typedef CDynamicPtrArray<SCTEInfo, CleanupDelete<SCTEInfo> > CTEInfoArray;
1057+
1058+
// Walk the physical tree, recording the slice id of every replicated
1059+
// CTE Producer and every CTE Consumer. Slices are delimited by Motion
1060+
// nodes: each non-scalar child of a Motion lives in a fresh slice --
1061+
// same motId-stack idea as in apply_shareinput_xslice.
1062+
static void
1063+
CollectCTESlices(CMemoryPool *mp, CExpression *pexpr, ULONG curSlice,
1064+
ULONG *pNextSlice, CTEInfoArray *prodInfos,
1065+
CTEInfoArray *consInfos)
1066+
{
1067+
COperator *pop = pexpr->Pop();
1068+
1069+
if (COperator::EopPhysicalCTEProducer == pop->Eopid())
1070+
{
1071+
// Producer's distribution comes from its only child -- inspect
1072+
// it there. Skip non-replicated Producers; they cannot trigger
1073+
// the cross-slice issue we are checking for.
1074+
GPOS_ASSERT(1 == pexpr->Arity());
1075+
CExpression *pexprChild = (*pexpr)[0];
1076+
CDrvdPropPlan *pdpplan =
1077+
CDrvdPropPlan::Pdpplan(pexprChild->PdpDerive());
1078+
1079+
if (FReplicatedLikeDistribution(pdpplan->Pds()->Edt()))
1080+
{
1081+
prodInfos->Append(GPOS_NEW(mp) SCTEInfo(
1082+
CPhysicalCTEProducer::PopConvert(pop)->UlCTEId(), curSlice));
1083+
}
1084+
}
1085+
else if (COperator::EopPhysicalCTEConsumer == pop->Eopid())
1086+
{
1087+
// Consumer is a leaf -- record (cteId, curSlice) and let the
1088+
// caller decide later, once the whole tree has been walked.
1089+
consInfos->Append(GPOS_NEW(mp) SCTEInfo(
1090+
CPhysicalCTEConsumer::PopConvert(pop)->UlCTEId(), curSlice));
1091+
}
1092+
1093+
BOOL isMotion = CUtils::FPhysicalMotion(pop);
1094+
1095+
for (ULONG ul = 0; ul < pexpr->Arity(); ul++)
1096+
{
1097+
CExpression *pexprChild = (*pexpr)[ul];
1098+
1099+
if (pexprChild->Pop()->FScalar())
1100+
{
1101+
continue;
1102+
}
1103+
1104+
ULONG childSlice = curSlice;
1105+
if (isMotion)
1106+
{
1107+
(*pNextSlice)++;
1108+
childSlice = *pNextSlice;
1109+
}
1110+
1111+
CollectCTESlices(mp, pexprChild, childSlice, pNextSlice, prodInfos,
1112+
consInfos);
1113+
}
1114+
}
1115+
1116+
BOOL
1117+
CUtils::FHasCrossSliceReplicatedCTEConsumer(CMemoryPool *mp, CExpression *pexpr)
1118+
{
1119+
if (NULL == pexpr)
1120+
{
1121+
return false;
1122+
}
1123+
1124+
CTEInfoArray *prodInfos = GPOS_NEW(mp) CTEInfoArray(mp);
1125+
CTEInfoArray *consInfos = GPOS_NEW(mp) CTEInfoArray(mp);
1126+
ULONG nextSlice = 0;
1127+
1128+
CollectCTESlices(mp, pexpr, 0 /*curSlice*/, &nextSlice, prodInfos,
1129+
consInfos);
1130+
1131+
BOOL cross = false;
1132+
1133+
for (ULONG ic = 0; ic < consInfos->Size(); ic++)
1134+
{
1135+
SCTEInfo *cons = (*consInfos)[ic];
1136+
1137+
for (ULONG ip = 0; ip < prodInfos->Size(); ip++)
1138+
{
1139+
SCTEInfo *prod = (*prodInfos)[ip];
1140+
if (prod->cteId == cons->cteId && prod->sliceId != cons->sliceId)
1141+
{
1142+
cross = true;
1143+
goto lExit;
1144+
}
1145+
}
1146+
}
1147+
lExit:
1148+
1149+
prodInfos->Release();
1150+
consInfos->Release();
1151+
1152+
return cross;
1153+
}
1154+
9811155
//---------------------------------------------------------------------------
9821156
// @class:
9831157
// CUtils::FHasSubqueryOrApply

src/backend/gporca/libgpopt/src/translate/CTranslatorExprToDXL.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,20 @@ CTranslatorExprToDXL::PdxlnTranslate(CExpression *pexpr,
265265

266266
GPOS_ASSERT(nullptr == m_pdpplan);
267267

268+
// Walk the physical tree and detect a CTE Consumer placed on a
269+
// different slice than its Producer when the Producer's output is
270+
// replicated-like (StrictReplicated/TaintedReplicated/Universal).
271+
// Fall back to the Postgres optimizer if it is detected because
272+
// it breaks Producer-Consumer locality and can hang the
273+
// query at execution.
274+
if (CUtils::FHasCrossSliceReplicatedCTEConsumer(m_mp, pexpr))
275+
{
276+
GPOS_RAISE(
277+
gpdxl::ExmaDXL, gpdxl::ExmiExpr2DXLUnsupportedFeature,
278+
GPOS_WSZ_LIT(
279+
"CTE Consumer placed on a different slice than its replicated Producer"));
280+
}
281+
268282
m_pdpplan = CDrvdPropPlan::Pdpplan(pexpr->PdpDerive());
269283
m_pdpplan->AddRef();
270284

@@ -4250,6 +4264,10 @@ CTranslatorExprToDXL::BuildScalarSubplans(
42504264
{
42514265
const ULONG size = pdrgpcrInner->Size();
42524266

4267+
// Fallback to Postgres optimizer if the SubPlan's inner expression contains a
4268+
// CTE Consumer whose Producer lives outside this subtree. Such a Consumer
4269+
// would become a cross-slice Shared Scan reader without a local Producer,
4270+
// which can hang the query or fail at execution time.
42534271
CDXLNodeArray *pdrgpdxlnInner = GPOS_NEW(m_mp) CDXLNodeArray(m_mp);
42544272
for (ULONG ul = 0; ul < size; ul++)
42554273
{

src/test/regress/expected/qp_orca_fallback.out

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,12 +287,96 @@ select array_agg(a order by b)
287287

288288
-- Orca should fallback if a function in 'from' clause uses 'WITH ORDINALITY'
289289
SELECT * FROM jsonb_array_elements('["b", "a"]'::jsonb) WITH ORDINALITY;
290-
value | ordinality
290+
value | ordinality
291291
-------+------------
292292
"b" | 1
293293
"a" | 2
294294
(2 rows)
295295

296+
-- The walker that detects a CTE Consumer on a different slice than its
297+
-- replicated Producer. Without it ORCA would emit a plan with cross-slice
298+
-- replicated CTE Consumers that hangs at execution.
299+
-- start_ignore
300+
DROP TABLE IF EXISTS tbl1, tbl2;
301+
NOTICE: table "tbl1" does not exist, skipping
302+
NOTICE: table "tbl2" does not exist, skipping
303+
-- end_ignore
304+
CREATE TABLE tbl2 (id numeric, refrcode varchar(255), referenceid numeric)
305+
DISTRIBUTED REPLICATED;
306+
CREATE TABLE tbl1 (id bigserial, iscalctrg varchar(15) NOT NULL,
307+
iscalcdetail varchar(15))
308+
DISTRIBUTED REPLICATED;
309+
INSERT INTO tbl2 SELECT i, 'A'||(i%5), 101991
310+
FROM generate_series(1, 50000) i;
311+
INSERT INTO tbl1 (iscalctrg, iscalcdetail)
312+
SELECT 'A'||(i%5), 'A'||(i%7) FROM generate_series(1, 50000) i;
313+
ANALYZE tbl1;
314+
ANALYZE tbl2;
315+
-- Case 1: walker triggers fallback. With scalar subqueries on the CTE
316+
-- ORCA produces a plan whose CTE Producer is replicated and Consumers
317+
-- live on a different slice -- the walker raises ExmiExpr2DXLUnsupported
318+
-- and trace_fallback DETAIL says "CTE Consumer placed on a different
319+
-- slice than its replicated Producer".
320+
EXPLAIN (COSTS OFF)
321+
WITH t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991)
322+
SELECT p.iscalctrg,
323+
(SELECT refrcode FROM t2 WHERE refrcode = p.iscalctrg LIMIT 1) AS r,
324+
(SELECT refrcode FROM t2 WHERE refrcode = p.iscalcdetail LIMIT 1) AS r1
325+
FROM tbl1 p
326+
LIMIT 1;
327+
QUERY PLAN
328+
----------------------------------------------------------------------------------------
329+
Gather Motion 1:1 (slice1; segments: 1)
330+
-> Limit
331+
-> Seq Scan on tbl1 p
332+
SubPlan 1
333+
-> Limit
334+
-> Result
335+
Filter: ((tbl2.refrcode)::text = (p.iscalctrg)::text)
336+
-> Materialize
337+
-> Seq Scan on tbl2
338+
Filter: (referenceid = '101991'::numeric)
339+
SubPlan 2
340+
-> Limit
341+
-> Result
342+
Filter: ((tbl2_1.refrcode)::text = (p.iscalcdetail)::text)
343+
-> Materialize
344+
-> Seq Scan on tbl2 tbl2_1
345+
Filter: (referenceid = '101991'::numeric)
346+
Optimizer: Postgres query optimizer
347+
(18 rows)
348+
349+
-- Case 2: walker correctly stays silent. The same CTE referenced from a
350+
-- JOIN: ORCA pins the Producer body to a single segment with a One-Time
351+
-- Filter (gp_execution_segment() = N), so the Producer's child
352+
-- distribution is EdtSingleton, not replicated -- the walker skips it.
353+
EXPLAIN (COSTS OFF)
354+
WITH t1 AS (SELECT * FROM tbl1),
355+
t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991)
356+
SELECT p.* FROM t1 p
357+
JOIN t2 r ON p.iscalctrg = r.refrcode
358+
JOIN t2 r1 ON p.iscalcdetail = r1.refrcode
359+
LIMIT 1;
360+
QUERY PLAN
361+
---------------------------------------------------------------------------------
362+
Gather Motion 1:1 (slice1; segments: 1)
363+
-> Limit
364+
-> Hash Join
365+
Hash Cond: ((tbl1.iscalcdetail)::text = (r1.refrcode)::text)
366+
-> Hash Join
367+
Hash Cond: ((tbl2.refrcode)::text = (tbl1.iscalctrg)::text)
368+
-> Seq Scan on tbl2
369+
Filter: (referenceid = '101991'::numeric)
370+
-> Hash
371+
-> Seq Scan on tbl1
372+
-> Hash
373+
-> Subquery Scan on r1
374+
-> Seq Scan on tbl2 tbl2_1
375+
Filter: (referenceid = '101991'::numeric)
376+
Optimizer: Postgres query optimizer
377+
(15 rows)
378+
379+
DROP TABLE tbl1, tbl2;
296380
-- start_ignore
297381
-- FIXME: gpcheckcat fails due to mismatching distribution policy if this table isn't dropped
298382
-- Keep this table around once this is fixed

0 commit comments

Comments
 (0)