Merge branch 'master' of git@github.com:jboner/akka

This commit is contained in:
Jonas Bonér 2010-07-02 13:11:54 +02:00
commit 5d3d5a2d37
17 changed files with 1036 additions and 611 deletions

View file

@ -1,86 +1,83 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp
import com.rabbitmq.client.{AMQP => RabbitMQ, _}
import com.rabbitmq.client.ConnectionFactory
import se.scalablesolutions.akka.actor.{Actor, ActorRef, IllegalActorStateException}
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.OneForOneStrategy
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.{HashCode, Logging}
import scala.collection.mutable.HashMap
import java.util.concurrent.ConcurrentHashMap
import java.util.{Timer, TimerTask}
import java.io.IOException
import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory}
import java.lang.IllegalArgumentException
/**
* AMQP Actor API. Implements Producer and Consumer materialized as Actors.
* AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors.
*
* <pre>
* val params = new ConnectionParameters
* params.setUsername("barack")
* params.setPassword("obama")
* params.setVirtualHost("/")
* params.setRequestedHeartbeat(0)
* val consumer = AMQP.newConsumer(params, hostname, port, exchange, ExchangeType.Direct, Serializer.ScalaJSON, None, 100)
* @see se.scalablesolutions.akka.amqp.ExampleSession
*
* consumer ! MessageConsumerListener(queue, routingKey, actor {
* case Message(payload, _, _, _, _) => log.debug("Received message: %s", payload)
* })
*
* val producer = AMQP.newProducer(params, hostname, port, exchange, Serializer.ScalaJSON, None, None, 100)
* producer ! Message("Hi", routingKey)
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
* @author Irmo Manie
*/
object AMQP {
case class ConnectionParameters(
host: String = ConnectionFactory.DEFAULT_HOST,
port: Int = ConnectionFactory.DEFAULT_AMQP_PORT,
username: String = ConnectionFactory.DEFAULT_USER,
password: String = ConnectionFactory.DEFAULT_PASS,
virtualHost: String = ConnectionFactory.DEFAULT_VHOST,
initReconnectDelay: Long = 5000,
connectionCallback: Option[ActorRef] = None)
case class ChannelParameters(
exchangeName: String,
exchangeType: ExchangeType,
exchangeDurable: Boolean = false,
exchangeAutoDelete: Boolean = true,
exchangePassive: Boolean = false,
shutdownListener: Option[ShutdownListener] = None,
configurationArguments: Map[String, AnyRef] = Map(),
channelCallback: Option[ActorRef] = None)
case class ProducerParameters(channelParameters: ChannelParameters,
producerId: Option[String] = None,
returnListener: Option[ReturnListener] = None)
case class ConsumerParameters(channelParameters: ChannelParameters,
routingKey: String,
deliveryHandler: ActorRef,
queueName: Option[String] = None,
queueDurable: Boolean = false,
queueAutoDelete: Boolean = true,
queuePassive: Boolean = false,
queueExclusive: Boolean = false,
selfAcknowledging: Boolean = true) {
if (queueDurable && queueName.isEmpty) {
throw new IllegalArgumentException("A queue name is required when requesting a durable queue.")
}
}
def newConnection(connectionParameters: ConnectionParameters = new ConnectionParameters): ActorRef = {
val connection: ActorRef = supervisor.newConnection(connectionParameters)
connection ! Connect
connection
}
def newProducer(connection: ActorRef, producerParameters: ProducerParameters): ActorRef = {
val producer: ActorRef = Actor.actorOf(new ProducerActor(producerParameters))
connection.startLink(producer)
producer ! Start
producer
}
def newConsumer(connection: ActorRef, consumerParameters: ConsumerParameters): ActorRef = {
val consumer: ActorRef = actorOf(new ConsumerActor(consumerParameters))
connection.startLink(consumer)
consumer ! Start
consumer
}
private val supervisor = new AMQPSupervisor
def newProducer(
config: ConnectionParameters,
hostname: String,
port: Int,
exchangeName: String,
returnListener: Option[ReturnListener],
shutdownListener: Option[ShutdownListener],
initReconnectDelay: Long): ActorRef =
supervisor.newProducer(
config, hostname, port, exchangeName, returnListener, shutdownListener, initReconnectDelay)
def newConsumer(
config: ConnectionParameters,
hostname: String,
port: Int,
exchangeName: String,
exchangeType: ExchangeType,
shutdownListener: Option[ShutdownListener],
initReconnectDelay: Long,
passive: Boolean,
durable: Boolean,
autoDelete: Boolean,
configurationArguments: Map[String, AnyRef]): ActorRef =
supervisor.newConsumer(
config, hostname, port, exchangeName, exchangeType,
shutdownListener, initReconnectDelay,
passive, durable, autoDelete, configurationArguments)
def stopConnection(connection: ActorRef) = supervisor.stopConnection(connection)
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class AMQPSupervisor extends Logging {
class AMQPSupervisorActor extends Actor {
import scala.collection.JavaConversions._
import self._
faultHandler = Some(OneForOneStrategy(5, 5000))
@ -91,523 +88,12 @@ object AMQP {
}
}
import scala.collection.JavaConversions._
private val supervisor = actorOf(new AMQPSupervisorActor).start
private val connections = new ConcurrentHashMap[ActorRef, ActorRef]
def newProducer(
config: ConnectionParameters,
hostname: String,
port: Int,
exchangeName: String,
returnListener: Option[ReturnListener],
shutdownListener: Option[ShutdownListener],
initReconnectDelay: Long): ActorRef = {
val producer = actorOf(new Producer(
new ConnectionFactory(config),
hostname, port,
exchangeName,
returnListener,
shutdownListener,
initReconnectDelay))
supervisor.startLink(producer)
producer
def newConnection(connectionParameters: ConnectionParameters): ActorRef = {
val connectionActor = actorOf(new FaultTolerantConnectionActor(connectionParameters))
supervisor.startLink(connectionActor)
connectionActor
}
def newConsumer(
config: ConnectionParameters,
hostname: String,
port: Int,
exchangeName: String,
exchangeType: ExchangeType,
shutdownListener: Option[ShutdownListener],
initReconnectDelay: Long,
passive: Boolean,
durable: Boolean,
autoDelete: Boolean,
configurationArguments: Map[String, AnyRef]): ActorRef = {
val consumer = actorOf(new Consumer(
new ConnectionFactory(config),
hostname, port,
exchangeName,
exchangeType,
shutdownListener,
initReconnectDelay,
passive,
durable,
autoDelete,
configurationArguments))
supervisor.startLink(consumer)
consumer
}
def stopConnection(connection: ActorRef) = {
connection ! Stop
supervisor.unlink(connection)
connections.remove(connection)
}
def shutdown = {
asMap(connections).valuesIterator.foreach(_ ! Stop)
exit
}
}
sealed trait AMQPMessage
private[akka] trait InternalAMQPMessage extends AMQPMessage
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class Message(val payload: Array[Byte],
val routingKey: String,
val mandatory: Boolean,
val immediate: Boolean,
val properties: RabbitMQ.BasicProperties) extends AMQPMessage {
override def toString(): String =
"Message[payload=" + payload +
", routingKey=" + routingKey +
", mandatory=" + mandatory +
", immediate=" + immediate +
", properties=" + properties + "]"
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Message {
def unapply(message: Message): Option[Tuple5[AnyRef, String, Boolean, Boolean, RabbitMQ.BasicProperties]] =
Some((message.payload, message.routingKey, message.mandatory, message.immediate, message.properties))
def apply(payload: Array[Byte], routingKey: String, mandatory: Boolean, immediate: Boolean, properties: RabbitMQ.BasicProperties): Message =
new Message(payload, routingKey, mandatory, immediate, properties)
def apply(payload: Array[Byte], routingKey: String): Message =
new Message(payload, routingKey, false, false, null)
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class MessageConsumerListener(val queueName: String,
val routingKey: String,
val exclusive: Boolean,
val autoDelete: Boolean,
val isUsingExistingQueue: Boolean,
val actor: ActorRef) extends AMQPMessage {
/**
* Creates a non-exclusive, non-autodelete message listener.
*/
def this(queueName: String, routingKey: String, actor: ActorRef) = this (queueName, routingKey, false, false, false, actor)
private[akka] var tag: Option[String] = None
override def toString() =
"MessageConsumerListener[actor=" + actor +
", queue=" + queueName +
", routingKey=" + routingKey +
", tag=" + tag +
", exclusive=" + exclusive +
", autoDelete=" + autoDelete +
", isUsingExistingQueue=" + isUsingExistingQueue + "]"
def toString(exchangeName: String) =
"MessageConsumerListener[actor=" + actor +
", exchange=" + exchangeName +
", queue=" + queueName +
", routingKey=" + routingKey +
", tag=" + tag +
", exclusive=" + exclusive +
", autoDelete=" + autoDelete +
", isUsingExistingQueue=" + isUsingExistingQueue + "]"
/**
* Hash code should only be based on on queue name and routing key.
*/
override def hashCode(): Int = synchronized {
var result = HashCode.SEED
result = HashCode.hash(result, queueName)
result = HashCode.hash(result, routingKey)
result
}
/**
* Equality should only be defined in terms of queue name and routing key.
*/
override def equals(that: Any): Boolean = synchronized {
that != null &&
that.isInstanceOf[MessageConsumerListener] &&
that.asInstanceOf[MessageConsumerListener].queueName == queueName &&
that.asInstanceOf[MessageConsumerListener].routingKey == routingKey
}
}
object MessageConsumerListener {
def apply(queueName: String,
routingKey: String,
exclusive: Boolean,
autoDelete: Boolean,
isUsingExistingQueue: Boolean,
actor: ActorRef) =
new MessageConsumerListener(queueName, routingKey, exclusive, autoDelete, isUsingExistingQueue, actor)
def apply(queueName: String,
routingKey: String,
actor: ActorRef) =
new MessageConsumerListener(queueName, routingKey, false, false, false, actor)
}
case object Stop extends AMQPMessage
case class UnregisterMessageConsumerListener(consumer: MessageConsumerListener) extends InternalAMQPMessage
private[akka] case class Reconnect(delay: Long) extends InternalAMQPMessage
private[akka] case class Failure(cause: Throwable) extends InternalAMQPMessage
private[akka] class MessageNotDeliveredException(
val message: String,
val replyCode: Int,
val replyText: String,
val exchange: String,
val routingKey: String,
val properties: RabbitMQ.BasicProperties,
val body: Array[Byte]) extends RuntimeException(message)
sealed trait ExchangeType
object ExchangeType {
case object Direct extends ExchangeType {
override def toString = "direct"
}
case object Topic extends ExchangeType {
override def toString = "topic"
}
case object Fanout extends ExchangeType {
override def toString = "fanout"
}
case object Match extends ExchangeType {
override def toString = "match"
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class Producer private[amqp](
val connectionFactory: ConnectionFactory,
val hostname: String,
val port: Int,
val exchangeName: String,
val returnListener: Option[ReturnListener],
val shutdownListener: Option[ShutdownListener],
val initReconnectDelay: Long)
extends FaultTolerantConnectionActor {
setupChannel
log.info("AMQP.Producer [%s] is started", toString)
def newRpcClient(routingKey: String): RpcClient = new RpcClient(channel, exchangeName, routingKey)
def receive = {
case message@Message(payload, routingKey, mandatory, immediate, properties) =>
log.debug("Sending message [%s]", message)
channel.basicPublish(
exchangeName, routingKey, mandatory, immediate, properties, payload.asInstanceOf[Array[Byte]])
case Stop =>
disconnect
exit
}
protected def setupChannel = {
connection = connectionFactory.newConnection(hostname, port)
channel = connection.createChannel
returnListener match {
case Some(listener) => channel.setReturnListener(listener)
case None => channel.setReturnListener(new ReturnListener() {
def handleBasicReturn(
replyCode: Int,
replyText: String,
exchange: String,
routingKey: String,
properties: RabbitMQ.BasicProperties,
body: Array[Byte]) = {
throw new MessageNotDeliveredException(
"Could not deliver message [" + body +
"] with reply code [" + replyCode +
"] with reply text [" + replyText +
"] and routing key [" + routingKey +
"] to exchange [" + exchange + "]",
replyCode, replyText, exchange, routingKey, properties, body)
}
})
}
if (shutdownListener.isDefined) connection.addShutdownListener(shutdownListener.get)
}
override def toString(): String =
"AMQP.Producer[hostname=" + hostname +
", port=" + port +
", exchange=" + exchangeName + "]"
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class Consumer private[amqp](
val connectionFactory: ConnectionFactory,
val hostname: String,
val port: Int,
val exchangeName: String,
val exchangeType: ExchangeType,
val shutdownListener: Option[ShutdownListener],
val initReconnectDelay: Long,
val passive: Boolean,
val durable: Boolean,
val autoDelete: Boolean,
val configurationArguments: Map[java.lang.String, Object])
extends FaultTolerantConnectionActor {
import scala.collection.JavaConversions._
import self._
faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = List(classOf[Throwable])
//FIXME use better strategy to convert scala.immutable.Map to java.util.Map
private val jConfigMap = configurationArguments.foldLeft(new java.util.HashMap[String,Object]){ (m,kv) => { m.put(kv._1,kv._2); m } }
private val listeners = new HashMap[MessageConsumerListener, MessageConsumerListener]
setupChannel
log.info("AMQP.Consumer [%s] is started", toString)
def newRpcServerWithCallback(body: (Array[Byte], RabbitMQ.BasicProperties) => Array[Byte]): RpcServer = {
new RpcServer(channel) {
override def handleCall(requestBody: Array[Byte], replyProperties: RabbitMQ.BasicProperties) = {
body(requestBody, replyProperties)
}
}
}
def receive = {
case listener: MessageConsumerListener =>
startLink(listener.actor)
registerListener(listener)
log.info("Message consumer listener is registered [%s]", listener)
case UnregisterMessageConsumerListener(listener) =>
unregisterListener(listener)
case Reconnect(delay) =>
reconnect(delay)
case Failure(cause) =>
log.error(cause, "Error in AMQP consumer")
throw cause
case Stop =>
listeners.iterator.toList.map(_._2).foreach(unregisterListener(_))
disconnect
exit
case message: Message =>
handleIllegalMessage(
"AMQP.Consumer [" + this + "] can't be used to send messages, ignoring message [" + message + "]")
case unknown =>
handleIllegalMessage(
"Unknown message [" + unknown + "] to AMQP Consumer [" + this + "]")
}
protected def setupChannel = {
connection = connectionFactory.newConnection(hostname, port)
channel = connection.createChannel
channel.exchangeDeclare(exchangeName.toString, exchangeType.toString, passive, durable, autoDelete, jConfigMap)
listeners.iterator.toList.map(_._2).foreach(registerListener)
if (shutdownListener.isDefined) connection.addShutdownListener(shutdownListener.get)
}
private def registerListener(listener: MessageConsumerListener) = {
log.debug("Register MessageConsumerListener %s", listener.toString(exchangeName))
listeners.put(listener, listener)
if (!listener.isUsingExistingQueue) {
log.debug("Declaring new queue for MessageConsumerListener [%s]", listener.queueName)
channel.queueDeclare(
listener.queueName,
passive, durable,
listener.exclusive, listener.autoDelete,
jConfigMap)
}
log.debug("Binding new queue for MessageConsumerListener [%s]", listener.queueName)
channel.queueBind(listener.queueName, exchangeName, listener.routingKey)
val listenerTag = channel.basicConsume(listener.queueName, true, new DefaultConsumer(channel) with Logging {
override def handleDelivery(tag: String,
envelope: Envelope,
properties: RabbitMQ.BasicProperties,
payload: Array[Byte]) {
try {
val mandatory = false // FIXME: where to find out if it's mandatory?
val immediate = false // FIXME: where to find out if it's immediate?
log.debug("Passing a message on to the MessageConsumerListener [%s]", listener.toString(exchangeName))
listener.actor ! Message(payload, envelope.getRoutingKey, mandatory, immediate, properties)
val deliveryTag = envelope.getDeliveryTag
log.debug("Acking message with delivery tag [%s]", deliveryTag)
channel.basicAck(deliveryTag, false)
} catch {
case cause =>
log.error(
cause, "Delivery of message to MessageConsumerListener [%s] failed",
listener.toString(exchangeName))
self ! Failure(cause) // pass on and re-throw exception in consumer actor to trigger restart and reconnect
}
}
override def handleShutdownSignal(listenerTag: String, signal: ShutdownSignalException) = {
def hasTag(listener: MessageConsumerListener, listenerTag: String): Boolean = {
if (listener.tag.isEmpty) throw new IllegalActorStateException(
"MessageConsumerListener [" + listener + "] does not have a tag")
listener.tag.get == listenerTag
}
listeners.iterator.toList.map(_._2).find(hasTag(_, listenerTag)) match {
case None => log.error(
"Could not find message listener for tag [%s]; can't shut listener down", listenerTag)
case Some(listener) =>
log.warning(
"MessageConsumerListener [%s] is being shutdown by [%s] due to [%s]",
listener.toString(exchangeName), signal.getReference, signal.getReason)
self ! UnregisterMessageConsumerListener(listener)
}
}
})
listener.tag = Some(listenerTag)
}
private def unregisterListener(listener: MessageConsumerListener) = {
listeners.get(listener) match {
case None => log.warning(
"Can't unregister message consumer listener [%s]; no such listener",
listener.toString(exchangeName))
case Some(listener) =>
listeners -= listener
listener.tag match {
case None => log.warning(
"Can't unregister message consumer listener [%s]; no listener tag",
listener.toString(exchangeName))
case Some(tag) =>
channel.basicCancel(tag)
unlink(listener.actor)
listener.actor.stop
log.debug("Message consumer is cancelled and shut down [%s]", listener)
}
}
}
private def handleIllegalMessage(errorMessage: String) = {
log.error(errorMessage)
throw new IllegalArgumentException(errorMessage)
}
override def toString(): String =
"AMQP.Consumer[hostname=" + hostname +
", port=" + port +
", exchange=" + exchangeName +
", type=" + exchangeType +
", passive=" + passive +
", durable=" + durable + "]"
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait FaultTolerantConnectionActor extends Actor with Logging {
val reconnectionTimer = new Timer
var connection: Connection = _
var channel: Channel = _
val hostname: String
val port: Int
val initReconnectDelay: Long
val exchangeName: String
val connectionFactory: ConnectionFactory
protected def setupChannel
def createQueue: String =
channel.queueDeclare("", false, false, true, true, null).getQueue
def createQueue(name: String) =
channel.queueDeclare(name, false, false, true, true, null).getQueue
def createQueue(name: String, durable: Boolean) =
channel.queueDeclare(name, false, durable, true, true, null).getQueue
def createQueue(name: String, passive: Boolean, durable: Boolean, exclusive: Boolean, autoDelete: Boolean) =
channel.queueDeclare(name, passive, durable, exclusive, autoDelete, null).getQueue
def createQueue(name: String, passive: Boolean, durable: Boolean, exclusive: Boolean, autoDelete: Boolean, arguments: java.util.Map[String, AnyRef]) =
channel.queueDeclare(name, passive, durable, exclusive, autoDelete, arguments).getQueue
def bindQueue(name: String) {
channel.queueBind(name, exchangeName, name)
}
def createBindQueue: String = {
val name = createQueue
channel.queueBind(name, exchangeName, name)
name
}
def createBindQueue(name: String) {
createQueue(name)
channel.queueBind(name, exchangeName, name)
}
def createBindQueue(name: String, durable: Boolean) {
channel.queueDeclare(name, durable)
channel.queueBind(name, exchangeName, name)
}
def deleteQueue(name: String) {channel.queueDelete(name)}
protected def disconnect = {
try {
channel.close
} catch {
case e: IOException => log.error("Could not close AMQP channel %s:%s [%s]", hostname, port, this)
case _ => ()
}
try {
connection.close
log.debug("Disconnected AMQP connection at %s:%s [%s]", hostname, port, this)
} catch {
case e: IOException => log.error("Could not close AMQP connection %s:%s [%s]", hostname, port, this)
case _ => ()
}
}
protected def reconnect(delay: Long) = {
disconnect
try {
setupChannel
log.debug("Successfully reconnected to AMQP Server %s:%s [%s]", hostname, port, this)
} catch {
case e: Exception =>
val waitInMillis = delay * 2
val outerActorRef = self
log.debug("Trying to reconnect to AMQP server in %n milliseconds [%s]", waitInMillis, this)
reconnectionTimer.schedule(new TimerTask() {
override def run = outerActorRef ! Reconnect(waitInMillis)
}, delay)
}
}
override def preRestart(reason: Throwable) = disconnect
override def postRestart(reason: Throwable) = reconnect(initReconnectDelay)
}
}

View file

@ -0,0 +1,56 @@
package se.scalablesolutions.akka.amqp
import se.scalablesolutions.akka.actor.ActorRef
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.ShutdownSignalException
sealed trait AMQPMessage
sealed trait InternalAMQPMessage extends AMQPMessage
case class Message(payload: Array[Byte],
routingKey: String,
mandatory: Boolean = false,
immediate: Boolean = false,
properties: Option[BasicProperties] = None) extends AMQPMessage
case class Delivery(payload: Array[Byte],
routingKey: String,
deliveryTag: Long,
properties: BasicProperties,
sender: Option[ActorRef]) extends AMQPMessage
// connection messages
case object Connect extends AMQPMessage
case object Connected extends AMQPMessage
case object Reconnecting extends AMQPMessage
case object Disconnected extends AMQPMessage
case object ChannelRequest extends InternalAMQPMessage
// channel messages
case object Start extends AMQPMessage
case object Started extends AMQPMessage
case object Restarting extends AMQPMessage
case object Stopped extends AMQPMessage
// delivery messages
case class Acknowledge(deliveryTag: Long) extends AMQPMessage
case class Acknowledged(deliveryTag: Long) extends AMQPMessage
// internal messages
private[akka] case class Failure(cause: Throwable) extends InternalAMQPMessage
private[akka] case class ConnectionShutdown(cause: ShutdownSignalException) extends InternalAMQPMessage
private[akka] case class ChannelShutdown(cause: ShutdownSignalException) extends InternalAMQPMessage
private[akka] class MessageNotDeliveredException(
val message: String,
val replyCode: Int,
val replyText: String,
val exchange: String,
val routingKey: String,
val properties: BasicProperties,
val body: Array[Byte]) extends RuntimeException(message)

View file

@ -0,0 +1,105 @@
package se.scalablesolutions.akka.amqp
import com.rabbitmq.client.AMQP.Queue.DeclareOk
import collection.JavaConversions
import se.scalablesolutions.akka.amqp.AMQP.ConsumerParameters
import se.scalablesolutions.akka.util.Logging
import com.rabbitmq.client.{Channel, Envelope, DefaultConsumer}
import com.rabbitmq.client.AMQP.BasicProperties
import java.lang.Throwable
private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) extends FaultTolerantChannelActor(consumerParameters.channelParameters) {
import consumerParameters._
import channelParameters._
var listenerTag: Option[String] = None
def specificMessageHandler = {
case Acknowledge(deliveryTag) => acknowledgeDeliveryTag(deliveryTag, true)
case message: Message =>
handleIllegalMessage("%s can't be used to send messages, ignoring message [%s]".format(this, message))
case unknown =>
handleIllegalMessage("Unknown message [%s] to %s".format(unknown, this))
}
protected def setupChannel(ch: Channel) = {
// todo make nicer
if (!self.linkedActorsAsList.contains(deliveryHandler)) {
self.startLink(deliveryHandler)
}
val queueDeclare: DeclareOk = {
queueName match {
case Some(name) =>
log.debug("Declaring new queue [%s] for %s", name, toString)
if (queuePassive) {
ch.queueDeclarePassive(name)
} else {
ch.queueDeclare(name, queueDurable, queueExclusive, queueAutoDelete, JavaConversions.asMap(configurationArguments))
}
case None =>
log.debug("Declaring new generated queue for %s", toString)
ch.queueDeclare
}
}
log.debug("Binding new queue [%s] for %s", queueDeclare.getQueue, toString)
ch.queueBind(queueDeclare.getQueue, exchangeName, routingKey)
val tag = ch.basicConsume(queueDeclare.getQueue, false, new DefaultConsumer(ch) with Logging {
override def handleDelivery(tag: String, envelope: Envelope, properties: BasicProperties, payload: Array[Byte]) {
try {
val deliveryTag = envelope.getDeliveryTag
log.debug("Passing a message on to %s", toString)
deliveryHandler ! Delivery(payload, envelope.getRoutingKey, envelope.getDeliveryTag, properties, someSelf)
if (selfAcknowledging) {
log.debug("Self acking...")
acknowledgeDeliveryTag(deliveryTag, false)
}
} catch {
case cause =>
log.error(cause, "Delivery of message to %s failed", toString)
self ! Failure(cause) // pass on and re-throw exception in consumer actor to trigger restart and connect
}
}
})
listenerTag = Some(tag)
log.info("Intitialized %s", toString)
}
private def acknowledgeDeliveryTag(deliveryTag: Long, remoteAcknowledgement: Boolean) = {
log.debug("Acking message with delivery tag [%s]", deliveryTag)
channel.foreach{ch =>
ch.basicAck(deliveryTag, false)
if (remoteAcknowledgement) {
deliveryHandler ! Acknowledged(deliveryTag)
}
}
}
private def handleIllegalMessage(errorMessage: String) = {
log.error(errorMessage)
throw new IllegalArgumentException(errorMessage)
}
override def preRestart(reason: Throwable) = {
listenerTag = None
super.preRestart(reason)
}
override def shutdown = {
listenerTag.foreach(tag => channel.foreach(_.basicCancel(tag)))
self.linkedActorsAsList.foreach(_.stop)
super.shutdown
}
override def toString(): String =
"AMQP.Consumer[id= "+ self.id +
", exchange=" + exchangeName +
", exchangeType=" + exchangeType +
", durable=" + exchangeDurable +
", autoDelete=" + exchangeAutoDelete + "]"
}

View file

@ -4,47 +4,119 @@
package se.scalablesolutions.akka.amqp
import se.scalablesolutions.akka.actor.Actor._
import com.rabbitmq.client.ConnectionParameters
import se.scalablesolutions.akka.actor.{Actor, ActorRegistry}
import Actor._
import se.scalablesolutions.akka.amqp.AMQP.{ConnectionParameters, ConsumerParameters, ChannelParameters, ProducerParameters}
import java.util.concurrent.{CountDownLatch, TimeUnit}
object ExampleSession {
import AMQP._
val CONFIG = new ConnectionParameters
val HOSTNAME = "localhost"
val PORT = 5672
val IM = "im.whitehouse.gov"
val CHAT = "chat.whitehouse.gov"
def main(args: Array[String]) = {
println("==== DIRECT ===")
direct
Thread.sleep(1000)
TimeUnit.SECONDS.sleep(2)
println("==== FANOUT ===")
fanout
TimeUnit.SECONDS.sleep(2)
println("==== TOPIC ===")
topic
TimeUnit.SECONDS.sleep(2)
println("==== CALLBACK ===")
callback
TimeUnit.SECONDS.sleep(2)
ActorRegistry.shutdownAll
System.exit(0)
}
def direct = {
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, IM, ExchangeType.Direct, None, 100, false, false, false, Map[String, AnyRef]())
consumer ! MessageConsumerListener("@george_bush", "direct", actor {
case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
})
val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, IM, None, None, 100)
producer ! Message("@jonas_boner: You sucked!!".getBytes, "direct")
// defaults to amqp://guest:guest@localhost:5672/
val connection = AMQP.newConnection()
val channelParameters = ChannelParameters("my_direct_exchange", ExchangeType.Direct)
val consumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "some.routing", actor {
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
}))
val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters))
producer ! Message("@jonas_boner: You sucked!!".getBytes, "some.routing")
}
def fanout = {
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, CHAT, ExchangeType.Fanout, None, 100, false, false, false, Map[String, AnyRef]())
consumer ! MessageConsumerListener("@george_bush", "", actor {
case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
})
consumer ! MessageConsumerListener("@barack_obama", "", actor {
case Message(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
})
val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, CHAT, None, None, 100)
// defaults to amqp://guest:guest@localhost:5672/
val connection = AMQP.newConnection()
val channelParameters = ChannelParameters("my_fanout_exchange", ExchangeType.Fanout)
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "@george_bush", actor {
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
}))
val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "@barack_obama", actor {
case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload))
}))
val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters))
producer ! Message("@jonas_boner: I'm going surfing".getBytes, "")
}
def topic = {
// defaults to amqp://guest:guest@localhost:5672/
val connection = AMQP.newConnection()
val channelParameters = ChannelParameters("my_topic_exchange", ExchangeType.Topic)
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "@george_bush", actor {
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
}))
val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "@barack_obama", actor {
case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload))
}))
val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters))
producer ! Message("@jonas_boner: You still suck!!".getBytes, "@george_bush")
producer ! Message("@jonas_boner: Yes I can!".getBytes, "@barack_obama")
}
def callback = {
val channelCountdown = new CountDownLatch(2)
val connectionCallback = actor {
case Connected => log.info("Connection callback: Connected!")
case Reconnecting => () // not used, sent when connection fails and initiates a reconnect
case Disconnected => log.info("Connection callback: Disconnected!")
}
val connection = AMQP.newConnection(new ConnectionParameters(connectionCallback = Some(connectionCallback)))
val channelCallback = actor {
case Started => {
log.info("Channel callback: Started")
channelCountdown.countDown
}
case Restarting => // not used, sent when channel or connection fails and initiates a restart
case Stopped => log.info("Channel callback: Stopped")
}
val channelParameters = ChannelParameters("my_direct_exchange", ExchangeType.Direct, channelCallback = Some(channelCallback))
val consumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "callback.routing", actor {
case _ => () // not used
}))
val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters))
// Wait until both channels (producer & consumer) are started before stopping the connection
channelCountdown.await(2, TimeUnit.SECONDS)
connection.stop
}
}

