From 32149173df404c87d37663b52bbf8ca6076ea4d5 Mon Sep 17 00:00:00 2001 From: momania Date: Fri, 6 Aug 2010 13:35:44 +0200 Subject: [PATCH] - made rpc handler reqular function instead of partial function - add queuename as optional parameter for rpc server (for i.e. loadbalancing purposes) --- akka-amqp/src/main/scala/ExampleSession.scala | 8 ++-- akka-amqp/src/main/scala/RpcServerActor.scala | 2 +- .../test/scala/AMQPRpcClientServerTest.scala | 44 +++++++++---------- akka-amqp/src/test/scala/AMQPTest.scala | 2 +- 4 files changed, 27 insertions(+), 29 deletions(-) diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/ExampleSession.scala index 97571e2783..4fa1358a29 100644 --- a/akka-amqp/src/main/scala/ExampleSession.scala +++ b/akka-amqp/src/main/scala/ExampleSession.scala @@ -144,10 +144,10 @@ object ExampleSession { } val rpcServerSerializer = new RpcServerSerializer[String, Int](serverFromBinary, serverToBinary) - val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, { - case "rpc_request" => 3 - case _ => error("unknown request") - }) + def requestHandler(request: String) = 3 + + val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, + requestHandler, queueName = Some("rpc.in.key.queue")) /** Client */ diff --git a/akka-amqp/src/main/scala/RpcServerActor.scala b/akka-amqp/src/main/scala/RpcServerActor.scala index c64ef9058e..99d74d9b56 100644 --- a/akka-amqp/src/main/scala/RpcServerActor.scala +++ b/akka-amqp/src/main/scala/RpcServerActor.scala @@ -8,7 +8,7 @@ import se.scalablesolutions.akka.actor.{ActorRef, Actor} import com.rabbitmq.client.AMQP.BasicProperties import se.scalablesolutions.akka.amqp.AMQP.RpcServerSerializer -class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: PartialFunction[I, O]) extends Actor { +class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: I => O) extends Actor { log.info("%s started", this) diff --git a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala index dcaec4cd06..c585675098 100644 --- a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala +++ b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala @@ -11,11 +11,9 @@ import se.scalablesolutions.akka.amqp._ import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import java.util.concurrent.{CountDownLatch, TimeUnit} -import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.amqp.AMQP._ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging { - @Test def consumerMessage = if (AMQPTest.enabled) { val connection = AMQP.newConnection() @@ -32,31 +30,31 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) - val serverFromBinary = new FromBinary[String] { - def fromBinary(bytes: Array[Byte]) = new String(bytes) - } - val serverToBinary = new ToBinary[Int] { - def toBinary(t: Int) = Array(t.toByte) - } - val rpcServerSerializer = new RpcServerSerializer[String, Int](serverFromBinary, serverToBinary) - val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, { - case "some_payload" => 3 - case _ => error("unknown request") - }, channelParameters = Some(channelParameters)) + val rpcServerSerializer = new RpcServerSerializer[String, Int]( + new FromBinary[String] { + def fromBinary(bytes: Array[Byte]) = new String(bytes) + }, new ToBinary[Int] { + def toBinary(t: Int) = Array(t.toByte) + }) - val clientToBinary = new ToBinary[String] { - def toBinary(t: String) = t.getBytes - } - val clientFromBinary = new FromBinary[Int] { - def fromBinary(bytes: Array[Byte]) = bytes.head.toInt - } - val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary) - val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer, + def requestHandler(request: String) = 3 + + val rpcServer = AMQP.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, + requestHandler, channelParameters = Some(channelParameters)) + + val rpcClientSerializer = new RpcClientSerializer[String, Int]( + new ToBinary[String] { + def toBinary(t: String) = t.getBytes + }, new FromBinary[Int] { + def fromBinary(bytes: Array[Byte]) = bytes.head.toInt + }) + + val rpcClient = AMQP.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer, channelParameters = Some(channelParameters)) - countDown.await(2, TimeUnit.SECONDS) must be (true) + countDown.await(2, TimeUnit.SECONDS) must be(true) val response = rpcClient !! "some_payload" - response must be (Some(3)) + response must be(Some(3)) } finally { connection.stop } diff --git a/akka-amqp/src/test/scala/AMQPTest.scala b/akka-amqp/src/test/scala/AMQPTest.scala index 5ff9157bc5..8cad27ba60 100644 --- a/akka-amqp/src/test/scala/AMQPTest.scala +++ b/akka-amqp/src/test/scala/AMQPTest.scala @@ -5,5 +5,5 @@ package se.scalablesolutions.akka.amqp.test object AMQPTest { - def enabled = false + def enabled = true }