Skip to content

Commit

Permalink
Test/TPCC (#243)
Browse files Browse the repository at this point in the history
* test: add Tpcc

* fix: stackoverflow with `TPCC` release mode

* chore bump fnck_sql version to 0.0.6
  • Loading branch information
KKould authored Nov 12, 2024
1 parent 9d7e0af commit 6e35bc7
Show file tree
Hide file tree
Showing 22 changed files with 1,860 additions and 29 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "fnck_sql"
version = "0.0.4"
version = "0.0.6"
edition = "2021"
authors = ["Kould <[email protected]>", "Xwg <[email protected]>"]
description = "SQL as a Function for Rust"
Expand Down Expand Up @@ -82,8 +82,9 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] }
[workspace]
members = [
"tests/sqllogictest",
"tests/macros-test"
, "fnck_sql_serde_macros"]
"tests/macros-test",
"fnck_sql_serde_macros",
"tpcc"]

[profile.release]
lto = true
19 changes: 17 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,21 @@ let fnck_sql = DataBaseBuilder::path("./data").build()?;
let tuples = fnck_sql.run("select * from t1")?;
```

### TPCC
run `cargo run -p tpcc --release` to run tpcc

- i9-13900HX
- 32.0 GB
- YMTC PC411-1024GB-B
```shell
<90th Percentile RT (MaxRT)>
New-Order : 0.882 (0.947)
Payment : 0.080 (0.095)
Order-Status : 0.235 (0.255)
Delivery : 5.386 (5.658)
Stock-Level : 0.001 (0.002)
```

#### PG Wire Service
run `cargo run --features="net"` to start server
![start](./static/images/start.gif)
Expand Down Expand Up @@ -138,8 +153,8 @@ table_function!(MyTableFunction::test_numbers(LogicalType::Integer) -> [c1: Logi
.map(|i| Ok(Tuple {
id: None,
values: vec![
Arc::new(DataValue::Int32(Some(i))),
Arc::new(DataValue::Int32(Some(i))),
DataValue::Int32(Some(i)),
DataValue::Int32(Some(i)),
]
}))) as Box<dyn Iterator<Item = Result<Tuple, DatabaseError>>>)
}));
Expand Down
4 changes: 2 additions & 2 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> {
if !args.is_empty() {
ty = args[0].return_type();

for arg in args.iter() {
for arg in args.iter_mut() {
let temp_ty = arg.return_type();

if temp_ty == LogicalType::SqlNull {
Expand All @@ -595,7 +595,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> {
if ty == LogicalType::SqlNull && temp_ty != LogicalType::SqlNull {
ty = temp_ty;
} else if ty != temp_ty {
return Err(DatabaseError::Incomparable(ty, temp_ty));
ty = LogicalType::max_logical_type(&ty, &temp_ty)?;
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion src/binder/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,20 @@ impl<T: Transaction> Binder<'_, '_, T> {
Some(table_name.to_string()),
)? {
ScalarExpression::ColumnRef(column) => {
let expr = if matches!(expression, ScalarExpression::Empty) {
let mut expr = if matches!(expression, ScalarExpression::Empty) {
let default_value = column
.default_value()?
.ok_or(DatabaseError::DefaultNotExist)?;
ScalarExpression::Constant(default_value)
} else {
expression.clone()
};
if &expr.return_type() != column.datatype() {
expr = ScalarExpression::TypeCast {
expr: Box::new(expr),
ty: column.datatype().clone(),
}
}
value_exprs.push((column, expr));
}
_ => return Err(DatabaseError::InvalidColumn(ident.to_string())),
Expand Down
2 changes: 1 addition & 1 deletion src/execution/dql/aggregate/avg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Accumulator for AvgAccumulator {
let quantity_ty = quantity.logical_type();

if value_ty != quantity_ty {
value = DataValue::clone(&value).cast(&quantity_ty)?
value = value.cast(&quantity_ty)?
}
let evaluator = EvaluatorFactory::binary_create(quantity_ty, BinaryOperator::Divide)?;
Ok(evaluator.0.binary_eval(&value, &quantity))
Expand Down
6 changes: 2 additions & 4 deletions src/expression/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl ScalarExpression {
pub fn eval(&self, tuple: &Tuple, schema: &[ColumnRef]) -> Result<DataValue, DatabaseError> {
let check_cast = |value: DataValue, return_type: &LogicalType| {
if value.logical_type() != *return_type {
return DataValue::clone(&value).cast(return_type);
return value.cast(return_type);
}
Ok(value)
};
Expand Down Expand Up @@ -73,9 +73,7 @@ impl ScalarExpression {
expr.eval(tuple, schema)
}
ScalarExpression::TypeCast { expr, ty, .. } => {
let value = expr.eval(tuple, schema)?;

Ok(DataValue::clone(&value).cast(ty)?)
Ok(expr.eval(tuple, schema)?.cast(ty)?)
}
ScalarExpression::Binary {
left_expr,
Expand Down
3 changes: 1 addition & 2 deletions src/function/char_length.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ impl ScalarFunctionImpl for CharLength {
let value = exprs[0].eval(tuples, columns)?;
let mut value = DataValue::clone(&value);
if !matches!(value.logical_type(), LogicalType::Varchar(_, _)) {
value = DataValue::clone(&value)
.cast(&LogicalType::Varchar(None, CharLengthUnits::Characters))?;
value = value.cast(&LogicalType::Varchar(None, CharLengthUnits::Characters))?;
}
let mut length: u64 = 0;
if let DataValue::Utf8 {
Expand Down
3 changes: 1 addition & 2 deletions src/function/lower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ impl ScalarFunctionImpl for Lower {
let value = exprs[0].eval(tuples, columns)?;
let mut value = DataValue::clone(&value);
if !matches!(value.logical_type(), LogicalType::Varchar(_, _)) {
value = DataValue::clone(&value)
.cast(&LogicalType::Varchar(None, CharLengthUnits::Characters))?;
value = value.cast(&LogicalType::Varchar(None, CharLengthUnits::Characters))?;
}
if let DataValue::Utf8 {
value: Some(value),
Expand Down
3 changes: 1 addition & 2 deletions src/function/upper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ impl ScalarFunctionImpl for Upper {
let value = exprs[0].eval(tuples, columns)?;
let mut value = DataValue::clone(&value);
if !matches!(value.logical_type(), LogicalType::Varchar(_, _)) {
value = DataValue::clone(&value)
.cast(&LogicalType::Varchar(None, CharLengthUnits::Characters))?;
value = value.cast(&LogicalType::Varchar(None, CharLengthUnits::Characters))?;
}
if let DataValue::Utf8 {
value: Some(value),
Expand Down
14 changes: 6 additions & 8 deletions src/macros/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,20 @@
macro_rules! implement_from_tuple {
($struct_name:ident, ($($field_name:ident : $field_type:ty => $closure:expr),+)) => {
impl From<(&::fnck_sql::types::tuple::SchemaRef, ::fnck_sql::types::tuple::Tuple)> for $struct_name {
fn from((schema, tuple): (&::fnck_sql::types::tuple::SchemaRef, ::fnck_sql::types::tuple::Tuple)) -> Self {
fn try_get<T: 'static>(tuple: &::fnck_sql::types::tuple::Tuple, schema: &::fnck_sql::types::tuple::SchemaRef, field_name: &str) -> Option<::fnck_sql::types::value::DataValue> {
fn from((schema, mut tuple): (&::fnck_sql::types::tuple::SchemaRef, ::fnck_sql::types::tuple::Tuple)) -> Self {
fn try_get<T: 'static>(tuple: &mut ::fnck_sql::types::tuple::Tuple, schema: &::fnck_sql::types::tuple::SchemaRef, field_name: &str) -> Option<::fnck_sql::types::value::DataValue> {
let ty = ::fnck_sql::types::LogicalType::type_trans::<T>()?;
let (idx, _) = schema
.iter()
.enumerate()
.find(|(_, col)| col.name() == field_name)?;

::fnck_sql::types::value::DataValue::clone(&tuple.values[idx])
.cast(&ty)
.ok()
std::mem::replace(&mut tuple.values[idx], ::fnck_sql::types::value::DataValue::Null).cast(&ty).ok()
}

let mut struct_instance = $struct_name::default();
$(
if let Some(value) = try_get::<$field_type>(&tuple, schema, stringify!($field_name)) {
if let Some(value) = try_get::<$field_type>(&mut tuple, schema, stringify!($field_name)) {
$closure(
&mut struct_instance,
value
Expand Down Expand Up @@ -103,7 +101,7 @@ macro_rules! scala_function {
_index += 1;

if value.logical_type() != $arg_ty {
value = ::fnck_sql::types::value::DataValue::clone(&value).cast(&$arg_ty)?;
value = value.cast(&$arg_ty)?;
}
value
}, )*)
Expand Down Expand Up @@ -197,7 +195,7 @@ macro_rules! table_function {
_index += 1;

if value.logical_type() != $arg_ty {
value = ::fnck_sql::types::value::DataValue::clone(&value).cast(&$arg_ty)?;
value = value.cast(&$arg_ty)?;
}
value
}, )*)
Expand Down
2 changes: 1 addition & 1 deletion src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ impl LogicalType {
LogicalType::Float | LogicalType::Double | LogicalType::Decimal(_, _)
),
LogicalType::Float => matches!(to, LogicalType::Double | LogicalType::Decimal(_, _)),
LogicalType::Double => false,
LogicalType::Double => matches!(to, LogicalType::Decimal(_, _)),
LogicalType::Char(..) => false,
LogicalType::Varchar(..) => false,
LogicalType::Date => matches!(
Expand Down
4 changes: 4 additions & 0 deletions src/types/tuple_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ impl TupleIdBuilder {
}

pub fn build(&mut self) -> Option<TupleId> {
if self.tmp_keys.len() != self.primary_indexes.len() {
self.tmp_keys.clear();
return None;
}
(!self.tmp_keys.is_empty()).then(|| {
if self.tmp_keys.len() == 1 {
self.tmp_keys.pop().unwrap().unwrap()
Expand Down
2 changes: 1 addition & 1 deletion src/types/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1388,7 +1388,7 @@ impl DataValue {
LogicalType::Tuple(types) => Ok(if let Some(mut values) = values {
for (i, value) in values.iter_mut().enumerate() {
if types[i] != value.logical_type() {
*value = DataValue::clone(value).cast(&types[i])?;
*value = mem::replace(value, DataValue::Null).cast(&types[i])?;
}
}
DataValue::Tuple(Some(values))
Expand Down
15 changes: 15 additions & 0 deletions tpcc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "tpcc"
version = "0.1.0"
edition = "2021"

[dependencies]
clap = { version = "4", features = ["derive"] }
chrono = { version = "0.4" }
fnck_sql = { version = "0.0.6", path = "..", package = "fnck_sql" }
indicatif = { version = "0.17" }
ordered-float = { version = "4" }
rand = { version = "0.8" }
rust_decimal = { version = "1" }
tempfile = { version = "3" }
thiserror = { version = "1" }
84 changes: 84 additions & 0 deletions tpcc/src/delivery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use crate::load::DIST_PER_WARE;
use crate::{TpccArgs, TpccError, TpccTest, TpccTransaction};
use chrono::Utc;
use fnck_sql::db::DBTransaction;
use fnck_sql::storage::Storage;
use rand::prelude::ThreadRng;
use rand::Rng;

#[derive(Debug)]
pub(crate) struct DeliveryArgs {
w_id: usize,
o_carrier_id: usize,
}

impl DeliveryArgs {
pub(crate) fn new(w_id: usize, o_carrier_id: usize) -> Self {
Self { w_id, o_carrier_id }
}
}

pub(crate) struct Delivery;
pub(crate) struct DeliveryTest;

impl<S: Storage> TpccTransaction<S> for Delivery {
type Args = DeliveryArgs;

fn run(tx: &mut DBTransaction<S>, args: &Self::Args) -> Result<(), TpccError> {
let now = Utc::now().format("%Y-%m-%d %H:%M:%S").to_string();

for d_id in 1..DIST_PER_WARE + 1 {
// "SELECT COALESCE(MIN(no_o_id),0) FROM new_orders WHERE no_d_id = ? AND no_w_id = ?"
let (_, tuple) = tx.run(format!("SELECT COALESCE(MIN(no_o_id),0) FROM new_orders WHERE no_d_id = {} AND no_w_id = {}", d_id, args.w_id))?;
let no_o_id = tuple[0].values[0].i32().unwrap();

if no_o_id == 0 {
continue;
}
// "DELETE FROM new_orders WHERE no_o_id = ? AND no_d_id = ? AND no_w_id = ?"
let _ = tx.run(format!(
"DELETE FROM new_orders WHERE no_o_id = {} AND no_d_id = {} AND no_w_id = {}",
no_o_id, d_id, args.w_id
))?;
// "SELECT o_c_id FROM orders WHERE o_id = ? AND o_d_id = ? AND o_w_id = ?"
let (_, tuple) = tx.run(format!(
"SELECT o_c_id FROM orders WHERE o_id = {} AND o_d_id = {} AND o_w_id = {}",
no_o_id, d_id, args.w_id
))?;
let c_id = tuple[0].values[0].i32().unwrap();
// "UPDATE orders SET o_carrier_id = ? WHERE o_id = ? AND o_d_id = ? AND o_w_id = ?"
let _ = tx.run(format!("UPDATE orders SET o_carrier_id = {} WHERE o_id = {} AND o_d_id = {} AND o_w_id = {}", args.o_carrier_id, no_o_id, d_id, args.w_id))?;
// "UPDATE order_line SET ol_delivery_d = ? WHERE ol_o_id = ? AND ol_d_id = ? AND ol_w_id = ?"
let _ = tx.run(format!("UPDATE order_line SET ol_delivery_d = '{}' WHERE ol_o_id = {} AND ol_d_id = {} AND ol_w_id = {}", now, no_o_id, d_id, args.w_id))?;
// "SELECT SUM(ol_amount) FROM order_line WHERE ol_o_id = ? AND ol_d_id = ? AND ol_w_id = ?"
let (_, tuple) = tx.run(format!("SELECT SUM(ol_amount) FROM order_line WHERE ol_o_id = {} AND ol_d_id = {} AND ol_w_id = {}", no_o_id, d_id, args.w_id))?;
let ol_total = tuple[0].values[0].decimal().unwrap();
// "UPDATE customer SET c_balance = c_balance + ? , c_delivery_cnt = c_delivery_cnt + 1 WHERE c_id = ? AND c_d_id = ? AND c_w_id = ?"
let _ = tx.run(format!("UPDATE customer SET c_balance = c_balance + {} , c_delivery_cnt = c_delivery_cnt + 1 WHERE c_id = {} AND c_d_id = {} AND c_w_id = {}", ol_total, c_id, d_id, args.w_id))?;
}

Ok(())
}
}

impl<S: Storage> TpccTest<S> for DeliveryTest {
fn name(&self) -> &'static str {
"Delivery"
}

fn do_transaction(
&self,
rng: &mut ThreadRng,
tx: &mut DBTransaction<S>,
num_ware: usize,
_: &TpccArgs,
) -> Result<(), TpccError> {
let w_id = rng.gen_range(0..num_ware) + 1;
let o_carrier_id = rng.gen_range(1..10);

let args = DeliveryArgs::new(w_id, o_carrier_id);
Delivery::run(tx, &args)?;

Ok(())
}
}
Loading

0 comments on commit 6e35bc7

Please sign in to comment.