making it more easy to start string and protobuf base consumers, producers and rpc style

This commit is contained in:
momania 2010-08-12 16:16:18 +02:00
parent b96f957dea
commit 505c70d116
20 changed files with 561 additions and 141 deletions

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp
@ -8,7 +8,8 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.config.OneForOneStrategy
import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory}
import java.lang.IllegalArgumentException
import com.rabbitmq.client.AMQP.BasicProperties
import java.lang.{String, IllegalArgumentException}
/**
* AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors.
@ -19,43 +20,43 @@ import java.lang.IllegalArgumentException
*/
object AMQP {
case class ConnectionParameters(
host: String = ConnectionFactory.DEFAULT_HOST,
port: Int = ConnectionFactory.DEFAULT_AMQP_PORT,
username: String = ConnectionFactory.DEFAULT_USER,
password: String = ConnectionFactory.DEFAULT_PASS,
virtualHost: String = ConnectionFactory.DEFAULT_VHOST,
initReconnectDelay: Long = 5000,
connectionCallback: Option[ActorRef] = None)
host: String = ConnectionFactory.DEFAULT_HOST,
port: Int = ConnectionFactory.DEFAULT_AMQP_PORT,
username: String = ConnectionFactory.DEFAULT_USER,
password: String = ConnectionFactory.DEFAULT_PASS,
virtualHost: String = ConnectionFactory.DEFAULT_VHOST,
initReconnectDelay: Long = 5000,
connectionCallback: Option[ActorRef] = None)
case class ChannelParameters(
shutdownListener: Option[ShutdownListener] = None,
channelCallback: Option[ActorRef] = None)
shutdownListener: Option[ShutdownListener] = None,
channelCallback: Option[ActorRef] = None)
case class ExchangeParameters(
exchangeName: String,
exchangeType: ExchangeType,
exchangeDurable: Boolean = false,
exchangeAutoDelete: Boolean = true,
exchangePassive: Boolean = false,
configurationArguments: Map[String, AnyRef] = Map())
exchangeName: String,
exchangeType: ExchangeType,
exchangeDurable: Boolean = false,
exchangeAutoDelete: Boolean = true,
exchangePassive: Boolean = false,
configurationArguments: Map[String, AnyRef] = Map())
case class ProducerParameters(
exchangeParameters: ExchangeParameters,
producerId: Option[String] = None,
returnListener: Option[ReturnListener] = None,
channelParameters: Option[ChannelParameters] = None)
exchangeParameters: ExchangeParameters,
producerId: Option[String] = None,
returnListener: Option[ReturnListener] = None,
channelParameters: Option[ChannelParameters] = None)
case class ConsumerParameters(
exchangeParameters: ExchangeParameters,
routingKey: String,
deliveryHandler: ActorRef,
queueName: Option[String] = None,
queueDurable: Boolean = false,
queueAutoDelete: Boolean = true,
queuePassive: Boolean = false,
queueExclusive: Boolean = false,
selfAcknowledging: Boolean = true,
channelParameters: Option[ChannelParameters] = None) {
exchangeParameters: ExchangeParameters,
routingKey: String,
deliveryHandler: ActorRef,
queueName: Option[String] = None,
queueDurable: Boolean = false,
queueAutoDelete: Boolean = true,
queuePassive: Boolean = false,
queueExclusive: Boolean = false,
selfAcknowledging: Boolean = true,
channelParameters: Option[ChannelParameters] = None) {
if (queueDurable && queueName.isEmpty) {
throw new IllegalArgumentException("A queue name is required when requesting a durable queue.")
}
@ -84,6 +85,102 @@ object AMQP {
consumer
}
/**
* Convenience
*/
class ProducerClient[O](client: ActorRef, routingKey: String, toBinary: ToBinary[O]) {
def send(request: O, replyTo: Option[String] = None) = {
val basicProperties = new BasicProperties
basicProperties.setReplyTo(replyTo.getOrElse(null))
client ! Message(toBinary.toBinary(request), routingKey, false, false, Some(basicProperties))
}
def stop = client.stop
}
def newStringProducer(connection: ActorRef,
exchange: String,
routingKey: Option[String] = None,
producerId: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true,
passive: Boolean = true): ProducerClient[String] = {
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val producerRef = newProducer(connection, ProducerParameters(exchangeParameters, producerId))
val toBinary = new ToBinary[String] {
def toBinary(t: String) = t.getBytes
}
new ProducerClient(producerRef, rKey, toBinary)
}
def newStringConsumer(connection: ActorRef,
exchange: String,
handler: String => Unit,
routingKey: Option[String] = None,
queueName: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true): ActorRef = {
val deliveryHandler = actor {
case Delivery(payload, _, _, _, _) => handler.apply(new String(payload))
}
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val qName = queueName.getOrElse("%s.in".format(rKey))
newConsumer(connection, ConsumerParameters(exchangeParameters, rKey, deliveryHandler, Some(qName), durable, autoDelete))
}
def newProtobufProducer[O <: com.google.protobuf.Message](connection: ActorRef,
exchange: String,
routingKey: Option[String] = None,
producerId: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true,
passive: Boolean = true): ProducerClient[O] = {
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val producerRef = newProducer(connection, ProducerParameters(exchangeParameters, producerId))
new ProducerClient(producerRef, rKey, new ToBinary[O] {
def toBinary(t: O) = t.toByteArray
})
}
def newProtobufConsumer[I <: com.google.protobuf.Message](connection: ActorRef,
exchange: String,
handler: I => Unit,
routingKey: Option[String] = None,
queueName: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true)(implicit manifest: Manifest[I]): ActorRef = {
val deliveryHandler = actor {
case Delivery(payload, _, _, _, _) => {
handler.apply(createProtobufFromBytes[I](payload))
}
}
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val qName = queueName.getOrElse("%s.in".format(rKey))
newConsumer(connection, ConsumerParameters(exchangeParameters, rKey, deliveryHandler, Some(qName), durable, autoDelete))
}
/**
* Main supervisor
*/
class AMQPSupervisorActor extends Actor {
import self._
@ -93,15 +190,29 @@ object AMQP {
def receive = {
case _ => {} // ignore all messages
}
override def shutdown = {
self.shutdownLinkedActors
}
}
private val supervisor = actorOf(new AMQPSupervisorActor).start
def shutdownAll = {
supervisor.stop
supervisor.shutdownLinkedActors
}
/**
* Serialization stuff
*/
trait FromBinary[T] {
def fromBinary(bytes: Array[Byte]): T
}
trait ToBinary[T] {
def toBinary(t: T): Array[Byte]
}
private val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
private[amqp] def createProtobufFromBytes[I <: com.google.protobuf.Message](bytes: Array[Byte])(implicit manifest: Manifest[I]): I = {
manifest.erasure.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[I]
}
}

View file

@ -5,45 +5,63 @@
package se.scalablesolutions.akka.amqp
import rpc.RPC
import rpc.RPC.{RpcClientSerializer, RpcServerSerializer, ToBinary, FromBinary}
import rpc.RPC.{RpcClientSerializer, RpcServerSerializer}
import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
import Actor._
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.amqp.AMQP._
import java.lang.String
import se.scalablesolutions.akka.amqp.AMQP._
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol
object ExampleSession {
def main(args: Array[String]) = {
println("==== DIRECT ===")
printTopic("DIRECT")
direct
TimeUnit.SECONDS.sleep(2)
println("==== FANOUT ===")
printTopic("FANOUT")
fanout
TimeUnit.SECONDS.sleep(2)
println("==== TOPIC ===")
printTopic("TOPIC")
topic
TimeUnit.SECONDS.sleep(2)
println("==== CALLBACK ===")
printTopic("CALLBACK")
callback
TimeUnit.SECONDS.sleep(2)
printTopic("EASY STRING PRODUCER AND CONSUMER")
easyStringProducerConsumer
println("==== RPC ===")
printTopic("EASY PROTOBUF PRODUCER AND CONSUMER")
easyProtobufProducerConsumer
printTopic("RPC")
rpc
TimeUnit.SECONDS.sleep(2)
printTopic("EASY STRING RPC")
easyStringRpc
printTopic("EASY PROTOBUF RPC")
easyProtobufRpc
printTopic("Happy hAkking :-)")
// shutdown everything the amqp tree except the main AMQP supervisor
// all connections/consumers/producers will be stopped
AMQP.shutdownAll
ActorRegistry.shutdownAll
System.exit(0)
}
def printTopic(topic: String) {
println("")
println("==== " + topic + " ===")
println("")
TimeUnit.SECONDS.sleep(2)
}
def direct = {
// defaults to amqp://guest:guest@localhost:5672/
@ -117,7 +135,7 @@ object ExampleSession {
case Restarting => // not used, sent when channel or connection fails and initiates a restart
case Stopped => log.info("Channel callback: Stopped")
}
val exchangeParameters = ExchangeParameters("my_direct_exchange", ExchangeType.Direct)
val exchangeParameters = ExchangeParameters("my_callback_exchange", ExchangeType.Direct)
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val consumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "callback.routing", actor {
@ -131,6 +149,40 @@ object ExampleSession {
connection.stop
}
def easyStringProducerConsumer = {
val connection = AMQP.newConnection()
val exchangeName = "easy.string"
// listen by default to:
// exchange = exchangeName
// routingKey = <exchange>.request
// queueName = <routingKey>.in
AMQP.newStringConsumer(connection, exchangeName, message => println("Received message: "+message))
// send by default to:
// exchange = exchangeName
// routingKey = <exchange>.request
val producer = AMQP.newStringProducer(connection, exchangeName)
producer.send("This shit is easy!")
}
def easyProtobufProducerConsumer = {
val connection = AMQP.newConnection()
val exchangeName = "easy.protobuf"
def protobufMessageHandler(message: AddressProtocol) = {
log.info("Received "+message)
}
AMQP.newProtobufConsumer(connection, exchangeName, protobufMessageHandler)
val producerClient = AMQP.newProtobufProducer[AddressProtocol](connection, exchangeName)
producerClient.send(AddressProtocol.newBuilder.setHostname("akkarocks.com").setPort(1234).build)
}
def rpc = {
val connection = AMQP.newConnection()
@ -166,4 +218,51 @@ object ExampleSession {
val response = (rpcClient !! "rpc_request")
log.info("Response: " + response)
}
def easyStringRpc = {
val connection = AMQP.newConnection()
val exchangeName = "easy.stringrpc"
// listen by default to:
// exchange = exchangeName
// routingKey = <exchange>.request
// queueName = <routingKey>.in
RPC.newStringRpcServer(connection, exchangeName, request => {
log.info("Got request: "+request)
"Response to: '"+request+"'"
})
// send by default to:
// exchange = exchangeName
// routingKey = <exchange>.request
val stringRpcClient = RPC.newStringRpcClient(connection, exchangeName)
val response = stringRpcClient.call("AMQP Rocks!")
log.info("Got response: "+response)
stringRpcClient.callAsync("AMQP is dead easy") {
case response => log.info("This is handled async: "+response)
}
}
def easyProtobufRpc = {
val connection = AMQP.newConnection()
val exchangeName = "easy.protobuf.rpc"
def protobufRequestHandler(request: AddressProtocol): AddressProtocol = {
AddressProtocol.newBuilder.setHostname(request.getHostname.reverse).setPort(request.getPort).build
}
RPC.newProtobufRpcServer(connection, exchangeName, protobufRequestHandler)
val stringRpcClient = RPC.newProtobufRpcClient[AddressProtocol, AddressProtocol](connection, exchangeName)
val response = stringRpcClient.call(AddressProtocol.newBuilder.setHostname("localhost").setPort(4321).build)
log.info("Got response: "+response)
}
}

View file

@ -26,26 +26,22 @@ object RPC {
serializer: RpcServerSerializer[I, O],
requestHandler: I => O,
queueName: Option[String] = None,
channelParameters: Option[ChannelParameters] = None) = {
channelParameters: Option[ChannelParameters] = None): RpcServerHandle = {
val producer = newProducer(connection, ProducerParameters(
ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters))
val rpcServer = actorOf(new RpcServerActor[I, O](producer, serializer, requestHandler))
val consumer = newConsumer(connection, ConsumerParameters(exchangeParameters, routingKey, rpcServer
, channelParameters = channelParameters
, selfAcknowledging = false
, queueName = queueName))
val consumer = newConsumer(connection, ConsumerParameters(exchangeParameters, routingKey, rpcServer,
channelParameters = channelParameters, selfAcknowledging = false, queueName = queueName))
RpcServerHandle(producer, consumer)
}
trait FromBinary[T] {
def fromBinary(bytes: Array[Byte]): T
case class RpcServerHandle(producer: ActorRef, consumer: ActorRef) {
def stop = {
consumer.stop
producer.stop
}
}
trait ToBinary[T] {
def toBinary(t: T): Array[Byte]
}
case class RpcClientSerializer[O, I](toBinary: ToBinary[O], fromBinary: FromBinary[I])
case class RpcServerSerializer[I, O](fromBinary: FromBinary[I], toBinary: ToBinary[O])
@ -65,15 +61,17 @@ object RPC {
responseHandler.apply(result)
}
}
def stop = client.stop
}
private val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
def startProtobufServer[I <: Message, O <: Message](
connection: ActorRef, exchange: String, requestHandler: I => O,
def newProtobufRpcServer[I <: Message, O <: Message](
connection: ActorRef,
exchange: String,
requestHandler: I => O,
routingKey: Option[String] = None,
queueName: Option[String] = None,
durable: Boolean = false, autoDelete: Boolean = true)(implicit manifest: Manifest[I]) = {
durable: Boolean = false,
autoDelete: Boolean = true)(implicit manifest: Manifest[I]): RpcServerHandle = {
val serializer = new RpcServerSerializer[I, O](
new FromBinary[I] {
@ -84,19 +82,15 @@ object RPC {
def toBinary(t: O) = t.toByteArray
})
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val qName = queueName.getOrElse("%s.in".format(rKey))
newRpcServer[I, O](connection, exchangeParameters, rKey, serializer, requestHandler,
queueName = Some(qName))
startServer(connection, exchange, requestHandler, routingKey, queueName, durable, autoDelete, serializer)
}
def startProtobufClient[O <: Message, I <: Message](
connection: ActorRef, exchange: String,
def newProtobufRpcClient[O <: Message, I <: Message](
connection: ActorRef,
exchange: String,
routingKey: Option[String] = None,
durable: Boolean = false, autoDelete: Boolean = true,
durable: Boolean = false,
autoDelete: Boolean = true,
passive: Boolean = true)(implicit manifest: Manifest[I]): RpcClient[O, I] = {
@ -109,15 +103,80 @@ object RPC {
}
})
startClient(connection, exchange, routingKey, durable, autoDelete, passive, serializer)
}
def newStringRpcServer(connection: ActorRef,
exchange: String,
requestHandler: String => String,
routingKey: Option[String] = None,
queueName: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true): RpcServerHandle = {
val serializer = new RpcServerSerializer[String, String](
new FromBinary[String] {
def fromBinary(bytes: Array[Byte]): String = {
new String(bytes)
}
}, new ToBinary[String] {
def toBinary(t: String) = t.getBytes
})
startServer(connection, exchange, requestHandler, routingKey, queueName, durable, autoDelete, serializer)
}
def newStringRpcClient(connection: ActorRef,
exchange: String,
routingKey: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true,
passive: Boolean = true): RpcClient[String, String] = {
val serializer = new RpcClientSerializer[String, String](
new ToBinary[String] {
def toBinary(t: String) = t.getBytes
}, new FromBinary[String] {
def fromBinary(bytes: Array[Byte]): String = {
new String(bytes)
}
})
startClient(connection, exchange, routingKey, durable, autoDelete, passive, serializer)
}
private def startClient[O, I](connection: ActorRef,
exchange: String,
routingKey: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true,
passive: Boolean = true,
serializer: RpcClientSerializer[O, I]): RpcClient[O, I] = {
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete, exchangePassive = passive)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val client = newRpcClient[O, I](connection, exchangeParameters, rKey, serializer)
new RpcClient[O, I](client)
val client = newRpcClient(connection, exchangeParameters, rKey, serializer)
new RpcClient(client)
}
private def createProtobufFromBytes[I](bytes: Array[Byte])(implicit manifest: Manifest[I]): I = {
manifest.erasure.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[I]
private def startServer[I, O](connection: ActorRef,
exchange: String,
requestHandler: I => O,
routingKey: Option[String] = None,
queueName: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true,
serializer: RpcServerSerializer[I, O]): RpcServerHandle = {
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val qName = queueName.getOrElse("%s.in".format(rKey))
newRpcServer(connection, exchangeParameters, rKey, serializer, requestHandler, queueName = Some(qName))
}
}
}

View file

@ -39,5 +39,11 @@ class RpcClientActor[I,O](
super.preRestart(reason)
}
override def shutdown = {
rpcClient.foreach(rpc => rpc.close)
super.shutdown
}
override def toString = "AMQP.RpcClient[exchange=" +exchangeName + ", routingKey=" + routingKey+ "]"
}

View file

@ -17,7 +17,7 @@ import org.junit.Test
class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers {
@Test
def connectionAndRecovery = if (AMQPTest.enabled) {
def connectionAndRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connectedLatch = new StandardLatch
val reconnectingLatch = new StandardLatch

View file

@ -18,7 +18,7 @@ import se.scalablesolutions.akka.actor.Actor._
class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers {
@Test
def consumerChannelRecovery = if (AMQPTest.enabled) {
def consumerChannelRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
try {

View file

@ -18,7 +18,7 @@ import Actor._
class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers {
@Test
def consumerConnectionRecovery = if (AMQPTest.enabled) {
def consumerConnectionRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
try {

View file

@ -17,7 +17,7 @@ import org.scalatest.junit.JUnitSuite
class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers {
@Test
def consumerMessageManualAcknowledge = if (AMQPTest.enabled) {
def consumerMessageManualAcknowledge = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection()
try {
val countDown = new CountDownLatch(2)

View file

@ -17,7 +17,7 @@ import org.scalatest.junit.JUnitSuite
class AMQPConsumerManualRejectTest extends JUnitSuite with MustMatchers {
@Test
def consumerMessageManualAcknowledge = if (AMQPTest.enabled) {
def consumerMessageManualAcknowledge = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection()
try {
val countDown = new CountDownLatch(2)

View file

@ -16,7 +16,7 @@ import org.junit.Test
class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers {
@Test
def consumerMessage = if (AMQPTest.enabled) {
def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection()
try {

View file

@ -17,7 +17,7 @@ import org.junit.Test
class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers {
@Test
def producerChannelRecovery = if (AMQPTest.enabled) {
def producerChannelRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))

View file

@ -17,7 +17,7 @@ import org.junit.Test
class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers {
@Test
def producerConnectionRecovery = if (AMQPTest.enabled) {
def producerConnectionRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
try {

View file

@ -19,7 +19,7 @@ import org.junit.Test
class AMQPProducerMessageTest extends JUnitSuite with MustMatchers {
@Test
def producerMessage = if (AMQPTest.enabled) {
def producerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection: ActorRef = AMQP.newConnection()
try {

View file

@ -0,0 +1,43 @@
package se.scalablesolutions.akka.amqp.test
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
import org.scalatest.matchers.MustMatchers
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.amqp.AMQP
import org.junit.Test
import org.multiverse.api.latches.StandardLatch
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.amqp.rpc.RPC
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol
class AMQPProtobufProducerConsumerTest extends JUnitSuite with MustMatchers {
@Test
def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection()
val responseLatch = new StandardLatch
RPC.newProtobufRpcServer(connection, "protoexchange", requestHandler)
val request = AddressProtocol.newBuilder.setHostname("testhost").setPort(4321).build
def responseHandler(response: AddressProtocol) = {
assert(response.getHostname == request.getHostname.reverse)
responseLatch.open
}
AMQP.newProtobufConsumer(connection, "", responseHandler, Some("proto.reply.key"))
val producer = AMQP.newProtobufProducer[AddressProtocol](connection, "protoexchange")
producer.send(request, Some("proto.reply.key"))
responseLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
}
def requestHandler(request: AddressProtocol): AddressProtocol = {
AddressProtocol.newBuilder.setHostname(request.getHostname.reverse).setPort(request.getPort).build
}
}

View file

@ -6,7 +6,7 @@ package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.amqp._
import rpc.RPC
import rpc.RPC.{RpcClientSerializer, RpcServerSerializer, FromBinary, ToBinary}
import rpc.RPC.{RpcClientSerializer, RpcServerSerializer}
import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.{CountDownLatch, TimeUnit}
@ -15,50 +15,47 @@ import org.scalatest.junit.JUnitSuite
import org.junit.Test
class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers {
@Test
def consumerMessage = if (AMQPTest.enabled) {
def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection()
try {
val countDown = new CountDownLatch(3)
val channelCallback = actor {
case Started => countDown.countDown
case Restarting => ()
case Stopped => ()
}
val exchangeParameters = ExchangeParameters("text_topic_exchange", ExchangeType.Topic)
val channelParameters = ChannelParameters(channelCallback
= Some(channelCallback))
val rpcServerSerializer = new RpcServerSerializer[String, Int](
new FromBinary[String] {
def fromBinary(bytes: Array[Byte]) = new String(bytes)
}, new ToBinary[Int] {
def toBinary(t: Int) = Array(t.toByte)
})
def requestHandler(request: String) = 3
val rpcServer = RPC.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer,
requestHandler, channelParameters = Some(channelParameters))
val rpcClientSerializer = new RpcClientSerializer[String, Int](
new ToBinary[String] {
def toBinary(t: String) = t.getBytes
}, new FromBinary[Int] {
def fromBinary(bytes: Array[Byte]) = bytes.head.toInt
})
val rpcClient = RPC.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer,
channelParameters = Some(channelParameters))
countDown.await(2, TimeUnit.SECONDS) must be(true)
val response = rpcClient !! "some_payload"
response must be(Some(3))
} finally {
connection.stop
val countDown = new CountDownLatch(3)
val channelCallback = actor {
case Started => countDown.countDown
case Restarting => ()
case Stopped => ()
}
val exchangeParameters = ExchangeParameters("text_topic_exchange", ExchangeType.Topic)
val channelParameters = ChannelParameters(channelCallback
= Some(channelCallback))
val rpcServerSerializer = new RpcServerSerializer[String, Int](
new FromBinary[String] {
def fromBinary(bytes: Array[Byte]) = new String(bytes)
}, new ToBinary[Int] {
def toBinary(t: Int) = Array(t.toByte)
})
def requestHandler(request: String) = 3
val rpcServer = RPC.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer,
requestHandler, channelParameters = Some(channelParameters))
val rpcClientSerializer = new RpcClientSerializer[String, Int](
new ToBinary[String] {
def toBinary(t: String) = t.getBytes
}, new FromBinary[Int] {
def fromBinary(bytes: Array[Byte]) = bytes.head.toInt
})
val rpcClient = RPC.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer,
channelParameters = Some(channelParameters))
countDown.await(2, TimeUnit.SECONDS) must be(true)
val response = rpcClient !! "some_payload"
response must be(Some(3))
}
}

View file

@ -15,13 +15,13 @@ import java.util.concurrent.TimeUnit
class AMQPRpcProtobufTest extends JUnitSuite with MustMatchers {
@Test
def consumerMessage = if (AMQPTest.enabled) {
def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection()
RPC.startProtobufServer(connection, "protoservice", requestHandler)
RPC.newProtobufRpcServer(connection, "protoservice", requestHandler)
val protobufClient = RPC.startProtobufClient[AddressProtocol, AddressProtocol](connection, "protoservice")
val protobufClient = RPC.newProtobufRpcClient[AddressProtocol, AddressProtocol](connection, "protoservice")
val request = AddressProtocol.newBuilder.setHostname("testhost").setPort(4321).build
@ -40,6 +40,7 @@ class AMQPRpcProtobufTest extends JUnitSuite with MustMatchers {
}
aSyncLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
}
def requestHandler(request: AddressProtocol): AddressProtocol = {

View file

@ -0,0 +1,47 @@
package se.scalablesolutions.akka.amqp.test
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
import org.scalatest.matchers.MustMatchers
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.amqp.AMQP
import org.junit.Test
import se.scalablesolutions.akka.amqp.rpc.RPC
import org.multiverse.api.latches.StandardLatch
import java.util.concurrent.TimeUnit
class AMQPRpcStringTest extends JUnitSuite with MustMatchers {
@Test
def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection()
RPC.newStringRpcServer(connection, "stringservice", requestHandler)
val protobufClient = RPC.newStringRpcClient(connection, "stringservice")
val request = "teststring"
protobufClient.call(request) match {
case Some(response) => assert(response == request.reverse)
case None => fail("no response")
}
val aSyncLatch = new StandardLatch
protobufClient.callAsync(request) {
case Some(response) => {
assert(response == request.reverse)
aSyncLatch.open
}
case None => fail("no response")
}
aSyncLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
}
def requestHandler(request: String): String= {
request.reverse
}
}

View file

@ -0,0 +1,44 @@
package se.scalablesolutions.akka.amqp.test
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
import org.scalatest.matchers.MustMatchers
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.amqp.AMQP
import org.junit.Test
import org.multiverse.api.latches.StandardLatch
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.amqp.rpc.RPC
class AMQPStringProducerConsumerTest extends JUnitSuite with MustMatchers {
@Test
def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection()
val responseLatch = new StandardLatch
RPC.newStringRpcServer(connection, "stringexchange", requestHandler)
val request = "somemessage"
def responseHandler(response: String) = {
assert(response == request.reverse)
responseLatch.open
}
AMQP.newStringConsumer(connection, "", responseHandler, Some("string.reply.key"))
val producer = AMQP.newStringProducer(connection, "stringexchange")
producer.send(request, Some("string.reply.key"))
responseLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
}
def requestHandler(request: String): String= {
println("###### Reverse")
request.reverse
}
}

View file

@ -4,6 +4,19 @@
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.amqp.AMQP
import se.scalablesolutions.akka.actor.ActorRegistry
import java.util.concurrent.TimeUnit
object AMQPTest {
def enabled = false
def enabled = true
def withCleanEndState(action: => Unit) {
try {
action
} finally {
AMQP.shutdownAll
}
}
}

View file

@ -27,7 +27,7 @@
<fileNamePattern>./logs/akka.log.%d{yyyy-MM-dd-HH}</fileNamePattern>
</rollingPolicy>
</appender>
<logger name="se.scalablesolutions" level="INFO"/>
<logger name="se.scalablesolutions" level="DEBUG"/>
<root level="INFO">
<appender-ref ref="stdout"/>
<appender-ref ref="R"/>