Skip to content

Commit

Permalink
destructure option structs
Browse files Browse the repository at this point in the history
  • Loading branch information
Billy Messenger authored and Billy Messenger committed Jan 7, 2024
1 parent 6de648a commit 88b7c9b
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 51 deletions.
70 changes: 42 additions & 28 deletions core/src/read/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,29 @@ impl<D: Decoder> ReadDiskStream<D> {
///
/// # Panics
///
/// This will panic if `stream_opts.block_size`, `stream_opts.num_look_ahead_blocks`,
/// or `stream_opts.server_msg_channel_size` is `0`.
/// This will panic if `stream_block_size`, `stream_num_look_ahead_blocks`,
/// or `stream_server_msg_channel_size` is `0`.
pub fn new<P: Into<PathBuf>>(
file: P,
start_frame: usize,
stream_opts: ReadStreamOptions<D>,
) -> Result<ReadDiskStream<D>, D::OpenError> {
assert_ne!(stream_opts.block_size, 0);
assert_ne!(stream_opts.num_look_ahead_blocks, 0);
assert_ne!(stream_opts.server_msg_channel_size, Some(0));
let ReadStreamOptions {
num_cache_blocks,
num_caches,
additional_opts,
num_look_ahead_blocks,
block_size,
server_msg_channel_size,
} = stream_opts;

assert_ne!(block_size, 0);
assert_ne!(num_look_ahead_blocks, 0);
assert_ne!(server_msg_channel_size, Some(0));

// Reserve ample space for the message channels.
let msg_channel_size = stream_opts.server_msg_channel_size.unwrap_or(
((stream_opts.num_cache_blocks + stream_opts.num_look_ahead_blocks) * 4)
+ (stream_opts.num_caches * 4)
+ 8,
);
let msg_channel_size = server_msg_channel_size
.unwrap_or(((num_cache_blocks + num_look_ahead_blocks) * 4) + (num_caches * 4) + 8);

let (to_server_tx, from_client_rx) =
RingBuffer::<ClientToServerMsg<D>>::new(msg_channel_size);
Expand All @@ -110,10 +116,9 @@ impl<D: Decoder> ReadDiskStream<D> {
ReadServerOptions {
file,
start_frame,
num_prefetch_blocks: stream_opts.num_cache_blocks
+ stream_opts.num_look_ahead_blocks,
block_size: stream_opts.block_size,
additional_opts: stream_opts.additional_opts,
num_prefetch_blocks: num_cache_blocks + num_look_ahead_blocks,
block_size,
additional_opts,
},
to_client_tx,
from_client_rx,
Expand All @@ -123,10 +128,10 @@ impl<D: Decoder> ReadDiskStream<D> {
let client = ReadDiskStream::create(
ReadDiskStreamOptions {
start_frame,
num_cache_blocks: stream_opts.num_cache_blocks,
num_look_ahead_blocks: stream_opts.num_look_ahead_blocks,
max_num_caches: stream_opts.num_caches,
block_size: stream_opts.block_size,
num_cache_blocks,
num_look_ahead_blocks,
max_num_caches: num_caches,
block_size,
file_info,
},
to_server_tx,
Expand All @@ -146,12 +151,21 @@ impl<D: Decoder> ReadDiskStream<D> {
from_server_rx: Consumer<ServerToClientMsg<D>>,
close_signal_tx: Producer<Option<HeapData<D::T>>>,
) -> Self {
let num_prefetch_blocks = opts.num_cache_blocks + opts.num_look_ahead_blocks;
let ReadDiskStreamOptions {
start_frame,
num_cache_blocks,
num_look_ahead_blocks,
max_num_caches,
block_size,
file_info,
} = opts;

let read_buffer = DataBlock::new(usize::from(opts.file_info.num_channels), opts.block_size);
let num_prefetch_blocks = num_cache_blocks + num_look_ahead_blocks;

let read_buffer = DataBlock::new(usize::from(file_info.num_channels), block_size);

// Reserve the last two caches as temporary caches.
let max_num_caches = opts.max_num_caches + 2;
let max_num_caches = max_num_caches + 2;

let mut caches: Vec<DataBlockCacheEntry<D::T>> = Vec::with_capacity(max_num_caches);
for _ in 0..max_num_caches {
Expand All @@ -166,15 +180,15 @@ impl<D: Decoder> ReadDiskStream<D> {

let mut prefetch_buffer: Vec<DataBlockEntry<D::T>> =
Vec::with_capacity(num_prefetch_blocks);
let mut wanted_start_frame = opts.start_frame;
let mut wanted_start_frame = start_frame;
for _ in 0..num_prefetch_blocks {
prefetch_buffer.push(DataBlockEntry {
use_cache_index: None,
block: None,
wanted_start_frame,
});

wanted_start_frame += opts.block_size;
wanted_start_frame += block_size;
}

let heap_data = Some(HeapData {
Expand All @@ -192,18 +206,18 @@ impl<D: Decoder> ReadDiskStream<D> {

current_block_index: 0,
next_block_index: 1,
current_block_start_frame: opts.start_frame,
current_block_start_frame: start_frame,
current_frame_in_block: 0,

temp_cache_index,
temp_seek_cache_index,

num_prefetch_blocks,
prefetch_size: num_prefetch_blocks * opts.block_size,
cache_size: opts.num_cache_blocks * opts.block_size,
block_size: opts.block_size,
prefetch_size: num_prefetch_blocks * block_size,
cache_size: num_cache_blocks * block_size,
block_size,

file_info: opts.file_info,
file_info,
fatal_error: false,
}
}
Expand Down
19 changes: 11 additions & 8 deletions core/src/read/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,19 @@ impl<D: Decoder> ReadServer<D> {
from_client_rx: Consumer<ClientToServerMsg<D>>,
close_signal_rx: Consumer<Option<HeapData<D::T>>>,
) -> Result<FileInfo<D::FileParams>, D::OpenError> {
let ReadServerOptions {
file,
start_frame,
num_prefetch_blocks,
block_size,
additional_opts,
} = opts;

let (mut open_tx, mut open_rx) =
RingBuffer::<Result<FileInfo<D::FileParams>, D::OpenError>>::new(1);

std::thread::spawn(move || {
match D::new(
opts.file,
opts.start_frame,
opts.block_size,
opts.additional_opts,
) {
match D::new(file, start_frame, block_size, additional_opts) {
Ok((decoder, file_info)) => {
let num_channels = file_info.num_channels;

Expand All @@ -63,8 +66,8 @@ impl<D: Decoder> ReadServer<D> {
block_pool: Vec::new(),
cache_pool: Vec::new(),
num_channels: usize::from(num_channels),
num_prefetch_blocks: opts.num_prefetch_blocks,
block_size: opts.block_size,
num_prefetch_blocks,
block_size,
run: true,
client_closed: false,
});
Expand Down
21 changes: 15 additions & 6 deletions core/src/write/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,26 @@ impl<E: Encoder> WriteServer<E> {
from_client_rx: Consumer<ClientToServerMsg<E>>,
close_signal_rx: Consumer<Option<HeapData<E::T>>>,
) -> Result<FileInfo<E::FileParams>, E::OpenError> {
let WriteServerOptions {
file,
num_write_blocks,
block_size,
num_channels,
sample_rate,
additional_opts,
} = opts;

let (mut open_tx, mut open_rx) =
RingBuffer::<Result<FileInfo<E::FileParams>, E::OpenError>>::new(1);

std::thread::spawn(move || {
match E::new(
opts.file,
opts.num_channels,
opts.sample_rate,
opts.block_size,
opts.num_write_blocks,
opts.additional_opts,
file,
num_channels,
sample_rate,
block_size,
num_write_blocks,
additional_opts,
) {
Ok((encoder, file_info)) => {
// Push cannot fail because only one message is ever sent.
Expand Down
25 changes: 16 additions & 9 deletions core/src/write/write_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,23 @@ impl<E: Encoder> WriteDiskStream<E> {
sample_rate: u32,
stream_opts: WriteStreamOptions<E>,
) -> Result<WriteDiskStream<E>, E::OpenError> {
let WriteStreamOptions {
additional_opts,
num_write_blocks,
block_size,
server_msg_channel_size,
} = stream_opts;

assert_ne!(num_channels, 0);
assert_ne!(sample_rate, 0);
assert_ne!(stream_opts.block_size, 0);
assert_ne!(stream_opts.num_write_blocks, 0);
assert_ne!(stream_opts.server_msg_channel_size, Some(0));
assert_ne!(block_size, 0);
assert_ne!(num_write_blocks, 0);
assert_ne!(server_msg_channel_size, Some(0));

// Reserve ample space for the message channels.
let msg_channel_size = stream_opts
.server_msg_channel_size
.unwrap_or((stream_opts.num_write_blocks * 4) + 8);
.unwrap_or((num_write_blocks * 4) + 8);

let (to_server_tx, from_client_rx) =
RingBuffer::<ClientToServerMsg<E>>::new(msg_channel_size);
Expand All @@ -70,11 +77,11 @@ impl<E: Encoder> WriteDiskStream<E> {
match WriteServer::spawn(
WriteServerOptions {
file,
num_write_blocks: stream_opts.num_write_blocks,
block_size: stream_opts.block_size,
num_write_blocks,
block_size,
num_channels,
sample_rate,
additional_opts: stream_opts.additional_opts,
additional_opts,
},
to_client_tx,
from_client_rx,
Expand All @@ -85,8 +92,8 @@ impl<E: Encoder> WriteDiskStream<E> {
to_server_tx,
from_server_rx,
close_signal_tx,
stream_opts.num_write_blocks,
stream_opts.block_size,
num_write_blocks,
block_size,
file_info,
);

Expand Down

0 comments on commit 88b7c9b

Please sign in to comment.