diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala index 26887dbff4..60a1a6733c 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala @@ -13,7 +13,8 @@ object RPC { routingKey: String, serializer: RpcClientSerializer[O, I], channelParameters: Option[ChannelParameters] = None): ActorRef = { - val rpcActor: ActorRef = actorOf(new RpcClientActor[O, I](exchangeParameters, routingKey, serializer, channelParameters)) + val rpcActor: ActorRef = actorOf(new RpcClientActor[O, I]( + exchangeParameters, routingKey, serializer, channelParameters)) connection.startLink(rpcActor) rpcActor ! Start rpcActor @@ -26,9 +27,10 @@ object RPC { requestHandler: I => O, queueName: Option[String] = None, channelParameters: Option[ChannelParameters] = None) = { - val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) + val producer = newProducer(connection, ProducerParameters( + ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) val rpcServer = actorOf(new RpcServerActor[I, O](producer, serializer, requestHandler)) - val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer + val consumer = newConsumer(connection, ConsumerParameters(exchangeParameters, routingKey, rpcServer , channelParameters = channelParameters , selfAcknowledging = false , queueName = queueName)) @@ -63,7 +65,8 @@ object RPC { def startProtobufServer[I <: Message, O <: Message]( connection: ActorRef, exchange: String, requestHandler: I => O, routingKey: Option[String] = None, - queueName: Option[String] = None)(implicit manifest: Manifest[I]) = { + queueName: Option[String] = None, + durable = false, autoDelete = true)(implicit manifest: Manifest[I]) = { val serializer = new RpcServerSerializer[I, O]( new FromBinary[I] { @@ -74,7 +77,8 @@ object RPC { def toBinary(t: O) = t.toByteArray }) - val exchangeParameters = new ExchangeParameters(exchange, ExchangeType.Topic) + val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, + exchangeDurable = durable, exchangeAutoDelete = autoDelete) val rKey = routingKey.getOrElse("%s.request".format(exchange)) val qName = queueName.getOrElse("%s.in".format(rKey)) @@ -84,7 +88,8 @@ object RPC { def startProtobufClient[O <: Message, I <: Message]( connection: ActorRef, exchange: String, - routingKey: Option[String] = None)(implicit manifest: Manifest[I]): RpcClient[O, I] = { + routingKey: Option[String] = None, + durable = false, autoDelete = true, passive = true)(implicit manifest: Manifest[I]): RpcClient[O, I] = { val serializer = new RpcClientSerializer[O, I]( @@ -96,7 +101,8 @@ object RPC { } }) - val exchangeParameters = new ExchangeParameters(exchange, ExchangeType.Topic) + val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, + exchangeDurable = durable, exchangeAutoDelete = autoDelete, exchangePassive = passive) val rKey = routingKey.getOrElse("%s.request".format(exchange)) val client = newRpcClient[O, I](connection, exchangeParameters, rKey, serializer)