diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index b00ec7f57f..d88b4da3ca 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -179,11 +179,16 @@ object AMQP { /** * @author Jonas Bonér */ - case class MessageConsumerListener(queueName: String, - routingKey: String, - isUsingExistingQueue: Boolean, - actor: Actor) extends AMQPMessage { - def this(queueName: String, routingKey: String, actor: Actor) = this (queueName, routingKey, false, actor) + class MessageConsumerListener(val queueName: String, + val routingKey: String, + val exclusive: Boolean, + val autoDelete: Boolean, + val isUsingExistingQueue: Boolean, + val actor: Actor) extends AMQPMessage { + /** + * Creates a non-exclusive, non-autodelete message listener. + */ + def this(queueName: String, routingKey: String, actor: Actor) = this (queueName, routingKey, false, false, false, actor) private[akka] var tag: Option[String] = None @@ -192,6 +197,8 @@ object AMQP { ", queue=" + queueName + ", routingKey=" + routingKey + ", tag=" + tag + + ", exclusive=" + exclusive + + ", autoDelete=" + autoDelete + ", isUsingExistingQueue=" + isUsingExistingQueue + "]" def toString(exchangeName: String) = @@ -200,6 +207,8 @@ object AMQP { ", queue=" + queueName + ", routingKey=" + routingKey + ", tag=" + tag + + ", exclusive=" + exclusive + + ", autoDelete=" + autoDelete + ", isUsingExistingQueue=" + isUsingExistingQueue + "]" /** @@ -223,8 +232,18 @@ object AMQP { } } object MessageConsumerListener { - def apply(queueName: String, routingKey: String, actor: Actor) = - new MessageConsumerListener(queueName, routingKey, false, actor) + def apply(queueName: String, + routingKey: String, + exclusive: Boolean, + autoDelete: Boolean, + isUsingExistingQueue: Boolean, + actor: Actor) = + new MessageConsumerListener(queueName, routingKey, exclusive, autoDelete, isUsingExistingQueue, actor) + + def apply(queueName: String, + routingKey: String, + actor: Actor) = + new MessageConsumerListener(queueName, routingKey, false, false, false, actor) } case object Stop extends AMQPMessage @@ -397,10 +416,14 @@ object AMQP { private def registerListener(listener: MessageConsumerListener) = { log.debug("Register MessageConsumerListener %s", listener.toString(exchangeName)) listeners.put(listener, listener) - + if (!listener.isUsingExistingQueue) { log.debug("Declaring new queue for MessageConsumerListener [%s]", listener.queueName) - channel.queueDeclare(listener.queueName) + channel.queueDeclare( + listener.queueName, + passive, durable, + listener.exclusive, listener.autoDelete, + configurationArguments.asJava) } log.debug("Binding new queue for MessageConsumerListener [%s]", listener.queueName) @@ -508,6 +531,16 @@ object AMQP { def createQueue(name: String, durable: Boolean) = channel.queueDeclare(name, false, durable, true, true, null).getQueue + def createQueue(name: String, passive: Boolean, durable: Boolean, exclusive: Boolean, autoDelete: Boolean) = + channel.queueDeclare(name, passive, durable, exclusive, autoDelete, null).getQueue + + def createQueue(name: String, passive: Boolean, durable: Boolean, exclusive: Boolean, autoDelete: Boolean, arguments: java.util.Map[String, AnyRef]) = + channel.queueDeclare(name, passive, durable, exclusive, autoDelete, arguments).getQueue + + def bindQueue(name: String) { + channel.queueBind(name, exchangeName, name) + } + def createBindQueue: String = { val name = createQueue channel.queueBind(name, exchangeName, name) diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index aa618dfb95..6cba33b7ba 100755 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -17,10 +17,10 @@ object ActorRegistry extends Logging { private val actorsByClassName = new HashMap[String, List[Actor]] private val actorsById = new HashMap[String, List[Actor]] - def actorsFor(clazz: Class[_ <: Actor]): List[Actor] = synchronized { + def actorsFor[T <: Actor](clazz: Class[T]): List[T] = synchronized { actorsByClassName.get(clazz.getName) match { case None => Nil - case Some(instances) => instances + case Some(instances) => instances.asInstanceOf[List[T]] } } diff --git a/akka-samples-scala/src/main/scala/SimpleService.scala b/akka-samples-scala/src/main/scala/SimpleService.scala index 11e8304551..f657b502db 100644 --- a/akka-samples-scala/src/main/scala/SimpleService.scala +++ b/akka-samples-scala/src/main/scala/SimpleService.scala @@ -98,8 +98,7 @@ class PubSub extends Actor { * Or browse to the URL from a web browser. */ @Path("/persistentscalacount") -class PersistentSimpleService extends Actor { - makeTransactionRequired +class PersistentSimpleService extends Transactor { case object Tick private val KEY = "COUNTER" @@ -125,7 +124,7 @@ class PersistentSimpleService extends Actor { } @Path("/chat") -class Chat extends Transactor { +class Chat extends Actor { case class Chat(val who: String, val what: String, val msg: String) @Suspend