From f05447408cc3e84c8a30bdc81d14e2b239a0752f Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 14 Aug 2012 08:07:49 +0200 Subject: [PATCH 1/6] make guardian supervisorStrategy configurable, see #2376 --- akka-actor/src/main/resources/reference.conf | 7 ++ .../scala/akka/actor/ActorRefProvider.scala | 99 +++++-------------- .../main/scala/akka/actor/ActorSystem.scala | 1 + .../main/scala/akka/actor/FaultHandling.scala | 29 +++++- .../akka/remote/RemoteActorRefProvider.scala | 2 +- 5 files changed, 64 insertions(+), 74 deletions(-) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 723347ce98..df89284993 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -46,7 +46,14 @@ akka { actor { + # FQCN of the ActorRefProvider to be used; the below is the built-in default, + # another one is akka.remote.RemoteActorRefProvider in the akka-remote bundle. provider = "akka.actor.LocalActorRefProvider" + + # The guardian "/user" will use this subclass of akka.actor.SupervisorStrategyConfigurator + # to obtain its supervisorstrategy. Besides the default there is + # akka.actor.StoppingSupervisorStrategy + guardian-supervisor-strategy = "akka.actor.DefaultSupervisorStrategy" # Timeout for ActorSystem.actorOf creation-timeout = 20s diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 830826381a..ae2955e11e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -294,16 +294,6 @@ trait ActorRefFactory { def stop(actor: ActorRef): Unit } -/** - * Internal Akka use only, used in implementation of system.actorOf. - */ -private[akka] case class CreateChild(props: Props, name: String) - -/** - * Internal Akka use only, used in implementation of system.actorOf. - */ -private[akka] case class CreateRandomNameChild(props: Props) - /** * Internal Akka use only, used in implementation of system.stop(child). */ @@ -317,6 +307,7 @@ class LocalActorRefProvider( override val settings: ActorSystem.Settings, val eventStream: EventStream, override val scheduler: Scheduler, + val dynamicAccess: DynamicAccess, override val deployer: Deployer) extends ActorRefProvider { // this is the constructor needed for reflectively instantiating the provider @@ -329,6 +320,7 @@ class LocalActorRefProvider( settings, eventStream, scheduler, + dynamicAccess, new Deployer(settings, dynamicAccess)) override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName)) @@ -380,65 +372,12 @@ class LocalActorRefProvider( } } - /** - * Overridable supervision strategy to be used by the “/user” guardian. - */ - protected def guardianSupervisionStrategy: SupervisorStrategy = { - import akka.actor.SupervisorStrategy._ - OneForOneStrategy() { - case _: ActorKilledException ⇒ Stop - case _: ActorInitializationException ⇒ Stop - case _: Exception ⇒ Restart - } - } - - /* - * Guardians can be asked by ActorSystem to create children, i.e. top-level - * actors. Therefore these need to answer to these requests, forwarding any - * exceptions which might have occurred. - */ - private class Guardian extends Actor { - - override val supervisorStrategy: SupervisorStrategy = guardianSupervisionStrategy + private class Guardian(override val supervisorStrategy: SupervisorStrategy) extends Actor { def receive = { - case Terminated(_) ⇒ context.stop(self) - case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case NonFatal(e) ⇒ Status.Failure(e) }) - case CreateRandomNameChild(child) ⇒ sender ! (try context.actorOf(child) catch { case NonFatal(e) ⇒ Status.Failure(e) }) - case StopChild(child) ⇒ context.stop(child) - case m ⇒ deadLetters ! DeadLetter(m, sender, self) - } - - // guardian MUST NOT lose its children during restart - override def preRestart(cause: Throwable, msg: Option[Any]) {} - } - - /** - * Overridable supervision strategy to be used by the “/system” guardian. - */ - protected def systemGuardianSupervisionStrategy: SupervisorStrategy = { - import akka.actor.SupervisorStrategy._ - OneForOneStrategy() { - case _: ActorKilledException | _: ActorInitializationException ⇒ Stop - case _: Exception ⇒ Restart - } - } - - /* - * Guardians can be asked by ActorSystem to create children, i.e. top-level - * actors. Therefore these need to answer to these requests, forwarding any - * exceptions which might have occurred. - */ - private class SystemGuardian extends Actor { - - override val supervisorStrategy: SupervisorStrategy = systemGuardianSupervisionStrategy - - def receive = { - case Terminated(_) ⇒ eventStream.stopDefaultLoggers(); context.stop(self) - case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case NonFatal(e) ⇒ Status.Failure(e) }) - case CreateRandomNameChild(child) ⇒ sender ! (try context.actorOf(child) catch { case NonFatal(e) ⇒ Status.Failure(e) }) - case StopChild(child) ⇒ context.stop(child); sender ! "ok" - case m ⇒ deadLetters ! DeadLetter(m, sender, self) + case Terminated(_) ⇒ if (context.self.path.name == "system") eventStream.stopDefaultLoggers(); context.stop(self) + case StopChild(child) ⇒ context.stop(child) + case m ⇒ deadLetters ! DeadLetter(m, sender, self) } // guardian MUST NOT lose its children during restart @@ -472,10 +411,26 @@ class LocalActorRefProvider( */ def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras - private val guardianProps = Props(new Guardian) + private lazy val guardianSupervisorStrategyConfigurator = + dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, Seq()).fold(throw _, x ⇒ x) + + /** + * Overridable supervision strategy to be used by the “/user” guardian. + */ + protected def rootGuardianStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy + + /** + * Overridable supervision strategy to be used by the “/user” guardian. + */ + protected def guardianStrategy: SupervisorStrategy = guardianSupervisorStrategyConfigurator.create() + + /** + * Overridable supervision strategy to be used by the “/user” guardian. + */ + protected def systemGuardianStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy lazy val rootGuardian: InternalActorRef = - new LocalActorRef(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath) { + new LocalActorRef(system, Props(new Guardian(rootGuardianStrategy)), theOneWhoWalksTheBubblesOfSpaceTime, rootPath) { override def getParent: InternalActorRef = this override def getSingleChild(name: String): InternalActorRef = name match { case "temp" ⇒ tempContainer @@ -483,10 +438,11 @@ class LocalActorRefProvider( } } - lazy val guardian: LocalActorRef = new LocalActorRef(system, guardianProps, rootGuardian, rootPath / "user") + lazy val guardian: LocalActorRef = + new LocalActorRef(system, Props(new Guardian(guardianStrategy)), rootGuardian, rootPath / "user") lazy val systemGuardian: LocalActorRef = - new LocalActorRef(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system") + new LocalActorRef(system, Props(new Guardian(systemGuardianStrategy)), rootGuardian, rootPath / "system") lazy val tempContainer = new VirtualPathContainer(system.provider, tempNode, rootGuardian, log) @@ -559,4 +515,3 @@ class LocalActorRefProvider( def getExternalAddressFor(addr: Address): Option[Address] = if (addr == rootPath.address) Some(addr) else None } - diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index d10f7ba29c..146602ada0 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -133,6 +133,7 @@ object ActorSystem { final val ConfigVersion = getString("akka.version") final val ProviderClass = getString("akka.actor.provider") + final val SupervisorStrategyClass = getString("akka.actor.guardian-supervisor-strategy") final val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS)) final val SerializeAllMessages = getBoolean("akka.actor.serialize-messages") diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 863c4f9721..4d1bf7f046 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -61,6 +61,23 @@ case class ChildRestartStats(child: ActorRef, var uid: Int = 0, var maxNrOfRetri } } +/** + * Implement this interface in order to configure the supervisorStrategy for + * the top-level guardian actor (`/user`). An instance of this class must be + * instantiable using a no-arg constructor. + */ +trait SupervisorStrategyConfigurator { + def create(): SupervisorStrategy +} + +class DefaultSupervisorStrategy extends SupervisorStrategyConfigurator { + override def create(): SupervisorStrategy = SupervisorStrategy.defaultStrategy +} + +class StoppingSupervisorStrategy extends SupervisorStrategyConfigurator { + override def create(): SupervisorStrategy = SupervisorStrategy.stoppingStrategy +} + trait SupervisorStrategyLowPriorityImplicits { this: SupervisorStrategy.type ⇒ /** @@ -133,11 +150,21 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: Exception ⇒ Restart - case _ ⇒ Escalate } OneForOneStrategy()(defaultDecider) } + /** + * This strategy resembles Erlang in that failing children are always + * terminated. + */ + final val stoppingStrategy: SupervisorStrategy = { + def stoppingDecider: Decider = { + case _: Exception ⇒ Stop + } + OneForOneStrategy()(stoppingDecider) + } + /** * Implicit conversion from `Seq` of Throwables to a `Decider`. * This maps the given Throwables to restarts, otherwise escalates. diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 131b5d76be..be029881ed 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -24,7 +24,7 @@ private[akka] class RemoteActorRefProvider( val deployer: RemoteDeployer = new RemoteDeployer(settings, dynamicAccess) - private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, deployer) + private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, dynamicAccess, deployer) @volatile private var _log = local.log From 496fd331dbf3ee07db7f134a10acf21efc75b8ef Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 14 Aug 2012 15:41:07 +0200 Subject: [PATCH 2/6] review comments (Viktor), see #2376 --- akka-actor/src/main/resources/reference.conf | 2 +- akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala | 2 +- akka-actor/src/main/scala/akka/actor/FaultHandling.scala | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index df89284993..2ab6b69b63 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -51,7 +51,7 @@ akka { provider = "akka.actor.LocalActorRefProvider" # The guardian "/user" will use this subclass of akka.actor.SupervisorStrategyConfigurator - # to obtain its supervisorstrategy. Besides the default there is + # to obtain its supervisorStrategy. Besides the default there is # akka.actor.StoppingSupervisorStrategy guardian-supervisor-strategy = "akka.actor.DefaultSupervisorStrategy" diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index ae2955e11e..52cab34e63 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -411,7 +411,7 @@ class LocalActorRefProvider( */ def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras - private lazy val guardianSupervisorStrategyConfigurator = + private def guardianSupervisorStrategyConfigurator = dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, Seq()).fold(throw _, x ⇒ x) /** diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 4d1bf7f046..9623c7eef7 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -70,11 +70,11 @@ trait SupervisorStrategyConfigurator { def create(): SupervisorStrategy } -class DefaultSupervisorStrategy extends SupervisorStrategyConfigurator { +final class DefaultSupervisorStrategy extends SupervisorStrategyConfigurator { override def create(): SupervisorStrategy = SupervisorStrategy.defaultStrategy } -class StoppingSupervisorStrategy extends SupervisorStrategyConfigurator { +final class StoppingSupervisorStrategy extends SupervisorStrategyConfigurator { override def create(): SupervisorStrategy = SupervisorStrategy.stoppingStrategy } @@ -156,7 +156,7 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { /** * This strategy resembles Erlang in that failing children are always - * terminated. + * terminated (one-for-one). */ final val stoppingStrategy: SupervisorStrategy = { def stoppingDecider: Decider = { From f7ea9bf3dd620e5a21f2276c939d07cfc78e1dcf Mon Sep 17 00:00:00 2001 From: Roland Date: Wed, 15 Aug 2012 15:25:43 +0200 Subject: [PATCH 3/6] add tests for guardian strategies, see #2376 - this discovered some pretty nice race conditions when creating actors synchronously (i.e. system.actorOf) vs. the recent fault-handling fix which discards Failed from old incarnations of a certain child - as a consequence, all actor creation MUST be registered with the parent before dispatching the Supervise message --- .../scala/akka/actor/ActorSystemSpec.scala | 50 ++++++++++++++++++- .../src/main/scala/akka/actor/ActorCell.scala | 16 +++--- .../src/main/scala/akka/actor/ActorRef.scala | 4 +- .../scala/akka/actor/ActorRefProvider.scala | 10 ++-- .../main/scala/akka/actor/FaultHandling.scala | 4 +- .../akka/actor/RepointableActorRef.scala | 4 +- .../main/scala/akka/actor/cell/Children.scala | 39 ++++++--------- .../akka/actor/cell/ChildrenContainer.scala | 28 ++++------- .../scala/akka/actor/cell/FaultHandling.scala | 6 ++- .../scala/akka/testkit/TestActorRef.scala | 15 +++++- 10 files changed, 116 insertions(+), 60 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index ad3f5e2870..06fbc99dfd 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -4,7 +4,6 @@ package akka.actor import language.postfixOps - import akka.testkit._ import org.scalatest.junit.JUnitSuite import com.typesafe.config.ConfigFactory @@ -12,9 +11,9 @@ import scala.concurrent.Await import scala.concurrent.util.duration._ import scala.collection.JavaConverters import java.util.concurrent.{ TimeUnit, RejectedExecutionException, CountDownLatch, ConcurrentLinkedQueue } -import akka.pattern.ask import akka.util.Timeout import scala.concurrent.Future +import akka.pattern.ask class JavaExtensionSpec extends JavaExtension with JUnitSuite @@ -62,6 +61,12 @@ object ActorSystemSpec { } } + class Strategy extends SupervisorStrategyConfigurator { + def create() = OneForOneStrategy() { + case _ ⇒ SupervisorStrategy.Escalate + } + } + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -187,6 +192,47 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt created filter (ref ⇒ !ref.isTerminated && !ref.asInstanceOf[ActorRefWithCell].underlying.isInstanceOf[UnstartedCell]) must be(Seq()) } + "shut down when /user fails" in { + implicit val system = ActorSystem("Stop", AkkaSpec.testConf) + EventFilter[ActorKilledException]() intercept { + system.actorFor("/user") ! Kill + awaitCond(system.isTerminated) + } + } + + "allow configuration of guardian supervisor strategy" in { + implicit val system = ActorSystem("Stop", + ConfigFactory.parseString("akka.actor.guardian-supervisor-strategy=akka.actor.StoppingSupervisorStrategy") + .withFallback(AkkaSpec.testConf)) + val a = system.actorOf(Props(new Actor { + def receive = { + case "die" ⇒ throw new Exception("hello") + } + })) + val probe = TestProbe() + probe.watch(a) + EventFilter[Exception]("hello", occurrences = 1) intercept { + a ! "die" + } + probe.expectMsg(Terminated(a)(true)) + } + + "shut down when /user escalates" in { + implicit val system = ActorSystem("Stop", + ConfigFactory.parseString("akka.actor.guardian-supervisor-strategy=\"akka.actor.ActorSystemSpec$Strategy\"") + .withFallback(AkkaSpec.testConf)) + val a = system.actorOf(Props(new Actor { + def receive = { + case "die" ⇒ throw new Exception("hello") + } + })) + EventFilter[Exception]("hello") intercept { + Thread.sleep(250) + a ! "die" + awaitCond(system.isTerminated) + } + } + } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 40938ec809..209ed70478 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -13,7 +13,7 @@ import scala.util.control.NonFatal import akka.actor.cell.ChildrenContainer import akka.dispatch.{ Watch, Unwatch, Terminate, SystemMessage, Suspend, Supervise, Resume, Recreate, NoMessage, MessageDispatcher, Envelope, Create, ChildTerminated } -import akka.event.Logging.{ LogEvent, Debug } +import akka.event.Logging.{ LogEvent, Debug, Error } import akka.japi.Procedure /** @@ -213,7 +213,7 @@ private[akka] trait Cell { /** * Get the stats for the named child, if that exists. */ - def getChildByName(name: String): Option[ChildRestartStats] + def getChildByName(name: String): Option[ChildStats] /** * Enqueue a message to be sent to the actor; may or may not actually * schedule the actor to run, depending on which type of cell it is. @@ -368,7 +368,7 @@ private[akka] class ActorCell( case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ self.stop() case SelectParent(m) ⇒ parent.tell(m, msg.sender) - case SelectChildName(name, m) ⇒ for (c ← getChildByName(name)) c.child.tell(m, msg.sender) + case SelectChildName(name, m) ⇒ getChildByName(name) match { case Some(c: ChildRestartStats) ⇒ c.child.tell(m, msg.sender); case _ ⇒ } case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) } } @@ -444,9 +444,13 @@ private[akka] class ActorCell( private def supervise(child: ActorRef, uid: Int): Unit = if (!isTerminating) { // Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure() - addChild(child).uid = uid - handleSupervise(child) - if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) + initChild(child) match { + case Some(crs) ⇒ + crs.uid = uid + handleSupervise(child) + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) + case None ⇒ publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well")) + } } // future extension point diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 5ad951b89e..30aacac0d8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -306,8 +306,8 @@ private[akka] class LocalActorRef private[akka] ( */ protected def getSingleChild(name: String): InternalActorRef = actorCell.getChildByName(name) match { - case Some(crs) ⇒ crs.child.asInstanceOf[InternalActorRef] - case None ⇒ Nobody + case Some(crs: ChildRestartStats) ⇒ crs.child.asInstanceOf[InternalActorRef] + case _ ⇒ Nobody } override def getChild(names: Iterator[String]): InternalActorRef = { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 52cab34e63..422636c1ed 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -429,7 +429,7 @@ class LocalActorRefProvider( */ protected def systemGuardianStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy - lazy val rootGuardian: InternalActorRef = + lazy val rootGuardian: LocalActorRef = new LocalActorRef(system, Props(new Guardian(rootGuardianStrategy)), theOneWhoWalksTheBubblesOfSpaceTime, rootPath) { override def getParent: InternalActorRef = this override def getSingleChild(name: String): InternalActorRef = name match { @@ -438,11 +438,15 @@ class LocalActorRefProvider( } } - lazy val guardian: LocalActorRef = + lazy val guardian: LocalActorRef = { + rootGuardian.underlying.reserveChild("user") new LocalActorRef(system, Props(new Guardian(guardianStrategy)), rootGuardian, rootPath / "user") + } - lazy val systemGuardian: LocalActorRef = + lazy val systemGuardian: LocalActorRef = { + rootGuardian.underlying.reserveChild("system") new LocalActorRef(system, Props(new Guardian(systemGuardianStrategy)), rootGuardian, rootPath / "system") + } lazy val tempContainer = new VirtualPathContainer(system.provider, tempNode, rootGuardian, log) diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 9623c7eef7..58c9a62967 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -24,9 +24,11 @@ private[akka] case object ChildNameReserved extends ChildStats * ChildRestartStats is the statistics kept by every parent Actor for every child Actor * and is used for SupervisorStrategies to know how to deal with problems that occur for the children. */ -case class ChildRestartStats(child: ActorRef, var uid: Int = 0, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) +case class ChildRestartStats(child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) extends ChildStats { + var uid = 0 + //FIXME How about making ChildRestartStats immutable and then move these methods into the actual supervisor strategies? def requestRestartPermission(retriesWindow: (Option[Int], Option[Int])): Boolean = retriesWindow match { diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 450a7afc34..7c38c0cff5 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -108,8 +108,8 @@ private[akka] class RepointableActorRef( case "" ⇒ getChild(name) case other ⇒ underlying.getChildByName(other) match { - case Some(crs) ⇒ crs.child.asInstanceOf[InternalActorRef].getChild(name) - case None ⇒ Nobody + case Some(crs: ChildRestartStats) ⇒ crs.child.asInstanceOf[InternalActorRef].getChild(name) + case _ ⇒ Nobody } } } else this diff --git a/akka-actor/src/main/scala/akka/actor/cell/Children.scala b/akka-actor/src/main/scala/akka/actor/cell/Children.scala index b2761519fe..eac359b8ab 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Children.scala @@ -7,11 +7,12 @@ package akka.actor.cell import scala.annotation.tailrec import scala.collection.JavaConverters.asJavaIterableConverter import scala.util.control.NonFatal -import akka.actor.{ RepointableRef, Props, NoSerializationVerificationNeeded, InvalidActorNameException, InternalActorRef, ChildRestartStats, ActorRef } +import akka.actor._ import akka.actor.ActorCell import akka.actor.ActorPath.ElementRegex import akka.serialization.SerializationExtension import akka.util.{ Unsafe, Helpers } +import akka.actor.ChildNameReserved private[akka] trait Children { this: ActorCell ⇒ @@ -57,7 +58,7 @@ private[akka] trait Children { this: ActorCell ⇒ @inline private def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean = Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.childrenOffset, oldChildren, newChildren) - @tailrec final protected def reserveChild(name: String): Boolean = { + @tailrec final def reserveChild(name: String): Boolean = { val c = childrenRefs swapChildrenRefs(c, c.reserve(name)) || reserveChild(name) } @@ -67,24 +68,16 @@ private[akka] trait Children { this: ActorCell ⇒ swapChildrenRefs(c, c.unreserve(name)) || unreserveChild(name) } - final protected def addChild(ref: ActorRef): ChildRestartStats = { - @tailrec def rec(): ChildRestartStats = { - val c = childrenRefs - val nc = c.add(ref) - if (swapChildrenRefs(c, nc)) nc.getByName(ref.path.name).get else rec() + @tailrec final protected def initChild(ref: ActorRef): Option[ChildRestartStats] = + childrenRefs.getByName(ref.path.name) match { + case old @ Some(_: ChildRestartStats) ⇒ old.asInstanceOf[Option[ChildRestartStats]] + case Some(ChildNameReserved) ⇒ + val crs = ChildRestartStats(ref) + val name = ref.path.name + val c = childrenRefs + if (swapChildrenRefs(c, c.add(name, crs))) Some(crs) else initChild(ref) + case None ⇒ None } - /* - * This does not need to check getByRef every tailcall, because the change - * cannot happen in that direction as a race: the only entity removing a - * child is the actor itself, and the only entity which could be racing is - * somebody who calls attachChild, and there we are guaranteed that that - * child cannot yet have died (since it has not yet been created). - */ - childrenRefs.getByRef(ref) match { - case Some(old) ⇒ old - case None ⇒ rec() - } - } @tailrec final protected def shallDie(ref: ActorRef): Boolean = { val c = childrenRefs @@ -123,17 +116,17 @@ private[akka] trait Children { this: ActorCell ⇒ protected def suspendChildren(exceptFor: Set[ActorRef] = Set.empty): Unit = childrenRefs.stats foreach { - case ChildRestartStats(child, _, _, _) if !(exceptFor contains child) ⇒ child.asInstanceOf[InternalActorRef].suspend() + case ChildRestartStats(child, _, _) if !(exceptFor contains child) ⇒ child.asInstanceOf[InternalActorRef].suspend() case _ ⇒ } protected def resumeChildren(causedByFailure: Throwable, perp: ActorRef): Unit = childrenRefs.stats foreach { - case ChildRestartStats(child: InternalActorRef, _, _, _) ⇒ + case ChildRestartStats(child: InternalActorRef, _, _) ⇒ child.resume(if (perp == child) causedByFailure else null) } - def getChildByName(name: String): Option[ChildRestartStats] = childrenRefs.getByName(name) + def getChildByName(name: String): Option[ChildStats] = childrenRefs.getByName(name) protected def getChildByRef(ref: ActorRef): Option[ChildRestartStats] = childrenRefs.getByRef(ref) @@ -193,7 +186,7 @@ private[akka] trait Children { this: ActorCell ⇒ } // mailbox==null during RoutedActorCell constructor, where suspends are queued otherwise if (mailbox ne null) for (_ ← 1 to mailbox.suspendCount) actor.suspend() - addChild(actor) + initChild(actor) actor } } diff --git a/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala b/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala index 38bd239db6..e7b5fcc441 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala @@ -14,10 +14,10 @@ import akka.dispatch.SystemMessage */ private[akka] trait ChildrenContainer { - def add(child: ActorRef): ChildrenContainer + def add(name: String, stats: ChildRestartStats): ChildrenContainer def remove(child: ActorRef): ChildrenContainer - def getByName(name: String): Option[ChildRestartStats] + def getByName(name: String): Option[ChildStats] def getByRef(actor: ActorRef): Option[ChildRestartStats] def children: Iterable[ActorRef] @@ -50,8 +50,7 @@ private[akka] object ChildrenContainer { trait EmptyChildrenContainer extends ChildrenContainer { val emptyStats = TreeMap.empty[String, ChildStats] - override def add(child: ActorRef): ChildrenContainer = - new NormalChildrenContainer(emptyStats.updated(child.path.name, ChildRestartStats(child))) + override def add(name: String, stats: ChildRestartStats): ChildrenContainer = new NormalChildrenContainer(emptyStats.updated(name, stats)) override def remove(child: ActorRef): ChildrenContainer = this override def getByName(name: String): Option[ChildRestartStats] = None override def getByRef(actor: ActorRef): Option[ChildRestartStats] = None @@ -75,7 +74,7 @@ private[akka] object ChildrenContainer { * empty state while calling handleChildTerminated() for the last time. */ object TerminatedChildrenContainer extends EmptyChildrenContainer { - override def add(child: ActorRef): ChildrenContainer = this + override def add(name: String, stats: ChildRestartStats): ChildrenContainer = this override def reserve(name: String): ChildrenContainer = throw new IllegalStateException("cannot reserve actor name '" + name + "': already terminated") override def isTerminating: Boolean = true @@ -91,22 +90,18 @@ private[akka] object ChildrenContainer { */ class NormalChildrenContainer(val c: TreeMap[String, ChildStats]) extends ChildrenContainer { - override def add(child: ActorRef): ChildrenContainer = - new NormalChildrenContainer(c.updated(child.path.name, ChildRestartStats(child))) + override def add(name: String, stats: ChildRestartStats): ChildrenContainer = new NormalChildrenContainer(c.updated(name, stats)) override def remove(child: ActorRef): ChildrenContainer = NormalChildrenContainer(c - child.path.name) - override def getByName(name: String): Option[ChildRestartStats] = c.get(name) match { - case s @ Some(_: ChildRestartStats) ⇒ s.asInstanceOf[Option[ChildRestartStats]] - case _ ⇒ None - } + override def getByName(name: String): Option[ChildStats] = c.get(name) override def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match { case c @ Some(crs: ChildRestartStats) if (crs.child == actor) ⇒ c.asInstanceOf[Option[ChildRestartStats]] case _ ⇒ None } - override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _, _) ⇒ child } + override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) ⇒ child } override def stats: Iterable[ChildRestartStats] = c.values.view.collect { case c: ChildRestartStats ⇒ c } @@ -146,7 +141,7 @@ private[akka] object ChildrenContainer { case class TerminatingChildrenContainer(c: TreeMap[String, ChildStats], toDie: Set[ActorRef], reason: SuspendReason) extends ChildrenContainer { - override def add(child: ActorRef): ChildrenContainer = copy(c.updated(child.path.name, ChildRestartStats(child))) + override def add(name: String, stats: ChildRestartStats): ChildrenContainer = copy(c.updated(name, stats)) override def remove(child: ActorRef): ChildrenContainer = { val t = toDie - child @@ -157,17 +152,14 @@ private[akka] object ChildrenContainer { else copy(c - child.path.name, t) } - override def getByName(name: String): Option[ChildRestartStats] = c.get(name) match { - case s @ Some(_: ChildRestartStats) ⇒ s.asInstanceOf[Option[ChildRestartStats]] - case _ ⇒ None - } + override def getByName(name: String): Option[ChildStats] = c.get(name) override def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match { case c @ Some(crs: ChildRestartStats) if (crs.child == actor) ⇒ c.asInstanceOf[Option[ChildRestartStats]] case _ ⇒ None } - override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _, _) ⇒ child } + override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) ⇒ child } override def stats: Iterable[ChildRestartStats] = c.values.view.collect { case c: ChildRestartStats ⇒ c } diff --git a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala index 1d51777db1..3eaea06a22 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala @@ -201,7 +201,11 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ */ case Some(stats) if stats.uid == uid ⇒ if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, getAllChildStats)) throw cause - case _ ⇒ publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) + case Some(stats) ⇒ + publish(Debug(self.path.toString, clazz(actor), + "dropping Failed(" + cause + ") from old child " + child + " (uid=" + stats.uid + " != " + uid + ")")) + case None ⇒ + publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) } final protected def handleChildTerminated(child: ActorRef): SystemMessage = { diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index d961a75fff..33559cd56c 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -25,7 +25,17 @@ class TestActorRef[T <: Actor]( _props: Props, _supervisor: InternalActorRef, name: String) - extends LocalActorRef( + extends { + private val disregard = _supervisor match { + case l: LocalActorRef ⇒ l.underlying.reserveChild(name) + case r: RepointableActorRef ⇒ r.underlying match { + case u: UnstartedCell ⇒ throw new IllegalStateException("cannot attach a TestActor to an unstarted top-level actor, ensure that it is started by sending a message and observing the reply") + case c: ActorCell ⇒ c.reserveChild(name) + case o ⇒ _system.log.error("trying to attach child {} to unknown type of supervisor cell {}, this is not going to end well", name, o.getClass) + } + case s ⇒ _system.log.error("trying to attach child {} to unknown type of supervisor {}, this is not going to end well", name, s.getClass) + } + } with LocalActorRef( _system, _props.withDispatcher( if (_props.dispatcher == Dispatchers.DefaultDispatcherId) CallingThreadDispatcher.Id @@ -119,8 +129,9 @@ object TestActorRef { def apply[T <: Actor](props: Props, name: String)(implicit system: ActorSystem): TestActorRef[T] = apply[T](props, system.asInstanceOf[ActorSystemImpl].guardian, name) - def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit system: ActorSystem): TestActorRef[T] = + def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit system: ActorSystem): TestActorRef[T] = { new TestActorRef(system.asInstanceOf[ActorSystemImpl], system.dispatchers.prerequisites, props, supervisor.asInstanceOf[InternalActorRef], name) + } def apply[T <: Actor](implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName) From c1c05ef95ef3b54b2069975745c7ad058248c0d5 Mon Sep 17 00:00:00 2001 From: Roland Date: Wed, 15 Aug 2012 21:46:05 +0200 Subject: [PATCH 4/6] fix CTD vs. RepointableRef by swallowing exceptions during send - it was always intended that tell() (and sendSystemMessage()) shall not throw any exceptions - this is implemented by swallowing in ActorCell (suspend/resume/restart/stop/!/sendSystemMessage) and in RemoteActorRef (!/sendSystemMessage) - current implementation uses a normal method, which adds overhead but keeps the code in one place (ActorCell.catchingSend); this is a great opportunity for making use of macros --- .../akka/serialization/SerializeSpec.scala | 4 ++-- .../src/main/scala/akka/actor/Actor.scala | 2 +- .../src/main/scala/akka/actor/ActorCell.scala | 18 ++++++++++++++---- .../scala/akka/actor/ActorRefProvider.scala | 6 +++++- .../scala/akka/actor/RepointableActorRef.scala | 12 ++---------- .../main/scala/akka/actor/cell/Dispatch.scala | 18 ++++++++++++------ .../akka/remote/RemoteActorRefProvider.scala | 6 ++++-- .../scala/akka/testkit/TestEventListener.scala | 5 +++-- 8 files changed, 43 insertions(+), 28 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 0c57b61f8c..4cd5a876d5 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -269,8 +269,8 @@ class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf) val a = system.actorOf(Props[FooActor]) Await.result(a ? "pigdog", timeout.duration) must be("pigdog") - intercept[NotSerializableException] { - Await.result(a ? new AnyRef, timeout.duration) + EventFilter[NotSerializableException](occurrences = 1) intercept { + a ! (new AnyRef) } system stop a } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 5df5932b21..6a9bdb0801 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -217,7 +217,7 @@ case class DeathPactException private[akka] (dead: ActorRef) * avoid cascading interrupts to other threads than the originally interrupted one. */ @SerialVersionUID(1L) -class ActorInterruptedException private[akka] (cause: Throwable) extends AkkaException(cause.getMessage, cause) with NoStackTrace +class ActorInterruptedException private[akka] (cause: Throwable) extends AkkaException(cause.getMessage, cause) /** * This message is published to the EventStream whenever an Actor receives a message it doesn't understand diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 209ed70478..59e740af2e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -182,19 +182,19 @@ private[akka] trait Cell { */ def systemImpl: ActorSystemImpl /** - * Recursively suspend this actor and all its children. + * Recursively suspend this actor and all its children. Must not throw exceptions. */ def suspend(): Unit /** - * Recursively resume this actor and all its children. + * Recursively resume this actor and all its children. Must not throw exceptions. */ def resume(causedByFailure: Throwable): Unit /** - * Restart this actor (will recursively restart or stop all children). + * Restart this actor (will recursively restart or stop all children). Must not throw exceptions. */ def restart(cause: Throwable): Unit /** - * Recursively terminate this actor and all its children. + * Recursively terminate this actor and all its children. Must not throw exceptions. */ def stop(): Unit /** @@ -217,11 +217,13 @@ private[akka] trait Cell { /** * Enqueue a message to be sent to the actor; may or may not actually * schedule the actor to run, depending on which type of cell it is. + * Must not throw exceptions. */ def tell(message: Any, sender: ActorRef): Unit /** * Enqueue a message to be sent to the actor; may or may not actually * schedule the actor to run, depending on which type of cell it is. + * Must not throw exceptions. */ def sendSystemMessage(msg: SystemMessage): Unit /** @@ -259,6 +261,14 @@ private[akka] object ActorCell { final val emptyBehaviorStack: List[Actor.Receive] = Nil final val emptyActorRefSet: Set[ActorRef] = TreeSet.empty + + final def catchingSend(system: ActorSystem, source: String, clazz: Class[_], code: ⇒ Unit): Unit = { + try code + catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + system.eventStream.publish(Error(e, source, clazz, "swallowing exception during message send")) + } + } } //ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 422636c1ed..079651d216 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -417,7 +417,11 @@ class LocalActorRefProvider( /** * Overridable supervision strategy to be used by the “/user” guardian. */ - protected def rootGuardianStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy + protected def rootGuardianStrategy: SupervisorStrategy = OneForOneStrategy() { + case ex ⇒ + log.error(ex, "guardian failed, shutting down system") + SupervisorStrategy.Stop + } /** * Overridable supervision strategy to be used by the “/user” guardian. diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 7c38c0cff5..d3a1a66432 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -147,24 +147,16 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep * lock, double-tap (well, N-tap, really); concurrent modification is * still not possible because we’re the only thread accessing the queues. */ - var interrupted = false while (systemQueue.nonEmpty || queue.nonEmpty) { while (systemQueue.nonEmpty) { val msg = systemQueue.dequeue() - try cell.sendSystemMessage(msg) - catch { - case _: InterruptedException ⇒ interrupted = true - } + cell.sendSystemMessage(msg) } if (queue.nonEmpty) { val envelope = queue.dequeue() - try cell.tell(envelope.message, envelope.sender) - catch { - case _: InterruptedException ⇒ interrupted = true - } + cell.tell(envelope.message, envelope.sender) } } - if (interrupted) throw new InterruptedException } finally try self.swapCell(cell) finally try diff --git a/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala index 6565eb7fe7..435a0f97bb 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala @@ -63,20 +63,26 @@ private[akka] trait Dispatch { this: ActorCell ⇒ } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend()) + final def suspend(): Unit = + ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, Suspend())) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def resume(causedByFailure: Throwable): Unit = dispatcher.systemDispatch(this, Resume(causedByFailure)) + final def resume(causedByFailure: Throwable): Unit = + ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, Resume(causedByFailure))) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause)) + final def restart(cause: Throwable): Unit = + ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, Recreate(cause))) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def stop(): Unit = dispatcher.systemDispatch(this, Terminate()) + final def stop(): Unit = + ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, Terminate())) def tell(message: Any, sender: ActorRef): Unit = - dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system)) + ActorCell.catchingSend(system, self.path.toString, clazz(actor), + dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system))) - override def sendSystemMessage(message: SystemMessage): Unit = dispatcher.systemDispatch(this, message) + override def sendSystemMessage(message: SystemMessage): Unit = + ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, message)) } \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index be029881ed..a35fd81042 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -229,9 +229,11 @@ private[akka] class RemoteActorRef private[akka] ( def isTerminated: Boolean = !running - def sendSystemMessage(message: SystemMessage): Unit = remote.send(message, None, this) + def sendSystemMessage(message: SystemMessage): Unit = + ActorCell.catchingSend(remote.system, path.toString, classOf[RemoteActorRef], remote.send(message, None, this)) - override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this) + override def !(message: Any)(implicit sender: ActorRef = null): Unit = + ActorCell.catchingSend(remote.system, path.toString, classOf[RemoteActorRef], remote.send(message, Option(sender), this)) def suspend(): Unit = sendSystemMessage(Suspend()) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 6c0eb7c993..2987ede478 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -14,6 +14,7 @@ import java.lang.{ Iterable ⇒ JIterable } import scala.collection.JavaConverters import scala.concurrent.util.Duration import scala.reflect.ClassTag +import akka.actor.NoSerializationVerificationNeeded /** * Implementation helpers of the EventFilter facilities: send `Mute` @@ -39,7 +40,7 @@ object TestEvent { object Mute { def apply(filter: EventFilter, filters: EventFilter*): Mute = new Mute(filter +: filters.toSeq) } - case class Mute(filters: Seq[EventFilter]) extends TestEvent { + case class Mute(filters: Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded { /** * Java API */ @@ -48,7 +49,7 @@ object TestEvent { object UnMute { def apply(filter: EventFilter, filters: EventFilter*): UnMute = new UnMute(filter +: filters.toSeq) } - case class UnMute(filters: Seq[EventFilter]) extends TestEvent { + case class UnMute(filters: Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded { /** * Java API */ From 85dcfd87d1e53c0e8183363974148e8fe2cb5647 Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 16 Aug 2012 11:31:53 +0200 Subject: [PATCH 5/6] inline manually CatchingSend() --- .../src/main/scala/akka/actor/ActorCell.scala | 8 ---- .../main/scala/akka/actor/cell/Dispatch.scala | 39 +++++++++++++++---- .../akka/remote/RemoteActorRefProvider.scala | 14 ++++++- 3 files changed, 44 insertions(+), 17 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 59e740af2e..2e1c7cd610 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -261,14 +261,6 @@ private[akka] object ActorCell { final val emptyBehaviorStack: List[Actor.Receive] = Nil final val emptyActorRefSet: Set[ActorRef] = TreeSet.empty - - final def catchingSend(system: ActorSystem, source: String, clazz: Class[_], code: ⇒ Unit): Unit = { - try code - catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ - system.eventStream.publish(Error(e, source, clazz, "swallowing exception during message send")) - } - } } //ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit) diff --git a/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala index 435a0f97bb..794c051eed 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala @@ -7,7 +7,9 @@ package akka.actor.cell import scala.annotation.tailrec import akka.actor.{ ActorRef, ActorCell } import akka.dispatch.{ Terminate, SystemMessage, Suspend, Resume, Recreate, MessageDispatcher, Mailbox, Envelope, Create } +import akka.event.Logging.Error import akka.util.Unsafe +import scala.util.control.NonFatal private[akka] trait Dispatch { this: ActorCell ⇒ @@ -64,25 +66,48 @@ private[akka] trait Dispatch { this: ActorCell ⇒ // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ final def suspend(): Unit = - ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, Suspend())) + try dispatcher.systemDispatch(this, Suspend()) + catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) + } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ final def resume(causedByFailure: Throwable): Unit = - ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, Resume(causedByFailure))) + try dispatcher.systemDispatch(this, Resume(causedByFailure)) + catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) + } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ final def restart(cause: Throwable): Unit = - ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, Recreate(cause))) + try dispatcher.systemDispatch(this, Recreate(cause)) + catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) + } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ final def stop(): Unit = - ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, Terminate())) + try dispatcher.systemDispatch(this, Terminate()) + catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) + } def tell(message: Any, sender: ActorRef): Unit = - ActorCell.catchingSend(system, self.path.toString, clazz(actor), - dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system))) + try dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system)) + catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) + } override def sendSystemMessage(message: SystemMessage): Unit = - ActorCell.catchingSend(system, self.path.toString, clazz(actor), dispatcher.systemDispatch(this, message)) + try dispatcher.systemDispatch(this, message) + catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) + } } \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index a35fd81042..17f78ee202 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -7,8 +7,10 @@ package akka.remote import akka.actor._ import akka.dispatch._ import akka.event.{ Logging, LoggingAdapter, EventStream } +import akka.event.Logging.Error import akka.serialization.{ Serialization, SerializationExtension } import scala.concurrent.Future +import scala.util.control.NonFatal /** * Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it. @@ -230,10 +232,18 @@ private[akka] class RemoteActorRef private[akka] ( def isTerminated: Boolean = !running def sendSystemMessage(message: SystemMessage): Unit = - ActorCell.catchingSend(remote.system, path.toString, classOf[RemoteActorRef], remote.send(message, None, this)) + try remote.send(message, None, this) + catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + remote.system.eventStream.publish(Error(e, path.toString, classOf[RemoteActorRef], "swallowing exception during message send")) + } override def !(message: Any)(implicit sender: ActorRef = null): Unit = - ActorCell.catchingSend(remote.system, path.toString, classOf[RemoteActorRef], remote.send(message, Option(sender), this)) + try remote.send(message, Option(sender), this) + catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + remote.system.eventStream.publish(Error(e, path.toString, classOf[RemoteActorRef], "swallowing exception during message send")) + } def suspend(): Unit = sendSystemMessage(Suspend()) From a98cdffae58c0185058ccf5f22d35dc337be504e Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 17 Aug 2012 14:27:56 +0200 Subject: [PATCH 6/6] last round of fixes, see #2376 --- .../src/test/scala/akka/actor/ActorSystemSpec.scala | 1 - .../src/main/scala/akka/actor/ActorRefProvider.scala | 10 +++++----- .../src/main/scala/akka/actor/FaultHandling.scala | 2 +- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index 06fbc99dfd..7dfdf83e6e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -227,7 +227,6 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt } })) EventFilter[Exception]("hello") intercept { - Thread.sleep(250) a ! "die" awaitCond(system.isTerminated) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 079651d216..6556adc2cd 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -372,10 +372,10 @@ class LocalActorRefProvider( } } - private class Guardian(override val supervisorStrategy: SupervisorStrategy) extends Actor { + private class Guardian(override val supervisorStrategy: SupervisorStrategy, isSystem: Boolean) extends Actor { def receive = { - case Terminated(_) ⇒ if (context.self.path.name == "system") eventStream.stopDefaultLoggers(); context.stop(self) + case Terminated(_) ⇒ if (isSystem) eventStream.stopDefaultLoggers(); context.stop(self) case StopChild(child) ⇒ context.stop(child) case m ⇒ deadLetters ! DeadLetter(m, sender, self) } @@ -434,7 +434,7 @@ class LocalActorRefProvider( protected def systemGuardianStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy lazy val rootGuardian: LocalActorRef = - new LocalActorRef(system, Props(new Guardian(rootGuardianStrategy)), theOneWhoWalksTheBubblesOfSpaceTime, rootPath) { + new LocalActorRef(system, Props(new Guardian(rootGuardianStrategy, isSystem = false)), theOneWhoWalksTheBubblesOfSpaceTime, rootPath) { override def getParent: InternalActorRef = this override def getSingleChild(name: String): InternalActorRef = name match { case "temp" ⇒ tempContainer @@ -444,12 +444,12 @@ class LocalActorRefProvider( lazy val guardian: LocalActorRef = { rootGuardian.underlying.reserveChild("user") - new LocalActorRef(system, Props(new Guardian(guardianStrategy)), rootGuardian, rootPath / "user") + new LocalActorRef(system, Props(new Guardian(guardianStrategy, isSystem = false)), rootGuardian, rootPath / "user") } lazy val systemGuardian: LocalActorRef = { rootGuardian.underlying.reserveChild("system") - new LocalActorRef(system, Props(new Guardian(systemGuardianStrategy)), rootGuardian, rootPath / "system") + new LocalActorRef(system, Props(new Guardian(systemGuardianStrategy, isSystem = true)), rootGuardian, rootPath / "system") } lazy val tempContainer = new VirtualPathContainer(system.provider, tempNode, rootGuardian, log) diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 58c9a62967..3d1c9a01c3 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -27,7 +27,7 @@ private[akka] case object ChildNameReserved extends ChildStats case class ChildRestartStats(child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) extends ChildStats { - var uid = 0 + var uid: Int = 0 //FIXME How about making ChildRestartStats immutable and then move these methods into the actual supervisor strategies? def requestRestartPermission(retriesWindow: (Option[Int], Option[Int])): Boolean =