Skip to content
This repository has been archived by the owner on Jan 12, 2022. It is now read-only.

Commit

Permalink
see #18: upgrade to akka 2.2
Browse files Browse the repository at this point in the history
  • Loading branch information
sstone committed Nov 17, 2013
1 parent 47b45c1 commit c43ac7b
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 36 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ are synced to Maven Central

* version 1.1 (master branch) is compatible with Scala 2.9.2 and Akka 2.0.5
* version 1.1 (scala2.10 branch) is compatible with Scala 2.10 and Akka 2.1.0
* version 1.3 (scala2.10 branch) is compatible with Scala 2.10.1 and Akka 2.1.2
* version 1.3 (scala2.10 branch) is compatible with Scala 2.10.X and Akka 2.1.X
* version 1.4 (scala2.10 branch) targets Scala 2.10.X and Akka 2.2.X

## Calculator demo

Expand Down
16 changes: 8 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
<url>https://github.com/sstone/akka-amqp-proxies</url>

<properties>
<maven.compiler.source>1.6</maven.compiler.source>
<maven.compiler.target>1.6</maven.compiler.target>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.10.1</scala.version>
<akka.version>2.1.2</akka.version>
<scala.version>2.10.3</scala.version>
<akka.version>2.2.3</akka.version>
</properties>

<licenses>
Expand Down Expand Up @@ -53,8 +53,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
<source>1.7</source>
<target>1.7</target>
</configuration>
<executions>
<execution>
Expand All @@ -81,7 +81,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.3</version>
<version>3.1.6</version>
<executions>
<execution>
<id>process-resources</id>
Expand Down Expand Up @@ -134,7 +134,7 @@
<dependency>
<groupId>com.github.sstone</groupId>
<artifactId>amqp-client_2.10</artifactId>
<version>1.1</version>
<version>1.3-ML1</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
Expand Down
7 changes: 6 additions & 1 deletion src/main/scala/com/github.sstone/amqp/proxy/AmqpProxy.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.github.sstone.amqp.proxy

import akka.actor.{ActorLogging, Actor, ActorRef}
import akka.actor.{Props, ActorLogging, Actor, ActorRef}
import akka.serialization.Serializer
import akka.pattern.{ask, pipe}
import akka.util.Timeout
Expand Down Expand Up @@ -104,6 +104,11 @@ object AmqpProxy {
}
}

object ProxyClient {
def props(client: ActorRef, exchange: String, routingKey: String, serializer: Serializer, timeout: Timeout = 30 seconds, mandatory: Boolean = true, immediate: Boolean = false, deliveryMode: Int = 1): Props =
Props(new ProxyClient(client, exchange, routingKey, serializer, timeout, mandatory, immediate, deliveryMode))
}

