splitting spillable buffer and sort
korowa committed Mar 27, 2023
1 parent 11ac836 commit da23c9c
Showing 2 changed files with 171 additions and 44 deletions.
15 changes: 3 additions & 12 deletions datafusion/core/src/physical_plan/common.rs
@@ -57,7 +57,7 @@ impl SizedRecordBatchStream {
batches: Vec<Arc<RecordBatch>>,
mut metrics: MemTrackingMetrics,
) -> Self {
let size = batches.iter().map(|b| batch_byte_size(b)).sum::<usize>();
let size = batches.iter().map(|b| b.get_array_memory_size()).sum::<usize>();
metrics.init_mem_used(size);
SizedRecordBatchStream {
schema,
@@ -191,7 +191,7 @@ pub fn compute_record_batch_statistics(
) -> Statistics {
let nb_rows = batches.iter().flatten().map(RecordBatch::num_rows).sum();

let total_byte_size = batches.iter().flatten().map(batch_byte_size).sum();
let total_byte_size = batches.iter().flatten().map(|b| b.get_array_memory_size()).sum();

let projection = match projection {
Some(p) => p,
@@ -621,7 +621,7 @@ impl IPCWriter {
self.writer.write(batch)?;
self.num_batches += 1;
self.num_rows += batch.num_rows() as u64;
let num_bytes: usize = batch_byte_size(batch);
let num_bytes: usize = batch.get_array_memory_size();
self.num_bytes += num_bytes as u64;
Ok(())
}
@@ -636,12 +636,3 @@ impl IPCWriter {
&self.path
}
}

/// Returns the total number of bytes of memory occupied physically by this batch.
pub fn batch_byte_size(batch: &RecordBatch) -> usize {
batch
.columns()
.iter()
.map(|array| array.get_array_memory_size())
.sum()
}
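The removed batch_byte_size helper summed get_array_memory_size() over a batch's columns; the call sites above now use arrow's RecordBatch::get_array_memory_size() directly. A minimal standalone sketch of the two computations, assuming the arrow crate as a dependency (the schema and values are purely illustrative):

use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )
    .expect("valid batch");

    // What the removed helper computed: per-column memory, summed.
    let per_column: usize = batch
        .columns()
        .iter()
        .map(|array| array.get_array_memory_size())
        .sum();

    // What the call sites use after this commit.
    let direct = batch.get_array_memory_size();

    println!("per-column sum: {per_column}, direct: {direct}");
}
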
200 changes: 168 additions & 32 deletions datafusion/core/src/physical_plan/sorts/sort.rs
@@ -25,7 +25,7 @@ use crate::execution::memory_pool::{
human_readable_size, MemoryConsumer, MemoryReservation,
};
use crate::execution::runtime_env::RuntimeEnv;
use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream};
use crate::physical_plan::common::{IPCWriter, SizedRecordBatchStream};
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{
BaselineMetrics, CompositeMetricsSet, MemTrackingMetrics, MetricsSet,
@@ -61,6 +61,116 @@ use tempfile::NamedTempFile;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task;

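/// Reports the in-memory size of an item held by the spillable buffer,
/// used to grow the buffer's memory reservation.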
pub trait SpillableBatchBufferItem {
fn get_byte_size(&self) -> usize;
}

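/// Accumulates items in memory against a `MemoryReservation`, falling back to
/// spill files obtained from the runtime's disk manager when the reservation
/// cannot grow to fit the next item.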
struct SpillableBatchBuffer<T>
where
T: SpillableBatchBufferItem
{
schema: SchemaRef,
in_mem_batches: Vec<T>,
spills: Vec<NamedTempFile>,
reservation: MemoryReservation,
runtime: Arc<RuntimeEnv>,
// stream_converter: fn(&mut SpillableBatchBuffer) -> Result<SendableRecordBatchStream>,
}

impl<T> SpillableBatchBuffer<T>
where
T: SpillableBatchBufferItem
{
fn new(
schema: SchemaRef,
reservation: MemoryReservation,
runtime: Arc<RuntimeEnv>,
// stream_converter: fn(&mut SpillableBatchBuffer) -> Result<SendableRecordBatchStream>,
) -> Self {
Self {
schema,
in_mem_batches: Default::default(),
spills: Default::default(),
reservation,
runtime,
// stream_converter,
}
}

async fn insert_batch(&mut self, batch: T) -> Result<()> {
let size = batch.get_byte_size();
if self.reservation.try_grow(size).is_err() {
self.spill().await?;
self.reservation.try_grow(size)?
}
self.in_mem_batches.push(batch);

Ok(())
}

async fn spill(&mut self) -> Result<()> {
let spillfile = self.runtime.disk_manager.create_tmp_file("BatchBuffer")?;
// let mut stream = (self.stream_converter)(self)?;
// let write_schema = self.schema.clone();

// let (sender, receiver) = tokio::sync::mpsc::channel(2);
// let path: PathBuf = spillfile.path().into();
// let handle = task::spawn_blocking(move || SpillableBatchBuffer::write_ipc(receiver, path, write_schema));
// while let Some(item) = stream.next().await {
// sender.send(item).await.ok();
// }
// drop(sender);
// match handle.await {
// Ok(r) => r,
// Err(e) => Err(DataFusionError::Execution(format!(
// "Error occurred while spilling {e}"
// ))),
// };
self.spills.push(spillfile);

Ok(())
}

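/// Receives batches from `receiver` and writes them to an IPC file at `path`,
/// logging the spilled totals once the writer is finished.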
fn write_ipc(
mut receiver: Receiver<Result<RecordBatch>>,
path: PathBuf,
schema: SchemaRef,
) -> Result<()> {
let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
while let Some(batch) = receiver.blocking_recv() {
writer.write(&batch?)?;
}
writer.finish()?;
debug!(
"Spilled {} batches of total {} rows to disk, memory released {}",
writer.num_batches,
writer.num_rows,
human_readable_size(writer.num_bytes as usize),
);
Ok(())
}

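/// Streams previously spilled batches back from `path`: the IPC file is read
/// on a blocking task and batches are forwarded through a bounded channel.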
fn read_ipc(
path: NamedTempFile,
schema: SchemaRef,
) -> Result<SendableRecordBatchStream> {
let (sender, receiver): (
Sender<Result<RecordBatch>>,
Receiver<Result<RecordBatch>>,
) = tokio::sync::mpsc::channel(2);
let join_handle = task::spawn_blocking(move || {
if let Err(e) = read_spill(sender, path.path()) {
error!("Failure while reading spill file: {:?}. Error: {}", path, e);
}
});
Ok(RecordBatchReceiverStream::create(
&schema,
receiver,
join_handle,
))
}
}

/// Sorts an arbitrary amount of data to produce a total order (it may spill to disk several times during sorting, depending on the free memory available).
///
/// The basic architecture of the algorithm:
@@ -83,6 +193,7 @@ struct ExternalSorter {
fetch: Option<usize>,
reservation: MemoryReservation,
partition_id: usize,
batch_buffer: SpillableBatchBuffer<BatchWithSortArray>,
}

impl ExternalSorter {
@@ -101,6 +212,16 @@ impl ExternalSorter {
.with_can_spill(true)
.register(&runtime.memory_pool);

let buffer_reservation = MemoryConsumer::new(format!("ExternalSorter.batch_buffer[{partition_id}]"))
.with_can_spill(true)
.register(&runtime.memory_pool);

let batch_buffer = SpillableBatchBuffer::new(
schema.clone(),
buffer_reservation,
runtime.clone(),
);

Self {
schema,
in_mem_batches: vec![],
@@ -113,6 +234,7 @@ impl ExternalSorter {
fetch,
reservation,
partition_id,
batch_buffer,
}
}

@@ -122,40 +244,42 @@ impl ExternalSorter {
tracking_metrics: &MemTrackingMetrics,
) -> Result<()> {
if input.num_rows() > 0 {
let size = batch_byte_size(&input);
if self.reservation.try_grow(size).is_err() {
self.spill().await?;
self.reservation.try_grow(size)?
}

self.metrics.mem_used().add(size);
// can we assume that the memory for the current batch is negligible?
// let size = input.get_array_memory_size();
// if self.reservation.try_grow(size).is_err() {
// self.spill().await?;
// self.reservation.try_grow(size)?
// }


// NB timer records time taken on drop, so there are no
// calls to `timer.done()` below.
let _timer = tracking_metrics.elapsed_compute().timer();
let partial = sort_batch(input, self.schema.clone(), &self.expr, self.fetch)?;

// The resulting batch might be smaller (or larger, see #3747) than the input
// batch due to either a propagated limit or the re-construction of arrays. So
// for being reliable, we need to reflect the memory usage of the partial batch.
let new_size = batch_byte_size(&partial.sorted_batch);
match new_size.cmp(&size) {
Ordering::Greater => {
// We don't have to call try_grow here, since we have already used the
// memory (so spilling right here wouldn't help at all for the current
// operation). But we still have to record it so that other requesters
// would know about this unexpected increase in memory consumption.
let new_size_delta = new_size - size;
self.reservation.grow(new_size_delta);
self.metrics.mem_used().add(new_size_delta);
}
Ordering::Less => {
let size_delta = size - new_size;
self.reservation.shrink(size_delta);
self.metrics.mem_used().sub(size_delta);
}
Ordering::Equal => {}
}
self.in_mem_batches.push(partial);
self.metrics.mem_used().add(partial.sorted_batch.get_array_memory_size());
self.batch_buffer.insert_batch(partial).await?;

// // The resulting batch might be smaller (or larger, see #3747) than the input
// // batch due to either a propagated limit or the re-construction of arrays. So
// // for being reliable, we need to reflect the memory usage of the partial batch.
// let new_size = batch_byte_size(&partial.sorted_batch);
// match new_size.cmp(&size) {
// Ordering::Greater => {
// // We don't have to call try_grow here, since we have already used the
// // memory (so spilling right here wouldn't help at all for the current
// // operation). But we still have to record it so that other requesters
// // would know about this unexpected increase in memory consumption.
// let new_size_delta = new_size - size;
// self.reservation.grow(new_size_delta);
// self.metrics.mem_used().add(new_size_delta);
// }
// Ordering::Less => {
// let size_delta = size - new_size;
// self.reservation.shrink(size_delta);
// self.metrics.mem_used().sub(size_delta);
// }
// Ordering::Equal => {}
// }
}
Ok(())
}
@@ -487,7 +611,7 @@ impl SortedSizedRecordBatchStream {
sorted_iter: SortedIterator,
mut metrics: MemTrackingMetrics,
) -> Self {
let size = batches.iter().map(batch_byte_size).sum::<usize>()
let size = batches.iter().map(|b| b.get_array_memory_size()).sum::<usize>()
+ sorted_iter.memory_size();
metrics.init_mem_used(size);
let num_cols = batches[0].num_columns();
Expand Down Expand Up @@ -813,6 +937,18 @@ struct BatchWithSortArray {
sorted_batch: RecordBatch,
}

impl SpillableBatchBufferItem for BatchWithSortArray {
fn get_byte_size(&self) -> usize {
self.sorted_batch.get_array_memory_size()
}
}

impl SpillableBatchBufferItem for RecordBatch {
fn get_byte_size(&self) -> usize {
self.get_array_memory_size()
}
}

fn sort_batch(
batch: RecordBatch,
schema: SchemaRef,

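Not part of the commit: a simplified, self-contained sketch of the buffering pattern that SpillableBatchBuffer implements — items that report their own size accumulate in memory, and the buffer spills once the next insert would exceed its budget. SpillingBuffer, the fixed byte budget, and the Vec<u8> items below are illustrative stand-ins rather than DataFusion APIs:

// Mirrors `SpillableBatchBufferItem`: buffered items report their in-memory size.
trait ByteSized {
    fn byte_size(&self) -> usize;
}

impl ByteSized for Vec<u8> {
    fn byte_size(&self) -> usize {
        self.len()
    }
}

// Simplified stand-in for `SpillableBatchBuffer`: items accumulate in memory
// until a fixed budget would be exceeded, at which point the buffer is
// "spilled" (here simply drained and counted) and accumulation restarts.
struct SpillingBuffer<T: ByteSized> {
    in_mem: Vec<T>,
    used: usize,
    budget: usize,
    spills: usize,
}

impl<T: ByteSized> SpillingBuffer<T> {
    fn new(budget: usize) -> Self {
        Self { in_mem: Vec::new(), used: 0, budget, spills: 0 }
    }

    fn insert(&mut self, item: T) {
        let size = item.byte_size();
        // Same shape as `insert_batch`: reserve, spill on failure, then retry.
        if self.used + size > self.budget {
            self.spill();
        }
        self.used += size;
        self.in_mem.push(item);
    }

    fn spill(&mut self) {
        // The real buffer is meant to write batches to an IPC file from the
        // disk manager (still stubbed out in this commit) and keep the
        // temp-file handle; here the buffered items are simply dropped.
        self.spills += 1;
        self.in_mem.clear();
        self.used = 0;
    }
}

fn main() {
    let mut buffer = SpillingBuffer::new(1024);
    for i in 0..10 {
        buffer.insert(vec![0u8; 300]); // each "batch" occupies 300 bytes
        println!("after insert {i}: {} bytes buffered", buffer.used);
    }
    println!("spilled {} times, {} items still buffered", buffer.spills, buffer.in_mem.len());
}

The split keeps memory accounting for buffered batches separate from the sort itself, so the same spill-on-pressure logic could back any operator that needs to hold batches; the SpillableBatchBufferItem impl for plain RecordBatch above points in that direction.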