diff --git a/arrow-ipc/src/reader/stream.rs b/arrow-ipc/src/reader/stream.rs index 64191a22b33e..9cf81aaa0d1d 100644 --- a/arrow-ipc/src/reader/stream.rs +++ b/arrow-ipc/src/reader/stream.rs @@ -275,8 +275,8 @@ impl StreamDecoder { #[cfg(test)] mod tests { use super::*; - use crate::writer::StreamWriter; - use arrow_array::{Int32Array, Int64Array, RecordBatch}; + use crate::writer::{IpcWriteOptions, StreamWriter}; + use arrow_array::{Int32Array, Int64Array, RecordBatch, RunArray, DictionaryArray, types::Int32Type}; use arrow_schema::{DataType, Field, Schema}; // Further tests in arrow-integration-testing/tests/ipc_reader.rs @@ -315,4 +315,65 @@ mod tests { let err = decoder.finish().unwrap_err().to_string(); assert_eq!(err, "Ipc error: Unexpected End of Stream"); } + + #[test] + fn test_read_ree_dict_record_batches_from_buffer() { + let schema = Schema::new(vec![ + Field::new( + "test1", + DataType::RunEndEncoded( + Arc::new(Field::new("run_ends".to_string(), DataType::Int32, false)), + Arc::new(Field::new_dict( + "values".to_string(), + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + true, + 0, + false, + )), + ), + true, + ), + ]); + let batch = RecordBatch::try_new( + schema.clone().into(), + vec![ + Arc::new( + RunArray::try_new( + &Int32Array::from(vec![1, 2, 3]), + &vec![Some("a"), None, Some("a")] + .into_iter() + .collect::>(), + ) + .expect("Failed to create RunArray"), + ), + ], + ) + .expect("Failed to create RecordBatch"); + + let mut buffer = vec![]; + { + let mut writer = StreamWriter::try_new_with_options( + &mut buffer, + &schema, + IpcWriteOptions::default().with_preserve_dict_id(false), + ) + .expect("Failed to create StreamWriter"); + writer.write(&batch).expect("Failed to write RecordBatch"); + writer.finish().expect("Failed to finish StreamWriter"); + } + + let mut decoder = StreamDecoder::new(); + let buf = &mut Buffer::from(buffer.as_slice()); + while let Some(batch) = decoder + .decode(buf) + .map_err(|e| { + ArrowError::ExternalError(format!("Failed to decode record batch: {}", e).into()) + }) + .expect("Failed to decode record batch") + { + assert_eq!(batch, batch); + } + + decoder.finish().expect("Failed to finish decoder"); + } }