This repository has been archived by the owner on Jun 21, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(flags): Basic flags service (#31)
- Loading branch information
1 parent
ae707cb
commit 871441b
Showing
14 changed files
with
594 additions
and
13 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
[package] | ||
name = "feature-flags" | ||
version = "0.1.0" | ||
edition = "2021" | ||
|
||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||
|
||
[dependencies] | ||
anyhow = { workspace = true } | ||
async-trait = { workspace = true } | ||
axum = { workspace = true } | ||
axum-client-ip = { workspace = true } | ||
envconfig = { workspace = true } | ||
tokio = { workspace = true } | ||
tracing = { workspace = true } | ||
tracing-subscriber = { workspace = true, features = ["env-filter"] } | ||
bytes = { workspace = true } | ||
rand = { workspace = true } | ||
redis = { version = "0.23.3", features = [ | ||
"tokio-comp", | ||
"cluster", | ||
"cluster-async", | ||
] } | ||
serde = { workspace = true } | ||
serde_json = { workspace = true } | ||
thiserror = { workspace = true } | ||
|
||
[lints] | ||
workspace = true | ||
|
||
[dev-dependencies] | ||
assert-json-diff = { workspace = true } | ||
once_cell = "1.18.0" | ||
reqwest = { workspace = true } | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
use std::collections::HashMap; | ||
|
||
use axum::http::StatusCode; | ||
use axum::response::{IntoResponse, Response}; | ||
use serde::{Deserialize, Serialize}; | ||
use thiserror::Error; | ||
|
||
#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)] | ||
pub enum FlagsResponseCode { | ||
Ok = 1, | ||
} | ||
|
||
#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)] | ||
#[serde(rename_all = "camelCase")] | ||
pub struct FlagsResponse { | ||
pub error_while_computing_flags: bool, | ||
// TODO: better typing here, support bool responses | ||
pub feature_flags: HashMap<String, String>, | ||
} | ||
|
||
#[derive(Error, Debug)] | ||
pub enum FlagError { | ||
#[error("failed to decode request: {0}")] | ||
RequestDecodingError(String), | ||
#[error("failed to parse request: {0}")] | ||
RequestParsingError(#[from] serde_json::Error), | ||
|
||
#[error("Empty distinct_id in request")] | ||
EmptyDistinctId, | ||
#[error("No distinct_id in request")] | ||
MissingDistinctId, | ||
|
||
#[error("No api_key in request")] | ||
NoTokenError, | ||
#[error("API key is not valid")] | ||
TokenValidationError, | ||
|
||
#[error("rate limited")] | ||
RateLimited, | ||
} | ||
|
||
impl IntoResponse for FlagError { | ||
fn into_response(self) -> Response { | ||
match self { | ||
FlagError::RequestDecodingError(_) | ||
| FlagError::RequestParsingError(_) | ||
| FlagError::EmptyDistinctId | ||
| FlagError::MissingDistinctId => (StatusCode::BAD_REQUEST, self.to_string()), | ||
|
||
FlagError::NoTokenError | FlagError::TokenValidationError => { | ||
(StatusCode::UNAUTHORIZED, self.to_string()) | ||
} | ||
|
||
FlagError::RateLimited => (StatusCode::TOO_MANY_REQUESTS, self.to_string()), | ||
} | ||
.into_response() | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
use std::net::SocketAddr; | ||
|
||
use envconfig::Envconfig; | ||
|
||
#[derive(Envconfig, Clone)] | ||
pub struct Config { | ||
#[envconfig(default = "127.0.0.1:0")] | ||
pub address: SocketAddr, | ||
|
||
#[envconfig(default = "postgres://posthog:posthog@localhost:15432/test_database")] | ||
pub write_database_url: String, | ||
|
||
#[envconfig(default = "postgres://posthog:posthog@localhost:15432/test_database")] | ||
pub read_database_url: String, | ||
|
||
#[envconfig(default = "1024")] | ||
pub max_concurrent_jobs: usize, | ||
|
||
#[envconfig(default = "100")] | ||
pub max_pg_connections: u32, | ||
|
||
#[envconfig(default = "redis://localhost:6379/")] | ||
pub redis_url: String, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
pub mod api; | ||
pub mod config; | ||
pub mod redis; | ||
pub mod router; | ||
pub mod server; | ||
pub mod v0_endpoint; | ||
pub mod v0_request; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
use envconfig::Envconfig; | ||
use tokio::signal; | ||
use tracing_subscriber::layer::SubscriberExt; | ||
use tracing_subscriber::util::SubscriberInitExt; | ||
use tracing_subscriber::{EnvFilter, Layer}; | ||
|
||
use feature_flags::config::Config; | ||
use feature_flags::server::serve; | ||
|
||
async fn shutdown() { | ||
let mut term = signal::unix::signal(signal::unix::SignalKind::terminate()) | ||
.expect("failed to register SIGTERM handler"); | ||
|
||
let mut interrupt = signal::unix::signal(signal::unix::SignalKind::interrupt()) | ||
.expect("failed to register SIGINT handler"); | ||
|
||
tokio::select! { | ||
_ = term.recv() => {}, | ||
_ = interrupt.recv() => {}, | ||
}; | ||
|
||
tracing::info!("Shutting down gracefully..."); | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() { | ||
let config = Config::init_from_env().expect("Invalid configuration:"); | ||
|
||
// Basic logging for now: | ||
// - stdout with a level configured by the RUST_LOG envvar (default=ERROR) | ||
let log_layer = tracing_subscriber::fmt::layer().with_filter(EnvFilter::from_default_env()); | ||
tracing_subscriber::registry().with(log_layer).init(); | ||
|
||
// Open the TCP port and start the server | ||
let listener = tokio::net::TcpListener::bind(config.address) | ||
.await | ||
.expect("could not bind port"); | ||
serve(config, listener, shutdown()).await | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
use std::time::Duration; | ||
|
||
use anyhow::Result; | ||
use async_trait::async_trait; | ||
use redis::AsyncCommands; | ||
use tokio::time::timeout; | ||
|
||
// average for all commands is <10ms, check grafana | ||
const REDIS_TIMEOUT_MILLISECS: u64 = 10; | ||
|
||
/// A simple redis wrapper | ||
/// Copied from capture/src/redis.rs. | ||
/// TODO: Modify this to support hincrby, get, and set commands. | ||
|
||
#[async_trait] | ||
pub trait Client { | ||
// A very simplified wrapper, but works for our usage | ||
async fn zrangebyscore(&self, k: String, min: String, max: String) -> Result<Vec<String>>; | ||
} | ||
|
||
pub struct RedisClient { | ||
client: redis::Client, | ||
} | ||
|
||
impl RedisClient { | ||
pub fn new(addr: String) -> Result<RedisClient> { | ||
let client = redis::Client::open(addr)?; | ||
|
||
Ok(RedisClient { client }) | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl Client for RedisClient { | ||
async fn zrangebyscore(&self, k: String, min: String, max: String) -> Result<Vec<String>> { | ||
let mut conn = self.client.get_async_connection().await?; | ||
|
||
let results = conn.zrangebyscore(k, min, max); | ||
let fut = timeout(Duration::from_secs(REDIS_TIMEOUT_MILLISECS), results).await?; | ||
|
||
Ok(fut?) | ||
} | ||
} | ||
|
||
// TODO: Find if there's a better way around this. | ||
#[derive(Clone)] | ||
pub struct MockRedisClient { | ||
zrangebyscore_ret: Vec<String>, | ||
} | ||
|
||
impl MockRedisClient { | ||
pub fn new() -> MockRedisClient { | ||
MockRedisClient { | ||
zrangebyscore_ret: Vec::new(), | ||
} | ||
} | ||
|
||
pub fn zrangebyscore_ret(&mut self, ret: Vec<String>) -> Self { | ||
self.zrangebyscore_ret = ret; | ||
|
||
self.clone() | ||
} | ||
} | ||
|
||
impl Default for MockRedisClient { | ||
fn default() -> Self { | ||
Self::new() | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl Client for MockRedisClient { | ||
// A very simplified wrapper, but works for our usage | ||
async fn zrangebyscore(&self, _k: String, _min: String, _max: String) -> Result<Vec<String>> { | ||
Ok(self.zrangebyscore_ret.clone()) | ||
} | ||
} |
Oops, something went wrong.