Skip to content

Commit

Permalink
Implement LogicalPlan support for transactions (apache#5827)
Browse files Browse the repository at this point in the history
Implement LogicalPlan support for transactions (apache#5827)
  • Loading branch information
avantgardnerio authored Apr 2, 2023
1 parent 187c6f0 commit a7f828d
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 3 deletions.
12 changes: 12 additions & 0 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,18 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: Dml".to_string(),
))
}
LogicalPlan::TransactionStart(_) => {
// DataFusion is a read-only query engine, but also a library, so consumers may implement this
Err(DataFusionError::NotImplemented(
"Unsupported logical plan: TransactionStart".to_string(),
))
}
LogicalPlan::TransactionEnd(_) => {
// DataFusion is a read-only query engine, but also a library, so consumers may implement this
Err(DataFusionError::NotImplemented(
"Unsupported logical plan: TransactionEnd".to_string(),
))
}
LogicalPlan::SetVariable(_) => {
Err(DataFusionError::Internal(
"Unsupported logical plan: SetVariable must be root of the plan".to_string(),
Expand Down
3 changes: 2 additions & 1 deletion datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ pub use plan::{
DropTable, DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint,
JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare, Projection,
Repartition, SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
ToStringifiedPlan, Union, Unnest, Values, Window, WriteOp,
ToStringifiedPlan, TransactionAccessMode, TransactionConclusion, TransactionEnd,
TransactionIsolationLevel, TransactionStart, Union, Unnest, Values, Window, WriteOp,
};

pub use display::display_schema;
Expand Down
71 changes: 71 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ pub enum LogicalPlan {
DescribeTable(DescribeTable),
/// Unnest a column that contains a nested list type.
Unnest(Unnest),
// Begin a transaction
TransactionStart(TransactionStart),
// Commit or rollback a transaction
TransactionEnd(TransactionEnd),
}

impl LogicalPlan {
Expand Down Expand Up @@ -171,6 +175,8 @@ impl LogicalPlan {
}
LogicalPlan::Dml(DmlStatement { table_schema, .. }) => table_schema,
LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
LogicalPlan::TransactionStart(TransactionStart { schema, .. }) => schema,
LogicalPlan::TransactionEnd(TransactionEnd { schema, .. }) => schema,
}
}

Expand Down Expand Up @@ -240,6 +246,8 @@ impl LogicalPlan {
LogicalPlan::DropTable(_)
| LogicalPlan::DropView(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::TransactionStart(_)
| LogicalPlan::TransactionEnd(_)
| LogicalPlan::SetVariable(_) => vec![],
}
}
Expand Down Expand Up @@ -360,6 +368,8 @@ impl LogicalPlan {
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::TransactionStart(_)
| LogicalPlan::TransactionEnd(_)
| LogicalPlan::SetVariable(_)
| LogicalPlan::DropView(_)
| LogicalPlan::CrossJoin(_)
Expand Down Expand Up @@ -410,6 +420,8 @@ impl LogicalPlan {
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::TransactionStart(_)
| LogicalPlan::TransactionEnd(_)
| LogicalPlan::SetVariable(_)
| LogicalPlan::DropView(_)
| LogicalPlan::DescribeTable(_) => vec![],
Expand Down Expand Up @@ -1057,6 +1069,20 @@ impl LogicalPlan {
}) => {
write!(f, "DropTable: {name:?} if not exist:={if_exists}")
}
LogicalPlan::TransactionStart(TransactionStart {
access_mode,
isolation_level,
..
}) => {
write!(f, "TransactionStart: {access_mode:?} {isolation_level:?}")
}
LogicalPlan::TransactionEnd(TransactionEnd {
conclusion,
chain,
..
}) => {
write!(f, "TransactionEnd: {conclusion:?} chain:={chain}")
}
LogicalPlan::DropView(DropView {
name, if_exists, ..
}) => {
Expand Down Expand Up @@ -1565,6 +1591,51 @@ pub struct DmlStatement {
pub input: Arc<LogicalPlan>,
}

/// Indicates if a transaction was committed or aborted
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub enum TransactionConclusion {
Commit,
Rollback,
}

/// Indicates if this transaction is allowed to write
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub enum TransactionAccessMode {
ReadOnly,
ReadWrite,
}

/// Indicates ANSI transaction isolation level
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub enum TransactionIsolationLevel {
ReadUncommitted,
ReadCommitted,
RepeatableRead,
Serializable,
}

/// Indicator that the following statements should be committed or rolled back atomically
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct TransactionStart {
/// indicates if transaction is allowed to write
pub access_mode: TransactionAccessMode,
// indicates ANSI isolation level
pub isolation_level: TransactionIsolationLevel,
/// Empty schema
pub schema: DFSchemaRef,
}

/// Indicator that any current transaction should be terminated
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct TransactionEnd {
/// whether the transaction committed or aborted
pub conclusion: TransactionConclusion,
/// if specified a new transaction is immediately started with same characteristics
pub chain: bool,
/// Empty schema
pub schema: DFSchemaRef,
}

/// Prepare a statement but do not execute it. Prepare statements can have 0 or more
/// `Expr::Placeholder` expressions that are filled in during execution
#[derive(Clone, PartialEq, Eq, Hash)]
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,8 @@ pub fn from_plan(
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::DropView(_)
| LogicalPlan::TransactionStart(_)
| LogicalPlan::TransactionEnd(_)
| LogicalPlan::SetVariable(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_) => {
Expand Down
2 changes: 2 additions & 0 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ impl OptimizerRule for CommonSubexprEliminate {
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::DropView(_)
| LogicalPlan::TransactionStart(_)
| LogicalPlan::TransactionEnd(_)
| LogicalPlan::SetVariable(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Distinct(_)
Expand Down
6 changes: 6 additions & 0 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1366,6 +1366,12 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlan::Dml(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for Dml",
)),
LogicalPlan::TransactionStart(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for Transactions",
)),
LogicalPlan::TransactionEnd(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for Transactions",
)),
LogicalPlan::DescribeTable(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for DescribeTable",
)),
Expand Down
69 changes: 67 additions & 2 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ use datafusion_common::{
};
use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
use datafusion_expr::logical_plan::builder::project;
use datafusion_expr::logical_plan::{Analyze, Prepare};
use datafusion_expr::logical_plan::{
Analyze, Prepare, TransactionAccessMode, TransactionConclusion, TransactionEnd,
TransactionIsolationLevel, TransactionStart,
};
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{
cast, col, CreateCatalog, CreateCatalogSchema,
Expand All @@ -43,7 +46,7 @@ use sqlparser::ast;
use sqlparser::ast::{
Assignment, Expr as SQLExpr, Expr, Ident, ObjectName, ObjectType, OrderByExpr, Query,
SchemaName, SetExpr, ShowCreateObject, ShowStatementFilter, Statement, TableFactor,
TableWithJoins, UnaryOperator, Value,
TableWithJoins, TransactionMode, UnaryOperator, Value,
};

use sqlparser::parser::ParserError::ParserError;
Expand Down Expand Up @@ -393,6 +396,68 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
self.delete_to_plan(table_name, selection)
}

Statement::StartTransaction { modes } => {
let isolation_level: ast::TransactionIsolationLevel = modes
.iter()
.filter_map(|m: &ast::TransactionMode| match m {
TransactionMode::AccessMode(_) => None,
TransactionMode::IsolationLevel(level) => Some(level),
})
.last()
.copied()
.unwrap_or(ast::TransactionIsolationLevel::Serializable);
let access_mode: ast::TransactionAccessMode = modes
.iter()
.filter_map(|m: &ast::TransactionMode| match m {
TransactionMode::AccessMode(mode) => Some(mode),
TransactionMode::IsolationLevel(_) => None,
})
.last()
.copied()
.unwrap_or(ast::TransactionAccessMode::ReadWrite);
let isolation_level = match isolation_level {
ast::TransactionIsolationLevel::ReadUncommitted => {
TransactionIsolationLevel::ReadUncommitted
}
ast::TransactionIsolationLevel::ReadCommitted => {
TransactionIsolationLevel::ReadCommitted
}
ast::TransactionIsolationLevel::RepeatableRead => {
TransactionIsolationLevel::RepeatableRead
}
ast::TransactionIsolationLevel::Serializable => {
TransactionIsolationLevel::Serializable
}
};
let access_mode = match access_mode {
ast::TransactionAccessMode::ReadOnly => {
TransactionAccessMode::ReadOnly
}
ast::TransactionAccessMode::ReadWrite => {
TransactionAccessMode::ReadWrite
}
};
Ok(LogicalPlan::TransactionStart(TransactionStart {
access_mode,
isolation_level,
schema: DFSchemaRef::new(DFSchema::empty()),
}))
}
Statement::Commit { chain } => {
Ok(LogicalPlan::TransactionEnd(TransactionEnd {
conclusion: TransactionConclusion::Commit,
chain,
schema: DFSchemaRef::new(DFSchema::empty()),
}))
}
Statement::Rollback { chain } => {
Ok(LogicalPlan::TransactionEnd(TransactionEnd {
conclusion: TransactionConclusion::Rollback,
chain,
schema: DFSchemaRef::new(DFSchema::empty()),
}))
}

_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported SQL statement: {sql:?}"
))),
Expand Down
67 changes: 67 additions & 0 deletions datafusion/sql/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,73 @@ fn cast_to_invalid_decimal_type() {
}
}

