diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index d687619cbd..699a6a9fc7 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -1,86 +1,83 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - package se.scalablesolutions.akka.amqp -import com.rabbitmq.client.{AMQP => RabbitMQ, _} -import com.rabbitmq.client.ConnectionFactory - -import se.scalablesolutions.akka.actor.{Actor, ActorRef, IllegalActorStateException} +import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.actor.Actor._ +import se.scalablesolutions.akka.util.Logging + import se.scalablesolutions.akka.config.OneForOneStrategy -import se.scalablesolutions.akka.config.ScalaConfig._ -import se.scalablesolutions.akka.util.{HashCode, Logging} - -import scala.collection.mutable.HashMap - -import java.util.concurrent.ConcurrentHashMap -import java.util.{Timer, TimerTask} -import java.io.IOException +import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory} +import java.lang.IllegalArgumentException /** - * AMQP Actor API. Implements Producer and Consumer materialized as Actors. + * AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors. * - *
- *   val params = new ConnectionParameters
- *   params.setUsername("barack")
- *   params.setPassword("obama")
- *   params.setVirtualHost("/")
- *   params.setRequestedHeartbeat(0)
-
- *   val consumer = AMQP.newConsumer(params, hostname, port, exchange, ExchangeType.Direct, Serializer.ScalaJSON, None, 100)
+ * @see se.scalablesolutions.akka.amqp.ExampleSession
  *
- *   consumer ! MessageConsumerListener(queue, routingKey, actor {
- *     case Message(payload, _, _, _, _) => log.debug("Received message: %s", payload)
- *   })
- *
- *   val producer = AMQP.newProducer(params, hostname, port, exchange, Serializer.ScalaJSON, None, None, 100)
- *   producer ! Message("Hi", routingKey)
- * 
- * - * @author Jonas Bonér + * @author Irmo Manie */ object AMQP { + + case class ConnectionParameters( + host: String = ConnectionFactory.DEFAULT_HOST, + port: Int = ConnectionFactory.DEFAULT_AMQP_PORT, + username: String = ConnectionFactory.DEFAULT_USER, + password: String = ConnectionFactory.DEFAULT_PASS, + virtualHost: String = ConnectionFactory.DEFAULT_VHOST, + initReconnectDelay: Long = 5000, + connectionCallback: Option[ActorRef] = None) + + case class ChannelParameters( + exchangeName: String, + exchangeType: ExchangeType, + exchangeDurable: Boolean = false, + exchangeAutoDelete: Boolean = true, + exchangePassive: Boolean = false, + shutdownListener: Option[ShutdownListener] = None, + configurationArguments: Map[String, AnyRef] = Map(), + channelCallback: Option[ActorRef] = None) + + case class ProducerParameters(channelParameters: ChannelParameters, + producerId: Option[String] = None, + returnListener: Option[ReturnListener] = None) + + case class ConsumerParameters(channelParameters: ChannelParameters, + routingKey: String, + deliveryHandler: ActorRef, + queueName: Option[String] = None, + queueDurable: Boolean = false, + queueAutoDelete: Boolean = true, + queuePassive: Boolean = false, + queueExclusive: Boolean = false, + selfAcknowledging: Boolean = true) { + if (queueDurable && queueName.isEmpty) { + throw new IllegalArgumentException("A queue name is required when requesting a durable queue.") + } + } + + def newConnection(connectionParameters: ConnectionParameters = new ConnectionParameters): ActorRef = { + val connection: ActorRef = supervisor.newConnection(connectionParameters) + connection ! Connect + connection + } + + def newProducer(connection: ActorRef, producerParameters: ProducerParameters): ActorRef = { + val producer: ActorRef = Actor.actorOf(new ProducerActor(producerParameters)) + connection.startLink(producer) + producer ! Start + producer + } + + def newConsumer(connection: ActorRef, consumerParameters: ConsumerParameters): ActorRef = { + val consumer: ActorRef = actorOf(new ConsumerActor(consumerParameters)) + connection.startLink(consumer) + consumer ! Start + consumer + } + private val supervisor = new AMQPSupervisor - def newProducer( - config: ConnectionParameters, - hostname: String, - port: Int, - exchangeName: String, - returnListener: Option[ReturnListener], - shutdownListener: Option[ShutdownListener], - initReconnectDelay: Long): ActorRef = - supervisor.newProducer( - config, hostname, port, exchangeName, returnListener, shutdownListener, initReconnectDelay) - - def newConsumer( - config: ConnectionParameters, - hostname: String, - port: Int, - exchangeName: String, - exchangeType: ExchangeType, - shutdownListener: Option[ShutdownListener], - initReconnectDelay: Long, - passive: Boolean, - durable: Boolean, - autoDelete: Boolean, - configurationArguments: Map[String, AnyRef]): ActorRef = - supervisor.newConsumer( - config, hostname, port, exchangeName, exchangeType, - shutdownListener, initReconnectDelay, - passive, durable, autoDelete, configurationArguments) - - def stopConnection(connection: ActorRef) = supervisor.stopConnection(connection) - - /** - * @author Jonas Bonér - */ class AMQPSupervisor extends Logging { - class AMQPSupervisorActor extends Actor { - import scala.collection.JavaConversions._ import self._ faultHandler = Some(OneForOneStrategy(5, 5000)) @@ -91,523 +88,12 @@ object AMQP { } } - import scala.collection.JavaConversions._ - private val supervisor = actorOf(new AMQPSupervisorActor).start - private val connections = new ConcurrentHashMap[ActorRef, ActorRef] - def newProducer( - config: ConnectionParameters, - hostname: String, - port: Int, - exchangeName: String, - returnListener: Option[ReturnListener], - shutdownListener: Option[ShutdownListener], - initReconnectDelay: Long): ActorRef = { - val producer = actorOf(new Producer( - new ConnectionFactory(config), - hostname, port, - exchangeName, - returnListener, - shutdownListener, - initReconnectDelay)) - supervisor.startLink(producer) - producer + def newConnection(connectionParameters: ConnectionParameters): ActorRef = { + val connectionActor = actorOf(new FaultTolerantConnectionActor(connectionParameters)) + supervisor.startLink(connectionActor) + connectionActor } - - def newConsumer( - config: ConnectionParameters, - hostname: String, - port: Int, - exchangeName: String, - exchangeType: ExchangeType, - shutdownListener: Option[ShutdownListener], - initReconnectDelay: Long, - passive: Boolean, - durable: Boolean, - autoDelete: Boolean, - configurationArguments: Map[String, AnyRef]): ActorRef = { - val consumer = actorOf(new Consumer( - new ConnectionFactory(config), - hostname, port, - exchangeName, - exchangeType, - shutdownListener, - initReconnectDelay, - passive, - durable, - autoDelete, - configurationArguments)) - supervisor.startLink(consumer) - consumer - } - - def stopConnection(connection: ActorRef) = { - connection ! Stop - supervisor.unlink(connection) - connections.remove(connection) - } - - def shutdown = { - asMap(connections).valuesIterator.foreach(_ ! Stop) - exit - } - } - - sealed trait AMQPMessage - private[akka] trait InternalAMQPMessage extends AMQPMessage - - /** - * @author Jonas Bonér - */ - class Message(val payload: Array[Byte], - val routingKey: String, - val mandatory: Boolean, - val immediate: Boolean, - val properties: RabbitMQ.BasicProperties) extends AMQPMessage { - override def toString(): String = - "Message[payload=" + payload + - ", routingKey=" + routingKey + - ", mandatory=" + mandatory + - ", immediate=" + immediate + - ", properties=" + properties + "]" - } - - /** - * @author Jonas Bonér - */ - object Message { - def unapply(message: Message): Option[Tuple5[AnyRef, String, Boolean, Boolean, RabbitMQ.BasicProperties]] = - Some((message.payload, message.routingKey, message.mandatory, message.immediate, message.properties)) - - def apply(payload: Array[Byte], routingKey: String, mandatory: Boolean, immediate: Boolean, properties: RabbitMQ.BasicProperties): Message = - new Message(payload, routingKey, mandatory, immediate, properties) - - def apply(payload: Array[Byte], routingKey: String): Message = - new Message(payload, routingKey, false, false, null) - } - - /** - * @author Jonas Bonér - */ - class MessageConsumerListener(val queueName: String, - val routingKey: String, - val exclusive: Boolean, - val autoDelete: Boolean, - val isUsingExistingQueue: Boolean, - val actor: ActorRef) extends AMQPMessage { - /** - * Creates a non-exclusive, non-autodelete message listener. - */ - def this(queueName: String, routingKey: String, actor: ActorRef) = this (queueName, routingKey, false, false, false, actor) - - private[akka] var tag: Option[String] = None - - override def toString() = - "MessageConsumerListener[actor=" + actor + - ", queue=" + queueName + - ", routingKey=" + routingKey + - ", tag=" + tag + - ", exclusive=" + exclusive + - ", autoDelete=" + autoDelete + - ", isUsingExistingQueue=" + isUsingExistingQueue + "]" - - def toString(exchangeName: String) = - "MessageConsumerListener[actor=" + actor + - ", exchange=" + exchangeName + - ", queue=" + queueName + - ", routingKey=" + routingKey + - ", tag=" + tag + - ", exclusive=" + exclusive + - ", autoDelete=" + autoDelete + - ", isUsingExistingQueue=" + isUsingExistingQueue + "]" - - /** - * Hash code should only be based on on queue name and routing key. - */ - override def hashCode(): Int = synchronized { - var result = HashCode.SEED - result = HashCode.hash(result, queueName) - result = HashCode.hash(result, routingKey) - result - } - - /** - * Equality should only be defined in terms of queue name and routing key. - */ - override def equals(that: Any): Boolean = synchronized { - that != null && - that.isInstanceOf[MessageConsumerListener] && - that.asInstanceOf[MessageConsumerListener].queueName == queueName && - that.asInstanceOf[MessageConsumerListener].routingKey == routingKey - } - } - object MessageConsumerListener { - def apply(queueName: String, - routingKey: String, - exclusive: Boolean, - autoDelete: Boolean, - isUsingExistingQueue: Boolean, - actor: ActorRef) = - new MessageConsumerListener(queueName, routingKey, exclusive, autoDelete, isUsingExistingQueue, actor) - - def apply(queueName: String, - routingKey: String, - actor: ActorRef) = - new MessageConsumerListener(queueName, routingKey, false, false, false, actor) - } - - case object Stop extends AMQPMessage - - case class UnregisterMessageConsumerListener(consumer: MessageConsumerListener) extends InternalAMQPMessage - - private[akka] case class Reconnect(delay: Long) extends InternalAMQPMessage - - private[akka] case class Failure(cause: Throwable) extends InternalAMQPMessage - - private[akka] class MessageNotDeliveredException( - val message: String, - val replyCode: Int, - val replyText: String, - val exchange: String, - val routingKey: String, - val properties: RabbitMQ.BasicProperties, - val body: Array[Byte]) extends RuntimeException(message) - - sealed trait ExchangeType - object ExchangeType { - case object Direct extends ExchangeType { - override def toString = "direct" - } - case object Topic extends ExchangeType { - override def toString = "topic" - } - case object Fanout extends ExchangeType { - override def toString = "fanout" - } - case object Match extends ExchangeType { - override def toString = "match" - } - } - - /** - * @author Jonas Bonér - */ - class Producer private[amqp]( - val connectionFactory: ConnectionFactory, - val hostname: String, - val port: Int, - val exchangeName: String, - val returnListener: Option[ReturnListener], - val shutdownListener: Option[ShutdownListener], - val initReconnectDelay: Long) - extends FaultTolerantConnectionActor { - setupChannel - - log.info("AMQP.Producer [%s] is started", toString) - - def newRpcClient(routingKey: String): RpcClient = new RpcClient(channel, exchangeName, routingKey) - - def receive = { - case message@Message(payload, routingKey, mandatory, immediate, properties) => - log.debug("Sending message [%s]", message) - channel.basicPublish( - exchangeName, routingKey, mandatory, immediate, properties, payload.asInstanceOf[Array[Byte]]) - case Stop => - disconnect - exit - } - - protected def setupChannel = { - connection = connectionFactory.newConnection(hostname, port) - channel = connection.createChannel - returnListener match { - case Some(listener) => channel.setReturnListener(listener) - case None => channel.setReturnListener(new ReturnListener() { - def handleBasicReturn( - replyCode: Int, - replyText: String, - exchange: String, - routingKey: String, - properties: RabbitMQ.BasicProperties, - body: Array[Byte]) = { - throw new MessageNotDeliveredException( - "Could not deliver message [" + body + - "] with reply code [" + replyCode + - "] with reply text [" + replyText + - "] and routing key [" + routingKey + - "] to exchange [" + exchange + "]", - replyCode, replyText, exchange, routingKey, properties, body) - } - }) - } - if (shutdownListener.isDefined) connection.addShutdownListener(shutdownListener.get) - } - - override def toString(): String = - "AMQP.Producer[hostname=" + hostname + - ", port=" + port + - ", exchange=" + exchangeName + "]" - } - - /** - * @author Jonas Bonér - */ - class Consumer private[amqp]( - val connectionFactory: ConnectionFactory, - val hostname: String, - val port: Int, - val exchangeName: String, - val exchangeType: ExchangeType, - val shutdownListener: Option[ShutdownListener], - val initReconnectDelay: Long, - val passive: Boolean, - val durable: Boolean, - val autoDelete: Boolean, - val configurationArguments: Map[java.lang.String, Object]) - extends FaultTolerantConnectionActor { - - import scala.collection.JavaConversions._ - import self._ - - faultHandler = Some(OneForOneStrategy(5, 5000)) - trapExit = List(classOf[Throwable]) - - //FIXME use better strategy to convert scala.immutable.Map to java.util.Map - private val jConfigMap = configurationArguments.foldLeft(new java.util.HashMap[String,Object]){ (m,kv) => { m.put(kv._1,kv._2); m } } - - private val listeners = new HashMap[MessageConsumerListener, MessageConsumerListener] - - setupChannel - - log.info("AMQP.Consumer [%s] is started", toString) - - def newRpcServerWithCallback(body: (Array[Byte], RabbitMQ.BasicProperties) => Array[Byte]): RpcServer = { - new RpcServer(channel) { - override def handleCall(requestBody: Array[Byte], replyProperties: RabbitMQ.BasicProperties) = { - body(requestBody, replyProperties) - } - } - } - - def receive = { - case listener: MessageConsumerListener => - startLink(listener.actor) - registerListener(listener) - log.info("Message consumer listener is registered [%s]", listener) - - case UnregisterMessageConsumerListener(listener) => - unregisterListener(listener) - - case Reconnect(delay) => - reconnect(delay) - - case Failure(cause) => - log.error(cause, "Error in AMQP consumer") - throw cause - - case Stop => - listeners.iterator.toList.map(_._2).foreach(unregisterListener(_)) - disconnect - exit - - case message: Message => - handleIllegalMessage( - "AMQP.Consumer [" + this + "] can't be used to send messages, ignoring message [" + message + "]") - - case unknown => - handleIllegalMessage( - "Unknown message [" + unknown + "] to AMQP Consumer [" + this + "]") - } - - protected def setupChannel = { - connection = connectionFactory.newConnection(hostname, port) - channel = connection.createChannel - channel.exchangeDeclare(exchangeName.toString, exchangeType.toString, passive, durable, autoDelete, jConfigMap) - listeners.iterator.toList.map(_._2).foreach(registerListener) - if (shutdownListener.isDefined) connection.addShutdownListener(shutdownListener.get) - } - - private def registerListener(listener: MessageConsumerListener) = { - log.debug("Register MessageConsumerListener %s", listener.toString(exchangeName)) - listeners.put(listener, listener) - - if (!listener.isUsingExistingQueue) { - log.debug("Declaring new queue for MessageConsumerListener [%s]", listener.queueName) - channel.queueDeclare( - listener.queueName, - passive, durable, - listener.exclusive, listener.autoDelete, - jConfigMap) - } - - log.debug("Binding new queue for MessageConsumerListener [%s]", listener.queueName) - channel.queueBind(listener.queueName, exchangeName, listener.routingKey) - - val listenerTag = channel.basicConsume(listener.queueName, true, new DefaultConsumer(channel) with Logging { - override def handleDelivery(tag: String, - envelope: Envelope, - properties: RabbitMQ.BasicProperties, - payload: Array[Byte]) { - try { - val mandatory = false // FIXME: where to find out if it's mandatory? - val immediate = false // FIXME: where to find out if it's immediate? - log.debug("Passing a message on to the MessageConsumerListener [%s]", listener.toString(exchangeName)) - listener.actor ! Message(payload, envelope.getRoutingKey, mandatory, immediate, properties) - val deliveryTag = envelope.getDeliveryTag - log.debug("Acking message with delivery tag [%s]", deliveryTag) - channel.basicAck(deliveryTag, false) - } catch { - case cause => - log.error( - cause, "Delivery of message to MessageConsumerListener [%s] failed", - listener.toString(exchangeName)) - self ! Failure(cause) // pass on and re-throw exception in consumer actor to trigger restart and reconnect - } - } - - override def handleShutdownSignal(listenerTag: String, signal: ShutdownSignalException) = { - def hasTag(listener: MessageConsumerListener, listenerTag: String): Boolean = { - if (listener.tag.isEmpty) throw new IllegalActorStateException( - "MessageConsumerListener [" + listener + "] does not have a tag") - listener.tag.get == listenerTag - } - listeners.iterator.toList.map(_._2).find(hasTag(_, listenerTag)) match { - case None => log.error( - "Could not find message listener for tag [%s]; can't shut listener down", listenerTag) - case Some(listener) => - log.warning( - "MessageConsumerListener [%s] is being shutdown by [%s] due to [%s]", - listener.toString(exchangeName), signal.getReference, signal.getReason) - self ! UnregisterMessageConsumerListener(listener) - } - } - }) - listener.tag = Some(listenerTag) - } - - private def unregisterListener(listener: MessageConsumerListener) = { - listeners.get(listener) match { - case None => log.warning( - "Can't unregister message consumer listener [%s]; no such listener", - listener.toString(exchangeName)) - case Some(listener) => - listeners -= listener - listener.tag match { - case None => log.warning( - "Can't unregister message consumer listener [%s]; no listener tag", - listener.toString(exchangeName)) - case Some(tag) => - channel.basicCancel(tag) - unlink(listener.actor) - listener.actor.stop - log.debug("Message consumer is cancelled and shut down [%s]", listener) - } - } - } - - private def handleIllegalMessage(errorMessage: String) = { - log.error(errorMessage) - throw new IllegalArgumentException(errorMessage) - } - - override def toString(): String = - "AMQP.Consumer[hostname=" + hostname + - ", port=" + port + - ", exchange=" + exchangeName + - ", type=" + exchangeType + - ", passive=" + passive + - ", durable=" + durable + "]" - } - - /** - * @author Jonas Bonér - */ - trait FaultTolerantConnectionActor extends Actor with Logging { - val reconnectionTimer = new Timer - - var connection: Connection = _ - var channel: Channel = _ - - val hostname: String - val port: Int - val initReconnectDelay: Long - val exchangeName: String - val connectionFactory: ConnectionFactory - - protected def setupChannel - - def createQueue: String = - channel.queueDeclare("", false, false, true, true, null).getQueue - - def createQueue(name: String) = - channel.queueDeclare(name, false, false, true, true, null).getQueue - - def createQueue(name: String, durable: Boolean) = - channel.queueDeclare(name, false, durable, true, true, null).getQueue - - def createQueue(name: String, passive: Boolean, durable: Boolean, exclusive: Boolean, autoDelete: Boolean) = - channel.queueDeclare(name, passive, durable, exclusive, autoDelete, null).getQueue - - def createQueue(name: String, passive: Boolean, durable: Boolean, exclusive: Boolean, autoDelete: Boolean, arguments: java.util.Map[String, AnyRef]) = - channel.queueDeclare(name, passive, durable, exclusive, autoDelete, arguments).getQueue - - def bindQueue(name: String) { - channel.queueBind(name, exchangeName, name) - } - - def createBindQueue: String = { - val name = createQueue - channel.queueBind(name, exchangeName, name) - name - } - - def createBindQueue(name: String) { - createQueue(name) - channel.queueBind(name, exchangeName, name) - } - - def createBindQueue(name: String, durable: Boolean) { - channel.queueDeclare(name, durable) - channel.queueBind(name, exchangeName, name) - } - - def deleteQueue(name: String) {channel.queueDelete(name)} - - protected def disconnect = { - try { - channel.close - } catch { - case e: IOException => log.error("Could not close AMQP channel %s:%s [%s]", hostname, port, this) - case _ => () - } - try { - connection.close - log.debug("Disconnected AMQP connection at %s:%s [%s]", hostname, port, this) - } catch { - case e: IOException => log.error("Could not close AMQP connection %s:%s [%s]", hostname, port, this) - case _ => () - } - } - - protected def reconnect(delay: Long) = { - disconnect - try { - setupChannel - log.debug("Successfully reconnected to AMQP Server %s:%s [%s]", hostname, port, this) - } catch { - case e: Exception => - val waitInMillis = delay * 2 - val outerActorRef = self - log.debug("Trying to reconnect to AMQP server in %n milliseconds [%s]", waitInMillis, this) - reconnectionTimer.schedule(new TimerTask() { - override def run = outerActorRef ! Reconnect(waitInMillis) - }, delay) - } - } - - override def preRestart(reason: Throwable) = disconnect - - override def postRestart(reason: Throwable) = reconnect(initReconnectDelay) } } diff --git a/akka-amqp/src/main/scala/AMQPMessage.scala b/akka-amqp/src/main/scala/AMQPMessage.scala new file mode 100644 index 0000000000..f1e6668935 --- /dev/null +++ b/akka-amqp/src/main/scala/AMQPMessage.scala @@ -0,0 +1,56 @@ +package se.scalablesolutions.akka.amqp + +import se.scalablesolutions.akka.actor.ActorRef +import com.rabbitmq.client.AMQP.BasicProperties +import com.rabbitmq.client.ShutdownSignalException + +sealed trait AMQPMessage +sealed trait InternalAMQPMessage extends AMQPMessage + +case class Message(payload: Array[Byte], + routingKey: String, + mandatory: Boolean = false, + immediate: Boolean = false, + properties: Option[BasicProperties] = None) extends AMQPMessage + +case class Delivery(payload: Array[Byte], + routingKey: String, + deliveryTag: Long, + properties: BasicProperties, + sender: Option[ActorRef]) extends AMQPMessage + + + +// connection messages +case object Connect extends AMQPMessage + +case object Connected extends AMQPMessage +case object Reconnecting extends AMQPMessage +case object Disconnected extends AMQPMessage + +case object ChannelRequest extends InternalAMQPMessage + +// channel messages +case object Start extends AMQPMessage + +case object Started extends AMQPMessage +case object Restarting extends AMQPMessage +case object Stopped extends AMQPMessage + +// delivery messages +case class Acknowledge(deliveryTag: Long) extends AMQPMessage +case class Acknowledged(deliveryTag: Long) extends AMQPMessage + +// internal messages +private[akka] case class Failure(cause: Throwable) extends InternalAMQPMessage +private[akka] case class ConnectionShutdown(cause: ShutdownSignalException) extends InternalAMQPMessage +private[akka] case class ChannelShutdown(cause: ShutdownSignalException) extends InternalAMQPMessage + +private[akka] class MessageNotDeliveredException( + val message: String, + val replyCode: Int, + val replyText: String, + val exchange: String, + val routingKey: String, + val properties: BasicProperties, + val body: Array[Byte]) extends RuntimeException(message) diff --git a/akka-amqp/src/main/scala/ConsumerActor.scala b/akka-amqp/src/main/scala/ConsumerActor.scala new file mode 100644 index 0000000000..0d9ca5cc05 --- /dev/null +++ b/akka-amqp/src/main/scala/ConsumerActor.scala @@ -0,0 +1,105 @@ +package se.scalablesolutions.akka.amqp + +import com.rabbitmq.client.AMQP.Queue.DeclareOk +import collection.JavaConversions +import se.scalablesolutions.akka.amqp.AMQP.ConsumerParameters +import se.scalablesolutions.akka.util.Logging +import com.rabbitmq.client.{Channel, Envelope, DefaultConsumer} +import com.rabbitmq.client.AMQP.BasicProperties +import java.lang.Throwable + +private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) extends FaultTolerantChannelActor(consumerParameters.channelParameters) { + import consumerParameters._ + import channelParameters._ + + var listenerTag: Option[String] = None + + def specificMessageHandler = { + case Acknowledge(deliveryTag) => acknowledgeDeliveryTag(deliveryTag, true) + case message: Message => + handleIllegalMessage("%s can't be used to send messages, ignoring message [%s]".format(this, message)) + case unknown => + handleIllegalMessage("Unknown message [%s] to %s".format(unknown, this)) + } + + protected def setupChannel(ch: Channel) = { + + // todo make nicer + if (!self.linkedActorsAsList.contains(deliveryHandler)) { + self.startLink(deliveryHandler) + } + val queueDeclare: DeclareOk = { + queueName match { + case Some(name) => + log.debug("Declaring new queue [%s] for %s", name, toString) + if (queuePassive) { + ch.queueDeclarePassive(name) + } else { + ch.queueDeclare(name, queueDurable, queueExclusive, queueAutoDelete, JavaConversions.asMap(configurationArguments)) + } + case None => + log.debug("Declaring new generated queue for %s", toString) + ch.queueDeclare + } + } + + log.debug("Binding new queue [%s] for %s", queueDeclare.getQueue, toString) + ch.queueBind(queueDeclare.getQueue, exchangeName, routingKey) + + val tag = ch.basicConsume(queueDeclare.getQueue, false, new DefaultConsumer(ch) with Logging { + override def handleDelivery(tag: String, envelope: Envelope, properties: BasicProperties, payload: Array[Byte]) { + try { + val deliveryTag = envelope.getDeliveryTag + log.debug("Passing a message on to %s", toString) + deliveryHandler ! Delivery(payload, envelope.getRoutingKey, envelope.getDeliveryTag, properties, someSelf) + + if (selfAcknowledging) { + log.debug("Self acking...") + acknowledgeDeliveryTag(deliveryTag, false) + } + } catch { + case cause => + log.error(cause, "Delivery of message to %s failed", toString) + self ! Failure(cause) // pass on and re-throw exception in consumer actor to trigger restart and connect + } + } + }) + listenerTag = Some(tag) + log.info("Intitialized %s", toString) + } + + private def acknowledgeDeliveryTag(deliveryTag: Long, remoteAcknowledgement: Boolean) = { + log.debug("Acking message with delivery tag [%s]", deliveryTag) + channel.foreach{ch => + ch.basicAck(deliveryTag, false) + if (remoteAcknowledgement) { + deliveryHandler ! Acknowledged(deliveryTag) + } + } + } + + private def handleIllegalMessage(errorMessage: String) = { + log.error(errorMessage) + throw new IllegalArgumentException(errorMessage) + } + + + override def preRestart(reason: Throwable) = { + listenerTag = None + super.preRestart(reason) + } + + override def shutdown = { + listenerTag.foreach(tag => channel.foreach(_.basicCancel(tag))) + self.linkedActorsAsList.foreach(_.stop) + super.shutdown + } + + override def toString(): String = + "AMQP.Consumer[id= "+ self.id + + ", exchange=" + exchangeName + + ", exchangeType=" + exchangeType + + ", durable=" + exchangeDurable + + ", autoDelete=" + exchangeAutoDelete + "]" +} + diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/ExampleSession.scala index d2fffd2ae1..79cd97c5bf 100644 --- a/akka-amqp/src/main/scala/ExampleSession.scala +++ b/akka-amqp/src/main/scala/ExampleSession.scala @@ -4,47 +4,119 @@ package se.scalablesolutions.akka.amqp -import se.scalablesolutions.akka.actor.Actor._ - -import com.rabbitmq.client.ConnectionParameters +import se.scalablesolutions.akka.actor.{Actor, ActorRegistry} +import Actor._ +import se.scalablesolutions.akka.amqp.AMQP.{ConnectionParameters, ConsumerParameters, ChannelParameters, ProducerParameters} +import java.util.concurrent.{CountDownLatch, TimeUnit} object ExampleSession { - import AMQP._ - val CONFIG = new ConnectionParameters - val HOSTNAME = "localhost" - val PORT = 5672 - - val IM = "im.whitehouse.gov" - val CHAT = "chat.whitehouse.gov" - def main(args: Array[String]) = { println("==== DIRECT ===") direct - Thread.sleep(1000) + TimeUnit.SECONDS.sleep(2) println("==== FANOUT ===") fanout + + TimeUnit.SECONDS.sleep(2) + + println("==== TOPIC ===") + topic + + TimeUnit.SECONDS.sleep(2) + + println("==== CALLBACK ===") + callback + + TimeUnit.SECONDS.sleep(2) + + ActorRegistry.shutdownAll + System.exit(0) } def direct = { - val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, IM, ExchangeType.Direct, None, 100, false, false, false, Map[String, AnyRef]()) - consumer ! MessageConsumerListener("@george_bush", "direct", actor { - case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]])) - }) - val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, IM, None, None, 100) - producer ! Message("@jonas_boner: You sucked!!".getBytes, "direct") + + // defaults to amqp://guest:guest@localhost:5672/ + val connection = AMQP.newConnection() + + val channelParameters = ChannelParameters("my_direct_exchange", ExchangeType.Direct) + + val consumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "some.routing", actor { + case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) + })) + + val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters)) + producer ! Message("@jonas_boner: You sucked!!".getBytes, "some.routing") } def fanout = { - val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, CHAT, ExchangeType.Fanout, None, 100, false, false, false, Map[String, AnyRef]()) - consumer ! MessageConsumerListener("@george_bush", "", actor { - case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]])) - }) - consumer ! MessageConsumerListener("@barack_obama", "", actor { - case Message(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload.asInstanceOf[Array[Byte]])) - }) - val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, CHAT, None, None, 100) + + // defaults to amqp://guest:guest@localhost:5672/ + val connection = AMQP.newConnection() + + val channelParameters = ChannelParameters("my_fanout_exchange", ExchangeType.Fanout) + + val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "@george_bush", actor { + case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) + })) + + val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "@barack_obama", actor { + case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload)) + })) + + val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters)) producer ! Message("@jonas_boner: I'm going surfing".getBytes, "") } + + def topic = { + + // defaults to amqp://guest:guest@localhost:5672/ + val connection = AMQP.newConnection() + + val channelParameters = ChannelParameters("my_topic_exchange", ExchangeType.Topic) + + val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "@george_bush", actor { + case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) + })) + + val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "@barack_obama", actor { + case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload)) + })) + + val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters)) + producer ! Message("@jonas_boner: You still suck!!".getBytes, "@george_bush") + producer ! Message("@jonas_boner: Yes I can!".getBytes, "@barack_obama") + } + + def callback = { + val channelCountdown = new CountDownLatch(2) + + val connectionCallback = actor { + case Connected => log.info("Connection callback: Connected!") + case Reconnecting => () // not used, sent when connection fails and initiates a reconnect + case Disconnected => log.info("Connection callback: Disconnected!") + } + val connection = AMQP.newConnection(new ConnectionParameters(connectionCallback = Some(connectionCallback))) + + val channelCallback = actor { + case Started => { + log.info("Channel callback: Started") + channelCountdown.countDown + } + case Restarting => // not used, sent when channel or connection fails and initiates a restart + case Stopped => log.info("Channel callback: Stopped") + } + val channelParameters = ChannelParameters("my_direct_exchange", ExchangeType.Direct, channelCallback = Some(channelCallback)) + + val consumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "callback.routing", actor { + case _ => () // not used + })) + + val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters)) + + // Wait until both channels (producer & consumer) are started before stopping the connection + channelCountdown.await(2, TimeUnit.SECONDS) + connection.stop + } } diff --git a/akka-amqp/src/main/scala/ExchangeType.scala b/akka-amqp/src/main/scala/ExchangeType.scala new file mode 100644 index 0000000000..b9598fbed9 --- /dev/null +++ b/akka-amqp/src/main/scala/ExchangeType.scala @@ -0,0 +1,17 @@ +package se.scalablesolutions.akka.amqp + +sealed trait ExchangeType +object ExchangeType { + case object Direct extends ExchangeType { + override def toString = "direct" + } + case object Topic extends ExchangeType { + override def toString = "topic" + } + case object Fanout extends ExchangeType { + override def toString = "fanout" + } + case object Match extends ExchangeType { + override def toString = "match" + } +} diff --git a/akka-amqp/src/main/scala/FaultTolerantChannelActor.scala b/akka-amqp/src/main/scala/FaultTolerantChannelActor.scala new file mode 100644 index 0000000000..40cd0ff753 --- /dev/null +++ b/akka-amqp/src/main/scala/FaultTolerantChannelActor.scala @@ -0,0 +1,101 @@ +package se.scalablesolutions.akka.amqp + +import collection.JavaConversions +import java.lang.Throwable +import se.scalablesolutions.akka.actor.Actor +import Actor._ +import se.scalablesolutions.akka.amqp.AMQP.ChannelParameters +import com.rabbitmq.client.{ShutdownSignalException, Channel, ShutdownListener} +import scala.PartialFunction + +abstract private[amqp] class FaultTolerantChannelActor(channelParameters: ChannelParameters) extends Actor { + import channelParameters._ + + protected[amqp] var channel: Option[Channel] = None + log.info("%s is started", toString) + + override def receive = channelMessageHandler orElse specificMessageHandler + + // to be defined in subclassing actor + def specificMessageHandler: PartialFunction[Any, Unit] + + private def channelMessageHandler: PartialFunction[Any, Unit] = { + case Start => + // ask the connection for a new channel + self.supervisor.foreach { + sup => + log.info("%s is requesting new channel from supervising connection", toString) + val newChannel: Option[Option[Channel]] = (sup !! ChannelRequest).as[Option[Channel]] + newChannel.foreach(ch => ch.foreach(c => setupChannelInternal(c))) + } + case ch: Channel => { + setupChannelInternal(ch) + } + case ChannelShutdown(cause) => { + closeChannel + if (cause.isHardError) { + // connection error + if (cause.isInitiatedByApplication) { + log.info("%s got normal shutdown", toString) + } else { + log.error(cause, "%s got hard error", toString) + } + } else { + // channel error + log.error(cause, "%s self restarting because of channel shutdown", toString) + notifyCallback(Restarting) + self ! Start + } + } + case Failure(cause) => + log.error(cause, "%s self restarting because of channel failure", toString) + closeChannel + notifyCallback(Restarting) + self ! Start + } + + // to be defined in subclassing actor + protected def setupChannel(ch: Channel) + + private def setupChannelInternal(ch: Channel) = if (channel.isEmpty) { + log.info("Exchange declare") + if (exchangePassive) { + ch.exchangeDeclarePassive(exchangeName) + } else { + ch.exchangeDeclare(exchangeName, exchangeType.toString, exchangeDurable, exchangeAutoDelete, JavaConversions.asMap(configurationArguments)) + } + ch.addShutdownListener(new ShutdownListener { + def shutdownCompleted(cause: ShutdownSignalException) = { + self ! ChannelShutdown(cause) + } + }) + shutdownListener.foreach(sdl => ch.getConnection.addShutdownListener(sdl)) + + log.info("shutdown listener added") + setupChannel(ch) + channel = Some(ch) + notifyCallback(Started) + log.info("Channel setup for %s", toString) + } + + private def closeChannel = { + channel.foreach { + ch => + if (ch.isOpen) ch.close + notifyCallback(Stopped) + log.info("%s channel closed", toString) + } + channel = None + } + + private def notifyCallback(message: AMQPMessage) = { + channelCallback.foreach(cb => if (cb.isRunning) cb ! message) + } + + override def preRestart(reason: Throwable) = { + notifyCallback(Restarting) + closeChannel + } + + override def shutdown = closeChannel +} \ No newline at end of file diff --git a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala new file mode 100644 index 0000000000..486b2e1311 --- /dev/null +++ b/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala @@ -0,0 +1,116 @@ +package se.scalablesolutions.akka.amqp + +import java.util.{TimerTask, Timer} +import java.io.IOException +import se.scalablesolutions.akka.util.Logging +import com.rabbitmq.client._ +import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters +import se.scalablesolutions.akka.actor.{Exit, Actor} +import se.scalablesolutions.akka.config.ScalaConfig.{Permanent, LifeCycle} + +private[amqp] class FaultTolerantConnectionActor(connectionParameters: ConnectionParameters) extends Actor with Logging { + import connectionParameters._ + + self.id = "amqp-connection-%s".format(host) + self.lifeCycle = Some(LifeCycle(Permanent)) + + val reconnectionTimer = new Timer("%s-timer".format(self.id)) + + val connectionFactory: ConnectionFactory = new ConnectionFactory() + connectionFactory.setHost(host) + connectionFactory.setPort(port) + connectionFactory.setUsername(username) + connectionFactory.setPassword(password) + connectionFactory.setVirtualHost(virtualHost) + + var connection: Option[Connection] = None + + protected def receive = { + case Connect => connect + case ChannelRequest => { + connection match { + case Some(conn) => { + val chanel: Channel = conn.createChannel + self.reply(Some(chanel)) + } + case None => { + log.warning("Unable to create new channel - no connection") + reply(None) + } + } + } + case ConnectionShutdown(cause) => { + disconnect + if (cause.isHardError) { + // connection error + if (cause.isInitiatedByApplication) { + log.info("ConnectionShutdown by application [%s]", self.id) + } else { + log.error(cause, "ConnectionShutdown is hard error - self terminating") + self ! new Exit(self, cause) + } + } + } + } + + private def connect = if (connection.isEmpty || !connection.get.isOpen) { + + try { + connection = Some(connectionFactory.newConnection) + connection.foreach { + conn => + conn.addShutdownListener(new ShutdownListener { + def shutdownCompleted(cause: ShutdownSignalException) = { + self ! ConnectionShutdown(cause) + } + }) + log.info("Successfully (re)connected to AMQP Server %s:%s [%s]", host, port, self.id) + log.debug("Sending new channel to %d already linked actors", self.linkedActorsAsList.size) + self.linkedActorsAsList.foreach(_ ! conn.createChannel) + notifyCallback(Connected) + } + } catch { + case e: Exception => + connection = None + log.info("Trying to connect to AMQP server in %d milliseconds [%s]" + , connectionParameters.initReconnectDelay, self.id) + reconnectionTimer.schedule(new TimerTask() { + override def run = { + notifyCallback(Reconnecting) + self ! Connect + } + }, connectionParameters.initReconnectDelay) + } + } + + private def disconnect = { + try { + connection.foreach(_.close) + log.debug("Disconnected AMQP connection at %s:%s [%s]", host, port, self.id) + notifyCallback(Disconnected) + } catch { + case e: IOException => log.error("Could not close AMQP connection %s:%s [%s]", host, port, self.id) + case _ => () + } + connection = None + } + + private def notifyCallback(message: AMQPMessage) = { + connectionCallback.foreach(cb => if (cb.isRunning) cb ! message) + } + + override def shutdown = { + reconnectionTimer.cancel + // make sure shutdown is called on all linked actors so they can do channel cleanup before connection is killed + self.linkedActorsAsList.foreach(_.stop) + disconnect + } + + override def preRestart(reason: Throwable) = disconnect + + override def postRestart(reason: Throwable) = { + notifyCallback(Reconnecting) + connect + } + +} diff --git a/akka-amqp/src/main/scala/ProducerActor.scala b/akka-amqp/src/main/scala/ProducerActor.scala new file mode 100644 index 0000000000..3e703dcfec --- /dev/null +++ b/akka-amqp/src/main/scala/ProducerActor.scala @@ -0,0 +1,54 @@ +package se.scalablesolutions.akka.amqp + +import com.rabbitmq.client._ +import se.scalablesolutions.akka.amqp.AMQP.ProducerParameters + +private[amqp] class ProducerActor(producerParameters: ProducerParameters) extends FaultTolerantChannelActor(producerParameters.channelParameters) { + import producerParameters._ + import channelParameters._ + + producerId.foreach(id => self.id = id) + + def specificMessageHandler = { + + case message@Message(payload, routingKey, mandatory, immediate, properties) if channel.isDefined => { + log.debug("Sending message [%s]", message) + channel.foreach(_.basicPublish(exchangeName, routingKey, mandatory, immediate, properties.getOrElse(null), payload)) + } + case message@Message(payload, routingKey, mandatory, immediate, properties) => { + log.warning("Unable to send message [%s]", message) + // FIXME: If channel is not available, messages should be queued back into the actor mailbox and actor should only react on 'Start' + } + } + + protected def setupChannel(ch: Channel) { + returnListener match { + case Some(listener) => ch.setReturnListener(listener) + case None => ch.setReturnListener(new ReturnListener() { + def handleBasicReturn( + replyCode: Int, + replyText: String, + exchange: String, + routingKey: String, + properties: com.rabbitmq.client.AMQP.BasicProperties, + body: Array[Byte]) = { + throw new MessageNotDeliveredException( + "Could not deliver message [" + body + + "] with reply code [" + replyCode + + "] with reply text [" + replyText + + "] and routing key [" + routingKey + + "] to exchange [" + exchange + "]", + replyCode, replyText, exchange, routingKey, properties, body) + } + }) + } + } + + override def toString(): String = + "AMQP.Poducer[id= "+ self.id + + ", exchange=" + exchangeName + + ", exchangeType=" + exchangeType + + ", durable=" + exchangeDurable + + ", autoDelete=" + exchangeAutoDelete + "]" +} + diff --git a/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala new file mode 100644 index 0000000000..527c5aa55a --- /dev/null +++ b/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala @@ -0,0 +1,47 @@ +package se.scalablesolutions.akka.amqp.test + +import se.scalablesolutions.akka.util.Logging +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import java.util.concurrent.TimeUnit +import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import org.multiverse.api.latches.StandardLatch +import com.rabbitmq.client.ShutdownSignalException +import se.scalablesolutions.akka.amqp._ +import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters +import org.scalatest.matchers.MustMatchers + +class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { + +// @Test + def connectionAndRecovery = { + val connectedLatch = new StandardLatch + val reconnectingLatch = new StandardLatch + val reconnectedLatch = new StandardLatch + val disconnectedLatch = new StandardLatch + + val connectionCallback: ActorRef = Actor.actor({ + case Connected => + if (!connectedLatch.isOpen) { + connectedLatch.open + } else { + reconnectedLatch.open + } + case Reconnecting => reconnectingLatch.open + case Disconnected => disconnectedLatch.open + }) + + val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50, connectionCallback = Some(connectionCallback))) + try { + connectedLatch.tryAwait(2, TimeUnit.SECONDS) must be(true) + + connection ! new ConnectionShutdown(new ShutdownSignalException(true, false, "TestException", "TestRef")) + reconnectingLatch.tryAwait(2, TimeUnit.SECONDS) must be(true) + reconnectedLatch.tryAwait(2, TimeUnit.SECONDS) must be(true) + + } finally { + connection.stop + disconnectedLatch.tryAwait(2, TimeUnit.SECONDS) must be(true) + } + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala new file mode 100644 index 0000000000..b0c845242b --- /dev/null +++ b/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala @@ -0,0 +1,58 @@ +package se.scalablesolutions.akka.amqp.test + +import se.scalablesolutions.akka.util.Logging +import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.actor.Actor._ +import org.multiverse.api.latches.StandardLatch +import com.rabbitmq.client.ShutdownSignalException +import se.scalablesolutions.akka.amqp._ +import org.scalatest.matchers.MustMatchers +import se.scalablesolutions.akka.amqp.AMQP.{ConsumerParameters, ChannelParameters, ProducerParameters, ConnectionParameters} +import java.util.concurrent.TimeUnit +import se.scalablesolutions.akka.actor.ActorRef +import org.junit.Test + +class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging { + +// @Test + def consumerChannelRecovery = { + + val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) + try { + val producer = AMQP.newProducer(connection, ProducerParameters( + ChannelParameters("text_exchange", ExchangeType.Direct))) + + val consumerStartedLatch = new StandardLatch + val consumerRestartedLatch = new StandardLatch + val consumerChannelCallback: ActorRef = actor { + case Started => { + if (!consumerStartedLatch.isOpen) { + consumerStartedLatch.open + } else { + consumerRestartedLatch.open + } + } + case Restarting => () + case Stopped => () + } + + val payloadLatch = new StandardLatch + val consumerChannelParameters = ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(consumerChannelCallback)) + val consumer = AMQP.newConsumer(connection, ConsumerParameters(consumerChannelParameters, "non.interesting.routing.key", actor { + case Delivery(payload, _, _, _, _) => payloadLatch.open + })) + consumerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + + val listenerLatch = new StandardLatch + + consumer ! new ChannelShutdown(new ShutdownSignalException(false, false, "TestException", "TestRef")) + + consumerRestartedLatch.tryAwait(4, TimeUnit.SECONDS) must be (true) + + producer ! Message("some_payload".getBytes, "non.interesting.routing.key") + payloadLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + } finally { + connection.stop + } + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala new file mode 100644 index 0000000000..e1f483a237 --- /dev/null +++ b/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala @@ -0,0 +1,76 @@ +package se.scalablesolutions.akka.amqp.test + +import se.scalablesolutions.akka.util.Logging +import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.actor.Actor._ +import org.multiverse.api.latches.StandardLatch +import com.rabbitmq.client.ShutdownSignalException +import se.scalablesolutions.akka.amqp._ +import org.scalatest.matchers.MustMatchers +import se.scalablesolutions.akka.amqp.AMQP.{ConsumerParameters, ChannelParameters, ProducerParameters, ConnectionParameters} +import java.util.concurrent.TimeUnit +import se.scalablesolutions.akka.actor.ActorRef +import org.junit.Test + +class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { + +// @Test + def consumerConnectionRecovery = { + + val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) + try { + val producerStartedLatch = new StandardLatch + val producerRestartedLatch = new StandardLatch + val producerChannelCallback: ActorRef = actor { + case Started => { + if (!producerStartedLatch.isOpen) { + producerStartedLatch.open + } else { + producerRestartedLatch.open + } + } + case Restarting => () + case Stopped => () + } + + val producer = AMQP.newProducer(connection, ProducerParameters( + ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(producerChannelCallback)))) + producerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + + + val consumerStartedLatch = new StandardLatch + val consumerRestartedLatch = new StandardLatch + val consumerChannelCallback: ActorRef = actor { + case Started => { + if (!consumerStartedLatch.isOpen) { + consumerStartedLatch.open + } else { + consumerRestartedLatch.open + } + } + case Restarting => () + case Stopped => () + } + + + val payloadLatch = new StandardLatch + val consumerChannelParameters = ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(consumerChannelCallback)) + val consumer = AMQP.newConsumer(connection, ConsumerParameters(consumerChannelParameters, "non.interesting.routing.key", actor { + case Delivery(payload, _, _, _, _) => payloadLatch.open + })) + consumerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + + val listenerLatch = new StandardLatch + + connection ! new ConnectionShutdown(new ShutdownSignalException(true, false, "TestException", "TestRef")) + + producerRestartedLatch.tryAwait(4, TimeUnit.SECONDS) must be (true) + consumerRestartedLatch.tryAwait(4, TimeUnit.SECONDS) must be (true) + + producer ! Message("some_payload".getBytes, "non.interesting.routing.key") + payloadLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + } finally { + connection.stop + } + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala b/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala new file mode 100644 index 0000000000..d5aabf7cd5 --- /dev/null +++ b/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala @@ -0,0 +1,47 @@ +package se.scalablesolutions.akka.amqp.test + +import se.scalablesolutions.akka.util.Logging +import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.amqp.AMQP.{ConsumerParameters, ChannelParameters, ProducerParameters} +import org.multiverse.api.latches.StandardLatch +import se.scalablesolutions.akka.actor.Actor._ +import org.scalatest.matchers.MustMatchers +import se.scalablesolutions.akka.amqp._ +import org.junit.{After, Test} +import se.scalablesolutions.akka.actor.{ActorRegistry, ActorRef} +import java.util.concurrent.{CountDownLatch, TimeUnit} + +class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers with Logging { + +// @Test + def consumerMessageManualAcknowledge = { + val connection = AMQP.newConnection() + try { + val countDown = new CountDownLatch(2) + val channelCallback = actor { + case Started => countDown.countDown + case Restarting => () + case Stopped => () + } + val channelParameters = ChannelParameters("text_exchange",ExchangeType.Direct, channelCallback = Some(channelCallback)) + + val acknowledgeLatch = new StandardLatch + var deliveryTagCheck: Long = -1 + val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "manual.ack.this", actor { + case Delivery(payload, _, deliveryTag, _, sender) => { + deliveryTagCheck = deliveryTag + sender.foreach(_ ! Acknowledge(deliveryTag)) + } + case Acknowledged(deliveryTag) => if (deliveryTagCheck == deliveryTag) acknowledgeLatch.open + }, selfAcknowledging = false)) + + val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters)) + countDown.await(2, TimeUnit.SECONDS) must be (true) + producer ! Message("some_payload".getBytes, "manual.ack.this") + + acknowledgeLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + } finally { + connection.stop + } + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala b/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala new file mode 100644 index 0000000000..739f731f4e --- /dev/null +++ b/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala @@ -0,0 +1,42 @@ +package se.scalablesolutions.akka.amqp.test + +import se.scalablesolutions.akka.util.Logging +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import se.scalablesolutions.akka.amqp._ +import se.scalablesolutions.akka.amqp.AMQP.{ConsumerParameters, ChannelParameters, ProducerParameters} +import org.multiverse.api.latches.StandardLatch +import se.scalablesolutions.akka.actor.Actor._ +import org.scalatest.matchers.MustMatchers +import java.util.concurrent.{CountDownLatch, TimeUnit} + +class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging { + +// @Test + def consumerMessage = { + val connection = AMQP.newConnection() + try { + + val countDown = new CountDownLatch(2) + val channelCallback = actor { + case Started => countDown.countDown + case Restarting => () + case Stopped => () + } + + val channelParameters = ChannelParameters("text_exchange",ExchangeType.Direct, channelCallback = Some(channelCallback)) + + val payloadLatch = new StandardLatch + val consumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "non.interesting.routing.key", actor { + case Delivery(payload, _, _, _, _) => payloadLatch.open + })) + + val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters)) + countDown.await(2, TimeUnit.SECONDS) must be (true) + producer ! Message("some_payload".getBytes, "non.interesting.routing.key") + payloadLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + } finally { + connection.stop + } + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala new file mode 100644 index 0000000000..d33f0914ba --- /dev/null +++ b/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala @@ -0,0 +1,52 @@ +package se.scalablesolutions.akka.amqp.test + +import se.scalablesolutions.akka.util.Logging +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import junit.framework.Assert +import java.util.concurrent.TimeUnit +import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import org.multiverse.api.latches.StandardLatch +import com.rabbitmq.client.ShutdownSignalException +import se.scalablesolutions.akka.amqp._ +import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ProducerParameters, ConnectionParameters} +import org.scalatest.matchers.MustMatchers + +class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging { + +// @Test + def producerChannelRecovery = { + + val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) + + try { + val startedLatch = new StandardLatch + val restartingLatch = new StandardLatch + val restartedLatch = new StandardLatch + + val producerCallback: ActorRef = Actor.actor({ + case Started => { + if (!startedLatch.isOpen) { + startedLatch.open + } else { + restartedLatch.open + } + } + case Restarting => restartingLatch.open + case Stopped => () + }) + + val producerParameters = ProducerParameters( + ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(producerCallback))) + + val producer = AMQP.newProducer(connection, producerParameters) + startedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + + producer ! new ChannelShutdown(new ShutdownSignalException(false, false, "TestException", "TestRef")) + restartingLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + restartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + } finally { + connection.stop + } + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala new file mode 100644 index 0000000000..f5e134682b --- /dev/null +++ b/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala @@ -0,0 +1,50 @@ +package se.scalablesolutions.akka.amqp.test + +import se.scalablesolutions.akka.util.Logging +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import java.util.concurrent.TimeUnit +import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import org.multiverse.api.latches.StandardLatch +import com.rabbitmq.client.ShutdownSignalException +import se.scalablesolutions.akka.amqp._ +import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ProducerParameters, ConnectionParameters} +import org.scalatest.matchers.MustMatchers + +class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { + +// @Test + def producerConnectionRecovery = { + + val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) + try { + val startedLatch = new StandardLatch + val restartingLatch = new StandardLatch + val restartedLatch = new StandardLatch + + val producerCallback: ActorRef = Actor.actor({ + case Started => { + if (!startedLatch.isOpen) { + startedLatch.open + } else { + restartedLatch.open + } + } + case Restarting => restartingLatch.open + case Stopped => () + }) + + val producerParameters = ProducerParameters( + ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(producerCallback))) + + val producer = AMQP.newProducer(connection, producerParameters) + startedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + + connection ! new ConnectionShutdown(new ShutdownSignalException(true, false, "TestException", "TestRef")) + restartingLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + restartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + } finally { + connection.stop + } + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala b/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala new file mode 100644 index 0000000000..95ee80d5a7 --- /dev/null +++ b/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala @@ -0,0 +1,41 @@ +package se.scalablesolutions.akka.amqp.test + +import se.scalablesolutions.akka.util.Logging +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import java.util.concurrent.TimeUnit +import se.scalablesolutions.akka.actor.ActorRef +import org.multiverse.api.latches.StandardLatch +import se.scalablesolutions.akka.amqp._ +import com.rabbitmq.client.ReturnListener +import com.rabbitmq.client.AMQP.BasicProperties +import java.lang.String +import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ProducerParameters} +import org.scalatest.matchers.MustMatchers + +class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging { + +// @Test + def producerMessage = { + + val connection: ActorRef = AMQP.newConnection() + try { + val returnLatch = new StandardLatch + val returnListener = new ReturnListener { + def handleBasicReturn(replyCode: Int, replyText: String, exchange: String, routingKey: String, properties: BasicProperties, body: Array[Byte]) = { + returnLatch.open + } + } + val producerParameters = ProducerParameters( + ChannelParameters("text_exchange", ExchangeType.Direct), + returnListener = Some(returnListener)) + + val producer = AMQP.newProducer(connection, producerParameters) + + producer ! new Message("some_payload".getBytes, "non.interesing.routing.key", mandatory = true) + returnLatch.tryAwait(2, TimeUnit.SECONDS) must be(true) + } finally { + connection.stop + } + } +} \ No newline at end of file diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 4d99b7e491..d41f453260 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -205,7 +205,12 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { class AkkaAMQPProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with CodeFellowPlugin { val commons_io = "commons-io" % "commons-io" % "1.4" % "compile" - val rabbit = "com.rabbitmq" % "amqp-client" % "1.7.2" % "compile" + val rabbit = "com.rabbitmq" % "amqp-client" % "1.8.0" % "compile" + + // testing + val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "test" intransitive() + val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test" + val junit = "junit" % "junit" % "4.5" % "test" } class AkkaHttpProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with CodeFellowPlugin {