rework childrenRefs to make context.stop(child) fully asynchronous

- replace TreeMap with custom ChildrenContainer, which has three
  implementations: empty, normal and “terminating” (i.e. waiting for
  some child to terminate)
- split recreate() in the same way as terminate(), so that there is a
  phase during which the suspended actor waits for termination of all
  children which were stopped in preRestart
- do not null out “actor” in ActorCell during restart, because we do
  need the supervisionStrategy and nulling it out does not buy us much
  in this case anyway
- provide new ActorContext.suspendForChildTermination(), which enters
  limbo for as long there are outstanding termination requests; this
  enables code which is very similar to previously (half-working) setups
  with “synchronous” context.stop(child)

docs are still missing, plus a little polishing here and there; oh, and
before I forget: ActorCell NOW is 64 bytes again ;-)
This commit is contained in:
Roland 2012-02-29 21:10:31 +01:00
parent 4c8048588f
commit 64b523638e
13 changed files with 292 additions and 174 deletions

View file

@ -10,6 +10,7 @@ import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import akka.pattern.ask import akka.pattern.ask
import akka.util.duration._ import akka.util.duration._
import akka.util.NonFatal
object SupervisorMiscSpec { object SupervisorMiscSpec {
val config = """ val config = """
@ -77,5 +78,81 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul
expectMsg("preStart") expectMsg("preStart")
a.isTerminated must be(false) a.isTerminated must be(false)
} }
"be able to recreate child when old child is Terminated" in {
val parent = system.actorOf(Props(new Actor {
val kid = context.watch(context.actorOf(Props.empty, "foo"))
def receive = {
case Terminated(`kid`)
try {
val newKid = context.actorOf(Props.empty, "foo")
val result =
if (newKid eq kid) "Failure: context.actorOf returned the same instance!"
else if (!kid.isTerminated) "Kid is zombie"
else if (newKid.isTerminated) "newKid was stillborn"
else if (kid.path != newKid.path) "The kids do not share the same path"
else "green"
testActor ! result
} catch {
case NonFatal(e) testActor ! e
}
case "engage" context.stop(kid)
}
}))
parent ! "engage"
expectMsg("green")
}
"not be able to recreate child when old child is alive" in {
val parent = system.actorOf(Props(new Actor {
def receive = {
case "engage"
try {
val kid = context.actorOf(Props.empty, "foo")
context.stop(kid)
context.actorOf(Props.empty, "foo")
testActor ! "red"
} catch {
case e: InvalidActorNameException testActor ! "green"
}
}
}))
parent ! "engage"
expectMsg("green")
}
"be able to create a similar kid in the fault handling strategy" in {
val parent = system.actorOf(Props(new Actor {
override val supervisorStrategy = new OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider) {
override def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {
val newKid = context.actorOf(Props.empty, child.path.name)
testActor ! {
if ((newKid ne child) && newKid.path == child.path) "green"
else "red"
}
}
}
def receive = {
case "engage" context.stop(context.actorOf(Props.empty, "Robert"))
}
}))
parent ! "engage"
expectMsg("green")
}
"support suspending until all dying children have properly expired" in {
val parent = system.actorOf(Props(new Actor {
val child = context.actorOf(Props.empty, "bob")
def receive = {
case "engage" context.stop(child); context.suspendForChildTermination(); self ! "next"
case "next" context.actorOf(Props.empty, "bob"); testActor ! "green"
}
}))
parent ! "engage"
expectMsg("green")
}
} }
} }

View file

@ -200,9 +200,10 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
} }
system.stop(supervisor) system.stop(supervisor)
expectMsg(Logging.Debug(sname, `sclass`, "stopping")) expectMsgAllOf(
expectMsg(Logging.Debug(aname, `aclass`, "stopped")) Logging.Debug(aname, aclass, "stopped"),
expectMsg(Logging.Debug(sname, `sclass`, "stopped")) Logging.Debug(sname, sclass, "stopping"),
Logging.Debug(sname, sclass, "stopped"))
} }
} }
} }

