- made rpc handler reqular function instead of partial function

- add queuename as optional parameter for rpc server (for i.e. loadbalancing purposes)
This commit is contained in:
momania 2010-08-06 13:35:44 +02:00
parent c48ca68db4
commit 32149173df
4 changed files with 27 additions and 29 deletions

View file

@ -144,10 +144,10 @@ object ExampleSession {
}
val rpcServerSerializer = new RpcServerSerializer[String, Int](serverFromBinary, serverToBinary)
val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, {
case "rpc_request" => 3
case _ => error("unknown request")
})
def requestHandler(request: String) = 3
val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer,
requestHandler, queueName = Some("rpc.in.key.queue"))
/** Client */

View file

@ -8,7 +8,7 @@ import se.scalablesolutions.akka.actor.{ActorRef, Actor}
import com.rabbitmq.client.AMQP.BasicProperties
import se.scalablesolutions.akka.amqp.AMQP.RpcServerSerializer
class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: PartialFunction[I, O]) extends Actor {
class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: I => O) extends Actor {
log.info("%s started", this)

View file

@ -11,11 +11,9 @@ import se.scalablesolutions.akka.amqp._
import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.amqp.AMQP._
class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging {
@Test
def consumerMessage = if (AMQPTest.enabled) {
val connection = AMQP.newConnection()
@ -32,31 +30,31 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging
val channelParameters = ChannelParameters(channelCallback
= Some(channelCallback))
val serverFromBinary = new FromBinary[String] {
def fromBinary(bytes: Array[Byte]) = new String(bytes)
}
val serverToBinary = new ToBinary[Int] {
def toBinary(t: Int) = Array(t.toByte)
}
val rpcServerSerializer = new RpcServerSerializer[String, Int](serverFromBinary, serverToBinary)
val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, {
case "some_payload" => 3
case _ => error("unknown request")
}, channelParameters = Some(channelParameters))
val rpcServerSerializer = new RpcServerSerializer[String, Int](
new FromBinary[String] {
def fromBinary(bytes: Array[Byte]) = new String(bytes)
}, new ToBinary[Int] {
def toBinary(t: Int) = Array(t.toByte)
})
val clientToBinary = new ToBinary[String] {
def toBinary(t: String) = t.getBytes
}
val clientFromBinary = new FromBinary[Int] {
def fromBinary(bytes: Array[Byte]) = bytes.head.toInt
}
val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary)
val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer,
def requestHandler(request: String) = 3
val rpcServer = AMQP.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer,
requestHandler, channelParameters = Some(channelParameters))
val rpcClientSerializer = new RpcClientSerializer[String, Int](
new ToBinary[String] {
def toBinary(t: String) = t.getBytes
}, new FromBinary[Int] {
def fromBinary(bytes: Array[Byte]) = bytes.head.toInt
})
val rpcClient = AMQP.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer,
channelParameters = Some(channelParameters))
countDown.await(2, TimeUnit.SECONDS) must be (true)
countDown.await(2, TimeUnit.SECONDS) must be(true)
val response = rpcClient !! "some_payload"
response must be (Some(3))
response must be(Some(3))
} finally {
connection.stop
}

View file

@ -5,5 +5,5 @@
package se.scalablesolutions.akka.amqp.test
object AMQPTest {
def enabled = false
def enabled = true
}