/**
* standard one-request/one response proxy, which allows to write (myActor ? MyRequest).mapTo[MyResponse]
* @param client AMQP RPC Client
Expand Down
12 changes: 6 additions & 6 deletions src/main/scala/com/github.sstone/amqp/proxy/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ object Server {
val conn = system.actorOf(Props(new ConnectionOwner(connFactory)), name = "conn")
val exchange = ExchangeParameters(name = "amq.direct", exchangeType = "", passive = true)
val queue = QueueParameters(name = "calculator", passive = false, autodelete = true)
val channelParams = Some(ChannelParameters(qos = 1))
val channelParams = ChannelParameters(qos = 1)
// create an AMQP RPC server which consumes messages from queue "calculator" and passes
// them to our Calculator actor
val server = ConnectionOwner.createActor(
val server = ConnectionOwner.createChildActor(
conn,
Props(new RpcServer(queue, exchange, "calculator", new AmqpProxy.ProxyServer(calc), channelParams)),
2 second)
RpcServer.props(queue, exchange, "calculator", new AmqpProxy.ProxyServer(calc), channelParams),
name = Some("server"))

Amqp.waitForConnection(system, server).await()
}
Expand All @@ -69,10 +69,10 @@ object Client {
connFactory.setHost("localhost")
// create a "connection owner" actor, which will try and reconnect automatically if the connection ins lost
val conn = system.actorOf(Props(new ConnectionOwner(connFactory)), name = "conn")
val client = ConnectionOwner.createActor(conn, Props(new RpcClient()), 5 second)
val client = ConnectionOwner.createChildActor(conn, RpcClient.props())
Amqp.waitForConnection(system, client).await()
val proxy = system.actorOf(
Props(new AmqpProxy.ProxyClient(client, "amq.direct", "calculator", JsonSerializer)),
AmqpProxy.ProxyClient.props(client, "amq.direct", "calculator", JsonSerializer),
name = "proxy")
Client.compute(proxy)
}
Expand Down
15 changes: 7 additions & 8 deletions src/test/scala/com/github.sstone/amqp/proxy/ErrorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class ErrorTest extends TestKit(ActorSystem("TestSystem")) with WordSpec with Sh
val conn = system.actorOf(Props(new ConnectionOwner(connFactory)), name = "conn")
val exchange = ExchangeParameters(name = "amq.direct", exchangeType = "", passive = true)
val queue = QueueParameters(name = "error", passive = false, autodelete = true)
val channelParams = Some(ChannelParameters(qos = 1))
val channelParams = ChannelParameters(qos = 1)

case class ErrorRequest(foo: String)

Expand All @@ -36,15 +36,14 @@ class ErrorTest extends TestKit(ActorSystem("TestSystem")) with WordSpec with Sh
}))
// create an AMQP proxy server which consumes messages from the "error" queue and passes
// them to our nogood actor
val server = ConnectionOwner.createActor(
val server = ConnectionOwner.createChildActor(
conn,
Props(new RpcServer(queue, exchange, "error", new AmqpProxy.ProxyServer(nogood), channelParams)),
2 second)
RpcServer.props(queue, exchange, "error", new AmqpProxy.ProxyServer(nogood), channelParams))

// create an AMQP proxy client in front of the "error queue"
val client = ConnectionOwner.createActor(conn, Props(new RpcClient()), 5 second)
val client = ConnectionOwner.createChildActor(conn, RpcClient.props())
val proxy = system.actorOf(
Props(new AmqpProxy.ProxyClient(client, "amq.direct", "error", JsonSerializer)),
AmqpProxy.ProxyClient.props(client, "amq.direct", "error", JsonSerializer),
name = "proxy")

Amqp.waitForConnection(system, server).await()
Expand All @@ -56,8 +55,8 @@ class ErrorTest extends TestKit(ActorSystem("TestSystem")) with WordSpec with Sh
"handle client-side serialization errors" in {
val connFactory = new ConnectionFactory()
val conn = system.actorOf(Props(new ConnectionOwner(connFactory)))
val client = ConnectionOwner.createActor(conn, Props(new RpcClient()), 5 second)
val proxy = system.actorOf(Props(new AmqpProxy.ProxyClient(client, "amq.direct", "client_side_error", JsonSerializer)))
val client = ConnectionOwner.createChildActor(conn, RpcClient.props())
val proxy = system.actorOf(AmqpProxy.ProxyClient.props(client, "amq.direct", "client_side_error", JsonSerializer))

val badrequest = Map(1 -> 1) // lift-json will not serialize this, Map keys must be Strings
val thrown = evaluating(Await.result(proxy ? badrequest, 5 seconds)) should produce[AmqpProxyException]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class RemoteGpbCallTest extends TestKit(ActorSystem("TestSystem")) with WordSpec
val conn = system.actorOf(Props(new ConnectionOwner(connFactory)), name = "conn")
val exchange = ExchangeParameters(name = "amq.direct", exchangeType = "", passive = true)
val queue = QueueParameters(name = "calculator-gpb", passive = false, autodelete = true)
val channelParams = Some(ChannelParameters(qos = 1))
val channelParams = ChannelParameters(qos = 1)

// create a simple calculator actor
val calc = system.actorOf(Props(new Actor() {
Expand All @@ -34,15 +34,14 @@ class RemoteGpbCallTest extends TestKit(ActorSystem("TestSystem")) with WordSpec
}))
// create an AMQP proxy server which consumes messages from the "calculator" queue and passes
// them to our Calculator actor
val server = ConnectionOwner.createActor(
val server = ConnectionOwner.createChildActor(
conn,
Props(new RpcServer(queue, exchange, "calculator-gpb", new AmqpProxy.ProxyServer(calc), channelParams)),
2 second)
RpcServer.props(queue, exchange, "calculator-gpb", new AmqpProxy.ProxyServer(calc), channelParams))

// create an AMQP proxy client in front of the "calculator queue"
val client = ConnectionOwner.createActor(conn, Props(new RpcClient()), 5 second)
val client = ConnectionOwner.createChildActor(conn, RpcClient.props())
val proxy = system.actorOf(
Props(new AmqpProxy.ProxyClient(client, "amq.direct", "calculator-gpb", ProtobufSerializer)),
AmqpProxy.ProxyClient.props(client, "amq.direct", "calculator-gpb", ProtobufSerializer),
name = "proxy")

Amqp.waitForConnection(system, server).await()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class RemoteJsonCallTest extends TestKit(ActorSystem("TestSystem")) with WordSpe
val conn = system.actorOf(Props(new ConnectionOwner(connFactory)), name = "conn")
val exchange = ExchangeParameters(name = "amq.direct", exchangeType = "", passive = true)
val queue = QueueParameters(name = "calculator", passive = false, autodelete = true)
val channelParams = Some(ChannelParameters(qos = 1))
val channelParams = ChannelParameters(qos = 1)

case class AddRequest(x: Int, y: Int)
case class AddResponse(x: Int, y: Int, sum: Int)
Expand All @@ -36,15 +36,14 @@ class RemoteJsonCallTest extends TestKit(ActorSystem("TestSystem")) with WordSpe
}))
// create an AMQP proxy server which consumes messages from the "calculator" queue and passes
// them to our Calculator actor
val server = ConnectionOwner.createActor(
val server = ConnectionOwner.createChildActor(
conn,
Props(new RpcServer(queue, exchange, "calculator", new AmqpProxy.ProxyServer(calc), channelParams)),
2 second)
RpcServer.props(queue, exchange, "calculator", new AmqpProxy.ProxyServer(calc), channelParams))

// create an AMQP proxy client in front of the "calculator queue"
val client = ConnectionOwner.createActor(conn, Props(new RpcClient()), 5 second)
val client = ConnectionOwner.createChildActor(conn, RpcClient.props())
val proxy = system.actorOf(
Props(new AmqpProxy.ProxyClient(client, "amq.direct", "calculator", JsonSerializer)),
AmqpProxy.ProxyClient.props(client, "amq.direct", "calculator", JsonSerializer),
name = "proxy")

Amqp.waitForConnection(system, server).await()
Expand Down

0 comments on commit c43ac7b

Please sign in to comment.