From ffbe3fbb6bccc106910d14585d7665831b9691c2 Mon Sep 17 00:00:00 2001 From: jboner Date: Wed, 28 Oct 2009 12:28:02 +0100 Subject: [PATCH] Improved AMQP module code --- akka-actors/src/main/scala/actor/Actor.scala | 2 +- .../scala/stm/TransactionManagement.scala | 2 +- akka-amqp/src/main/scala/AMQP.scala | 67 ++++++++++++------- 3 files changed, 46 insertions(+), 25 deletions(-) diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index 0802719a64..73d6691644 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -58,7 +58,7 @@ object Actor { */ trait Actor extends Logging with TransactionManagement { ActorRegistry.register(this) - + @volatile private[this] var isRunning: Boolean = false private[this] val remoteFlagLock = new ReadWriteLock private[this] val transactionalFlagLock = new ReadWriteLock diff --git a/akka-actors/src/main/scala/stm/TransactionManagement.scala b/akka-actors/src/main/scala/stm/TransactionManagement.scala index 64ac76cbe6..f8bf35c10d 100644 --- a/akka-actors/src/main/scala/stm/TransactionManagement.scala +++ b/akka-actors/src/main/scala/stm/TransactionManagement.scala @@ -38,7 +38,7 @@ object TransactionManagement extends TransactionManagement { } trait TransactionManagement extends Logging { - // FIXME is java.util.UUID better? + // FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait var uuid = Uuid.newUuid.toString import TransactionManagement.currentTransaction diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 6ccab7c6c9..83620a6265 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -99,23 +99,24 @@ object AMQP extends Actor { ", tag=" + tag + ", isUsingExistingQueue=" + isUsingExistingQueue + "]" + /** + * Hash code should only be based on on queue name and routing key. + */ override def hashCode(): Int = synchronized { var result = HashCode.SEED result = HashCode.hash(result, queueName) result = HashCode.hash(result, routingKey) - result = HashCode.hash(result, isUsingExistingQueue) - result = if (tag.isDefined) HashCode.hash(result, tag.get) else result result } + /** + * Equality should only be defined in terms of queue name and routing key. + */ override def equals(that: Any): Boolean = synchronized { that != null && that.isInstanceOf[MessageConsumerListener] && that.asInstanceOf[MessageConsumerListener].queueName== queueName && - that.asInstanceOf[MessageConsumerListener].routingKey == routingKey && - that.asInstanceOf[MessageConsumerListener].isUsingExistingQueue == isUsingExistingQueue && - that.asInstanceOf[MessageConsumerListener].tag.isDefined == tag.isDefined && - { if (that.asInstanceOf[MessageConsumerListener].tag.isDefined) { that.asInstanceOf[MessageConsumerListener].tag.get == tag.get } else true} + that.asInstanceOf[MessageConsumerListener].routingKey == routingKey } } object MessageConsumerListener { @@ -185,7 +186,7 @@ object AMQP extends Actor { passive: Boolean, durable: Boolean, configurationArguments: Map[String, AnyRef]): Consumer = { - val endpoint = new Consumer( + val consumer = new Consumer( new ConnectionFactory(config), hostname, port, exchangeName, @@ -195,8 +196,8 @@ object AMQP extends Actor { passive, durable, configurationArguments) - startLink(endpoint) - endpoint + startLink(consumer) + consumer } def stopConnection(connection: FaultTolerantConnectionActor) = { @@ -227,7 +228,7 @@ object AMQP extends Actor { log.info("AMQP.Producer [%s] is started", toString) - def newRPC(routingKey: String): RpcClient = new RpcClient(channel, exchangeName, routingKey) + def newRpcClient(routingKey: String): RpcClient = new RpcClient(channel, exchangeName, routingKey) def receive = { case message @ Message(payload, routingKey, mandatory, immediate, properties) => @@ -295,10 +296,17 @@ object AMQP extends Actor { log.info("AMQP.Consumer [%s] is started", toString) + def newRpcServerWithCallback(body: (Array[Byte], RabbitMQ.BasicProperties) => Array[Byte]): RpcServer = { + new RpcServer(channel) { + override def handleCall(requestBody: Array[Byte], replyProperties: RabbitMQ.BasicProperties) = { + body(requestBody, replyProperties) + } + } + } + def receive = { case listener: MessageConsumerListener => startLink(listener.actor) - listeners.put(listener, listener) registerConsumer(listener) log.info("Message consumer listener is registered [%s]", listener) @@ -342,21 +350,33 @@ object AMQP extends Actor { private def registerConsumer(listener: MessageConsumerListener) = { log.debug("Register MessageConsumerListener %s", listener.toString(exchangeName)) + listeners.put(listener, listener) + if (!listener.isUsingExistingQueue) { - log.debug("Declaring and binding new queue for MessageConsumerListener [%s]", listener.queueName) + log.debug("Declaring new queue for MessageConsumerListener [%s]", listener.queueName) channel.queueDeclare(listener.queueName) - channel.queueBind(listener.queueName, exchangeName, listener.routingKey) } - val listenerTag = channel.basicConsume(listener.queueName, false, new DefaultConsumer(channel) with Logging { - override def handleDelivery(tag: String, envelope: Envelope, properties: RabbitMQ.BasicProperties, payload: Array[Byte]) { + log.debug("Binding new queue for MessageConsumerListener [%s]", listener.queueName) + channel.queueBind(listener.queueName, exchangeName, listener.routingKey) + + val listenerTag = channel.basicConsume(listener.queueName, true, new DefaultConsumer(channel) with Logging { + override def handleDelivery(tag: String, + envelope: Envelope, + properties: RabbitMQ.BasicProperties, + payload: Array[Byte]) { try { val mandatory = false // FIXME: where to find out if it's mandatory? val immediate = false // FIXME: where to find out if it's immediate? + log.debug("Passing a message on to the MessageConsumerListener [%s]", listener.toString(exchangeName)) listener.actor ! Message(payload, envelope.getRoutingKey, mandatory, immediate, properties) - channel.basicAck(envelope.getDeliveryTag, false) + val deliveryTag = envelope.getDeliveryTag + log.debug("Acking message with delivery tag [%s]", deliveryTag) + channel.basicAck(deliveryTag, false) } catch { - case cause => self ! Failure(cause) // pass on and re-throw exception in endpoint actor to trigger restart and reconnect + case cause => + log.error("Delivery of message to MessageConsumerListener [%s] failed due to [%s]", listener.toString(exchangeName), cause.toString) + self ! Failure(cause) // pass on and re-throw exception in consumer actor to trigger restart and reconnect } } @@ -364,7 +384,8 @@ object AMQP extends Actor { listeners.elements.toList.map(_._2).find(_.tag == listenerTag) match { case None => log.warning("Could not find message listener for tag [%s]; can't shut listener down", listenerTag) case Some(listener) => - log.warning("Message listener listener [%s] is being shutdown by [%s] due to [%s]", listener.toString(exchangeName), signal.getReference, signal.getReason) + log.warning("Message listener listener [%s] is being shutdown by [%s] due to [%s]", + listener.toString(exchangeName), signal.getReference, signal.getReason) self ! CancelMessageConsumerListener(listener) } } @@ -397,20 +418,20 @@ object AMQP extends Actor { protected def setupChannel - def createQueue: String = channel.queueDeclare.getQueue + def createQueue: String = channel.queueDeclare("", false, false, true, true, null).getQueue - def createQueue(name: String) { channel.queueDeclare(name) } + def createQueue(name: String) = channel.queueDeclare(name, false, false, true, true, null).getQueue - def createQueue(name: String, durable: Boolean) { channel.queueDeclare(name, durable) } + def createQueue(name: String, durable: Boolean) = channel.queueDeclare(name, false, durable, true, true, null).getQueue def createBindQueue: String = { - val name = channel.queueDeclare.getQueue + val name = createQueue channel.queueBind(name, exchangeName, name) name } def createBindQueue(name: String) { - channel.queueDeclare(name) + createQueue(name) channel.queueBind(name, exchangeName, name) }