diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index ad3f5e2870..7dfdf83e6e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -4,7 +4,6 @@ package akka.actor import language.postfixOps - import akka.testkit._ import org.scalatest.junit.JUnitSuite import com.typesafe.config.ConfigFactory @@ -12,9 +11,9 @@ import scala.concurrent.Await import scala.concurrent.util.duration._ import scala.collection.JavaConverters import java.util.concurrent.{ TimeUnit, RejectedExecutionException, CountDownLatch, ConcurrentLinkedQueue } -import akka.pattern.ask import akka.util.Timeout import scala.concurrent.Future +import akka.pattern.ask class JavaExtensionSpec extends JavaExtension with JUnitSuite @@ -62,6 +61,12 @@ object ActorSystemSpec { } } + class Strategy extends SupervisorStrategyConfigurator { + def create() = OneForOneStrategy() { + case _ ⇒ SupervisorStrategy.Escalate + } + } + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -187,6 +192,46 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt created filter (ref ⇒ !ref.isTerminated && !ref.asInstanceOf[ActorRefWithCell].underlying.isInstanceOf[UnstartedCell]) must be(Seq()) } + "shut down when /user fails" in { + implicit val system = ActorSystem("Stop", AkkaSpec.testConf) + EventFilter[ActorKilledException]() intercept { + system.actorFor("/user") ! Kill + awaitCond(system.isTerminated) + } + } + + "allow configuration of guardian supervisor strategy" in { + implicit val system = ActorSystem("Stop", + ConfigFactory.parseString("akka.actor.guardian-supervisor-strategy=akka.actor.StoppingSupervisorStrategy") + .withFallback(AkkaSpec.testConf)) + val a = system.actorOf(Props(new Actor { + def receive = { + case "die" ⇒ throw new Exception("hello") + } + })) + val probe = TestProbe() + probe.watch(a) + EventFilter[Exception]("hello", occurrences = 1) intercept { + a ! "die" + } + probe.expectMsg(Terminated(a)(true)) + } + + "shut down when /user escalates" in { + implicit val system = ActorSystem("Stop", + ConfigFactory.parseString("akka.actor.guardian-supervisor-strategy=\"akka.actor.ActorSystemSpec$Strategy\"") + .withFallback(AkkaSpec.testConf)) + val a = system.actorOf(Props(new Actor { + def receive = { + case "die" ⇒ throw new Exception("hello") + } + })) + EventFilter[Exception]("hello") intercept { + a ! "die" + awaitCond(system.isTerminated) + } + } + } } diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 0c57b61f8c..4cd5a876d5 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -269,8 +269,8 @@ class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf) val a = system.actorOf(Props[FooActor]) Await.result(a ? "pigdog", timeout.duration) must be("pigdog") - intercept[NotSerializableException] { - Await.result(a ? new AnyRef, timeout.duration) + EventFilter[NotSerializableException](occurrences = 1) intercept { + a ! (new AnyRef) } system stop a } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 459c9bd216..5afc1148a1 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -46,7 +46,14 @@ akka { actor { + # FQCN of the ActorRefProvider to be used; the below is the built-in default, + # another one is akka.remote.RemoteActorRefProvider in the akka-remote bundle. provider = "akka.actor.LocalActorRefProvider" + + # The guardian "/user" will use this subclass of akka.actor.SupervisorStrategyConfigurator + # to obtain its supervisorStrategy. Besides the default there is + # akka.actor.StoppingSupervisorStrategy + guardian-supervisor-strategy = "akka.actor.DefaultSupervisorStrategy" # Timeout for ActorSystem.actorOf creation-timeout = 20s diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 5df5932b21..6a9bdb0801 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -217,7 +217,7 @@ case class DeathPactException private[akka] (dead: ActorRef) * avoid cascading interrupts to other threads than the originally interrupted one. */ @SerialVersionUID(1L) -class ActorInterruptedException private[akka] (cause: Throwable) extends AkkaException(cause.getMessage, cause) with NoStackTrace +class ActorInterruptedException private[akka] (cause: Throwable) extends AkkaException(cause.getMessage, cause) /** * This message is published to the EventStream whenever an Actor receives a message it doesn't understand diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 40938ec809..2e1c7cd610 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -13,7 +13,7 @@ import scala.util.control.NonFatal import akka.actor.cell.ChildrenContainer import akka.dispatch.{ Watch, Unwatch, Terminate, SystemMessage, Suspend, Supervise, Resume, Recreate, NoMessage, MessageDispatcher, Envelope, Create, ChildTerminated } -import akka.event.Logging.{ LogEvent, Debug } +import akka.event.Logging.{ LogEvent, Debug, Error } import akka.japi.Procedure /** @@ -182,19 +182,19 @@ private[akka] trait Cell { */ def systemImpl: ActorSystemImpl /** - * Recursively suspend this actor and all its children. + * Recursively suspend this actor and all its children. Must not throw exceptions. */ def suspend(): Unit /** - * Recursively resume this actor and all its children. + * Recursively resume this actor and all its children. Must not throw exceptions. */ def resume(causedByFailure: Throwable): Unit /** - * Restart this actor (will recursively restart or stop all children). + * Restart this actor (will recursively restart or stop all children). Must not throw exceptions. */ def restart(cause: Throwable): Unit /** - * Recursively terminate this actor and all its children. + * Recursively terminate this actor and all its children. Must not throw exceptions. */ def stop(): Unit /** @@ -213,15 +213,17 @@ private[akka] trait Cell { /** * Get the stats for the named child, if that exists. */ - def getChildByName(name: String): Option[ChildRestartStats] + def getChildByName(name: String): Option[ChildStats] /** * Enqueue a message to be sent to the actor; may or may not actually * schedule the actor to run, depending on which type of cell it is. + * Must not throw exceptions. */ def tell(message: Any, sender: ActorRef): Unit /** * Enqueue a message to be sent to the actor; may or may not actually * schedule the actor to run, depending on which type of cell it is. + * Must not throw exceptions. */ def sendSystemMessage(msg: SystemMessage): Unit /** @@ -368,7 +370,7 @@ private[akka] class ActorCell( case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ self.stop() case SelectParent(m) ⇒ parent.tell(m, msg.sender) - case SelectChildName(name, m) ⇒ for (c ← getChildByName(name)) c.child.tell(m, msg.sender) + case SelectChildName(name, m) ⇒ getChildByName(name) match { case Some(c: ChildRestartStats) ⇒ c.child.tell(m, msg.sender); case _ ⇒ } case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) } } @@ -444,9 +446,13 @@ private[akka] class ActorCell( private def supervise(child: ActorRef, uid: Int): Unit = if (!isTerminating) { // Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure() - addChild(child).uid = uid - handleSupervise(child) - if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) + initChild(child) match { + case Some(crs) ⇒ + crs.uid = uid + handleSupervise(child) + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) + case None ⇒ publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well")) + } } // future extension point diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 5ad951b89e..30aacac0d8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -306,8 +306,8 @@ private[akka] class LocalActorRef private[akka] ( */ protected def getSingleChild(name: String): InternalActorRef = actorCell.getChildByName(name) match { - case Some(crs) ⇒ crs.child.asInstanceOf[InternalActorRef] - case None ⇒ Nobody + case Some(crs: ChildRestartStats) ⇒ crs.child.asInstanceOf[InternalActorRef] + case _ ⇒ Nobody } override def getChild(names: Iterator[String]): InternalActorRef = { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 830826381a..6556adc2cd 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -294,16 +294,6 @@ trait ActorRefFactory { def stop(actor: ActorRef): Unit } -/** - * Internal Akka use only, used in implementation of system.actorOf. - */ -private[akka] case class CreateChild(props: Props, name: String) - -/** - * Internal Akka use only, used in implementation of system.actorOf. - */ -private[akka] case class CreateRandomNameChild(props: Props) - /** * Internal Akka use only, used in implementation of system.stop(child). */ @@ -317,6 +307,7 @@ class LocalActorRefProvider( override val settings: ActorSystem.Settings, val eventStream: EventStream, override val scheduler: Scheduler, + val dynamicAccess: DynamicAccess, override val deployer: Deployer) extends ActorRefProvider { // this is the constructor needed for reflectively instantiating the provider @@ -329,6 +320,7 @@ class LocalActorRefProvider( settings, eventStream, scheduler, + dynamicAccess, new Deployer(settings, dynamicAccess)) override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName)) @@ -380,65 +372,12 @@ class LocalActorRefProvider( } } - /** - * Overridable supervision strategy to be used by the “/user” guardian. - */ - protected def guardianSupervisionStrategy: SupervisorStrategy = { - import akka.actor.SupervisorStrategy._ - OneForOneStrategy() { - case _: ActorKilledException ⇒ Stop - case _: ActorInitializationException ⇒ Stop - case _: Exception ⇒ Restart - } - } - - /* - * Guardians can be asked by ActorSystem to create children, i.e. top-level - * actors. Therefore these need to answer to these requests, forwarding any - * exceptions which might have occurred. - */ - private class Guardian extends Actor { - - override val supervisorStrategy: SupervisorStrategy = guardianSupervisionStrategy + private class Guardian(override val supervisorStrategy: SupervisorStrategy, isSystem: Boolean) extends Actor { def receive = { - case Terminated(_) ⇒ context.stop(self) - case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case NonFatal(e) ⇒ Status.Failure(e) }) - case CreateRandomNameChild(child) ⇒ sender ! (try context.actorOf(child) catch { case NonFatal(e) ⇒ Status.Failure(e) }) - case StopChild(child) ⇒ context.stop(child) - case m ⇒ deadLetters ! DeadLetter(m, sender, self) - } - - // guardian MUST NOT lose its children during restart - override def preRestart(cause: Throwable, msg: Option[Any]) {} - } - - /** - * Overridable supervision strategy to be used by the “/system” guardian. - */ - protected def systemGuardianSupervisionStrategy: SupervisorStrategy = { - import akka.actor.SupervisorStrategy._ - OneForOneStrategy() { - case _: ActorKilledException | _: ActorInitializationException ⇒ Stop - case _: Exception ⇒ Restart - } - } - - /* - * Guardians can be asked by ActorSystem to create children, i.e. top-level - * actors. Therefore these need to answer to these requests, forwarding any - * exceptions which might have occurred. - */ - private class SystemGuardian extends Actor { - - override val supervisorStrategy: SupervisorStrategy = systemGuardianSupervisionStrategy - - def receive = { - case Terminated(_) ⇒ eventStream.stopDefaultLoggers(); context.stop(self) - case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case NonFatal(e) ⇒ Status.Failure(e) }) - case CreateRandomNameChild(child) ⇒ sender ! (try context.actorOf(child) catch { case NonFatal(e) ⇒ Status.Failure(e) }) - case StopChild(child) ⇒ context.stop(child); sender ! "ok" - case m ⇒ deadLetters ! DeadLetter(m, sender, self) + case Terminated(_) ⇒ if (isSystem) eventStream.stopDefaultLoggers(); context.stop(self) + case StopChild(child) ⇒ context.stop(child) + case m ⇒ deadLetters ! DeadLetter(m, sender, self) } // guardian MUST NOT lose its children during restart @@ -472,10 +411,30 @@ class LocalActorRefProvider( */ def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras - private val guardianProps = Props(new Guardian) + private def guardianSupervisorStrategyConfigurator = + dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, Seq()).fold(throw _, x ⇒ x) - lazy val rootGuardian: InternalActorRef = - new LocalActorRef(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath) { + /** + * Overridable supervision strategy to be used by the “/user” guardian. + */ + protected def rootGuardianStrategy: SupervisorStrategy = OneForOneStrategy() { + case ex ⇒ + log.error(ex, "guardian failed, shutting down system") + SupervisorStrategy.Stop + } + + /** + * Overridable supervision strategy to be used by the “/user” guardian. + */ + protected def guardianStrategy: SupervisorStrategy = guardianSupervisorStrategyConfigurator.create() + + /** + * Overridable supervision strategy to be used by the “/user” guardian. + */ + protected def systemGuardianStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy + + lazy val rootGuardian: LocalActorRef = + new LocalActorRef(system, Props(new Guardian(rootGuardianStrategy, isSystem = false)), theOneWhoWalksTheBubblesOfSpaceTime, rootPath) { override def getParent: InternalActorRef = this override def getSingleChild(name: String): InternalActorRef = name match { case "temp" ⇒ tempContainer @@ -483,10 +442,15 @@ class LocalActorRefProvider( } } - lazy val guardian: LocalActorRef = new LocalActorRef(system, guardianProps, rootGuardian, rootPath / "user") + lazy val guardian: LocalActorRef = { + rootGuardian.underlying.reserveChild("user") + new LocalActorRef(system, Props(new Guardian(guardianStrategy, isSystem = false)), rootGuardian, rootPath / "user") + } - lazy val systemGuardian: LocalActorRef = - new LocalActorRef(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system") + lazy val systemGuardian: LocalActorRef = { + rootGuardian.underlying.reserveChild("system") + new LocalActorRef(system, Props(new Guardian(systemGuardianStrategy, isSystem = true)), rootGuardian, rootPath / "system") + } lazy val tempContainer = new VirtualPathContainer(system.provider, tempNode, rootGuardian, log) @@ -559,4 +523,3 @@ class LocalActorRefProvider( def getExternalAddressFor(addr: Address): Option[Address] = if (addr == rootPath.address) Some(addr) else None } - diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 82fe2000b4..d3c3a558c2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -133,6 +133,7 @@ object ActorSystem { final val ConfigVersion = getString("akka.version") final val ProviderClass = getString("akka.actor.provider") + final val SupervisorStrategyClass = getString("akka.actor.guardian-supervisor-strategy") final val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS)) final val SerializeAllMessages = getBoolean("akka.actor.serialize-messages") diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 863c4f9721..3d1c9a01c3 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -24,9 +24,11 @@ private[akka] case object ChildNameReserved extends ChildStats * ChildRestartStats is the statistics kept by every parent Actor for every child Actor * and is used for SupervisorStrategies to know how to deal with problems that occur for the children. */ -case class ChildRestartStats(child: ActorRef, var uid: Int = 0, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) +case class ChildRestartStats(child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) extends ChildStats { + var uid: Int = 0 + //FIXME How about making ChildRestartStats immutable and then move these methods into the actual supervisor strategies? def requestRestartPermission(retriesWindow: (Option[Int], Option[Int])): Boolean = retriesWindow match { @@ -61,6 +63,23 @@ case class ChildRestartStats(child: ActorRef, var uid: Int = 0, var maxNrOfRetri } } +/** + * Implement this interface in order to configure the supervisorStrategy for + * the top-level guardian actor (`/user`). An instance of this class must be + * instantiable using a no-arg constructor. + */ +trait SupervisorStrategyConfigurator { + def create(): SupervisorStrategy +} + +final class DefaultSupervisorStrategy extends SupervisorStrategyConfigurator { + override def create(): SupervisorStrategy = SupervisorStrategy.defaultStrategy +} + +final class StoppingSupervisorStrategy extends SupervisorStrategyConfigurator { + override def create(): SupervisorStrategy = SupervisorStrategy.stoppingStrategy +} + trait SupervisorStrategyLowPriorityImplicits { this: SupervisorStrategy.type ⇒ /** @@ -133,11 +152,21 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: Exception ⇒ Restart - case _ ⇒ Escalate } OneForOneStrategy()(defaultDecider) } + /** + * This strategy resembles Erlang in that failing children are always + * terminated (one-for-one). + */ + final val stoppingStrategy: SupervisorStrategy = { + def stoppingDecider: Decider = { + case _: Exception ⇒ Stop + } + OneForOneStrategy()(stoppingDecider) + } + /** * Implicit conversion from `Seq` of Throwables to a `Decider`. * This maps the given Throwables to restarts, otherwise escalates. diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 884427ccf8..b92df6a4ce 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -108,8 +108,8 @@ private[akka] class RepointableActorRef( case "" ⇒ getChild(name) case other ⇒ underlying.getChildByName(other) match { - case Some(crs) ⇒ crs.child.asInstanceOf[InternalActorRef].getChild(name) - case None ⇒ Nobody + case Some(crs: ChildRestartStats) ⇒ crs.child.asInstanceOf[InternalActorRef].getChild(name) + case _ ⇒ Nobody } } } else this @@ -147,27 +147,16 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep * lock, double-tap (well, N-tap, really); concurrent modification is * still not possible because we’re the only thread accessing the queues. */ - var interrupted = false while (systemQueue.nonEmpty || queue.nonEmpty) { while (systemQueue.nonEmpty) { val msg = systemQueue.dequeue() - try cell.sendSystemMessage(msg) - catch { - case _: InterruptedException ⇒ interrupted = true - } + cell.sendSystemMessage(msg) } if (queue.nonEmpty) { val envelope = queue.dequeue() - try cell.tell(envelope.message, envelope.sender) - catch { - case _: InterruptedException ⇒ interrupted = true - } + cell.tell(envelope.message, envelope.sender) } } - if (interrupted) { - Thread.interrupted() // clear interrupted flag before throwing according to java convention - throw new InterruptedException - } } finally try self.swapCell(cell) finally try @@ -223,4 +212,4 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep } } -} \ No newline at end of file +} diff --git a/akka-actor/src/main/scala/akka/actor/cell/Children.scala b/akka-actor/src/main/scala/akka/actor/cell/Children.scala index 89c506fd75..5704a6b22b 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Children.scala @@ -7,11 +7,12 @@ package akka.actor.cell import scala.annotation.tailrec import scala.collection.JavaConverters.asJavaIterableConverter import scala.util.control.NonFatal -import akka.actor.{ RepointableRef, Props, NoSerializationVerificationNeeded, InvalidActorNameException, InternalActorRef, ChildRestartStats, ActorRef } +import akka.actor._ import akka.actor.ActorCell import akka.actor.ActorPath.ElementRegex import akka.serialization.SerializationExtension import akka.util.{ Unsafe, Helpers } +import akka.actor.ChildNameReserved private[akka] trait Children { this: ActorCell ⇒ @@ -61,7 +62,7 @@ private[akka] trait Children { this: ActorCell ⇒ @inline private def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean = Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.childrenOffset, oldChildren, newChildren) - @tailrec final protected def reserveChild(name: String): Boolean = { + @tailrec final def reserveChild(name: String): Boolean = { val c = childrenRefs swapChildrenRefs(c, c.reserve(name)) || reserveChild(name) } @@ -71,24 +72,16 @@ private[akka] trait Children { this: ActorCell ⇒ swapChildrenRefs(c, c.unreserve(name)) || unreserveChild(name) } - final protected def addChild(ref: ActorRef): ChildRestartStats = { - @tailrec def rec(): ChildRestartStats = { - val c = childrenRefs - val nc = c.add(ref) - if (swapChildrenRefs(c, nc)) nc.getByName(ref.path.name).get else rec() + @tailrec final protected def initChild(ref: ActorRef): Option[ChildRestartStats] = + childrenRefs.getByName(ref.path.name) match { + case old @ Some(_: ChildRestartStats) ⇒ old.asInstanceOf[Option[ChildRestartStats]] + case Some(ChildNameReserved) ⇒ + val crs = ChildRestartStats(ref) + val name = ref.path.name + val c = childrenRefs + if (swapChildrenRefs(c, c.add(name, crs))) Some(crs) else initChild(ref) + case None ⇒ None } - /* - * This does not need to check getByRef every tailcall, because the change - * cannot happen in that direction as a race: the only entity removing a - * child is the actor itself, and the only entity which could be racing is - * somebody who calls attachChild, and there we are guaranteed that that - * child cannot yet have died (since it has not yet been created). - */ - childrenRefs.getByRef(ref) match { - case Some(old) ⇒ old - case None ⇒ rec() - } - } @tailrec final protected def shallDie(ref: ActorRef): Boolean = { val c = childrenRefs @@ -127,17 +120,17 @@ private[akka] trait Children { this: ActorCell ⇒ protected def suspendChildren(exceptFor: Set[ActorRef] = Set.empty): Unit = childrenRefs.stats foreach { - case ChildRestartStats(child, _, _, _) if !(exceptFor contains child) ⇒ child.asInstanceOf[InternalActorRef].suspend() + case ChildRestartStats(child, _, _) if !(exceptFor contains child) ⇒ child.asInstanceOf[InternalActorRef].suspend() case _ ⇒ } protected def resumeChildren(causedByFailure: Throwable, perp: ActorRef): Unit = childrenRefs.stats foreach { - case ChildRestartStats(child: InternalActorRef, _, _, _) ⇒ + case ChildRestartStats(child: InternalActorRef, _, _) ⇒ child.resume(if (perp == child) causedByFailure else null) } - def getChildByName(name: String): Option[ChildRestartStats] = childrenRefs.getByName(name) + def getChildByName(name: String): Option[ChildStats] = childrenRefs.getByName(name) protected def getChildByRef(ref: ActorRef): Option[ChildRestartStats] = childrenRefs.getByRef(ref) @@ -197,7 +190,7 @@ private[akka] trait Children { this: ActorCell ⇒ } // mailbox==null during RoutedActorCell constructor, where suspends are queued otherwise if (mailbox ne null) for (_ ← 1 to mailbox.suspendCount) actor.suspend() - addChild(actor) + initChild(actor) actor } } diff --git a/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala b/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala index 38bd239db6..e7b5fcc441 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala @@ -14,10 +14,10 @@ import akka.dispatch.SystemMessage */ private[akka] trait ChildrenContainer { - def add(child: ActorRef): ChildrenContainer + def add(name: String, stats: ChildRestartStats): ChildrenContainer def remove(child: ActorRef): ChildrenContainer - def getByName(name: String): Option[ChildRestartStats] + def getByName(name: String): Option[ChildStats] def getByRef(actor: ActorRef): Option[ChildRestartStats] def children: Iterable[ActorRef] @@ -50,8 +50,7 @@ private[akka] object ChildrenContainer { trait EmptyChildrenContainer extends ChildrenContainer { val emptyStats = TreeMap.empty[String, ChildStats] - override def add(child: ActorRef): ChildrenContainer = - new NormalChildrenContainer(emptyStats.updated(child.path.name, ChildRestartStats(child))) + override def add(name: String, stats: ChildRestartStats): ChildrenContainer = new NormalChildrenContainer(emptyStats.updated(name, stats)) override def remove(child: ActorRef): ChildrenContainer = this override def getByName(name: String): Option[ChildRestartStats] = None override def getByRef(actor: ActorRef): Option[ChildRestartStats] = None @@ -75,7 +74,7 @@ private[akka] object ChildrenContainer { * empty state while calling handleChildTerminated() for the last time. */ object TerminatedChildrenContainer extends EmptyChildrenContainer { - override def add(child: ActorRef): ChildrenContainer = this + override def add(name: String, stats: ChildRestartStats): ChildrenContainer = this override def reserve(name: String): ChildrenContainer = throw new IllegalStateException("cannot reserve actor name '" + name + "': already terminated") override def isTerminating: Boolean = true @@ -91,22 +90,18 @@ private[akka] object ChildrenContainer { */ class NormalChildrenContainer(val c: TreeMap[String, ChildStats]) extends ChildrenContainer { - override def add(child: ActorRef): ChildrenContainer = - new NormalChildrenContainer(c.updated(child.path.name, ChildRestartStats(child))) + override def add(name: String, stats: ChildRestartStats): ChildrenContainer = new NormalChildrenContainer(c.updated(name, stats)) override def remove(child: ActorRef): ChildrenContainer = NormalChildrenContainer(c - child.path.name) - override def getByName(name: String): Option[ChildRestartStats] = c.get(name) match { - case s @ Some(_: ChildRestartStats) ⇒ s.asInstanceOf[Option[ChildRestartStats]] - case _ ⇒ None - } + override def getByName(name: String): Option[ChildStats] = c.get(name) override def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match { case c @ Some(crs: ChildRestartStats) if (crs.child == actor) ⇒ c.asInstanceOf[Option[ChildRestartStats]] case _ ⇒ None } - override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _, _) ⇒ child } + override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) ⇒ child } override def stats: Iterable[ChildRestartStats] = c.values.view.collect { case c: ChildRestartStats ⇒ c } @@ -146,7 +141,7 @@ private[akka] object ChildrenContainer { case class TerminatingChildrenContainer(c: TreeMap[String, ChildStats], toDie: Set[ActorRef], reason: SuspendReason) extends ChildrenContainer { - override def add(child: ActorRef): ChildrenContainer = copy(c.updated(child.path.name, ChildRestartStats(child))) + override def add(name: String, stats: ChildRestartStats): ChildrenContainer = copy(c.updated(name, stats)) override def remove(child: ActorRef): ChildrenContainer = { val t = toDie - child @@ -157,17 +152,14 @@ private[akka] object ChildrenContainer { else copy(c - child.path.name, t) } - override def getByName(name: String): Option[ChildRestartStats] = c.get(name) match { - case s @ Some(_: ChildRestartStats) ⇒ s.asInstanceOf[Option[ChildRestartStats]] - case _ ⇒ None - } + override def getByName(name: String): Option[ChildStats] = c.get(name) override def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match { case c @ Some(crs: ChildRestartStats) if (crs.child == actor) ⇒ c.asInstanceOf[Option[ChildRestartStats]] case _ ⇒ None } - override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _, _) ⇒ child } + override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) ⇒ child } override def stats: Iterable[ChildRestartStats] = c.values.view.collect { case c: ChildRestartStats ⇒ c } diff --git a/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala index 6565eb7fe7..794c051eed 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala @@ -7,7 +7,9 @@ package akka.actor.cell import scala.annotation.tailrec import akka.actor.{ ActorRef, ActorCell } import akka.dispatch.{ Terminate, SystemMessage, Suspend, Resume, Recreate, MessageDispatcher, Mailbox, Envelope, Create } +import akka.event.Logging.Error import akka.util.Unsafe +import scala.util.control.NonFatal private[akka] trait Dispatch { this: ActorCell ⇒ @@ -63,20 +65,49 @@ private[akka] trait Dispatch { this: ActorCell ⇒ } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend()) + final def suspend(): Unit = + try dispatcher.systemDispatch(this, Suspend()) + catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) + } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def resume(causedByFailure: Throwable): Unit = dispatcher.systemDispatch(this, Resume(causedByFailure)) + final def resume(causedByFailure: Throwable): Unit = + try dispatcher.systemDispatch(this, Resume(causedByFailure)) + catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) + } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause)) + final def restart(cause: Throwable): Unit = + try dispatcher.systemDispatch(this, Recreate(cause)) + catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) + } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def stop(): Unit = dispatcher.systemDispatch(this, Terminate()) + final def stop(): Unit = + try dispatcher.systemDispatch(this, Terminate()) + catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) + } def tell(message: Any, sender: ActorRef): Unit = - dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system)) + try dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system)) + catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) + } - override def sendSystemMessage(message: SystemMessage): Unit = dispatcher.systemDispatch(this, message) + override def sendSystemMessage(message: SystemMessage): Unit = + try dispatcher.systemDispatch(this, message) + catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) + } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala index 2730778ad0..4e5eeedcc6 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala @@ -204,7 +204,11 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ */ case Some(stats) if stats.uid == uid ⇒ if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, getAllChildStats)) throw cause - case _ ⇒ publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) + case Some(stats) ⇒ + publish(Debug(self.path.toString, clazz(actor), + "dropping Failed(" + cause + ") from old child " + child + " (uid=" + stats.uid + " != " + uid + ")")) + case None ⇒ + publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) } final protected def handleChildTerminated(child: ActorRef): SystemMessage = { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 42aa1cf148..5377013a42 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -7,8 +7,10 @@ package akka.remote import akka.actor._ import akka.dispatch._ import akka.event.{ Logging, LoggingAdapter, EventStream } +import akka.event.Logging.Error import akka.serialization.{ Serialization, SerializationExtension } import scala.concurrent.Future +import scala.util.control.NonFatal /** * Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it. @@ -24,7 +26,7 @@ class RemoteActorRefProvider( val deployer: RemoteDeployer = new RemoteDeployer(settings, dynamicAccess) - private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, deployer) + private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, dynamicAccess, deployer) @volatile private var _log = local.log @@ -229,9 +231,19 @@ private[akka] class RemoteActorRef private[akka] ( def isTerminated: Boolean = !running - def sendSystemMessage(message: SystemMessage): Unit = remote.send(message, None, this) + def sendSystemMessage(message: SystemMessage): Unit = + try remote.send(message, None, this) + catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + remote.system.eventStream.publish(Error(e, path.toString, classOf[RemoteActorRef], "swallowing exception during message send")) + } - override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this) + override def !(message: Any)(implicit sender: ActorRef = null): Unit = + try remote.send(message, Option(sender), this) + catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + remote.system.eventStream.publish(Error(e, path.toString, classOf[RemoteActorRef], "swallowing exception during message send")) + } def suspend(): Unit = sendSystemMessage(Suspend()) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index d961a75fff..33559cd56c 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -25,7 +25,17 @@ class TestActorRef[T <: Actor]( _props: Props, _supervisor: InternalActorRef, name: String) - extends LocalActorRef( + extends { + private val disregard = _supervisor match { + case l: LocalActorRef ⇒ l.underlying.reserveChild(name) + case r: RepointableActorRef ⇒ r.underlying match { + case u: UnstartedCell ⇒ throw new IllegalStateException("cannot attach a TestActor to an unstarted top-level actor, ensure that it is started by sending a message and observing the reply") + case c: ActorCell ⇒ c.reserveChild(name) + case o ⇒ _system.log.error("trying to attach child {} to unknown type of supervisor cell {}, this is not going to end well", name, o.getClass) + } + case s ⇒ _system.log.error("trying to attach child {} to unknown type of supervisor {}, this is not going to end well", name, s.getClass) + } + } with LocalActorRef( _system, _props.withDispatcher( if (_props.dispatcher == Dispatchers.DefaultDispatcherId) CallingThreadDispatcher.Id @@ -119,8 +129,9 @@ object TestActorRef { def apply[T <: Actor](props: Props, name: String)(implicit system: ActorSystem): TestActorRef[T] = apply[T](props, system.asInstanceOf[ActorSystemImpl].guardian, name) - def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit system: ActorSystem): TestActorRef[T] = + def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit system: ActorSystem): TestActorRef[T] = { new TestActorRef(system.asInstanceOf[ActorSystemImpl], system.dispatchers.prerequisites, props, supervisor.asInstanceOf[InternalActorRef], name) + } def apply[T <: Actor](implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 6c0eb7c993..2987ede478 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -14,6 +14,7 @@ import java.lang.{ Iterable ⇒ JIterable } import scala.collection.JavaConverters import scala.concurrent.util.Duration import scala.reflect.ClassTag +import akka.actor.NoSerializationVerificationNeeded /** * Implementation helpers of the EventFilter facilities: send `Mute` @@ -39,7 +40,7 @@ object TestEvent { object Mute { def apply(filter: EventFilter, filters: EventFilter*): Mute = new Mute(filter +: filters.toSeq) } - case class Mute(filters: Seq[EventFilter]) extends TestEvent { + case class Mute(filters: Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded { /** * Java API */ @@ -48,7 +49,7 @@ object TestEvent { object UnMute { def apply(filter: EventFilter, filters: EventFilter*): UnMute = new UnMute(filter +: filters.toSeq) } - case class UnMute(filters: Seq[EventFilter]) extends TestEvent { + case class UnMute(filters: Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded { /** * Java API */