Merge branch 'rpc_amqp'

This commit is contained in:
momania 2010-08-12 16:25:20 +02:00
commit bbb17e6eb3
30 changed files with 905 additions and 357 deletions

View file

@ -1,150 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.config.OneForOneStrategy
import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory}
import java.lang.IllegalArgumentException
import se.scalablesolutions.akka.util.Logging
/**
* AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors.
*
* @see se.scalablesolutions.akka.amqp.ExampleSession
*
* @author Irmo Manie
*/
object AMQP {
case class ConnectionParameters(
host: String = ConnectionFactory.DEFAULT_HOST,
port: Int = ConnectionFactory.DEFAULT_AMQP_PORT,
username: String = ConnectionFactory.DEFAULT_USER,
password: String = ConnectionFactory.DEFAULT_PASS,
virtualHost: String = ConnectionFactory.DEFAULT_VHOST,
initReconnectDelay: Long = 5000,
connectionCallback: Option[ActorRef] = None)
case class ChannelParameters(
shutdownListener: Option[ShutdownListener] = None,
channelCallback: Option[ActorRef] = None)
case class ExchangeParameters(
exchangeName: String,
exchangeType: ExchangeType,
exchangeDurable: Boolean = false,
exchangeAutoDelete: Boolean = true,
exchangePassive: Boolean = false,
configurationArguments: Map[String, AnyRef] = Map())
case class ProducerParameters(
exchangeParameters: ExchangeParameters,
producerId: Option[String] = None,
returnListener: Option[ReturnListener] = None,
channelParameters: Option[ChannelParameters] = None)
case class ConsumerParameters(
exchangeParameters: ExchangeParameters,
routingKey: String,
deliveryHandler: ActorRef,
queueName: Option[String] = None,
queueDurable: Boolean = false,
queueAutoDelete: Boolean = true,
queuePassive: Boolean = false,
queueExclusive: Boolean = false,
selfAcknowledging: Boolean = true,
channelParameters: Option[ChannelParameters] = None) {
if (queueDurable && queueName.isEmpty) {
throw new IllegalArgumentException("A queue name is required when requesting a durable queue.")
}
}
def newConnection(connectionParameters: ConnectionParameters = new ConnectionParameters): ActorRef = {
val connection: ActorRef = supervisor.newConnection(connectionParameters)
connection ! Connect
connection
}
def newProducer(connection: ActorRef, producerParameters: ProducerParameters): ActorRef = {
val producer: ActorRef = Actor.actorOf(new ProducerActor(producerParameters))
connection.startLink(producer)
producer ! Start
producer
}
def newConsumer(connection: ActorRef, consumerParameters: ConsumerParameters): ActorRef = {
val consumer: ActorRef = actorOf(new ConsumerActor(consumerParameters))
val handler = consumerParameters.deliveryHandler
if (handler.supervisor.isEmpty) consumer.startLink(handler)
connection.startLink(consumer)
consumer ! Start
consumer
}
def newRpcClient[O,I](
connection: ActorRef,
exchangeParameters: ExchangeParameters,
routingKey: String,
serializer: RpcClientSerializer[O,I],
channelParameters: Option[ChannelParameters] = None): ActorRef = {
val rpcActor: ActorRef = actorOf(new RpcClientActor[O,I](exchangeParameters, routingKey, serializer, channelParameters))
connection.startLink(rpcActor)
rpcActor ! Start
rpcActor
}
def newRpcServer[I,O](
connection: ActorRef,
exchangeParameters: ExchangeParameters,
routingKey: String,
serializer: RpcServerSerializer[I,O],
requestHandler: I => O,
queueName: Option[String] = None,
channelParameters: Option[ChannelParameters] = None) = {
val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters))
val rpcServer = actorOf(new RpcServerActor[I,O](producer, serializer, requestHandler))
val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer
, channelParameters = channelParameters
, selfAcknowledging = false
, queueName = queueName))
}
private val supervisor = new AMQPSupervisor
class AMQPSupervisor extends Logging {
class AMQPSupervisorActor extends Actor {
import self._
faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = List(classOf[Throwable])
def receive = {
case _ => {} // ignore all messages
}
}
private val supervisor = actorOf(new AMQPSupervisorActor).start
def newConnection(connectionParameters: ConnectionParameters): ActorRef = {
val connectionActor = actorOf(new FaultTolerantConnectionActor(connectionParameters))
supervisor.startLink(connectionActor)
connectionActor
}
}
trait FromBinary[T] {
def fromBinary(bytes: Array[Byte]): T
}
trait ToBinary[T] {
def toBinary(t: T): Array[Byte]
}
case class RpcClientSerializer[O,I](toBinary: ToBinary[O], fromBinary: FromBinary[I])
case class RpcServerSerializer[I,O](fromBinary: FromBinary[I], toBinary: ToBinary[O])
}

View file

