Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.2.x: Okhttp stomp protocol implementation #127

Open
wants to merge 32 commits into
base: 0.2.x
Choose a base branch
from

Conversation

Hukumister
Copy link

@Hukumister Hukumister commented Jan 10, 2020

I added an implementation of the stomp protocol, which works over okhttp, because the gaziro based implementation does not support working with this protocol on top of websocket.

@CLAassistant
Copy link

CLAassistant commented Mar 7, 2020

CLA assistant check
All committers have signed the CLA.

@Hukumister Hukumister changed the title Stomp Stomp protocol implementation Mar 7, 2020
@Hukumister Hukumister changed the title Stomp protocol implementation Okhttp stomp protocol implementation Mar 7, 2020
@Hukumister Hukumister marked this pull request as ready for review March 14, 2020 10:39
@Hukumister Hukumister requested a review from zhxnlai March 26, 2020 11:04
@miguelhrocha
Copy link

Hi everyone! Is there any estimate when would be merged @zhxnlai? This would help me a lot and solve all the problems in my life :)

@Hukumister
Copy link
Author

@miguelhrocha Hi, I think you can just use these changes and create your own module in your project before the maintainer accepts it. It's going to be faster.

Refactoring packages

Refactoring and add documentation

update stomp message encoder

refactoring and update code style

add method for sent byte array to stomp sender

make ClientOpenRequestHeaderFactory typealias as private

fix code style
refactoring stomp header accessor

refactoring

Change unknown stomp command type to heartbeat
Make method handle result as nullable

add java doc to IdGenerator

Refactoring stomp message decoder and add tests

add copyright

refactoring packages

remove extra

update test deps

fix crash

fix bug with sending bytes
rename test method

add unsubscribeAll method

add to string implementation for stomp header and messages

add awaitCountAtLeast method

move okhttp stomp integration test to another module

remove extra line in test method

Incapsulate the logic of calculating "whether to send a heartbeat settings to server"

Add field shouldRetryAfterError

Add setting shouldRetryAfterError to configuration

Refactoring OkHttpStompMainChannel

Add method disconnect with disposing logic
Change InnerWebSocketListener class modifier to private

Change string regex to use char

Add overloading method "of" in StompHeaderAccessor
Change queue to use topic
Add overloading. method awaitCountAndCheck with sleep strategy
Update StompIntegrationTest

Add message to require

Fix code style

Rewrite stomp message decoder

* Rewrite stomp message decoder using okio buffer instead of java buffer
* Update okio version

Add method awaitCountAndCheck with wait strategy

Change using system time millis to use nanoTime

* change using system time millis to use nanoTime
* add test for websocket connection
* fix code style

Remove extra method

Remove extra comment
@miguelhrocha
Copy link

Hey @HaronCode thanks for the advice! I did exactly what you proposed, and now I am able to connect successfully to my Spring Boot backend :)

I can successfully send messages, but I am not able to receive anything broadcasted from the backend. Were you able to test the integration of a subscription towards a topic? I seriously can't see what's wrong with my code, looking that I can subscribe without any problems to the StateTransition events.

This is my service for example:

interface BroadcastTopic {

    @Receive
    fun receiveBroadcast(): Flowable<String>

    @Receive
    fun observeProtocolEvent(): Flowable<ProtocolEvent>

}

I have the following configuration:

        val stompProtocol = OkHttpStompClient(
            configuration = OkHttpStompMainChannel.Configuration(host = "ws://10.0.2.2:8080/ws/websocket"),
            okHttpClient = okHttpClient(),
            requestFactory = {
                OkHttpStompClient.ClientOpenRequest(
                    login = "guest",
                    passcode = "guest",
                    okHttpRequest = okHttpRequest()
                )
            }
        )

        configuration = Scarlet.Configuration(
            lifecycle = AndroidLifecycle.ofLifecycleServiceStarted(application, this),
            messageAdapterFactories = listOf(JacksonMessageAdapter.Factory()),
            streamAdapterFactories = listOf(RxJava2StreamAdapterFactory()),
            backoffStrategy = ExponentialWithJitterBackoffStrategy(1000, 5000),
            debug = true
        )

and this is the instantiation of the Topic

broadcastTopic = scarlet!!
            .child(OkHttpStompDestination("/topic/broadcast"), configuration!!)
            .create()

I am managing everything with a CompositeDisposable

Any help will be super appreciate it :)

@Hukumister
Copy link
Author

@miguelhrocha Don't you forget about "ApplicationDestinationPrefixes" in Spring Boot backend? And try to check the destination without prefix "/topic". I don't see any mistakes in your client code, I guess you should figure out backend settings such as broker destination prefix and application destination prefix.

@miguelhrocha
Copy link

Hey @HaronCode I really appreciate all your help in this, you've been an incredible help. I think I found the issue but I think it's related to Scarlet's reflection capabilities.

So your STOMP implementation works flawlessly, I was always able to receive the message from the MainChannel. However, the problem was with the subscription of the the service that receives the string, the callback was never invoked for some reason:

