Made the AMQP message consumer listener aware of if its is using a already defined queue or not

This commit is contained in:
jboner 2009-10-27 14:42:32 +01:00
parent e65a4f1ece
commit fd9070a022
2 changed files with 80 additions and 57 deletions

View file

@ -50,7 +50,6 @@ object AMQP extends Actor {
trapExit = true
start
// ====== MESSAGES =====
class Message(val payload: AnyRef,
val routingKey: String,
val mandatory: Boolean,
@ -63,6 +62,7 @@ object AMQP extends Actor {
", immediate=" + immediate +
", properties=" + properties + "]"
}
object Message {
def unapply(message: Message): Option[Tuple5[AnyRef, String, Boolean, Boolean, RabbitMQ.BasicProperties]] =
Some((message.payload, message.routingKey, message.mandatory, message.immediate, message.properties))
@ -74,15 +74,32 @@ object AMQP extends Actor {
new Message(payload, routingKey, false, false, null)
}
case class MessageConsumerListener(queueName: String, routingKey: String, actor: Actor) {
var tag: Option[String] = None
private[akka] sealed trait AMQPInternalMessage
private[akka] case class MessageConsumerListener(queueName: String, routingKey: String, isUsingExistingQueue: Boolean, actor: Actor)
extends AMQPInternalMessage {
private[akka] var tag: Option[String] = None
override def toString(): String = "MessageConsumerListener[actor=" + actor + ", queue=" + queueName + ", routingKey=" + routingKey + "]"
override def toString() =
"MessageConsumerListener[actor=" + actor +
", queue=" + queueName +
", routingKey=" + routingKey +
", tag=" + tag +
", isUsingExistingQueue=" + isUsingExistingQueue + "]"
def toString(exchangeName: String) =
"MessageConsumerListener[actor=" + actor +
", exchange=" + exchangeName +
", queue=" + queueName +
", routingKey=" + routingKey +
", tag=" + tag +
", isUsingExistingQueue=" + isUsingExistingQueue + "]"
override def hashCode(): Int = synchronized {
var result = HashCode.SEED
result = HashCode.hash(result, queueName)
result = HashCode.hash(result, routingKey)
result = HashCode.hash(result, isUsingExistingQueue)
result = if (tag.isDefined) HashCode.hash(result, tag.get) else result
result
}
@ -90,16 +107,19 @@ object AMQP extends Actor {
that != null &&
that.isInstanceOf[MessageConsumerListener] &&
that.asInstanceOf[MessageConsumerListener].queueName== queueName &&
that.asInstanceOf[MessageConsumerListener].routingKey == routingKey
that.asInstanceOf[MessageConsumerListener].routingKey == routingKey &&
that.asInstanceOf[MessageConsumerListener].isUsingExistingQueue == isUsingExistingQueue &&
that.asInstanceOf[MessageConsumerListener].tag.isDefined == tag.isDefined &&
{ if (that.asInstanceOf[MessageConsumerListener].tag.isDefined) { that.asInstanceOf[MessageConsumerListener].tag.get == tag.get } else true}
}
}
case class CancelMessageConsumerListener(consumer: MessageConsumerListener)
case class Reconnect(delay: Long)
case class Failure(cause: Throwable)
case object Stop
private[akka] case class CancelMessageConsumerListener(consumer: MessageConsumerListener) extends AMQPInternalMessage
private[akka] case class Reconnect(delay: Long) extends AMQPInternalMessage
private[akka] case class Failure(cause: Throwable) extends AMQPInternalMessage
private[akka] case object Stop extends AMQPInternalMessage
class MessageNotDeliveredException(
private[akka] class MessageNotDeliveredException(
val message: String,
val replyCode: Int,
val replyText: String,
@ -210,7 +230,7 @@ object AMQP extends Actor {
stop
}
def setupChannel = {
protected def setupChannel = {
connection = connectionFactory.newConnection(hostname, port)
channel = connection.createChannel
returnListener match {
@ -262,13 +282,48 @@ object AMQP extends Actor {
faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = true
val listeners = new HashMap[MessageConsumerListener, MessageConsumerListener]
private val listeners = new HashMap[MessageConsumerListener, MessageConsumerListener]
setupChannel
log.info("AMQP.Consumer [%s] is started", toString)
def setupChannel = {
def receive = {
case listener: MessageConsumerListener =>
startLink(listener.actor)
listeners.put(listener, listener)
setupConsumer(listener)
log.info("Message consumer listener is registered [%s]", listener)
case CancelMessageConsumerListener(listener) =>
listeners.get(listener) match {
case None => log.warning("Can't unregister message consumer listener [%s]; no such listener", listener.toString(exchangeName))
case Some(listener) =>
listeners - listener
listener.tag match {
case None => log.warning("Can't unregister message consumer listener [%s]; no listener tag", listener.toString(exchangeName))
case Some(tag) =>
channel.basicCancel(tag)
unlink(listener.actor)
listener.actor.stop
log.debug("Message consumer is cancelled and shut down [%s]", listener)
}
}
case message @ Message(payload, routingKey, mandatory, immediate, properties) =>
log.debug("Sending message [%s]", message)
channel.basicPublish(exchangeName, routingKey, mandatory, immediate, properties, serializer.out(payload))
case Reconnect(delay) => reconnect(delay)
case Failure(cause) => log.error(cause, ""); throw cause
case Stop => disconnect; stop
case unknown => throw new IllegalArgumentException("Unknown message [" + unknown + "] to AMQP Consumer [" + this + "]")
}
protected def setupChannel = {
connection = connectionFactory.newConnection(hostname, port)
channel = connection.createChannel
channel.exchangeDeclare(exchangeName.toString, exchangeType.toString,
@ -278,9 +333,12 @@ object AMQP extends Actor {
if (shutdownListener.isDefined) connection.addShutdownListener(shutdownListener.get)
}
def setupConsumer(listener: MessageConsumerListener) = {
channel.queueDeclare(listener.queueName)
channel.queueBind(listener.queueName, exchangeName, listener.routingKey)
private def setupConsumer(listener: MessageConsumerListener) = {
log.debug("Adding MessageConsumerListener %s", listener.toString(exchangeName))
if (!listener.isUsingExistingQueue) {
channel.queueDeclare(listener.queueName)
channel.queueBind(listener.queueName, exchangeName, listener.routingKey)
}
val listenerTag = channel.basicConsume(listener.queueName, false, new DefaultConsumer(channel) with Logging {
override def handleDelivery(tag: String, envelope: Envelope, properties: RabbitMQ.BasicProperties, payload: Array[Byte]) {
@ -296,7 +354,7 @@ object AMQP extends Actor {
listeners.elements.toList.map(_._2).find(_.tag == listenerTag) match {
case None => log.warning("Could not find message listener for tag [%s]; can't shut listener down", listenerTag)
case Some(listener) =>
log.warning("Message listener listener [%s] is being shutdown by [%s] due to [%s]", listener, signal.getReference, signal.getReason)
log.warning("Message listener listener [%s] is being shutdown by [%s] due to [%s]", listener.toString(exchangeName), signal.getReference, signal.getReason)
self ! CancelMessageConsumerListener(listener)
}
}
@ -304,41 +362,6 @@ object AMQP extends Actor {
listener.tag = Some(listenerTag)
}
def receive = {
case listener: MessageConsumerListener =>
startLink(listener.actor)
listeners.put(listener, listener)
setupConsumer(listener)
log.info("Message consumer listener is registered [%s]", listener)
case CancelMessageConsumerListener(hash) =>
listeners.get(hash) match {
case None => log.warning("Can't unregister message consumer listener [%s]; no such listener", hash)
case Some(listener) =>
listeners - listener
listener.tag match {
case None => log.warning("Can't unregister message consumer listener [%s]; no listener tag", listener)
case Some(tag) =>
channel.basicCancel(tag)
unlink(listener.actor)
listener.actor.stop
log.info("Message consumer is cancelled and shut down [%s]", listener)
}
}
case message @ Message(payload, routingKey, mandatory, immediate, properties) =>
log.debug("Sending message [%s]", message)
channel.basicPublish(exchangeName, routingKey, mandatory, immediate, properties, serializer.out(payload))
case Reconnect(delay) => reconnect(delay)
case Failure(cause) => log.error(cause, ""); throw cause
case Stop => disconnect; stop
case unknown => throw new IllegalArgumentException("Unknown message [" + unknown + "] to AMQP Consumer [" + this + "]")
}
override def toString(): String =
"AMQP.Consumer[hostname=" + hostname +
", port=" + port +
@ -362,7 +385,7 @@ object AMQP extends Actor {
val exchangeName: String
val connectionFactory: ConnectionFactory
def setupChannel
protected def setupChannel
def createQueue: String = channel.queueDeclare.getQueue
@ -424,7 +447,7 @@ object AMQP extends Actor {
override def postRestart(reason: AnyRef, config: Option[AnyRef]) = reconnect(initReconnectDelay)
}
def receive: PartialFunction[Any, Unit] = {
def receive = {
case _ => {} // ignore all messages
}
}

View file

@ -31,7 +31,7 @@ object ExampleSession {
def direct = {
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, IM, ExchangeType.Direct, SERIALIZER, None, 100, false, false, Map[String, AnyRef]())
consumer ! MessageConsumerListener("@george_bush", "direct", new Actor() {
consumer ! MessageConsumerListener("@george_bush", "direct", false, new Actor() {
def receive: PartialFunction[Any, Unit] = {
case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", payload)
}
@ -42,12 +42,12 @@ object ExampleSession {
def fanout = {
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, CHAT, ExchangeType.Fanout, SERIALIZER, None, 100, false, false, Map[String, AnyRef]())
consumer ! MessageConsumerListener("@george_bush", "", new Actor() {
consumer ! MessageConsumerListener("@george_bush", "", false, new Actor() {
def receive: PartialFunction[Any, Unit] = {
case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", payload)
}
})
consumer ! MessageConsumerListener("@barack_obama", "", new Actor() {
consumer ! MessageConsumerListener("@barack_obama", "", false, new Actor() {
def receive: PartialFunction[Any, Unit] = {
case Message(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", payload)
}