- rpc typing and serialization - again
This commit is contained in:
parent
5264a5ae40
commit
fd110acc23
6 changed files with 46 additions and 11 deletions
|
|
@ -128,6 +128,16 @@ object AMQP {
|
|||
}
|
||||
}
|
||||
|
||||
case class RpcClientSerializer[O,I](output: Function[O, Array[Byte]], input: Function[Array[Byte], I])
|
||||
case class RpcServerSerializer[I,O](input: Function[Array[Byte], I], output: Function[O, Array[Byte]])
|
||||
trait FromBinary[T] {
|
||||
def fromBinary(bytes: Array[Byte]): T
|
||||
}
|
||||
|
||||
trait ToBinary[T] {
|
||||
def toBinary(t: T): Array[Byte]
|
||||
}
|
||||
|
||||
|
||||
case class RpcClientSerializer[O,I](toBinary: ToBinary[O], fromBinary: FromBinary[I])
|
||||
|
||||
case class RpcServerSerializer[I,O](fromBinary: FromBinary[I], toBinary: ToBinary[O])
|
||||
}
|
||||
|
|
|
|||
|
|
@ -136,15 +136,28 @@ object ExampleSession {
|
|||
val exchangeParameters = ExchangeParameters("my_rpc_exchange", ExchangeType.Topic)
|
||||
|
||||
/** Server */
|
||||
val rpcServerSerializer = new RpcServerSerializer[String, Int]({x:Array[Byte] => new String(x)}, {x:Int => Array(x.toByte)})
|
||||
val serverFromBinary = new FromBinary[String] {
|
||||
def fromBinary(bytes: Array[Byte]) = new String(bytes)
|
||||
}
|
||||
val serverToBinary = new ToBinary[Int] {
|
||||
def toBinary(t: Int) = Array(t.toByte)
|
||||
}
|
||||
val rpcServerSerializer = new RpcServerSerializer[String, Int](serverFromBinary, serverToBinary)
|
||||
|
||||
val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, {
|
||||
case "rpc_request" => 3
|
||||
case _ => error("unknown request")
|
||||
})
|
||||
|
||||
|
||||
/** Client */
|
||||
val rpcClientSerializer = new RpcClientSerializer[String, Int]({x:String => x.getBytes}, {x:Array[Byte] => x.head.toInt})
|
||||
val clientToBinary = new ToBinary[String] {
|
||||
def toBinary(t: String) = t.getBytes
|
||||
}
|
||||
val clientFromBinary = new FromBinary[Int] {
|
||||
def fromBinary(bytes: Array[Byte]) = bytes.head.toInt
|
||||
}
|
||||
val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary)
|
||||
|
||||
val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.in.key", rpcClientSerializer)
|
||||
|
||||
|
|
|
|||
|
|
@ -23,8 +23,8 @@ class RpcClientActor[I,O](exchangeParameters: ExchangeParameters,
|
|||
|
||||
rpcClient match {
|
||||
case Some(client) =>
|
||||
val response: Array[Byte] = client.primitiveCall(serializer.output(payload))
|
||||
self.reply(serializer.input(response))
|
||||
val response: Array[Byte] = client.primitiveCall(serializer.toBinary.toBinary(payload))
|
||||
self.reply(serializer.fromBinary.fromBinary(response))
|
||||
case None => error("%s has no client to send messages with".format(this))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,8 +16,8 @@ class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,
|
|||
case Delivery(payload, _, tag, props, sender) => {
|
||||
|
||||
log.debug("%s handling delivery with tag %d", this, tag)
|
||||
val request = serializer.input(payload)
|
||||
val response: Array[Byte] = serializer.output(requestHandler(request))
|
||||
val request = serializer.fromBinary.fromBinary(payload)
|
||||
val response: Array[Byte] = serializer.toBinary.toBinary(requestHandler(request))
|
||||
|
||||
log.debug("%s sending reply to %s", this, props.getReplyTo)
|
||||
val replyProps = new BasicProperties
|
||||
|
|
|
|||
|
|
@ -32,13 +32,25 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging
|
|||
val channelParameters = ChannelParameters(channelCallback
|
||||
= Some(channelCallback))
|
||||
|
||||
val rpcServerSerializer = new RpcServerSerializer[String, Int]({x:Array[Byte] => new String(x)}, {x:Int => Array(x.toByte)})
|
||||
val serverFromBinary = new FromBinary[String] {
|
||||
def fromBinary(bytes: Array[Byte]) = new String(bytes)
|
||||
}
|
||||
val serverToBinary = new ToBinary[Int] {
|
||||
def toBinary(t: Int) = Array(t.toByte)
|
||||
}
|
||||
val rpcServerSerializer = new RpcServerSerializer[String, Int](serverFromBinary, serverToBinary)
|
||||
val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, {
|
||||
case "some_payload" => 3
|
||||
case _ => error("unknown request")
|
||||
}, channelParameters = Some(channelParameters))
|
||||
|
||||
val rpcClientSerializer = new RpcClientSerializer[String, Int]({x:String => x.getBytes}, {x:Array[Byte] => x.head.toInt})
|
||||
val clientToBinary = new ToBinary[String] {
|
||||
def toBinary(t: String) = t.getBytes
|
||||
}
|
||||
val clientFromBinary = new FromBinary[Int] {
|
||||
def fromBinary(bytes: Array[Byte]) = bytes.head.toInt
|
||||
}
|
||||
val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary)
|
||||
val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer,
|
||||
channelParameters = Some(channelParameters))
|
||||
|
||||
|
|
|
|||
|
|
@ -5,5 +5,5 @@
|
|||
package se.scalablesolutions.akka.amqp.test
|
||||
|
||||
object AMQPTest {
|
||||
def enabled = false
|
||||
def enabled = true
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue