refactor: ColumnId -> Ulid
KKould committed Oct 17, 2024
1 parent a857c8f commit 1853027
Showing 27 changed files with 524 additions and 286 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -67,6 +67,7 @@ thiserror = { version = "1" }
tokio = { version = "1.36", features = ["full"], optional = true }
tracing = { version = "0.1" }
typetag = { version = "0.2" }
ulid = { version = "1", features = ["serde"] }

[dev-dependencies]
cargo-tarpaulin = { version = "0.27" }
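For reference, the new dependency is the ulid crate with its serde feature enabled. A minimal standalone sketch of the Ulid API this refactor builds on (not part of the commit; it only uses calls from the ulid crate itself):

use ulid::Ulid;

fn main() {
    // A ULID is a 128-bit identifier whose high bits encode a millisecond
    // timestamp, so freshly generated ids sort roughly by creation time.
    let id = Ulid::new();
    let text = id.to_string(); // 26-character Crockford base32 string
    assert_eq!(Ulid::from_string(&text).unwrap(), id);
    // With the serde feature enabled, Ulid also implements
    // Serialize/Deserialize using the same string encoding.
}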
4 changes: 2 additions & 2 deletions src/binder/expr.rs
@@ -18,7 +18,7 @@ use crate::expression::{AliasType, ScalarExpression};
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::value::{DataValue, Utf8Type};
use crate::types::LogicalType;
use crate::types::{ColumnId, LogicalType};

macro_rules! try_alias {
($context:expr, $full_name:expr) => {
@@ -231,7 +231,7 @@ impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
sub_query: LogicalPlan,
) -> Result<(ScalarExpression, LogicalPlan), DatabaseError> {
let mut alias_column = ColumnCatalog::clone(&column);
alias_column.set_ref_table(self.context.temp_table(), 0);
alias_column.set_ref_table(self.context.temp_table(), ColumnId::new());

let alias_expr = ScalarExpression::Alias {
expr: Box::new(ScalarExpression::ColumnRef(column)),
75 changes: 52 additions & 23 deletions src/binder/mod.rs
@@ -382,11 +382,12 @@ pub(crate) fn is_valid_identifier(s: &str) -> bool {
#[cfg(test)]
pub mod test {
use crate::binder::{is_valid_identifier, Binder, BinderContext};
use crate::catalog::{ColumnCatalog, ColumnDesc};
use crate::catalog::{ColumnCatalog, ColumnDesc, TableCatalog};
use crate::errors::DatabaseError;
use crate::planner::LogicalPlan;
use crate::storage::rocksdb::RocksStorage;
use crate::storage::{Storage, TableCache, Transaction};
use crate::types::ColumnId;
use crate::types::LogicalType::Integer;
use crate::utils::lru::ShardingLruCache;
use std::hash::RandomState;
@@ -395,6 +396,56 @@ pub mod test {
use std::sync::Arc;
use tempfile::TempDir;

pub(crate) struct TableState<S: Storage> {
pub(crate) table: TableCatalog,
pub(crate) table_cache: Arc<TableCache>,
pub(crate) storage: S,
}

impl<S: Storage> TableState<S> {
pub(crate) fn plan<T: AsRef<str>>(&self, sql: T) -> Result<LogicalPlan, DatabaseError> {
let scala_functions = Default::default();
let table_functions = Default::default();
let transaction = self.storage.transaction()?;
let mut binder = Binder::new(
BinderContext::new(
&self.table_cache,
&transaction,
&scala_functions,
&table_functions,
Arc::new(AtomicUsize::new(0)),
),
None,
);
let stmt = crate::parser::parse_sql(sql)?;

Ok(binder.bind(&stmt[0])?)
}

pub(crate) fn column_id_by_name(&self, name: &str) -> &ColumnId {
self.table.get_column_id_by_name(name).unwrap()
}
}

pub(crate) fn build_t1_table() -> Result<TableState<RocksStorage>, DatabaseError> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let storage = build_test_catalog(&table_cache, temp_dir.path())?;
let table = {
let transaction = storage.transaction()?;
transaction
.table(&table_cache, Arc::new("t1".to_string()))
.unwrap()
.clone()
};

Ok(TableState {
table,
table_cache,
storage,
})
}

pub(crate) fn build_test_catalog(
table_cache: &TableCache,
path: impl Into<PathBuf> + Send,
@@ -443,28 +494,6 @@ pub mod test {
Ok(storage)
}

pub fn select_sql_run<S: AsRef<str>>(sql: S) -> Result<LogicalPlan, DatabaseError> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let table_cache = Arc::new(ShardingLruCache::new(4, 1, RandomState::new())?);
let storage = build_test_catalog(&table_cache, temp_dir.path())?;
let transaction = storage.transaction()?;
let scala_functions = Default::default();
let table_functions = Default::default();
let mut binder = Binder::new(
BinderContext::new(
&table_cache,
&transaction,
&scala_functions,
&table_functions,
Arc::new(AtomicUsize::new(0)),
),
None,
);
let stmt = crate::parser::parse_sql(sql)?;

Ok(binder.bind(&stmt[0])?)
}

#[test]
pub fn test_valid_identifier() {
debug_assert!(is_valid_identifier("valid_table"));
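A hypothetical test using the new helper could look like the sketch below (assuming build_t1_table, TableState::plan, and column_id_by_name behave as defined above; the binder tests in src/binder/select.rs follow the same pattern):

#[test]
fn bind_via_table_state() -> Result<(), DatabaseError> {
    let table_state = build_t1_table()?;
    // Bind a statement against the cached `t1` catalog.
    let plan = table_state.plan("select c1 from t1")?;
    // Column ids are now ULIDs generated at table-creation time, so tests
    // look them up by name instead of hardcoding integers.
    let c1_id = table_state.column_id_by_name("c1");
    println!("{:#?}\n{:?}", plan, c1_id);
    Ok(())
}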
27 changes: 15 additions & 12 deletions src/binder/select.rs
@@ -28,7 +28,7 @@ use crate::planner::operator::union::UnionOperator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::tuple::{Schema, SchemaRef};
use crate::types::LogicalType;
use crate::types::{ColumnId, LogicalType};
use itertools::Itertools;
use sqlparser::ast::{
Distinct, Expr, Ident, Join, JoinConstraint, JoinOperator, Offset, OrderByExpr, Query, Select,
@@ -352,7 +352,7 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {
for (alias, column) in aliases_with_columns {
let mut alias_column = ColumnCatalog::clone(&column);
alias_column.set_name(alias.clone());
alias_column.set_ref_table(table_alias.clone(), column.id().unwrap_or(0));
alias_column.set_ref_table(table_alias.clone(), column.id().unwrap_or(ColumnId::new()));

let alias_column_expr = ScalarExpression::Alias {
expr: Box::new(ScalarExpression::ColumnRef(column)),
@@ -983,31 +983,34 @@ impl<'a: 'b, 'b, T: Transaction> Binder<'a, 'b, T> {

#[cfg(test)]
mod tests {
use crate::binder::test::select_sql_run;
use crate::binder::test::build_t1_table;
use crate::errors::DatabaseError;

#[test]
fn test_select_bind() -> Result<(), DatabaseError> {
let plan_1 = select_sql_run("select * from t1")?;
let table_states = build_t1_table()?;

let plan_1 = table_states.plan("select * from t1")?;
println!("just_col:\n {:#?}", plan_1);
let plan_2 = select_sql_run("select t1.c1, t1.c2 from t1")?;
let plan_2 = table_states.plan("select t1.c1, t1.c2 from t1")?;
println!("table_with_col:\n {:#?}", plan_2);
let plan_3 = select_sql_run("select t1.c1, t1.c2 from t1 where c1 > 2")?;
let plan_3 = table_states.plan("select t1.c1, t1.c2 from t1 where c1 > 2")?;
println!("table_with_col_and_c1_compare_constant:\n {:#?}", plan_3);
let plan_4 = select_sql_run("select t1.c1, t1.c2 from t1 where c1 > c2")?;
let plan_4 = table_states.plan("select t1.c1, t1.c2 from t1 where c1 > c2")?;
println!("table_with_col_and_c1_compare_c2:\n {:#?}", plan_4);
let plan_5 = select_sql_run("select avg(t1.c1) from t1")?;
let plan_5 = table_states.plan("select avg(t1.c1) from t1")?;
println!("table_with_col_and_c1_avg:\n {:#?}", plan_5);
let plan_6 = select_sql_run("select t1.c1, t1.c2 from t1 where (t1.c1 - t1.c2) > 1")?;
let plan_6 = table_states.plan("select t1.c1, t1.c2 from t1 where (t1.c1 - t1.c2) > 1")?;
println!("table_with_col_nested:\n {:#?}", plan_6);

let plan_7 = select_sql_run("select * from t1 limit 1")?;
let plan_7 = table_states.plan("select * from t1 limit 1")?;
println!("limit:\n {:#?}", plan_7);

let plan_8 = select_sql_run("select * from t1 offset 2")?;
let plan_8 = table_states.plan("select * from t1 offset 2")?;
println!("offset:\n {:#?}", plan_8);

let plan_9 = select_sql_run("select c1, c3 from t1 inner join t2 on c1 = c3 and c1 > 1")?;
let plan_9 =
table_states.plan("select c1, c3 from t1 inner join t2 on c1 = c3 and c1 > 1")?;
println!("join:\n {:#?}", plan_9);

Ok(())
11 changes: 11 additions & 0 deletions src/catalog/column.rs
@@ -138,6 +138,17 @@ impl ColumnCatalog {
}
}

#[cfg(test)]
impl ColumnSummary {
pub(crate) fn column_id(&self) -> Option<&ColumnId> {
if let ColumnRelation::Table { column_id, .. } = &self.relation {
Some(column_id)
} else {
None
}
}
}

/// The descriptor of a column.
#[derive(Debug, Clone, PartialEq, Eq, Hash, ReferenceSerialization)]
pub struct ColumnDesc {
37 changes: 19 additions & 18 deletions src/catalog/table.rs
@@ -1,14 +1,14 @@
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::{slice, vec};

use crate::catalog::{ColumnCatalog, ColumnRef, ColumnRelation};
use crate::errors::DatabaseError;
use crate::types::index::{IndexMeta, IndexMetaRef, IndexType};
use crate::types::tuple::SchemaRef;
use crate::types::{ColumnId, LogicalType};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::{slice, vec};
use ulid::Generator;

pub type TableName = Arc<String>;

@@ -41,9 +41,9 @@ impl TableCatalog {
self.columns.get(id).map(|i| &self.schema_ref[*i])
}

#[allow(dead_code)]
pub(crate) fn get_column_id_by_name(&self, name: &str) -> Option<ColumnId> {
self.column_idxs.get(name).map(|(id, _)| id).cloned()
#[cfg(test)]
pub(crate) fn get_column_id_by_name(&self, name: &str) -> Option<&ColumnId> {
self.column_idxs.get(name).map(|(id, _)| id)
}

pub(crate) fn get_column_by_name(&self, name: &str) -> Option<&ColumnRef> {
@@ -87,17 +87,15 @@ impl TableCatalog {
}

/// Add a column to the table catalog.
pub(crate) fn add_column(&mut self, mut col: ColumnCatalog) -> Result<ColumnId, DatabaseError> {
pub(crate) fn add_column(
&mut self,
mut col: ColumnCatalog,
generator: &mut Generator,
) -> Result<ColumnId, DatabaseError> {
if self.column_idxs.contains_key(col.name()) {
return Err(DatabaseError::DuplicateColumn(col.name().to_string()));
}

let col_id = self
.columns
.iter()
.last()
.map(|(column_id, _)| column_id + 1)
.unwrap_or(0);
let col_id = generator.generate().unwrap();

col.summary.relation = ColumnRelation::Table {
column_id: col_id,
@@ -155,8 +153,11 @@ impl TableCatalog {
indexes: vec![],
schema_ref: Arc::new(vec![]),
};
let mut generator = Generator::new();
for col_catalog in columns.into_iter() {
let _ = table_catalog.add_column(col_catalog)?;
let _ = table_catalog
.add_column(col_catalog, &mut generator)
.unwrap();
}

Ok(table_catalog)
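For reference, ulid::Generator (used by add_column above) keeps successive ids strictly increasing even within the same millisecond, which presumably preserves the insertion ordering that the old sequential counter provided. A minimal standalone sketch of that behavior (not part of the commit):

use ulid::Generator;

fn main() {
    let mut generator = Generator::new();
    // Within a single millisecond the generator increments the random
    // component, so consecutive ids are strictly increasing.
    let first = generator.generate().unwrap();
    let second = generator.generate().unwrap();
    assert!(first < second);
}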
3 changes: 2 additions & 1 deletion src/db.rs
@@ -399,7 +399,8 @@ pub(crate) mod test {
true,
ColumnDesc::new(LogicalType::Integer, false, false, None).unwrap(),
);
column.set_ref_table(Arc::new("a".to_string()), 0);
let number_column_id = schema[0].summary.column_id().unwrap();
column.set_ref_table(Arc::new("a".to_string()), *number_column_id);

debug_assert_eq!(schema, Arc::new(vec![ColumnRef::from(column)]));
debug_assert_eq!(
18 changes: 9 additions & 9 deletions src/execution/dml/copy_from_file.rs
@@ -108,20 +108,20 @@ fn return_result(size: usize, tx: Sender<Tuple>) -> Result<(), DatabaseError> {

#[cfg(test)]
mod tests {
use super::*;
use crate::binder::copy::ExtSource;
use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef, ColumnRelation, ColumnSummary};
use crate::db::DataBaseBuilder;
use crate::errors::DatabaseError;
use crate::storage::Storage;
use crate::types::LogicalType;
use sqlparser::ast::CharLengthUnits;
use std::io::Write;
use std::ops::{Coroutine, CoroutineState};
use std::pin::Pin;
use std::sync::Arc;
use tempfile::TempDir;

use super::*;
use crate::binder::copy::ExtSource;
use crate::errors::DatabaseError;
use crate::storage::Storage;
use crate::types::LogicalType;
use ulid::Ulid;

#[test]
fn read_csv() -> Result<(), DatabaseError> {
@@ -135,7 +135,7 @@ mod tests {
summary: ColumnSummary {
name: "a".to_string(),
relation: ColumnRelation::Table {
column_id: 0,
column_id: Ulid::new(),
table_name: Arc::new("t1".to_string()),
},
},
@@ -146,7 +146,7 @@ mod tests {
summary: ColumnSummary {
name: "b".to_string(),
relation: ColumnRelation::Table {
column_id: 1,
column_id: Ulid::new(),
table_name: Arc::new("t1".to_string()),
},
},
@@ -157,7 +157,7 @@ mod tests {
summary: ColumnSummary {
name: "c".to_string(),
relation: ColumnRelation::Table {
column_id: 2,
column_id: Ulid::new(),
table_name: Arc::new("t1".to_string()),
},
},
10 changes: 8 additions & 2 deletions src/expression/mod.rs
@@ -1303,7 +1303,7 @@ mod test {
use crate::function::numbers::Numbers;
use crate::serdes::{ReferenceSerialization, ReferenceTables};
use crate::storage::rocksdb::{RocksStorage, RocksTransaction};
use crate::storage::{Storage, TableCache};
use crate::storage::{Storage, TableCache, Transaction};
use crate::types::evaluator::boolean::BooleanNotUnaryEvaluator;
use crate::types::evaluator::int32::Int32PlusBinaryEvaluator;
use crate::types::evaluator::{BinaryEvaluatorBox, UnaryEvaluatorBox};
@@ -1345,6 +1345,12 @@ mod test {

let mut cursor = Cursor::new(Vec::new());
let mut reference_tables = ReferenceTables::new();
let c3_column_id = {
let table = transaction
.table(&table_cache, Arc::new("t1".to_string()))
.unwrap();
*table.get_column_id_by_name("c3").unwrap()
};

fn_assert(
&mut cursor,
@@ -1374,7 +1380,7 @@
summary: ColumnSummary {
name: "c3".to_string(),
relation: ColumnRelation::Table {
column_id: 2,
column_id: c3_column_id,
table_name: Arc::new("t1".to_string()),
},
},