diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 9cf2b5b3df..9b2ef58d04 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -361,10 +361,10 @@ private[akka] class ActorCell( case null ⇒ faultResume(inRespToFailure) case w: WaitingForChildren ⇒ w.enqueue(message) } - case Terminate() ⇒ terminate() - case Supervise(child, uid) ⇒ supervise(child, uid) - case ChildTerminated(child) ⇒ todo = handleChildTerminated(child) - case NoMessage ⇒ // only here to suppress warning + case Terminate() ⇒ terminate() + case Supervise(child, async, uid) ⇒ supervise(child, async, uid) + case ChildTerminated(child) ⇒ todo = handleChildTerminated(child) + case NoMessage ⇒ // only here to suppress warning } } catch { case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(Nil, e, "error while processing " + message) @@ -492,21 +492,21 @@ private[akka] class ActorCell( } } - private def supervise(child: ActorRef, uid: Int): Unit = if (!isTerminating) { + private def supervise(child: ActorRef, async: Boolean, uid: Int): Unit = if (!isTerminating) { // Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure() initChild(child) match { case Some(crs) ⇒ crs.uid = uid - handleSupervise(child) + handleSupervise(child, async) if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) case None ⇒ publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well")) } } // future extension point - protected def handleSupervise(child: ActorRef): Unit = child match { - case r: RepointableActorRef ⇒ r.activate() - case _ ⇒ + protected def handleSupervise(child: ActorRef, async: Boolean): Unit = child match { + case r: RepointableActorRef if async ⇒ r.point() + case _ ⇒ } final protected def clearActorFields(actorInstance: Actor): Unit = { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index b0b4c3d939..5c6cfc640c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -381,7 +381,7 @@ class LocalActorRefProvider( override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff { message match { - case Supervise(_, _) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead + case Supervise(_, _, _) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead case ChildTerminated(_) ⇒ stop() case _ ⇒ log.error(this + " received unexpected system message [" + message + "]") } @@ -585,14 +585,13 @@ class LocalActorRefProvider( if (settings.DebugRouterMisconfiguration && deployer.lookup(path).isDefined) log.warning("Configuration says that {} should be a router, but code disagrees. Remove the config or add a routerConfig to its Props.") - if (async) new RepointableActorRef(system, props, supervisor, path).initialize() + if (async) new RepointableActorRef(system, props, supervisor, path).initialize(async) else new LocalActorRef(system, props, supervisor, path) case router ⇒ val lookup = if (lookupDeploy) deployer.lookup(path) else None val fromProps = Iterator(props.deploy.copy(routerConfig = props.deploy.routerConfig withFallback router)) val d = fromProps ++ deploy.iterator ++ lookup.iterator reduce ((a, b) ⇒ b withFallback a) - val ref = new RoutedActorRef(system, props.withRouter(d.routerConfig), supervisor, path).initialize() - if (async) ref else ref.activate() + new RoutedActorRef(system, props.withRouter(d.routerConfig), supervisor, path).initialize(async) } } diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 8621ca116e..ddc49b1e22 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -53,14 +53,15 @@ private[akka] class RepointableActorRef( * * This is protected so that others can have different initialization. */ - def initialize(): this.type = + def initialize(async: Boolean): this.type = underlying match { case null ⇒ val uid = ThreadLocalRandom.current.nextInt() swapCell(new UnstartedCell(system, this, props, supervisor, uid)) - supervisor.sendSystemMessage(Supervise(this, uid)) + supervisor.sendSystemMessage(Supervise(this, async, uid)) + if (!async) point() this - case other ⇒ this + case other ⇒ throw new IllegalStateException("initialize called more than once!") } /** @@ -69,7 +70,7 @@ private[akka] class RepointableActorRef( * modification of the `underlying` field, though it is safe to send messages * at any time. */ - def activate(): this.type = + def point(): this.type = underlying match { case u: UnstartedCell ⇒ u.replaceWith(newCell(u)); this case null ⇒ throw new IllegalStateException("underlying cell is null") @@ -80,9 +81,8 @@ private[akka] class RepointableActorRef( * This is called by activate() to obtain the cell which is to replace the * unstarted cell. The cell must be fully functional. */ - def newCell(old: Cell): Cell = - new ActorCell(system, this, props, supervisor). - init(old.asInstanceOf[UnstartedCell].uid, sendSupervise = false).start() + def newCell(old: UnstartedCell): Cell = + new ActorCell(system, this, props, supervisor).init(old.uid, sendSupervise = false).start() def start(): Unit = () diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index aefd2bcc55..469aac78c2 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -55,7 +55,7 @@ private[akka] trait Dispatch { this: ActorCell ⇒ if (sendSupervise) { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - parent.sendSystemMessage(akka.dispatch.Supervise(self, uid)) + parent.sendSystemMessage(akka.dispatch.Supervise(self, async = false, uid)) parent ! NullMessage // read ScalaDoc of NullMessage to see why } this diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 1b9de36e77..8f13e5fa11 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -108,7 +108,7 @@ private[akka] case class Terminate() extends SystemMessage // sent to self from /** * INTERNAL API */ -private[akka] case class Supervise(child: ActorRef, uid: Int) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start +private[akka] case class Supervise(child: ActorRef, async: Boolean, uid: Int) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start /** * INTERNAL API */ diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 49f54c70ae..1363d1d8d6 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -37,7 +37,7 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup _props.routerConfig.verifyConfig() - override def newCell(old: Cell): Cell = new RoutedActorCell(system, this, props, supervisor, old.asInstanceOf[UnstartedCell].uid) + override def newCell(old: UnstartedCell): Cell = new RoutedActorCell(system, this, props, supervisor, old.uid) }