diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala index f644842591..43a5b97990 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala @@ -10,6 +10,7 @@ import akka.testkit.AkkaSpec import akka.testkit.DefaultTimeout import akka.pattern.ask import akka.util.duration._ +import akka.util.NonFatal object SupervisorMiscSpec { val config = """ @@ -77,5 +78,81 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul expectMsg("preStart") a.isTerminated must be(false) } + + "be able to recreate child when old child is Terminated" in { + val parent = system.actorOf(Props(new Actor { + val kid = context.watch(context.actorOf(Props.empty, "foo")) + def receive = { + case Terminated(`kid`) ⇒ + try { + val newKid = context.actorOf(Props.empty, "foo") + val result = + if (newKid eq kid) "Failure: context.actorOf returned the same instance!" + else if (!kid.isTerminated) "Kid is zombie" + else if (newKid.isTerminated) "newKid was stillborn" + else if (kid.path != newKid.path) "The kids do not share the same path" + else "green" + testActor ! result + } catch { + case NonFatal(e) ⇒ testActor ! e + } + case "engage" ⇒ context.stop(kid) + } + })) + parent ! "engage" + expectMsg("green") + } + + "not be able to recreate child when old child is alive" in { + val parent = system.actorOf(Props(new Actor { + def receive = { + case "engage" ⇒ + try { + val kid = context.actorOf(Props.empty, "foo") + context.stop(kid) + context.actorOf(Props.empty, "foo") + testActor ! "red" + } catch { + case e: InvalidActorNameException ⇒ testActor ! "green" + } + } + })) + parent ! "engage" + expectMsg("green") + } + + "be able to create a similar kid in the fault handling strategy" in { + val parent = system.actorOf(Props(new Actor { + + override val supervisorStrategy = new OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider) { + override def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = { + val newKid = context.actorOf(Props.empty, child.path.name) + testActor ! { + if ((newKid ne child) && newKid.path == child.path) "green" + else "red" + } + } + } + + def receive = { + case "engage" ⇒ context.stop(context.actorOf(Props.empty, "Robert")) + } + })) + parent ! "engage" + expectMsg("green") + } + + "support suspending until all dying children have properly expired" in { + val parent = system.actorOf(Props(new Actor { + val child = context.actorOf(Props.empty, "bob") + def receive = { + case "engage" ⇒ context.stop(child); context.suspendForChildTermination(); self ! "next" + case "next" ⇒ context.actorOf(Props.empty, "bob"); testActor ! "green" + } + })) + parent ! "engage" + expectMsg("green") + } + } } diff --git a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala index a7218230e4..787f29f93a 100644 --- a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala @@ -200,9 +200,10 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd } system.stop(supervisor) - expectMsg(Logging.Debug(sname, `sclass`, "stopping")) - expectMsg(Logging.Debug(aname, `aclass`, "stopped")) - expectMsg(Logging.Debug(sname, `sclass`, "stopped")) + expectMsgAllOf( + Logging.Debug(aname, aclass, "stopped"), + Logging.Debug(sname, sclass, "stopping"), + Logging.Debug(sname, sclass, "stopped")) } } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 89b6f5274a..2ae32cfcf5 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -118,7 +118,9 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with "use configured nr-of-instances when FromConfig" in { val router = system.actorOf(Props[TestActor].withRouter(FromConfig), "router1") Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3) + watch(router) system.stop(router) + expectMsgType[Terminated] } "use configured nr-of-instances when router is specified" in { diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 227030fe10..0dbdeac599 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -134,6 +134,14 @@ trait ActorContext extends ActorRefFactory { */ def unwatch(subject: ActorRef): ActorRef + /** + * Suspend this actor (after finishing processing of the current message) + * until all children for which stop(child) has been called have actually + * terminated. This is useful if a new child with the same name needs to + * be created before processing can continue. + */ + def suspendForChildTermination(): Unit + final protected def writeObject(o: ObjectOutputStream): Unit = throw new NotSerializableException("ActorContext is not serializable!") } @@ -166,14 +174,98 @@ private[akka] object ActorCell { override def initialValue = Stack[ActorContext]() } - val emptyChildrenRefs = TreeMap[String, ChildRestartStats]() - final val emptyCancellable: Cancellable = new Cancellable { def isCancelled = false def cancel() {} } final val emptyReceiveTimeoutData: (Long, Cancellable) = (-1, emptyCancellable) + + trait SuspendReason + case object UserRequest extends SuspendReason + case class Recreation(cause: Throwable) extends SuspendReason + case object Termination extends SuspendReason + + trait ChildrenContainer { + def add(child: ActorRef): ChildrenContainer + def remove(child: ActorRef): ChildrenContainer + def getByName(name: String): Option[ChildRestartStats] + def getByRef(actor: ActorRef): Option[ChildRestartStats] + def children: Iterable[ActorRef] + def stats: Iterable[ChildRestartStats] + def shallDie(actor: ActorRef): ChildrenContainer + } + + object EmptyChildrenContainer extends ChildrenContainer { + def add(child: ActorRef): ChildrenContainer = + new NormalChildrenContainer(TreeMap.empty[String, ChildRestartStats].updated(child.path.name, ChildRestartStats(child))) + def remove(child: ActorRef): ChildrenContainer = this + def getByName(name: String): Option[ChildRestartStats] = None + def getByRef(actor: ActorRef): Option[ChildRestartStats] = None + def children: Iterable[ActorRef] = Nil + def stats: Iterable[ChildRestartStats] = Nil + def shallDie(actor: ActorRef): ChildrenContainer = this + override def toString = "no children" + } + + class NormalChildrenContainer(c: TreeMap[String, ChildRestartStats]) extends ChildrenContainer { + + def add(child: ActorRef): ChildrenContainer = new NormalChildrenContainer(c.updated(child.path.name, ChildRestartStats(child))) + + def remove(child: ActorRef): ChildrenContainer = NormalChildrenContainer(c - child.path.name) + + def getByName(name: String): Option[ChildRestartStats] = c get name + + def getByRef(actor: ActorRef): Option[ChildRestartStats] = c get actor.path.name match { + case c @ Some(crs) if (crs.child == actor) ⇒ c + case _ ⇒ None + } + + def children: Iterable[ActorRef] = c.values.view.map(_.child) + + def stats: Iterable[ChildRestartStats] = c.values + + def shallDie(actor: ActorRef): ChildrenContainer = TerminatingChildrenContainer(c, Set(actor), UserRequest) + + override def toString = + if (c.size > 20) c.size + " children" + else c.mkString("children:\n ", "\n ", "") + } + + object NormalChildrenContainer { + def apply(c: TreeMap[String, ChildRestartStats]): ChildrenContainer = + if (c.isEmpty) EmptyChildrenContainer + else new NormalChildrenContainer(c) + } + + case class TerminatingChildrenContainer(c: TreeMap[String, ChildRestartStats], toDie: Set[ActorRef], reason: SuspendReason) + extends ChildrenContainer { + + def add(child: ActorRef): ChildrenContainer = copy(c.updated(child.path.name, ChildRestartStats(child))) + + def remove(child: ActorRef): ChildrenContainer = + if (toDie contains child) + if (toDie.size == 1) NormalChildrenContainer(c - child.path.name) + else copy(c - child.path.name, toDie - child) + else copy(c - child.path.name) + + def getByName(name: String): Option[ChildRestartStats] = c get name + + def getByRef(actor: ActorRef): Option[ChildRestartStats] = c get actor.path.name match { + case c @ Some(crs) if (crs.child == actor) ⇒ c + case _ ⇒ None + } + + def children: Iterable[ActorRef] = c.values.view.map(_.child) + + def stats: Iterable[ChildRestartStats] = c.values + + def shallDie(actor: ActorRef): ChildrenContainer = copy(toDie = toDie + actor) + + override def toString = + if (c.size > 20) c.size + " children" + else c.mkString("children (" + toDie.size + " terminating):\n ", "\n ", "") + } } //ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit) @@ -221,7 +313,8 @@ private[akka] class ActorCell( var receiveTimeoutData: (Long, Cancellable) = if (_receiveTimeout.isDefined) (_receiveTimeout.get.toMillis, emptyCancellable) else emptyReceiveTimeoutData - var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs + @volatile + var childrenRefs: ChildrenContainer = EmptyChildrenContainer private def _actorOf(props: Props, name: String): ActorRef = { if (system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) { @@ -235,7 +328,7 @@ private[akka] class ActorCell( } } val actor = provider.actorOf(systemImpl, props, self, self.path / name, false, None, true) - childrenRefs = childrenRefs.updated(name, ChildRestartStats(actor)) + childrenRefs = childrenRefs.add(actor) actor } @@ -249,26 +342,20 @@ private[akka] class ActorCell( case ElementRegex() ⇒ // this is fine case _ ⇒ throw new InvalidActorNameException("illegal actor name '" + name + "', must conform to " + ElementRegex) } - if (childrenRefs contains name) + if (childrenRefs.getByName(name).isDefined) throw new InvalidActorNameException("actor name " + name + " is not unique!") _actorOf(props, name) } final def stop(actor: ActorRef): Unit = { - val a = actor.asInstanceOf[InternalActorRef] - if (a.getParent == self && (childrenRefs contains actor.path.name)) { - system.locker ! a - handleChildTerminated(actor) // will remove child from childrenRefs - } - a.stop() + if (childrenRefs.getByRef(actor).isDefined) childrenRefs = childrenRefs.shallDie(actor) + actor.asInstanceOf[InternalActorRef].stop() } var currentMessage: Envelope = null var actor: Actor = _ - var stopping = false - @volatile //This must be volatile since it isn't protected by the mailbox status var mailbox: Mailbox = _ @@ -328,7 +415,12 @@ private[akka] class ActorCell( subject } - final def children: Iterable[ActorRef] = childrenRefs.values.view.map(_.child) + final def suspendForChildTermination(): Unit = childrenRefs match { + case _: TerminatingChildrenContainer ⇒ dispatcher suspend this + case _ ⇒ + } + + final def children: Iterable[ActorRef] = childrenRefs.children /** * Impl UntypedActorContext @@ -391,19 +483,18 @@ private[akka] class ActorCell( if (failedActor ne null) { val c = currentMessage //One read only plz try { - failedActor.preRestart(cause, if (c ne null) Some(c.message) else None) + if (failedActor.context ne null) failedActor.preRestart(cause, if (c ne null) Some(c.message) else None) } finally { clearActorFields() } } - val freshActor = newActor() // this must happen after failedActor.preRestart (to scrap those children) - actor = freshActor // this must happen before postRestart has a chance to fail - freshActor.postRestart(cause) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted")) - - dispatcher.resume(this) //FIXME should this be moved down? - - actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children) + childrenRefs match { + case ct: TerminatingChildrenContainer ⇒ + childrenRefs = ct.copy(reason = Recreation(cause)) + dispatcher suspend this + case _ ⇒ + doRecreate(cause) + } } catch { case NonFatal(e) ⇒ try { dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e)) @@ -422,51 +513,58 @@ private[akka] class ActorCell( setReceiveTimeout(None) cancelReceiveTimeout - val c = children - if (c.isEmpty) doTerminate() - else { - // do not process normal messages while waiting for all children to terminate - dispatcher suspend this - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping")) - // do not use stop(child) because that would dissociate the children from us, but we still want to wait for them - for (child ← c) child.asInstanceOf[InternalActorRef].stop() - stopping = true + // stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children) + for (child ← children) stop(child) + + childrenRefs match { + case ct: TerminatingChildrenContainer ⇒ + childrenRefs = ct.copy(reason = Termination) + // do not process normal messages while waiting for all children to terminate + dispatcher suspend this + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping")) + case x ⇒ doTerminate() } } def supervise(child: ActorRef): Unit = { - childrenRefs.get(child.path.name) match { - case None ⇒ - childrenRefs = childrenRefs.updated(child.path.name, ChildRestartStats(child)) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) - case Some(ChildRestartStats(`child`, _, _)) ⇒ - // this is the nominal case where we created the child and entered it in actorCreated() above - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) - case Some(ChildRestartStats(c, _, _)) ⇒ - system.eventStream.publish(Warning(self.path.toString, clazz(actor), "Already supervising other child with same name '" + child.path.name + "', old: " + c + " new: " + child)) - } + if (childrenRefs.getByRef(child).isEmpty) childrenRefs = childrenRefs.add(child) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) } try { - if (stopping) message match { - case Terminate() ⇒ terminate() // to allow retry - case ChildTerminated(child) ⇒ handleChildTerminated(child) - case _ ⇒ - } - else message match { - case Create() ⇒ create() - case Recreate(cause) ⇒ recreate(cause) - case Link(subject) ⇒ - system.deathWatch.subscribe(self, subject) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + subject)) - case Unlink(subject) ⇒ - system.deathWatch.unsubscribe(self, subject) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + subject)) - case Suspend() ⇒ suspend() - case Resume() ⇒ resume() - case Terminate() ⇒ terminate() - case Supervise(child) ⇒ supervise(child) - case ChildTerminated(child) ⇒ handleChildTerminated(child) + childrenRefs match { + case TerminatingChildrenContainer(_, _, Termination) ⇒ message match { + case Terminate() ⇒ terminate() // to allow retry + case ChildTerminated(child) ⇒ handleChildTerminated(child) + case _ ⇒ + } + case TerminatingChildrenContainer(_, _, _: Recreation) ⇒ message match { + case Link(subject) ⇒ + system.deathWatch.subscribe(self, subject) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + subject)) + case Unlink(subject) ⇒ + system.deathWatch.unsubscribe(self, subject) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + subject)) + case Terminate() ⇒ terminate() + case Supervise(child) ⇒ supervise(child) + case ChildTerminated(child) ⇒ handleChildTerminated(child) + case _ ⇒ + } + case _ ⇒ message match { + case Create() ⇒ create() + case Recreate(cause) ⇒ recreate(cause) + case Link(subject) ⇒ + system.deathWatch.subscribe(self, subject) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + subject)) + case Unlink(subject) ⇒ + system.deathWatch.unsubscribe(self, subject) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + subject)) + case Suspend() ⇒ suspend() + case Resume() ⇒ resume() + case Terminate() ⇒ terminate() + case Supervise(child) ⇒ supervise(child) + case ChildTerminated(child) ⇒ handleChildTerminated(child) + } } } catch { case NonFatal(e) ⇒ @@ -544,7 +642,7 @@ private[akka] class ActorCell( case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ self.stop() case SelectParent(m) ⇒ parent.tell(m, msg.sender) - case SelectChildName(name, m) ⇒ if (childrenRefs contains name) childrenRefs(name).child.tell(m, msg.sender) + case SelectChildName(name, m) ⇒ childrenRefs getByName name foreach (_.child.tell(m, msg.sender)) case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) } } @@ -566,22 +664,52 @@ private[akka] class ActorCell( } finally { if (a ne null) a.clearBehaviorStack() clearActorFields() + actor = null } } } - final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.get(child.path.name) match { - case Some(stats) if stats.child == child ⇒ if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, childrenRefs.values)) throw cause - case Some(stats) ⇒ system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child)) - case None ⇒ system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) + private def doRecreate(cause: Throwable): Unit = try { + // after all killed children have terminated, recreate the rest, then go on to start the new instance + actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children) + + val freshActor = newActor() + actor = freshActor // this must happen before postRestart has a chance to fail + + freshActor.postRestart(cause) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted")) + + dispatcher.resume(this) + } catch { + case NonFatal(e) ⇒ try { + dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e)) + // prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) + } finally { + parent.tell(Failed(ActorInitializationException(self, "exception during re-creation", e)), self) + } + } + + final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.getByRef(child) match { + case Some(stats) ⇒ if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, childrenRefs.stats)) throw cause + case None ⇒ system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) } final def handleChildTerminated(child: ActorRef): Unit = { - if (childrenRefs contains child.path.name) { - childrenRefs -= child.path.name - actor.supervisorStrategy.handleChildTerminated(this, child, children) - if (stopping && childrenRefs.isEmpty) doTerminate() - } else system.locker ! ChildTerminated(child) + childrenRefs match { + case tc @ TerminatingChildrenContainer(_, _, reason) ⇒ + val n = tc.remove(child) + childrenRefs = n + actor.supervisorStrategy.handleChildTerminated(this, child, children) + if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match { + case UserRequest ⇒ if (mailbox.isSuspended) dispatcher resume this + case Recreation(cause) ⇒ doRecreate(cause) + case Termination ⇒ doTerminate() + } + case _ ⇒ + childrenRefs = childrenRefs.remove(child) + actor.supervisorStrategy.handleChildTerminated(this, child, children) + } } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ @@ -608,7 +736,6 @@ private[akka] class ActorCell( final def clearActorFields(): Unit = { setActorFields(context = null, self = system.deadLetters) currentMessage = null - actor = null } final def setActorFields(context: ActorContext, self: ActorRef) { @@ -639,3 +766,4 @@ private[akka] class ActorCell( private final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass } + diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index bd2c0cf196..668731e4bb 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -284,14 +284,11 @@ private[akka] class LocalActorRef private[akka] ( * Method for looking up a single child beneath this actor. Override in order * to inject “synthetic” actor paths like “/temp”. */ - protected def getSingleChild(name: String): InternalActorRef = { - if (actorCell.isTerminated) Nobody // read of the mailbox status ensures we get the latest childrenRefs - else { - val children = actorCell.childrenRefs - if (children contains name) children(name).child.asInstanceOf[InternalActorRef] - else Nobody + protected def getSingleChild(name: String): InternalActorRef = + actorCell.childrenRefs.getByName(name) match { + case Some(crs) ⇒ crs.child.asInstanceOf[InternalActorRef] + case None ⇒ Nobody } - } def getChild(names: Iterator[String]): InternalActorRef = { /* diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 255a42d87c..60ce12f4b8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -44,12 +44,6 @@ trait ActorRefProvider { */ def deathWatch: DeathWatch - /** - * Care-taker of actor refs which await final termination but cannot be kept - * in their parent’s children list because the name shall be freed. - */ - def locker: Locker - /** * The root path for all actors within this actor system, including remote * address if enabled. @@ -333,8 +327,6 @@ class LocalActorRefProvider( val deathWatch = new LocalDeathWatch(1024) //TODO make configrable - val locker: Locker = new Locker(scheduler, settings.ReaperInterval, this, rootPath / "locker", deathWatch) - /* * generate name for temporary actor refs */ diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 1daad13963..071a26e212 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -478,8 +478,6 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf def hasSystemMessages = false } - def locker: Locker = provider.locker - val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites( threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess, settings)) @@ -497,7 +495,6 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf private lazy val _start: this.type = { // the provider is expected to start default loggers, LocalActorRefProvider does this provider.init(this) - registerOnTermination(locker.shutdown()) registerOnTermination(stopScheduler()) loadExtensions() if (LogConfigOnStart) logConfiguration() diff --git a/akka-actor/src/main/scala/akka/actor/Locker.scala b/akka-actor/src/main/scala/akka/actor/Locker.scala deleted file mode 100644 index a9eb0ff2e6..0000000000 --- a/akka-actor/src/main/scala/akka/actor/Locker.scala +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.actor - -import akka.dispatch._ -import akka.util.Duration -import java.util.concurrent.ConcurrentHashMap -import akka.event.DeathWatch - -/** - * Internal implementation detail for disposing of orphaned actors. - */ -private[akka] class Locker( - scheduler: Scheduler, - period: Duration, - val provider: ActorRefProvider, - val path: ActorPath, - val deathWatch: DeathWatch) extends MinimalActorRef { - - class DavyJones extends Runnable { - def run = { - val iter = heap.entrySet.iterator - while (iter.hasNext) { - val soul = iter.next() - deathWatch.subscribe(Locker.this, soul.getValue) // in case Terminated got lost somewhere - soul.getValue match { - case _: LocalRef ⇒ // nothing to do, they know what they signed up for - case nonlocal ⇒ nonlocal.stop() // try again in case it was due to a communications failure - } - } - } - } - - private val heap = new ConcurrentHashMap[ActorPath, InternalActorRef] - - scheduler.schedule(period, period, new DavyJones) - - override def sendSystemMessage(msg: SystemMessage): Unit = this.!(msg) - - override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match { - case Terminated(soul) ⇒ heap.remove(soul.path) - case ChildTerminated(soul) ⇒ heap.remove(soul.path) - case soul: InternalActorRef ⇒ - heap.put(soul.path, soul) - deathWatch.subscribe(this, soul) - // now re-bind the soul so that it does not drown its parent - soul match { - case local: LocalActorRef ⇒ - val cell = local.underlying - cell.parent = this - case _ ⇒ - } - case _ ⇒ // ignore - } - - def childTerminated(parent: ActorRef, ct: ChildTerminated): Unit = { - heap.get(parent.path) match { - case null ⇒ - case ref ⇒ ref.sendSystemMessage(ct) - } - } - - def shutdown(): Unit = { - import scala.collection.JavaConverters._ - for (soul ← heap.values.asScala) { - soul match { - case l: LocalActorRef ⇒ l.underlying.dispatcher.detach(l.underlying) - case _ ⇒ - } - } - } - -} diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 2d929cdded..322b50b900 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -187,10 +187,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes var nextMessage = systemDrain() try { while ((nextMessage ne null) && !isClosed) { - if (debug) println(actor.self + " processing system message " + nextMessage + " with " + - (if (actor.childrenRefs.isEmpty) "no children" - else if (actor.childrenRefs.size > 20) actor.childrenRefs.size + " children" - else actor.childrenRefs.mkString("children:\n ", "\n ", ""))) + if (debug) println(actor.self + " processing system message " + nextMessage + " with " + actor.childrenRefs) actor systemInvoke nextMessage nextMessage = nextMessage.next // don’t ever execute normal message when system message present! diff --git a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala index aee30e3c56..0bc540f970 100644 --- a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala @@ -233,7 +233,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { "creating actor with Props" in { //#creating-props import akka.actor.Props - val myActor = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor") + val myActor = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor2") //#creating-props system.stop(myActor) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 89c35115ef..5870af9f95 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -40,7 +40,6 @@ class RemoteActorRefProvider( def log: LoggingAdapter = _log def rootPath = local.rootPath - def locker = local.locker def deadLetters = local.deadLetters val deathWatch = new RemoteDeathWatch(local.deathWatch, this) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index 1af4a6103b..0d55f17915 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -274,7 +274,6 @@ trait RemoteMarshallingOps { case l: LocalRef ⇒ if (provider.remoteSettings.LogReceive) log.debug("received local message {}", remoteMessage) remoteMessage.payload match { - case ct: ChildTerminated if l.isTerminated ⇒ provider.locker.childTerminated(l, ct) case msg: SystemMessage ⇒ if (useUntrustedMode) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not send system message") diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala index 73f8a98030..4475998c4e 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala @@ -31,6 +31,7 @@ object RemoteCommunicationSpec { } } +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class RemoteCommunicationSpec extends AkkaSpec(""" akka { actor.provider = "akka.remote.RemoteActorRefProvider" @@ -123,6 +124,8 @@ akka { myref ! 43 expectMsg(43) lastSender must be theSameInstanceAs remref + r.asInstanceOf[RemoteActorRef].getParent must be(l) + system.actorFor("/user/looker/child") must be theSameInstanceAs r Await.result(l ? "child/..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l Await.result(system.actorFor(system / "looker" / "child") ? "..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l }