Skip to content

Commit

Permalink
feat: Redis backend
Browse files Browse the repository at this point in the history
  • Loading branch information
rcelha committed Nov 7, 2024
1 parent 0366aae commit 8728b02
Show file tree
Hide file tree
Showing 20 changed files with 694 additions and 6 deletions.
8 changes: 8 additions & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
experimental = ["setup-scripts"]

[script.services]
command = 'bash -c "docker compose up -d --wait ; sleep 1"'

[[profile.default.scripts]]
filter = 'test(redis::) | test(pgsql::)'
setup = ['services']
10 changes: 10 additions & 0 deletions .github/workflows/rust.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,19 @@ jobs:

test:
runs-on: ubuntu-latest
services:
redis:
image: valkey/valkey:7.2.7-alpine
ports:
- 16379:6379
steps:
- uses: actions/checkout@v2
- name: Tests
run: cargo test

build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Build
run: cargo build --release --verbose
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,15 @@ There are a few things that must be done before v0.1.0:
- [x] Support ephemeral port
- [x] Remove the need for an `Option<T>` value for `managed_state` attributes (as long as it has a 'Default')
- [ ] Support 'typed' message/response on client (TODO define what this means)
- [ ] `ServiceObject::send` shouldn't need a type for the member storage
- [x] Handle panics on messages handling
- [x] Error and panic handling on life cycle hooks (probably kill the object)
- [x] Create a test or example to show re-allocation when servers dies
- [x] Sqlite support for sql backends
- [x] PostgreSQL support for sql backends
- [!] Redis support for members storage
- [!] Redis support for state backend (loader and saver)
- [!] Redis support for object placement

### Version 0.2.0

Expand Down
4 changes: 4 additions & 0 deletions README.tpl.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,15 @@ There are a few things that must be done before v0.1.0:
- [x] Support ephemeral port
- [x] Remove the need for an `Option<T>` value for `managed_state` attributes (as long as it has a 'Default')
- [ ] Support 'typed' message/response on client (TODO define what this means)
- [ ] `ServiceObject::send` shouldn't need a type for the member storage
- [x] Handle panics on messages handling
- [x] Error and panic handling on life cycle hooks (probably kill the object)
- [x] Create a test or example to show re-allocation when servers dies
- [x] Sqlite support for sql backends
- [x] PostgreSQL support for sql backends
- [!] Redis support for members storage
- [!] Redis support for state backend (loader and saver)
- [!] Redis support for object placement

### Version 0.2.0