View file

@ -0,0 +1,17 @@
package se.scalablesolutions.akka.amqp
sealed trait ExchangeType
object ExchangeType {
case object Direct extends ExchangeType {
override def toString = "direct"
}
case object Topic extends ExchangeType {
override def toString = "topic"
}
case object Fanout extends ExchangeType {
override def toString = "fanout"
}
case object Match extends ExchangeType {
override def toString = "match"
}
}

View file

@ -0,0 +1,101 @@
package se.scalablesolutions.akka.amqp
import collection.JavaConversions
import java.lang.Throwable
import se.scalablesolutions.akka.actor.Actor
import Actor._
import se.scalablesolutions.akka.amqp.AMQP.ChannelParameters
import com.rabbitmq.client.{ShutdownSignalException, Channel, ShutdownListener}
import scala.PartialFunction
abstract private[amqp] class FaultTolerantChannelActor(channelParameters: ChannelParameters) extends Actor {
import channelParameters._
protected[amqp] var channel: Option[Channel] = None
log.info("%s is started", toString)
override def receive = channelMessageHandler orElse specificMessageHandler
// to be defined in subclassing actor
def specificMessageHandler: PartialFunction[Any, Unit]
private def channelMessageHandler: PartialFunction[Any, Unit] = {
case Start =>
// ask the connection for a new channel
self.supervisor.foreach {
sup =>
log.info("%s is requesting new channel from supervising connection", toString)
val newChannel: Option[Option[Channel]] = (sup !! ChannelRequest).as[Option[Channel]]
newChannel.foreach(ch => ch.foreach(c => setupChannelInternal(c)))
}
case ch: Channel => {
setupChannelInternal(ch)
}
case ChannelShutdown(cause) => {
closeChannel
if (cause.isHardError) {
// connection error
if (cause.isInitiatedByApplication) {
log.info("%s got normal shutdown", toString)
} else {
log.error(cause, "%s got hard error", toString)
}
} else {
// channel error
log.error(cause, "%s self restarting because of channel shutdown", toString)
notifyCallback(Restarting)
self ! Start
}
}
case Failure(cause) =>
log.error(cause, "%s self restarting because of channel failure", toString)
closeChannel
notifyCallback(Restarting)
self ! Start
}
// to be defined in subclassing actor
protected def setupChannel(ch: Channel)
private def setupChannelInternal(ch: Channel) = if (channel.isEmpty) {
log.info("Exchange declare")
if (exchangePassive) {
ch.exchangeDeclarePassive(exchangeName)
} else {
ch.exchangeDeclare(exchangeName, exchangeType.toString, exchangeDurable, exchangeAutoDelete, JavaConversions.asMap(configurationArguments))
}
ch.addShutdownListener(new ShutdownListener {
def shutdownCompleted(cause: ShutdownSignalException) = {
self ! ChannelShutdown(cause)
}
})
shutdownListener.foreach(sdl => ch.getConnection.addShutdownListener(sdl))
log.info("shutdown listener added")
setupChannel(ch)
channel = Some(ch)
notifyCallback(Started)
log.info("Channel setup for %s", toString)
}
private def closeChannel = {
channel.foreach {
ch =>
if (ch.isOpen) ch.close
notifyCallback(Stopped)
log.info("%s channel closed", toString)
}
channel = None
}
private def notifyCallback(message: AMQPMessage) = {
channelCallback.foreach(cb => if (cb.isRunning) cb ! message)
}
override def preRestart(reason: Throwable) = {
notifyCallback(Restarting)
closeChannel
}
override def shutdown = closeChannel
}

View file

@ -0,0 +1,116 @@
package se.scalablesolutions.akka.amqp
import java.util.{TimerTask, Timer}
import java.io.IOException
import se.scalablesolutions.akka.util.Logging
import com.rabbitmq.client._
import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters
import se.scalablesolutions.akka.actor.{Exit, Actor}
import se.scalablesolutions.akka.config.ScalaConfig.{Permanent, LifeCycle}
private[amqp] class FaultTolerantConnectionActor(connectionParameters: ConnectionParameters) extends Actor with Logging {
import connectionParameters._
self.id = "amqp-connection-%s".format(host)
self.lifeCycle = Some(LifeCycle(Permanent))
val reconnectionTimer = new Timer("%s-timer".format(self.id))
val connectionFactory: ConnectionFactory = new ConnectionFactory()
connectionFactory.setHost(host)
connectionFactory.setPort(port)
connectionFactory.setUsername(username)
connectionFactory.setPassword(password)
connectionFactory.setVirtualHost(virtualHost)
var connection: Option[Connection] = None
protected def receive = {
case Connect => connect
case ChannelRequest => {
connection match {
case Some(conn) => {
val chanel: Channel = conn.createChannel
self.reply(Some(chanel))
}
case None => {
log.warning("Unable to create new channel - no connection")
reply(None)
}
}
}
case ConnectionShutdown(cause) => {
disconnect
if (cause.isHardError) {
// connection error
if (cause.isInitiatedByApplication) {
log.info("ConnectionShutdown by application [%s]", self.id)
} else {
log.error(cause, "ConnectionShutdown is hard error - self terminating")
self ! new Exit(self, cause)
}
}
}
}
private def connect = if (connection.isEmpty || !connection.get.isOpen) {
try {
connection = Some(connectionFactory.newConnection)
connection.foreach {
conn =>
conn.addShutdownListener(new ShutdownListener {
def shutdownCompleted(cause: ShutdownSignalException) = {
self ! ConnectionShutdown(cause)
}
})
log.info("Successfully (re)connected to AMQP Server %s:%s [%s]", host, port, self.id)
log.debug("Sending new channel to %d already linked actors", self.linkedActorsAsList.size)
self.linkedActorsAsList.foreach(_ ! conn.createChannel)
notifyCallback(Connected)
}
} catch {
case e: Exception =>
connection = None
log.info("Trying to connect to AMQP server in %d milliseconds [%s]"
, connectionParameters.initReconnectDelay, self.id)
reconnectionTimer.schedule(new TimerTask() {
override def run = {
notifyCallback(Reconnecting)
self ! Connect
}
}, connectionParameters.initReconnectDelay)
}
}
private def disconnect = {
try {
connection.foreach(_.close)
log.debug("Disconnected AMQP connection at %s:%s [%s]", host, port, self.id)
notifyCallback(Disconnected)
} catch {
case e: IOException => log.error("Could not close AMQP connection %s:%s [%s]", host, port, self.id)
case _ => ()
}
connection = None
}
private def notifyCallback(message: AMQPMessage) = {
connectionCallback.foreach(cb => if (cb.isRunning) cb ! message)
}
override def shutdown = {
reconnectionTimer.cancel
// make sure shutdown is called on all linked actors so they can do channel cleanup before connection is killed
self.linkedActorsAsList.foreach(_.stop)
disconnect
}
override def preRestart(reason: Throwable) = disconnect
override def postRestart(reason: Throwable) = {
notifyCallback(Reconnecting)
connect
}
}

