Skip to content

Commit

Permalink
Add handler for case with unexpected server disconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
ZaltsmanNikita committed Aug 11, 2020
1 parent f6469be commit e69cd05
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,12 @@ class OkHttpStompMainChannel(
}

override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
listener.onClosing(this@OkHttpStompMainChannel)
if (WebSocketCode.isUnexpectedClose(code)) {
listener.onFailed(this@OkHttpStompMainChannel, true, null)
this@OkHttpStompMainChannel.connection = null
} else {
listener.onClosing(this@OkHttpStompMainChannel)
}
}

override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.tinder.scarlet.stomp.okhttp.client

enum class WebSocketCode(val code: Int, val reason: String? = null) {

CLOSE_NORMAL(1000, "Normal closure"),
CLOSE_GOING_AWAY(1001, "Unexpected closure from the Server"),
CLOSED_NO_STATUS(1005, "Expected close status, received none"), ;

companion object {

/**
* @return true is this unexpected situation error code
*/
fun isUnexpectedClose(code: Int): Boolean {
return code == CLOSE_GOING_AWAY.code || code == CLOSED_NO_STATUS.code
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ class WebSocketConnection(
private val messageEncoder = StompMessageEncoder()
private val messageDecoder = StompMessageDecoder()

companion object {

private const val NORMAL_CLOSURE_STATUS_CODE = 1000
private const val NORMAL_CLOSURE_REASON = "Normal closure"
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -86,7 +80,7 @@ class WebSocketConnection(
* {@inheritDoc}
*/
override fun close() {
webSocket.close(NORMAL_CLOSURE_STATUS_CODE, NORMAL_CLOSURE_REASON)
webSocket.close(WebSocketCode.CLOSE_NORMAL.code, WebSocketCode.CLOSE_NORMAL.reason)
executor.shutdown()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,15 @@ import com.tinder.scarlet.ws.Send
import io.reactivex.observers.BaseTestConsumer.TestWaitStrategy.SLEEP_100MS
import org.apache.activemq.command.ActiveMQDestination
import org.apache.activemq.junit.EmbeddedActiveMQBroker
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import java.util.logging.Logger

class OkHttpStompIntegrationTest {

@get:Rule
val broker = object : EmbeddedActiveMQBroker() {
override fun configure() {
val destination = ActiveMQDestination.createDestination(SERVER_DESTINATION, 0)
brokerService.destinations = arrayOf(destination)
brokerService.addConnector(BROKER_URL)
}
}
private var broker: TestEmbeddedActiveMQBroker = TestEmbeddedActiveMQBroker()

@get:Rule
val firstConnection = OkHttpStompWebSocketConnection.create<StompOkHttpQueueTestService>(
Expand All @@ -48,6 +43,16 @@ class OkHttpStompIntegrationTest {
)
)

@Before
fun setUp() {
startServer()
}

@After
fun tearDown() {
stopServer()
}

@Test
fun `correct receive and send messages`() {
val queueTextObserver = secondConnection.client.observeText().test()
Expand All @@ -65,6 +70,54 @@ class OkHttpStompIntegrationTest {
secondConnection.clientClosure()
}

@Test
fun `reconnect test`() {
val queueTextObserver = secondConnection.client.observeText().test()

firstConnection.open()
secondConnection.open()

restartServer()

firstConnection.clientProtocolEventObserver.awaitCountAndCheck(3) // Open -> Close -> Open
secondConnection.clientProtocolEventObserver.awaitCountAndCheck(3) // Open -> Close -> Open

for (index in 0 until 9) {
firstConnection.client.sendText("message $index")
}

firstConnection.clientClosure()

LOGGER.info("${queueTextObserver.values}")
queueTextObserver.awaitCountAndCheck(9, SLEEP_100MS)
secondConnection.clientClosure()
}

private fun startServer() {
broker = TestEmbeddedActiveMQBroker()
broker.start()
}

private fun stopServer() {
broker.stop()
}

private fun restartServer() {
broker.stop()
broker = TestEmbeddedActiveMQBroker()
broker.start()
}

private class TestEmbeddedActiveMQBroker : EmbeddedActiveMQBroker() {

override fun configure() {
val destination = ActiveMQDestination.createDestination(SERVER_DESTINATION, 0)
brokerService.destinations = arrayOf(destination)
brokerService.addConnector(BROKER_URL)
brokerService.isPersistent = false
}
}

companion object {
private val LOGGER = Logger.getLogger(OkHttpStompIntegrationTest::class.java.name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,6 @@ class OkHttpStompWebSocketConnection<SERVICE : Any>(
lateinit var clientProtocolEventObserver: TestStreamObserver<ProtocolEvent>

override fun before() {
createClientAndServer()
}

override fun after() {
clientLifecycleRegistry.onNext(LifecycleState.Completed)
}

private fun createClientAndServer() {
client = createClient()
clientProtocolEventObserver = client.observeProtocolEvent().test()
client.observeProtocolEvent().start(object : Stream.Observer<ProtocolEvent> {
Expand All @@ -101,6 +93,10 @@ class OkHttpStompWebSocketConnection<SERVICE : Any>(
})
}

override fun after() {
clientLifecycleRegistry.onNext(LifecycleState.Completed)
}

private fun createClient(): SERVICE {
val protocol = OkHttpStompClient(
configuration = OkHttpStompMainChannel.Configuration(
Expand Down

0 comments on commit e69cd05

Please sign in to comment.