diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala index 3097e64cd9..26887dbff4 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala @@ -61,7 +61,9 @@ object RPC { private val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]]) def startProtobufServer[I <: Message, O <: Message]( - connection: ActorRef, serviceName: String, requestHandler: I => O)(implicit manifest: Manifest[I]) = { + connection: ActorRef, exchange: String, requestHandler: I => O, + routingKey: Option[String] = None, + queueName: Option[String] = None)(implicit manifest: Manifest[I]) = { val serializer = new RpcServerSerializer[I, O]( new FromBinary[I] { @@ -72,16 +74,18 @@ object RPC { def toBinary(t: O) = t.toByteArray }) - val exchangeParameters = new ExchangeParameters(serviceName, ExchangeType.Topic) - val routingKey = "%s.request".format(serviceName) - val queueName = "%s.in".format(routingKey) + val exchangeParameters = new ExchangeParameters(exchange, ExchangeType.Topic) + val rKey = routingKey.getOrElse("%s.request".format(exchange)) + val qName = queueName.getOrElse("%s.in".format(rKey)) - newRpcServer[I, O](connection, exchangeParameters, routingKey, serializer, requestHandler, - queueName = Some(queueName)) + newRpcServer[I, O](connection, exchangeParameters, rKey, serializer, requestHandler, + queueName = Some(qName)) } def startProtobufClient[O <: Message, I <: Message]( - connection: ActorRef, serviceName: String)(implicit manifest: Manifest[I]): RpcClient[O, I] = { + connection: ActorRef, exchange: String, + routingKey: Option[String] = None)(implicit manifest: Manifest[I]): RpcClient[O, I] = { + val serializer = new RpcClientSerializer[O, I]( new ToBinary[O] { @@ -92,11 +96,10 @@ object RPC { } }) - val exchangeParameters = new ExchangeParameters(serviceName, ExchangeType.Topic) - val routingKey = "%s.request".format(serviceName) - val queueName = "%s.in".format(routingKey) + val exchangeParameters = new ExchangeParameters(exchange, ExchangeType.Topic) + val rKey = routingKey.getOrElse("%s.request".format(exchange)) - val client = newRpcClient[O, I](connection, exchangeParameters, routingKey, serializer) + val client = newRpcClient[O, I](connection, exchangeParameters, rKey, serializer) new RpcClient[O, I](client) }