Skip to content

Commit

Permalink
test: add Tpcc
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Nov 12, 2024
1 parent 9d7e0af commit 7c14309
Show file tree
Hide file tree
Showing 20 changed files with 1,833 additions and 22 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] }
[workspace]
members = [
"tests/sqllogictest",
"tests/macros-test"
, "fnck_sql_serde_macros"]
"tests/macros-test",
"fnck_sql_serde_macros",
"tpcc"]

[profile.release]
lto = true
4 changes: 2 additions & 2 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> {
if !args.is_empty() {
ty = args[0].return_type();

for arg in args.iter() {
for arg in args.iter_mut() {
let temp_ty = arg.return_type();

if temp_ty == LogicalType::SqlNull {
Expand All @@ -595,7 +595,7 @@ impl<'a, T: Transaction> Binder<'a, '_, T> {
if ty == LogicalType::SqlNull && temp_ty != LogicalType::SqlNull {
ty = temp_ty;
} else if ty != temp_ty {
return Err(DatabaseError::Incomparable(ty, temp_ty));
ty = LogicalType::max_logical_type(&ty, &temp_ty)?;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/execution/dql/aggregate/avg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Accumulator for AvgAccumulator {
let quantity_ty = quantity.logical_type();

if value_ty != quantity_ty {
value = DataValue::clone(&value).cast(&quantity_ty)?
value = value.cast(&quantity_ty)?
}
let evaluator = EvaluatorFactory::binary_create(quantity_ty, BinaryOperator::Divide)?;
Ok(evaluator.0.binary_eval(&value, &quantity))
Expand Down
6 changes: 2 additions & 4 deletions src/expression/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl ScalarExpression {
pub fn eval(&self, tuple: &Tuple, schema: &[ColumnRef]) -> Result<DataValue, DatabaseError> {
let check_cast = |value: DataValue, return_type: &LogicalType| {
if value.logical_type() != *return_type {
return DataValue::clone(&value).cast(return_type);
return value.cast(return_type);
}
Ok(value)
};
Expand Down Expand Up @@ -73,9 +73,7 @@ impl ScalarExpression {
expr.eval(tuple, schema)
}
ScalarExpression::TypeCast { expr, ty, .. } => {
let value = expr.eval(tuple, schema)?;

Ok(DataValue::clone(&value).cast(ty)?)
Ok(expr.eval(tuple, schema)?.cast(ty)?)
}
ScalarExpression::Binary {
left_expr,
Expand Down
3 changes: 1 addition & 2 deletions src/function/char_length.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ impl ScalarFunctionImpl for CharLength {
let value = exprs[0].eval(tuples, columns)?;
let mut value = DataValue::clone(&value);
if !matches!(value.logical_type(), LogicalType::Varchar(_, _)) {
value = DataValue::clone(&value)
.cast(&LogicalType::Varchar(None, CharLengthUnits::Characters))?;
value = value.cast(&LogicalType::Varchar(None, CharLengthUnits::Characters))?;
}
let mut length: u64 = 0;
if let DataValue::Utf8 {
Expand Down
3 changes: 1 addition & 2 deletions src/function/lower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ impl ScalarFunctionImpl for Lower {
let value = exprs[0].eval(tuples, columns)?;
let mut value = DataValue::clone(&value);
if !matches!(value.logical_type(), LogicalType::Varchar(_, _)) {
value = DataValue::clone(&value)
.cast(&LogicalType::Varchar(None, CharLengthUnits::Characters))?;
value = value.cast(&LogicalType::Varchar(None, CharLengthUnits::Characters))?;
}
if let DataValue::Utf8 {
value: Some(value),
Expand Down
3 changes: 1 addition & 2 deletions src/function/upper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ impl ScalarFunctionImpl for Upper {
let value = exprs[0].eval(tuples, columns)?;
let mut value = DataValue::clone(&value);
if !matches!(value.logical_type(), LogicalType::Varchar(_, _)) {
value = DataValue::clone(&value)
.cast(&LogicalType::Varchar(None, CharLengthUnits::Characters))?;
value = value.cast(&LogicalType::Varchar(None, CharLengthUnits::Characters))?;
}
if let DataValue::Utf8 {
value: Some(value),
Expand Down
8 changes: 3 additions & 5 deletions src/macros/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ macro_rules! implement_from_tuple {
.enumerate()
.find(|(_, col)| col.name() == field_name)?;

::fnck_sql::types::value::DataValue::clone(&tuple.values[idx])
.cast(&ty)
.ok()
tuple.values[idx].cast(&ty).ok()
}

let mut struct_instance = $struct_name::default();
Expand Down Expand Up @@ -103,7 +101,7 @@ macro_rules! scala_function {
_index += 1;

if value.logical_type() != $arg_ty {
value = ::fnck_sql::types::value::DataValue::clone(&value).cast(&$arg_ty)?;
value = value.cast(&$arg_ty)?;
}
value
}, )*)
Expand Down Expand Up @@ -197,7 +195,7 @@ macro_rules! table_function {
_index += 1;

if value.logical_type() != $arg_ty {
value = ::fnck_sql::types::value::DataValue::clone(&value).cast(&$arg_ty)?;
value = value.cast(&$arg_ty)?;
}
value
}, )*)
Expand Down
2 changes: 1 addition & 1 deletion src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ impl LogicalType {
LogicalType::Float | LogicalType::Double | LogicalType::Decimal(_, _)
),
LogicalType::Float => matches!(to, LogicalType::Double | LogicalType::Decimal(_, _)),
LogicalType::Double => false,
LogicalType::Double => matches!(to, LogicalType::Decimal(_, _)),
LogicalType::Char(..) => false,
LogicalType::Varchar(..) => false,
LogicalType::Date => matches!(
Expand Down
4 changes: 4 additions & 0 deletions src/types/tuple_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ impl TupleIdBuilder {
}

pub fn build(&mut self) -> Option<TupleId> {
if self.tmp_keys.len() != self.primary_indexes.len() {
self.tmp_keys.clear();
return None;
}
(!self.tmp_keys.is_empty()).then(|| {
if self.tmp_keys.len() == 1 {
self.tmp_keys.pop().unwrap().unwrap()
Expand Down
3 changes: 2 additions & 1 deletion src/types/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ impl DataValue {
}
Utf8Type::Fixed(len) => match unit {
CharLengthUnits::Characters => {
debug_assert!((*len as usize) >= v.len());
let chars_len = *len as usize;
let string_bytes =
format!("{:len$}", v, len = chars_len).into_bytes();
Expand Down Expand Up @@ -1388,7 +1389,7 @@ impl DataValue {
LogicalType::Tuple(types) => Ok(if let Some(mut values) = values {
for (i, value) in values.iter_mut().enumerate() {
if types[i] != value.logical_type() {
*value = DataValue::clone(value).cast(&types[i])?;
*value = mem::replace(value, DataValue::Null).cast(&types[i])?;
}
}
DataValue::Tuple(Some(values))
Expand Down
15 changes: 15 additions & 0 deletions tpcc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "tpcc"
version = "0.1.0"
edition = "2021"

[dependencies]
clap = { version = "4", features = ["derive"] }
chrono = { version = "0.4" }
fnck_sql = { version = "0.0.4", path = "..", package = "fnck_sql" }
indicatif = { version = "0.17" }
ordered-float = { version = "4" }
rand = { version = "0.8" }
rust_decimal = { version = "1" }
tempfile = { version = "3" }
thiserror = { version = "1" }
84 changes: 84 additions & 0 deletions tpcc/src/delivery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use crate::load::DIST_PER_WARE;
use crate::{TpccArgs, TpccError, TpccTest, TpccTransaction};
use chrono::Utc;
use fnck_sql::db::DBTransaction;
use fnck_sql::storage::Storage;
use rand::prelude::ThreadRng;
use rand::Rng;

#[derive(Debug)]
pub(crate) struct DeliveryArgs {
w_id: usize,
o_carrier_id: usize,
}

impl DeliveryArgs {
pub(crate) fn new(w_id: usize, o_carrier_id: usize) -> Self {
Self { w_id, o_carrier_id }
}
}

pub(crate) struct Delivery;
pub(crate) struct DeliveryTest;

impl<S: Storage> TpccTransaction<S> for Delivery {
type Args = DeliveryArgs;

fn run(tx: &mut DBTransaction<S>, args: &Self::Args) -> Result<(), TpccError> {
let now = Utc::now().format("%Y-%m-%d %H:%M:%S").to_string();

for d_id in 1..DIST_PER_WARE + 1 {
// "SELECT COALESCE(MIN(no_o_id),0) FROM new_orders WHERE no_d_id = ? AND no_w_id = ?"
let (_, tuple) = tx.run(format!("SELECT COALESCE(MIN(no_o_id),0) FROM new_orders WHERE no_d_id = {} AND no_w_id = {}", d_id, args.w_id))?;
let no_o_id = tuple[0].values[0].i32().unwrap();

if no_o_id == 0 {
continue;
}
// "DELETE FROM new_orders WHERE no_o_id = ? AND no_d_id = ? AND no_w_id = ?"
let _ = tx.run(format!(
"DELETE FROM new_orders WHERE no_o_id = {} AND no_d_id = {} AND no_w_id = {}",
no_o_id, d_id, args.w_id
))?;
// "SELECT o_c_id FROM orders WHERE o_id = ? AND o_d_id = ? AND o_w_id = ?"
let (_, tuple) = tx.run(format!(
"SELECT o_c_id FROM orders WHERE o_id = {} AND o_d_id = {} AND o_w_id = {}",
no_o_id, d_id, args.w_id
))?;
let c_id = tuple[0].values[0].i32().unwrap();
// "UPDATE orders SET o_carrier_id = ? WHERE o_id = ? AND o_d_id = ? AND o_w_id = ?"
let _ = tx.run(format!("UPDATE orders SET o_carrier_id = {} WHERE o_id = {} AND o_d_id = {} AND o_w_id = {}", args.o_carrier_id, no_o_id, d_id, args.w_id))?;
// "UPDATE order_line SET ol_delivery_d = ? WHERE ol_o_id = ? AND ol_d_id = ? AND ol_w_id = ?"
let _ = tx.run(format!("UPDATE order_line SET ol_delivery_d = '{}' WHERE ol_o_id = {} AND ol_d_id = {} AND ol_w_id = {}", now, no_o_id, d_id, args.w_id))?;
// "SELECT SUM(ol_amount) FROM order_line WHERE ol_o_id = ? AND ol_d_id = ? AND ol_w_id = ?"
let (_, tuple) = tx.run(format!("SELECT SUM(ol_amount) FROM order_line WHERE ol_o_id = {} AND ol_d_id = {} AND ol_w_id = {}", no_o_id, d_id, args.w_id))?;
let ol_total = tuple[0].values[0].decimal().unwrap();
// "UPDATE customer SET c_balance = c_balance + ? , c_delivery_cnt = c_delivery_cnt + 1 WHERE c_id = ? AND c_d_id = ? AND c_w_id = ?"
let _ = tx.run(format!("UPDATE customer SET c_balance = c_balance + {} , c_delivery_cnt = c_delivery_cnt + 1 WHERE c_id = {} AND c_d_id = {} AND c_w_id = {}", ol_total, c_id, d_id, args.w_id))?;
}

Ok(())
}
}

impl<S: Storage> TpccTest<S> for DeliveryTest {
fn name(&self) -> &'static str {
"Delivery"
}

fn do_transaction(
&self,
rng: &mut ThreadRng,
tx: &mut DBTransaction<S>,
num_ware: usize,
_: &TpccArgs,
) -> Result<(), TpccError> {
let w_id = rng.gen_range(0..num_ware) + 1;
let o_carrier_id = rng.gen_range(1..10);

let args = DeliveryArgs::new(w_id, o_carrier_id);
Delivery::run(tx, &args)?;

Ok(())
}
}
Loading

0 comments on commit 7c14309

Please sign in to comment.