View file

@ -0,0 +1,54 @@
package se.scalablesolutions.akka.amqp
import com.rabbitmq.client._
import se.scalablesolutions.akka.amqp.AMQP.ProducerParameters
private[amqp] class ProducerActor(producerParameters: ProducerParameters) extends FaultTolerantChannelActor(producerParameters.channelParameters) {
import producerParameters._
import channelParameters._
producerId.foreach(id => self.id = id)
def specificMessageHandler = {
case message@Message(payload, routingKey, mandatory, immediate, properties) if channel.isDefined => {
log.debug("Sending message [%s]", message)
channel.foreach(_.basicPublish(exchangeName, routingKey, mandatory, immediate, properties.getOrElse(null), payload))
}
case message@Message(payload, routingKey, mandatory, immediate, properties) => {
log.warning("Unable to send message [%s]", message)
// FIXME: If channel is not available, messages should be queued back into the actor mailbox and actor should only react on 'Start'
}
}
protected def setupChannel(ch: Channel) {
returnListener match {
case Some(listener) => ch.setReturnListener(listener)
case None => ch.setReturnListener(new ReturnListener() {
def handleBasicReturn(
replyCode: Int,
replyText: String,
exchange: String,
routingKey: String,
properties: com.rabbitmq.client.AMQP.BasicProperties,
body: Array[Byte]) = {
throw new MessageNotDeliveredException(
"Could not deliver message [" + body +
"] with reply code [" + replyCode +
"] with reply text [" + replyText +
"] and routing key [" + routingKey +
"] to exchange [" + exchange + "]",
replyCode, replyText, exchange, routingKey, properties, body)
}
})
}
}
override def toString(): String =
"AMQP.Poducer[id= "+ self.id +
", exchange=" + exchangeName +
", exchangeType=" + exchangeType +
", durable=" + exchangeDurable +
", autoDelete=" + exchangeAutoDelete + "]"
}

