From 8cd11550fad39ce361f418a76f8379e8ca14b18a Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 7 Jun 2012 15:10:19 +0200 Subject: [PATCH] ActorCell: move out and reuse children updaters, add stress test --- .../scala/akka/actor/ActorSystemSpec.scala | 32 ++++- .../src/main/scala/akka/actor/ActorCell.scala | 111 +++++++++--------- 2 files changed, 86 insertions(+), 57 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index 33a41c25c8..b9540fbf33 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -28,13 +28,19 @@ object ActorSystemSpec { class Waves extends Actor { var master: ActorRef = _ + var terminaters = Set[ActorRef]() def receive = { case n: Int ⇒ master = sender - for (i ← 1 to n) context.watch(context.system.actorOf(Props[Terminater])) ! "run" - case Terminated(child) if context.actorFor(child.path.parent) == self ⇒ - if (context.children.isEmpty) { + terminaters = Set() ++ (for (i ← 1 to n) yield { + val man = context.watch(context.system.actorOf(Props[Terminater])) + man ! "run" + man + }) + case Terminated(child) if terminaters contains child ⇒ + terminaters -= child + if (terminaters.isEmpty) { master ! "done" context stop self } @@ -57,7 +63,7 @@ object ActorSystemSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension$"]""") { +class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension$"]""") with ImplicitSender { "An ActorSystem" must { @@ -154,6 +160,24 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt Await.result(Future.sequence(waves), timeout.duration + 5.seconds) must be === Seq("done", "done", "done") } + "reliable deny creation of actors while shutting down" in { + val system = ActorSystem() + system.scheduler.scheduleOnce(200 millis) { system.shutdown() } + var failing = false + var created = Vector.empty[ActorRef] + while (!system.isTerminated) { + try { + val t = system.actorOf(Props[ActorSystemSpec.Terminater]) + failing must not be true // because once failing => always failing (it’s due to shutdown) + created :+= t + } catch { + case e: Exception ⇒ failing = true + } + } + println(created.last) + created filter (!_.isTerminated) must be(Seq()) + } + } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index acebede751..ab8571100f 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -202,7 +202,13 @@ private[akka] object ActorCell { def children: Iterable[ActorRef] def stats: Iterable[ChildRestartStats] def shallDie(actor: ActorRef): ChildrenContainer + /** + * reserve that name or throw an exception + */ def reserve(name: String): ChildrenContainer + /** + * cancel a reservation + */ def unreserve(name: String): ChildrenContainer } @@ -233,7 +239,8 @@ private[akka] object ActorCell { */ object TerminatedChildrenContainer extends EmptyChildrenContainer { override def add(child: ActorRef): ChildrenContainer = this - override def reserve(name: String): ChildrenContainer = this + override def reserve(name: String): ChildrenContainer = + throw new IllegalStateException("cannot reserve actor name '" + name + "': already terminated") } /** @@ -326,10 +333,13 @@ private[akka] object ActorCell { def shallDie(actor: ActorRef): ChildrenContainer = copy(toDie = toDie + actor) - def reserve(name: String): ChildrenContainer = - if (c contains name) - throw new InvalidActorNameException("actor name " + name + " is not unique!") - else copy(c = c.updated(name, ChildNameReserved)) + def reserve(name: String): ChildrenContainer = reason match { + case Termination ⇒ throw new IllegalStateException("cannot reserve actor name '" + name + "': terminating") + case _ ⇒ + if (c contains name) + throw new InvalidActorNameException("actor name " + name + " is not unique!") + else copy(c = c.updated(name, ChildNameReserved)) + } def unreserve(name: String): ChildrenContainer = c get name match { case Some(ChildNameReserved) ⇒ copy(c = c - name) @@ -394,6 +404,40 @@ private[akka] class ActorCell( private def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean = Unsafe.instance.compareAndSwapObject(this, childrenOffset, oldChildren, newChildren) + @tailrec private def reserveChild(name: String): Boolean = { + val c = childrenRefs + swapChildrenRefs(c, c.reserve(name)) || reserveChild(name) + } + + @tailrec private def unreserveChild(name: String): Boolean = { + val c = childrenRefs + swapChildrenRefs(c, c.unreserve(name)) || unreserveChild(name) + } + + @tailrec private def addChild(ref: ActorRef): Boolean = { + val c = childrenRefs + swapChildrenRefs(c, c.add(ref)) || addChild(ref) + } + + @tailrec private def shallDie(ref: ActorRef): Boolean = { + val c = childrenRefs + swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref) + } + + @tailrec private def removeChild(ref: ActorRef): ChildrenContainer = { + val c = childrenRefs + val n = c.remove(ref) + if (swapChildrenRefs(c, n)) n + else removeChild(ref) + } + + @tailrec private def setChildrenTerminationReason(reason: SuspendReason): Boolean = { + childrenRefs match { + case c: TerminatingChildrenContainer ⇒ swapChildrenRefs(c, c.copy(reason = reason)) || setChildrenTerminationReason(reason) + case _ ⇒ false + } + } + private def isTerminating = childrenRefs match { case TerminatingChildrenContainer(_, _, Termination) ⇒ true case TerminatedChildrenContainer ⇒ true @@ -418,27 +462,16 @@ private[akka] class ActorCell( // in case we are currently terminating, swallow creation requests and return EmptyLocalActorRef if (isTerminating) provider.actorFor(self, Seq(name)) else { - @tailrec def reserve(name: String): Boolean = { - val c = childrenRefs - swapChildrenRefs(c, c.reserve(name)) || reserve(name) - } - reserve(name) + reserveChild(name) + // this name will either be unreserved or overwritten with a real child below val actor = try provider.actorOf(systemImpl, props, self, self.path / name, false, None, true) catch { case NonFatal(e) ⇒ - @tailrec def unreserve(name: String): Boolean = { - val c = childrenRefs - swapChildrenRefs(c, c.unreserve(name)) || unreserve(name) - } - unreserve(name) + unreserveChild(name) throw e } - @tailrec def add(ref: ActorRef): Boolean = { - val c = childrenRefs - swapChildrenRefs(c, c.add(ref)) || add(ref) - } - add(actor) + addChild(actor) actor } } @@ -457,10 +490,6 @@ private[akka] class ActorCell( } final def stop(actor: ActorRef): Unit = { - @tailrec def shallDie(ref: ActorRef): Boolean = { - val c = childrenRefs - swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref) - } if (childrenRefs.getByRef(actor).isDefined) shallDie(actor) actor.asInstanceOf[InternalActorRef].stop() } @@ -622,13 +651,7 @@ private[akka] class ActorCell( } childrenRefs match { case ct: TerminatingChildrenContainer ⇒ - @tailrec def rec(cause: Throwable): Boolean = { - childrenRefs match { - case c: TerminatingChildrenContainer ⇒ swapChildrenRefs(c, c.copy(reason = Recreation(cause))) || rec(cause) - case _ ⇒ true // cannot happen - } - } - rec(cause) + setChildrenTerminationReason(Recreation(cause)) dispatcher suspend this case _ ⇒ doRecreate(cause, failedActor) @@ -686,13 +709,7 @@ private[akka] class ActorCell( childrenRefs match { case ct: TerminatingChildrenContainer ⇒ - @tailrec def rec(): Boolean = { - childrenRefs match { - case c: TerminatingChildrenContainer ⇒ swapChildrenRefs(c, c.copy(reason = Termination)) || rec() - case _ ⇒ true // cannot happen - } - } - rec() + setChildrenTerminationReason(Termination) // do not process normal messages while waiting for all children to terminate dispatcher suspend this if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping")) @@ -701,13 +718,7 @@ private[akka] class ActorCell( } def supervise(child: ActorRef): Unit = if (!isTerminating) { - if (childrenRefs.getByRef(child).isEmpty) { - @tailrec def add(ref: ActorRef): Boolean = { - val c = childrenRefs - swapChildrenRefs(c, c.add(ref)) || add(ref) - } - add(child) - } + if (childrenRefs.getByRef(child).isEmpty) addChild(child) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) } @@ -870,15 +881,9 @@ private[akka] class ActorCell( } final def handleChildTerminated(child: ActorRef): Unit = try { - @tailrec def remove(ref: ActorRef): ChildrenContainer = { - val c = childrenRefs - val n = c.remove(ref) - if (swapChildrenRefs(c, n)) n - else remove(ref) - } childrenRefs match { case tc @ TerminatingChildrenContainer(_, _, reason) ⇒ - val n = remove(child) + val n = removeChild(child) actor.supervisorStrategy.handleChildTerminated(this, child, children) if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match { case Recreation(cause) ⇒ doRecreate(cause, actor) // doRecreate since this is the continuation of "recreate" @@ -886,7 +891,7 @@ private[akka] class ActorCell( case _ ⇒ } case _ ⇒ - remove(child) + removeChild(child) actor.supervisorStrategy.handleChildTerminated(this, child, children) } } catch {