@ -0,0 +1,218 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.config.OneForOneStrategy
import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory}
import com.rabbitmq.client.AMQP.BasicProperties
import java.lang.{String, IllegalArgumentException}
/**
* AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors.
*
* @see se.scalablesolutions.akka.amqp.ExampleSession
*
* @author Irmo Manie
*/
object AMQP {
case class ConnectionParameters(
host: String = ConnectionFactory.DEFAULT_HOST,
port: Int = ConnectionFactory.DEFAULT_AMQP_PORT,
username: String = ConnectionFactory.DEFAULT_USER,
password: String = ConnectionFactory.DEFAULT_PASS,
virtualHost: String = ConnectionFactory.DEFAULT_VHOST,
initReconnectDelay: Long = 5000,
connectionCallback: Option[ActorRef] = None)
case class ChannelParameters(
shutdownListener: Option[ShutdownListener] = None,
channelCallback: Option[ActorRef] = None)
case class ExchangeParameters(
exchangeName: String,
exchangeType: ExchangeType,
exchangeDurable: Boolean = false,
exchangeAutoDelete: Boolean = true,
exchangePassive: Boolean = false,
configurationArguments: Map[String, AnyRef] = Map())
case class ProducerParameters(
exchangeParameters: ExchangeParameters,
producerId: Option[String] = None,
returnListener: Option[ReturnListener] = None,
channelParameters: Option[ChannelParameters] = None)
case class ConsumerParameters(
exchangeParameters: ExchangeParameters,
routingKey: String,
deliveryHandler: ActorRef,
queueName: Option[String] = None,
queueDurable: Boolean = false,
queueAutoDelete: Boolean = true,
queuePassive: Boolean = false,
queueExclusive: Boolean = false,
selfAcknowledging: Boolean = true,
channelParameters: Option[ChannelParameters] = None) {
if (queueDurable && queueName.isEmpty) {
throw new IllegalArgumentException("A queue name is required when requesting a durable queue.")
}
}
def newConnection(connectionParameters: ConnectionParameters = new ConnectionParameters): ActorRef = {
val connection = actorOf(new FaultTolerantConnectionActor(connectionParameters))
supervisor.startLink(connection)
connection ! Connect
connection
}
def newProducer(connection: ActorRef, producerParameters: ProducerParameters): ActorRef = {
val producer: ActorRef = Actor.actorOf(new ProducerActor(producerParameters))
connection.startLink(producer)
producer ! Start
producer
}
def newConsumer(connection: ActorRef, consumerParameters: ConsumerParameters): ActorRef = {
val consumer: ActorRef = actorOf(new ConsumerActor(consumerParameters))
val handler = consumerParameters.deliveryHandler
if (handler.supervisor.isEmpty) consumer.startLink(handler)
connection.startLink(consumer)
consumer ! Start
consumer
}
/**
* Convenience
*/
class ProducerClient[O](client: ActorRef, routingKey: String, toBinary: ToBinary[O]) {
def send(request: O, replyTo: Option[String] = None) = {
val basicProperties = new BasicProperties
basicProperties.setReplyTo(replyTo.getOrElse(null))
client ! Message(toBinary.toBinary(request), routingKey, false, false, Some(basicProperties))
}
def stop = client.stop
}
def newStringProducer(connection: ActorRef,
exchange: String,
routingKey: Option[String] = None,
producerId: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true,
passive: Boolean = true): ProducerClient[String] = {
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val producerRef = newProducer(connection, ProducerParameters(exchangeParameters, producerId))
val toBinary = new ToBinary[String] {
def toBinary(t: String) = t.getBytes
}
new ProducerClient(producerRef, rKey, toBinary)
}
def newStringConsumer(connection: ActorRef,
exchange: String,
handler: String => Unit,
routingKey: Option[String] = None,
queueName: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true): ActorRef = {
val deliveryHandler = actor {
case Delivery(payload, _, _, _, _) => handler.apply(new String(payload))
}
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val qName = queueName.getOrElse("%s.in".format(rKey))
newConsumer(connection, ConsumerParameters(exchangeParameters, rKey, deliveryHandler, Some(qName), durable, autoDelete))
}
def newProtobufProducer[O <: com.google.protobuf.Message](connection: ActorRef,
exchange: String,
routingKey: Option[String] = None,
producerId: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true,
passive: Boolean = true): ProducerClient[O] = {
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val producerRef = newProducer(connection, ProducerParameters(exchangeParameters, producerId))
new ProducerClient(producerRef, rKey, new ToBinary[O] {
def toBinary(t: O) = t.toByteArray
})
}
def newProtobufConsumer[I <: com.google.protobuf.Message](connection: ActorRef,
exchange: String,
handler: I => Unit,
routingKey: Option[String] = None,
queueName: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true)(implicit manifest: Manifest[I]): ActorRef = {
val deliveryHandler = actor {
case Delivery(payload, _, _, _, _) => {
handler.apply(createProtobufFromBytes[I](payload))
}
}
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val qName = queueName.getOrElse("%s.in".format(rKey))
newConsumer(connection, ConsumerParameters(exchangeParameters, rKey, deliveryHandler, Some(qName), durable, autoDelete))
}
/**
* Main supervisor
*/
class AMQPSupervisorActor extends Actor {
import self._
faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = List(classOf[Throwable])
def receive = {
case _ => {} // ignore all messages
}
}
private val supervisor = actorOf(new AMQPSupervisorActor).start
def shutdownAll = {
supervisor.shutdownLinkedActors
}
/**
* Serialization stuff
*/
trait FromBinary[T] {
def fromBinary(bytes: Array[Byte]): T
}
trait ToBinary[T] {
def toBinary(t: T): Array[Byte]
}
private val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
private[amqp] def createProtobufFromBytes[I <: com.google.protobuf.Message](bytes: Array[Byte])(implicit manifest: Manifest[I]): I = {
manifest.erasure.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[I]
}
}

