diff --git a/hook-common/src/pgqueue.rs b/hook-common/src/pgqueue.rs index 5377a0e..af91fbd 100644 --- a/hook-common/src/pgqueue.rs +++ b/hook-common/src/pgqueue.rs @@ -7,6 +7,7 @@ use std::{str::FromStr, sync::Arc}; use async_trait::async_trait; use chrono; use serde; +use sqlx::postgres::any::AnyConnectionBackend; use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions}; use thiserror::Error; use tokio::sync::Mutex; @@ -333,14 +334,18 @@ pub struct PgTransactionBatch<'c, J, M> { } impl<'c, J, M> PgTransactionBatch<'_, J, M> { - pub async fn commit(&mut self) -> PgQueueResult<()> { + pub async fn commit(self) -> PgQueueResult<()> { let mut txn_guard = self.shared_txn.lock().await; - let txn = txn_guard.take().unwrap(); - txn.commit().await.map_err(|e| PgQueueError::QueryError { - command: "COMMIT".to_owned(), - error: e, - })?; + txn_guard + .as_deref_mut() + .ok_or(PgQueueError::TransactionAlreadyClosedError)? + .commit() + .await + .map_err(|e| PgQueueError::QueryError { + command: "COMMIT".to_owned(), + error: e, + })?; Ok(()) }