diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index a3549ea88e..974a7196a6 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -11,6 +11,7 @@ import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory} import java.lang.IllegalArgumentException import se.scalablesolutions.akka.util.{Logging} import java.util.UUID +import se.scalablesolutions.akka.serialization.Serializer /** * AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors. @@ -87,22 +88,24 @@ object AMQP { def newRpcClient(connection: ActorRef, exchangeParameters: ExchangeParameters, routingKey: String, - deliveryHandler: ActorRef, + inSerializer: Serializer, + outSerializer: Serializer, channelParameters: Option[ChannelParameters] = None): ActorRef = { - val replyToRoutingKey = UUID.randomUUID.toString - val producer = newProducer(connection, new ProducerParameters(exchangeParameters, channelParameters = channelParameters)) - val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, replyToRoutingKey, deliveryHandler, channelParameters = channelParameters)) - val rpcActor: ActorRef = actorOf(new RpcClientActor(producer, routingKey, replyToRoutingKey)).start + val rpcActor: ActorRef = actorOf(new RpcClientActor(exchangeParameters, routingKey, inSerializer, outSerializer, channelParameters)) + connection.startLink(rpcActor) + rpcActor ! Start rpcActor } def newRpcServer(connection: ActorRef, exchangeParameters: ExchangeParameters, routingKey: String, - requestHandler: Function[Array[Byte], Array[Byte]], + inSerializer: Serializer, + outSerializer: Serializer, + requestHandler: PartialFunction[AnyRef, AnyRef], channelParameters: Option[ChannelParameters] = None) = { - val producer = newProducer(connection, new ProducerParameters(exchangeParameters, channelParameters = channelParameters)) - val rpcServer: ActorRef = actorOf(new RpcServerActor(producer, requestHandler)).start + val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) + val rpcServer = actorOf(new RpcServerActor(producer, inSerializer, outSerializer, requestHandler)) val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer , channelParameters = channelParameters , selfAcknowledging = false)) diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/ExampleSession.scala index 435d921906..8049eb74ab 100644 --- a/akka-amqp/src/main/scala/ExampleSession.scala +++ b/akka-amqp/src/main/scala/ExampleSession.scala @@ -8,6 +8,8 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRegistry} import Actor._ import java.util.concurrent.{CountDownLatch, TimeUnit} import se.scalablesolutions.akka.amqp.AMQP._ +import se.scalablesolutions.akka.serialization.Serializer +import java.lang.Class object ExampleSession { def main(args: Array[String]) = { @@ -31,6 +33,11 @@ object ExampleSession { TimeUnit.SECONDS.sleep(2) + println("==== RPC ===") + rpc + + TimeUnit.SECONDS.sleep(2) + ActorRegistry.shutdownAll System.exit(0) } @@ -120,4 +127,25 @@ object ExampleSession { channelCountdown.await(2, TimeUnit.SECONDS) connection.stop } + + def rpc = { + val connection = AMQP.newConnection() + + val exchangeParameters = ExchangeParameters("my_rpc_exchange", ExchangeType.Topic) + + val stringSerializer = new Serializer { + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes) + def toBinary(obj: AnyRef) = obj.asInstanceOf[String].getBytes + } + + val rpcServer = AMQP.newRpcServer(connection, exchangeParameters, "rpc.in.key", stringSerializer, stringSerializer, { + case "rpc_request" => "rpc_response" + case _ => error("unknown request") + }) + + val rpcClient = AMQP.newRpcClient(connection, exchangeParameters, "rpc.in.key", stringSerializer, stringSerializer) + + val response = (rpcClient !! "rpc_request") + log.info("Response: " + response) + } } diff --git a/akka-amqp/src/main/scala/FaultTolerantChannelActor.scala b/akka-amqp/src/main/scala/FaultTolerantChannelActor.scala index 433c5d31f8..40bcd5de57 100644 --- a/akka-amqp/src/main/scala/FaultTolerantChannelActor.scala +++ b/akka-amqp/src/main/scala/FaultTolerantChannelActor.scala @@ -12,7 +12,9 @@ import com.rabbitmq.client.{ShutdownSignalException, Channel, ShutdownListener} import scala.PartialFunction import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters} -abstract private[amqp] class FaultTolerantChannelActor(exchangeParameters: ExchangeParameters, channelParameters: Option[ChannelParameters]) extends Actor { +abstract private[amqp] class FaultTolerantChannelActor( + exchangeParameters: ExchangeParameters, channelParameters: Option[ChannelParameters]) extends Actor { + import exchangeParameters._ protected[amqp] var channel: Option[Channel] = None @@ -62,10 +64,12 @@ abstract private[amqp] class FaultTolerantChannelActor(exchangeParameters: Excha protected def setupChannel(ch: Channel) private def setupChannelInternal(ch: Channel) = if (channel.isEmpty) { - if (exchangePassive) { - ch.exchangeDeclarePassive(exchangeName) - } else { - ch.exchangeDeclare(exchangeName, exchangeType.toString, exchangeDurable, exchangeAutoDelete, JavaConversions.asMap(configurationArguments)) + if (exchangeName != "") { + if (exchangePassive) { + ch.exchangeDeclarePassive(exchangeName) + } else { + ch.exchangeDeclare(exchangeName, exchangeType.toString, exchangeDurable, exchangeAutoDelete, JavaConversions.asMap(configurationArguments)) + } } ch.addShutdownListener(new ShutdownListener { def shutdownCompleted(cause: ShutdownSignalException) = { diff --git a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala index 6e3a936256..1bf8e2e088 100644 --- a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala +++ b/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala @@ -6,13 +6,12 @@ package se.scalablesolutions.akka.amqp import java.util.{TimerTask, Timer} import java.io.IOException -import se.scalablesolutions.akka.util.Logging import com.rabbitmq.client._ import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters import se.scalablesolutions.akka.actor.{Exit, Actor} import se.scalablesolutions.akka.config.ScalaConfig.{Permanent, LifeCycle} -private[amqp] class FaultTolerantConnectionActor(connectionParameters: ConnectionParameters) extends Actor with Logging { +private[amqp] class FaultTolerantConnectionActor(connectionParameters: ConnectionParameters) extends Actor { import connectionParameters._ self.id = "amqp-connection-%s".format(host) diff --git a/akka-amqp/src/main/scala/ProducerActor.scala b/akka-amqp/src/main/scala/ProducerActor.scala index e544e4fc63..db290a5ac1 100644 --- a/akka-amqp/src/main/scala/ProducerActor.scala +++ b/akka-amqp/src/main/scala/ProducerActor.scala @@ -9,6 +9,7 @@ import se.scalablesolutions.akka.amqp.AMQP.ProducerParameters private[amqp] class ProducerActor(producerParameters: ProducerParameters) extends FaultTolerantChannelActor(producerParameters.exchangeParameters, producerParameters.channelParameters) { + import producerParameters._ import exchangeParameters._ diff --git a/akka-amqp/src/main/scala/RpcClientActor.scala b/akka-amqp/src/main/scala/RpcClientActor.scala index cf2d131394..796568e011 100644 --- a/akka-amqp/src/main/scala/RpcClientActor.scala +++ b/akka-amqp/src/main/scala/RpcClientActor.scala @@ -4,27 +4,38 @@ package se.scalablesolutions.akka.amqp -import se.scalablesolutions.akka.actor.{ActorRef, Actor} -import com.rabbitmq.client.AMQP.BasicProperties -import se.scalablesolutions.akka.config.ScalaConfig.{LifeCycle, Permanent} +import se.scalablesolutions.akka.serialization.Serializer +import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameters} +import com.rabbitmq.client.{Channel, RpcClient} -class RpcClientActor(producer: ActorRef, routingKey: String, replyTo: String) extends Actor { +class RpcClientActor(exchangeParameters: ExchangeParameters, + routingKey: String, + inSerializer: Serializer, + outSerializer: Serializer, + channelParameters: Option[ChannelParameters] = None) extends FaultTolerantChannelActor(exchangeParameters, channelParameters) { + + import exchangeParameters._ + + var rpcClient: Option[RpcClient] = None - self.lifeCycle = Some(LifeCycle(Permanent)) - log.info("%s started", this) - protected def receive = { - case payload: Array[Byte] => { - val props = new BasicProperties - props.setReplyTo(replyTo) - producer ! new Message(payload, routingKey, properties = Some(props)) + def specificMessageHandler = { + case payload: AnyRef => { + + rpcClient.foreach {client => + val response: Array[Byte] = client.primitiveCall(inSerializer.toBinary(payload)) + reply(outSerializer.fromBinary(response, None)) + } } } + protected def setupChannel(ch: Channel) = { + rpcClient = Some(new RpcClient(ch, exchangeName, routingKey)) + } + override def toString(): String = - "AMQP.RpcClient[producerId=" + producer.id + - ", routingKey=" + routingKey+ - ", replyTo=" + replyTo + "]" + "AMQP.RpcClient[exchange=" +exchangeName + + ", routingKey=" + routingKey+ "]" } \ No newline at end of file diff --git a/akka-amqp/src/main/scala/RpcServerActor.scala b/akka-amqp/src/main/scala/RpcServerActor.scala index b1bfee9df5..45d1653dcb 100644 --- a/akka-amqp/src/main/scala/RpcServerActor.scala +++ b/akka-amqp/src/main/scala/RpcServerActor.scala @@ -5,24 +5,30 @@ package se.scalablesolutions.akka.amqp import se.scalablesolutions.akka.actor.{ActorRef, Actor} -import se.scalablesolutions.akka.config.ScalaConfig.{Permanent, LifeCycle} +import com.rabbitmq.client.AMQP.BasicProperties +import se.scalablesolutions.akka.serialization.Serializer -class RpcServerActor(producer: ActorRef, requestHandler: Function[Array[Byte], Array[Byte]]) extends Actor { - - self.lifeCycle = Some(LifeCycle(Permanent)) +class RpcServerActor(producer: ActorRef, inSerializer: Serializer, outSerializer: Serializer, requestHandler: PartialFunction[AnyRef, AnyRef]) extends Actor { log.info("%s started", this) protected def receive = { case Delivery(payload, _, tag, props, sender) => { - val response: Array[Byte] = requestHandler(payload) + log.debug("%s handling delivery with tag %d", this, tag) + val request = inSerializer.fromBinary(payload, None) + val response: Array[Byte] = outSerializer.toBinary(requestHandler(request)) - log.info("Sending reply to %s", props.getReplyTo) - producer ! new Message(response, props.getReplyTo) + log.info("%s sending reply to %s", this, props.getReplyTo) + val replyProps = new BasicProperties + replyProps.setCorrelationId(props.getCorrelationId) + producer ! new Message(response, props.getReplyTo, properties = Some(replyProps)) sender.foreach(_ ! Acknowledge(tag)) } - case Acknowledged(tag) => log.debug("todo") + case Acknowledged(tag) => log.debug("%s acknowledged delivery with tag %d", this, tag) } + + override def toString(): String = + "AMQP.RpcServer[producerId=" + producer.id + "]" } \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala index 7a0aae06e4..9bbf6dd451 100644 --- a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala +++ b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala @@ -13,6 +13,7 @@ import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import java.util.concurrent.{CountDownLatch, TimeUnit} import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters} +import se.scalablesolutions.akka.serialization.Serializer class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging { @@ -21,7 +22,7 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging val connection = AMQP.newConnection() try { - val countDown = new CountDownLatch(4) + val countDown = new CountDownLatch(3) val channelCallback = actor { case Started => countDown.countDown case Restarting => () @@ -30,21 +31,22 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging val exchangeParameters = ExchangeParameters("text_exchange",ExchangeType.Topic) val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) - - def requestHandler(request: Array[Byte]): Array[Byte] = { - "someresult".getBytes + val stringSerializer = new Serializer { + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes) + def toBinary(obj: AnyRef) = obj.asInstanceOf[String].getBytes } - - val rpcServer = AMQP.newRpcServer(connection, exchangeParameters, "rpc.routing", requestHandler, channelParameters = Some(channelParameters)) - val payloadLatch = new StandardLatch - val rpcClient = AMQP.newRpcClient(connection, exchangeParameters, "rpc.routing", actor { - case Delivery(payload, _, _, _, _) => payloadLatch.open + val rpcServer = AMQP.newRpcServer(connection, exchangeParameters, "rpc.routing", stringSerializer, stringSerializer, { + case "some_payload" => "some_result" + case _ => error("Unhandled message") }, channelParameters = Some(channelParameters)) + val rpcClient = AMQP.newRpcClient(connection, exchangeParameters, "rpc.routing", stringSerializer, stringSerializer + , channelParameters = Some(channelParameters)) + countDown.await(2, TimeUnit.SECONDS) must be (true) - rpcClient ! "some_payload".getBytes - payloadLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + val response = rpcClient !! "some_payload" + response must be (Some("some_result")) } finally { connection.stop }