Skip to content

Commit

Permalink
perf: close partition writers concurrently to improve writes with many partitions
Browse files Browse the repository at this point in the history

(cherry picked from commit af17bb2)
Signed-off-by: Alex Wilcoxson <[email protected]>

chore: fmt
  • Loading branch information
alexwilcoxson-rel authored and rtyler committed Nov 12, 2024
1 parent 7a3b3ec commit 9b733e3
Showing 1 changed file with 13 additions and 5 deletions.
18 changes: 13 additions & 5 deletions crates/core/src/operations/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
use bytes::Bytes;
use delta_kernel::expressions::Scalar;
use futures::{StreamExt, TryStreamExt};
use indexmap::IndexMap;
use object_store::{path::Path, ObjectStore};
use parquet::arrow::ArrowWriter;
Expand Down Expand Up @@ -217,11 +218,18 @@ impl DeltaWriter {
/// This will flush all remaining data.
///
/// Consumes the writer, closes every partition writer it holds, and returns
/// the `Add` actions produced by those writers combined into a single `Vec`.
///
/// # Errors
///
/// Propagates the error from a partition writer's `close()` if one fails.
pub async fn close(mut self) -> DeltaResult<Vec<Add>> {
// Move the partition writers out of `self`, leaving the field's `Default`
// value behind, so each writer can be consumed by its own `close()`.
let writers = std::mem::take(&mut self.partition_writers);
// NOTE(review): this listing is a flattened diff (the page header reports
// 13 additions and 5 deletions). The next five lines look like the REMOVED
// pre-commit implementation: each writer is closed and awaited one after
// another, so the closes never overlap. Confirm against the real file.
let mut actions = Vec::new();
for (_, writer) in writers {
let writer_actions = writer.close().await?;
actions.extend(writer_actions);
}
// NOTE(review): the lines from here to the final `.await?` look like the
// ADDED post-commit implementation that replaces the loop above.
// Each writer is turned into a future that closes it; the explicit
// `Ok::<_, DeltaTableError>` pins the error type of the async block.
let actions = futures::stream::iter(writers)
.map(|(_, writer)| async move {
let writer_actions = writer.close().await?;
Ok::<_, DeltaTableError>(writer_actions)
})
// Drive up to `num_cpus::get()` close futures at once; `buffered` yields
// their results in the same order as the input stream.
.buffered(num_cpus::get())
// Concatenate each writer's actions into one Vec, stopping at the first
// `Err` the stream yields.
.try_fold(Vec::new(), |mut acc, actions| {
acc.extend(actions);
futures::future::ready(Ok(acc))
})
.await?;

Ok(actions)
}
}
Expand Down

0 comments on commit 9b733e3

Please sign in to comment.