View file

@ -44,6 +44,9 @@ case object Stopped extends AMQPMessage
// delivery messages
case class Acknowledge(deliveryTag: Long) extends AMQPMessage
case class Acknowledged(deliveryTag: Long) extends AMQPMessage
case class Reject(deliveryTag: Long) extends AMQPMessage
case class Rejected(deliveryTag: Long) extends AMQPMessage
class RejectionException(deliveryTag: Long) extends RuntimeException
// internal messages
private[akka] case class Failure(cause: Throwable) extends InternalAMQPMessage

View file

@ -23,6 +23,7 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
def specificMessageHandler = {
case Acknowledge(deliveryTag) => acknowledgeDeliveryTag(deliveryTag, true)
case Reject(deliveryTag) => rejectDeliveryTag(deliveryTag, true)
case message: Message =>
handleIllegalMessage("%s can't be used to send messages, ignoring message [%s]".format(this, message))
case unknown =>
@ -82,6 +83,19 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
}
}
private def rejectDeliveryTag(deliveryTag: Long, remoteAcknowledgement: Boolean) = {
log.debug("Rejecting message with delivery tag [%s]", deliveryTag)
// FIXME: when rabbitmq 1.9 arrives, basicReject should be available on the API and implemented instead of this
log.warning("Consumer is rejecting delivery with tag [%s] - " +
"for now this means we have to self terminate and kill the channel - see you in a second.")
channel.foreach{ch =>
if (remoteAcknowledgement) {
deliveryHandler ! Rejected(deliveryTag)
}
}
throw new RejectionException(deliveryTag)
}
private def handleIllegalMessage(errorMessage: String) = {
log.error(errorMessage)
throw new IllegalArgumentException(errorMessage)
@ -94,7 +108,7 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
override def shutdown = {
listenerTag.foreach(tag => channel.foreach(_.basicCancel(tag)))
self.linkedActorsAsList.foreach(_.stop)
self.shutdownLinkedActors
super.shutdown
}

View file

@ -4,44 +4,64 @@
package se.scalablesolutions.akka.amqp
import rpc.RPC
import rpc.RPC.{RpcClientSerializer, RpcServerSerializer}
import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
import Actor._
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.amqp.AMQP._
import java.lang.String
import se.scalablesolutions.akka.amqp.AMQP._
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol
object ExampleSession {
def main(args: Array[String]) = {
println("==== DIRECT ===")
printTopic("DIRECT")
direct
TimeUnit.SECONDS.sleep(2)
println("==== FANOUT ===")
printTopic("FANOUT")
fanout
TimeUnit.SECONDS.sleep(2)
println("==== TOPIC ===")
printTopic("TOPIC")
topic
TimeUnit.SECONDS.sleep(2)
println("==== CALLBACK ===")
printTopic("CALLBACK")
callback
TimeUnit.SECONDS.sleep(2)
printTopic("EASY STRING PRODUCER AND CONSUMER")
easyStringProducerConsumer
println("==== RPC ===")
printTopic("EASY PROTOBUF PRODUCER AND CONSUMER")
easyProtobufProducerConsumer
printTopic("RPC")
rpc
TimeUnit.SECONDS.sleep(2)
printTopic("EASY STRING RPC")
easyStringRpc
printTopic("EASY PROTOBUF RPC")
easyProtobufRpc
printTopic("Happy hAkking :-)")
// shutdown everything the amqp tree except the main AMQP supervisor
// all connections/consumers/producers will be stopped
AMQP.shutdownAll
ActorRegistry.shutdownAll
System.exit(0)
}
def printTopic(topic: String) {
println("")
println("==== " + topic + " ===")
println("")
TimeUnit.SECONDS.sleep(2)
}
def direct = {
// defaults to amqp://guest:guest@localhost:5672/
@ -115,7 +135,7 @@ object ExampleSession {
case Restarting => // not used, sent when channel or connection fails and initiates a restart
case Stopped => log.info("Channel callback: Stopped")
}
val exchangeParameters = ExchangeParameters("my_direct_exchange", ExchangeType.Direct)
val exchangeParameters = ExchangeParameters("my_callback_exchange", ExchangeType.Direct)
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val consumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "callback.routing", actor {
@ -129,6 +149,40 @@ object ExampleSession {
connection.stop
}
def easyStringProducerConsumer = {
val connection = AMQP.newConnection()
val exchangeName = "easy.string"
// listen by default to:
// exchange = exchangeName
// routingKey = <exchange>.request
// queueName = <routingKey>.in
AMQP.newStringConsumer(connection, exchangeName, message => println("Received message: "+message))
// send by default to:
// exchange = exchangeName
// routingKey = <exchange>.request
val producer = AMQP.newStringProducer(connection, exchangeName)
producer.send("This shit is easy!")
}
def easyProtobufProducerConsumer = {
val connection = AMQP.newConnection()
val exchangeName = "easy.protobuf"
def protobufMessageHandler(message: AddressProtocol) = {
log.info("Received "+message)
}
AMQP.newProtobufConsumer(connection, exchangeName, protobufMessageHandler)
val producerClient = AMQP.newProtobufProducer[AddressProtocol](connection, exchangeName)
producerClient.send(AddressProtocol.newBuilder.setHostname("akkarocks.com").setPort(1234).build)
}
def rpc = {
val connection = AMQP.newConnection()
@ -146,7 +200,7 @@ object ExampleSession {
def requestHandler(request: String) = 3
val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer,
val rpcServer = RPC.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer,
requestHandler, queueName = Some("rpc.in.key.queue"))
@ -159,9 +213,56 @@ object ExampleSession {
}
val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary)
val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.in.key", rpcClientSerializer)
val rpcClient = RPC.newRpcClient[String,Int](connection, exchangeParameters, "rpc.in.key", rpcClientSerializer)
val response = (rpcClient !! "rpc_request")
log.info("Response: " + response)
}
def easyStringRpc = {
val connection = AMQP.newConnection()
val exchangeName = "easy.stringrpc"
// listen by default to:
// exchange = exchangeName
// routingKey = <exchange>.request
// queueName = <routingKey>.in
RPC.newStringRpcServer(connection, exchangeName, request => {
log.info("Got request: "+request)
"Response to: '"+request+"'"
})
// send by default to:
// exchange = exchangeName
// routingKey = <exchange>.request
val stringRpcClient = RPC.newStringRpcClient(connection, exchangeName)
val response = stringRpcClient.call("AMQP Rocks!")
log.info("Got response: "+response)
stringRpcClient.callAsync("AMQP is dead easy") {
case response => log.info("This is handled async: "+response)
}
}
def easyProtobufRpc = {
val connection = AMQP.newConnection()
val exchangeName = "easy.protobuf.rpc"
def protobufRequestHandler(request: AddressProtocol): AddressProtocol = {
AddressProtocol.newBuilder.setHostname(request.getHostname.reverse).setPort(request.getPort).build
}
RPC.newProtobufRpcServer(connection, exchangeName, protobufRequestHandler)
val stringRpcClient = RPC.newProtobufRpcClient[AddressProtocol, AddressProtocol](connection, exchangeName)
val response = stringRpcClient.call(AddressProtocol.newBuilder.setHostname("localhost").setPort(4321).build)
log.info("Got response: "+response)
}
}

View file

@ -107,7 +107,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio
override def shutdown = {
reconnectionTimer.cancel
// make sure shutdown is called on all linked actors so they can do channel cleanup before connection is killed
self.linkedActorsAsList.foreach(_.stop)
self.shutdownLinkedActors
disconnect
}

View file

@ -0,0 +1,182 @@
package se.scalablesolutions.akka.amqp.rpc
import se.scalablesolutions.akka.amqp.AMQP._
import com.google.protobuf.Message
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import Actor._
import se.scalablesolutions.akka.amqp._
object RPC {
def newRpcClient[O, I](connection: ActorRef,
exchangeParameters: ExchangeParameters,
routingKey: String,
serializer: RpcClientSerializer[O, I],
channelParameters: Option[ChannelParameters] = None): ActorRef = {
val rpcActor: ActorRef = actorOf(new RpcClientActor[O, I](
exchangeParameters, routingKey, serializer, channelParameters))
connection.startLink(rpcActor)
rpcActor ! Start
rpcActor
}
def newRpcServer[I, O](connection: ActorRef,
exchangeParameters: ExchangeParameters,
routingKey: String,
serializer: RpcServerSerializer[I, O],
requestHandler: I => O,
queueName: Option[String] = None,
channelParameters: Option[ChannelParameters] = None): RpcServerHandle = {
val producer = newProducer(connection, ProducerParameters(
ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters))
val rpcServer = actorOf(new RpcServerActor[I, O](producer, serializer, requestHandler))
val consumer = newConsumer(connection, ConsumerParameters(exchangeParameters, routingKey, rpcServer,
channelParameters = channelParameters, selfAcknowledging = false, queueName = queueName))
RpcServerHandle(producer, consumer)
}
case class RpcServerHandle(producer: ActorRef, consumer: ActorRef) {
def stop = {
consumer.stop
producer.stop
}
}
case class RpcClientSerializer[O, I](toBinary: ToBinary[O], fromBinary: FromBinary[I])
case class RpcServerSerializer[I, O](fromBinary: FromBinary[I], toBinary: ToBinary[O])
/**
* RPC convenience
*/
class RpcClient[O, I](client: ActorRef){
def call(request: O, timeout: Long = 5000): Option[I] = {
(client.!!(request, timeout)).as[I]
}
def callAsync(request: O, timeout: Long = 5000)(responseHandler: PartialFunction[Option[I],Unit]) = {
spawn {
val result = call(request, timeout)
responseHandler.apply(result)
}
}
def stop = client.stop
}
def newProtobufRpcServer[I <: Message, O <: Message](
connection: ActorRef,
exchange: String,
requestHandler: I => O,
routingKey: Option[String] = None,
queueName: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true)(implicit manifest: Manifest[I]): RpcServerHandle = {
val serializer = new RpcServerSerializer[I, O](
new FromBinary[I] {
def fromBinary(bytes: Array[Byte]): I = {
createProtobufFromBytes[I](bytes)
}
}, new ToBinary[O] {
def toBinary(t: O) = t.toByteArray
})
startServer(connection, exchange, requestHandler, routingKey, queueName, durable, autoDelete, serializer)
}
def newProtobufRpcClient[O <: Message, I <: Message](
connection: ActorRef,
exchange: String,
routingKey: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true,
passive: Boolean = true)(implicit manifest: Manifest[I]): RpcClient[O, I] = {
val serializer = new RpcClientSerializer[O, I](
new ToBinary[O] {
def toBinary(t: O) = t.toByteArray
}, new FromBinary[I] {
def fromBinary(bytes: Array[Byte]): I = {
createProtobufFromBytes[I](bytes)
}
})
startClient(connection, exchange, routingKey, durable, autoDelete, passive, serializer)
}
def newStringRpcServer(connection: ActorRef,
exchange: String,
requestHandler: String => String,
routingKey: Option[String] = None,
queueName: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true): RpcServerHandle = {
val serializer = new RpcServerSerializer[String, String](
new FromBinary[String] {
def fromBinary(bytes: Array[Byte]): String = {
new String(bytes)
}
}, new ToBinary[String] {
def toBinary(t: String) = t.getBytes
})
startServer(connection, exchange, requestHandler, routingKey, queueName, durable, autoDelete, serializer)
}
def newStringRpcClient(connection: ActorRef,
exchange: String,
routingKey: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true,
passive: Boolean = true): RpcClient[String, String] = {
val serializer = new RpcClientSerializer[String, String](
new ToBinary[String] {
def toBinary(t: String) = t.getBytes
}, new FromBinary[String] {
def fromBinary(bytes: Array[Byte]): String = {
new String(bytes)
}
})
startClient(connection, exchange, routingKey, durable, autoDelete, passive, serializer)
}
private def startClient[O, I](connection: ActorRef,
exchange: String,
routingKey: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true,
passive: Boolean = true,
serializer: RpcClientSerializer[O, I]): RpcClient[O, I] = {
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete, exchangePassive = passive)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val client = newRpcClient(connection, exchangeParameters, rKey, serializer)
new RpcClient(client)
}
private def startServer[I, O](connection: ActorRef,
exchange: String,
requestHandler: I => O,
routingKey: Option[String] = None,
queueName: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true,
serializer: RpcServerSerializer[I, O]): RpcServerHandle = {
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val qName = queueName.getOrElse("%s.in".format(rKey))
newRpcServer(connection, exchangeParameters, rKey, serializer, requestHandler, queueName = Some(qName))
}
}

View file

@ -4,11 +4,9 @@
package se.scalablesolutions.akka.amqp
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameters}
import com.rabbitmq.client.{Channel, RpcClient}
import se.scalablesolutions.akka.amqp.AMQP.{RpcClientSerializer, ChannelParameters, ExchangeParameters}
import rpc.RPC.RpcClientSerializer
import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameters}
class RpcClientActor[I,O](
exchangeParameters: ExchangeParameters,
@ -41,5 +39,11 @@ class RpcClientActor[I,O](
super.preRestart(reason)
}
override def shutdown = {
rpcClient.foreach(rpc => rpc.close)
super.shutdown
}
override def toString = "AMQP.RpcClient[exchange=" +exchangeName + ", routingKey=" + routingKey+ "]"
}