View file

@ -0,0 +1,47 @@
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import org.multiverse.api.latches.StandardLatch
import com.rabbitmq.client.ShutdownSignalException
import se.scalablesolutions.akka.amqp._
import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters
import org.scalatest.matchers.MustMatchers
class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def connectionAndRecovery = {
val connectedLatch = new StandardLatch
val reconnectingLatch = new StandardLatch
val reconnectedLatch = new StandardLatch
val disconnectedLatch = new StandardLatch
val connectionCallback: ActorRef = Actor.actor({
case Connected =>
if (!connectedLatch.isOpen) {
connectedLatch.open
} else {
reconnectedLatch.open
}
case Reconnecting => reconnectingLatch.open
case Disconnected => disconnectedLatch.open
})
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50, connectionCallback = Some(connectionCallback)))
try {
connectedLatch.tryAwait(2, TimeUnit.SECONDS) must be(true)
connection ! new ConnectionShutdown(new ShutdownSignalException(true, false, "TestException", "TestRef"))
reconnectingLatch.tryAwait(2, TimeUnit.SECONDS) must be(true)
reconnectedLatch.tryAwait(2, TimeUnit.SECONDS) must be(true)
} finally {
connection.stop
disconnectedLatch.tryAwait(2, TimeUnit.SECONDS) must be(true)
}
}
}

