Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use databend_base::uniq_id::GlobalUniq;
use databend_common_base::base::ProgressValues;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
Expand All @@ -31,7 +30,6 @@ use databend_common_storages_parquet::ReadSettings;
use log::debug;
use log::info;
use parking_lot::Mutex;
use parquet::file::metadata::RowGroupMetaData;

use crate::pipelines::memory_settings::MemorySettingsExt;
use crate::pipelines::processors::transforms::aggregator::NewSpilledPayload;
Expand All @@ -43,27 +41,33 @@ use crate::sessions::TableContextSpillProgress;
use crate::spillers::Layout;
use crate::spillers::SpillAdapter;
use crate::spillers::SpillTarget;
use crate::spillers::SpilledDataFile;
use crate::spillers::SpillsBufferPool;
use crate::spillers::SpillsDataWriter;

const BYTES_PER_MIB: usize = 1024 * 1024;

struct PayloadWriter {
path: String,
writer: SpillsDataWriter,
}

impl PayloadWriter {
fn try_create(prefix: &str, writer_pool_bytes: usize) -> Result<Self> {
fn try_create(
prefix: &str,
writer_pool_bytes: usize,
max_row_groups_per_file: usize,
) -> Result<Self> {
let data_operator = DataOperator::instance();
let operator = data_operator.spill_operator();
let buffer_pool = SpillsBufferPool::instance();
let file_path = format!("{}/{}", prefix, GlobalUniq::unique());
let spills_data_writer =
buffer_pool.writer(operator, file_path.clone(), writer_pool_bytes)?;
let spills_data_writer = buffer_pool.writer(
operator,
prefix.to_string(),
writer_pool_bytes,
max_row_groups_per_file,
)?;

Ok(PayloadWriter {
path: file_path,
writer: spills_data_writer,
})
}
Expand All @@ -77,9 +81,8 @@ impl PayloadWriter {
self.writer.flush()
}

fn close(self) -> Result<(String, usize, Vec<RowGroupMetaData>)> {
let (bytes_written, row_groups) = self.writer.close()?;
Ok((self.path, bytes_written, row_groups))
fn close(self) -> Result<Vec<SpilledDataFile>> {
self.writer.close()
}
}

Expand Down Expand Up @@ -114,6 +117,7 @@ struct AggregatePayloadWriters {
spill_prefix: String,
partition_count: usize,
writer_pool_bytes: usize,
max_row_groups_per_file: usize,
writers: Vec<Option<PayloadWriter>>,
write_stats: WriteStats,
ctx: Arc<QueryContext>,
Expand All @@ -124,12 +128,14 @@ impl AggregatePayloadWriters {
prefix: &str,
partition_count: usize,
writer_pool_bytes: usize,
max_row_groups_per_file: usize,
ctx: Arc<QueryContext>,
) -> Self {
AggregatePayloadWriters {
spill_prefix: prefix.to_string(),
partition_count,
writer_pool_bytes,
max_row_groups_per_file,
writers: Self::empty_writers(partition_count),
write_stats: WriteStats::default(),
ctx,
Expand All @@ -147,6 +153,7 @@ impl AggregatePayloadWriters {
self.writers[bucket] = Some(PayloadWriter::try_create(
&self.spill_prefix,
self.writer_pool_bytes,
self.max_row_groups_per_file,
)?);
}

Expand Down Expand Up @@ -187,40 +194,46 @@ impl AggregatePayloadWriters {
continue;
};

let (path, written_size, row_groups) = writer.close()?;

if written_size != 0 {
info!(
"Write aggregate spill finished: (bucket: {}, location: {}, bytes: {}, rows: {}, batch_count: {})",
partition_id,
for file in writer.close()? {
let SpilledDataFile {
path,
written_size,
row_groups.iter().map(|rg| rg.num_rows()).sum::<i64>(),
row_groups.len()
);
}
bytes_written,
row_groups,
} = file;

if bytes_written != 0 {
info!(
"Write aggregate spill finished: (bucket: {}, location: {}, bytes: {}, rows: {}, batch_count: {})",
partition_id,
path,
bytes_written,
row_groups.iter().map(|rg| rg.num_rows()).sum::<i64>(),
row_groups.len()
);
}

self.ctx.add_spill_file(
Location::Remote(path.clone()),
Layout::Aggregate,
written_size,
);
self.ctx.add_spill_file(
Location::Remote(path.clone()),
Layout::Aggregate,
bytes_written,
);

if row_groups.is_empty() {
continue;
}
if row_groups.is_empty() {
continue;
}

if written_size > 0 {
self.write_stats.add_bytes(written_size);
}
if bytes_written > 0 {
self.write_stats.add_bytes(bytes_written);
}

for row_group in row_groups {
self.write_stats.add_rows(row_group.num_rows() as usize);
spilled_payloads.push(NewSpilledPayload {
bucket: partition_id as isize,
location: path.clone(),
row_group,
});
for row_group in row_groups {
self.write_stats.add_rows(row_group.num_rows() as usize);
spilled_payloads.push(NewSpilledPayload {
bucket: partition_id as isize,
location: path.clone(),
row_group,
});
}
}
}

Expand Down Expand Up @@ -353,9 +366,17 @@ impl<P: PartitionStream> NewAggregateSpiller<P> {
.get_settings()
.get_spill_writer_memory_pool_size_mb()?
.saturating_mul(BYTES_PER_MIB);
let max_row_groups_per_file = ctx
.get_settings()
.get_max_parquet_spill_row_groups_per_file()?;

let payload_writers =
AggregatePayloadWriters::create(&spill_prefix, partition_count, writer_pool_bytes, ctx);
let payload_writers = AggregatePayloadWriters::create(
&spill_prefix,
partition_count,
writer_pool_bytes,
max_row_groups_per_file,
ctx,
);

Ok(Self {
memory_settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::sync::Arc;

use async_channel::Receiver;
use async_channel::Sender;
use databend_base::uniq_id::GlobalUniq;
use databend_common_base::base::Progress;
use databend_common_base::base::ProgressValues;
use databend_common_base::runtime::profile::Profile;
Expand Down Expand Up @@ -47,8 +46,6 @@ use crate::spillers::SpillTarget;
use crate::spillers::SpillsBufferPool;

const MATERIALIZED_CTE_SPILL_UNIT_SIZE: usize = 8 * 1024 * 1024;
/// Maximum number of row groups per file.
const MAX_ROW_GROUPS_PER_FILE: usize = 2 << 15;

#[derive(Clone)]
pub enum MaterializedCtePayload {
Expand Down Expand Up @@ -106,6 +103,7 @@ pub struct MaterializedCteSink {
senders: Vec<Sender<MaterializedCtePayload>>,
prefix: String,
writer_pool_bytes: usize,
max_row_groups_per_file: usize,
memory_settings: MemorySettings,
input_data: Option<DataBlock>,
pending_payloads: VecDeque<MaterializedCtePayload>,
Expand All @@ -126,12 +124,14 @@ impl MaterializedCteSink {
let writer_pool_bytes = settings
.get_spill_writer_memory_pool_size_mb()?
.saturating_mul(1024 * 1024);
let max_row_groups_per_file = settings.get_max_parquet_spill_row_groups_per_file()?;
Ok(ProcessorPtr::create(Box::new(Self {
ctx,
input,
senders,
prefix,
writer_pool_bytes,
max_row_groups_per_file,
memory_settings,
input_data: None,
pending_payloads: VecDeque::new(),
Expand All @@ -154,36 +154,37 @@ impl MaterializedCteSink {
let data_operator = DataOperator::instance();
let operator = data_operator.spill_operator();
let buffer_pool = SpillsBufferPool::instance();
let path = format!("{}/{}", self.prefix, GlobalUniq::unique());
let mut writer = buffer_pool.writer(operator, path.clone(), self.writer_pool_bytes)?;
let mut writer = buffer_pool.writer(
operator,
self.prefix.clone(),
self.writer_pool_bytes,
self.max_row_groups_per_file,
)?;

let schema = Arc::new(data_blocks[0].infer_schema());
let mut row_group_ranges = Vec::with_capacity(data_blocks.len());
let mut next_row_group = 0;
let mut payloads = Vec::with_capacity(data_blocks.len());
for block in data_blocks {
writer.write(block)?;
let row_group_count = writer.flush_row_groups()?;
row_group_ranges.push(next_row_group..row_group_count);
next_row_group = row_group_count;
for row_groups in writer.flush_row_groups()? {
payloads.push(MaterializedCtePayload::Spilled(
MaterializedCteSpilledPayload {
path: row_groups.path,
row_groups: Arc::new(row_groups.row_groups),
schema: schema.clone(),
},
));
}
}
let (bytes_written, row_groups) = writer.close()?;
if bytes_written > 0 {
self.ctx.add_spill_file(
Location::Remote(path.clone()),
Layout::Parquet,
bytes_written,
);
for file in writer.close()? {
if file.bytes_written > 0 {
self.ctx.add_spill_file(
Location::Remote(file.path),
Layout::Parquet,
file.bytes_written,
);
}
}
Ok(row_group_ranges
.into_iter()
.map(|range| {
MaterializedCtePayload::Spilled(MaterializedCteSpilledPayload {
path: path.clone(),
row_groups: Arc::new(row_groups[range].to_vec()),
schema: schema.clone(),
})
})
.collect())
Ok(payloads)
}

fn consume_block(&mut self, data_block: DataBlock) -> Result<()> {
Expand All @@ -192,9 +193,7 @@ impl MaterializedCteSink {
self.spilling_bytes += data_block.memory_size();
self.spilling_blocks.push(data_block);

if self.spilling_bytes >= self.spill_unit_size()
|| self.spilling_blocks.len() >= MAX_ROW_GROUPS_PER_FILE
{
if self.spilling_bytes >= self.spill_unit_size() {
self.flush_spilling_blocks()?;
}
} else {
Expand Down
Loading
Loading