add durablility and auto-delete with defaults to rpc and with passive = true for client

This commit is contained in:
momania 2010-08-10 10:39:56 +02:00
parent 6a586d1198
commit 4770846eb7

View file

@ -13,7 +13,8 @@ object RPC {
routingKey: String,
serializer: RpcClientSerializer[O, I],
channelParameters: Option[ChannelParameters] = None): ActorRef = {
val rpcActor: ActorRef = actorOf(new RpcClientActor[O, I](exchangeParameters, routingKey, serializer, channelParameters))
val rpcActor: ActorRef = actorOf(new RpcClientActor[O, I](
exchangeParameters, routingKey, serializer, channelParameters))
connection.startLink(rpcActor)
rpcActor ! Start
rpcActor
@ -26,9 +27,10 @@ object RPC {
requestHandler: I => O,
queueName: Option[String] = None,
channelParameters: Option[ChannelParameters] = None) = {
val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters))
val producer = newProducer(connection, ProducerParameters(
ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters))
val rpcServer = actorOf(new RpcServerActor[I, O](producer, serializer, requestHandler))
val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer
val consumer = newConsumer(connection, ConsumerParameters(exchangeParameters, routingKey, rpcServer
, channelParameters = channelParameters
, selfAcknowledging = false
, queueName = queueName))
@ -63,7 +65,8 @@ object RPC {
def startProtobufServer[I <: Message, O <: Message](
connection: ActorRef, exchange: String, requestHandler: I => O,
routingKey: Option[String] = None,
queueName: Option[String] = None)(implicit manifest: Manifest[I]) = {
queueName: Option[String] = None,
durable = false, autoDelete = true)(implicit manifest: Manifest[I]) = {
val serializer = new RpcServerSerializer[I, O](
new FromBinary[I] {
@ -74,7 +77,8 @@ object RPC {
def toBinary(t: O) = t.toByteArray
})
val exchangeParameters = new ExchangeParameters(exchange, ExchangeType.Topic)
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val qName = queueName.getOrElse("%s.in".format(rKey))
@ -84,7 +88,8 @@ object RPC {
def startProtobufClient[O <: Message, I <: Message](
connection: ActorRef, exchange: String,
routingKey: Option[String] = None)(implicit manifest: Manifest[I]): RpcClient[O, I] = {
routingKey: Option[String] = None,
durable = false, autoDelete = true, passive = true)(implicit manifest: Manifest[I]): RpcClient[O, I] = {
val serializer = new RpcClientSerializer[O, I](
@ -96,7 +101,8 @@ object RPC {
}
})
val exchangeParameters = new ExchangeParameters(exchange, ExchangeType.Topic)
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete, exchangePassive = passive)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val client = newRpcClient[O, I](connection, exchangeParameters, rKey, serializer)