View file

@ -0,0 +1,58 @@
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor._
import org.multiverse.api.latches.StandardLatch
import com.rabbitmq.client.ShutdownSignalException
import se.scalablesolutions.akka.amqp._
import org.scalatest.matchers.MustMatchers
import se.scalablesolutions.akka.amqp.AMQP.{ConsumerParameters, ChannelParameters, ProducerParameters, ConnectionParameters}
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.ActorRef
import org.junit.Test
class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def consumerChannelRecovery = {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
try {
val producer = AMQP.newProducer(connection, ProducerParameters(
ChannelParameters("text_exchange", ExchangeType.Direct)))
val consumerStartedLatch = new StandardLatch
val consumerRestartedLatch = new StandardLatch
val consumerChannelCallback: ActorRef = actor {
case Started => {
if (!consumerStartedLatch.isOpen) {
consumerStartedLatch.open
} else {
consumerRestartedLatch.open
}
}
case Restarting => ()
case Stopped => ()
}
val payloadLatch = new StandardLatch
val consumerChannelParameters = ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(consumerChannelCallback))
val consumer = AMQP.newConsumer(connection, ConsumerParameters(consumerChannelParameters, "non.interesting.routing.key", actor {
case Delivery(payload, _, _, _, _) => payloadLatch.open
}))
consumerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
val listenerLatch = new StandardLatch
consumer ! new ChannelShutdown(new ShutdownSignalException(false, false, "TestException", "TestRef"))
consumerRestartedLatch.tryAwait(4, TimeUnit.SECONDS) must be (true)
producer ! Message("some_payload".getBytes, "non.interesting.routing.key")
payloadLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
} finally {
connection.stop
}
}
}

