diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 2ac2b85df5..e250ef066b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -199,6 +199,7 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { actor.mailbox = deadLetterMailbox mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up cleanUpMailboxFor(actor, mailBox) + mailBox.cleanUp() } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 07163d8a77..6a49ab8d0b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -194,6 +194,12 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag @inline final def dispatcher: MessageDispatcher = actor.dispatcher + + /** + * Overridable callback to clean up the mailbox, + * called when an actor is unregistered. + */ + protected[dispatch] def cleanUp() {} } trait MessageQueue { diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala index 51abd0b4d1..c5565d1026 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -22,7 +22,7 @@ class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) val hostname = app.config.getString("akka.actor.mailbox.beanstalk.hostname", "0.0.0.0") val port = app.config.getInt("akka.actor.mailbox.beanstalk.port", 11300) - val defaultTimeUnit = app.AkkaConfig.DefaultTimeUnit + def defaultTimeUnit = app.AkkaConfig.DefaultTimeUnit val reconnectWindow = Duration(app.config.getInt("akka.actor.mailbox.beanstalk.reconnect-window", 5), defaultTimeUnit).toSeconds.toInt val messageSubmitDelay = Duration(app.config.getInt("akka.actor.mailbox.beanstalk.message-submit-delay", 0), defaultTimeUnit).toSeconds.toInt val messageSubmitTimeout = Duration(app.config.getInt("akka.actor.mailbox.beanstalk.message-submit-timeout", 5), defaultTimeUnit).toSeconds.toInt diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 01b5f06cce..2473d75e39 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -40,16 +40,11 @@ class DurableMailboxException private[akka] (message: String, cause: Throwable) abstract class DurableMailbox(owner: ActorCell) extends Mailbox(owner) with DefaultSystemMessageQueue { import DurableExecutableMailboxConfig._ - val app = owner.app - val ownerPath = owner.self.path + def app = owner.app + def ownerPath = owner.self.path val ownerPathString = ownerPath.path.mkString("/") val name = "mailbox_" + Name.replaceAllIn(ownerPathString, "_") - val dispatcher: Dispatcher = owner.dispatcher match { - case e: Dispatcher ⇒ e - case _ ⇒ null - } - } trait DurableMessageSerialization { @@ -105,7 +100,7 @@ abstract class DurableMailboxType(mailboxFQN: String) extends MailboxType { } //TODO take into consideration a mailboxConfig parameter so one can have bounded mboxes and capacity etc - def create(dispatcher: MessageDispatcher, receiver: ActorCell): Mailbox = { + def create(receiver: ActorCell): Mailbox = { ReflectiveAccess.createInstance[AnyRef](mailboxClass, constructorSignature, Array[AnyRef](receiver)) match { case Right(instance) ⇒ instance.asInstanceOf[Mailbox] case Left(exception) ⇒ diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 1f82bc0276..c70b5972e4 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -22,7 +22,7 @@ class ZooKeeperBasedMailboxException(message: String) extends AkkaException(mess class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { val zkServerAddresses = app.config.getString("akka.actor.mailbox.zookeeper.server-addresses", "localhost:2181") - val defaultTimeUnit = app.AkkaConfig.DefaultTimeUnit + def defaultTimeUnit = app.AkkaConfig.DefaultTimeUnit val sessionTimeout = Duration(app.config.getInt("akka.actor.mailbox.zookeeper.session-timeout", 60), defaultTimeUnit).toMillis.toInt val connectionTimeout = Duration(app.config.getInt("akka.actor.mailbox.zookeeper.connection-timeout", 60), defaultTimeUnit).toMillis.toInt val blockingQueue = app.config.getBool("akka.actor.mailbox.zookeeper.blocking-queue", true) @@ -63,5 +63,11 @@ class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) case e ⇒ false } - def close() = zkClient.close + override def cleanUp() { + try { + zkClient.close() + } catch { + case e: Exception ⇒ // ignore + } + } } diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala index 6838bc1338..da363beaaf 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala @@ -18,11 +18,6 @@ class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeepe super.atStartup() } - override def afterEach() { - // TOOD PN we should close the zkClient in the mailbox, would have been nice with a callback in the mailbox when it is closed - super.afterEach() - } - override def atTermination() { zkServer.shutdown super.atTermination()