View file

@ -118,7 +118,9 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
"use configured nr-of-instances when FromConfig" in { "use configured nr-of-instances when FromConfig" in {
val router = system.actorOf(Props[TestActor].withRouter(FromConfig), "router1") val router = system.actorOf(Props[TestActor].withRouter(FromConfig), "router1")
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3) Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3)
watch(router)
system.stop(router) system.stop(router)
expectMsgType[Terminated]
} }
"use configured nr-of-instances when router is specified" in { "use configured nr-of-instances when router is specified" in {

View file

@ -134,6 +134,14 @@ trait ActorContext extends ActorRefFactory {
*/ */
def unwatch(subject: ActorRef): ActorRef def unwatch(subject: ActorRef): ActorRef
/**
* Suspend this actor (after finishing processing of the current message)
* until all children for which stop(child) has been called have actually
* terminated. This is useful if a new child with the same name needs to
* be created before processing can continue.
*/
def suspendForChildTermination(): Unit
final protected def writeObject(o: ObjectOutputStream): Unit = final protected def writeObject(o: ObjectOutputStream): Unit =
throw new NotSerializableException("ActorContext is not serializable!") throw new NotSerializableException("ActorContext is not serializable!")
} }
@ -166,14 +174,98 @@ private[akka] object ActorCell {
override def initialValue = Stack[ActorContext]() override def initialValue = Stack[ActorContext]()
} }
val emptyChildrenRefs = TreeMap[String, ChildRestartStats]()
final val emptyCancellable: Cancellable = new Cancellable { final val emptyCancellable: Cancellable = new Cancellable {
def isCancelled = false def isCancelled = false
def cancel() {} def cancel() {}
} }
final val emptyReceiveTimeoutData: (Long, Cancellable) = (-1, emptyCancellable) final val emptyReceiveTimeoutData: (Long, Cancellable) = (-1, emptyCancellable)
trait SuspendReason
case object UserRequest extends SuspendReason
case class Recreation(cause: Throwable) extends SuspendReason
case object Termination extends SuspendReason
trait ChildrenContainer {
def add(child: ActorRef): ChildrenContainer
def remove(child: ActorRef): ChildrenContainer
def getByName(name: String): Option[ChildRestartStats]
def getByRef(actor: ActorRef): Option[ChildRestartStats]
def children: Iterable[ActorRef]
def stats: Iterable[ChildRestartStats]
def shallDie(actor: ActorRef): ChildrenContainer
}
object EmptyChildrenContainer extends ChildrenContainer {
def add(child: ActorRef): ChildrenContainer =
new NormalChildrenContainer(TreeMap.empty[String, ChildRestartStats].updated(child.path.name, ChildRestartStats(child)))
def remove(child: ActorRef): ChildrenContainer = this
def getByName(name: String): Option[ChildRestartStats] = None
def getByRef(actor: ActorRef): Option[ChildRestartStats] = None
def children: Iterable[ActorRef] = Nil
def stats: Iterable[ChildRestartStats] = Nil
def shallDie(actor: ActorRef): ChildrenContainer = this
override def toString = "no children"
}
class NormalChildrenContainer(c: TreeMap[String, ChildRestartStats]) extends ChildrenContainer {
def add(child: ActorRef): ChildrenContainer = new NormalChildrenContainer(c.updated(child.path.name, ChildRestartStats(child)))
def remove(child: ActorRef): ChildrenContainer = NormalChildrenContainer(c - child.path.name)
def getByName(name: String): Option[ChildRestartStats] = c get name
def getByRef(actor: ActorRef): Option[ChildRestartStats] = c get actor.path.name match {
case c @ Some(crs) if (crs.child == actor) c
case _ None
}
def children: Iterable[ActorRef] = c.values.view.map(_.child)
def stats: Iterable[ChildRestartStats] = c.values
def shallDie(actor: ActorRef): ChildrenContainer = TerminatingChildrenContainer(c, Set(actor), UserRequest)
override def toString =
if (c.size > 20) c.size + " children"
else c.mkString("children:\n ", "\n ", "")
}
object NormalChildrenContainer {
def apply(c: TreeMap[String, ChildRestartStats]): ChildrenContainer =
if (c.isEmpty) EmptyChildrenContainer
else new NormalChildrenContainer(c)
}
case class TerminatingChildrenContainer(c: TreeMap[String, ChildRestartStats], toDie: Set[ActorRef], reason: SuspendReason)
extends ChildrenContainer {
def add(child: ActorRef): ChildrenContainer = copy(c.updated(child.path.name, ChildRestartStats(child)))
def remove(child: ActorRef): ChildrenContainer =
if (toDie contains child)
if (toDie.size == 1) NormalChildrenContainer(c - child.path.name)
else copy(c - child.path.name, toDie - child)
else copy(c - child.path.name)
def getByName(name: String): Option[ChildRestartStats] = c get name
def getByRef(actor: ActorRef): Option[ChildRestartStats] = c get actor.path.name match {
case c @ Some(crs) if (crs.child == actor) c
case _ None
}
def children: Iterable[ActorRef] = c.values.view.map(_.child)
def stats: Iterable[ChildRestartStats] = c.values
def shallDie(actor: ActorRef): ChildrenContainer = copy(toDie = toDie + actor)
override def toString =
if (c.size > 20) c.size + " children"
else c.mkString("children (" + toDie.size + " terminating):\n ", "\n ", "")
}
} }
//ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit) //ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit)
@ -221,7 +313,8 @@ private[akka] class ActorCell(
var receiveTimeoutData: (Long, Cancellable) = var receiveTimeoutData: (Long, Cancellable) =
if (_receiveTimeout.isDefined) (_receiveTimeout.get.toMillis, emptyCancellable) else emptyReceiveTimeoutData if (_receiveTimeout.isDefined) (_receiveTimeout.get.toMillis, emptyCancellable) else emptyReceiveTimeoutData
var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs @volatile
var childrenRefs: ChildrenContainer = EmptyChildrenContainer
private def _actorOf(props: Props, name: String): ActorRef = { private def _actorOf(props: Props, name: String): ActorRef = {
if (system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) { if (system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) {
@ -235,7 +328,7 @@ private[akka] class ActorCell(
} }
} }
val actor = provider.actorOf(systemImpl, props, self, self.path / name, false, None, true) val actor = provider.actorOf(systemImpl, props, self, self.path / name, false, None, true)
childrenRefs = childrenRefs.updated(name, ChildRestartStats(actor)) childrenRefs = childrenRefs.add(actor)
actor actor
} }
@ -249,26 +342,20 @@ private[akka] class ActorCell(
case ElementRegex() // this is fine case ElementRegex() // this is fine
case _ throw new InvalidActorNameException("illegal actor name '" + name + "', must conform to " + ElementRegex) case _ throw new InvalidActorNameException("illegal actor name '" + name + "', must conform to " + ElementRegex)
} }
if (childrenRefs contains name) if (childrenRefs.getByName(name).isDefined)
throw new InvalidActorNameException("actor name " + name + " is not unique!") throw new InvalidActorNameException("actor name " + name + " is not unique!")
_actorOf(props, name) _actorOf(props, name)
} }
final def stop(actor: ActorRef): Unit = { final def stop(actor: ActorRef): Unit = {
val a = actor.asInstanceOf[InternalActorRef] if (childrenRefs.getByRef(actor).isDefined) childrenRefs = childrenRefs.shallDie(actor)
if (a.getParent == self && (childrenRefs contains actor.path.name)) { actor.asInstanceOf[InternalActorRef].stop()
system.locker ! a
handleChildTerminated(actor) // will remove child from childrenRefs
}
a.stop()
} }
var currentMessage: Envelope = null var currentMessage: Envelope = null
var actor: Actor = _ var actor: Actor = _
var stopping = false
@volatile //This must be volatile since it isn't protected by the mailbox status @volatile //This must be volatile since it isn't protected by the mailbox status
var mailbox: Mailbox = _ var mailbox: Mailbox = _
@ -328,7 +415,12 @@ private[akka] class ActorCell(
subject subject
} }
final def children: Iterable[ActorRef] = childrenRefs.values.view.map(_.child) final def suspendForChildTermination(): Unit = childrenRefs match {
case _: TerminatingChildrenContainer dispatcher suspend this
case _
}
final def children: Iterable[ActorRef] = childrenRefs.children
/** /**
* Impl UntypedActorContext * Impl UntypedActorContext
@ -391,19 +483,18 @@ private[akka] class ActorCell(
if (failedActor ne null) { if (failedActor ne null) {
val c = currentMessage //One read only plz val c = currentMessage //One read only plz
try { try {
failedActor.preRestart(cause, if (c ne null) Some(c.message) else None) if (failedActor.context ne null) failedActor.preRestart(cause, if (c ne null) Some(c.message) else None)
} finally { } finally {
clearActorFields() clearActorFields()
} }
} }
val freshActor = newActor() // this must happen after failedActor.preRestart (to scrap those children) childrenRefs match {
actor = freshActor // this must happen before postRestart has a chance to fail case ct: TerminatingChildrenContainer
freshActor.postRestart(cause) childrenRefs = ct.copy(reason = Recreation(cause))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted")) dispatcher suspend this
case _
dispatcher.resume(this) //FIXME should this be moved down? doRecreate(cause)
}
actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children)
} catch { } catch {
case NonFatal(e) try { case NonFatal(e) try {
dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e)) dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e))
@ -422,51 +513,58 @@ private[akka] class ActorCell(
setReceiveTimeout(None) setReceiveTimeout(None)
cancelReceiveTimeout cancelReceiveTimeout
val c = children // stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children)
if (c.isEmpty) doTerminate() for (child children) stop(child)
else {
// do not process normal messages while waiting for all children to terminate childrenRefs match {
dispatcher suspend this case ct: TerminatingChildrenContainer
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping")) childrenRefs = ct.copy(reason = Termination)
// do not use stop(child) because that would dissociate the children from us, but we still want to wait for them // do not process normal messages while waiting for all children to terminate
for (child c) child.asInstanceOf[InternalActorRef].stop() dispatcher suspend this
stopping = true if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping"))
case x doTerminate()
} }
} }
def supervise(child: ActorRef): Unit = { def supervise(child: ActorRef): Unit = {
childrenRefs.get(child.path.name) match { if (childrenRefs.getByRef(child).isEmpty) childrenRefs = childrenRefs.add(child)
case None if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
childrenRefs = childrenRefs.updated(child.path.name, ChildRestartStats(child))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
case Some(ChildRestartStats(`child`, _, _))
// this is the nominal case where we created the child and entered it in actorCreated() above
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
case Some(ChildRestartStats(c, _, _))
system.eventStream.publish(Warning(self.path.toString, clazz(actor), "Already supervising other child with same name '" + child.path.name + "', old: " + c + " new: " + child))
}
} }
try { try {
if (stopping) message match { childrenRefs match {
case Terminate() terminate() // to allow retry case TerminatingChildrenContainer(_, _, Termination) message match {
case ChildTerminated(child) handleChildTerminated(child) case Terminate() terminate() // to allow retry
case _ case ChildTerminated(child) handleChildTerminated(child)
} case _
else message match { }
case Create() create() case TerminatingChildrenContainer(_, _, _: Recreation) message match {
case Recreate(cause) recreate(cause) case Link(subject)
case Link(subject) system.deathWatch.subscribe(self, subject)
system.deathWatch.subscribe(self, subject) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + subject))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + subject)) case Unlink(subject)
case Unlink(subject) system.deathWatch.unsubscribe(self, subject)
system.deathWatch.unsubscribe(self, subject) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + subject))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + subject)) case Terminate() terminate()
case Suspend() suspend() case Supervise(child) supervise(child)
case Resume() resume() case ChildTerminated(child) handleChildTerminated(child)
case Terminate() terminate() case _
case Supervise(child) supervise(child) }
case ChildTerminated(child) handleChildTerminated(child) case _ message match {
case Create() create()
case Recreate(cause) recreate(cause)
case Link(subject)
system.deathWatch.subscribe(self, subject)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + subject))
case Unlink(subject)
system.deathWatch.unsubscribe(self, subject)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + subject))
case Suspend() suspend()
case Resume() resume()
case Terminate() terminate()
case Supervise(child) supervise(child)
case ChildTerminated(child) handleChildTerminated(child)
}
} }
} catch { } catch {
case NonFatal(e) case NonFatal(e)
@ -544,7 +642,7 @@ private[akka] class ActorCell(
case Kill throw new ActorKilledException("Kill") case Kill throw new ActorKilledException("Kill")
case PoisonPill self.stop() case PoisonPill self.stop()
case SelectParent(m) parent.tell(m, msg.sender) case SelectParent(m) parent.tell(m, msg.sender)
case SelectChildName(name, m) if (childrenRefs contains name) childrenRefs(name).child.tell(m, msg.sender) case SelectChildName(name, m) childrenRefs getByName name foreach (_.child.tell(m, msg.sender))
case SelectChildPattern(p, m) for (c children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) case SelectChildPattern(p, m) for (c children if p.matcher(c.path.name).matches) c.tell(m, msg.sender)
} }
} }
@ -566,22 +664,52 @@ private[akka] class ActorCell(
} finally { } finally {
if (a ne null) a.clearBehaviorStack() if (a ne null) a.clearBehaviorStack()
clearActorFields() clearActorFields()
actor = null
} }
} }
} }
final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.get(child.path.name) match { private def doRecreate(cause: Throwable): Unit = try {
case Some(stats) if stats.child == child if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, childrenRefs.values)) throw cause // after all killed children have terminated, recreate the rest, then go on to start the new instance
case Some(stats) system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child)) actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children)
case None system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child))
val freshActor = newActor()
actor = freshActor // this must happen before postRestart has a chance to fail
freshActor.postRestart(cause)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
dispatcher.resume(this)
} catch {
case NonFatal(e) try {
dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
} finally {
parent.tell(Failed(ActorInitializationException(self, "exception during re-creation", e)), self)
}
}
final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.getByRef(child) match {
case Some(stats) if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, childrenRefs.stats)) throw cause
case None system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child))
} }
final def handleChildTerminated(child: ActorRef): Unit = { final def handleChildTerminated(child: ActorRef): Unit = {
if (childrenRefs contains child.path.name) { childrenRefs match {
childrenRefs -= child.path.name case tc @ TerminatingChildrenContainer(_, _, reason)
actor.supervisorStrategy.handleChildTerminated(this, child, children) val n = tc.remove(child)
if (stopping && childrenRefs.isEmpty) doTerminate() childrenRefs = n
} else system.locker ! ChildTerminated(child) actor.supervisorStrategy.handleChildTerminated(this, child, children)
if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match {
case UserRequest if (mailbox.isSuspended) dispatcher resume this
case Recreation(cause) doRecreate(cause)
case Termination doTerminate()
}
case _
childrenRefs = childrenRefs.remove(child)
actor.supervisorStrategy.handleChildTerminated(this, child, children)
}
} }
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
@ -608,7 +736,6 @@ private[akka] class ActorCell(
final def clearActorFields(): Unit = { final def clearActorFields(): Unit = {
setActorFields(context = null, self = system.deadLetters) setActorFields(context = null, self = system.deadLetters)
currentMessage = null currentMessage = null
actor = null
} }
final def setActorFields(context: ActorContext, self: ActorRef) { final def setActorFields(context: ActorContext, self: ActorRef) {
@ -639,3 +766,4 @@ private[akka] class ActorCell(
private final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass private final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass
} }

View file

@ -284,14 +284,11 @@ private[akka] class LocalActorRef private[akka] (
* Method for looking up a single child beneath this actor. Override in order * Method for looking up a single child beneath this actor. Override in order
* to inject synthetic actor paths like /temp. * to inject synthetic actor paths like /temp.
*/ */
protected def getSingleChild(name: String): InternalActorRef = { protected def getSingleChild(name: String): InternalActorRef =
if (actorCell.isTerminated) Nobody // read of the mailbox status ensures we get the latest childrenRefs actorCell.childrenRefs.getByName(name) match {
else { case Some(crs) crs.child.asInstanceOf[InternalActorRef]
val children = actorCell.childrenRefs case None Nobody
if (children contains name) children(name).child.asInstanceOf[InternalActorRef]
else Nobody
} }
}
def getChild(names: Iterator[String]): InternalActorRef = { def getChild(names: Iterator[String]): InternalActorRef = {
/* /*

View file

@ -44,12 +44,6 @@ trait ActorRefProvider {
*/ */
def deathWatch: DeathWatch def deathWatch: DeathWatch
/**
* Care-taker of actor refs which await final termination but cannot be kept
* in their parents children list because the name shall be freed.
*/
def locker: Locker
/** /**
* The root path for all actors within this actor system, including remote * The root path for all actors within this actor system, including remote
* address if enabled. * address if enabled.
@ -333,8 +327,6 @@ class LocalActorRefProvider(
val deathWatch = new LocalDeathWatch(1024) //TODO make configrable val deathWatch = new LocalDeathWatch(1024) //TODO make configrable
val locker: Locker = new Locker(scheduler, settings.ReaperInterval, this, rootPath / "locker", deathWatch)
/* /*
* generate name for temporary actor refs * generate name for temporary actor refs
*/ */

View file

@ -478,8 +478,6 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf
def hasSystemMessages = false def hasSystemMessages = false
} }
def locker: Locker = provider.locker
val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites( val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess, settings)) threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess, settings))
@ -497,7 +495,6 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf
private lazy val _start: this.type = { private lazy val _start: this.type = {
// the provider is expected to start default loggers, LocalActorRefProvider does this // the provider is expected to start default loggers, LocalActorRefProvider does this
provider.init(this) provider.init(this)
registerOnTermination(locker.shutdown())
registerOnTermination(stopScheduler()) registerOnTermination(stopScheduler())
loadExtensions() loadExtensions()
if (LogConfigOnStart) logConfiguration() if (LogConfigOnStart) logConfiguration()

View file

@ -1,74 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import akka.dispatch._
import akka.util.Duration
import java.util.concurrent.ConcurrentHashMap
import akka.event.DeathWatch
/**
* Internal implementation detail for disposing of orphaned actors.
*/
private[akka] class Locker(
scheduler: Scheduler,
period: Duration,
val provider: ActorRefProvider,
val path: ActorPath,
val deathWatch: DeathWatch) extends MinimalActorRef {
class DavyJones extends Runnable {
def run = {
val iter = heap.entrySet.iterator
while (iter.hasNext) {
val soul = iter.next()
deathWatch.subscribe(Locker.this, soul.getValue) // in case Terminated got lost somewhere
soul.getValue match {
case _: LocalRef // nothing to do, they know what they signed up for
case nonlocal nonlocal.stop() // try again in case it was due to a communications failure
}
}
}
}
private val heap = new ConcurrentHashMap[ActorPath, InternalActorRef]
scheduler.schedule(period, period, new DavyJones)
override def sendSystemMessage(msg: SystemMessage): Unit = this.!(msg)
override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match {
case Terminated(soul) heap.remove(soul.path)
case ChildTerminated(soul) heap.remove(soul.path)
case soul: InternalActorRef
heap.put(soul.path, soul)
deathWatch.subscribe(this, soul)
// now re-bind the soul so that it does not drown its parent
soul match {
case local: LocalActorRef
val cell = local.underlying
cell.parent = this
case _
}
case _ // ignore
}
def childTerminated(parent: ActorRef, ct: ChildTerminated): Unit = {
heap.get(parent.path) match {
case null
case ref ref.sendSystemMessage(ct)
}
}
def shutdown(): Unit = {
import scala.collection.JavaConverters._
for (soul heap.values.asScala) {
soul match {
case l: LocalActorRef l.underlying.dispatcher.detach(l.underlying)
case _
}
}
}
}

View file

@ -187,10 +187,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
var nextMessage = systemDrain() var nextMessage = systemDrain()
try { try {
while ((nextMessage ne null) && !isClosed) { while ((nextMessage ne null) && !isClosed) {
if (debug) println(actor.self + " processing system message " + nextMessage + " with " + if (debug) println(actor.self + " processing system message " + nextMessage + " with " + actor.childrenRefs)
(if (actor.childrenRefs.isEmpty) "no children"
else if (actor.childrenRefs.size > 20) actor.childrenRefs.size + " children"
else actor.childrenRefs.mkString("children:\n ", "\n ", "")))
actor systemInvoke nextMessage actor systemInvoke nextMessage
nextMessage = nextMessage.next nextMessage = nextMessage.next
// dont ever execute normal message when system message present! // dont ever execute normal message when system message present!

View file

@ -233,7 +233,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
"creating actor with Props" in { "creating actor with Props" in {
//#creating-props //#creating-props
import akka.actor.Props import akka.actor.Props
val myActor = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor") val myActor = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor2")
//#creating-props //#creating-props
system.stop(myActor) system.stop(myActor)

View file

@ -40,7 +40,6 @@ class RemoteActorRefProvider(
def log: LoggingAdapter = _log def log: LoggingAdapter = _log
def rootPath = local.rootPath def rootPath = local.rootPath
def locker = local.locker
def deadLetters = local.deadLetters def deadLetters = local.deadLetters
val deathWatch = new RemoteDeathWatch(local.deathWatch, this) val deathWatch = new RemoteDeathWatch(local.deathWatch, this)

View file

@ -274,7 +274,6 @@ trait RemoteMarshallingOps {
case l: LocalRef case l: LocalRef
if (provider.remoteSettings.LogReceive) log.debug("received local message {}", remoteMessage) if (provider.remoteSettings.LogReceive) log.debug("received local message {}", remoteMessage)
remoteMessage.payload match { remoteMessage.payload match {
case ct: ChildTerminated if l.isTerminated provider.locker.childTerminated(l, ct)
case msg: SystemMessage case msg: SystemMessage
if (useUntrustedMode) if (useUntrustedMode)
throw new SecurityException("RemoteModule server is operating is untrusted mode, can not send system message") throw new SecurityException("RemoteModule server is operating is untrusted mode, can not send system message")

View file

@ -31,6 +31,7 @@ object RemoteCommunicationSpec {
} }
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RemoteCommunicationSpec extends AkkaSpec(""" class RemoteCommunicationSpec extends AkkaSpec("""
akka { akka {
actor.provider = "akka.remote.RemoteActorRefProvider" actor.provider = "akka.remote.RemoteActorRefProvider"
@ -123,6 +124,8 @@ akka {
myref ! 43 myref ! 43
expectMsg(43) expectMsg(43)
lastSender must be theSameInstanceAs remref lastSender must be theSameInstanceAs remref
r.asInstanceOf[RemoteActorRef].getParent must be(l)
system.actorFor("/user/looker/child") must be theSameInstanceAs r
Await.result(l ? "child/..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l Await.result(l ? "child/..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l
Await.result(system.actorFor(system / "looker" / "child") ? "..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l Await.result(system.actorFor(system / "looker" / "child") ? "..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l
} }