rpc typing and serialization
This commit is contained in:
parent
4d0b503bf0
commit
5264a5ae40
5 changed files with 46 additions and 49 deletions
|
|
@ -10,8 +10,6 @@ import se.scalablesolutions.akka.config.OneForOneStrategy
|
|||
import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory}
|
||||
import java.lang.IllegalArgumentException
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.serialization.Serializer
|
||||
|
||||
/**
|
||||
* AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors.
|
||||
*
|
||||
|
|
@ -20,7 +18,6 @@ import se.scalablesolutions.akka.serialization.Serializer
|
|||
* @author Irmo Manie
|
||||
*/
|
||||
object AMQP {
|
||||
|
||||
case class ConnectionParameters(
|
||||
host: String = ConnectionFactory.DEFAULT_HOST,
|
||||
port: Int = ConnectionFactory.DEFAULT_AMQP_PORT,
|
||||
|
|
@ -57,7 +54,6 @@ object AMQP {
|
|||
queueExclusive: Boolean = false,
|
||||
selfAcknowledging: Boolean = true,
|
||||
channelParameters: Option[ChannelParameters] = None) {
|
||||
|
||||
if (queueDurable && queueName.isEmpty) {
|
||||
throw new IllegalArgumentException("A queue name is required when requesting a durable queue.")
|
||||
}
|
||||
|
|
@ -84,27 +80,25 @@ object AMQP {
|
|||
consumer
|
||||
}
|
||||
|
||||
def newRpcClient(connection: ActorRef,
|
||||
def newRpcClient[O,I](connection: ActorRef,
|
||||
exchangeParameters: ExchangeParameters,
|
||||
routingKey: String,
|
||||
inSerializer: Serializer,
|
||||
outSerializer: Serializer,
|
||||
serializer: RpcClientSerializer[O,I],
|
||||
channelParameters: Option[ChannelParameters] = None): ActorRef = {
|
||||
val rpcActor: ActorRef = actorOf(new RpcClientActor(exchangeParameters, routingKey, inSerializer, outSerializer, channelParameters))
|
||||
val rpcActor: ActorRef = actorOf(new RpcClientActor[O,I](exchangeParameters, routingKey, serializer, channelParameters))
|
||||
connection.startLink(rpcActor)
|
||||
rpcActor ! Start
|
||||
rpcActor
|
||||
}
|
||||
|
||||
def newRpcServer(connection: ActorRef,
|
||||
exchangeParameters: ExchangeParameters,
|
||||
routingKey: String,
|
||||
inSerializer: Serializer,
|
||||
outSerializer: Serializer,
|
||||
requestHandler: PartialFunction[AnyRef, AnyRef],
|
||||
channelParameters: Option[ChannelParameters] = None) = {
|
||||
def newRpcServer[I,O](connection: ActorRef,
|
||||
exchangeParameters: ExchangeParameters,
|
||||
routingKey: String,
|
||||
serializer: RpcServerSerializer[I,O],
|
||||
requestHandler: PartialFunction[I, O],
|
||||
channelParameters: Option[ChannelParameters] = None) = {
|
||||
val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters))
|
||||
val rpcServer = actorOf(new RpcServerActor(producer, inSerializer, outSerializer, requestHandler))
|
||||
val rpcServer = actorOf(new RpcServerActor[I,O](producer, serializer, requestHandler))
|
||||
val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer
|
||||
, channelParameters = channelParameters
|
||||
, selfAcknowledging = false))
|
||||
|
|
@ -133,4 +127,7 @@ object AMQP {
|
|||
connectionActor
|
||||
}
|
||||
}
|
||||
|
||||
case class RpcClientSerializer[O,I](output: Function[O, Array[Byte]], input: Function[Array[Byte], I])
|
||||
case class RpcServerSerializer[I,O](input: Function[Array[Byte], I], output: Function[O, Array[Byte]])
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,10 +8,10 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
|
|||
import Actor._
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import se.scalablesolutions.akka.amqp.AMQP._
|
||||
import se.scalablesolutions.akka.serialization.Serializer
|
||||
import java.lang.Class
|
||||
import java.lang.String
|
||||
|
||||
object ExampleSession {
|
||||
|
||||
def main(args: Array[String]) = {
|
||||
println("==== DIRECT ===")
|
||||
direct
|
||||
|
|
@ -97,6 +97,7 @@ object ExampleSession {
|
|||
}
|
||||
|
||||
def callback = {
|
||||
|
||||
val channelCountdown = new CountDownLatch(2)
|
||||
|
||||
val connectionCallback = actor {
|
||||
|
|
@ -129,21 +130,23 @@ object ExampleSession {
|
|||
}
|
||||
|
||||
def rpc = {
|
||||
|
||||
val connection = AMQP.newConnection()
|
||||
|
||||
val exchangeParameters = ExchangeParameters("my_rpc_exchange", ExchangeType.Topic)
|
||||
|
||||
val stringSerializer = new Serializer {
|
||||
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes)
|
||||
def toBinary(obj: AnyRef) = obj.asInstanceOf[String].getBytes
|
||||
}
|
||||
/** Server */
|
||||
val rpcServerSerializer = new RpcServerSerializer[String, Int]({x:Array[Byte] => new String(x)}, {x:Int => Array(x.toByte)})
|
||||
|
||||
val rpcServer = AMQP.newRpcServer(connection, exchangeParameters, "rpc.in.key", stringSerializer, stringSerializer, {
|
||||
case "rpc_request" => "rpc_response"
|
||||
val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, {
|
||||
case "rpc_request" => 3
|
||||
case _ => error("unknown request")
|
||||
})
|
||||
|
||||
val rpcClient = AMQP.newRpcClient(connection, exchangeParameters, "rpc.in.key", stringSerializer, stringSerializer)
|
||||
/** Client */
|
||||
val rpcClientSerializer = new RpcClientSerializer[String, Int]({x:String => x.getBytes}, {x:Array[Byte] => x.head.toInt})
|
||||
|
||||
val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.in.key", rpcClientSerializer)
|
||||
|
||||
val response = (rpcClient !! "rpc_request")
|
||||
log.info("Response: " + response)
|
||||
|
|
|
|||
|
|
@ -4,14 +4,12 @@
|
|||
|
||||
package se.scalablesolutions.akka.amqp
|
||||
|
||||
import se.scalablesolutions.akka.serialization.Serializer
|
||||
import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameters}
|
||||
import com.rabbitmq.client.{Channel, RpcClient}
|
||||
import se.scalablesolutions.akka.amqp.AMQP.{RpcClientSerializer, ChannelParameters, ExchangeParameters}
|
||||
|
||||
class RpcClientActor(exchangeParameters: ExchangeParameters,
|
||||
class RpcClientActor[I,O](exchangeParameters: ExchangeParameters,
|
||||
routingKey: String,
|
||||
inSerializer: Serializer,
|
||||
outSerializer: Serializer,
|
||||
serializer: RpcClientSerializer[I,O],
|
||||
channelParameters: Option[ChannelParameters] = None) extends FaultTolerantChannelActor(exchangeParameters, channelParameters) {
|
||||
|
||||
import exchangeParameters._
|
||||
|
|
@ -21,12 +19,12 @@ class RpcClientActor(exchangeParameters: ExchangeParameters,
|
|||
log.info("%s started", this)
|
||||
|
||||
def specificMessageHandler = {
|
||||
case payload: AnyRef => {
|
||||
case payload: I => {
|
||||
|
||||
rpcClient match {
|
||||
case Some(client) =>
|
||||
val response: Array[Byte] = client.primitiveCall(inSerializer.toBinary(payload))
|
||||
self.reply(outSerializer.fromBinary(response, None))
|
||||
val response: Array[Byte] = client.primitiveCall(serializer.output(payload))
|
||||
self.reply(serializer.input(response))
|
||||
case None => error("%s has no client to send messages with".format(this))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,9 +6,9 @@ package se.scalablesolutions.akka.amqp
|
|||
|
||||
import se.scalablesolutions.akka.actor.{ActorRef, Actor}
|
||||
import com.rabbitmq.client.AMQP.BasicProperties
|
||||
import se.scalablesolutions.akka.serialization.Serializer
|
||||
import se.scalablesolutions.akka.amqp.AMQP.RpcServerSerializer
|
||||
|
||||
class RpcServerActor(producer: ActorRef, inSerializer: Serializer, outSerializer: Serializer, requestHandler: PartialFunction[AnyRef, AnyRef]) extends Actor {
|
||||
class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: PartialFunction[I, O]) extends Actor {
|
||||
|
||||
log.info("%s started", this)
|
||||
|
||||
|
|
@ -16,8 +16,8 @@ class RpcServerActor(producer: ActorRef, inSerializer: Serializer, outSerializer
|
|||
case Delivery(payload, _, tag, props, sender) => {
|
||||
|
||||
log.debug("%s handling delivery with tag %d", this, tag)
|
||||
val request = inSerializer.fromBinary(payload, None)
|
||||
val response: Array[Byte] = outSerializer.toBinary(requestHandler(request))
|
||||
val request = serializer.input(payload)
|
||||
val response: Array[Byte] = serializer.output(requestHandler(request))
|
||||
|
||||
log.debug("%s sending reply to %s", this, props.getReplyTo)
|
||||
val replyProps = new BasicProperties
|
||||
|
|
|
|||
|
|
@ -11,8 +11,8 @@ import se.scalablesolutions.akka.amqp._
|
|||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters}
|
||||
import se.scalablesolutions.akka.serialization.Serializer
|
||||
import se.scalablesolutions.akka.amqp.AMQP._
|
||||
|
||||
class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging {
|
||||
|
||||
|
|
@ -29,23 +29,22 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging
|
|||
}
|
||||
|
||||
val exchangeParameters = ExchangeParameters("text_topic_exchange", ExchangeType.Topic)
|
||||
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
|
||||
val stringSerializer = new Serializer {
|
||||
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes)
|
||||
def toBinary(obj: AnyRef) = obj.asInstanceOf[String].getBytes
|
||||
}
|
||||
val channelParameters = ChannelParameters(channelCallback
|
||||
= Some(channelCallback))
|
||||
|
||||
val rpcServer = AMQP.newRpcServer(connection, exchangeParameters, "rpc.routing", stringSerializer, stringSerializer, {
|
||||
case "some_payload" => "some_result"
|
||||
case _ => error("Unhandled message")
|
||||
val rpcServerSerializer = new RpcServerSerializer[String, Int]({x:Array[Byte] => new String(x)}, {x:Int => Array(x.toByte)})
|
||||
val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, {
|
||||
case "some_payload" => 3
|
||||
case _ => error("unknown request")
|
||||
}, channelParameters = Some(channelParameters))
|
||||
|
||||
val rpcClient = AMQP.newRpcClient(connection, exchangeParameters, "rpc.routing", stringSerializer, stringSerializer
|
||||
, channelParameters = Some(channelParameters))
|
||||
val rpcClientSerializer = new RpcClientSerializer[String, Int]({x:String => x.getBytes}, {x:Array[Byte] => x.head.toInt})
|
||||
val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer,
|
||||
channelParameters = Some(channelParameters))
|
||||
|
||||
countDown.await(2, TimeUnit.SECONDS) must be (true)
|
||||
val response = rpcClient !! "some_payload"
|
||||
response must be (Some("some_result"))
|
||||
response must be (Some(3))
|
||||
} finally {
|
||||
connection.stop
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue