@@ -978,6 +978,180 @@ CUtils::FHasCTEAnchor(CExpression *pexpr)
978978 return false ;
979979}
980980
981+ // return CTEConsumers' and a set of CTEProducers' CTE ids in the given subtree
982+ void
983+ CUtils::CollectConsumersAndProducers (CMemoryPool *mp, CExpression *pexpr,
984+ ULongPtrArray *cteConsumers,
985+ UlongCteIdHashSet *cteProducerSet)
986+ {
987+ COperator *pop = pexpr->Pop ();
988+
989+ if (COperator::EopPhysicalCTEConsumer == pop->Eopid ())
990+ {
991+ cteConsumers->Append (GPOS_NEW (mp) ULONG (
992+ CPhysicalCTEConsumer::PopConvert (pop)->UlCTEId ()));
993+ }
994+ else if (COperator::EopPhysicalCTEProducer == pop->Eopid ())
995+ {
996+ cteProducerSet->Insert (GPOS_NEW (mp) ULONG (
997+ CPhysicalCTEProducer::PopConvert (pop)->UlCTEId ()));
998+ }
999+
1000+ for (ULONG ul = 0 ; ul < pexpr->Arity (); ul++)
1001+ {
1002+ CExpression *pexprChild = (*pexpr)[ul];
1003+
1004+ if (!pexprChild->Pop ()->FScalar ())
1005+ {
1006+ CollectConsumersAndProducers (mp, pexprChild, cteConsumers,
1007+ cteProducerSet);
1008+ }
1009+ }
1010+ }
1011+
1012+ BOOL
1013+ CUtils::hasUnpairedCTEConsumer (CMemoryPool *mp, CExpression *pexpr)
1014+ {
1015+ BOOL hasUnpairedConsumer = false ;
1016+
1017+ ULongPtrArray *cteConsumers = GPOS_NEW (mp) ULongPtrArray (mp);
1018+ UlongCteIdHashSet *cteProducerSet = GPOS_NEW (mp) UlongCteIdHashSet (mp);
1019+
1020+ CollectConsumersAndProducers (mp, pexpr, cteConsumers, cteProducerSet);
1021+
1022+ // check if every consumer's producer is in ProducerSet
1023+ for (ULONG ul = 0 ; ul < cteConsumers->Size (); ul++)
1024+ {
1025+ if (!cteProducerSet->Contains ((*cteConsumers)[ul]))
1026+ {
1027+ hasUnpairedConsumer = true ;
1028+ break ;
1029+ }
1030+ }
1031+ cteConsumers->Release ();
1032+ cteProducerSet->Release ();
1033+
1034+ return hasUnpairedConsumer;
1035+ }
1036+
1037+ // True if the distribution is replicated-like.
1038+ static BOOL
1039+ FReplicatedLikeDistribution (CDistributionSpec::EDistributionType edt)
1040+ {
1041+ return (CDistributionSpec::EdtStrictReplicated == edt ||
1042+ CDistributionSpec::EdtTaintedReplicated == edt ||
1043+ CDistributionSpec::EdtUniversal == edt);
1044+ }
1045+
1046+ struct SCTEInfo
1047+ {
1048+ ULONG cteId;
1049+ ULONG sliceId;
1050+
1051+ SCTEInfo (ULONG cte_id, ULONG slice_id) : cteId(cte_id), sliceId(slice_id)
1052+ {
1053+ }
1054+ };
1055+
1056+ typedef CDynamicPtrArray<SCTEInfo, CleanupDelete<SCTEInfo> > CTEInfoArray;
1057+
1058+ // Walk the physical tree, recording the slice id of every replicated
1059+ // CTE Producer and every CTE Consumer. Slices are delimited by Motion
1060+ // nodes: each non-scalar child of a Motion lives in a fresh slice --
1061+ // same motId-stack idea as in apply_shareinput_xslice.
1062+ static void
1063+ CollectCTESlices (CMemoryPool *mp, CExpression *pexpr, ULONG curSlice,
1064+ ULONG *pNextSlice, CTEInfoArray *prodInfos,
1065+ CTEInfoArray *consInfos)
1066+ {
1067+ COperator *pop = pexpr->Pop ();
1068+
1069+ if (COperator::EopPhysicalCTEProducer == pop->Eopid ())
1070+ {
1071+ // Producer's distribution comes from its only child -- inspect
1072+ // it there. Skip non-replicated Producers; they cannot trigger
1073+ // the cross-slice issue we are checking for.
1074+ GPOS_ASSERT (1 == pexpr->Arity ());
1075+ CExpression *pexprChild = (*pexpr)[0 ];
1076+ CDrvdPropPlan *pdpplan =
1077+ CDrvdPropPlan::Pdpplan (pexprChild->PdpDerive ());
1078+
1079+ if (FReplicatedLikeDistribution (pdpplan->Pds ()->Edt ()))
1080+ {
1081+ prodInfos->Append (GPOS_NEW (mp) SCTEInfo (
1082+ CPhysicalCTEProducer::PopConvert (pop)->UlCTEId (), curSlice));
1083+ }
1084+ }
1085+ else if (COperator::EopPhysicalCTEConsumer == pop->Eopid ())
1086+ {
1087+ // Consumer is a leaf -- record (cteId, curSlice) and let the
1088+ // caller decide later, once the whole tree has been walked.
1089+ consInfos->Append (GPOS_NEW (mp) SCTEInfo (
1090+ CPhysicalCTEConsumer::PopConvert (pop)->UlCTEId (), curSlice));
1091+ }
1092+
1093+ BOOL isMotion = CUtils::FPhysicalMotion (pop);
1094+
1095+ for (ULONG ul = 0 ; ul < pexpr->Arity (); ul++)
1096+ {
1097+ CExpression *pexprChild = (*pexpr)[ul];
1098+
1099+ if (pexprChild->Pop ()->FScalar ())
1100+ {
1101+ continue ;
1102+ }
1103+
1104+ ULONG childSlice = curSlice;
1105+ if (isMotion)
1106+ {
1107+ (*pNextSlice)++;
1108+ childSlice = *pNextSlice;
1109+ }
1110+
1111+ CollectCTESlices (mp, pexprChild, childSlice, pNextSlice, prodInfos,
1112+ consInfos);
1113+ }
1114+ }
1115+
1116+ BOOL
1117+ CUtils::FHasCrossSliceReplicatedCTEConsumer (CMemoryPool *mp, CExpression *pexpr)
1118+ {
1119+ if (NULL == pexpr)
1120+ {
1121+ return false ;
1122+ }
1123+
1124+ CTEInfoArray *prodInfos = GPOS_NEW (mp) CTEInfoArray (mp);
1125+ CTEInfoArray *consInfos = GPOS_NEW (mp) CTEInfoArray (mp);
1126+ ULONG nextSlice = 0 ;
1127+
1128+ CollectCTESlices (mp, pexpr, 0 /* curSlice*/ , &nextSlice, prodInfos,
1129+ consInfos);
1130+
1131+ BOOL cross = false ;
1132+
1133+ for (ULONG ic = 0 ; ic < consInfos->Size (); ic++)
1134+ {
1135+ SCTEInfo *cons = (*consInfos)[ic];
1136+
1137+ for (ULONG ip = 0 ; ip < prodInfos->Size (); ip++)
1138+ {
1139+ SCTEInfo *prod = (*prodInfos)[ip];
1140+ if (prod->cteId == cons->cteId && prod->sliceId != cons->sliceId )
1141+ {
1142+ cross = true ;
1143+ goto lExit;
1144+ }
1145+ }
1146+ }
1147+ lExit:
1148+
1149+ prodInfos->Release ();
1150+ consInfos->Release ();
1151+
1152+ return cross;
1153+ }
1154+
9811155// ---------------------------------------------------------------------------
9821156// @class:
9831157// CUtils::FHasSubqueryOrApply
0 commit comments