Improved AMQP module code

This commit is contained in:
jboner 2009-10-28 12:28:02 +01:00
parent d19a4712ba
commit ffbe3fbb6b
3 changed files with 46 additions and 25 deletions

View file

@ -58,7 +58,7 @@ object Actor {
*/
trait Actor extends Logging with TransactionManagement {
ActorRegistry.register(this)
@volatile private[this] var isRunning: Boolean = false
private[this] val remoteFlagLock = new ReadWriteLock
private[this] val transactionalFlagLock = new ReadWriteLock

View file

@ -38,7 +38,7 @@ object TransactionManagement extends TransactionManagement {
}
trait TransactionManagement extends Logging {
// FIXME is java.util.UUID better?
// FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait
var uuid = Uuid.newUuid.toString
import TransactionManagement.currentTransaction

View file

@ -99,23 +99,24 @@ object AMQP extends Actor {
", tag=" + tag +
", isUsingExistingQueue=" + isUsingExistingQueue + "]"
/**
* Hash code should only be based on on queue name and routing key.
*/
override def hashCode(): Int = synchronized {
var result = HashCode.SEED
result = HashCode.hash(result, queueName)
result = HashCode.hash(result, routingKey)
result = HashCode.hash(result, isUsingExistingQueue)
result = if (tag.isDefined) HashCode.hash(result, tag.get) else result
result
}
/**
* Equality should only be defined in terms of queue name and routing key.
*/
override def equals(that: Any): Boolean = synchronized {
that != null &&
that.isInstanceOf[MessageConsumerListener] &&
that.asInstanceOf[MessageConsumerListener].queueName== queueName &&
that.asInstanceOf[MessageConsumerListener].routingKey == routingKey &&
that.asInstanceOf[MessageConsumerListener].isUsingExistingQueue == isUsingExistingQueue &&
that.asInstanceOf[MessageConsumerListener].tag.isDefined == tag.isDefined &&
{ if (that.asInstanceOf[MessageConsumerListener].tag.isDefined) { that.asInstanceOf[MessageConsumerListener].tag.get == tag.get } else true}
that.asInstanceOf[MessageConsumerListener].routingKey == routingKey
}
}
object MessageConsumerListener {
@ -185,7 +186,7 @@ object AMQP extends Actor {
passive: Boolean,
durable: Boolean,
configurationArguments: Map[String, AnyRef]): Consumer = {
val endpoint = new Consumer(
val consumer = new Consumer(
new ConnectionFactory(config),
hostname, port,
exchangeName,
@ -195,8 +196,8 @@ object AMQP extends Actor {
passive,
durable,
configurationArguments)
startLink(endpoint)
endpoint
startLink(consumer)
consumer
}
def stopConnection(connection: FaultTolerantConnectionActor) = {
@ -227,7 +228,7 @@ object AMQP extends Actor {
log.info("AMQP.Producer [%s] is started", toString)
def newRPC(routingKey: String): RpcClient = new RpcClient(channel, exchangeName, routingKey)
def newRpcClient(routingKey: String): RpcClient = new RpcClient(channel, exchangeName, routingKey)
def receive = {
case message @ Message(payload, routingKey, mandatory, immediate, properties) =>
@ -295,10 +296,17 @@ object AMQP extends Actor {
log.info("AMQP.Consumer [%s] is started", toString)
def newRpcServerWithCallback(body: (Array[Byte], RabbitMQ.BasicProperties) => Array[Byte]): RpcServer = {
new RpcServer(channel) {
override def handleCall(requestBody: Array[Byte], replyProperties: RabbitMQ.BasicProperties) = {
body(requestBody, replyProperties)
}
}
}
def receive = {
case listener: MessageConsumerListener =>
startLink(listener.actor)
listeners.put(listener, listener)
registerConsumer(listener)
log.info("Message consumer listener is registered [%s]", listener)
@ -342,21 +350,33 @@ object AMQP extends Actor {
private def registerConsumer(listener: MessageConsumerListener) = {
log.debug("Register MessageConsumerListener %s", listener.toString(exchangeName))
listeners.put(listener, listener)
if (!listener.isUsingExistingQueue) {
log.debug("Declaring and binding new queue for MessageConsumerListener [%s]", listener.queueName)
log.debug("Declaring new queue for MessageConsumerListener [%s]", listener.queueName)
channel.queueDeclare(listener.queueName)
channel.queueBind(listener.queueName, exchangeName, listener.routingKey)
}
val listenerTag = channel.basicConsume(listener.queueName, false, new DefaultConsumer(channel) with Logging {
override def handleDelivery(tag: String, envelope: Envelope, properties: RabbitMQ.BasicProperties, payload: Array[Byte]) {
log.debug("Binding new queue for MessageConsumerListener [%s]", listener.queueName)
channel.queueBind(listener.queueName, exchangeName, listener.routingKey)
val listenerTag = channel.basicConsume(listener.queueName, true, new DefaultConsumer(channel) with Logging {
override def handleDelivery(tag: String,
envelope: Envelope,
properties: RabbitMQ.BasicProperties,
payload: Array[Byte]) {
try {
val mandatory = false // FIXME: where to find out if it's mandatory?
val immediate = false // FIXME: where to find out if it's immediate?
log.debug("Passing a message on to the MessageConsumerListener [%s]", listener.toString(exchangeName))
listener.actor ! Message(payload, envelope.getRoutingKey, mandatory, immediate, properties)
channel.basicAck(envelope.getDeliveryTag, false)
val deliveryTag = envelope.getDeliveryTag
log.debug("Acking message with delivery tag [%s]", deliveryTag)
channel.basicAck(deliveryTag, false)
} catch {
case cause => self ! Failure(cause) // pass on and re-throw exception in endpoint actor to trigger restart and reconnect
case cause =>
log.error("Delivery of message to MessageConsumerListener [%s] failed due to [%s]", listener.toString(exchangeName), cause.toString)
self ! Failure(cause) // pass on and re-throw exception in consumer actor to trigger restart and reconnect
}
}
@ -364,7 +384,8 @@ object AMQP extends Actor {
listeners.elements.toList.map(_._2).find(_.tag == listenerTag) match {
case None => log.warning("Could not find message listener for tag [%s]; can't shut listener down", listenerTag)
case Some(listener) =>
log.warning("Message listener listener [%s] is being shutdown by [%s] due to [%s]", listener.toString(exchangeName), signal.getReference, signal.getReason)
log.warning("Message listener listener [%s] is being shutdown by [%s] due to [%s]",
listener.toString(exchangeName), signal.getReference, signal.getReason)
self ! CancelMessageConsumerListener(listener)
}
}
@ -397,20 +418,20 @@ object AMQP extends Actor {
protected def setupChannel
def createQueue: String = channel.queueDeclare.getQueue
def createQueue: String = channel.queueDeclare("", false, false, true, true, null).getQueue
def createQueue(name: String) { channel.queueDeclare(name) }
def createQueue(name: String) = channel.queueDeclare(name, false, false, true, true, null).getQueue
def createQueue(name: String, durable: Boolean) { channel.queueDeclare(name, durable) }
def createQueue(name: String, durable: Boolean) = channel.queueDeclare(name, false, durable, true, true, null).getQueue
def createBindQueue: String = {
val name = channel.queueDeclare.getQueue
val name = createQueue
channel.queueBind(name, exchangeName, name)
name
}
def createBindQueue(name: String) {
channel.queueDeclare(name)
createQueue(name)
channel.queueBind(name, exchangeName, name)
}