From 9b9217b2778195d2f324398a7814f94ce346aae2 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 2 Oct 2024 03:21:14 -0700 Subject: [PATCH] Speed up `pad_nulls` for `FixedLenByteArrayBuffer` (#6297) * optimize pad_nulls for fixed_len_byte_array * start refactor of benchmarks * refactor float16 * add fixed_len_byte_array to float16 benches * replace copy_within with vectorizable copy * update comment * move branch on byte_length outside of loop * reduce code duplication while preserving performance gains * formatting * silence clippy * clippy won again --- parquet/benches/arrow_reader.rs | 442 +++++++++++++----- .../array_reader/fixed_len_byte_array.rs | 56 ++- 2 files changed, 368 insertions(+), 130 deletions(-) diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs index 18e16f0a4297..f165adbe897c 100644 --- a/parquet/benches/arrow_reader.rs +++ b/parquet/benches/arrow_reader.rs @@ -68,6 +68,14 @@ fn build_test_schema() -> SchemaDescPtr { OPTIONAL BYTE_ARRAY optional_binary_leaf; REQUIRED FIXED_LEN_BYTE_ARRAY (2) mandatory_f16_leaf (Float16); OPTIONAL FIXED_LEN_BYTE_ARRAY (2) optional_f16_leaf (Float16); + REQUIRED FIXED_LEN_BYTE_ARRAY (2) mandatory_flba2_leaf; + OPTIONAL FIXED_LEN_BYTE_ARRAY (2) optional_flba2_leaf; + REQUIRED FIXED_LEN_BYTE_ARRAY (4) mandatory_flba4_leaf; + OPTIONAL FIXED_LEN_BYTE_ARRAY (4) optional_flba4_leaf; + REQUIRED FIXED_LEN_BYTE_ARRAY (8) mandatory_flba8_leaf; + OPTIONAL FIXED_LEN_BYTE_ARRAY (8) optional_flba8_leaf; + REQUIRED FIXED_LEN_BYTE_ARRAY (16) mandatory_flba16_leaf; + OPTIONAL FIXED_LEN_BYTE_ARRAY (16) optional_flba16_leaf; } "; parse_message_type(message_type) @@ -209,6 +217,50 @@ where InMemoryPageIterator::new(pages) } +// support for fixed_len_byte_arrays +fn build_encoded_flba_bytes_page_iterator( + column_desc: ColumnDescPtr, + null_density: f32, + encoding: Encoding, +) -> impl PageIterator + Clone { + let max_def_level = column_desc.max_def_level(); + let max_rep_level = 
column_desc.max_rep_level(); + let rep_levels = vec![0; VALUES_PER_PAGE]; + let mut rng = seedable_rng(); + let mut pages: Vec> = Vec::new(); + for _i in 0..NUM_ROW_GROUPS { + let mut column_chunk_pages = Vec::new(); + for _j in 0..PAGES_PER_GROUP { + // generate page + let mut values = Vec::with_capacity(VALUES_PER_PAGE); + let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE); + for _k in 0..VALUES_PER_PAGE { + let def_level = if rng.gen::() < null_density { + max_def_level - 1 + } else { + max_def_level + }; + if def_level == max_def_level { + // create the FLBA(BYTE_LENGTH) value + let value = (0..BYTE_LENGTH).map(|_| rng.gen()).collect::>(); + let value = + ::T::from(value); + values.push(value); + } + def_levels.push(def_level); + } + let mut page_builder = + DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true); + page_builder.add_rep_levels(max_rep_level, &rep_levels); + page_builder.add_def_levels(max_def_level, &def_levels); + page_builder.add_values::(encoding, &values); + column_chunk_pages.push(page_builder.consume()); + } + pages.push(column_chunk_pages); + } + InMemoryPageIterator::new(pages) +} + fn build_encoded_primitive_page_iterator( column_desc: ColumnDescPtr, null_density: f32, @@ -584,6 +636,13 @@ fn create_decimal_by_bytes_reader( } } +fn create_fixed_len_byte_array_reader( + page_iterator: impl PageIterator + 'static, + column_desc: ColumnDescPtr, +) -> Box { + make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap() +} + fn create_byte_array_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, @@ -630,6 +689,7 @@ fn bench_byte_decimal( group: &mut BenchmarkGroup, mandatory_column_desc: &ColumnDescPtr, optional_column_desc: &ColumnDescPtr, + encoding: Encoding, min: i128, max: i128, ) where @@ -639,61 +699,71 @@ fn bench_byte_decimal( // all are plain encoding let mut count: usize = 0; - // plain encoded, no NULLs + // no NULLs let data = 
build_encoded_decimal_bytes_page_iterator::( mandatory_column_desc.clone(), 0.0, - Encoding::PLAIN, + encoding, min, max, ); - group.bench_function("plain encoded, mandatory, no NULLs", |b| { - b.iter(|| { - let array_reader = - create_decimal_by_bytes_reader(data.clone(), mandatory_column_desc.clone()); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); + group.bench_function( + encoding.to_string().to_lowercase() + " encoded, mandatory, no NULLs", + |b| { + b.iter(|| { + let array_reader = + create_decimal_by_bytes_reader(data.clone(), mandatory_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); let data = build_encoded_decimal_bytes_page_iterator::( optional_column_desc.clone(), 0.0, - Encoding::PLAIN, + encoding, min, max, ); - group.bench_function("plain encoded, optional, no NULLs", |b| { - b.iter(|| { - let array_reader = - create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone()); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); + group.bench_function( + encoding.to_string().to_lowercase() + " encoded, optional, no NULLs", + |b| { + b.iter(|| { + let array_reader = + create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); // half null let data = build_encoded_decimal_bytes_page_iterator::( optional_column_desc.clone(), 0.5, - Encoding::PLAIN, + encoding, min, max, ); - group.bench_function("plain encoded, optional, half NULLs", |b| { - b.iter(|| { - let array_reader = - create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone()); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); + group.bench_function( + encoding.to_string().to_lowercase() + " encoded, optional, half NULLs", + |b| { + 
b.iter(|| { + let array_reader = + create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); } -fn bench_byte_stream_split_f16( +fn bench_f16( group: &mut BenchmarkGroup, mandatory_column_desc: &ColumnDescPtr, optional_column_desc: &ColumnDescPtr, + encoding: Encoding, min: f32, max: f32, ) where @@ -706,113 +776,141 @@ fn bench_byte_stream_split_f16( let data = build_encoded_f16_bytes_page_iterator::( mandatory_column_desc.clone(), 0.0, - Encoding::BYTE_STREAM_SPLIT, + encoding, min, max, ); - group.bench_function("byte_stream_split encoded, mandatory, no NULLs", |b| { - b.iter(|| { - let array_reader = - create_f16_by_bytes_reader(data.clone(), mandatory_column_desc.clone()); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); + group.bench_function( + encoding.to_string().to_lowercase() + " encoded, mandatory, no NULLs", + |b| { + b.iter(|| { + let array_reader = + create_f16_by_bytes_reader(data.clone(), mandatory_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); let data = build_encoded_f16_bytes_page_iterator::( optional_column_desc.clone(), 0.0, - Encoding::BYTE_STREAM_SPLIT, + encoding, min, max, ); - group.bench_function("byte_stream_split encoded, optional, no NULLs", |b| { - b.iter(|| { - let array_reader = - create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone()); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); + group.bench_function( + encoding.to_string().to_lowercase() + " encoded, optional, no NULLs", + |b| { + b.iter(|| { + let array_reader = + create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); let data = 
build_encoded_f16_bytes_page_iterator::( optional_column_desc.clone(), 0.5, - Encoding::BYTE_STREAM_SPLIT, + encoding, min, max, ); - group.bench_function("byte_stream_split encoded, optional, half NULLs", |b| { - b.iter(|| { - let array_reader = - create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone()); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); + group.bench_function( + encoding.to_string().to_lowercase() + " encoded, optional, half NULLs", + |b| { + b.iter(|| { + let array_reader = + create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); } -fn bench_byte_stream_split_decimal( +fn bench_flba( group: &mut BenchmarkGroup, mandatory_column_desc: &ColumnDescPtr, optional_column_desc: &ColumnDescPtr, - min: i128, - max: i128, -) where - T: parquet::data_type::DataType, - T::T: From>, -{ + encoding: Encoding, +) { let mut count: usize = 0; - // byte_stream_split encoded, no NULLs - let data = build_encoded_decimal_bytes_page_iterator::( + encoding.to_string(); + // no NULLs + let data = build_encoded_flba_bytes_page_iterator::( mandatory_column_desc.clone(), 0.0, - Encoding::BYTE_STREAM_SPLIT, - min, - max, + encoding, + ); + group.bench_function( + encoding.to_string().to_lowercase() + " encoded, mandatory, no NULLs", + |b| { + b.iter(|| { + let array_reader = + create_fixed_len_byte_array_reader(data.clone(), mandatory_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, ); - group.bench_function("byte_stream_split encoded, mandatory, no NULLs", |b| { - b.iter(|| { - let array_reader = - create_decimal_by_bytes_reader(data.clone(), mandatory_column_desc.clone()); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); - let data = 
build_encoded_decimal_bytes_page_iterator::( + let data = build_encoded_flba_bytes_page_iterator::( optional_column_desc.clone(), 0.0, - Encoding::BYTE_STREAM_SPLIT, - min, - max, + encoding, + ); + group.bench_function( + encoding.to_string().to_lowercase() + " encoded, optional, no NULLs", + |b| { + b.iter(|| { + let array_reader = + create_fixed_len_byte_array_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, ); - group.bench_function("byte_stream_split encoded, optional, no NULLs", |b| { - b.iter(|| { - let array_reader = - create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone()); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); // half null - let data = build_encoded_decimal_bytes_page_iterator::( + let data = build_encoded_flba_bytes_page_iterator::( optional_column_desc.clone(), 0.5, + encoding, + ); + group.bench_function( + encoding.to_string().to_lowercase() + " encoded, optional, half NULLs", + |b| { + b.iter(|| { + let array_reader = + create_fixed_len_byte_array_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); +} + +fn bench_fixed_len_byte_array( + group: &mut BenchmarkGroup, + mandatory_column_desc: &ColumnDescPtr, + optional_column_desc: &ColumnDescPtr, +) { + bench_flba::( + group, + mandatory_column_desc, + optional_column_desc, + Encoding::PLAIN, + ); + bench_flba::( + group, + mandatory_column_desc, + optional_column_desc, Encoding::BYTE_STREAM_SPLIT, - min, - max, ); - group.bench_function("byte_stream_split encoded, optional, half NULLs", |b| { - b.iter(|| { - let array_reader = - create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone()); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); } fn bench_primitive( 
@@ -994,31 +1092,82 @@ fn bench_primitive( }); assert_eq!(count, EXPECTED_VALUE_COUNT); }); + + // byte_stream_split encoded, no NULLs + let data = build_encoded_primitive_page_iterator::( + mandatory_column_desc.clone(), + 0.0, + Encoding::BYTE_STREAM_SPLIT, + min, + max, + ); + group.bench_function("byte_stream_split encoded, mandatory, no NULLs", |b| { + b.iter(|| { + let array_reader = + create_primitive_array_reader(data.clone(), mandatory_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + let data = build_encoded_primitive_page_iterator::( + optional_column_desc.clone(), + 0.0, + Encoding::BYTE_STREAM_SPLIT, + min, + max, + ); + group.bench_function("byte_stream_split encoded, optional, no NULLs", |b| { + b.iter(|| { + let array_reader = + create_primitive_array_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + // byte_stream_split encoded, half NULLs + let data = build_encoded_primitive_page_iterator::( + optional_column_desc.clone(), + 0.5, + Encoding::BYTE_STREAM_SPLIT, + min, + max, + ); + group.bench_function("byte_stream_split encoded, optional, half NULLs", |b| { + b.iter(|| { + let array_reader = + create_primitive_array_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); } -fn byte_stream_split_benches(c: &mut Criterion) { +fn float16_benches(c: &mut Criterion) { let schema = build_test_schema(); - let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/Decimal128Array"); - let mandatory_decimal4_leaf_desc = schema.column(12); - let optional_decimal4_leaf_desc = schema.column(13); - bench_byte_stream_split_decimal::( + let mut group = c.benchmark_group("arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Float16Array"); + let mandatory_f16_leaf_desc = schema.column(17); + let 
optional_f16_leaf_desc = schema.column(18); + bench_f16::( &mut group, - &mandatory_decimal4_leaf_desc, - &optional_decimal4_leaf_desc, - // precision is 16: the max is 9999999999999999 - 9999999999999000, - 9999999999999999, + &mandatory_f16_leaf_desc, + &optional_f16_leaf_desc, + Encoding::PLAIN, + -1.0, + 1.0, ); group.finish(); - let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/Float16Array"); + let mut group = c.benchmark_group("arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Float16Array"); let mandatory_f16_leaf_desc = schema.column(17); let optional_f16_leaf_desc = schema.column(18); - bench_byte_stream_split_f16::( + bench_f16::( &mut group, &mandatory_f16_leaf_desc, &optional_f16_leaf_desc, + Encoding::BYTE_STREAM_SPLIT, -1.0, 1.0, ); @@ -1063,19 +1212,36 @@ fn decimal_benches(c: &mut Criterion) { &mut group, &mandatory_decimal3_leaf_desc, &optional_decimal3_leaf_desc, + Encoding::PLAIN, + // precision is 16: the max is 9999999999999999 + 9999999999999000, + 9999999999999999, + ); + group.finish(); + + // parquet FIXED_LEN_BYTE_ARRAY, logical type decimal(16,2) + let mut group = c.benchmark_group("arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Decimal128Array"); + let mandatory_decimal4_leaf_desc = schema.column(12); + let optional_decimal4_leaf_desc = schema.column(13); + bench_byte_decimal::( + &mut group, + &mandatory_decimal4_leaf_desc, + &optional_decimal4_leaf_desc, + Encoding::PLAIN, // precision is 16: the max is 9999999999999999 9999999999999000, 9999999999999999, ); group.finish(); - let mut group = c.benchmark_group("arrow_array_reader/FIXED_LENGTH_BYTE_ARRAY/Decimal128Array"); + let mut group = c.benchmark_group("arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Decimal128Array"); let mandatory_decimal4_leaf_desc = schema.column(12); let optional_decimal4_leaf_desc = schema.column(13); bench_byte_decimal::( &mut group, &mandatory_decimal4_leaf_desc, &optional_decimal4_leaf_desc, + Encoding::BYTE_STREAM_SPLIT, // precision is 16: the max is 
9999999999999999 9999999999999000, 9999999999999999, @@ -1560,12 +1726,52 @@ fn add_benches(c: &mut Criterion) { }); assert_eq!(count, EXPECTED_VALUE_COUNT); }); + + group.finish(); + + // fixed_len_byte_array benchmarks + //============================== + + let mut group = c.benchmark_group("arrow_array_reader/FixedLenByteArray(2)"); + let mandatory_flba2_leaf_desc = schema.column(19); + let optional_flba2_leaf_desc = schema.column(20); + bench_fixed_len_byte_array::<2>( + &mut group, + &mandatory_flba2_leaf_desc, + &optional_flba2_leaf_desc, + ); + group.finish(); + + let mut group = c.benchmark_group("arrow_array_reader/FixedLenByteArray(4)"); + let mandatory_flba4_leaf_desc = schema.column(21); + let optional_flba4_leaf_desc = schema.column(22); + bench_fixed_len_byte_array::<4>( + &mut group, + &mandatory_flba4_leaf_desc, + &optional_flba4_leaf_desc, + ); + group.finish(); + + let mut group = c.benchmark_group("arrow_array_reader/FixedLenByteArray(8)"); + let mandatory_flba8_leaf_desc = schema.column(23); + let optional_flba8_leaf_desc = schema.column(24); + bench_fixed_len_byte_array::<8>( + &mut group, + &mandatory_flba8_leaf_desc, + &optional_flba8_leaf_desc, + ); + group.finish(); + + let mut group = c.benchmark_group("arrow_array_reader/FixedLenByteArray(16)"); + let mandatory_flba16_leaf_desc = schema.column(25); + let optional_flba16_leaf_desc = schema.column(26); + bench_fixed_len_byte_array::<16>( + &mut group, + &mandatory_flba16_leaf_desc, + &optional_flba16_leaf_desc, + ); + group.finish(); } -criterion_group!( - benches, - add_benches, - decimal_benches, - byte_stream_split_benches, -); +criterion_group!(benches, add_benches, decimal_benches, float16_benches,); criterion_main!(benches); diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index 01692c242713..4be07ed68f1d 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ 
b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -36,6 +36,7 @@ use arrow_schema::{DataType as ArrowType, IntervalUnit}; use bytes::Bytes; use half::f16; use std::any::Any; +use std::ops::Range; use std::sync::Arc; /// Returns an [`ArrayReader`] that decodes the provided fixed length byte array column @@ -233,6 +234,29 @@ struct FixedLenByteArrayBuffer { byte_length: Option, } +#[inline] +fn move_values( + buffer: &mut Vec, + byte_length: usize, + values_range: Range, + valid_mask: &[u8], + mut op: F, +) where + F: FnMut(&mut Vec, usize, usize, usize), +{ + for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { + debug_assert!(level_pos >= value_pos); + if level_pos <= value_pos { + break; + } + + let level_pos_bytes = level_pos * byte_length; + let value_pos_bytes = value_pos * byte_length; + + op(buffer, level_pos_bytes, value_pos_bytes, byte_length) + } +} + impl ValuesBuffer for FixedLenByteArrayBuffer { fn pad_nulls( &mut self, @@ -248,18 +272,26 @@ impl ValuesBuffer for FixedLenByteArrayBuffer { .resize((read_offset + levels_read) * byte_length, 0); let values_range = read_offset..read_offset + values_read; - for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { - debug_assert!(level_pos >= value_pos); - if level_pos <= value_pos { - break; - } - - let level_pos_bytes = level_pos * byte_length; - let value_pos_bytes = value_pos * byte_length; - - for i in 0..byte_length { - self.buffer[level_pos_bytes + i] = self.buffer[value_pos_bytes + i] - } + // Move the bytes from value_pos to level_pos. For values of `byte_length` <= 4, + // the simple loop is preferred as the compiler can eliminate the loop via unrolling. + // For `byte_length > 4`, we instead copy from non-overlapping slices. This allows + // the loop to be vectorized, yielding much better performance. 
+ const VEC_CUTOFF: usize = 4; + if byte_length > VEC_CUTOFF { + let op = |buffer: &mut Vec, level_pos_bytes, value_pos_bytes, byte_length| { + let split = buffer.split_at_mut(level_pos_bytes); + let dst = &mut split.1[..byte_length]; + let src = &split.0[value_pos_bytes..value_pos_bytes + byte_length]; + dst.copy_from_slice(src); + }; + move_values(&mut self.buffer, byte_length, values_range, valid_mask, op); + } else { + let op = |buffer: &mut Vec, level_pos_bytes, value_pos_bytes, byte_length| { + for i in 0..byte_length { + buffer[level_pos_bytes + i] = buffer[value_pos_bytes + i] + } + }; + move_values(&mut self.buffer, byte_length, values_range, valid_mask, op); } } }