Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ShareStore extension #37

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions src/server/catalog/file/example-shares.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
shares:
- name: "share1"
schemas:
- name: "schema1"
tables:
- name: "table1"
# S3. See https://github.com/delta-io/delta-sharing#s3 for how to config the credentials
location: "s3a://<bucket-name>/<the-table-path>"
id: "00000000-0000-0000-0000-000000000000"
- name: "table2"
# Azure Blob Storage. See https://github.com/delta-io/delta-sharing#azure-blob-storage for how to config the credentials
location: "wasbs://<container-name>@<account-name>.blob.core.windows.net/<the-table-path>"
id: "00000000-0000-0000-0000-000000000001"
- name: "share2"
schemas:
- name: "schema2"
tables:
- name: "table3"
# Azure Data Lake Storage Gen2. See https://github.com/delta-io/delta-sharing#azure-data-lake-storage-gen2 for how to config the credentials
location: "abfss://<container-name>@<account-name>.dfs.core.windows.net/<the-table-path>"
historyShared: true
id: "00000000-0000-0000-0000-000000000002"
- name: "share3"
schemas:
- name: "schema3"
tables:
- name: "table4"
# Google Cloud Storage (GCS). See https://github.com/delta-io/delta-sharing#google-cloud-storage for how to config the credentials
location: "gs://<bucket-name>/<the-table-path>"
id: "00000000-0000-0000-0000-000000000003"
- name: "share4"
schemas:
- name: "schema4"
tables:
- name: "table5"
# Cloudflare R2. See https://github.com/delta-io/delta-sharing#cloudflare-r2 for how to config the credentials
location: "s3a://<bucket-name>/<the-table-path>"
id: "00000000-0000-0000-0000-000000000004"
215 changes: 215 additions & 0 deletions src/server/catalog/file/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
use serde::Deserialize;
use std::path::Path;

use crate::server::services::{error::Error, schema::Schema, share::Share, table::Table};

use super::{Page, Pagination, ShareStore};

/// Root of the shares YAML document.
///
/// Mirrors the top-level `shares:` key of the file; `YamlShareStore::from_file`
/// deserializes the whole file into this type.
#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize)]
struct ShareFile {
// Every share declared in the file, in file order.
shares: Vec<RawShare>,
}

/// One entry of the `shares` list as written in the YAML file.
#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize)]
struct RawShare {
// Name of the share; lookups in this module match on it exactly.
name: String,
// Schemas declared under this share.
schemas: Vec<RawSchema>,
}

/// One entry of a share's `schemas` list as written in the YAML file.
#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize)]
struct RawSchema {
// Name of the schema; lookups in this module match on it exactly.
name: String,
// Tables declared under this schema.
tables: Vec<RawTable>,
}

/// One entry of a schema's `tables` list as written in the YAML file.
///
/// Only `name`, `location` and `id` are modelled; other keys that appear in
/// the example file (e.g. `historyShared`) have no corresponding field here.
#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize)]
struct RawTable {
// Name of the table; lookups in this module match on it exactly.
name: String,
// Storage location of the table, e.g. an `s3a://`, `wasbs://`, `abfss://`
// or `gs://` URI as in the example file.
location: String,
// Identifier string given for the table in the file.
id: String,
}

/// A [`ShareStore`] whose catalog is read from a YAML file.
///
/// The file is parsed once by [`YamlShareStore::from_file`]; afterwards every
/// query is answered from the in-memory copy held in `store`.
#[derive(Debug)]
pub struct YamlShareStore {
    store: ShareFile,
}

impl YamlShareStore {
    /// Opens the file at `path` and deserializes its YAML content into a store.
    ///
    /// # Errors
    ///
    /// Returns the boxed error from opening the file, or the boxed
    /// `serde_yaml` error if the content cannot be deserialized.
    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, Box<dyn std::error::Error>> {
        let reader = std::fs::File::open(path)?;
        let store = serde_yaml::from_reader::<_, ShareFile>(reader)?;
        Ok(Self { store })
    }
}