View file

@ -4,9 +4,9 @@
package se.scalablesolutions.akka.amqp
import rpc.RPC.RpcServerSerializer
import se.scalablesolutions.akka.actor.{ActorRef, Actor}
import com.rabbitmq.client.AMQP.BasicProperties
import se.scalablesolutions.akka.amqp.AMQP.RpcServerSerializer
class RpcServerActor[I,O](
producer: ActorRef,

View file

@ -1,69 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.amqp._
import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.amqp.AMQP._
class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging {
@Test
def consumerMessage = if (AMQPTest.enabled) {
val connection = AMQP.newConnection()
try {
val countDown = new CountDownLatch(3)
val channelCallback = actor {
case Started => countDown.countDown
case Restarting => ()
case Stopped => ()
}
val exchangeParameters = ExchangeParameters("text_topic_exchange", ExchangeType.Topic)
val channelParameters = ChannelParameters(channelCallback
= Some(channelCallback))
val rpcServerSerializer = new RpcServerSerializer[String, Int](
new FromBinary[String] {
def fromBinary(bytes: Array[Byte]) = new String(bytes)
}, new ToBinary[Int] {
def toBinary(t: Int) = Array(t.toByte)
})
def requestHandler(request: String) = 3
val rpcServer = AMQP.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer,
requestHandler, channelParameters = Some(channelParameters))
val rpcClientSerializer = new RpcClientSerializer[String, Int](
new ToBinary[String] {
def toBinary(t: String) = t.getBytes
}, new FromBinary[Int] {
def fromBinary(bytes: Array[Byte]) = bytes.head.toInt
})
val rpcClient = AMQP.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer,
channelParameters = Some(channelParameters))
countDown.await(2, TimeUnit.SECONDS) must be(true)
val response = rpcClient !! "some_payload"
response must be(Some(3))
} finally {
connection.stop
}
}
@Test
def dummy {
// amqp tests need local rabbitmq server running, so a disabled by default.
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
assert(true)
}
}