@Receive
fun receiveBroadcast(): Flowable<String>

But I tried to do this and it works perfect, no issues at all:

val broadcastEvent = broadcastTopic!!.observeProtocolEvent()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .filter { it is ProtocolEvent.OnMessageReceived }
            .subscribe {
                val payload = it as ProtocolEvent.OnMessageReceived
                Log.i("BROADCASTEVENT", it.toString())
                toast((payload.message as Message.Text).value)
            }

Do you have any insight on why this happened? Feel free to answer that this is out of the scope of your PR, which completely is :) and I will create an issue

@miguelhrocha
Copy link

I also have another question @HaronCode once again, thanks for all your help. Have you been able to reconnect the client when the Server stops the connection? I have been trying to push a State.STARTED manually but that doesn't do anything

@miguelhrocha
Copy link

Hey @HaronCode me again :P this is the last time I'll bother you I think. In order to kickstart a reconnection in case of an unexpected closure from the Server I had to modify your WebSocketListener with this:

override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
            if (code == 1001) {
                listener.onFailed(this@OkHttpStompMainChannel, true, null)
                [email protected] = null
            } else {
                listener.onClosing(this@OkHttpStompMainChannel)
            }
        }

You could add it if you see it as valuable, but I am pretty sure there most be a fancier way to achieve this :) Sadly, modifying the LifecycleState didn't work for me because if the callback of onClosing starts, then the sideEffect in the StateTransition is null instead of a RetryAttempt

@veeva-mark
Copy link

any update on this? I need a stomp client library

@Hukumister
Copy link
Author

Hukumister commented Aug 11, 2020

@miguelhrocha Thank you for all your comments! I really appreciate it. I improved my implementation, I added the handler for the case with unexpected server disconnect and added integration test for it. If you use my changes can you give me some feedback, maybe you have more ideas to improve this implementation?

@francescosalamone
Copy link

Hi everyone! @zhxnlai Any news about merge this changes?

@francescosalamone
Copy link

francescosalamone commented Oct 26, 2020

Hi @HaronCode, thank you for your stomp implementation.
I'm trying to use it in my application, and I can connect to my local server. After the connection is established seems that the device continue to send the connection message, so it fails. What I want to achieve is that after connection the device subscribes to a channel, waiting for a message from the server. Here is my code

    `val clientOpenResult = OkHttpStompClient.ClientOpenRequest(
        login = LOGIN,
        passcode = PASSWORD,
        okHttpRequest = Request.Builder().apply {
            url(URL)
        }.build()
    )

    val stompProtocol = OkHttpStompClient(
        configuration = OkHttpStompMainChannel.Configuration(
            host = "URL",
            heartbeatReceiveInterval = 100,
            heartbeatSendInterval = 100
        ),
        okHttpClient = client,
        requestFactory = {
            clientOpenResult
        }
    )

    val channelFactory = stompProtocol.createChannelFactory().create(connectionListener, null)
    channelFactory?.open(clientOpenResult)

    val stompDestination = OkHttpStompDestination(destination = "jms.topic.chat")
    val destinationChannelFactory = stompDestination.createChannelFactory().create(connectionListener, channelFactory)
    destinationChannelFactory?.createMessageQueue(messageListener)
    destinationChannelFactory?.open(clientOpenResult)`

Thanks in advance.

@francescosalamone
Copy link

Hey @HaronCode I really appreciate all your help in this, you've been an incredible help. I think I found the issue but I think it's related to Scarlet's reflection capabilities.

So your STOMP implementation works flawlessly, I was always able to receive the message from the MainChannel. However, the problem was with the subscription of the the service that receives the string, the callback was never invoked for some reason:

@Receive
fun receiveBroadcast(): Flowable<String>

But I tried to do this and it works perfect, no issues at all:

val broadcastEvent = broadcastTopic!!.observeProtocolEvent()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .filter { it is ProtocolEvent.OnMessageReceived }
            .subscribe {
                val payload = it as ProtocolEvent.OnMessageReceived
                Log.i("BROADCASTEVENT", it.toString())
                toast((payload.message as Message.Text).value)
            }

Do you have any insight on why this happened? Feel free to answer that this is out of the scope of your PR, which completely is :) and I will create an issue

Hi @miguelhrocha did you figure out with this problem? I'm also able to receive messages only using ProtocolEvent.

@aaronweihe aaronweihe changed the title Okhttp stomp protocol implementation 0.2.x: Okhttp stomp protocol implementation Nov 3, 2020
@francescosalamone
Copy link

francescosalamone commented Nov 6, 2020

Hi @HaronCode, sorry for spam. I'm finally able to send and receive messages from the websocket.
I'm currently looking for a way to manually unsubscribe from a topic, do you think that is possible to add an Annotation like @Send or @Receive, so that we can use syntax like myTopic.sendCommand(StompCommand.UNSUBSCRIBE)?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants