Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: concurrent commit changes to PartitionWriter #2997

Closed
wants to merge 6 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 24 additions & 27 deletions crates/core/src/operations/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
use bytes::Bytes;
use delta_kernel::expressions::Scalar;
use futures::{StreamExt, TryStreamExt};
use indexmap::IndexMap;
use object_store::{path::Path, ObjectStore};
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use tokio::sync::RwLock;
use tracing::debug;

use crate::crate_version;
Expand Down Expand Up @@ -218,18 +218,11 @@ impl DeltaWriter {
/// This will flush all remaining data.
pub async fn close(mut self) -> DeltaResult<Vec<Add>> {
let writers = std::mem::take(&mut self.partition_writers);
let actions = futures::stream::iter(writers)
.map(|(_, writer)| async move {
let writer_actions = writer.close().await?;
Ok::<_, DeltaTableError>(writer_actions)
})
.buffered(num_cpus::get())
.try_fold(Vec::new(), |mut acc, actions| {
acc.extend(actions);
futures::future::ready(Ok(acc))
})
.await?;

let mut actions = Vec::new();
for (_, writer) in writers {
let writer_actions = writer.close().await?;
actions.extend(writer_actions);
}
Comment on lines +221 to +225
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Dan-J-D I'm not sure I understand the motivation of this change. I believe @alexwilcoxson-rel or @wjones127 introduced the original code above to ensure that the process which was running the writer would be able to concurrently produce the Vec<Action>

Based on my understanding of this change, it walks back that performance improvement but I cannot figure out for what gain 🤔

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was on a older version of the library and I just copy & pasted my local copy into github, but there were newer changes already made, so I added them back

Ok(actions)
}
}
Expand Down Expand Up @@ -378,19 +371,18 @@ impl PartitionWriter {
// write file to object store
self.object_store.put(&path, buffer.into()).await?;

self.files_written.push(
create_add(
&self.config.partition_values,
path.to_string(),
file_size,
&metadata,
self.num_indexed_cols,
&self.stats_columns,
)
.map_err(|err| WriteError::CreateAdd {
source: Box::new(err),
})?,
);
let add_metadata = create_add(
&self.config.partition_values,
path.to_string(),
file_size,
&metadata,
self.num_indexed_cols,
&self.stats_columns,
)
.map_err(|err| WriteError::CreateAdd {
source: Box::new(err),
})?;
self.files_written.push(add_metadata);

Ok(())
}
Expand Down Expand Up @@ -427,10 +419,15 @@ impl PartitionWriter {
Ok(())
}

/// Retrieves the list of [Add] actions associated with the files written.
pub async fn drain_files_written(&mut self) -> Vec<Add> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the goal for this function to allow for data files (.parquet) to be written without the process committing to the Delta table? I have read the discourse with @hntd187 but I'm still struggling to understand how this public API would be used

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So let's say in an event where a server has written a lot of data into the table but hasn't committed the additional files yet and there was a power outage. The files will be in storage but not in the metadata. So this can be used to prevent data loss (While writing data, also creating commits of the newly added files).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would something be calling drain periodically while writes are still happening then? What would be the difference vs closing the writer periodically and creating a new one?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Saved cost of reallocation, re-initialization and if you are closing after a certain time frame it is possible to create just a lot of small files which aren't optimized. Then will need to OPTIMIZE.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Saved cost of reallocation, re-initialization
understood, I don't know how costly it is to create a new writer

if you are closing after a certain time frame it is possible to create just a lot of small files which aren't optimized
as you write and write over time parquet is flushed to object store to a default size of 100MB files I think. Then close flushes the rest and returns the actions. Is this just trying to avoid flushing any buffered small data to object store then?

Is your writer open for a long period of time constantly receiving data?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I made an alternative to delta-kafka-ingest for my specific use case and it will be receiving data all day.

self.files_written.drain(..).collect()
}

/// Close the writer and get the new [Add] actions.
pub async fn close(mut self) -> DeltaResult<Vec<Add>> {
self.flush_arrow_writer().await?;
Ok(self.files_written)
Ok(self.files_written.clone())
Dan-J-D marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Loading