View file

@ -1,12 +1,9 @@
package se.scalablesolutions.akka.amqp.test
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import org.multiverse.api.latches.StandardLatch
@ -14,11 +11,13 @@ import com.rabbitmq.client.ShutdownSignalException
import se.scalablesolutions.akka.amqp._
import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters
import org.scalatest.matchers.MustMatchers
import org.scalatest.junit.JUnitSuite
import org.junit.Test
class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging {
class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers {
@Test
def connectionAndRecovery = if (AMQPTest.enabled) {
def connectionAndRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connectedLatch = new StandardLatch
val reconnectingLatch = new StandardLatch
@ -45,15 +44,9 @@ class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Loggi
reconnectedLatch.tryAwait(2, TimeUnit.SECONDS) must be(true)
} finally {
connection.stop
AMQP.shutdownAll
disconnectedLatch.tryAwait(2, TimeUnit.SECONDS) must be(true)
}
}
@Test
def dummy {
// amqp tests need local rabbitmq server running, so a disabled by default.
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
assert(true)
}
}

View file

@ -1,12 +1,9 @@
package se.scalablesolutions.akka.amqp.test
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor._
import org.multiverse.api.latches.StandardLatch
import com.rabbitmq.client.ShutdownSignalException
import se.scalablesolutions.akka.amqp._
@ -15,11 +12,13 @@ import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.ActorRef
import org.junit.Test
import se.scalablesolutions.akka.amqp.AMQP._
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor._
class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging {
class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers {
@Test
def consumerChannelRecovery = if (AMQPTest.enabled) {
def consumerChannelRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
try {
@ -60,11 +59,4 @@ class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with
connection.stop
}
}
@Test
def dummy {
// amqp tests need local rabbitmq server running, so a disabled by default.
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
assert(true)
}
}

View file

@ -1,25 +1,24 @@
package se.scalablesolutions.akka.amqp.test
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor._
import org.multiverse.api.latches.StandardLatch
import com.rabbitmq.client.ShutdownSignalException
import se.scalablesolutions.akka.amqp._
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.ActorRef
import org.junit.Test
import se.scalablesolutions.akka.amqp.AMQP._
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import Actor._
class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging {
class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers {
@Test
def consumerConnectionRecovery = if (AMQPTest.enabled) {
def consumerConnectionRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
try {
@ -79,11 +78,4 @@ class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers wi
connection.stop
}
}
@Test
def dummy {
// amqp tests need local rabbitmq server running, so a disabled by default.
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
assert(true)
}
}

View file

@ -1,12 +1,9 @@
package se.scalablesolutions.akka.amqp.test
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import org.multiverse.api.latches.StandardLatch
import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.matchers.MustMatchers
import se.scalablesolutions.akka.amqp._
@ -14,11 +11,13 @@ import org.junit.Test
import se.scalablesolutions.akka.actor.ActorRef
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters}
import org.multiverse.api.latches.StandardLatch
import org.scalatest.junit.JUnitSuite
class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers with Logging {
class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers {
@Test
def consumerMessageManualAcknowledge = if (AMQPTest.enabled) {
def consumerMessageManualAcknowledge = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection()
try {
val countDown = new CountDownLatch(2)
@ -57,11 +56,4 @@ class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers wit
connection.stop
}
}
@Test
def dummy {
// amqp tests need local rabbitmq server running, so a disabled by default.
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
assert(true)
}
}

View file

@ -0,0 +1,53 @@
package se.scalablesolutions.akka.amqp.test
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.matchers.MustMatchers
import se.scalablesolutions.akka.amqp._
import org.junit.Test
import se.scalablesolutions.akka.actor.ActorRef
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters}
import org.multiverse.api.latches.StandardLatch
import org.scalatest.junit.JUnitSuite
class AMQPConsumerManualRejectTest extends JUnitSuite with MustMatchers {
@Test
def consumerMessageManualAcknowledge = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection()
try {
val countDown = new CountDownLatch(2)
val restartingLatch = new StandardLatch
val channelCallback = actor {
case Started => countDown.countDown
case Restarting => restartingLatch.open
case Stopped => ()
}
val exchangeParameters = ExchangeParameters("text_exchange",ExchangeType.Direct)
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val rejectedLatch = new StandardLatch
val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "manual.reject.this", actor {
case Delivery(payload, _, deliveryTag, _, sender) => {
sender.foreach(_ ! Reject(deliveryTag))
}
case Rejected(deliveryTag) => rejectedLatch.open
}, queueName = Some("self.reject.queue"), selfAcknowledging = false, queueAutoDelete = false, channelParameters = Some(channelParameters)))
val producer = AMQP.newProducer(connection,
ProducerParameters(exchangeParameters, channelParameters = Some(channelParameters)))
countDown.await(2, TimeUnit.SECONDS) must be (true)
producer ! Message("some_payload".getBytes, "manual.reject.this")
rejectedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
restartingLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
} finally {
connection.stop
}
}
}

