Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 79 additions & 7 deletions src/backend/gporca/libgpdbcost/src/CCostModelGPDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,34 @@ CCostModelGPDB::CostHashAgg(CMemoryPool *mp, CExpressionHandle &exprhdl,
if ((COperator::EgbaggtypeLocal == popAgg->Egbaggtype()) &&
popAgg->FGeneratesDuplicates())
{
num_output_rows = num_output_rows * pcmgpdb->UlHosts();
// Use NDV of grouping columns from child statistics to estimate the
// actual output rows of local partial aggregation, rather than relying
// solely on GPORCA's cardinality estimate (which can be inflated after
// multi-table joins).
//
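// The local partial agg's output is bounded by the NDV of its grouping
// key. GetNDVs() returns the global NDV (total across all segments); the
// code below scales it by UlHosts() (presumably because every segment can
// emit each group once), caps it at the previous estimate
// (num_output_rows * UlHosts()) and floors it at 1. The NDV is consulted
// only when child statistics exist and there is exactly one grouping
// column; otherwise the previous estimate is kept unchanged.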
//
// This lets the optimizer distinguish:
// - High NDV (≈ input rows): partial agg streams nearly as many rows
// as input → little benefit, cost approaches 1-phase.
// - Low NDV (<< input rows): partial agg significantly reduces data
// before redistribution → 2-phase preferred.
const CColRefArray *pdrgpcrGrpCols = popAgg->PdrgpcrGroupingCols();
if (pci->Pcstats(0) != nullptr && pdrgpcrGrpCols->Size() == 1)
{
CColRef *colref = (*pdrgpcrGrpCols)[0];
CDouble ndv = pci->Pcstats(0)->GetNDVs(colref);

num_output_rows =
std::max(1.0, std::min(ndv.Get() * pcmgpdb->UlHosts(),
num_output_rows * pcmgpdb->UlHosts()));
}
else
{
num_output_rows = num_output_rows * pcmgpdb->UlHosts();
}
}

// get the number of grouping columns
Expand All @@ -852,9 +879,32 @@ CCostModelGPDB::CostHashAgg(CMemoryPool *mp, CExpressionHandle &exprhdl,
pcmgpdb->GetCostModelParams()
->PcpLookup(CCostModelParamsGPDB::EcpHashAggOutputTupWidthCostUnit)
->Get();
const CDouble dHashAggSpillingMemThreshold =
pcmgpdb->GetCostModelParams()
->PcpLookup(CCostModelParamsGPDB::EcpHJSpillingMemThreshold)
->Get();
const CDouble dHashAggInputTupColumnSpillingCostUnit =
pcmgpdb->GetCostModelParams()
->PcpLookup(
CCostModelParamsGPDB::EcpHJFeedingTupColumnSpillingCostUnit)
->Get();
const CDouble dHashAggInputTupWidthSpillingCostUnit =
pcmgpdb->GetCostModelParams()
->PcpLookup(
CCostModelParamsGPDB::EcpHJFeedingTupWidthSpillingCostUnit)
->Get();
const CDouble dHashAggOutputTupWidthSpillingCostUnit =
pcmgpdb->GetCostModelParams()
->PcpLookup(
CCostModelParamsGPDB::EcpHJHashingTupWidthSpillingCostUnit)
->Get();
GPOS_ASSERT(0 < dHashAggInputTupColumnCostUnit);
GPOS_ASSERT(0 < dHashAggInputTupWidthCostUnit);
GPOS_ASSERT(0 < dHashAggOutputTupWidthCostUnit);
GPOS_ASSERT(0 < dHashAggSpillingMemThreshold);
GPOS_ASSERT(0 < dHashAggInputTupColumnSpillingCostUnit);
GPOS_ASSERT(0 < dHashAggInputTupWidthSpillingCostUnit);
GPOS_ASSERT(0 < dHashAggOutputTupWidthSpillingCostUnit);

// hashAgg cost contains three parts: build hash table, aggregate tuples, and output tuples.
// 1. build hash table is correlated with the number of num_input_rows
Expand All @@ -863,12 +913,34 @@ CCostModelGPDB::CostHashAgg(CMemoryPool *mp, CExpressionHandle &exprhdl,
// algorithm and thus is ignored.
// 3. cost of output tuples is correlated with num_output_rows and
// width of returning tuples.
CCost costLocal = CCost(
pci->NumRebinds() *
(num_input_rows * ulGrpCols * dHashAggInputTupColumnCostUnit +
num_input_rows * ulGrpCols * pci->Width() *
dHashAggInputTupWidthCostUnit +
num_output_rows * pci->Width() * dHashAggOutputTupWidthCostUnit));
//
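// The hash table holds one entry per distinct group, so its memory
// footprint is approximated as num_output_rows * pci->Width(). When this
// exceeds the spilling threshold the aggregator writes batches to disk
// and re-reads them; this is modeled by switching to the spilling cost
// units, which are expected to be higher. Note that the threshold and the
// spilling cost units are borrowed from the hash join parameters
// (EcpHJ*); no HashAgg-specific spilling parameters are looked up here.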
CCost costLocal(0);
if (num_output_rows * pci->Width() <= dHashAggSpillingMemThreshold)
{
// groups fit in memory
costLocal = CCost(
pci->NumRebinds() *
(num_input_rows * ulGrpCols * dHashAggInputTupColumnCostUnit +
num_input_rows * ulGrpCols * pci->Width() *
dHashAggInputTupWidthCostUnit +
num_output_rows * pci->Width() * dHashAggOutputTupWidthCostUnit));
}
else
{
// groups spill to disk
costLocal = CCost(
pci->NumRebinds() *
(num_input_rows * ulGrpCols *
dHashAggInputTupColumnSpillingCostUnit +
num_input_rows * ulGrpCols * pci->Width() *
dHashAggInputTupWidthSpillingCostUnit +
num_output_rows * pci->Width() *
dHashAggOutputTupWidthSpillingCostUnit));
}
CCost costChild =
CostChildren(mp, exprhdl, pci, pcmgpdb->GetCostModelParams());

Expand Down
92 changes: 43 additions & 49 deletions src/test/regress/expected/tpcds_q04_optimizer.out
Original file line number Diff line number Diff line change
Expand Up @@ -5437,67 +5437,61 @@ INSERT INTO pg_statistic VALUES (
-> Append
-> Result
Filter: (((('s'::text) = 's'::text) AND (date_dim.d_year = 2001) AND ((sum(((((store_sales.ss_ext_list_price - store_sales.ss_ext_wholesale_cost) - store_sales.ss_ext_discount_amt) + store_sales.ss_ext_sales_price) / '2'::numeric))) > '0'::numeric)) OR ((('s'::text) = 's'::text) AND (date_dim.d_year = 2002)) OR ((('s'::text) = 'c'::text) AND (date_dim.d_year = 2001) AND ((sum(((((store_sales.ss_ext_list_price - store_sales.ss_ext_wholesale_cost) - store_sales.ss_ext_discount_amt) + store_sales.ss_ext_sales_price) / '2'::numeric))) > '0'::numeric)) OR ((('s'::text) = 'c'::text) AND (date_dim.d_year = 2002)) OR ((('s'::text) = 'w'::text) AND (date_dim.d_year = 2001) AND ((sum(((((store_sales.ss_ext_list_price - store_sales.ss_ext_wholesale_cost) - store_sales.ss_ext_discount_amt) + store_sales.ss_ext_sales_price) / '2'::numeric))) > '0'::numeric)) OR ((('s'::text) = 'w'::text) AND (date_dim.d_year = 2002)))
-> Finalize HashAggregate
-> HashAggregate
Group Key: customer.c_customer_id, customer.c_first_name, customer.c_last_name, customer.c_preferred_cust_flag, customer.c_birth_country, customer.c_login, customer.c_email_address, date_dim.d_year
-> Redistribute Motion 3:3 (slice2; segments: 3)
Hash Key: customer.c_customer_id, customer.c_first_name, customer.c_last_name, customer.c_preferred_cust_flag, customer.c_birth_country, customer.c_login, customer.c_email_address, date_dim.d_year
-> Streaming Partial HashAggregate
Group Key: customer.c_customer_id, customer.c_first_name, customer.c_last_name, customer.c_preferred_cust_flag, customer.c_birth_country, customer.c_login, customer.c_email_address, date_dim.d_year
-> Hash Join
Hash Cond: (store_sales.ss_customer_sk = customer.c_customer_sk)
-> Redistribute Motion 3:3 (slice3; segments: 3)
Hash Key: store_sales.ss_customer_sk
-> Hash Join
Hash Cond: (store_sales.ss_sold_date_sk = date_dim.d_date_sk)
-> Seq Scan on store_sales
-> Hash
-> Broadcast Motion 3:3 (slice4; segments: 3)
-> Seq Scan on date_dim
Filter: ((d_year = 2001) OR (d_year = 2002))
-> Hash
-> Seq Scan on customer
-> Hash Join
Hash Cond: (store_sales.ss_customer_sk = customer.c_customer_sk)
-> Redistribute Motion 3:3 (slice3; segments: 3)
Hash Key: store_sales.ss_customer_sk
-> Hash Join
Hash Cond: (store_sales.ss_sold_date_sk = date_dim.d_date_sk)
-> Seq Scan on store_sales
-> Hash
-> Broadcast Motion 3:3 (slice4; segments: 3)
-> Seq Scan on date_dim
Filter: ((d_year = 2001) OR (d_year = 2002))
-> Hash
-> Seq Scan on customer
-> Result
Filter: (((('c'::text) = 's'::text) AND (date_dim_1.d_year = 2001) AND ((sum(((((catalog_sales.cs_ext_list_price - catalog_sales.cs_ext_wholesale_cost) - catalog_sales.cs_ext_discount_amt) + catalog_sales.cs_ext_sales_price) / '2'::numeric))) > '0'::numeric)) OR ((('c'::text) = 's'::text) AND (date_dim_1.d_year = 2002)) OR ((('c'::text) = 'c'::text) AND (date_dim_1.d_year = 2001) AND ((sum(((((catalog_sales.cs_ext_list_price - catalog_sales.cs_ext_wholesale_cost) - catalog_sales.cs_ext_discount_amt) + catalog_sales.cs_ext_sales_price) / '2'::numeric))) > '0'::numeric)) OR ((('c'::text) = 'c'::text) AND (date_dim_1.d_year = 2002)) OR ((('c'::text) = 'w'::text) AND (date_dim_1.d_year = 2001) AND ((sum(((((catalog_sales.cs_ext_list_price - catalog_sales.cs_ext_wholesale_cost) - catalog_sales.cs_ext_discount_amt) + catalog_sales.cs_ext_sales_price) / '2'::numeric))) > '0'::numeric)) OR ((('c'::text) = 'w'::text) AND (date_dim_1.d_year = 2002)))
-> Finalize HashAggregate
-> HashAggregate
Group Key: customer_1.c_customer_id, customer_1.c_first_name, customer_1.c_last_name, customer_1.c_preferred_cust_flag, customer_1.c_birth_country, customer_1.c_login, customer_1.c_email_address, date_dim_1.d_year
-> Redistribute Motion 3:3 (slice5; segments: 3)
Hash Key: customer_1.c_customer_id, customer_1.c_first_name, customer_1.c_last_name, customer_1.c_preferred_cust_flag, customer_1.c_birth_country, customer_1.c_login, customer_1.c_email_address, date_dim_1.d_year
-> Streaming Partial HashAggregate
Group Key: customer_1.c_customer_id, customer_1.c_first_name, customer_1.c_last_name, customer_1.c_preferred_cust_flag, customer_1.c_birth_country, customer_1.c_login, customer_1.c_email_address, date_dim_1.d_year
-> Hash Join
Hash Cond: (catalog_sales.cs_bill_customer_sk = customer_1.c_customer_sk)
-> Redistribute Motion 3:3 (slice6; segments: 3)
Hash Key: catalog_sales.cs_bill_customer_sk
-> Hash Join
Hash Cond: (catalog_sales.cs_sold_date_sk = date_dim_1.d_date_sk)
-> Seq Scan on catalog_sales
-> Hash
-> Broadcast Motion 3:3 (slice7; segments: 3)
-> Seq Scan on date_dim date_dim_1
Filter: ((d_year = 2001) OR (d_year = 2002))
-> Hash
-> Seq Scan on customer customer_1
-> Hash Join
Hash Cond: (catalog_sales.cs_bill_customer_sk = customer_1.c_customer_sk)
-> Redistribute Motion 3:3 (slice6; segments: 3)
Hash Key: catalog_sales.cs_bill_customer_sk
-> Hash Join
Hash Cond: (catalog_sales.cs_sold_date_sk = date_dim_1.d_date_sk)
-> Seq Scan on catalog_sales
-> Hash
-> Broadcast Motion 3:3 (slice7; segments: 3)
-> Seq Scan on date_dim date_dim_1
Filter: ((d_year = 2001) OR (d_year = 2002))
-> Hash
-> Seq Scan on customer customer_1
-> Result
Filter: (((('w'::text) = 's'::text) AND (date_dim_2.d_year = 2001) AND ((sum(((((web_sales.ws_ext_list_price - web_sales.ws_ext_wholesale_cost) - web_sales.ws_ext_discount_amt) + web_sales.ws_ext_sales_price) / '2'::numeric))) > '0'::numeric)) OR ((('w'::text) = 's'::text) AND (date_dim_2.d_year = 2002)) OR ((('w'::text) = 'c'::text) AND (date_dim_2.d_year = 2001) AND ((sum(((((web_sales.ws_ext_list_price - web_sales.ws_ext_wholesale_cost) - web_sales.ws_ext_discount_amt) + web_sales.ws_ext_sales_price) / '2'::numeric))) > '0'::numeric)) OR ((('w'::text) = 'c'::text) AND (date_dim_2.d_year = 2002)) OR ((('w'::text) = 'w'::text) AND (date_dim_2.d_year = 2001) AND ((sum(((((web_sales.ws_ext_list_price - web_sales.ws_ext_wholesale_cost) - web_sales.ws_ext_discount_amt) + web_sales.ws_ext_sales_price) / '2'::numeric))) > '0'::numeric)) OR ((('w'::text) = 'w'::text) AND (date_dim_2.d_year = 2002)))
-> Finalize HashAggregate
-> HashAggregate
Group Key: customer_2.c_customer_id, customer_2.c_first_name, customer_2.c_last_name, customer_2.c_preferred_cust_flag, customer_2.c_birth_country, customer_2.c_login, customer_2.c_email_address, date_dim_2.d_year
-> Redistribute Motion 3:3 (slice8; segments: 3)
Hash Key: customer_2.c_customer_id, customer_2.c_first_name, customer_2.c_last_name, customer_2.c_preferred_cust_flag, customer_2.c_birth_country, customer_2.c_login, customer_2.c_email_address, date_dim_2.d_year
-> Streaming Partial HashAggregate
Group Key: customer_2.c_customer_id, customer_2.c_first_name, customer_2.c_last_name, customer_2.c_preferred_cust_flag, customer_2.c_birth_country, customer_2.c_login, customer_2.c_email_address, date_dim_2.d_year
-> Hash Join
Hash Cond: (web_sales.ws_bill_customer_sk = customer_2.c_customer_sk)
-> Redistribute Motion 3:3 (slice9; segments: 3)
Hash Key: web_sales.ws_bill_customer_sk
-> Hash Join
Hash Cond: (web_sales.ws_sold_date_sk = date_dim_2.d_date_sk)
-> Seq Scan on web_sales
-> Hash
-> Broadcast Motion 3:3 (slice10; segments: 3)
-> Seq Scan on date_dim date_dim_2
Filter: ((d_year = 2001) OR (d_year = 2002))
-> Hash
-> Seq Scan on customer customer_2
-> Hash Join
Hash Cond: (web_sales.ws_bill_customer_sk = customer_2.c_customer_sk)
-> Redistribute Motion 3:3 (slice9; segments: 3)
Hash Key: web_sales.ws_bill_customer_sk
-> Hash Join
Hash Cond: (web_sales.ws_sold_date_sk = date_dim_2.d_date_sk)
-> Seq Scan on web_sales
-> Hash
-> Broadcast Motion 3:3 (slice10; segments: 3)
-> Seq Scan on date_dim date_dim_2
Filter: ((d_year = 2001) OR (d_year = 2002))
-> Hash
-> Seq Scan on customer customer_2
-> Redistribute Motion 1:3 (slice11)
-> Limit
-> Gather Motion 3:1 (slice12; segments: 3)
Expand Down Expand Up @@ -5553,5 +5547,5 @@ INSERT INTO pg_statistic VALUES (
Filter: ((share0_ref2.sale_type = 's'::text) AND (share0_ref2.d_year = 2001) AND (share0_ref2.sum > '0'::numeric))
-> Shared Scan (share slice:id 18:0)
Optimizer: GPORCA
(125 rows)
(119 rows)

Loading