Skip to content

Commit

Permalink
Added documentation for SortMergeJoin (#13469)
Browse files Browse the repository at this point in the history
* Added documentation for SortMergeJoin

* Update datafusion/physical-plan/src/joins/sort_merge_join.rs

* Fix documentation

---------

Co-authored-by: Oleks V <[email protected]>
  • Loading branch information
athultr1997 and comphead authored Nov 25, 2024
1 parent 1e67364 commit d63f1ac
Showing 1 changed file with 53 additions and 4 deletions.
57 changes: 53 additions & 4 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,54 @@ use crate::{
RecordBatchStream, SendableRecordBatchStream, Statistics,
};

/// join execution plan executes partitions in parallel and combines them into a set of
/// partitions.
/// Join execution plan that executes equi-join predicates on multiple partitions using Sort-Merge
/// join algorithm and applies an optional filter post join. Can be used to join arbitrarily large
/// inputs where one or both of the inputs don't fit in the available memory.
///
/// # Join Expressions
///
/// Equi-join predicate (e.g. `<col1> = <col2>`) expressions are represented by [`Self::on`].
///
/// Non-equality predicates, which can not be pushed down to join inputs (e.g.
/// `<col1> != <col2>`) are known as "filter expressions" and are evaluated
/// after the equijoin predicates. They are represented by [`Self::filter`]. These are optional
/// expressions.
///
/// # Sorting
///
/// Assumes that both the left and right input to the join are pre-sorted. It is not the
/// responisibility of this execution plan to sort the inputs.
///
/// # "Streamed" vs "Buffered"
///
/// The number of record batches of streamed input currently present in the memory will depend
/// on the output batch size of the execution plan. There is no spilling support for streamed input.
/// The comparisons are performed from values of join keys in streamed input with the values of
/// join keys in buffered input. One row in streamed record batch could be matched with multiple rows in
/// buffered input batches. The streamed input is managed through the states in `StreamedState`
/// and streamed input batches are represented by `StreamedBatch`.
///
/// Buffered input is buffered for all record batches having the same value of join key.
/// If the memory limit increases beyond the specified value and spilling is enabled,
/// buffered batches could be spilled to disk. If spilling is disabled, the execution
/// will fail under the same conditions. Multiple record batches of buffered could currently reside
/// in memory/disk during the exectution. The number of buffered batches residing in
/// memory/disk depends on the number of rows of buffered input having the same value
/// of join key as that of streamed input rows currently present in memory. Due to pre-sorted inputs,
/// the algorithm understands when it is not needed anymore, and releases the buffered batches
/// from memory/disk. The buffered input is managed through the states in `BufferedState`
/// and buffered input batches are represented by `BufferedBatch`.
///
/// Depending on the type of join, left or right input may be selected as streamed or buffered
/// respectively. For example, in a left-outer join, the left execution plan will be selected as
/// streamed input while in a right-outer join, the right execution plan will be selected as the
/// streamed input.
///
/// Reference for the algorithm:
/// <https://en.wikipedia.org/wiki/Sort-merge_join>.
///
/// Helpful short video demonstration:
/// <https://www.youtube.com/watch?v=jiWCPJtDE2c>.
#[derive(Debug, Clone)]
pub struct SortMergeJoinExec {
/// Left sorted joining execution plan
Expand Down Expand Up @@ -529,6 +575,9 @@ struct StreamedJoinedChunk {
buffered_indices: UInt64Builder,
}

/// Represents a record batch from streamed input.
///
/// Also stores information of matching rows from buffered batches.
struct StreamedBatch {
/// The streamed record batch
pub batch: RecordBatch,
Expand Down Expand Up @@ -667,8 +716,8 @@ impl BufferedBatch {
}
}

/// Sort-merge join stream that consumes streamed and buffered data stream
/// and produces joined output
/// Sort-Merge join stream that consumes streamed and buffered data streams
/// and produces joined output stream.
struct SortMergeJoinStream {
/// Current state of the stream
pub state: SortMergeJoinState,
Expand Down

0 comments on commit d63f1ac

Please sign in to comment.