View file

@ -1,23 +1,22 @@
package se.scalablesolutions.akka.amqp.test
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.amqp._
import org.multiverse.api.latches.StandardLatch
import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging {
class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers {
@Test
def consumerMessage = if (AMQPTest.enabled) {
def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection()
try {
@ -46,11 +45,4 @@ class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging
connection.stop
}
}
@Test
def dummy {
// amqp tests need local rabbitmq server running, so a disabled by default.
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
assert(true)
}
}

View file

@ -1,12 +1,9 @@
package se.scalablesolutions.akka.amqp.test
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import org.multiverse.api.latches.StandardLatch
@ -14,11 +11,13 @@ import com.rabbitmq.client.ShutdownSignalException
import se.scalablesolutions.akka.amqp._
import org.scalatest.matchers.MustMatchers
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters, ProducerParameters, ConnectionParameters}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging {
class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers {
@Test
def producerChannelRecovery = if (AMQPTest.enabled) {
def producerChannelRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
@ -53,11 +52,4 @@ class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with
connection.stop
}
}
@Test
def dummy {
// amqp tests need local rabbitmq server running, so a disabled by default.
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
assert(true)
}
}

View file

@ -1,12 +1,9 @@
package se.scalablesolutions.akka.amqp.test
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import org.multiverse.api.latches.StandardLatch
@ -14,11 +11,13 @@ import com.rabbitmq.client.ShutdownSignalException
import se.scalablesolutions.akka.amqp._
import org.scalatest.matchers.MustMatchers
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters, ProducerParameters, ConnectionParameters}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging {
class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers {
@Test
def producerConnectionRecovery = if (AMQPTest.enabled) {
def producerConnectionRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
try {
@ -52,11 +51,4 @@ class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers wi
connection.stop
}
}
@Test
def dummy {
// amqp tests need local rabbitmq server running, so a disabled by default.
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
assert(true)
}
}

View file

@ -1,12 +1,9 @@
package se.scalablesolutions.akka.amqp.test
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.ActorRef
import org.multiverse.api.latches.StandardLatch
@ -16,11 +13,13 @@ import com.rabbitmq.client.AMQP.BasicProperties
import java.lang.String
import org.scalatest.matchers.MustMatchers
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ProducerParameters}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging {
class AMQPProducerMessageTest extends JUnitSuite with MustMatchers {
@Test
def producerMessage = if (AMQPTest.enabled) {
def producerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection: ActorRef = AMQP.newConnection()
try {
@ -41,11 +40,4 @@ class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging
connection.stop
}
}
@Test
def dummy {
// amqp tests need local rabbitmq server running, so a disabled by default.
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
assert(true)
}
}

View file

@ -0,0 +1,43 @@
package se.scalablesolutions.akka.amqp.test
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
import org.scalatest.matchers.MustMatchers
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.amqp.AMQP
import org.junit.Test
import org.multiverse.api.latches.StandardLatch
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.amqp.rpc.RPC
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol
class AMQPProtobufProducerConsumerTest extends JUnitSuite with MustMatchers {
@Test
def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection()
val responseLatch = new StandardLatch
RPC.newProtobufRpcServer(connection, "protoexchange", requestHandler)
val request = AddressProtocol.newBuilder.setHostname("testhost").setPort(4321).build
def responseHandler(response: AddressProtocol) = {
assert(response.getHostname == request.getHostname.reverse)
responseLatch.open
}
AMQP.newProtobufConsumer(connection, "", responseHandler, Some("proto.reply.key"))
val producer = AMQP.newProtobufProducer[AddressProtocol](connection, "protoexchange")
producer.send(request, Some("proto.reply.key"))
responseLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
}
def requestHandler(request: AddressProtocol): AddressProtocol = {
AddressProtocol.newBuilder.setHostname(request.getHostname.reverse).setPort(request.getPort).build
}
}