Expand Down
14 changes: 14 additions & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
services:
redis:
image: valkey/valkey:7.2.7-alpine
ports:
- 16379:6379
pgsql:
image: postgres:16.4
restart: always
environment:
POSTGRES_PASSWORD: test
POSTGRES_USER: test
POSTGRES_DB: test
ports:
- 15432:5432
3 changes: 2 additions & 1 deletion rio-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ sqlx = { version = "0.6", features = [
async-stream = "0.3.5"
derive_builder = "0.20.0"
sync_wrapper = "1.0.1"
papaya = "0.1.4"
env_logger = "0.11.5"
log = { version = "0.4.22", features = ["kv"] }
redis = "0.27.5"
bb8-redis = "0.17.0"

[dev-dependencies]
lazy_static = "1.4.0"
1 change: 1 addition & 0 deletions rio-rs/src/cluster/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use chrono::{DateTime, TimeZone, Utc};
use crate::errors::MembershipError;

pub mod local;
pub mod redis;
pub mod sql;

/// Represents a running [Server](crate::server::Server).
Expand Down
118 changes: 118 additions & 0 deletions rio-rs/src/cluster/storage/redis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use async_trait::async_trait;
use bb8_redis::{bb8::Pool, RedisConnectionManager};
use chrono::{DateTime, Utc};
use redis::AsyncCommands;

use super::{Member, MembersStorage, MembershipResult, MembershipUnitResult};

#[derive(Clone)]
pub struct RedisMembersStorage {
pool: Pool<RedisConnectionManager>,
key_prefix: String,
}

impl RedisMembersStorage {
pub async fn from_connect_string(connection_string: &str, key_prefix: Option<String>) -> Self {
let conn_manager = RedisConnectionManager::new(connection_string).expect("TODO");
let pool = Pool::builder().build(conn_manager).await.expect("TODO");
let key_prefix = key_prefix.unwrap_or_default();
RedisMembersStorage { pool, key_prefix }
}
}

fn member_to_string(member: &Member) -> String {
let member = format!(
"{};{};{};{}",
member.ip,
member.port,
member.active(),
member.last_seen().to_rfc3339()
);
member
}

fn parse_member(member: &str) -> Member {
let mut split_member = member.split(";");
let ip = split_member.next().expect("TODO").to_string();
let port = split_member.next().expect("TODO").to_string();
let mut parsed_member = Member::new(ip, port);
parsed_member.active = split_member
.next()
.expect("TODO next")
.parse()
.expect("TODO parse");
let last_seen = split_member.next().expect("TODO");
parsed_member.last_seen = DateTime::parse_from_rfc3339(&last_seen)
.expect("TODO")
.to_utc();
parsed_member
}

#[async_trait]
impl MembersStorage for RedisMembersStorage {
async fn push(&self, member: Member) -> MembershipUnitResult {
let mut client = self.pool.get().await.expect("TODO");
let member_id = format!("{}:{}", member.ip, member.port,);
let member_val = member_to_string(&member);

let key = format!("{}members", self.key_prefix);
let _: () = client
.hset(&key, member_id, member_val)
.await
.expect("TODO");
Ok(())
}

async fn remove(&self, ip: &str, port: &str) -> MembershipUnitResult {
let mut client = self.pool.get().await.expect("TODO");
let member_id = format!("{};{}", ip, port);
let key = format!("{}members", self.key_prefix);
let _: () = client.hdel(&key, member_id).await.expect("TODO");
Ok(())
}

async fn set_is_active(&self, ip: &str, port: &str, is_active: bool) -> MembershipUnitResult {
let mut client = self.pool.get().await.expect("TODO");
let member_id = format!("{};{}", ip, port);
let key = format!("{}members", self.key_prefix);
let raw_member: Option<String> = client.hget(&key, &member_id).await.expect("TODO");
let mut member = raw_member
.map(|x| parse_member(&x))
.unwrap_or_else(|| Member::new(ip.to_string(), port.to_string()));
member.active = is_active;
self.push(member).await?;
Ok(())
}

async fn members(&self) -> MembershipResult<Vec<Member>> {
let mut client = self.pool.get().await.expect("TODO");
let key = format!("{}members", self.key_prefix);
let members_raw: Vec<(String, String)> = client.hgetall(&key).await.expect("TODO");
let members: Vec<Member> = members_raw.iter().map(|(_, x)| parse_member(x)).collect();
Ok(members)
}

async fn notify_failure(&self, ip: &str, port: &str) -> MembershipUnitResult {
let mut client = self.pool.get().await.expect("TODO");
let k = format!("{}member_failures;{};{}", self.key_prefix, ip, port);
let now = chrono::Local::now().to_utc();
let ts = now.timestamp();
let _: () = client.rpush(&k, ts).await.expect("TODO");
let _: () = client.ltrim(&k, 0, 99).await.expect("TODO");
Ok(())
}

async fn member_failures(&self, ip: &str, port: &str) -> MembershipResult<Vec<DateTime<Utc>>> {
let mut client = self.pool.get().await.expect("TODO");
let k = format!("{}member_failures;{};{}", self.key_prefix, ip, port);
let values: Vec<String> = client.lrange(&k, 0, -1).await.expect("TODO");
let parsed_values = values
.iter()
.map(|x| {
let ts: i64 = x.parse().expect("TODO");
DateTime::from_timestamp(ts, 0).expect("TODO").to_utc()
})
.collect();
Ok(parsed_values)
}
}
2 changes: 1 addition & 1 deletion rio-rs/src/cluster/storage/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl MembersStorage for SqlMembersStorage {
sqlx::query("UPDATE cluster_provider_members SET active = $3 WHERE ip = $1 and port = $2")
.bind(ip)
.bind(port)
.bind(is_active as i32)
.bind(is_active)
.execute(&self.pool)
.err_into()
.await
Expand Down
8 changes: 7 additions & 1 deletion rio-rs/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,17 @@ impl From<MembershipError> for ClusterProviderServeError {
}

/// Error type for service object state management
#[derive(Error, Debug)]
#[derive(Error, Debug, PartialEq)]
pub enum LoadStateError {
#[error("object not found")]
ObjectNotFound,

#[error("unknown error")]
Unknown,

#[error("deserialization error")]
DeserializationError,

#[error("serialization error")]
SerializationError,
}
1 change: 1 addition & 0 deletions rio-rs/src/object_placement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use async_trait::async_trait;
use crate::ObjectId;

pub mod local;
pub mod redis;
pub mod sql;

/// Struct providing placement information
Expand Down
76 changes: 76 additions & 0 deletions rio-rs/src/object_placement/redis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
//! SQL implementation of the trait [ObjectPlacementProvider] to work with relational databases
//!
//! This uses [sqlx] under the hood

use std::collections::HashSet;

use async_trait::async_trait;
use bb8_redis::{bb8::Pool, RedisConnectionManager};
use redis::AsyncCommands;

use super::{ObjectPlacement, ObjectPlacementProvider};
use crate::ObjectId;

#[derive(Clone)]
pub struct RedisObjectPlacementProvider {
pool: Pool<RedisConnectionManager>,
key_prefix: String,
}

impl RedisObjectPlacementProvider {
pub async fn from_connect_string(connection_string: &str, key_prefix: Option<String>) -> Self {
let conn_manager = RedisConnectionManager::new(connection_string).expect("TODO");
let pool = Pool::builder().build(conn_manager).await.expect("TODO");
let key_prefix = key_prefix.unwrap_or_default();
Self { pool, key_prefix }
}
}

#[async_trait]
impl ObjectPlacementProvider for RedisObjectPlacementProvider {
async fn update(&self, object_placement: ObjectPlacement) {
let object_id = format!(
"{}:{}",
object_placement.object_id.0, object_placement.object_id.1
);
let k1 = format!("{}{}", self.key_prefix, object_id);
let mut client = self.pool.get().await.expect("TODO");

if let Some(server_address) = object_placement.server_address {
let k2 = format!("{}{}", self.key_prefix, server_address);
let mut pipe = redis::pipe();
pipe.set(&k1, &server_address).sadd(&k2, &object_id);
let _: () = pipe.exec_async(&mut *client).await.expect("TODO");
} else {
// If there is no server associated with the allocation
// it means we can remove the placement associated with the object
let _: () = client.del(&k1).await.expect("TODO: delete");
}
}

async fn lookup(&self, object_id: &ObjectId) -> Option<String> {
let k = format!("{}{}:{}", self.key_prefix, object_id.0, object_id.1);
let mut client = self.pool.get().await.expect("TODO");
let placement: Option<String> = client.get(&k).await.expect("TODO");
placement
}

async fn clean_server(&self, address: String) {
let k = format!("{}{}", self.key_prefix, address);
let mut client = self.pool.get().await.expect("TODO");
let objects_in_server: HashSet<String> =
client.smembers(&k).await.expect("TODO: List objects");
for object_id in objects_in_server.iter() {
let k = format!("{}{}", self.key_prefix, object_id);
let _: () = client.del(&k).await.expect("TODO: clean object placement");
}
let _: () = client.del(&k).await.expect("TODO: delete server alloc");
}

async fn remove(&self, object_id: &ObjectId) {
let object_id = format!("{}:{}", object_id.0, object_id.1);
let k = format!("{}{}", self.key_prefix, object_id);
let mut client = self.pool.get().await.expect("TODO");
let _: () = client.del(&k).await.expect("TODO: clean object placement");
}
}
2 changes: 1 addition & 1 deletion rio-rs/src/state/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ where
if let Some(x) = self.data.get(&k) {
Ok(serde_json::from_str(&x).expect("TODO"))
} else {
Err(LoadStateError::Unknown)
Err(LoadStateError::ObjectNotFound)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions rio-rs/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use serde::de::DeserializeOwned;
use serde::Serialize;

pub mod local;
pub mod redis;
pub mod sql;

/// The `StateLoader` defines an interface to load serialized state from a source
Expand Down
Loading

0 comments on commit 8728b02

Please sign in to comment.