diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index be3556e812..a4fe23b7b0 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -10,8 +10,6 @@ import se.scalablesolutions.akka.config.OneForOneStrategy import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory} import java.lang.IllegalArgumentException import se.scalablesolutions.akka.util.Logging -import se.scalablesolutions.akka.serialization.Serializer - /** * AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors. * @@ -20,7 +18,6 @@ import se.scalablesolutions.akka.serialization.Serializer * @author Irmo Manie */ object AMQP { - case class ConnectionParameters( host: String = ConnectionFactory.DEFAULT_HOST, port: Int = ConnectionFactory.DEFAULT_AMQP_PORT, @@ -57,7 +54,6 @@ object AMQP { queueExclusive: Boolean = false, selfAcknowledging: Boolean = true, channelParameters: Option[ChannelParameters] = None) { - if (queueDurable && queueName.isEmpty) { throw new IllegalArgumentException("A queue name is required when requesting a durable queue.") } @@ -84,27 +80,25 @@ object AMQP { consumer } - def newRpcClient(connection: ActorRef, + def newRpcClient[O,I](connection: ActorRef, exchangeParameters: ExchangeParameters, routingKey: String, - inSerializer: Serializer, - outSerializer: Serializer, + serializer: RpcClientSerializer[O,I], channelParameters: Option[ChannelParameters] = None): ActorRef = { - val rpcActor: ActorRef = actorOf(new RpcClientActor(exchangeParameters, routingKey, inSerializer, outSerializer, channelParameters)) + val rpcActor: ActorRef = actorOf(new RpcClientActor[O,I](exchangeParameters, routingKey, serializer, channelParameters)) connection.startLink(rpcActor) rpcActor ! Start rpcActor } - def newRpcServer(connection: ActorRef, - exchangeParameters: ExchangeParameters, - routingKey: String, - inSerializer: Serializer, - outSerializer: Serializer, - requestHandler: PartialFunction[AnyRef, AnyRef], - channelParameters: Option[ChannelParameters] = None) = { + def newRpcServer[I,O](connection: ActorRef, + exchangeParameters: ExchangeParameters, + routingKey: String, + serializer: RpcServerSerializer[I,O], + requestHandler: PartialFunction[I, O], + channelParameters: Option[ChannelParameters] = None) = { val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) - val rpcServer = actorOf(new RpcServerActor(producer, inSerializer, outSerializer, requestHandler)) + val rpcServer = actorOf(new RpcServerActor[I,O](producer, serializer, requestHandler)) val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer , channelParameters = channelParameters , selfAcknowledging = false)) @@ -133,4 +127,7 @@ object AMQP { connectionActor } } + + case class RpcClientSerializer[O,I](output: Function[O, Array[Byte]], input: Function[Array[Byte], I]) + case class RpcServerSerializer[I,O](input: Function[Array[Byte], I], output: Function[O, Array[Byte]]) } diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/ExampleSession.scala index 8049eb74ab..88ad583ea8 100644 --- a/akka-amqp/src/main/scala/ExampleSession.scala +++ b/akka-amqp/src/main/scala/ExampleSession.scala @@ -8,10 +8,10 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRegistry} import Actor._ import java.util.concurrent.{CountDownLatch, TimeUnit} import se.scalablesolutions.akka.amqp.AMQP._ -import se.scalablesolutions.akka.serialization.Serializer -import java.lang.Class +import java.lang.String object ExampleSession { + def main(args: Array[String]) = { println("==== DIRECT ===") direct @@ -97,6 +97,7 @@ object ExampleSession { } def callback = { + val channelCountdown = new CountDownLatch(2) val connectionCallback = actor { @@ -129,21 +130,23 @@ object ExampleSession { } def rpc = { + val connection = AMQP.newConnection() val exchangeParameters = ExchangeParameters("my_rpc_exchange", ExchangeType.Topic) - val stringSerializer = new Serializer { - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes) - def toBinary(obj: AnyRef) = obj.asInstanceOf[String].getBytes - } + /** Server */ + val rpcServerSerializer = new RpcServerSerializer[String, Int]({x:Array[Byte] => new String(x)}, {x:Int => Array(x.toByte)}) - val rpcServer = AMQP.newRpcServer(connection, exchangeParameters, "rpc.in.key", stringSerializer, stringSerializer, { - case "rpc_request" => "rpc_response" + val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, { + case "rpc_request" => 3 case _ => error("unknown request") }) - val rpcClient = AMQP.newRpcClient(connection, exchangeParameters, "rpc.in.key", stringSerializer, stringSerializer) + /** Client */ + val rpcClientSerializer = new RpcClientSerializer[String, Int]({x:String => x.getBytes}, {x:Array[Byte] => x.head.toInt}) + + val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.in.key", rpcClientSerializer) val response = (rpcClient !! "rpc_request") log.info("Response: " + response) diff --git a/akka-amqp/src/main/scala/RpcClientActor.scala b/akka-amqp/src/main/scala/RpcClientActor.scala index 8ff7d8a0ac..6880a4a36f 100644 --- a/akka-amqp/src/main/scala/RpcClientActor.scala +++ b/akka-amqp/src/main/scala/RpcClientActor.scala @@ -4,14 +4,12 @@ package se.scalablesolutions.akka.amqp -import se.scalablesolutions.akka.serialization.Serializer -import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameters} import com.rabbitmq.client.{Channel, RpcClient} +import se.scalablesolutions.akka.amqp.AMQP.{RpcClientSerializer, ChannelParameters, ExchangeParameters} -class RpcClientActor(exchangeParameters: ExchangeParameters, +class RpcClientActor[I,O](exchangeParameters: ExchangeParameters, routingKey: String, - inSerializer: Serializer, - outSerializer: Serializer, + serializer: RpcClientSerializer[I,O], channelParameters: Option[ChannelParameters] = None) extends FaultTolerantChannelActor(exchangeParameters, channelParameters) { import exchangeParameters._ @@ -21,12 +19,12 @@ class RpcClientActor(exchangeParameters: ExchangeParameters, log.info("%s started", this) def specificMessageHandler = { - case payload: AnyRef => { + case payload: I => { rpcClient match { case Some(client) => - val response: Array[Byte] = client.primitiveCall(inSerializer.toBinary(payload)) - self.reply(outSerializer.fromBinary(response, None)) + val response: Array[Byte] = client.primitiveCall(serializer.output(payload)) + self.reply(serializer.input(response)) case None => error("%s has no client to send messages with".format(this)) } } diff --git a/akka-amqp/src/main/scala/RpcServerActor.scala b/akka-amqp/src/main/scala/RpcServerActor.scala index fa760edda8..b2f41cec0d 100644 --- a/akka-amqp/src/main/scala/RpcServerActor.scala +++ b/akka-amqp/src/main/scala/RpcServerActor.scala @@ -6,9 +6,9 @@ package se.scalablesolutions.akka.amqp import se.scalablesolutions.akka.actor.{ActorRef, Actor} import com.rabbitmq.client.AMQP.BasicProperties -import se.scalablesolutions.akka.serialization.Serializer +import se.scalablesolutions.akka.amqp.AMQP.RpcServerSerializer -class RpcServerActor(producer: ActorRef, inSerializer: Serializer, outSerializer: Serializer, requestHandler: PartialFunction[AnyRef, AnyRef]) extends Actor { +class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: PartialFunction[I, O]) extends Actor { log.info("%s started", this) @@ -16,8 +16,8 @@ class RpcServerActor(producer: ActorRef, inSerializer: Serializer, outSerializer case Delivery(payload, _, tag, props, sender) => { log.debug("%s handling delivery with tag %d", this, tag) - val request = inSerializer.fromBinary(payload, None) - val response: Array[Byte] = outSerializer.toBinary(requestHandler(request)) + val request = serializer.input(payload) + val response: Array[Byte] = serializer.output(requestHandler(request)) log.debug("%s sending reply to %s", this, props.getReplyTo) val replyProps = new BasicProperties diff --git a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala index eebcfccce3..7a798c366d 100644 --- a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala +++ b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala @@ -11,8 +11,8 @@ import se.scalablesolutions.akka.amqp._ import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import java.util.concurrent.{CountDownLatch, TimeUnit} -import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters} import se.scalablesolutions.akka.serialization.Serializer +import se.scalablesolutions.akka.amqp.AMQP._ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging { @@ -29,23 +29,22 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging } val exchangeParameters = ExchangeParameters("text_topic_exchange", ExchangeType.Topic) - val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) - val stringSerializer = new Serializer { - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes) - def toBinary(obj: AnyRef) = obj.asInstanceOf[String].getBytes - } + val channelParameters = ChannelParameters(channelCallback + = Some(channelCallback)) - val rpcServer = AMQP.newRpcServer(connection, exchangeParameters, "rpc.routing", stringSerializer, stringSerializer, { - case "some_payload" => "some_result" - case _ => error("Unhandled message") + val rpcServerSerializer = new RpcServerSerializer[String, Int]({x:Array[Byte] => new String(x)}, {x:Int => Array(x.toByte)}) + val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, { + case "some_payload" => 3 + case _ => error("unknown request") }, channelParameters = Some(channelParameters)) - val rpcClient = AMQP.newRpcClient(connection, exchangeParameters, "rpc.routing", stringSerializer, stringSerializer - , channelParameters = Some(channelParameters)) + val rpcClientSerializer = new RpcClientSerializer[String, Int]({x:String => x.getBytes}, {x:Array[Byte] => x.head.toInt}) + val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer, + channelParameters = Some(channelParameters)) countDown.await(2, TimeUnit.SECONDS) must be (true) val response = rpcClient !! "some_payload" - response must be (Some("some_result")) + response must be (Some(3)) } finally { connection.stop }