Skip to content

Commit

Permalink
finishes implementation of Once.
Browse files Browse the repository at this point in the history
Uses exists and setnx for easier implementation details.
  • Loading branch information
Heiss committed May 29, 2024
1 parent 9a37ed3 commit 7ca64d9
Showing 1 changed file with 99 additions and 62 deletions.
161 changes: 99 additions & 62 deletions src/redis/once.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::redis::bool_type::TBool;
use crate::redis::sync::Mutex;
use redis::Client;
use crate::redis::types::Generic;
use redis::{Client, Commands};
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::from_str;
use std::ops::Deref;

/// A struct which provides a way to execute a function only and exactly once.
pub struct Once {
data: Mutex<bool>,
conn: Option<redis::Connection>,
pub struct Once<T> {
data: Generic<T>,
already_called: OnceState,
}

Expand All @@ -26,26 +28,26 @@ pub enum OnceState {
Unknown,
}

impl Once {
impl<T> Once<T>
where
T: Serialize + DeserializeOwned,
{
/// Creates a new `Once` instance.
/// This is a slower version, because it connects to redis and store a default value, if nothing is there.
/// The name has to be unique and the same in all your services, because it is used as a key in redis.
pub fn new(name: &str, client: Client) -> Self {
let data = Mutex::new(TBool::with_value_default(false, name, client));
let data = Generic::new(name, client.clone());
Self {
data,
conn: None,
already_called: OnceState::Unknown,
}
}

/// Creates a new `Once` instance.
/// This is a faster version, because it does not connect to redis, but you have to provide a data object.
pub fn take_data(data: TBool) -> Self {
let data = Mutex::new(data);
pub fn take_data(data: Generic<T>) -> Self {
Self {
data,
conn: None,
already_called: OnceState::Unknown,
}
}
Expand All @@ -54,46 +56,14 @@ impl Once {
/// But this is a slower version, because it connects to redis and store a default value, if nothing is there.
/// If you want to check the completion state without connecting to redis, use [Once::is_completed].
/// Mostly used when you used [Once::take_data].
/// It helps a lot, when multiple calculations are okay, but the final result should be stored only once.
/// So you can check the completion state after each calculation.
///
/// ```
/// use dtypes::redis::sync::Once;
/// use dtypes::redis::sync::Mutex;
/// use dtypes::redis::types::Di32 as i32;
/// use redis::Client;
///
/// let client = Client::open("redis://localhost:6379").unwrap();
/// let mut once = Once::new("test", client.clone());
/// let mut data = Mutex::new(i32::with_value(1, "test_val", client));
/// let mut val = None;
/// assert_eq!(once.check_completion_state(), false);
///
/// // Do some calculations
/// let state = once.call_once(|| {
/// let d;
/// // calculations...
/// d = 35;
/// val = Some(d);
/// }).unwrap();
///
/// {
/// let mut d = data.lock().unwrap();
/// // Locks the data object and we check the completion state.
/// // Otherwise we do not want to store it.
/// assert_eq!(state, false);
/// d.store(val.expect("Here should be a value")).expect("Failed to store value");
/// }
/// ```
pub fn check_completion_state(&mut self) -> bool {
// Pushes a value to the cache if it is not already there.
let mut d = self.data.lock().unwrap();
d.cache.unwrap_or_else(|| {
d.try_get().unwrap_or_else(|| {
d.store(false).expect("Failed to store value");
false
})
})
let val = self.data.client.exists(&self.data.key).unwrap();
self.already_called = match val {
true => OnceState::Called,
false => OnceState::NotCalled,
};
val
}

/// Returns `true` if the function has already been called.
Expand All @@ -113,32 +83,99 @@ impl Once {
/// Calls the function only once.
/// If the function has already been called, it returns an error.
/// Otherwise it calls the function and stores the completion state.
/// The completion state from redis will be returned in the Result, it is not your state.
pub fn call_once<F>(&mut self, f: F) -> Result<bool, OnceError>
/// If there is already a value stored in redis, it pulls this value.
/// So if there is a value already in place, it needs a separate call to get the value.
/// ```
/// use dtypes::redis::sync::Once;
/// use redis::Client;
///
/// let client = Client::open("redis://localhost:6379").unwrap();
/// let mut once: Once<i32> = Once::new("test_once_simple", client.clone());
/// assert_eq!(once.check_completion_state(), false);
///
/// // Do some calculations
/// let state = once.call_once(|| {
/// let d;
/// // calculations...
/// d = 35;
/// d
/// }).unwrap();
///
/// assert_eq!(state, &35);
/// assert_eq!(once.check_completion_state(), true);
/// assert_eq!(once.cached(), Some(&35));
/// ```
pub fn call_once<F>(&mut self, f: F) -> Result<&T, OnceError>
where
F: FnOnce(),
F: FnOnce() -> T,
{
if self.is_completed() {
return Err(OnceError::AlreadyCalled);
}

f();
let prev_val = self.check_completion_state();
self.data
.lock()
.unwrap()
.store(true)
.expect("Failed to store value");
let call_val = f();

let val = serde_json::to_string(&call_val).expect("Failed to serialize value");
let res: Option<String> =
self.data
.client
.set_nx(&self.data.key, val)
.unwrap_or_else(|_| {
self.data
.get_conn()
.get(&self.data.key)
.expect("Failed to get value")
});
let v = res.unwrap();
let parsed_val = from_str(&v).unwrap();
self.data.cache = Some(parsed_val);

self.already_called = OnceState::Called;

Ok(prev_val)
Ok(self.data.acquire())
}
}

impl<T> Deref for Once<T> {
type Target = Generic<T>;

fn deref(&self) -> &Self::Target {
&self.data
}
}

#[cfg(test)]
mod tests {

#[test]
fn test_once_parallel() {
todo!()
use crate::redis::sync::Once;
use redis::Client;

let client = Client::open("redis://localhost:6379").unwrap();
let mut once: Once<i32> = Once::new("test_once_parallel", client.clone());
let mut once2: Once<i32> = Once::new("test_once_parallel", client.clone());

let state = once
.call_once(|| {
let d;
// calculations...
d = 35;
d
})
.unwrap();

let state2 = once2
.call_once(|| {
let d;
// calculations...
d = 1;
d
})
.unwrap();

assert_eq!(state, &35);
assert_eq!(state2, &35);
assert_eq!(once.cached().unwrap(), &35);
}
}

0 comments on commit 7ca64d9

Please sign in to comment.