
feat: concurrent commit changes to PartitionWriter #2997

Open · wants to merge 6 commits into main
Conversation


@Dan-J-D Dan-J-D commented Nov 17, 2024

Description

Added drain_files_written() to PartitionWriter to drain the not-yet-committed changes.

Related Issue(s)

None

Documentation

None

@github-actions github-actions bot added the binding/rust Issues for the Rust crate label Nov 17, 2024

ACTION NEEDED

delta-rs follows the Conventional Commits specification for release automation.

The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.

@Dan-J-D Dan-J-D changed the title add: concurrent commit changes to PartitionWriter feat: concurrent commit changes to PartitionWriter Nov 17, 2024

codecov bot commented Nov 17, 2024

Codecov Report

Attention: Patch coverage is 57.14286% with 9 lines in your changes missing coverage. Please review.

Project coverage is 72.36%. Comparing base (d4f18b3) to head (07b0bb7).

Files with missing lines Patch % Lines
crates/core/src/operations/writer.rs 57.14% 4 Missing and 5 partials ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2997   +/-   ##
=======================================
  Coverage   72.35%   72.36%           
=======================================
  Files         128      128           
  Lines       40467    40464    -3     
  Branches    40467    40464    -3     
=======================================
- Hits        29281    29280    -1     
  Misses       9308     9308           
+ Partials     1878     1876    -2     


@hntd187
Collaborator

hntd187 commented Nov 17, 2024

What is the motivation behind this change? Can you give more details behind what you are trying to accomplish here?

@Dan-J-D
Author

Dan-J-D commented Nov 17, 2024

> What is the motivation behind this change? Can you give more details behind what you are trying to accomplish here?

I am building an ingest service from a third-party Kafka-like system, and PartitionWriter does a lot of the heavy lifting I need. I think it could also make a lot of other people's implementations of this kind of pipeline more practical.

Essentially, this just extends the default behavior to support a multi-ingest system that concurrently updates a table, instead of needing to reopen a new writer every time you want to commit newly written data.

It doesn't break previous versions, and it adds a new capability for future development.

@rtyler rtyler self-assigned this Nov 17, 2024
@hntd187
Collaborator

hntd187 commented Nov 18, 2024

You have to open a new one every time because you shouldn't be committing partitions concurrently, even with a locking service or mechanism. I do not think this is correct in that case, because if you are sharing a partition writer between commits then you could end up putting add actions in the wrong commits. They are meant to be ordered and conflict-free, which I don't know if this change would accomplish.

@Dan-J-D
Author

Dan-J-D commented Nov 18, 2024

> You have to open a new one every time because you shouldn't be committing partitions concurrently, even with a locking service or mechanism. I do not think this is correct in that case, because if you are sharing a partition writer between commits then you could end up putting add actions in the wrong commits. They are meant to be ordered and conflict-free, which I don't know if this change would accomplish.

When I say concurrently, I mean across multiple systems; I don't need the locking mechanism myself. If I remove the locking mechanism, will that resolve your concern so the PR can be merged?

@hntd187
Collaborator

hntd187 commented Nov 18, 2024

There wouldn't be anything left in your PR if you removed the locking mechanism here, right?

@Dan-J-D
Author

Dan-J-D commented Nov 18, 2024

> There wouldn't be anything left in your PR if you removed the locking mechanism here, right?

This would still be left:

```rust
pub async fn drain_files_written(&self) -> Vec<Add> {
    self.files_written.write().await.drain(..).collect()
}
```
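
For contrast: close() consumes the writer and returns every accumulated action at once, while this method leaves the writer usable. A minimal usage sketch, assuming PartitionWriter's existing async write method; next_batch is a hypothetical RecordBatch:

```rust
// Hand back the Add actions accumulated so far without closing the writer.
let adds: Vec<Add> = writer.drain_files_written().await;
// The writer stays open, so ingestion can continue afterwards.
writer.write(&next_batch).await?;
```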

Dan-J-D and others added 4 commits November 20, 2024 13:12
added `drain_files_written()` in order to do concurrent updates while writing

Signed-off-by: dan-j-d <[email protected]>
Signed-off-by: dan-j-d <[email protected]>
Signed-off-by: stretchadito <[email protected]>
Signed-off-by: dan-j-d <[email protected]>
Signed-off-by: dan-j-d <[email protected]>
@Dan-J-D
Author

Dan-J-D commented Nov 20, 2024

@hntd187 I have removed the lock implementation.

Comment on lines +221 to +225

```rust
let mut actions = Vec::new();
for (_, writer) in writers {
    let writer_actions = writer.close().await?;
    actions.extend(writer_actions);
}
```
Member

@Dan-J-D I'm not sure I understand the motivation of this change. I believe @alexwilcoxson-rel or @wjones127 introduced the original code above to ensure that the process which was running the writer would be able to concurrently produce the Vec<Action>

Based on my understanding of this change, it walks back that performance improvement but I cannot figure out for what gain 🤔
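
For reference, the concurrent pattern described above might have looked roughly like this; a sketch using futures::future::try_join_all over a map of partition writers, not the actual original code:

```rust
use futures::future::try_join_all;

// Close every partition writer concurrently and flatten the resulting
// Add actions, instead of awaiting each close() in sequence.
let actions: Vec<Add> = try_join_all(writers.into_values().map(|w| w.close()))
    .await?
    .into_iter()
    .flatten()
    .collect();
```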

Author

I was on an older version of the library and just copy-and-pasted my local copy into GitHub, but newer changes had already been made upstream, so I added them back.

```diff
@@ -427,10 +419,15 @@ impl PartitionWriter {
         Ok(())
     }

+    /// Retrieves the list of [Add] actions associated with the files written.
+    pub async fn drain_files_written(&mut self) -> Vec<Add> {
```
Member

Is the goal of this function to allow data files (.parquet) to be written without the process committing to the Delta table? I have read the discourse with @hntd187 but I'm still struggling to understand how this public API would be used.

Author

Say a server has written a lot of data to the table but hasn't committed the added files yet, and there is a power outage. The files will be in storage but not in the table metadata. This method can be used to prevent that data loss: while continuing to write data, you periodically commit the newly added files.
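
A minimal sketch of that pattern, assuming the PR's drain_files_written() plus a hypothetical commit_adds() helper standing in for whatever commit path the application already uses (the loop structure and all names besides drain_files_written, write, and close are illustrative):

```rust
// Long-running ingest: keep one PartitionWriter open, but commit the
// already-flushed files every N batches so a power outage loses at most
// the last uncommitted window.
async fn ingest_loop(
    mut writer: PartitionWriter,
    batches: impl Iterator<Item = RecordBatch>,
) -> DeltaResult<()> {
    let mut pending = 0usize;
    for batch in batches {
        writer.write(&batch).await?;
        pending += 1;
        if pending >= 100 {
            let adds = writer.drain_files_written().await;
            commit_adds(adds).await?; // hypothetical commit helper
            pending = 0;
        }
    }
    // Closing still flushes and returns any remaining actions.
    commit_adds(writer.close().await?).await?;
    Ok(())
}
```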

Contributor

Would something be calling drain periodically while writes are still happening then? What would be the difference vs closing the writer periodically and creating a new one?

Author

It saves the cost of reallocation and re-initialization, and if you close on a fixed time interval you can end up creating a lot of small, unoptimized files that then need an OPTIMIZE pass.

Contributor

> It saves the cost of reallocation and re-initialization

Understood; I don't know how costly it is to create a new writer.

> if you close on a fixed time interval you can end up creating a lot of small, unoptimized files

As you write over time, parquet data is flushed to the object store at a default target file size of 100MB, I think. close() then flushes the rest and returns the actions. Is this just trying to avoid flushing any buffered small data to the object store?

Is your writer open for a long period of time, constantly receiving data?

Author

Yes, I made an alternative to kafka-delta-ingest for my specific use case, and it will be receiving data all day.