View file

@ -0,0 +1,76 @@
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.actor.Actor._
import org.multiverse.api.latches.StandardLatch
import com.rabbitmq.client.ShutdownSignalException
import se.scalablesolutions.akka.amqp._
import org.scalatest.matchers.MustMatchers
import se.scalablesolutions.akka.amqp.AMQP.{ConsumerParameters, ChannelParameters, ProducerParameters, ConnectionParameters}
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.ActorRef
import org.junit.Test
class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def consumerConnectionRecovery = {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
try {
val producerStartedLatch = new StandardLatch
val producerRestartedLatch = new StandardLatch
val producerChannelCallback: ActorRef = actor {
case Started => {
if (!producerStartedLatch.isOpen) {
producerStartedLatch.open
} else {
producerRestartedLatch.open
}
}
case Restarting => ()
case Stopped => ()
}
val producer = AMQP.newProducer(connection, ProducerParameters(
ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(producerChannelCallback))))
producerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
val consumerStartedLatch = new StandardLatch
val consumerRestartedLatch = new StandardLatch
val consumerChannelCallback: ActorRef = actor {
case Started => {
if (!consumerStartedLatch.isOpen) {
consumerStartedLatch.open
} else {
consumerRestartedLatch.open
}
}
case Restarting => ()
case Stopped => ()
}
val payloadLatch = new StandardLatch
val consumerChannelParameters = ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(consumerChannelCallback))
val consumer = AMQP.newConsumer(connection, ConsumerParameters(consumerChannelParameters, "non.interesting.routing.key", actor {
case Delivery(payload, _, _, _, _) => payloadLatch.open
}))
consumerStartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
val listenerLatch = new StandardLatch
connection ! new ConnectionShutdown(new ShutdownSignalException(true, false, "TestException", "TestRef"))
producerRestartedLatch.tryAwait(4, TimeUnit.SECONDS) must be (true)
consumerRestartedLatch.tryAwait(4, TimeUnit.SECONDS) must be (true)
producer ! Message("some_payload".getBytes, "non.interesting.routing.key")
payloadLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
} finally {
connection.stop
}
}
}

View file

