Skip to content

Commit

Permalink
doc comments & naming & fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Dec 13, 2023
1 parent 130c1ff commit fa45dec
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 23 deletions.
49 changes: 30 additions & 19 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ impl ExecutionPlan for HashJoinExec {
null_equals_null: self.null_equals_null,
reservation,
state: HashJoinStreamState::WaitBuildSide,
build_side_state: BuildSideState::Initial(BuildSideInitialState { left_fut }),
build_side: BuildSide::Initial(BuildSideInitialState { left_fut }),
}))
}

Expand Down Expand Up @@ -788,51 +788,58 @@ where
Ok(())
}

/// Represents state of HashJoin build-side.
enum BuildSideState {
/// Represents build-side of hash join.
enum BuildSide {
/// Indicates that build-side has not been collected yet
Initial(BuildSideInitialState),
/// Indicates that build-side data has been collected
Ready(BuildSideReadyState),
}

/// Container for the data carried by `BuildSide::Initial`.
///
/// Holds the build side while its input is still being collected:
/// `HashJoinStream::collect_build_side` polls `left_fut` and, once it
/// resolves, replaces this state with `BuildSide::Ready`.
struct BuildSideInitialState {
/// Future for building hash table from build-side input.
/// Polled through `OnceFut::get_shared`, which yields the result as an `Arc`.
left_fut: OnceFut<JoinLeftData>,
}

/// Container for the data carried by `BuildSide::Ready`.
///
/// Constructed by `HashJoinStream::collect_build_side` once the build-side
/// future has resolved.
struct BuildSideReadyState {
/// Collected build-side data, shared behind an `Arc`.
left_data: Arc<JoinLeftData>,
/// Which build-side rows have been matched while creating output.
/// For some OUTER joins, we need to know which rows have not been matched
/// to produce the correct output.
visited_left_side: BooleanBufferBuilder,
}

impl BuildSideState {
/// Tries to extract BuildSideInitialState from BuildSideState enum.
impl BuildSide {
/// Tries to extract BuildSideInitialState from BuildSide enum.
/// Returns an error if state is not Initial.
fn try_into_initial_mut(&mut self) -> Result<&mut BuildSideInitialState> {
match self {
BuildSideState::Initial(state) => Ok(state),
BuildSide::Initial(state) => Ok(state),
_ => Err(DataFusionError::Internal(
"Expected build side in initial state".to_string(),
)),
}
}

/// Tries to extract BuildSideReadyState from BuildSideState enum.
/// Tries to extract BuildSideReadyState from BuildSide enum.
/// Returns an error if state is not Ready.
fn try_into_ready(&self) -> Result<&BuildSideReadyState> {
match self {
BuildSideState::Ready(state) => Ok(state),
BuildSide::Ready(state) => Ok(state),
_ => Err(DataFusionError::Internal(
"Expected build side in ready state".to_string(),
)),
}
}

/// Tries to extract BuildSideReadyState from BuildSideState enum.
/// Tries to extract a mutable reference to BuildSideReadyState from BuildSide enum.
/// Returns an error if state is not Ready.
fn try_into_ready_mut(&mut self) -> Result<&mut BuildSideReadyState> {
match self {
BuildSideState::Ready(state) => Ok(state),
BuildSide::Ready(state) => Ok(state),
_ => Err(DataFusionError::Internal(
"Expected build side in ready state".to_string(),
)),
Expand All @@ -854,11 +861,15 @@ enum HashJoinStreamState {
Completed,
}

/// Container for the data carried by `HashJoinStreamState::ProcessProbeBatch`.
struct ProcessProbeBatchState {
/// Current probe-side batch.
/// Its row count is added to the `input_rows` join metric when the batch
/// is processed.
batch: RecordBatch,
}

impl HashJoinStreamState {
/// Tries to extract ProcessProbeBatchState from HashJoinStreamState enum.
/// Returns an error if state is not ProcessProbeBatch.
fn try_as_process_probe_batch(&self) -> Result<&ProcessProbeBatchState> {
match self {
HashJoinStreamState::ProcessProbeBatch(state) => Ok(state),
Expand Down Expand Up @@ -900,10 +911,10 @@ struct HashJoinStream {
null_equals_null: bool,
/// Memory reservation
reservation: MemoryReservation,
/// Stream state
/// State of the stream
state: HashJoinStreamState,
/// Build side state
build_side_state: BuildSideState,
/// Build side
build_side: BuildSide,
}

impl RecordBatchStream for HashJoinStream {
Expand Down Expand Up @@ -1179,17 +1190,17 @@ impl HashJoinStream {
}
}

/// Collects build-side data by polling `OnceFut` future from initial build-side state
/// Collects build-side data by polling `OnceFut` future from initialized build-side
///
/// Updates build-side state to `Ready`, and state to `FetchProbeSide`
/// Updates build-side to `Ready`, and state to `FetchProbeBatch`
fn collect_build_side(
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
let build_timer = self.join_metrics.build_time.timer();
// build hash table from left (build) side, if not yet done
let left_data = ready!(self
.build_side_state
.build_side
.try_into_initial_mut()?
.left_fut
.get_shared(cx))?;
Expand Down Expand Up @@ -1220,7 +1231,7 @@ impl HashJoinStream {
};

self.state = HashJoinStreamState::FetchProbeBatch;
self.build_side_state = BuildSideState::Ready(BuildSideReadyState {
self.build_side = BuildSide::Ready(BuildSideReadyState {
left_data,
visited_left_side,
});
Expand Down Expand Up @@ -1259,7 +1270,7 @@ impl HashJoinStream {
&mut self,
) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
let state = self.state.try_as_process_probe_batch()?;
let build_side = self.build_side_state.try_into_ready_mut()?;
let build_side = self.build_side.try_into_ready_mut()?;

self.join_metrics.input_batches.add(1);
self.join_metrics.input_rows.add(state.batch.num_rows());
Expand Down Expand Up @@ -1337,7 +1348,7 @@ impl HashJoinStream {
return Ok(StatefulStreamResult::Continue);
}

let build_side = self.build_side_state.try_into_ready()?;
let build_side = self.build_side.try_into_ready()?;

// use the global left bitmap to produce the left indices and right indices
let (left_side, right_side) =
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/stream_join_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::usize;

use crate::joins::utils::{JoinFilter, JoinHashMapType, StatefulStreamResult};
use crate::{handle_async_state, handle_state};
use crate::joins::utils::{JoinFilter, JoinHashMapType, StatefulStreamResult};

use arrow::compute::concat_batches;
use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, RecordBatch};
Expand Down
5 changes: 2 additions & 3 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,19 +850,18 @@ impl<T: 'static> OnceFut<T> {
}
}

/// Get the result of the computation as a shared reference if it is ready, without consuming it
/// Get shared reference to the result of the computation if it is ready, without consuming it
pub(crate) fn get_shared(&mut self, cx: &mut Context<'_>) -> Poll<Result<Arc<T>>> {
if let OnceFutState::Pending(fut) = &mut self.state {
let r = ready!(fut.poll_unpin(cx));
self.state = OnceFutState::Ready(r);
}

// Cannot use loop as this would trip up the borrow checker
match &self.state {
OnceFutState::Pending(_) => unreachable!(),
OnceFutState::Ready(r) => Poll::Ready(
r.clone()
.map_err(|e| DataFusionError::External(Box::new(e.clone()))),
.map_err(|e| DataFusionError::External(Box::new(e))),
),
}
}
Expand Down

0 comments on commit fa45dec

Please sign in to comment.