Added durable and auto-delete to AMQP

This commit is contained in:
Jonas Bonér 2009-12-25 11:37:43 +01:00
parent 1f3a38228b
commit 48dff08848
3 changed files with 46 additions and 14 deletions

View file

@ -179,11 +179,16 @@ object AMQP {
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
case class MessageConsumerListener(queueName: String,
routingKey: String,
isUsingExistingQueue: Boolean,
actor: Actor) extends AMQPMessage {
def this(queueName: String, routingKey: String, actor: Actor) = this (queueName, routingKey, false, actor)
class MessageConsumerListener(val queueName: String,
val routingKey: String,
val exclusive: Boolean,
val autoDelete: Boolean,
val isUsingExistingQueue: Boolean,
val actor: Actor) extends AMQPMessage {
/**
* Creates a non-exclusive, non-autodelete message listener.
*/
def this(queueName: String, routingKey: String, actor: Actor) = this (queueName, routingKey, false, false, false, actor)
private[akka] var tag: Option[String] = None
@ -192,6 +197,8 @@ object AMQP {
", queue=" + queueName +
", routingKey=" + routingKey +
", tag=" + tag +
", exclusive=" + exclusive +
", autoDelete=" + autoDelete +
", isUsingExistingQueue=" + isUsingExistingQueue + "]"
def toString(exchangeName: String) =
@ -200,6 +207,8 @@ object AMQP {
", queue=" + queueName +
", routingKey=" + routingKey +
", tag=" + tag +
", exclusive=" + exclusive +
", autoDelete=" + autoDelete +
", isUsingExistingQueue=" + isUsingExistingQueue + "]"
/**
@ -223,8 +232,18 @@ object AMQP {
}
}
object MessageConsumerListener {
def apply(queueName: String, routingKey: String, actor: Actor) =
new MessageConsumerListener(queueName, routingKey, false, actor)
def apply(queueName: String,
routingKey: String,
exclusive: Boolean,
autoDelete: Boolean,
isUsingExistingQueue: Boolean,
actor: Actor) =
new MessageConsumerListener(queueName, routingKey, exclusive, autoDelete, isUsingExistingQueue, actor)
def apply(queueName: String,
routingKey: String,
actor: Actor) =
new MessageConsumerListener(queueName, routingKey, false, false, false, actor)
}
case object Stop extends AMQPMessage
@ -397,10 +416,14 @@ object AMQP {
private def registerListener(listener: MessageConsumerListener) = {
log.debug("Register MessageConsumerListener %s", listener.toString(exchangeName))
listeners.put(listener, listener)
if (!listener.isUsingExistingQueue) {
log.debug("Declaring new queue for MessageConsumerListener [%s]", listener.queueName)
channel.queueDeclare(listener.queueName)
channel.queueDeclare(
listener.queueName,
passive, durable,
listener.exclusive, listener.autoDelete,
configurationArguments.asJava)
}
log.debug("Binding new queue for MessageConsumerListener [%s]", listener.queueName)
@ -508,6 +531,16 @@ object AMQP {
def createQueue(name: String, durable: Boolean) =
channel.queueDeclare(name, false, durable, true, true, null).getQueue
def createQueue(name: String, passive: Boolean, durable: Boolean, exclusive: Boolean, autoDelete: Boolean) =
channel.queueDeclare(name, passive, durable, exclusive, autoDelete, null).getQueue
def createQueue(name: String, passive: Boolean, durable: Boolean, exclusive: Boolean, autoDelete: Boolean, arguments: java.util.Map[String, AnyRef]) =
channel.queueDeclare(name, passive, durable, exclusive, autoDelete, arguments).getQueue
def bindQueue(name: String) {
channel.queueBind(name, exchangeName, name)
}
def createBindQueue: String = {
val name = createQueue
channel.queueBind(name, exchangeName, name)

View file

@ -17,10 +17,10 @@ object ActorRegistry extends Logging {
private val actorsByClassName = new HashMap[String, List[Actor]]
private val actorsById = new HashMap[String, List[Actor]]
def actorsFor(clazz: Class[_ <: Actor]): List[Actor] = synchronized {
def actorsFor[T <: Actor](clazz: Class[T]): List[T] = synchronized {
actorsByClassName.get(clazz.getName) match {
case None => Nil
case Some(instances) => instances
case Some(instances) => instances.asInstanceOf[List[T]]
}
}

View file

@ -98,8 +98,7 @@ class PubSub extends Actor {
* Or browse to the URL from a web browser.
*/
@Path("/persistentscalacount")
class PersistentSimpleService extends Actor {
makeTransactionRequired
class PersistentSimpleService extends Transactor {
case object Tick
private val KEY = "COUNTER"
@ -125,7 +124,7 @@ class PersistentSimpleService extends Actor {
}
@Path("/chat")
class Chat extends Transactor {
class Chat extends Actor {
case class Chat(val who: String, val what: String, val msg: String)
@Suspend