Roland found a nasty race between activate in supervisor and locally, this commit should fix it, and rename activate to point
This commit is contained in:
parent
9c66407b66
commit
84aef55846
6 changed files with 22 additions and 23 deletions
|
|
@ -361,10 +361,10 @@ private[akka] class ActorCell(
|
|||
case null ⇒ faultResume(inRespToFailure)
|
||||
case w: WaitingForChildren ⇒ w.enqueue(message)
|
||||
}
|
||||
case Terminate() ⇒ terminate()
|
||||
case Supervise(child, uid) ⇒ supervise(child, uid)
|
||||
case ChildTerminated(child) ⇒ todo = handleChildTerminated(child)
|
||||
case NoMessage ⇒ // only here to suppress warning
|
||||
case Terminate() ⇒ terminate()
|
||||
case Supervise(child, async, uid) ⇒ supervise(child, async, uid)
|
||||
case ChildTerminated(child) ⇒ todo = handleChildTerminated(child)
|
||||
case NoMessage ⇒ // only here to suppress warning
|
||||
}
|
||||
} catch {
|
||||
case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(Nil, e, "error while processing " + message)
|
||||
|
|
@ -492,21 +492,21 @@ private[akka] class ActorCell(
|
|||
}
|
||||
}
|
||||
|
||||
private def supervise(child: ActorRef, uid: Int): Unit = if (!isTerminating) {
|
||||
private def supervise(child: ActorRef, async: Boolean, uid: Int): Unit = if (!isTerminating) {
|
||||
// Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure()
|
||||
initChild(child) match {
|
||||
case Some(crs) ⇒
|
||||
crs.uid = uid
|
||||
handleSupervise(child)
|
||||
handleSupervise(child, async)
|
||||
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
|
||||
case None ⇒ publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well"))
|
||||
}
|
||||
}
|
||||
|
||||
// future extension point
|
||||
protected def handleSupervise(child: ActorRef): Unit = child match {
|
||||
case r: RepointableActorRef ⇒ r.activate()
|
||||
case _ ⇒
|
||||
protected def handleSupervise(child: ActorRef, async: Boolean): Unit = child match {
|
||||
case r: RepointableActorRef if async ⇒ r.point()
|
||||
case _ ⇒
|
||||
}
|
||||
|
||||
final protected def clearActorFields(actorInstance: Actor): Unit = {
|
||||
|
|
|
|||
|
|
@ -381,7 +381,7 @@ class LocalActorRefProvider(
|
|||
|
||||
override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff {
|
||||
message match {
|
||||
case Supervise(_, _) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead
|
||||
case Supervise(_, _, _) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead
|
||||
case ChildTerminated(_) ⇒ stop()
|
||||
case _ ⇒ log.error(this + " received unexpected system message [" + message + "]")
|
||||
}
|
||||
|
|
@ -585,14 +585,13 @@ class LocalActorRefProvider(
|
|||
if (settings.DebugRouterMisconfiguration && deployer.lookup(path).isDefined)
|
||||
log.warning("Configuration says that {} should be a router, but code disagrees. Remove the config or add a routerConfig to its Props.")
|
||||
|
||||
if (async) new RepointableActorRef(system, props, supervisor, path).initialize()
|
||||
if (async) new RepointableActorRef(system, props, supervisor, path).initialize(async)
|
||||
else new LocalActorRef(system, props, supervisor, path)
|
||||
case router ⇒
|
||||
val lookup = if (lookupDeploy) deployer.lookup(path) else None
|
||||
val fromProps = Iterator(props.deploy.copy(routerConfig = props.deploy.routerConfig withFallback router))
|
||||
val d = fromProps ++ deploy.iterator ++ lookup.iterator reduce ((a, b) ⇒ b withFallback a)
|
||||
val ref = new RoutedActorRef(system, props.withRouter(d.routerConfig), supervisor, path).initialize()
|
||||
if (async) ref else ref.activate()
|
||||
new RoutedActorRef(system, props.withRouter(d.routerConfig), supervisor, path).initialize(async)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -53,14 +53,15 @@ private[akka] class RepointableActorRef(
|
|||
*
|
||||
* This is protected so that others can have different initialization.
|
||||
*/
|
||||
def initialize(): this.type =
|
||||
def initialize(async: Boolean): this.type =
|
||||
underlying match {
|
||||
case null ⇒
|
||||
val uid = ThreadLocalRandom.current.nextInt()
|
||||
swapCell(new UnstartedCell(system, this, props, supervisor, uid))
|
||||
supervisor.sendSystemMessage(Supervise(this, uid))
|
||||
supervisor.sendSystemMessage(Supervise(this, async, uid))
|
||||
if (!async) point()
|
||||
this
|
||||
case other ⇒ this
|
||||
case other ⇒ throw new IllegalStateException("initialize called more than once!")
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -69,7 +70,7 @@ private[akka] class RepointableActorRef(
|
|||
* modification of the `underlying` field, though it is safe to send messages
|
||||
* at any time.
|
||||
*/
|
||||
def activate(): this.type =
|
||||
def point(): this.type =
|
||||
underlying match {
|
||||
case u: UnstartedCell ⇒ u.replaceWith(newCell(u)); this
|
||||
case null ⇒ throw new IllegalStateException("underlying cell is null")
|
||||
|
|
@ -80,9 +81,8 @@ private[akka] class RepointableActorRef(
|
|||
* This is called by activate() to obtain the cell which is to replace the
|
||||
* unstarted cell. The cell must be fully functional.
|
||||
*/
|
||||
def newCell(old: Cell): Cell =
|
||||
new ActorCell(system, this, props, supervisor).
|
||||
init(old.asInstanceOf[UnstartedCell].uid, sendSupervise = false).start()
|
||||
def newCell(old: UnstartedCell): Cell =
|
||||
new ActorCell(system, this, props, supervisor).init(old.uid, sendSupervise = false).start()
|
||||
|
||||
def start(): Unit = ()
|
||||
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ private[akka] trait Dispatch { this: ActorCell ⇒
|
|||
|
||||
if (sendSupervise) {
|
||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||
parent.sendSystemMessage(akka.dispatch.Supervise(self, uid))
|
||||
parent.sendSystemMessage(akka.dispatch.Supervise(self, async = false, uid))
|
||||
parent ! NullMessage // read ScalaDoc of NullMessage to see why
|
||||
}
|
||||
this
|
||||
|
|
|
|||
|
|
@ -108,7 +108,7 @@ private[akka] case class Terminate() extends SystemMessage // sent to self from
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] case class Supervise(child: ActorRef, uid: Int) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start
|
||||
private[akka] case class Supervise(child: ActorRef, async: Boolean, uid: Int) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
|
|||
|
||||
_props.routerConfig.verifyConfig()
|
||||
|
||||
override def newCell(old: Cell): Cell = new RoutedActorCell(system, this, props, supervisor, old.asInstanceOf[UnstartedCell].uid)
|
||||
override def newCell(old: UnstartedCell): Cell = new RoutedActorCell(system, this, props, supervisor, old.uid)
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue