From f36ad6c2f7b91d1ed4c6fc7d301d99c97cecbfc0 Mon Sep 17 00:00:00 2001 From: momania Date: Thu, 8 Jul 2010 11:28:00 +0200 Subject: [PATCH] added rpc server and unit test --- akka-amqp/src/main/scala/AMQP.scala | 26 ++++++-- .../scala/FaultTolerantChannelActor.scala | 2 - akka-amqp/src/main/scala/RpcClientActor.scala | 11 ++++ akka-amqp/src/main/scala/RpcServerActor.scala | 28 +++++++++ .../test/scala/AMQPRpcClientServerTest.scala | 59 +++++++++++++++++++ 5 files changed, 119 insertions(+), 7 deletions(-) create mode 100644 akka-amqp/src/main/scala/RpcServerActor.scala create mode 100644 akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 3f56272846..a3549ea88e 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -84,15 +84,31 @@ object AMQP { consumer } - def newRpcClient(connection: ActorRef, exchangeParameters: ExchangeParameters, routingKey: String, deliveryHandler: ActorRef): ActorRef = { + def newRpcClient(connection: ActorRef, + exchangeParameters: ExchangeParameters, + routingKey: String, + deliveryHandler: ActorRef, + channelParameters: Option[ChannelParameters] = None): ActorRef = { val replyToRoutingKey = UUID.randomUUID.toString - val producer = newProducer(connection, new ProducerParameters(exchangeParameters)) - val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, replyToRoutingKey, deliveryHandler)) - val rpcActor: ActorRef = actorOf(new RpcClientActor(producer, routingKey, replyToRoutingKey)) - connection.startLink(rpcActor) + val producer = newProducer(connection, new ProducerParameters(exchangeParameters, channelParameters = channelParameters)) + val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, replyToRoutingKey, deliveryHandler, channelParameters = channelParameters)) + val rpcActor: ActorRef = actorOf(new RpcClientActor(producer, routingKey, replyToRoutingKey)).start rpcActor } + def newRpcServer(connection: ActorRef, + exchangeParameters: ExchangeParameters, + routingKey: String, + requestHandler: Function[Array[Byte], Array[Byte]], + channelParameters: Option[ChannelParameters] = None) = { + val producer = newProducer(connection, new ProducerParameters(exchangeParameters, channelParameters = channelParameters)) + val rpcServer: ActorRef = actorOf(new RpcServerActor(producer, requestHandler)).start + val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer + , channelParameters = channelParameters + , selfAcknowledging = false)) + + } + private val supervisor = new AMQPSupervisor class AMQPSupervisor extends Logging { diff --git a/akka-amqp/src/main/scala/FaultTolerantChannelActor.scala b/akka-amqp/src/main/scala/FaultTolerantChannelActor.scala index 78ba6d3fbb..433c5d31f8 100644 --- a/akka-amqp/src/main/scala/FaultTolerantChannelActor.scala +++ b/akka-amqp/src/main/scala/FaultTolerantChannelActor.scala @@ -62,7 +62,6 @@ abstract private[amqp] class FaultTolerantChannelActor(exchangeParameters: Excha protected def setupChannel(ch: Channel) private def setupChannelInternal(ch: Channel) = if (channel.isEmpty) { - log.info("Exchange declare") if (exchangePassive) { ch.exchangeDeclarePassive(exchangeName) } else { @@ -75,7 +74,6 @@ abstract private[amqp] class FaultTolerantChannelActor(exchangeParameters: Excha }) channelParameters.foreach(_.shutdownListener.foreach(sdl => ch.getConnection.addShutdownListener(sdl))) - log.info("shutdown listener added") setupChannel(ch) channel = Some(ch) notifyCallback(Started) diff --git a/akka-amqp/src/main/scala/RpcClientActor.scala b/akka-amqp/src/main/scala/RpcClientActor.scala index 979b2f2e56..cf2d131394 100644 --- a/akka-amqp/src/main/scala/RpcClientActor.scala +++ b/akka-amqp/src/main/scala/RpcClientActor.scala @@ -6,9 +6,14 @@ package se.scalablesolutions.akka.amqp import se.scalablesolutions.akka.actor.{ActorRef, Actor} import com.rabbitmq.client.AMQP.BasicProperties +import se.scalablesolutions.akka.config.ScalaConfig.{LifeCycle, Permanent} class RpcClientActor(producer: ActorRef, routingKey: String, replyTo: String) extends Actor { + + self.lifeCycle = Some(LifeCycle(Permanent)) + log.info("%s started", this) + protected def receive = { case payload: Array[Byte] => { val props = new BasicProperties @@ -16,4 +21,10 @@ class RpcClientActor(producer: ActorRef, routingKey: String, replyTo: String) ex producer ! new Message(payload, routingKey, properties = Some(props)) } } + + override def toString(): String = + "AMQP.RpcClient[producerId=" + producer.id + + ", routingKey=" + routingKey+ + ", replyTo=" + replyTo + "]" + } \ No newline at end of file diff --git a/akka-amqp/src/main/scala/RpcServerActor.scala b/akka-amqp/src/main/scala/RpcServerActor.scala new file mode 100644 index 0000000000..b1bfee9df5 --- /dev/null +++ b/akka-amqp/src/main/scala/RpcServerActor.scala @@ -0,0 +1,28 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.amqp + +import se.scalablesolutions.akka.actor.{ActorRef, Actor} +import se.scalablesolutions.akka.config.ScalaConfig.{Permanent, LifeCycle} + +class RpcServerActor(producer: ActorRef, requestHandler: Function[Array[Byte], Array[Byte]]) extends Actor { + + self.lifeCycle = Some(LifeCycle(Permanent)) + + log.info("%s started", this) + + protected def receive = { + case Delivery(payload, _, tag, props, sender) => { + + val response: Array[Byte] = requestHandler(payload) + + log.info("Sending reply to %s", props.getReplyTo) + producer ! new Message(response, props.getReplyTo) + + sender.foreach(_ ! Acknowledge(tag)) + } + case Acknowledged(tag) => log.debug("todo") + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala new file mode 100644 index 0000000000..7a0aae06e4 --- /dev/null +++ b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.amqp.test + +import se.scalablesolutions.akka.util.Logging +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import se.scalablesolutions.akka.amqp._ +import org.multiverse.api.latches.StandardLatch +import se.scalablesolutions.akka.actor.Actor._ +import org.scalatest.matchers.MustMatchers +import java.util.concurrent.{CountDownLatch, TimeUnit} +import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters} + +class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging { + + @Test + def consumerMessage = { + val connection = AMQP.newConnection() + try { + + val countDown = new CountDownLatch(4) + val channelCallback = actor { + case Started => countDown.countDown + case Restarting => () + case Stopped => () + } + + val exchangeParameters = ExchangeParameters("text_exchange",ExchangeType.Topic) + val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) + + def requestHandler(request: Array[Byte]): Array[Byte] = { + "someresult".getBytes + } + + val rpcServer = AMQP.newRpcServer(connection, exchangeParameters, "rpc.routing", requestHandler, channelParameters = Some(channelParameters)) + + val payloadLatch = new StandardLatch + val rpcClient = AMQP.newRpcClient(connection, exchangeParameters, "rpc.routing", actor { + case Delivery(payload, _, _, _, _) => payloadLatch.open + }, channelParameters = Some(channelParameters)) + + countDown.await(2, TimeUnit.SECONDS) must be (true) + rpcClient ! "some_payload".getBytes + payloadLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + } finally { + connection.stop + } + } + + @Test + def dummy { + // amqp tests need local rabbitmq server running, so a disabled by default. + // this dummy test makes sure that the whole test class doesn't fail because of missing tests + assert(true) + } +} \ No newline at end of file