diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 55ca1a8e99..ce7deaf655 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -50,7 +50,6 @@ object AMQP extends Actor { trapExit = true start - // ====== MESSAGES ===== class Message(val payload: AnyRef, val routingKey: String, val mandatory: Boolean, @@ -63,6 +62,7 @@ object AMQP extends Actor { ", immediate=" + immediate + ", properties=" + properties + "]" } + object Message { def unapply(message: Message): Option[Tuple5[AnyRef, String, Boolean, Boolean, RabbitMQ.BasicProperties]] = Some((message.payload, message.routingKey, message.mandatory, message.immediate, message.properties)) @@ -74,15 +74,32 @@ object AMQP extends Actor { new Message(payload, routingKey, false, false, null) } - case class MessageConsumerListener(queueName: String, routingKey: String, actor: Actor) { - var tag: Option[String] = None + private[akka] sealed trait AMQPInternalMessage + private[akka] case class MessageConsumerListener(queueName: String, routingKey: String, isUsingExistingQueue: Boolean, actor: Actor) + extends AMQPInternalMessage { + private[akka] var tag: Option[String] = None - override def toString(): String = "MessageConsumerListener[actor=" + actor + ", queue=" + queueName + ", routingKey=" + routingKey + "]" + override def toString() = + "MessageConsumerListener[actor=" + actor + + ", queue=" + queueName + + ", routingKey=" + routingKey + + ", tag=" + tag + + ", isUsingExistingQueue=" + isUsingExistingQueue + "]" + + def toString(exchangeName: String) = + "MessageConsumerListener[actor=" + actor + + ", exchange=" + exchangeName + + ", queue=" + queueName + + ", routingKey=" + routingKey + + ", tag=" + tag + + ", isUsingExistingQueue=" + isUsingExistingQueue + "]" override def hashCode(): Int = synchronized { var result = HashCode.SEED result = HashCode.hash(result, queueName) result = HashCode.hash(result, routingKey) + result = HashCode.hash(result, isUsingExistingQueue) + result = if (tag.isDefined) HashCode.hash(result, tag.get) else result result } @@ -90,16 +107,19 @@ object AMQP extends Actor { that != null && that.isInstanceOf[MessageConsumerListener] && that.asInstanceOf[MessageConsumerListener].queueName== queueName && - that.asInstanceOf[MessageConsumerListener].routingKey == routingKey + that.asInstanceOf[MessageConsumerListener].routingKey == routingKey && + that.asInstanceOf[MessageConsumerListener].isUsingExistingQueue == isUsingExistingQueue && + that.asInstanceOf[MessageConsumerListener].tag.isDefined == tag.isDefined && + { if (that.asInstanceOf[MessageConsumerListener].tag.isDefined) { that.asInstanceOf[MessageConsumerListener].tag.get == tag.get } else true} } } - case class CancelMessageConsumerListener(consumer: MessageConsumerListener) - case class Reconnect(delay: Long) - case class Failure(cause: Throwable) - case object Stop + private[akka] case class CancelMessageConsumerListener(consumer: MessageConsumerListener) extends AMQPInternalMessage + private[akka] case class Reconnect(delay: Long) extends AMQPInternalMessage + private[akka] case class Failure(cause: Throwable) extends AMQPInternalMessage + private[akka] case object Stop extends AMQPInternalMessage - class MessageNotDeliveredException( + private[akka] class MessageNotDeliveredException( val message: String, val replyCode: Int, val replyText: String, @@ -210,7 +230,7 @@ object AMQP extends Actor { stop } - def setupChannel = { + protected def setupChannel = { connection = connectionFactory.newConnection(hostname, port) channel = connection.createChannel returnListener match { @@ -262,13 +282,48 @@ object AMQP extends Actor { faultHandler = Some(OneForOneStrategy(5, 5000)) trapExit = true - val listeners = new HashMap[MessageConsumerListener, MessageConsumerListener] + private val listeners = new HashMap[MessageConsumerListener, MessageConsumerListener] setupChannel log.info("AMQP.Consumer [%s] is started", toString) - def setupChannel = { + def receive = { + case listener: MessageConsumerListener => + startLink(listener.actor) + listeners.put(listener, listener) + setupConsumer(listener) + log.info("Message consumer listener is registered [%s]", listener) + + case CancelMessageConsumerListener(listener) => + listeners.get(listener) match { + case None => log.warning("Can't unregister message consumer listener [%s]; no such listener", listener.toString(exchangeName)) + case Some(listener) => + listeners - listener + listener.tag match { + case None => log.warning("Can't unregister message consumer listener [%s]; no listener tag", listener.toString(exchangeName)) + case Some(tag) => + channel.basicCancel(tag) + unlink(listener.actor) + listener.actor.stop + log.debug("Message consumer is cancelled and shut down [%s]", listener) + } + } + + case message @ Message(payload, routingKey, mandatory, immediate, properties) => + log.debug("Sending message [%s]", message) + channel.basicPublish(exchangeName, routingKey, mandatory, immediate, properties, serializer.out(payload)) + + case Reconnect(delay) => reconnect(delay) + + case Failure(cause) => log.error(cause, ""); throw cause + + case Stop => disconnect; stop + + case unknown => throw new IllegalArgumentException("Unknown message [" + unknown + "] to AMQP Consumer [" + this + "]") + } + + protected def setupChannel = { connection = connectionFactory.newConnection(hostname, port) channel = connection.createChannel channel.exchangeDeclare(exchangeName.toString, exchangeType.toString, @@ -278,9 +333,12 @@ object AMQP extends Actor { if (shutdownListener.isDefined) connection.addShutdownListener(shutdownListener.get) } - def setupConsumer(listener: MessageConsumerListener) = { - channel.queueDeclare(listener.queueName) - channel.queueBind(listener.queueName, exchangeName, listener.routingKey) + private def setupConsumer(listener: MessageConsumerListener) = { + log.debug("Adding MessageConsumerListener %s", listener.toString(exchangeName)) + if (!listener.isUsingExistingQueue) { + channel.queueDeclare(listener.queueName) + channel.queueBind(listener.queueName, exchangeName, listener.routingKey) + } val listenerTag = channel.basicConsume(listener.queueName, false, new DefaultConsumer(channel) with Logging { override def handleDelivery(tag: String, envelope: Envelope, properties: RabbitMQ.BasicProperties, payload: Array[Byte]) { @@ -296,7 +354,7 @@ object AMQP extends Actor { listeners.elements.toList.map(_._2).find(_.tag == listenerTag) match { case None => log.warning("Could not find message listener for tag [%s]; can't shut listener down", listenerTag) case Some(listener) => - log.warning("Message listener listener [%s] is being shutdown by [%s] due to [%s]", listener, signal.getReference, signal.getReason) + log.warning("Message listener listener [%s] is being shutdown by [%s] due to [%s]", listener.toString(exchangeName), signal.getReference, signal.getReason) self ! CancelMessageConsumerListener(listener) } } @@ -304,41 +362,6 @@ object AMQP extends Actor { listener.tag = Some(listenerTag) } - def receive = { - case listener: MessageConsumerListener => - startLink(listener.actor) - listeners.put(listener, listener) - setupConsumer(listener) - log.info("Message consumer listener is registered [%s]", listener) - - case CancelMessageConsumerListener(hash) => - listeners.get(hash) match { - case None => log.warning("Can't unregister message consumer listener [%s]; no such listener", hash) - case Some(listener) => - listeners - listener - listener.tag match { - case None => log.warning("Can't unregister message consumer listener [%s]; no listener tag", listener) - case Some(tag) => - channel.basicCancel(tag) - unlink(listener.actor) - listener.actor.stop - log.info("Message consumer is cancelled and shut down [%s]", listener) - } - } - - case message @ Message(payload, routingKey, mandatory, immediate, properties) => - log.debug("Sending message [%s]", message) - channel.basicPublish(exchangeName, routingKey, mandatory, immediate, properties, serializer.out(payload)) - - case Reconnect(delay) => reconnect(delay) - - case Failure(cause) => log.error(cause, ""); throw cause - - case Stop => disconnect; stop - - case unknown => throw new IllegalArgumentException("Unknown message [" + unknown + "] to AMQP Consumer [" + this + "]") - } - override def toString(): String = "AMQP.Consumer[hostname=" + hostname + ", port=" + port + @@ -362,7 +385,7 @@ object AMQP extends Actor { val exchangeName: String val connectionFactory: ConnectionFactory - def setupChannel + protected def setupChannel def createQueue: String = channel.queueDeclare.getQueue @@ -424,7 +447,7 @@ object AMQP extends Actor { override def postRestart(reason: AnyRef, config: Option[AnyRef]) = reconnect(initReconnectDelay) } - def receive: PartialFunction[Any, Unit] = { + def receive = { case _ => {} // ignore all messages } } diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/ExampleSession.scala index dedc6bf5ba..9f4d13aed9 100644 --- a/akka-amqp/src/main/scala/ExampleSession.scala +++ b/akka-amqp/src/main/scala/ExampleSession.scala @@ -31,7 +31,7 @@ object ExampleSession { def direct = { val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, IM, ExchangeType.Direct, SERIALIZER, None, 100, false, false, Map[String, AnyRef]()) - consumer ! MessageConsumerListener("@george_bush", "direct", new Actor() { + consumer ! MessageConsumerListener("@george_bush", "direct", false, new Actor() { def receive: PartialFunction[Any, Unit] = { case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", payload) } @@ -42,12 +42,12 @@ object ExampleSession { def fanout = { val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, CHAT, ExchangeType.Fanout, SERIALIZER, None, 100, false, false, Map[String, AnyRef]()) - consumer ! MessageConsumerListener("@george_bush", "", new Actor() { + consumer ! MessageConsumerListener("@george_bush", "", false, new Actor() { def receive: PartialFunction[Any, Unit] = { case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", payload) } }) - consumer ! MessageConsumerListener("@barack_obama", "", new Actor() { + consumer ! MessageConsumerListener("@barack_obama", "", false, new Actor() { def receive: PartialFunction[Any, Unit] = { case Message(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", payload) }