#[test]
fn plan_start_transaction() {
let sql = "start transaction";
let plan = "TransactionStart: ReadWrite Serializable";
quick_test(sql, plan);
}

#[test]
fn plan_start_transaction_isolation() {
let sql = "start transaction isolation level read committed";
let plan = "TransactionStart: ReadWrite ReadCommitted";
quick_test(sql, plan);
}

#[test]
fn plan_start_transaction_read_only() {
let sql = "start transaction read only";
let plan = "TransactionStart: ReadOnly Serializable";
quick_test(sql, plan);
}

#[test]
fn plan_start_transaction_fully_qualified() {
let sql = "start transaction isolation level read committed read only";
let plan = "TransactionStart: ReadOnly ReadCommitted";
quick_test(sql, plan);
}

#[test]
fn plan_start_transaction_overly_qualified() {
let sql = r#"start transaction
isolation level read committed
read only
isolation level repeatable read
"#;
let plan = "TransactionStart: ReadOnly RepeatableRead";
quick_test(sql, plan);
}

#[test]
fn plan_commit_transaction() {
let sql = "commit transaction";
let plan = "TransactionEnd: Commit chain:=false";
quick_test(sql, plan);
}

#[test]
fn plan_commit_transaction_chained() {
let sql = "commit transaction and chain";
let plan = "TransactionEnd: Commit chain:=true";
quick_test(sql, plan);
}

#[test]
fn plan_rollback_transaction() {
let sql = "rollback transaction";
let plan = "TransactionEnd: Rollback chain:=false";
quick_test(sql, plan);
}

#[test]
fn plan_rollback_transaction_chained() {
let sql = "rollback transaction and chain";
let plan = "TransactionEnd: Rollback chain:=true";
quick_test(sql, plan);
}

#[test]
fn plan_insert() {
let sql =
Expand Down

0 comments on commit a7f828d

Please sign in to comment.