Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to akka26 and scala2.13. #197

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ jdk:
services:
- rabbitmq

script: sbt '+ test'

sudo: false

cache:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,98 +1,99 @@
package com.spingo.op_rabbit
package stream
package com.spingo.op_rabbit.stream

import akka.actor.{ActorRef,Props}
import com.spingo.op_rabbit._
import com.spingo.op_rabbit.Message._
import akka.stream.stage.GraphStage
import akka.actor.{ActorRef, Props}
import akka.actor.FSM
import akka.pattern.ask
import akka.stream.scaladsl.Sink
import akka.stream.actor._
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._
import com.timcharper.acked.AckedSink
import scala.util.{Try,Success,Failure}
import scala.util.{Try, Success,Failure}
import akka.stream._
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.GraphStageWithMaterializedValue
import akka.stream.stage.InHandler
import akka.stream.scaladsl.Flow
import akka.util.Timeout

private [stream] object MessagePublisherSinkActor {
sealed trait State
case object Running extends State
case object Stopping extends State
case object AllDoneFuturePlease
}

private class MessagePublisherSinkActor(rabbitControl: ActorRef, timeoutAfter: FiniteDuration, qos: Int) extends ActorSubscriber with FSM[MessagePublisherSinkActor.State, Unit] {
import ActorSubscriberMessage._
import MessagePublisherSinkActor._
private class MessagePublisherSink(rabbitControl: ActorRef, timeoutAfter: FiniteDuration, qos: Int) extends GraphStageWithMaterializedValue[SinkShape[(Promise[Unit],Message)], Future[Unit]] {
val in = Inlet[(Promise[Unit],Message)]("MessagePublisherSink.in")

private val queue = scala.collection.mutable.Map.empty[Long, Promise[Unit]]
private val completed = Promise[Unit]
val shape = SinkShape.of(in)

startWith(Running, ())
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Unit]) = {
val completed = Promise[Unit]()

override val requestStrategy = new MaxInFlightRequestStrategy(max = qos) {
override def inFlightInternally: Int = queue.size
}
val logic = new GraphStageLogic(shape) {
private val queue = scala.collection.mutable.Map.empty[Long, Promise[Unit]]

override def postRestart(reason: Throwable): Unit = {
stopWith(Failure(reason))
super.postRestart(reason)
}
// callback to schedule the rabbitControl responses into the stage
private val futureCallback = getAsyncCallback[Try[Message.ConfirmResponse]]({
case Success(Message.Ack(id)) =>
queue.remove(id).get.success(())
pullIfNeeded()

private def stopWith(reason: Try[Unit]): Unit = {
context stop self
completed.tryComplete(reason)
}
case Success(Message.Nack(id)) =>
queue.remove(id).get.failure(new MessageNacked(id))
pullIfNeeded()

when(Running) {
case Event(response: Message.ConfirmResponse, _) =>
handleResponse(response)
stay
case Success(Message.Fail(id, exception: Throwable)) =>
queue.remove(id).get.failure(exception)
pullIfNeeded()

case Event(OnError(e), _) =>
completed.tryFailure(e)
goto(Stopping)
case Failure(exception) =>
// currently fails the stream - maybe better just fail the message - needs additional context
fail(exception)
})

case Event(OnComplete, _) =>
goto(Stopping)
}
override def preStart(): Unit = {
// we must ensure we can acknowledge messages even on stream complete
setKeepGoing(true)
pull(in)
}

setHandler(in, new InHandler {
override def onPush(): Unit = {
val (promise, msg) = grab(in)
queue(msg.id) = promise

when(Stopping) {
case Event(response: Message.ConfirmResponse, _) =>
handleResponse(response)
if(queue.isEmpty)
stop
else
stay
}
val eventualResult = rabbitControl.ask(msg)(Timeout(timeoutAfter)).mapTo[ConfirmResponse]

whenUnhandled {
case Event(OnNext((p: Promise[Unit] @unchecked, msg: Message)), _) =>
queue(msg.id) = p
rabbitControl ! msg
stay
// TODO: which EC to schedule the callback onto?
eventualResult.onComplete(futureCallback.invoke)(materializer.executionContext)

case Event(MessagePublisherSinkActor.AllDoneFuturePlease,_) =>
sender ! completed.future
stay
}
pullIfNeeded()
}

onTransition {
case Running -> Stopping if queue.isEmpty =>
stopWith(Success(()))
}
override def onUpstreamFinish(): Unit = {
if (queue.isEmpty) complete()
}

onTermination {
case e: StopEvent =>
stopWith(Success(()))
}
override def onUpstreamFailure(ex: Throwable): Unit = {
fail(ex)
}
})

private def pullIfNeeded(): Unit = {
if (isClosed(in) && queue.isEmpty) complete()
else if (queue.size < qos && !hasBeenPulled(in)) tryPull(in)
}

private val handleResponse: Message.ConfirmResponse => Unit = {
case Message.Ack(id) =>
queue.remove(id).get.success(())
private def complete(): Unit = {
completed.success(())
completeStage()
}

case Message.Nack(id) =>
queue.remove(id).get.failure(new MessageNacked(id))
private def fail(ex: Throwable): Unit = {
completed.failure(ex)
failStage(ex)
}
}

case Message.Fail(id, exception: Throwable) =>
queue.remove(id).get.failure(exception)
(logic, completed.future)
}
}

