From fd110acc23d6df555a2cf06c441101fcc14386bc Mon Sep 17 00:00:00 2001 From: momania Date: Thu, 15 Jul 2010 18:21:55 +0200 Subject: [PATCH] - rpc typing and serialization - again --- akka-amqp/src/main/scala/AMQP.scala | 14 ++++++++++++-- akka-amqp/src/main/scala/ExampleSession.scala | 17 +++++++++++++++-- akka-amqp/src/main/scala/RpcClientActor.scala | 4 ++-- akka-amqp/src/main/scala/RpcServerActor.scala | 4 ++-- .../test/scala/AMQPRpcClientServerTest.scala | 16 ++++++++++++++-- akka-amqp/src/test/scala/AMQPTest.scala | 2 +- 6 files changed, 46 insertions(+), 11 deletions(-) diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index a4fe23b7b0..8605401dbd 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -128,6 +128,16 @@ object AMQP { } } - case class RpcClientSerializer[O,I](output: Function[O, Array[Byte]], input: Function[Array[Byte], I]) - case class RpcServerSerializer[I,O](input: Function[Array[Byte], I], output: Function[O, Array[Byte]]) + trait FromBinary[T] { + def fromBinary(bytes: Array[Byte]): T + } + + trait ToBinary[T] { + def toBinary(t: T): Array[Byte] + } + + + case class RpcClientSerializer[O,I](toBinary: ToBinary[O], fromBinary: FromBinary[I]) + + case class RpcServerSerializer[I,O](fromBinary: FromBinary[I], toBinary: ToBinary[O]) } diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/ExampleSession.scala index 88ad583ea8..97571e2783 100644 --- a/akka-amqp/src/main/scala/ExampleSession.scala +++ b/akka-amqp/src/main/scala/ExampleSession.scala @@ -136,15 +136,28 @@ object ExampleSession { val exchangeParameters = ExchangeParameters("my_rpc_exchange", ExchangeType.Topic) /** Server */ - val rpcServerSerializer = new RpcServerSerializer[String, Int]({x:Array[Byte] => new String(x)}, {x:Int => Array(x.toByte)}) + val serverFromBinary = new FromBinary[String] { + def fromBinary(bytes: Array[Byte]) = new String(bytes) + } + val serverToBinary = new ToBinary[Int] { + def toBinary(t: Int) = Array(t.toByte) + } + val rpcServerSerializer = new RpcServerSerializer[String, Int](serverFromBinary, serverToBinary) val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, { case "rpc_request" => 3 case _ => error("unknown request") }) + /** Client */ - val rpcClientSerializer = new RpcClientSerializer[String, Int]({x:String => x.getBytes}, {x:Array[Byte] => x.head.toInt}) + val clientToBinary = new ToBinary[String] { + def toBinary(t: String) = t.getBytes + } + val clientFromBinary = new FromBinary[Int] { + def fromBinary(bytes: Array[Byte]) = bytes.head.toInt + } + val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary) val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.in.key", rpcClientSerializer) diff --git a/akka-amqp/src/main/scala/RpcClientActor.scala b/akka-amqp/src/main/scala/RpcClientActor.scala index 6880a4a36f..f8c376be7e 100644 --- a/akka-amqp/src/main/scala/RpcClientActor.scala +++ b/akka-amqp/src/main/scala/RpcClientActor.scala @@ -23,8 +23,8 @@ class RpcClientActor[I,O](exchangeParameters: ExchangeParameters, rpcClient match { case Some(client) => - val response: Array[Byte] = client.primitiveCall(serializer.output(payload)) - self.reply(serializer.input(response)) + val response: Array[Byte] = client.primitiveCall(serializer.toBinary.toBinary(payload)) + self.reply(serializer.fromBinary.fromBinary(response)) case None => error("%s has no client to send messages with".format(this)) } } diff --git a/akka-amqp/src/main/scala/RpcServerActor.scala b/akka-amqp/src/main/scala/RpcServerActor.scala index b2f41cec0d..897c041c69 100644 --- a/akka-amqp/src/main/scala/RpcServerActor.scala +++ b/akka-amqp/src/main/scala/RpcServerActor.scala @@ -16,8 +16,8 @@ class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I, case Delivery(payload, _, tag, props, sender) => { log.debug("%s handling delivery with tag %d", this, tag) - val request = serializer.input(payload) - val response: Array[Byte] = serializer.output(requestHandler(request)) + val request = serializer.fromBinary.fromBinary(payload) + val response: Array[Byte] = serializer.toBinary.toBinary(requestHandler(request)) log.debug("%s sending reply to %s", this, props.getReplyTo) val replyProps = new BasicProperties diff --git a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala index 7a798c366d..7dbfb4becd 100644 --- a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala +++ b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala @@ -32,13 +32,25 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) - val rpcServerSerializer = new RpcServerSerializer[String, Int]({x:Array[Byte] => new String(x)}, {x:Int => Array(x.toByte)}) + val serverFromBinary = new FromBinary[String] { + def fromBinary(bytes: Array[Byte]) = new String(bytes) + } + val serverToBinary = new ToBinary[Int] { + def toBinary(t: Int) = Array(t.toByte) + } + val rpcServerSerializer = new RpcServerSerializer[String, Int](serverFromBinary, serverToBinary) val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, { case "some_payload" => 3 case _ => error("unknown request") }, channelParameters = Some(channelParameters)) - val rpcClientSerializer = new RpcClientSerializer[String, Int]({x:String => x.getBytes}, {x:Array[Byte] => x.head.toInt}) + val clientToBinary = new ToBinary[String] { + def toBinary(t: String) = t.getBytes + } + val clientFromBinary = new FromBinary[Int] { + def fromBinary(bytes: Array[Byte]) = bytes.head.toInt + } + val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary) val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer, channelParameters = Some(channelParameters)) diff --git a/akka-amqp/src/test/scala/AMQPTest.scala b/akka-amqp/src/test/scala/AMQPTest.scala index e50ab673f6..0fdb845171 100644 --- a/akka-amqp/src/test/scala/AMQPTest.scala +++ b/akka-amqp/src/test/scala/AMQPTest.scala @@ -5,5 +5,5 @@ package se.scalablesolutions.akka.amqp.test object AMQPTest { - def enabled = false + def enabled = true } \ No newline at end of file