added rpc server and unit test

This commit is contained in:
momania 2010-07-08 11:28:00 +02:00
parent fd6db0382b
commit f36ad6c2f7
5 changed files with 119 additions and 7 deletions

View file

@ -84,15 +84,31 @@ object AMQP {
consumer
}
def newRpcClient(connection: ActorRef, exchangeParameters: ExchangeParameters, routingKey: String, deliveryHandler: ActorRef): ActorRef = {
def newRpcClient(connection: ActorRef,
exchangeParameters: ExchangeParameters,
routingKey: String,
deliveryHandler: ActorRef,
channelParameters: Option[ChannelParameters] = None): ActorRef = {
val replyToRoutingKey = UUID.randomUUID.toString
val producer = newProducer(connection, new ProducerParameters(exchangeParameters))
val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, replyToRoutingKey, deliveryHandler))
val rpcActor: ActorRef = actorOf(new RpcClientActor(producer, routingKey, replyToRoutingKey))
connection.startLink(rpcActor)
val producer = newProducer(connection, new ProducerParameters(exchangeParameters, channelParameters = channelParameters))
val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, replyToRoutingKey, deliveryHandler, channelParameters = channelParameters))
val rpcActor: ActorRef = actorOf(new RpcClientActor(producer, routingKey, replyToRoutingKey)).start
rpcActor
}
def newRpcServer(connection: ActorRef,
exchangeParameters: ExchangeParameters,
routingKey: String,
requestHandler: Function[Array[Byte], Array[Byte]],
channelParameters: Option[ChannelParameters] = None) = {
val producer = newProducer(connection, new ProducerParameters(exchangeParameters, channelParameters = channelParameters))
val rpcServer: ActorRef = actorOf(new RpcServerActor(producer, requestHandler)).start
val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer
, channelParameters = channelParameters
, selfAcknowledging = false))
}
private val supervisor = new AMQPSupervisor
class AMQPSupervisor extends Logging {

View file

@ -62,7 +62,6 @@ abstract private[amqp] class FaultTolerantChannelActor(exchangeParameters: Excha
protected def setupChannel(ch: Channel)
private def setupChannelInternal(ch: Channel) = if (channel.isEmpty) {
log.info("Exchange declare")
if (exchangePassive) {
ch.exchangeDeclarePassive(exchangeName)
} else {
@ -75,7 +74,6 @@ abstract private[amqp] class FaultTolerantChannelActor(exchangeParameters: Excha
})
channelParameters.foreach(_.shutdownListener.foreach(sdl => ch.getConnection.addShutdownListener(sdl)))
log.info("shutdown listener added")
setupChannel(ch)
channel = Some(ch)
notifyCallback(Started)

View file

@ -6,9 +6,14 @@ package se.scalablesolutions.akka.amqp
import se.scalablesolutions.akka.actor.{ActorRef, Actor}
import com.rabbitmq.client.AMQP.BasicProperties
import se.scalablesolutions.akka.config.ScalaConfig.{LifeCycle, Permanent}
class RpcClientActor(producer: ActorRef, routingKey: String, replyTo: String) extends Actor {
self.lifeCycle = Some(LifeCycle(Permanent))
log.info("%s started", this)
protected def receive = {
case payload: Array[Byte] => {
val props = new BasicProperties
@ -16,4 +21,10 @@ class RpcClientActor(producer: ActorRef, routingKey: String, replyTo: String) ex
producer ! new Message(payload, routingKey, properties = Some(props))
}
}
override def toString(): String =
"AMQP.RpcClient[producerId=" + producer.id +
", routingKey=" + routingKey+
", replyTo=" + replyTo + "]"
}

View file

@ -0,0 +1,28 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp
import se.scalablesolutions.akka.actor.{ActorRef, Actor}
import se.scalablesolutions.akka.config.ScalaConfig.{Permanent, LifeCycle}
class RpcServerActor(producer: ActorRef, requestHandler: Function[Array[Byte], Array[Byte]]) extends Actor {
self.lifeCycle = Some(LifeCycle(Permanent))
log.info("%s started", this)
protected def receive = {
case Delivery(payload, _, tag, props, sender) => {
val response: Array[Byte] = requestHandler(payload)
log.info("Sending reply to %s", props.getReplyTo)
producer ! new Message(response, props.getReplyTo)
sender.foreach(_ ! Acknowledge(tag))
}
case Acknowledged(tag) => log.debug("todo")
}
}

View file

@ -0,0 +1,59 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.amqp._
import org.multiverse.api.latches.StandardLatch
import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters}
class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging {
@Test
def consumerMessage = {
val connection = AMQP.newConnection()
try {
val countDown = new CountDownLatch(4)
val channelCallback = actor {
case Started => countDown.countDown
case Restarting => ()
case Stopped => ()
}
val exchangeParameters = ExchangeParameters("text_exchange",ExchangeType.Topic)
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
def requestHandler(request: Array[Byte]): Array[Byte] = {
"someresult".getBytes
}
val rpcServer = AMQP.newRpcServer(connection, exchangeParameters, "rpc.routing", requestHandler, channelParameters = Some(channelParameters))
val payloadLatch = new StandardLatch
val rpcClient = AMQP.newRpcClient(connection, exchangeParameters, "rpc.routing", actor {
case Delivery(payload, _, _, _, _) => payloadLatch.open
}, channelParameters = Some(channelParameters))
countDown.await(2, TimeUnit.SECONDS) must be (true)
rpcClient ! "some_payload".getBytes
payloadLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
} finally {
connection.stop
}
}
@Test
def dummy {
// amqp tests need local rabbitmq server running, so a disabled by default.
// this dummy test makes sure that the whole test class doesn't fail because of missing tests
assert(true)
}
}