View file

@ -0,0 +1,61 @@
package se.scalablesolutions.akka.amqp.test
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
import se.scalablesolutions.akka.amqp._
import rpc.RPC
import rpc.RPC.{RpcClientSerializer, RpcServerSerializer}
import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.amqp.AMQP._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers {
@Test
def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection()
val countDown = new CountDownLatch(3)
val channelCallback = actor {
case Started => countDown.countDown
case Restarting => ()
case Stopped => ()
}
val exchangeParameters = ExchangeParameters("text_topic_exchange", ExchangeType.Topic)
val channelParameters = ChannelParameters(channelCallback
= Some(channelCallback))
val rpcServerSerializer = new RpcServerSerializer[String, Int](
new FromBinary[String] {
def fromBinary(bytes: Array[Byte]) = new String(bytes)
}, new ToBinary[Int] {
def toBinary(t: Int) = Array(t.toByte)
})
def requestHandler(request: String) = 3
val rpcServer = RPC.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer,
requestHandler, channelParameters = Some(channelParameters))
val rpcClientSerializer = new RpcClientSerializer[String, Int](
new ToBinary[String] {
def toBinary(t: String) = t.getBytes
}, new FromBinary[Int] {
def fromBinary(bytes: Array[Byte]) = bytes.head.toInt
})
val rpcClient = RPC.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer,
channelParameters = Some(channelParameters))
countDown.await(2, TimeUnit.SECONDS) must be(true)
val response = rpcClient !! "some_payload"
response must be(Some(3))
}
}