Expand Down Expand Up @@ -129,13 +130,7 @@ object MessagePublisherSink {
@param rabbitControl An actor
@param timeoutAfter The duration for which we'll wait for a message to be acked; note, timeouts and non-acknowledged messages will cause the upstream elements to fail. The sink will not throw an exception.
*/
def apply(rabbitControl: ActorRef, timeoutAfter: FiniteDuration = 30 seconds, qos: Int = 8): AckedSink[Message, Future[Unit]] = AckedSink {
Sink.actorSubscriber[(Promise[Unit], Message)](Props(new MessagePublisherSinkActor(rabbitControl, timeoutAfter, qos))).
mapMaterializedValue { subscriber =>
implicit val akkaTimeout = akka.util.Timeout(timeoutAfter)
implicit val ec = SameThreadExecutionContext

(subscriber ? MessagePublisherSinkActor.AllDoneFuturePlease).mapTo[Future[Unit]].flatMap(identity)
}
def apply(rabbitControl: ActorRef, timeoutAfter: FiniteDuration = 30.seconds, qos: Int = 8): AckedSink[Message, Future[Unit]] = AckedSink {
new MessagePublisherSink(rabbitControl, timeoutAfter, qos)
}
}
15 changes: 10 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import java.util.Properties

val json4sVersion = "3.6.6"
val circeVersion = "0.12.3"
val akkaVersion = "2.5.25"
val playVersion = "2.7.4"
val akkaVersion = "2.6.6"
val playVersion = "2.9.0"

val appProperties = {
val prop = new Properties()
Expand Down Expand Up @@ -120,7 +120,7 @@ lazy val upickle = (project in file("./addons/upickle")).
libraryDependencies += "com.lihaoyi" %% "upickle" % (
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, 11)) => "0.7.4"
case _ => "0.8.0"
case _ => "1.2.2"
}
)).
dependsOn(core)
Expand All @@ -136,9 +136,14 @@ lazy val `akka-stream` = (project in file("./addons/akka-stream")).
settings(commonSettings: _*).
settings(
name := "op-rabbit-akka-stream",
// Temporarily depend on jitpack published version of acked-streams for scala 2.13
resolvers += "jitpack" at "https://jitpack.io",
libraryDependencies ++= Seq(
"com.timcharper" %% "acked-streams" % "2.1.1",
"com.typesafe.akka" %% "akka-stream" % akkaVersion),
// TODO: remove and switch to com.timcharper when https://github.com/timcharper/acked-stream/pull/7 gets merged and published
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timcharper/acked-stream#7 has been merged, just need to wait for a release.

// "com.timcharper" %% "acked-streams" % "2.1.1",
"com.github.deal-engine.acked-stream" %% "acked-streams" % "8f17c92",
"com.typesafe.akka" %% "akka-stream" % akkaVersion
),
unmanagedResourceDirectories in Test ++= Seq(
file(".").getAbsoluteFile / "core" / "src" / "test" / "resources"),
unmanagedSourceDirectories in Test ++= Seq(
Expand Down
2 changes: 2 additions & 0 deletions jitpack.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
install:
- sbt '+ publishM2'