From cf76aa137b100f94b2f274321182d148ddebbb1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Sun, 23 May 2010 16:13:33 +0200 Subject: [PATCH] Fixed regression bug in AMQP supervisor code --- akka-amqp/src/main/scala/AMQP.scala | 37 ++++++++++++++++------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 3dcef18a44..ae2df7dc40 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -77,16 +77,25 @@ object AMQP { /** * @author Jonas Bonér */ - class AMQPSupervisor extends Actor with Logging { + class AMQPSupervisor extends Logging { + + class AMQPSupervisorActor extends Actor { + import scala.collection.JavaConversions._ + import self._ + + faultHandler = Some(OneForOneStrategy(5, 5000)) + trapExit = List(classOf[Throwable]) + + def receive = { + case _ => {} // ignore all messages + } + } + import scala.collection.JavaConversions._ - import self._ + private val supervisor = actorOf[AMQPSupervisorActor].start private val connections = new ConcurrentHashMap[ActorRef, ActorRef] - self.faultHandler = Some(OneForOneStrategy(5, 5000)) - self.trapExit = List(classOf[Throwable]) - start - def newProducer( config: ConnectionParameters, hostname: String, @@ -95,14 +104,14 @@ object AMQP { returnListener: Option[ReturnListener], shutdownListener: Option[ShutdownListener], initReconnectDelay: Long): ActorRef = { - val producer = actorOf( new Producer( + val producer = actorOf(new Producer( new ConnectionFactory(config), hostname, port, exchangeName, returnListener, shutdownListener, initReconnectDelay)) - startLink(producer) + supervisor.startLink(producer) producer } @@ -118,7 +127,7 @@ object AMQP { durable: Boolean, autoDelete: Boolean, configurationArguments: Map[String, AnyRef]): ActorRef = { - val consumer = actorOf( new Consumer( + val consumer = actorOf(new Consumer( new ConnectionFactory(config), hostname, port, exchangeName, @@ -129,24 +138,20 @@ object AMQP { durable, autoDelete, configurationArguments)) - startLink(consumer) + supervisor.startLink(consumer) consumer } def stopConnection(connection: ActorRef) = { connection ! Stop - unlink(connection) + supervisor.unlink(connection) connections.remove(connection) } - override def shutdown = { + def shutdown = { asMap(connections).valuesIterator.foreach(_ ! Stop) exit } - - def receive = { - case _ => {} // ignore all messages - } } sealed trait AMQPMessage