diff --git a/krossbow-stomp-core/src/commonMain/kotlin/org/hildan/krossbow/stomp/BaseStompSession.kt b/krossbow-stomp-core/src/commonMain/kotlin/org/hildan/krossbow/stomp/BaseStompSession.kt index 9b521415e..48bd9e5c3 100644 --- a/krossbow-stomp-core/src/commonMain/kotlin/org/hildan/krossbow/stomp/BaseStompSession.kt +++ b/krossbow-stomp-core/src/commonMain/kotlin/org/hildan/krossbow/stomp/BaseStompSession.kt @@ -154,7 +154,7 @@ internal class BaseStompSession( .dematerializeErrorsAndCompletion() .filterIsInstance() .firstOrNull { it.headers.receiptId == receiptId } - ?: error("Frames flow closed unexpectedly while waiting for RECEIPT frame with id='$receiptId'") + ?: throw SessionDisconnectedException("The STOMP frames flow completed unexpectedly while waiting for RECEIPT frame with id='$receiptId'") } ?: throw LostReceiptException(receiptId, frame.receiptTimeout, frame) } diff --git a/krossbow-stomp-core/src/commonMain/kotlin/org/hildan/krossbow/stomp/StompExceptions.kt b/krossbow-stomp-core/src/commonMain/kotlin/org/hildan/krossbow/stomp/StompExceptions.kt index 0fab5c760..42c20b8e0 100644 --- a/krossbow-stomp-core/src/commonMain/kotlin/org/hildan/krossbow/stomp/StompExceptions.kt +++ b/krossbow-stomp-core/src/commonMain/kotlin/org/hildan/krossbow/stomp/StompExceptions.kt @@ -35,3 +35,8 @@ class WebSocketClosedUnexpectedly( val code: Int, val reason: String?, ) : Exception("the WebSocket was closed while subscriptions were still active. Code: $code Reason: $reason") + +/** + * An exception thrown when the STOMP frames flow completed while some consumer was expecting more frames. + */ +class SessionDisconnectedException(message: String) : Exception(message) diff --git a/krossbow-stomp-core/src/commonTest/kotlin/org/hildan/krossbow/stomp/StompSessionReceiptTests.kt b/krossbow-stomp-core/src/commonTest/kotlin/org/hildan/krossbow/stomp/StompSessionReceiptTests.kt index 492a52f80..4b84f2645 100644 --- a/krossbow-stomp-core/src/commonTest/kotlin/org/hildan/krossbow/stomp/StompSessionReceiptTests.kt +++ b/krossbow-stomp-core/src/commonTest/kotlin/org/hildan/krossbow/stomp/StompSessionReceiptTests.kt @@ -21,6 +21,7 @@ import kotlin.test.assertNull import kotlin.test.assertTrue import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.minutes private val TEST_RECEIPT_TIMEOUT: Duration = 500.milliseconds @@ -121,6 +122,26 @@ class StompSessionReceiptTests { assertEquals("because why not", exception.reason) } + @Test + fun send_autoReceipt_failsOnSessionDisconnected() = runTest { + val (wsSession, stompSession) = connectWithMocks { + autoReceipt = true + receiptTimeout = 5.minutes + } + launch { + wsSession.awaitSendFrameAndSimulateCompletion() + // we simulate a disconnection between the SEND and the RECEIPT + launch { + wsSession.awaitDisconnectFrameAndSimulateCompletion() + wsSession.expectClose() + } + stompSession.disconnect() + } + assertFailsWith(SessionDisconnectedException::class) { + stompSession.sendEmptyMsg("/destination") + } + } + @Test fun send_manualReceipt_waitsForCorrectReceipt() = runTest { val (wsSession, stompSession) = connectWithMocks()