Skip to content

Commit

Permalink
arrow-ipc: Add test for streaming IPC with REE dicts
Browse files Browse the repository at this point in the history
  • Loading branch information
brancz committed Sep 17, 2024
1 parent 0491294 commit 6f2a14e
Showing 1 changed file with 63 additions and 2 deletions.
65 changes: 63 additions & 2 deletions arrow-ipc/src/reader/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ impl StreamDecoder {
#[cfg(test)]
mod tests {
use super::*;
use crate::writer::StreamWriter;
use arrow_array::{Int32Array, Int64Array, RecordBatch};
use crate::writer::{IpcWriteOptions, StreamWriter};
use arrow_array::{Int32Array, Int64Array, RecordBatch, RunArray, DictionaryArray, types::Int32Type};
use arrow_schema::{DataType, Field, Schema};

// Further tests in arrow-integration-testing/tests/ipc_reader.rs
Expand Down Expand Up @@ -315,4 +315,65 @@ mod tests {
let err = decoder.finish().unwrap_err().to_string();
assert_eq!(err, "Ipc error: Unexpected End of Stream");
}

#[test]
fn test_read_ree_dict_record_batches_from_buffer() {
let schema = Schema::new(vec![
Field::new(
"test1",
DataType::RunEndEncoded(
Arc::new(Field::new("run_ends".to_string(), DataType::Int32, false)),
Arc::new(Field::new_dict(
"values".to_string(),
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
true,
0,
false,
)),
),
true,
),
]);
let batch = RecordBatch::try_new(
schema.clone().into(),
vec![
Arc::new(
RunArray::try_new(
&Int32Array::from(vec![1, 2, 3]),
&vec![Some("a"), None, Some("a")]
.into_iter()
.collect::<DictionaryArray<Int32Type>>(),
)
.expect("Failed to create RunArray"),
),
],
)
.expect("Failed to create RecordBatch");

let mut buffer = vec![];
{
let mut writer = StreamWriter::try_new_with_options(
&mut buffer,
&schema,
IpcWriteOptions::default().with_preserve_dict_id(false),
)
.expect("Failed to create StreamWriter");
writer.write(&batch).expect("Failed to write RecordBatch");
writer.finish().expect("Failed to finish StreamWriter");
}

let mut decoder = StreamDecoder::new();
let buf = &mut Buffer::from(buffer.as_slice());
while let Some(batch) = decoder
.decode(buf)
.map_err(|e| {
ArrowError::ExternalError(format!("Failed to decode record batch: {}", e).into())
})
.expect("Failed to decode record batch")
{
assert_eq!(batch, batch);
}

decoder.finish().expect("Failed to finish decoder");
}
}

0 comments on commit 6f2a14e

Please sign in to comment.