@ -0,0 +1,47 @@
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import se.scalablesolutions.akka.amqp.AMQP.{ConsumerParameters, ChannelParameters, ProducerParameters}
import org.multiverse.api.latches.StandardLatch
import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.matchers.MustMatchers
import se.scalablesolutions.akka.amqp._
import org.junit.{After, Test}
import se.scalablesolutions.akka.actor.{ActorRegistry, ActorRef}
import java.util.concurrent.{CountDownLatch, TimeUnit}
class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def consumerMessageManualAcknowledge = {
val connection = AMQP.newConnection()
try {
val countDown = new CountDownLatch(2)
val channelCallback = actor {
case Started => countDown.countDown
case Restarting => ()
case Stopped => ()
}
val channelParameters = ChannelParameters("text_exchange",ExchangeType.Direct, channelCallback = Some(channelCallback))
val acknowledgeLatch = new StandardLatch
var deliveryTagCheck: Long = -1
val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "manual.ack.this", actor {
case Delivery(payload, _, deliveryTag, _, sender) => {
deliveryTagCheck = deliveryTag
sender.foreach(_ ! Acknowledge(deliveryTag))
}
case Acknowledged(deliveryTag) => if (deliveryTagCheck == deliveryTag) acknowledgeLatch.open
}, selfAcknowledging = false))
val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters))
countDown.await(2, TimeUnit.SECONDS) must be (true)
producer ! Message("some_payload".getBytes, "manual.ack.this")
acknowledgeLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
} finally {
connection.stop
}
}
}

View file

@ -0,0 +1,42 @@
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.amqp._
import se.scalablesolutions.akka.amqp.AMQP.{ConsumerParameters, ChannelParameters, ProducerParameters}
import org.multiverse.api.latches.StandardLatch
import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.{CountDownLatch, TimeUnit}
class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def consumerMessage = {
val connection = AMQP.newConnection()
try {
val countDown = new CountDownLatch(2)
val channelCallback = actor {
case Started => countDown.countDown
case Restarting => ()
case Stopped => ()
}
val channelParameters = ChannelParameters("text_exchange",ExchangeType.Direct, channelCallback = Some(channelCallback))
val payloadLatch = new StandardLatch
val consumer = AMQP.newConsumer(connection, ConsumerParameters(channelParameters, "non.interesting.routing.key", actor {
case Delivery(payload, _, _, _, _) => payloadLatch.open
}))
val producer = AMQP.newProducer(connection, ProducerParameters(channelParameters))
countDown.await(2, TimeUnit.SECONDS) must be (true)
producer ! Message("some_payload".getBytes, "non.interesting.routing.key")
payloadLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
} finally {
connection.stop
}
}
}

View file

@ -0,0 +1,52 @@
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import junit.framework.Assert
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import org.multiverse.api.latches.StandardLatch
import com.rabbitmq.client.ShutdownSignalException
import se.scalablesolutions.akka.amqp._
import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ProducerParameters, ConnectionParameters}
import org.scalatest.matchers.MustMatchers
class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def producerChannelRecovery = {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
try {
val startedLatch = new StandardLatch
val restartingLatch = new StandardLatch
val restartedLatch = new StandardLatch
val producerCallback: ActorRef = Actor.actor({
case Started => {
if (!startedLatch.isOpen) {
startedLatch.open
} else {
restartedLatch.open
}
}
case Restarting => restartingLatch.open
case Stopped => ()
})
val producerParameters = ProducerParameters(
ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(producerCallback)))
val producer = AMQP.newProducer(connection, producerParameters)
startedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
producer ! new ChannelShutdown(new ShutdownSignalException(false, false, "TestException", "TestRef"))
restartingLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
restartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
} finally {
connection.stop
}
}
}

View file

@ -0,0 +1,50 @@
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import org.multiverse.api.latches.StandardLatch
import com.rabbitmq.client.ShutdownSignalException
import se.scalablesolutions.akka.amqp._
import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ProducerParameters, ConnectionParameters}
import org.scalatest.matchers.MustMatchers
class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def producerConnectionRecovery = {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
try {
val startedLatch = new StandardLatch
val restartingLatch = new StandardLatch
val restartedLatch = new StandardLatch
val producerCallback: ActorRef = Actor.actor({
case Started => {
if (!startedLatch.isOpen) {
startedLatch.open
} else {
restartedLatch.open
}
}
case Restarting => restartingLatch.open
case Stopped => ()
})
val producerParameters = ProducerParameters(
ChannelParameters("text_exchange", ExchangeType.Direct, channelCallback = Some(producerCallback)))
val producer = AMQP.newProducer(connection, producerParameters)
startedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
connection ! new ConnectionShutdown(new ShutdownSignalException(true, false, "TestException", "TestRef"))
restartingLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
restartedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
} finally {
connection.stop
}
}
}

View file

@ -0,0 +1,41 @@
package se.scalablesolutions.akka.amqp.test
import se.scalablesolutions.akka.util.Logging
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.ActorRef
import org.multiverse.api.latches.StandardLatch
import se.scalablesolutions.akka.amqp._
import com.rabbitmq.client.ReturnListener
import com.rabbitmq.client.AMQP.BasicProperties
import java.lang.String
import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ProducerParameters}
import org.scalatest.matchers.MustMatchers
class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def producerMessage = {
val connection: ActorRef = AMQP.newConnection()
try {
val returnLatch = new StandardLatch
val returnListener = new ReturnListener {
def handleBasicReturn(replyCode: Int, replyText: String, exchange: String, routingKey: String, properties: BasicProperties, body: Array[Byte]) = {
returnLatch.open
}
}
val producerParameters = ProducerParameters(
ChannelParameters("text_exchange", ExchangeType.Direct),
returnListener = Some(returnListener))
val producer = AMQP.newProducer(connection, producerParameters)
producer ! new Message("some_payload".getBytes, "non.interesing.routing.key", mandatory = true)
returnLatch.tryAwait(2, TimeUnit.SECONDS) must be(true)
} finally {
connection.stop
}
}
}

View file

@ -205,7 +205,12 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
class AkkaAMQPProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with CodeFellowPlugin {
val commons_io = "commons-io" % "commons-io" % "1.4" % "compile"
val rabbit = "com.rabbitmq" % "amqp-client" % "1.7.2" % "compile"
val rabbit = "com.rabbitmq" % "amqp-client" % "1.8.0" % "compile"
// testing
val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "test" intransitive()
val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test"
val junit = "junit" % "junit" % "4.5" % "test"
}
class AkkaHttpProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with CodeFellowPlugin {