ActorCell: move out and reuse children updaters, add stress test

This commit is contained in:
Roland 2012-06-07 15:10:19 +02:00
parent dd596a20cb
commit 8cd11550fa
2 changed files with 86 additions and 57 deletions

View file

@ -28,13 +28,19 @@ object ActorSystemSpec {
class Waves extends Actor {
var master: ActorRef = _
var terminaters = Set[ActorRef]()
def receive = {
case n: Int
master = sender
for (i 1 to n) context.watch(context.system.actorOf(Props[Terminater])) ! "run"
case Terminated(child) if context.actorFor(child.path.parent) == self
if (context.children.isEmpty) {
terminaters = Set() ++ (for (i 1 to n) yield {
val man = context.watch(context.system.actorOf(Props[Terminater]))
man ! "run"
man
})
case Terminated(child) if terminaters contains child
terminaters -= child
if (terminaters.isEmpty) {
master ! "done"
context stop self
}
@ -57,7 +63,7 @@ object ActorSystemSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension$"]""") {
class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension$"]""") with ImplicitSender {
"An ActorSystem" must {
@ -154,6 +160,24 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt
Await.result(Future.sequence(waves), timeout.duration + 5.seconds) must be === Seq("done", "done", "done")
}
"reliable deny creation of actors while shutting down" in {
val system = ActorSystem()
system.scheduler.scheduleOnce(200 millis) { system.shutdown() }
var failing = false
var created = Vector.empty[ActorRef]
while (!system.isTerminated) {
try {
val t = system.actorOf(Props[ActorSystemSpec.Terminater])
failing must not be true // because once failing => always failing (its due to shutdown)
created :+= t
} catch {
case e: Exception failing = true
}
}
println(created.last)
created filter (!_.isTerminated) must be(Seq())
}
}
}

View file

@ -202,7 +202,13 @@ private[akka] object ActorCell {
def children: Iterable[ActorRef]
def stats: Iterable[ChildRestartStats]
def shallDie(actor: ActorRef): ChildrenContainer
/**
* reserve that name or throw an exception
*/
def reserve(name: String): ChildrenContainer
/**
* cancel a reservation
*/
def unreserve(name: String): ChildrenContainer
}
@ -233,7 +239,8 @@ private[akka] object ActorCell {
*/
object TerminatedChildrenContainer extends EmptyChildrenContainer {
override def add(child: ActorRef): ChildrenContainer = this
override def reserve(name: String): ChildrenContainer = this
override def reserve(name: String): ChildrenContainer =
throw new IllegalStateException("cannot reserve actor name '" + name + "': already terminated")
}
/**
@ -326,10 +333,13 @@ private[akka] object ActorCell {
def shallDie(actor: ActorRef): ChildrenContainer = copy(toDie = toDie + actor)
def reserve(name: String): ChildrenContainer =
if (c contains name)
throw new InvalidActorNameException("actor name " + name + " is not unique!")
else copy(c = c.updated(name, ChildNameReserved))
def reserve(name: String): ChildrenContainer = reason match {
case Termination throw new IllegalStateException("cannot reserve actor name '" + name + "': terminating")
case _
if (c contains name)
throw new InvalidActorNameException("actor name " + name + " is not unique!")
else copy(c = c.updated(name, ChildNameReserved))
}
def unreserve(name: String): ChildrenContainer = c get name match {
case Some(ChildNameReserved) copy(c = c - name)
@ -394,6 +404,40 @@ private[akka] class ActorCell(
private def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean =
Unsafe.instance.compareAndSwapObject(this, childrenOffset, oldChildren, newChildren)
@tailrec private def reserveChild(name: String): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.reserve(name)) || reserveChild(name)
}
@tailrec private def unreserveChild(name: String): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.unreserve(name)) || unreserveChild(name)
}
@tailrec private def addChild(ref: ActorRef): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.add(ref)) || addChild(ref)
}
@tailrec private def shallDie(ref: ActorRef): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref)
}
@tailrec private def removeChild(ref: ActorRef): ChildrenContainer = {
val c = childrenRefs
val n = c.remove(ref)
if (swapChildrenRefs(c, n)) n
else removeChild(ref)
}
@tailrec private def setChildrenTerminationReason(reason: SuspendReason): Boolean = {
childrenRefs match {
case c: TerminatingChildrenContainer swapChildrenRefs(c, c.copy(reason = reason)) || setChildrenTerminationReason(reason)
case _ false
}
}
private def isTerminating = childrenRefs match {
case TerminatingChildrenContainer(_, _, Termination) true
case TerminatedChildrenContainer true
@ -418,27 +462,16 @@ private[akka] class ActorCell(
// in case we are currently terminating, swallow creation requests and return EmptyLocalActorRef
if (isTerminating) provider.actorFor(self, Seq(name))
else {
@tailrec def reserve(name: String): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.reserve(name)) || reserve(name)
}
reserve(name)
reserveChild(name)
// this name will either be unreserved or overwritten with a real child below
val actor =
try provider.actorOf(systemImpl, props, self, self.path / name, false, None, true)
catch {
case NonFatal(e)
@tailrec def unreserve(name: String): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.unreserve(name)) || unreserve(name)
}
unreserve(name)
unreserveChild(name)
throw e
}
@tailrec def add(ref: ActorRef): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.add(ref)) || add(ref)
}
add(actor)
addChild(actor)
actor
}
}
@ -457,10 +490,6 @@ private[akka] class ActorCell(
}
final def stop(actor: ActorRef): Unit = {
@tailrec def shallDie(ref: ActorRef): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref)
}
if (childrenRefs.getByRef(actor).isDefined) shallDie(actor)
actor.asInstanceOf[InternalActorRef].stop()
}
@ -622,13 +651,7 @@ private[akka] class ActorCell(
}
childrenRefs match {
case ct: TerminatingChildrenContainer
@tailrec def rec(cause: Throwable): Boolean = {
childrenRefs match {
case c: TerminatingChildrenContainer swapChildrenRefs(c, c.copy(reason = Recreation(cause))) || rec(cause)
case _ true // cannot happen
}
}
rec(cause)
setChildrenTerminationReason(Recreation(cause))
dispatcher suspend this
case _
doRecreate(cause, failedActor)
@ -686,13 +709,7 @@ private[akka] class ActorCell(
childrenRefs match {
case ct: TerminatingChildrenContainer
@tailrec def rec(): Boolean = {
childrenRefs match {
case c: TerminatingChildrenContainer swapChildrenRefs(c, c.copy(reason = Termination)) || rec()
case _ true // cannot happen
}
}
rec()
setChildrenTerminationReason(Termination)
// do not process normal messages while waiting for all children to terminate
dispatcher suspend this
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping"))
@ -701,13 +718,7 @@ private[akka] class ActorCell(
}
def supervise(child: ActorRef): Unit = if (!isTerminating) {
if (childrenRefs.getByRef(child).isEmpty) {
@tailrec def add(ref: ActorRef): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.add(ref)) || add(ref)
}
add(child)
}
if (childrenRefs.getByRef(child).isEmpty) addChild(child)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
}
@ -870,15 +881,9 @@ private[akka] class ActorCell(
}
final def handleChildTerminated(child: ActorRef): Unit = try {
@tailrec def remove(ref: ActorRef): ChildrenContainer = {
val c = childrenRefs
val n = c.remove(ref)
if (swapChildrenRefs(c, n)) n
else remove(ref)
}
childrenRefs match {
case tc @ TerminatingChildrenContainer(_, _, reason)
val n = remove(child)
val n = removeChild(child)
actor.supervisorStrategy.handleChildTerminated(this, child, children)
if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match {
case Recreation(cause) doRecreate(cause, actor) // doRecreate since this is the continuation of "recreate"
@ -886,7 +891,7 @@ private[akka] class ActorCell(
case _
}
case _
remove(child)
removeChild(child)
actor.supervisorStrategy.handleChildTerminated(this, child, children)
}
} catch {