Skip to content

Commit

Permalink
Simplifications (#12639)
Browse files (browse the repository at this point in the history)
  • Loading branch information
akurmustafa authored Sep 27, 2024
1 parent 79d40c4 commit 84e9ce8
Showing 1 changed file with 10 additions and 13 deletions.
23 changes (10 additions & 13 deletions): datafusion/physical-plan/src/sorts/sort.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -40,12 +40,13 @@ use crate::{
SendableRecordBatchStream, Statistics,
};

use arrow::compute::{concat_batches, lexsort_to_indices, take, SortColumn};
use arrow::compute::{concat_batches, lexsort_to_indices, SortColumn};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, SortField};
use arrow_array::{Array, RecordBatchOptions, UInt32Array};
use arrow_schema::DataType;
use datafusion_common::utils::get_arrayref_at_indices;
use datafusion_common::{internal_err, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
Expand Down Expand Up @@ -350,12 +351,8 @@ impl ExternalSorter {
self.fetch,
self.reservation.new_empty(),
)
} else if !self.in_mem_batches.is_empty() {
self.in_mem_sort_stream(self.metrics.baseline.clone())
} else {
Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
&self.schema,
))))
self.in_mem_sort_stream(self.metrics.baseline.clone())
}
}

Expand Down Expand Up @@ -500,15 +497,19 @@ impl ExternalSorter {
&mut self,
metrics: BaselineMetrics,
) -> Result<SendableRecordBatchStream> {
assert_ne!(self.in_mem_batches.len(), 0);
if self.in_mem_batches.is_empty() {
return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
&self.schema,
))));
}

// The elapsed compute timer is updated when the value is dropped.
// There is no need for an explicit call to drop.
let elapsed_compute = metrics.elapsed_compute().clone();
let _timer = elapsed_compute.timer();

if self.in_mem_batches.len() == 1 {
let batch = self.in_mem_batches.remove(0);
let batch = self.in_mem_batches.swap_remove(0);
let reservation = self.reservation.take();
return self.sort_batch_stream(batch, metrics, reservation);
}
Expand Down Expand Up @@ -616,11 +617,7 @@ pub fn sort_batch(
lexsort_to_indices(&sort_columns, fetch)?
};

let columns = batch
.columns()
.iter()
.map(|c| take(c.as_ref(), &indices, None))
.collect::<Result<_, _>>()?;
let columns = get_arrayref_at_indices(batch.columns(), &indices)?;

let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
Ok(RecordBatch::try_new_with_options(
Expand Down

0 comments on commit 84e9ce8

Please sign in to comment.