View file

@ -0,0 +1,49 @@
package se.scalablesolutions.akka.amqp.test
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
import org.scalatest.matchers.MustMatchers
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.amqp.AMQP
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol
import org.junit.Test
import se.scalablesolutions.akka.amqp.rpc.RPC
import org.multiverse.api.latches.StandardLatch
import java.util.concurrent.TimeUnit
class AMQPRpcProtobufTest extends JUnitSuite with MustMatchers {
@Test
def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection()
RPC.newProtobufRpcServer(connection, "protoservice", requestHandler)
val protobufClient = RPC.newProtobufRpcClient[AddressProtocol, AddressProtocol](connection, "protoservice")
val request = AddressProtocol.newBuilder.setHostname("testhost").setPort(4321).build
protobufClient.call(request) match {
case Some(response) => assert(response.getHostname == request.getHostname.reverse)
case None => fail("no response")
}
val aSyncLatch = new StandardLatch
protobufClient.callAsync(request) {
case Some(response) => {
assert(response.getHostname == request.getHostname.reverse)
aSyncLatch.open
}
case None => fail("no response")
}
aSyncLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
}
def requestHandler(request: AddressProtocol): AddressProtocol = {
AddressProtocol.newBuilder.setHostname(request.getHostname.reverse).setPort(request.getPort).build
}
}

View file

@ -0,0 +1,47 @@
package se.scalablesolutions.akka.amqp.test
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
import org.scalatest.matchers.MustMatchers
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.amqp.AMQP
import org.junit.Test
import se.scalablesolutions.akka.amqp.rpc.RPC
import org.multiverse.api.latches.StandardLatch
import java.util.concurrent.TimeUnit
class AMQPRpcStringTest extends JUnitSuite with MustMatchers {
@Test
def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection()
RPC.newStringRpcServer(connection, "stringservice", requestHandler)
val protobufClient = RPC.newStringRpcClient(connection, "stringservice")
val request = "teststring"
protobufClient.call(request) match {
case Some(response) => assert(response == request.reverse)
case None => fail("no response")
}
val aSyncLatch = new StandardLatch
protobufClient.callAsync(request) {
case Some(response) => {
assert(response == request.reverse)
aSyncLatch.open
}
case None => fail("no response")
}
aSyncLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
}
def requestHandler(request: String): String= {
request.reverse
}
}

View file

@ -0,0 +1,44 @@
package se.scalablesolutions.akka.amqp.test
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
import org.scalatest.matchers.MustMatchers
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.amqp.AMQP
import org.junit.Test
import org.multiverse.api.latches.StandardLatch
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.amqp.rpc.RPC
class AMQPStringProducerConsumerTest extends JUnitSuite with MustMatchers {
@Test
def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState {
val connection = AMQP.newConnection()
val responseLatch = new StandardLatch
RPC.newStringRpcServer(connection, "stringexchange", requestHandler)
val request = "somemessage"
def responseHandler(response: String) = {
assert(response == request.reverse)
responseLatch.open
}
AMQP.newStringConsumer(connection, "", responseHandler, Some("string.reply.key"))
val producer = AMQP.newStringProducer(connection, "stringexchange")
producer.send(request, Some("string.reply.key"))
responseLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
}
def requestHandler(request: String): String= {
println("###### Reverse")
request.reverse
}
}

View file

@ -4,6 +4,16 @@
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.amqp.AMQP
object AMQPTest {
def enabled = false
def withCleanEndState(action: => Unit) {
try {
action
} finally {
AMQP.shutdownAll
}
}
}

View file

@ -27,7 +27,7 @@
<fileNamePattern>./logs/akka.log.%d{yyyy-MM-dd-HH}</fileNamePattern>
</rollingPolicy>
</appender>
<logger name="se.scalablesolutions" level="INFO"/>
<logger name="se.scalablesolutions" level="DEBUG"/>
<root level="INFO">
<appender-ref ref="stdout"/>
<appender-ref ref="R"/>

View file

@ -374,6 +374,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
class AkkaAMQPProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with CodeFellowPlugin {
val commons_io = Dependencies.commons_io
val rabbit = Dependencies.rabbit
val protobuf = Dependencies.protobuf
// testing
val junit = Dependencies.junit