Skip to content

Commit

Permalink
Minor: Improve HashJoinStream docstrings (apache#8070)
Browse files Browse the repository at this point in the history
* Minor: Improve HashJoinStream docstrings

* fix comments

* Update datafusion/physical-plan/src/joins/hash_join.rs

Co-authored-by: comphead <[email protected]>

* Update datafusion/physical-plan/src/joins/hash_join.rs

Co-authored-by: comphead <[email protected]>

---------

Co-authored-by: Daniël Heres <[email protected]>
Co-authored-by: comphead <[email protected]>
  • Loading branch information
3 people authored Nov 7, 2023
1 parent f3c9009 commit 0506a5c
Showing 1 changed file with 74 additions and 47 deletions.
121 changes: 74 additions & 47 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ pub struct HashJoinExec {
pub join_type: JoinType,
/// The output schema for the join
schema: SchemaRef,
/// Build-side data
/// Future that consumes left input and builds the hash table
left_fut: OnceAsync<JoinLeftData>,
/// Shared `RandomState` for the hashing algorithm
random_state: RandomState,
Expand Down Expand Up @@ -747,27 +747,38 @@ where
Ok(())
}

/// A stream that issues [RecordBatch]es as they arrive from the right of the join.
/// [`Stream`] for [`HashJoinExec`] that does the actual join.
///
/// This stream:
///
/// 1. Reads the entire left input (build) and constructs a hash table
///
/// 2. Streams [RecordBatch]es as they arrive from the right input (probe) and joins
/// them with the contents of the hash table
struct HashJoinStream {
/// Input schema
schema: Arc<Schema>,
/// columns from the left
/// equijoin columns from the left (build side)
on_left: Vec<Column>,
/// columns from the right used to compute the hash
/// equijoin columns from the right (probe side)
on_right: Vec<Column>,
/// join filter
/// optional join filter
filter: Option<JoinFilter>,
/// type of the join
/// type of the join (left, right, semi, etc)
join_type: JoinType,
/// future for data from left side
/// future which builds hash table from left side
left_fut: OnceFut<JoinLeftData>,
/// Keeps track of the left side rows whether they are visited
/// Which left (build) side rows have been matched while creating output.
/// For some OUTER joins, we need to know which rows have not been matched
/// to produce the correct output.
visited_left_side: Option<BooleanBufferBuilder>,
/// right
/// right (probe) input
right: SendableRecordBatchStream,
/// Random state used for hashing initialization
random_state: RandomState,
/// There is nothing to process anymore and left side is processed in case of left join
/// The join output is complete. For outer joins, this is used to
/// distinguish when the input stream is exhausted and when any unmatched
/// rows are output.
is_exhausted: bool,
/// Metrics
join_metrics: BuildProbeJoinMetrics,
Expand All @@ -785,37 +796,51 @@ impl RecordBatchStream for HashJoinStream {
}
}

// Returns build/probe indices satisfying the equality condition.
// On LEFT.b1 = RIGHT.b2
// LEFT Table:
// a1 b1 c1
// 1 1 10
// 3 3 30
// 5 5 50
// 7 7 70
// 9 8 90
// 11 8 110
// 13 10 130
// RIGHT Table:
// a2 b2 c2
// 2 2 20
// 4 4 40
// 6 6 60
// 8 8 80
// 10 10 100
// 12 10 120
// The result is
// "+----+----+-----+----+----+-----+",
// "| a1 | b1 | c1 | a2 | b2 | c2 |",
// "+----+----+-----+----+----+-----+",
// "| 9 | 8 | 90 | 8 | 8 | 80 |",
// "| 11 | 8 | 110 | 8 | 8 | 80 |",
// "| 13 | 10 | 130 | 10 | 10 | 100 |",
// "| 13 | 10 | 130 | 12 | 10 | 120 |",
// "+----+----+-----+----+----+-----+"
// And the result of build and probe indices are:
// Build indices: 4, 5, 6, 6
// Probe indices: 3, 3, 4, 5
/// Returns build/probe indices satisfying the equality condition.
///
/// # Example
///
/// For `LEFT.b1 = RIGHT.b2`:
/// LEFT Table:
/// ```text
/// a1 b1 c1
/// 1 1 10
/// 3 3 30
/// 5 5 50
/// 7 7 70
/// 9 8 90
/// 11 8 110
/// 13 10 130
/// ```
///
/// RIGHT Table:
/// ```text
/// a2 b2 c2
/// 2 2 20
/// 4 4 40
/// 6 6 60
/// 8 8 80
/// 10 10 100
/// 12 10 120
/// ```
///
/// The result is
/// ```text
/// "+----+----+-----+----+----+-----+",
/// "| a1 | b1 | c1 | a2 | b2 | c2 |",
/// "+----+----+-----+----+----+-----+",
/// "| 9 | 8 | 90 | 8 | 8 | 80 |",
/// "| 11 | 8 | 110 | 8 | 8 | 80 |",
/// "| 13 | 10 | 130 | 10 | 10 | 100 |",
/// "| 13 | 10 | 130 | 12 | 10 | 120 |",
/// "+----+----+-----+----+----+-----+"
/// ```
///
/// And the resulting build and probe indices are:
/// ```text
/// Build indices: 4, 5, 6, 6
/// Probe indices: 3, 3, 4, 5
/// ```
#[allow(clippy::too_many_arguments)]
pub fn build_equal_condition_join_indices<T: JoinHashMapType>(
build_hashmap: &T,
Expand Down Expand Up @@ -1003,13 +1028,14 @@ impl HashJoinStream {
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
let build_timer = self.join_metrics.build_time.timer();
// build hash table from left (build) side, if not yet done
let left_data = match ready!(self.left_fut.get(cx)) {
Ok(left_data) => left_data,
Err(e) => return Poll::Ready(Some(Err(e))),
};
build_timer.done();

// Reserving memory for visited_left_side bitmap in case it hasn't been initialied yet
// Reserving memory for visited_left_side bitmap in case it hasn't been initialized yet
// and join_type requires to store it
if self.visited_left_side.is_none()
&& need_produce_result_in_final(self.join_type)
Expand All @@ -1024,11 +1050,11 @@ impl HashJoinStream {
let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
let num_rows = left_data.1.num_rows();
if need_produce_result_in_final(self.join_type) {
// these join type need the bitmap to identify which row has be matched or unmatched.
// For the `left semi` join, need to use the bitmap to produce the matched row in the left side
// For the `left` join, need to use the bitmap to produce the unmatched row in the left side with null
// For the `left anti` join, need to use the bitmap to produce the unmatched row in the left side
// For the `full` join, need to use the bitmap to produce the unmatched row in the left side with null
// Some join types need to track which row has been matched or unmatched:
// `left semi` join: need to use the bitmap to produce the matched row in the left side
// `left` join: need to use the bitmap to produce the unmatched row in the left side with null
// `left anti` join: need to use the bitmap to produce the unmatched row in the left side
// `full` join: need to use the bitmap to produce the unmatched row in the left side with null
let mut buffer = BooleanBufferBuilder::new(num_rows);
buffer.append_n(num_rows, false);
buffer
Expand All @@ -1037,6 +1063,7 @@ impl HashJoinStream {
}
});
let mut hashes_buffer = vec![];
// get next right (probe) input batch
self.right
.poll_next_unpin(cx)
.map(|maybe_batch| match maybe_batch {
Expand Down

0 comments on commit 0506a5c

Please sign in to comment.