diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 5b1a29665868..43f698c24d05 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -68,8 +68,54 @@ use crate::{ RecordBatchStream, SendableRecordBatchStream, Statistics, }; -/// join execution plan executes partitions in parallel and combines them into a set of -/// partitions. +/// Join execution plan that executes equi-join predicates on multiple partitions using Sort-Merge +/// join algorithm and applies an optional filter post join. Can be used to join arbitrarily large +/// inputs where one or both of the inputs don't fit in the available memory. +/// +/// # Join Expressions +/// +/// Equi-join predicate (e.g. ` = `) expressions are represented by [`Self::on`]. +/// +/// Non-equality predicates, which can not be pushed down to join inputs (e.g. +/// ` != `) are known as "filter expressions" and are evaluated +/// after the equijoin predicates. They are represented by [`Self::filter`]. These are optional +/// expressions. +/// +/// # Sorting +/// +/// Assumes that both the left and right input to the join are pre-sorted. It is not the +/// responisibility of this execution plan to sort the inputs. +/// +/// # "Streamed" vs "Buffered" +/// +/// The number of record batches of streamed input currently present in the memory will depend +/// on the output batch size of the execution plan. There is no spilling support for streamed input. +/// The comparisons are performed from values of join keys in streamed input with the values of +/// join keys in buffered input. One row in streamed record batch could be matched with multiple rows in +/// buffered input batches. The streamed input is managed through the states in `StreamedState` +/// and streamed input batches are represented by `StreamedBatch`. +/// +/// Buffered input is buffered for all record batches having the same value of join key. +/// If the memory limit increases beyond the specified value and spilling is enabled, +/// buffered batches could be spilled to disk. If spilling is disabled, the execution +/// will fail under the same conditions. Multiple record batches of buffered could currently reside +/// in memory/disk during the exectution. The number of buffered batches residing in +/// memory/disk depends on the number of rows of buffered input having the same value +/// of join key as that of streamed input rows currently present in memory. Due to pre-sorted inputs, +/// the algorithm understands when it is not needed anymore, and releases the buffered batches +/// from memory/disk. The buffered input is managed through the states in `BufferedState` +/// and buffered input batches are represented by `BufferedBatch`. +/// +/// Depending on the type of join, left or right input may be selected as streamed or buffered +/// respectively. For example, in a left-outer join, the left execution plan will be selected as +/// streamed input while in a right-outer join, the right execution plan will be selected as the +/// streamed input. +/// +/// Reference for the algorithm: +/// . +/// +/// Helpful short video demonstration: +/// . #[derive(Debug, Clone)] pub struct SortMergeJoinExec { /// Left sorted joining execution plan @@ -529,6 +575,9 @@ struct StreamedJoinedChunk { buffered_indices: UInt64Builder, } +/// Represents a record batch from streamed input. +/// +/// Also stores information of matching rows from buffered batches. struct StreamedBatch { /// The streamed record batch pub batch: RecordBatch, @@ -667,8 +716,8 @@ impl BufferedBatch { } } -/// Sort-merge join stream that consumes streamed and buffered data stream -/// and produces joined output +/// Sort-Merge join stream that consumes streamed and buffered data streams +/// and produces joined output stream. struct SortMergeJoinStream { /// Current state of the stream pub state: SortMergeJoinState,