From 0ad546edadcf0e41eec8885d858c38bc96cb0cae Mon Sep 17 00:00:00 2001 From: Tim Dikland Date: Mon, 29 Jan 2024 22:13:26 +0100 Subject: [PATCH 1/6] add ShareStore trait --- src/server.rs | 1 + src/server/catalog/file/example-shares.yml | 38 ++++ src/server/catalog/file/mod.rs | 215 +++++++++++++++++++++ src/server/catalog/mod.rs | 63 ++++++ src/server/catalog/postgres/mod.rs | 124 ++++++++++++ src/server/routers.rs | 26 +++ src/server/routers/shares.rs | 53 ++--- 7 files changed, 478 insertions(+), 42 deletions(-) create mode 100644 src/server/catalog/file/example-shares.yml create mode 100644 src/server/catalog/file/mod.rs create mode 100644 src/server/catalog/mod.rs create mode 100644 src/server/catalog/postgres/mod.rs diff --git a/src/server.rs b/src/server.rs index d29f829..8868f7f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,4 +1,5 @@ mod api_doc; +mod catalog; mod entities; mod middlewares; mod repositories; diff --git a/src/server/catalog/file/example-shares.yml b/src/server/catalog/file/example-shares.yml new file mode 100644 index 0000000..71cbfdf --- /dev/null +++ b/src/server/catalog/file/example-shares.yml @@ -0,0 +1,38 @@ +shares: + - name: "share1" + schemas: + - name: "schema1" + tables: + - name: "table1" + # S3. See https://github.com/delta-io/delta-sharing#s3 for how to config the credentials + location: "s3a:///" + id: "00000000-0000-0000-0000-000000000000" + - name: "table2" + # Azure Blob Storage. See https://github.com/delta-io/delta-sharing#azure-blob-storage for how to config the credentials + location: "wasbs://@" + id: "00000000-0000-0000-0000-000000000001" + - name: "share2" + schemas: + - name: "schema2" + tables: + - name: "table3" + # Azure Data Lake Storage Gen2. See https://github.com/delta-io/delta-sharing#azure-data-lake-storage-gen2 for how to config the credentials + location: "abfss://@" + historyShared: true + id: "00000000-0000-0000-0000-000000000002" + - name: "share3" + schemas: + - name: "schema3" + tables: + - name: "table4" + # Google Cloud Storage (GCS). See https://github.com/delta-io/delta-sharing#google-cloud-storage for how to config the credentials + location: "gs:///" + id: "00000000-0000-0000-0000-000000000003" + - name: "share4" + schemas: + - name: "schema4" + tables: + - name: "table5" + # Cloudflare R2. See https://github.com/delta-io/delta-sharing#cloudflare-r2 for how to config the credentials + location: "s3a:///" + id: "00000000-0000-0000-0000-000000000004" diff --git a/src/server/catalog/file/mod.rs b/src/server/catalog/file/mod.rs new file mode 100644 index 0000000..d98bf6f --- /dev/null +++ b/src/server/catalog/file/mod.rs @@ -0,0 +1,215 @@ +use serde::Deserialize; +use std::path::Path; + +use crate::server::services::{error::Error, schema::Schema, share::Share, table::Table}; + +use super::{Page, Pagination, ShareStore}; + +#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize)] +struct ShareFile { + shares: Vec, +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize)] +struct RawShare { + name: String, + schemas: Vec, +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize)] +struct RawSchema { + name: String, + tables: Vec, +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize)] +struct RawTable { + name: String, + location: String, + id: String, +} + +#[derive(Debug)] +pub struct YamlShareStore { + store: ShareFile, +} + +impl YamlShareStore { + pub fn from_file>(path: P) -> Result> { + let file = std::fs::File::open(path)?; + let shares: ShareFile = serde_yaml::from_reader(file)?; + let yss = YamlShareStore { store: shares }; + Ok(yss) + } +} + +#[async_trait::async_trait] +impl ShareStore for YamlShareStore { + async fn list_shares(&self, _pagination: &Pagination) -> Result, Error> { + let shares = self + .store + .shares + .iter() + .map(|s| Share { + id: s.name.clone(), + name: s.name.clone(), + }) + .collect::>(); + + Ok(Page { + items: shares, + next_page_token: None, + }) + } + + async fn get_share(&self, name: &str) -> Result, Error> { + let share = self + .store + .shares + .iter() + .find(|s| s.name == name) + .map(|s| Share { + id: s.name.clone(), + name: s.name.clone(), + }); + + Ok(share) + } + + async fn list_schemas( + &self, + share: &str, + _pagination: &Pagination, + ) -> Result, Error> { + let share = self + .store + .shares + .iter() + .find(|s| s.name == share) + .ok_or_else(|| Error::NotFound)?; + + let schemas = share + .schemas + .iter() + .map(|s| Schema { + id: s.name.clone(), + name: s.name.clone(), + }) + .collect::>(); + + Ok(Page { + items: schemas, + next_page_token: None, + }) + } + + async fn list_tables_in_share( + &self, + share: &str, + _pagination: &Pagination, + ) -> Result, Error> { + let share = self + .store + .shares + .iter() + .find(|s| s.name == share) + .ok_or_else(|| Error::NotFound)?; + + let tables = share + .schemas + .iter() + .flat_map(|s| { + s.tables + .iter() + .map(|t| Table { + id: t.id.clone(), + name: t.name.clone(), + location: t.location.clone(), + }) + .collect::>() + }) + .collect::>(); + + Ok(Page { + items: tables, + next_page_token: None, + }) + } + + async fn list_tables_in_schema( + &self, + share: &str, + schema: &str, + _pagination: &Pagination, + ) -> Result, Error> { + let share = self + .store + .shares + .iter() + .find(|s| s.name == share) + .ok_or_else(|| Error::NotFound)?; + + let schema = share + .schemas + .iter() + .find(|s| s.name == schema) + .ok_or_else(|| Error::NotFound)?; + + let tables = schema + .tables + .iter() + .map(|t| Table { + id: t.id.clone(), + name: t.name.clone(), + location: t.location.clone(), + }) + .collect::>(); + + Ok(Page { + items: tables, + next_page_token: None, + }) + } + + async fn get_table(&self, share: &str, schema: &str, table: &str) -> Result { + let share = self + .store + .shares + .iter() + .find(|s| s.name == share) + .ok_or_else(|| Error::NotFound)?; + + let schema = share + .schemas + .iter() + .find(|s| s.name == schema) + .ok_or_else(|| Error::NotFound)?; + + let table = schema + .tables + .iter() + .find(|t| t.name == table) + .ok_or_else(|| Error::NotFound)?; + + Ok(Table { + id: table.id.clone(), + name: table.name.clone(), + location: table.location.clone(), + }) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + async fn test_list_shares() { + let yss = + YamlShareStore::from_file("./src/server/catalog/file/example-shares.yml").unwrap(); + let shares = yss.list_shares(&Pagination::default()).await.unwrap(); + + assert_eq!(shares.items.len(), 4); + assert_eq!(shares.next_page_token, None); + } +} diff --git a/src/server/catalog/mod.rs b/src/server/catalog/mod.rs new file mode 100644 index 0000000..9948e39 --- /dev/null +++ b/src/server/catalog/mod.rs @@ -0,0 +1,63 @@ +use super::services::{error::Error, schema::Schema, share::Share, table::Table}; + +mod file; +mod postgres; + +#[async_trait::async_trait] +pub trait ShareStore: Send + Sync { + async fn list_shares(&self, pagination: &Pagination) -> Result, Error>; + + async fn get_share(&self, name: &str) -> Result, Error>; + + async fn list_schemas( + &self, + share: &str, + pagination: &Pagination, + ) -> Result, Error>; + + async fn list_tables_in_share( + &self, + share: &str, + pagination: &Pagination, + ) -> Result, Error>; + + async fn list_tables_in_schema( + &self, + share: &str, + schema: &str, + pagination: &Pagination, + ) -> Result, Error>; + + async fn get_table(&self, share: &str, schema: &str, table: &str) -> Result; +} + +#[derive(Debug, Clone, Default)] +pub struct Pagination { + max_results: Option, + page_token: Option, +} + +impl Pagination { + pub fn max_results(&self) -> Option { + self.max_results + } + + pub fn page_token(&self) -> Option<&str> { + self.page_token.as_deref() + } +} + +pub struct Page { + items: Vec, + next_page_token: Option, +} + +impl Page { + pub fn items(&self) -> &[T] { + &self.items + } + + pub fn next_page_token(&self) -> Option<&str> { + self.next_page_token.as_deref() + } +} diff --git a/src/server/catalog/postgres/mod.rs b/src/server/catalog/postgres/mod.rs new file mode 100644 index 0000000..8cece49 --- /dev/null +++ b/src/server/catalog/postgres/mod.rs @@ -0,0 +1,124 @@ +use anyhow::{Context, Result}; +use sqlx::{Execute, PgPool, Postgres, QueryBuilder}; + +use crate::server::services::{error::Error, schema::Schema, share::Share, table::Table}; + +use super::{Page, Pagination, ShareStore}; + +pub struct PostgresShareStore { + pool: PgPool, +} + +impl PostgresShareStore { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } + + pub async fn query(&self, limit: Option, after: Option<&str>) -> Result> { + let mut conn = self + .pool + .acquire() + .await + .context("failed to acquire postgres connection")?; + let mut builder: QueryBuilder<'_, Postgres> = QueryBuilder::new( + "SELECT + id::text, + name + FROM share", + ); + if let Some(name) = after { + builder.push(" WHERE name >= "); + builder.push_bind(name); + } + builder.push(" ORDER BY name "); + if let Some(limit) = limit { + builder.push(" LIMIT "); + builder.push_bind(limit as i32); + } + let mut query = sqlx::query_as::<_, Share>(builder.build().sql()); + if let Some(name) = after { + query = query.bind(name); + } + if let Some(limit) = limit { + query = query.bind(limit as i32); + } + let rows: Vec = query + .fetch_all(&mut *conn) + .await + .context("failed to list shares from [share]")?; + Ok(rows) + } + + pub async fn query_by_name(&self, name: &str) -> Result> { + let mut conn = self + .pool + .acquire() + .await + .context("failed to acquire postgres connection")?; + let row: Option = sqlx::query_as::<_, Share>( + "SELECT + id::text, + name + FROM share + WHERE name = $1", + ) + .bind(name) + .fetch_optional(&mut *conn) + .await + .context(format!(r#"failed to select "{}" from [share]"#, name))?; + Ok(row) + } +} + +#[async_trait::async_trait] +impl ShareStore for PostgresShareStore { + async fn list_shares(&self, pagination: &Pagination) -> Result, Error> { + let limit = pagination.max_results().unwrap_or(500); + let after = pagination.page_token(); + + let shares = self.query(Some(limit), after).await?; + let next = if shares.len() < limit as usize { + None + } else { + shares.last().map(|s| s.name.clone()) + }; + + Ok(Page { + items: shares, + next_page_token: next, + }) + } + + async fn get_share(&self, name: &str) -> Result, Error> { + unimplemented!() + } + + async fn list_schemas( + &self, + share: &str, + pagination: &Pagination, + ) -> Result, Error> { + unimplemented!() + } + + async fn list_tables_in_share( + &self, + share: &str, + pagination: &Pagination, + ) -> Result, Error> { + unimplemented!() + } + + async fn list_tables_in_schema( + &self, + share: &str, + schema: &str, + pagination: &Pagination, + ) -> Result, Error> { + unimplemented!() + } + + async fn get_table(&self, share: &str, schema: &str, table: &str) -> Result { + unimplemented!() + } +} diff --git a/src/server/routers.rs b/src/server/routers.rs index f5e6d05..e9eda77 100644 --- a/src/server/routers.rs +++ b/src/server/routers.rs @@ -23,8 +23,33 @@ use crate::server::api_doc::ApiDoc; use crate::server::middlewares::jwt; use crate::server::services::error::Error; +use super::services::share::Share; + +pub struct Pagination { + max_results: Option, + page_token: Option, +} + +pub struct Page { + items: Vec, + next_page_token: Option, +} + +pub trait ShareStore: Send + Sync { + fn list(&self, pagination: &Pagination) -> Result>; +} + +pub struct DefaultShareStore; + +impl ShareStore for DefaultShareStore { + fn list(&self, pagination: &Pagination) -> Result> { + unimplemented!() + } +} + pub struct State { pub pg_pool: PgPool, + pub state_store: Arc, pub gcp_service_account: Option, pub aws_credentials: Option, pub azure_credentials: Option, @@ -44,6 +69,7 @@ async fn route( ) -> Result { let state = Arc::new(State { pg_pool, + state_store: Arc::new(DefaultShareStore), gcp_service_account, aws_credentials, azure_credentials, diff --git a/src/server/routers/shares.rs b/src/server/routers/shares.rs index 6a11a67..b3e0bc9 100644 --- a/src/server/routers/shares.rs +++ b/src/server/routers/shares.rs @@ -10,6 +10,7 @@ use utoipa::IntoParams; use utoipa::ToSchema; use crate::server::entities::share::Name as ShareName; +use crate::server::routers::Pagination; use crate::server::routers::SharedState; use crate::server::services::error::Error; use crate::server::services::share::Service as ShareService; @@ -73,7 +74,7 @@ pub async fn get( #[derive(Debug, serde::Deserialize, IntoParams)] #[serde(rename_all = "camelCase")] pub struct SharesListQuery { - pub max_results: Option, + pub max_results: Option, pub page_token: Option, } @@ -104,48 +105,16 @@ pub async fn list( Extension(state): Extension, Query(query): Query, ) -> Result { - let limit = if let Some(limit) = &query.max_results { - let Ok(limit) = usize::try_from(*limit) else { - tracing::error!("requested limit is malformed"); - return Err(Error::ValidationFailed); - }; - limit - } else { - DEFAULT_PAGE_RESULTS + let pagination = Pagination { + max_results: query.max_results, + page_token: query.page_token, }; - let after = if let Some(name) = &query.page_token { - ShareName::new(name).ok() - } else { - None - }; - let Ok(shares) = - ShareService::query(Some(&((limit + 1) as i64)), after.as_ref(), &state.pg_pool).await - else { - tracing::error!( - "request is not handled correctly due to a server error while selecting shares" - ); - return Err(anyhow!("error occured while selecting share(s)").into()); + let shares = state.state_store.list(&pagination)?; + + let res = SharesListResponse { + items: shares.items, + next_page_token: shares.next_page_token, }; - if shares.len() == limit + 1 { - let next = &shares[limit]; - let shares = &shares[..limit]; - tracing::info!("shares were successfully returned"); - return Ok(( - StatusCode::OK, - Json(SharesListResponse { - items: shares.to_vec(), - next_page_token: next.name.clone().into(), - }), - ) - .into_response()); - } tracing::info!("shares were successfully returned"); - Ok(( - StatusCode::OK, - Json(SharesListResponse { - items: shares, - next_page_token: None, - }), - ) - .into_response()) + Ok((StatusCode::OK, Json(res)).into_response()) } From b8a5a662430f139df8b0a867865d75217e407344 Mon Sep 17 00:00:00 2001 From: Tim Dikland Date: Tue, 30 Jan 2024 16:01:22 +0100 Subject: [PATCH 2/6] updates handlers --- src/server/catalog/mod.rs | 7 + src/server/catalog/postgres/mod.rs | 294 +++++++++++++++++++++++- src/server/routers.rs | 51 ++-- src/server/routers/shares.rs | 24 +- src/server/routers/shares/all_tables.rs | 83 ++----- src/server/routers/shares/schemas.rs | 82 ++----- 6 files changed, 382 insertions(+), 159 deletions(-) diff --git a/src/server/catalog/mod.rs b/src/server/catalog/mod.rs index 9948e39..eb982bb 100644 --- a/src/server/catalog/mod.rs +++ b/src/server/catalog/mod.rs @@ -38,6 +38,13 @@ pub struct Pagination { } impl Pagination { + pub fn new(max_results: Option, page_token: Option) -> Self { + Self { + max_results, + page_token, + } + } + pub fn max_results(&self) -> Option { self.max_results } diff --git a/src/server/catalog/postgres/mod.rs b/src/server/catalog/postgres/mod.rs index 8cece49..5a873c8 100644 --- a/src/server/catalog/postgres/mod.rs +++ b/src/server/catalog/postgres/mod.rs @@ -1,7 +1,12 @@ use anyhow::{Context, Result}; use sqlx::{Execute, PgPool, Postgres, QueryBuilder}; -use crate::server::services::{error::Error, schema::Schema, share::Share, table::Table}; +use crate::server::services::{ + error::Error, + schema::{Schema, SchemaDetail}, + share::Share, + table::{Table, TableDetail}, +}; use super::{Page, Pagination, ShareStore}; @@ -68,6 +73,213 @@ impl PostgresShareStore { .context(format!(r#"failed to select "{}" from [share]"#, name))?; Ok(row) } + + pub async fn query_by_share_name( + &self, + share_name: &str, + limit: Option, + after: Option<&str>, + ) -> Result> { + let mut conn = self + .pool + .acquire() + .await + .context("failed to acquire postgres connection")?; + let mut builder: QueryBuilder<'_, Postgres> = QueryBuilder::new( + r#"WITH these_schemas AS ( + SELECT + DISTINCT "schema".name AS name, share.name AS share + FROM "schema" + LEFT JOIN share ON share.id = "schema".share_id + WHERE share.name = "#, + ); + builder.push_bind(share_name); + builder.push( + " + ) + SELECT + name, + share + FROM these_schemas", + ); + if let Some(name) = after { + builder.push(" WHERE name >= "); + builder.push_bind(name); + } + builder.push(" ORDER BY name "); + if let Some(limit) = limit { + builder.push(" LIMIT "); + builder.push_bind(limit as i64); + } + let mut query = sqlx::query_as::<_, SchemaDetail>(builder.build().sql()); + query = query.bind(share_name); + if let Some(name) = after { + query = query.bind(name); + } + if let Some(limit) = limit { + query = query.bind(limit as i64); + } + let rows: Vec = query + .fetch_all(&mut *conn) + .await + .context("failed to list schemas from [schema]")?; + Ok(rows) + } + + pub async fn query_table_by_share_name( + &self, + share_name: &str, + limit: Option, + after: Option<&str>, + ) -> Result> { + let mut conn = self + .pool + .acquire() + .await + .context("failed to acquire postgres connection")?; + let mut builder: QueryBuilder<'_, Postgres> = QueryBuilder::new( + r#"WITH these_tables AS ( + SELECT + "table".id AS id, + share.id AS share_id, + "table".name AS name, + "schema".name AS schema, + share.name AS share + FROM "table" + LEFT JOIN "schema" ON "schema".id = "table".schema_id + LEFT JOIN share ON share.id = "schema".share_id + WHERE share.name = "#, + ); + builder.push_bind(share_name); + builder.push( + " + ) + SELECT + name, + schema, + share + FROM these_tables", + ); + if let Some(name) = after { + builder.push(" WHERE name >= "); + builder.push_bind(name); + } + builder.push(" ORDER BY name "); + if let Some(limit) = limit { + builder.push(" LIMIT "); + builder.push_bind(limit as i64); + } + let mut query = sqlx::query_as::<_, TableDetail>(builder.build().sql()); + query = query.bind(share_name); + if let Some(name) = after { + query = query.bind(name); + } + if let Some(limit) = limit { + query = query.bind(limit as i64); + } + let rows: Vec = query + .fetch_all(&mut *conn) + .await + .context("failed to list tables from [table]")?; + Ok(rows) + } + + pub async fn query_by_fqn( + &self, + share_name: &str, + schema_name: &str, + table_name: &str, + ) -> Result> { + let mut conn = self + .pool + .acquire() + .await + .context("failed to acquire postgres connection")?; + let row: Option = sqlx::query_as::<_, Table>( + r#"SELECT + "table".id::text AS id, + "table".name AS name, + "table".location AS location + FROM "table" + LEFT JOIN "schema" ON "schema".id = "table".schema_id + LEFT JOIN share ON share.id = "schema".share_id + WHERE share.name = $1 AND "schema".name = $2 AND "table".name = $3"#, + ) + .bind(share_name) + .bind(schema_name) + .bind(table_name) + .fetch_optional(&mut *conn) + .await + .context(format!( + r#"failed to select "{}"/"{}"/"{}" from [table]"#, + share_name, schema_name, table_name, + ))?; + Ok(row) + } + + pub async fn query_by_share_and_schema_name( + &self, + share_name: &str, + schema_name: &str, + limit: Option, + after: Option<&str>, + ) -> Result> { + let mut conn = self + .pool + .acquire() + .await + .context("failed to acquire postgres connection")?; + let mut builder: QueryBuilder<'_, Postgres> = QueryBuilder::new( + r#"WITH these_tables AS ( + SELECT + "table".id AS id, + share.id AS share_id, + "table".name AS name, + "schema".name AS schema, + share.name AS share + FROM "table" + LEFT JOIN "schema" ON "schema".id = "table".schema_id + LEFT JOIN share ON share.id = "schema".share_id + WHERE share.name = "#, + ); + builder.push_bind(share_name); + builder.push(r#" AND "schema".name = "#); + builder.push_bind(schema_name); + builder.push( + " + ) + SELECT + id::text, + share_id::text, + name, + schema, + share + FROM these_tables", + ); + if let Some(name) = after { + builder.push(" WHERE name >= "); + builder.push_bind(name); + } + builder.push(" ORDER BY name "); + if let Some(limit) = limit { + builder.push(" LIMIT "); + builder.push_bind(limit as i64); + } + let mut query = sqlx::query_as::<_, TableDetail>(builder.build().sql()); + query = query.bind(share_name); + query = query.bind(schema_name); + if let Some(name) = after { + query = query.bind(name); + } + if let Some(limit) = limit { + query = query.bind(limit as i64); + } + let rows: Vec = query + .fetch_all(&mut *conn) + .await + .context("failed to list tables from [table]")?; + Ok(rows) + } } #[async_trait::async_trait] @@ -90,7 +302,8 @@ impl ShareStore for PostgresShareStore { } async fn get_share(&self, name: &str) -> Result, Error> { - unimplemented!() + let share = self.query_by_name(name).await?; + Ok(share) } async fn list_schemas( @@ -98,7 +311,26 @@ impl ShareStore for PostgresShareStore { share: &str, pagination: &Pagination, ) -> Result, Error> { - unimplemented!() + let limit = pagination.max_results().unwrap_or(500); + let after = pagination.page_token(); + + let schemas = self.query_by_share_name(share, Some(limit), after).await?; + let next = if schemas.len() < limit as usize { + None + } else { + schemas.last().map(|s| s.name.clone()) + }; + + Ok(Page { + items: schemas + .into_iter() + .map(|s| Schema { + name: s.name, + id: String::new(), + }) + .collect(), + next_page_token: next, + }) } async fn list_tables_in_share( @@ -106,7 +338,29 @@ impl ShareStore for PostgresShareStore { share: &str, pagination: &Pagination, ) -> Result, Error> { - unimplemented!() + let limit = pagination.max_results().unwrap_or(500); + let after = pagination.page_token(); + + let tables = self + .query_table_by_share_name(share, Some(limit), after) + .await?; + let next = if tables.len() < limit as usize { + None + } else { + tables.last().map(|s| s.name.clone()) + }; + + Ok(Page { + items: tables + .into_iter() + .map(|t: TableDetail| Table { + id: String::new(), + name: t.name, + location: String::new(), + }) + .collect(), + next_page_token: next, + }) } async fn list_tables_in_schema( @@ -115,10 +369,38 @@ impl ShareStore for PostgresShareStore { schema: &str, pagination: &Pagination, ) -> Result, Error> { - unimplemented!() + let limit = pagination.max_results().unwrap_or(500); + let after = pagination.page_token(); + + let tables = self + .query_by_share_and_schema_name(share, schema, Some(limit), after) + .await?; + let next = if tables.len() < limit as usize { + None + } else { + tables.last().map(|s| s.name.clone()) + }; + + Ok(Page { + items: tables + .into_iter() + .map(|t: TableDetail| Table { + id: String::new(), + name: t.name, + location: String::new(), + }) + .collect(), + next_page_token: next, + }) } async fn get_table(&self, share: &str, schema: &str, table: &str) -> Result { - unimplemented!() + let table = self.query_by_fqn(share, schema, table).await?; + + if let Some(t) = table { + Ok(t) + } else { + Err(Error::NotFound) + } } } diff --git a/src/server/routers.rs b/src/server/routers.rs index e9eda77..efcf195 100644 --- a/src/server/routers.rs +++ b/src/server/routers.rs @@ -23,26 +23,49 @@ use crate::server::api_doc::ApiDoc; use crate::server::middlewares::jwt; use crate::server::services::error::Error; +use super::catalog::{Page, Pagination, ShareStore}; +use super::services::schema::Schema; use super::services::share::Share; +use super::services::table::Table; -pub struct Pagination { - max_results: Option, - page_token: Option, -} +struct DefaultShareStore; -pub struct Page { - items: Vec, - next_page_token: Option, -} +#[async_trait::async_trait] +impl ShareStore for DefaultShareStore { + async fn list_shares(&self, pagination: &Pagination) -> Result, Error> { + unimplemented!() + } -pub trait ShareStore: Send + Sync { - fn list(&self, pagination: &Pagination) -> Result>; -} + async fn get_share(&self, name: &str) -> Result, Error> { + unimplemented!() + } -pub struct DefaultShareStore; + async fn list_schemas( + &self, + share: &str, + pagination: &Pagination, + ) -> Result, Error> { + unimplemented!() + } -impl ShareStore for DefaultShareStore { - fn list(&self, pagination: &Pagination) -> Result> { + async fn list_tables_in_share( + &self, + share: &str, + pagination: &Pagination, + ) -> Result, Error> { + unimplemented!() + } + + async fn list_tables_in_schema( + &self, + share: &str, + schema: &str, + pagination: &Pagination, + ) -> Result, Error> { + unimplemented!() + } + + async fn get_table(&self, share: &str, schema: &str, table: &str) -> Result { unimplemented!() } } diff --git a/src/server/routers/shares.rs b/src/server/routers/shares.rs index b3e0bc9..585b96b 100644 --- a/src/server/routers/shares.rs +++ b/src/server/routers/shares.rs @@ -13,7 +13,6 @@ use crate::server::entities::share::Name as ShareName; use crate::server::routers::Pagination; use crate::server::routers::SharedState; use crate::server::services::error::Error; -use crate::server::services::share::Service as ShareService; use crate::server::services::share::Share; pub mod all_tables; @@ -53,20 +52,16 @@ pub async fn get( Extension(state): Extension, Path(params): Path, ) -> Result { - let Ok(share) = ShareName::new(params.share) else { + let Ok(share_name) = ShareName::new(params.share) else { tracing::error!("requested share data is malformed"); return Err(Error::ValidationFailed); }; - let Ok(share) = ShareService::query_by_name(&share, &state.pg_pool).await else { - tracing::error!( - "request is not handled correctly due to a server error while selecting share" - ); - return Err(anyhow!("error occured while selecting share").into()); - }; - let Some(share) = share else { + + let Some(share) = state.state_store.get_share(&share_name.as_str()).await? else { tracing::error!("requested share does not exist"); return Err(Error::NotFound); }; + tracing::info!("share's metadata was successfully returned"); Ok((StatusCode::OK, Json(SharesGetResponse { share })).into_response()) } @@ -105,15 +100,12 @@ pub async fn list( Extension(state): Extension, Query(query): Query, ) -> Result { - let pagination = Pagination { - max_results: query.max_results, - page_token: query.page_token, - }; - let shares = state.state_store.list(&pagination)?; + let pagination = Pagination::new(query.max_results, query.page_token); + let shares = state.state_store.list_shares(&pagination).await?; let res = SharesListResponse { - items: shares.items, - next_page_token: shares.next_page_token, + items: shares.items().to_vec(), + next_page_token: shares.next_page_token().map(ToOwned::to_owned), }; tracing::info!("shares were successfully returned"); Ok((StatusCode::OK, Json(res)).into_response()) diff --git a/src/server/routers/shares/all_tables.rs b/src/server/routers/shares/all_tables.rs index 508a418..8f79149 100644 --- a/src/server/routers/shares/all_tables.rs +++ b/src/server/routers/shares/all_tables.rs @@ -1,4 +1,3 @@ -use anyhow::anyhow; use axum::extract::Extension; use axum::extract::Json; use axum::extract::Path; @@ -9,12 +8,10 @@ use axum::response::Response; use utoipa::IntoParams; use utoipa::ToSchema; -use crate::server::entities::share::Entity as ShareEntity; +use crate::server::catalog::Pagination; use crate::server::entities::share::Name as ShareName; -use crate::server::entities::table::Name as TableName; use crate::server::routers::SharedState; use crate::server::services::error::Error; -use crate::server::services::table::Service as TableService; use crate::server::services::table::TableDetail; const DEFAULT_PAGE_RESULTS: usize = 10; @@ -28,7 +25,7 @@ pub struct SharesAllTablesListParams { #[derive(Debug, serde::Deserialize, IntoParams)] #[serde(rename_all = "camelCase")] pub struct SharesAllTablesListQuery { - pub max_results: Option, + pub max_results: Option, pub page_token: Option, } @@ -68,63 +65,25 @@ pub async fn list( tracing::error!("requested share data is malformed"); return Err(Error::ValidationFailed); }; - let Ok(share) = ShareEntity::load(&share, &state.pg_pool).await else { - tracing::error!( - "request is not handled correctly due to a server error while selecting share" - ); - return Err(anyhow!("error occured while selecting share").into()); - }; - let Some(share) = share else { - tracing::error!("requested share does not exist"); - return Err(Error::NotFound); - }; - let limit = if let Some(limit) = &query.max_results { - let Ok(limit) = usize::try_from(*limit) else { - tracing::error!("requested limit is malformed"); - return Err(Error::ValidationFailed); - }; - limit - } else { - DEFAULT_PAGE_RESULTS - }; - let after = if let Some(name) = &query.page_token { - TableName::new(name).ok() - } else { - None - }; - let Ok(tables) = TableService::query_by_share_name( - share.name(), - Some(&((limit + 1) as i64)), - after.as_ref(), - &state.pg_pool, - ) - .await - else { - tracing::error!( - "request is not handled correctly due to a server error while selecting tables" - ); - return Err(anyhow!("error occured while selecting tables(s)").into()); + + let pagination = Pagination::new(query.max_results, query.page_token); + let tables = state + .state_store + .list_tables_in_share(share.as_str(), &pagination) + .await?; + + let res = SharesAllTablesListResponse { + items: tables + .items() + .iter() + .map(|t| TableDetail { + name: t.name.to_string(), + schema: String::new(), + share: String::new(), + }) + .collect::>(), + next_page_token: tables.next_page_token().map(|s| s.to_string()), }; - if tables.len() == limit + 1 { - let next = &tables[limit]; - let tables = &tables[..limit]; - tracing::info!("tables were successfully returned"); - return Ok(( - StatusCode::OK, - Json(SharesAllTablesListResponse { - items: tables.to_vec(), - next_page_token: next.name.clone().into(), - }), - ) - .into_response()); - } tracing::info!("tables were successfully returned"); - Ok(( - StatusCode::OK, - Json(SharesAllTablesListResponse { - items: tables, - next_page_token: None, - }), - ) - .into_response()) + Ok((StatusCode::OK, Json(res)).into_response()) } diff --git a/src/server/routers/shares/schemas.rs b/src/server/routers/shares/schemas.rs index 933646e..5c3b375 100644 --- a/src/server/routers/shares/schemas.rs +++ b/src/server/routers/shares/schemas.rs @@ -9,13 +9,11 @@ use axum::response::Response; use utoipa::IntoParams; use utoipa::ToSchema; -use crate::server::entities::schema::Name as SchemaName; -use crate::server::entities::share::Entity as ShareEntity; +use crate::server::catalog::Pagination; use crate::server::entities::share::Name as ShareName; use crate::server::routers::SharedState; use crate::server::services::error::Error; use crate::server::services::schema::SchemaDetail; -use crate::server::services::schema::Service as SchemaService; pub mod tables; @@ -30,7 +28,7 @@ pub struct SharesSchemasListParams { #[derive(Debug, serde::Deserialize, IntoParams)] #[serde(rename_all = "camelCase")] pub struct SharesSchemasListQuery { - pub max_results: Option, + pub max_results: Option, pub page_token: Option, } @@ -70,63 +68,25 @@ pub async fn list( tracing::error!("requested share data is malformed"); return Err(Error::ValidationFailed); }; - let Ok(share) = ShareEntity::load(&share, &state.pg_pool).await else { - tracing::error!( - "request is not handled correctly due to a server error while selecting share" - ); - return Err(anyhow!("error occured while selecting share").into()); - }; - let Some(share) = share else { - tracing::error!("requested share does not exist"); - return Err(Error::NotFound); - }; - let limit = if let Some(limit) = &query.max_results { - let Ok(limit) = usize::try_from(*limit) else { - tracing::error!("requested limit is malformed"); - return Err(Error::ValidationFailed); - }; - limit - } else { - DEFAULT_PAGE_RESULTS - }; - let after = if let Some(name) = &query.page_token { - SchemaName::new(name).ok() - } else { - None - }; - let Ok(schemas) = SchemaService::query_by_share_name( - share.name(), - Some(&((limit + 1) as i64)), - after.as_ref(), - &state.pg_pool, - ) - .await - else { - tracing::error!( - "request is not handled correctly due to a server error while selecting schemas" - ); - return Err(anyhow!("error occured while selecting schema(s)").into()); + + let pagination = Pagination::new(query.max_results, query.page_token); + let schemas = state + .state_store + .list_schemas(&share.as_str(), &pagination) + .await?; + + let res = SharesSchemasListResponse { + items: schemas + .items() + .iter() + .map(|s| SchemaDetail { + name: s.name.to_string(), + share: share.to_string(), + }) + .collect(), + next_page_token: schemas.next_page_token().map(|s| s.to_string()), }; - if schemas.len() == limit + 1 { - let next = &schemas[limit]; - let schemas = &schemas[..limit]; - tracing::info!("schemas were successfully returned"); - return Ok(( - StatusCode::OK, - Json(SharesSchemasListResponse { - items: schemas.to_vec(), - next_page_token: next.name.clone().into(), - }), - ) - .into_response()); - } + tracing::info!("schemas were successfully returned"); - Ok(( - StatusCode::OK, - Json(SharesSchemasListResponse { - items: schemas, - next_page_token: None, - }), - ) - .into_response()) + Ok((StatusCode::OK, Json(res)).into_response()) } From 338c0e643c5b52d411d30c1455b08c1c60f1a34f Mon Sep 17 00:00:00 2001 From: Tim Dikland Date: Tue, 30 Jan 2024 21:16:52 +0100 Subject: [PATCH 3/6] add sharestore to main router --- src/server/catalog/mod.rs | 15 +++++++- src/server/mod.rs | 10 +++++ src/server/routers/mod.rs | 49 +++---------------------- src/server/routers/shares.rs | 7 ++-- src/server/routers/shares/all_tables.rs | 2 +- src/server/routers/shares/schemas.rs | 2 +- 6 files changed, 33 insertions(+), 52 deletions(-) diff --git a/src/server/catalog/mod.rs b/src/server/catalog/mod.rs index eb982bb..75a33ed 100644 --- a/src/server/catalog/mod.rs +++ b/src/server/catalog/mod.rs @@ -1,26 +1,33 @@ use super::services::{error::Error, schema::Schema, share::Share, table::Table}; -mod file; -mod postgres; +pub mod file; +pub mod postgres; +/// The `ShareStore` trait defines the interface for fetching metadata about +/// shares, schemas and tables. #[async_trait::async_trait] pub trait ShareStore: Send + Sync { + /// List all shares. async fn list_shares(&self, pagination: &Pagination) -> Result, Error>; + /// Get a share by name. async fn get_share(&self, name: &str) -> Result, Error>; + /// List all schemas in a share. async fn list_schemas( &self, share: &str, pagination: &Pagination, ) -> Result, Error>; + /// List all tables in a share. async fn list_tables_in_share( &self, share: &str, pagination: &Pagination, ) -> Result, Error>; + /// List all tables in a schema. async fn list_tables_in_schema( &self, share: &str, @@ -28,9 +35,11 @@ pub trait ShareStore: Send + Sync { pagination: &Pagination, ) -> Result, Error>; + /// Get a table by name. async fn get_table(&self, share: &str, schema: &str, table: &str) -> Result; } +/// A set of parameters for paginating a list query. #[derive(Debug, Clone, Default)] pub struct Pagination { max_results: Option, @@ -54,6 +63,8 @@ impl Pagination { } } +/// A page of results from a list query. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Page { items: Vec, next_page_token: Option, diff --git a/src/server/mod.rs b/src/server/mod.rs index 0ecff56..52c077e 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -7,6 +7,8 @@ pub(crate) mod routers; mod services; pub(crate) mod utilities; +use std::sync::Arc; + use anyhow::{Context, Result}; use rusoto_credential::AwsCredentials; use rusoto_credential::ProvideAwsCredentials; @@ -32,8 +34,11 @@ use crate::bootstrap; pub use crate::server::middlewares::jwt::Role; use crate::server::routers::AzureLocation; +use self::catalog::ShareStore; + pub struct Server { pg_pool: PgPool, + share_store: Arc, gcp_service_account: Option, aws_credentials: Option, azure_storage_credentials: Option, @@ -44,6 +49,9 @@ impl Server { let pg_pool = bootstrap::new_pg_pool() .await .context("failed to create postgres connection pool")?; + + let share_store = Arc::new(catalog::postgres::PostgresShareStore::new(pg_pool.clone())); + let gcp_service_account = bootstrap::new_gcp_service_account().ok(); if gcp_service_account.is_none() { tracing::warn!("failed to load GCP service account"); @@ -70,6 +78,7 @@ impl Server { Ok(Server { pg_pool, + share_store, gcp_service_account, aws_credentials, azure_storage_credentials, @@ -79,6 +88,7 @@ impl Server { pub async fn start(self) -> Result<()> { routers::bind( self.pg_pool, + self.share_store, self.gcp_service_account, self.aws_credentials, self.azure_storage_credentials, diff --git a/src/server/routers/mod.rs b/src/server/routers/mod.rs index 4d54497..bba6df9 100644 --- a/src/server/routers/mod.rs +++ b/src/server/routers/mod.rs @@ -28,48 +28,6 @@ use super::services::schema::Schema; use super::services::share::Share; use super::services::table::Table; -struct DefaultShareStore; - -#[async_trait::async_trait] -impl ShareStore for DefaultShareStore { - async fn list_shares(&self, pagination: &Pagination) -> Result, Error> { - unimplemented!() - } - - async fn get_share(&self, name: &str) -> Result, Error> { - unimplemented!() - } - - async fn list_schemas( - &self, - share: &str, - pagination: &Pagination, - ) -> Result, Error> { - unimplemented!() - } - - async fn list_tables_in_share( - &self, - share: &str, - pagination: &Pagination, - ) -> Result, Error> { - unimplemented!() - } - - async fn list_tables_in_schema( - &self, - share: &str, - schema: &str, - pagination: &Pagination, - ) -> Result, Error> { - unimplemented!() - } - - async fn get_table(&self, share: &str, schema: &str, table: &str) -> Result { - unimplemented!() - } -} - #[derive(Clone)] pub enum AzureCredential { AccessKey(String), @@ -83,7 +41,7 @@ pub struct AzureLocation { pub struct State { pub pg_pool: PgPool, - pub state_store: Arc, + pub share_store: Arc, pub gcp_service_account: Option, pub aws_credentials: Option, pub azure_credentials: Option, @@ -97,13 +55,14 @@ async fn bad_request(_: Uri) -> std::result::Result { async fn route( pg_pool: PgPool, + share_store: Arc, gcp_service_account: Option, aws_credentials: Option, azure_credentials: Option, ) -> Result { let state = Arc::new(State { pg_pool, - state_store: Arc::new(DefaultShareStore), + share_store, gcp_service_account, aws_credentials, azure_credentials, @@ -189,12 +148,14 @@ async fn route( pub async fn bind( pg_pool: PgPool, + share_store: Arc, gcp_service_account: Option, aws_credentials: Option, azure_credentials: Option, ) -> Result<()> { let app = route( pg_pool, + share_store, gcp_service_account, aws_credentials, azure_credentials, diff --git a/src/server/routers/shares.rs b/src/server/routers/shares.rs index 7b0bcb9..e210de5 100644 --- a/src/server/routers/shares.rs +++ b/src/server/routers/shares.rs @@ -1,4 +1,3 @@ -use anyhow::anyhow; use axum::extract::{Extension, Json, Path, Query}; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; @@ -47,12 +46,12 @@ pub async fn get( Extension(state): Extension, Path(params): Path, ) -> Result { - let Ok(share) = ShareName::try_new(params.share) else { + let Ok(share_name) = ShareName::try_new(params.share) else { tracing::error!("requested share data is malformed"); return Err(Error::ValidationFailed); }; - let Some(share) = state.state_store.get_share(&share_name.as_str()).await? else { + let Some(share) = state.share_store.get_share(&share_name.as_str()).await? else { tracing::error!("requested share does not exist"); return Err(Error::NotFound); }; @@ -96,7 +95,7 @@ pub async fn list( Query(query): Query, ) -> Result { let pagination = Pagination::new(query.max_results, query.page_token); - let shares = state.state_store.list_shares(&pagination).await?; + let shares = state.share_store.list_shares(&pagination).await?; let res = SharesListResponse { items: shares.items().to_vec(), diff --git a/src/server/routers/shares/all_tables.rs b/src/server/routers/shares/all_tables.rs index dd9ecbd..83b6f2d 100644 --- a/src/server/routers/shares/all_tables.rs +++ b/src/server/routers/shares/all_tables.rs @@ -64,7 +64,7 @@ pub async fn list( let pagination = Pagination::new(query.max_results, query.page_token); let tables = state - .state_store + .share_store .list_tables_in_share(share.as_str(), &pagination) .await?; diff --git a/src/server/routers/shares/schemas.rs b/src/server/routers/shares/schemas.rs index 9146dba..a208b14 100644 --- a/src/server/routers/shares/schemas.rs +++ b/src/server/routers/shares/schemas.rs @@ -66,7 +66,7 @@ pub async fn list( let pagination = Pagination::new(query.max_results, query.page_token); let schemas = state - .state_store + .share_store .list_schemas(&share.as_str(), &pagination) .await?; From c1190d3f201ae334cc33b52a1ef3c6de68cd4584 Mon Sep 17 00:00:00 2001 From: Tim Dikland Date: Tue, 30 Jan 2024 21:19:29 +0100 Subject: [PATCH 4/6] fmt --- src/server/catalog/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/catalog/mod.rs b/src/server/catalog/mod.rs index 75a33ed..6c4d7f9 100644 --- a/src/server/catalog/mod.rs +++ b/src/server/catalog/mod.rs @@ -3,7 +3,7 @@ use super::services::{error::Error, schema::Schema, share::Share, table::Table}; pub mod file; pub mod postgres; -/// The `ShareStore` trait defines the interface for fetching metadata about +/// The `ShareStore` trait defines the interface for fetching metadata about /// shares, schemas and tables. #[async_trait::async_trait] pub trait ShareStore: Send + Sync { From a4781d2eff6f1182c550f74ca02ec68a297c53ec Mon Sep 17 00:00:00 2001 From: Tim Dikland Date: Tue, 30 Jan 2024 21:22:07 +0100 Subject: [PATCH 5/6] clippy --- src/server/routers/mod.rs | 8 ++++---- src/server/routers/shares.rs | 2 +- src/server/routers/shares/all_tables.rs | 2 +- src/server/routers/shares/schemas.rs | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/server/routers/mod.rs b/src/server/routers/mod.rs index bba6df9..8583ec7 100644 --- a/src/server/routers/mod.rs +++ b/src/server/routers/mod.rs @@ -23,10 +23,10 @@ use crate::server::api_doc::ApiDoc; use crate::server::middlewares::jwt; use crate::server::services::error::Error; -use super::catalog::{Page, Pagination, ShareStore}; -use super::services::schema::Schema; -use super::services::share::Share; -use super::services::table::Table; +use super::catalog::{Pagination, ShareStore}; + + + #[derive(Clone)] pub enum AzureCredential { diff --git a/src/server/routers/shares.rs b/src/server/routers/shares.rs index e210de5..918539c 100644 --- a/src/server/routers/shares.rs +++ b/src/server/routers/shares.rs @@ -51,7 +51,7 @@ pub async fn get( return Err(Error::ValidationFailed); }; - let Some(share) = state.share_store.get_share(&share_name.as_str()).await? else { + let Some(share) = state.share_store.get_share(share_name.as_str()).await? else { tracing::error!("requested share does not exist"); return Err(Error::NotFound); }; diff --git a/src/server/routers/shares/all_tables.rs b/src/server/routers/shares/all_tables.rs index 83b6f2d..aaed416 100644 --- a/src/server/routers/shares/all_tables.rs +++ b/src/server/routers/shares/all_tables.rs @@ -1,4 +1,4 @@ -use anyhow::anyhow; + use axum::extract::{Extension, Json, Path, Query}; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; diff --git a/src/server/routers/shares/schemas.rs b/src/server/routers/shares/schemas.rs index a208b14..5bfa4a6 100644 --- a/src/server/routers/shares/schemas.rs +++ b/src/server/routers/shares/schemas.rs @@ -1,4 +1,4 @@ -use anyhow::anyhow; + use axum::extract::{Extension, Json, Path, Query}; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; @@ -67,7 +67,7 @@ pub async fn list( let pagination = Pagination::new(query.max_results, query.page_token); let schemas = state .share_store - .list_schemas(&share.as_str(), &pagination) + .list_schemas(share.as_str(), &pagination) .await?; let res = SharesSchemasListResponse { From 32cdf292dd80ad85fd9a05ee436dfbdc410973dc Mon Sep 17 00:00:00 2001 From: Tim Dikland Date: Tue, 30 Jan 2024 21:23:23 +0100 Subject: [PATCH 6/6] fmt - again --- src/server/routers/mod.rs | 3 --- src/server/routers/shares/all_tables.rs | 1 - src/server/routers/shares/schemas.rs | 1 - 3 files changed, 5 deletions(-) diff --git a/src/server/routers/mod.rs b/src/server/routers/mod.rs index 8583ec7..5f13057 100644 --- a/src/server/routers/mod.rs +++ b/src/server/routers/mod.rs @@ -25,9 +25,6 @@ use crate::server::services::error::Error; use super::catalog::{Pagination, ShareStore}; - - - #[derive(Clone)] pub enum AzureCredential { AccessKey(String), diff --git a/src/server/routers/shares/all_tables.rs b/src/server/routers/shares/all_tables.rs index aaed416..0821f2d 100644 --- a/src/server/routers/shares/all_tables.rs +++ b/src/server/routers/shares/all_tables.rs @@ -1,4 +1,3 @@ - use axum::extract::{Extension, Json, Path, Query}; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; diff --git a/src/server/routers/shares/schemas.rs b/src/server/routers/shares/schemas.rs index 5bfa4a6..cede3c9 100644 --- a/src/server/routers/shares/schemas.rs +++ b/src/server/routers/shares/schemas.rs @@ -1,4 +1,3 @@ - use axum::extract::{Extension, Json, Path, Query}; use axum::http::StatusCode; use axum::response::{IntoResponse, Response};