#[async_trait::async_trait]
impl ShareStore for YamlShareStore {
async fn list_shares(&self, _pagination: &Pagination) -> Result<Page<Share>, Error> {
let shares = self
.store
.shares
.iter()
.map(|s| Share {
id: s.name.clone(),
name: s.name.clone(),
})
.collect::<Vec<_>>();

Ok(Page {
items: shares,
next_page_token: None,
})
}

async fn get_share(&self, name: &str) -> Result<Option<Share>, Error> {
let share = self
.store
.shares
.iter()
.find(|s| s.name == name)
.map(|s| Share {
id: s.name.clone(),
name: s.name.clone(),
});

Ok(share)
}

async fn list_schemas(
&self,
share: &str,
_pagination: &Pagination,
) -> Result<Page<Schema>, Error> {
let share = self
.store
.shares
.iter()
.find(|s| s.name == share)
.ok_or_else(|| Error::NotFound)?;

let schemas = share
.schemas
.iter()
.map(|s| Schema {
id: s.name.clone(),
name: s.name.clone(),
})
.collect::<Vec<_>>();

Ok(Page {
items: schemas,
next_page_token: None,
})
}

async fn list_tables_in_share(
&self,
share: &str,
_pagination: &Pagination,
) -> Result<Page<Table>, Error> {
let share = self
.store
.shares
.iter()
.find(|s| s.name == share)
.ok_or_else(|| Error::NotFound)?;

let tables = share
.schemas
.iter()
.flat_map(|s| {
s.tables
.iter()
.map(|t| Table {
id: t.id.clone(),
name: t.name.clone(),
location: t.location.clone(),
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();

Ok(Page {
items: tables,
next_page_token: None,
})
}

async fn list_tables_in_schema(
&self,
share: &str,
schema: &str,
_pagination: &Pagination,
) -> Result<Page<Table>, Error> {
let share = self
.store
.shares
.iter()
.find(|s| s.name == share)
.ok_or_else(|| Error::NotFound)?;

let schema = share
.schemas
.iter()
.find(|s| s.name == schema)
.ok_or_else(|| Error::NotFound)?;

let tables = schema
.tables
.iter()
.map(|t| Table {
id: t.id.clone(),
name: t.name.clone(),
location: t.location.clone(),
})
.collect::<Vec<_>>();

Ok(Page {
items: tables,
next_page_token: None,
})
}

async fn get_table(&self, share: &str, schema: &str, table: &str) -> Result<Table, Error> {
let share = self
.store
.shares
.iter()
.find(|s| s.name == share)
.ok_or_else(|| Error::NotFound)?;

let schema = share
.schemas
.iter()
.find(|s| s.name == schema)
.ok_or_else(|| Error::NotFound)?;

let table = schema
.tables
.iter()
.find(|t| t.name == table)
.ok_or_else(|| Error::NotFound)?;

Ok(Table {
id: table.id.clone(),
name: table.name.clone(),
location: table.location.clone(),
})
}
}

#[cfg(test)]
mod test {
    use super::*;

    /// The bundled example file declares four shares, all returned in one page.
    #[tokio::test]
    async fn test_list_shares() {
        let store =
            YamlShareStore::from_file("./src/server/catalog/file/example-shares.yml").unwrap();
        let page = store.list_shares(&Pagination::default()).await.unwrap();

        assert_eq!(page.items().len(), 4);
        assert_eq!(page.next_page_token(), None);
    }
}
81 changes: 81 additions & 0 deletions src/server/catalog/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use super::services::{error::Error, schema::Schema, share::Share, table::Table};

pub mod file;
pub mod postgres;

/// The `ShareStore` trait defines the interface for fetching metadata about
/// shares, schemas and tables.
///
/// Every implementor must be `Send + Sync`. All methods are async (via
/// `async_trait`) and report failures through the service-level [`Error`].
/// The `list_*` methods take a [`Pagination`] describing the caller's optional
/// result limit and page token, and answer with a [`Page`] holding the items
/// plus an optional token for the following page.
#[async_trait::async_trait]
pub trait ShareStore: Send + Sync {
/// List all shares.
async fn list_shares(&self, pagination: &Pagination) -> Result<Page<Share>, Error>;

/// Get a share by name.
///
/// The `Option` in the return type lets an implementation report an
/// unknown share as `Ok(None)` instead of as an error.
async fn get_share(&self, name: &str) -> Result<Option<Share>, Error>;

/// List all schemas in the share named `share`.
async fn list_schemas(
&self,
share: &str,
pagination: &Pagination,
) -> Result<Page<Schema>, Error>;

/// List all tables in the share named `share`, across its schemas.
async fn list_tables_in_share(
&self,
share: &str,
pagination: &Pagination,
) -> Result<Page<Table>, Error>;

/// List all tables in the schema named `schema` of the share named `share`.
async fn list_tables_in_schema(
&self,
share: &str,
schema: &str,
pagination: &Pagination,
) -> Result<Page<Table>, Error>;

/// Get a table by name, addressed by its share, schema and table names.
///
/// Unlike [`ShareStore::get_share`], the return type has no `Option`, so
/// an unknown table can only be reported through `Err`.
async fn get_table(&self, share: &str, schema: &str, table: &str) -> Result<Table, Error>;
}

/// Paging parameters accompanying a list query.
///
/// Both values are optional; `Pagination::default()` leaves them unset.
#[derive(Debug, Clone, Default)]
pub struct Pagination {
    max_results: Option<u32>,
    page_token: Option<String>,
}

impl Pagination {
    /// Creates paging parameters from an optional result limit and an
    /// optional page token.
    pub fn new(max_results: Option<u32>, page_token: Option<String>) -> Self {
        Pagination {
            max_results,
            page_token,
        }
    }

    /// The requested upper bound on the number of results, if one was given.
    pub fn max_results(&self) -> Option<u32> {
        let Pagination { max_results, .. } = self;
        *max_results
    }

    /// The page token to resume from, borrowed as a string slice, if one was
    /// given.
    pub fn page_token(&self) -> Option<&str> {
        let Pagination { page_token, .. } = self;
        page_token.as_deref()
    }
}

/// One page of results produced by a list query, together with the optional
/// token for requesting the page that follows.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Page<T> {
    items: Vec<T>,
    next_page_token: Option<String>,
}

impl<T> Page<T> {
    /// The items contained in this page, in order.
    pub fn items(&self) -> &[T] {
        self.items.as_slice()
    }

    /// The token for the next page, borrowed as a string slice, or `None`
    /// when no token is set.
    pub fn next_page_token(&self) -> Option<&str> {
        let Page {
            next_page_token, ..
        } = self;
        next_page_token.as_deref()
    }
}
Loading
Loading