diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala index 6faa54a610..2b3a461e41 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala @@ -4,6 +4,7 @@ package akka.actor.typed package internal +import akka.Done import akka.actor.InvalidMessageException import akka.actor.typed.scaladsl.Behaviors import akka.testkit.typed.TestInbox @@ -54,7 +55,12 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll // see issue #24172 "shutdown if guardian shuts down immediately" in { - withSystem("shutdown", Behaviors.stopped[String], doTerminate = false) { sys: ActorSystem[String] ⇒ + val stoppable = + Behaviors.immutable[Done] { + case (ctx, Done) ⇒ Behaviors.stopped + } + withSystem("shutdown", stoppable, doTerminate = false) { sys: ActorSystem[Done] ⇒ + sys ! Done sys.whenTerminated.futureValue } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala index f58645d027..6687914121 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/adapter/AdapterSpec.scala @@ -162,7 +162,7 @@ class AdapterSpec extends AkkaSpec { for { _ ← 0 to 10 } { var system: akka.actor.typed.ActorSystem[NotUsed] = null try { - system = ActorSystem.create(Behavior.stopped[NotUsed], "AdapterSpec-stopping-guardian") + system = ActorSystem.create(Behaviors.deferred[NotUsed](_ ⇒ Behavior.stopped[NotUsed]), "AdapterSpec-stopping-guardian") } finally if (system != null) shutdown(system.toUntyped) } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala index a7b20caf71..8053376040 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala @@ -4,7 +4,7 @@ package akka.actor.typed import scala.concurrent.ExecutionContext -import akka.{ actor ⇒ a, event ⇒ e } +import akka.{ actor ⇒ a } import java.util.concurrent.{ CompletionStage, ThreadFactory } import akka.actor.setup.ActorSystemSetup @@ -18,6 +18,7 @@ import akka.annotation.ApiMayChange import java.util.Optional import akka.actor.BootstrapSetup +import akka.actor.typed.internal.adapter.GuardianActorAdapter import akka.actor.typed.receptionist.Receptionist /** @@ -211,15 +212,16 @@ object ActorSystem { executionContext: Option[ExecutionContext] = None): ActorSystem[T] = { Behavior.validateAsInitial(guardianBehavior) + require(Behavior.isAlive(guardianBehavior)) val cl = classLoader.getOrElse(akka.actor.ActorSystem.findClassLoader()) val appConfig = config.getOrElse(ConfigFactory.load(cl)) val setup = ActorSystemSetup(BootstrapSetup(classLoader, config, executionContext)) val untyped = new a.ActorSystemImpl(name, appConfig, cl, executionContext, - Some(PropsAdapter(() ⇒ guardianBehavior, guardianProps)), setup) + Some(PropsAdapter(() ⇒ guardianBehavior, guardianProps, isGuardian = true)), setup) untyped.start() - val adapter: ActorSystemAdapter.AdapterExtension = ActorSystemAdapter.AdapterExtension(untyped) - adapter.adapter + untyped.guardian ! GuardianActorAdapter.Start + ActorSystemAdapter.AdapterExtension(untyped).adapter } /** diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala index 555eb2362a..899cd4f0a5 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala @@ -6,7 +6,6 @@ package internal package adapter import scala.annotation.tailrec - import akka.{ actor ⇒ a } import akka.annotation.InternalApi import akka.util.OptionVal @@ -20,13 +19,16 @@ import akka.util.OptionVal var behavior: Behavior[T] = _initialBehavior - if (!isAlive(behavior)) context.stop(self) - - val ctx = new ActorContextAdapter[T](context) + var _ctx: ActorContextAdapter[T] = _ + def ctx: ActorContextAdapter[T] = + if (_ctx ne null) _ctx + else throw new IllegalStateException("Context was accessed before typed actor was started.") var failures: Map[a.ActorRef, Throwable] = Map.empty - def receive = { + def receive = running + + def running: Receive = { case a.Terminated(ref) ⇒ val msg = if (failures contains ref) { @@ -109,7 +111,15 @@ import akka.util.OptionVal a.SupervisorStrategy.Stop } - override def preStart(): Unit = { + override def preStart(): Unit = + if (!isAlive(behavior)) + context.stop(self) + else + start() + + protected def start(): Unit = { + context.become(running) + initializeContext() behavior = validateAsInitial(undefer(behavior, ctx)) if (!isAlive(behavior)) context.stop(self) } @@ -120,6 +130,7 @@ import akka.util.OptionVal } override def postRestart(reason: Throwable): Unit = { + initializeContext() behavior = validateAsInitial(undefer(behavior, ctx)) if (!isAlive(behavior)) context.stop(self) } @@ -135,6 +146,57 @@ import akka.util.OptionVal } case b ⇒ Behavior.interpretSignal(b, ctx, PostStop) } + behavior = Behavior.stopped } + + protected def initializeContext(): Unit = { + _ctx = new ActorContextAdapter[T](context) + } +} + +/** + * INTERNAL API + * + * A special adapter for the guardian which will defer processing until a special `Start` signal has been received. + * That will allow to defer typed processing until the untyped ActorSystem has completely started up. + */ +@InternalApi +private[typed] class GuardianActorAdapter[T](_initialBehavior: Behavior[T]) extends ActorAdapter[T](_initialBehavior) { + import Behavior._ + + override def preStart(): Unit = + if (!isAlive(behavior)) + context.stop(self) + else + context.become(waitingForStart(Nil)) + + def waitingForStart(stashed: List[Any]): Receive = { + case GuardianActorAdapter.Start ⇒ + start() + + stashed.reverse.foreach(receive) + case other ⇒ + // unlikely to happen but not impossible + context.become(waitingForStart(other :: stashed)) + } + + override def postRestart(reason: Throwable): Unit = { + initializeContext() + + super.postRestart(reason) + } + + override def postStop(): Unit = { + initializeContext() + + super.postStop() + } +} +/** + * INTERNAL API + */ +@InternalApi private[typed] object GuardianActorAdapter { + case object Start + } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/PropsAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/PropsAdapter.scala index 5ec32a4493..8ecdd19ffa 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/PropsAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/PropsAdapter.scala @@ -12,10 +12,12 @@ import akka.annotation.InternalApi * INTERNAL API */ @InternalApi private[akka] object PropsAdapter { - - def apply[T](behavior: () ⇒ Behavior[T], deploy: Props = Props.empty): akka.actor.Props = { - - val props = akka.actor.Props(new ActorAdapter(behavior())) + def apply[T](behavior: () ⇒ Behavior[T], deploy: Props = Props.empty, isGuardian: Boolean = false): akka.actor.Props = { + val props = + if (isGuardian) + akka.actor.Props(new GuardianActorAdapter(behavior())) + else + akka.actor.Props(new ActorAdapter(behavior())) (deploy.firstOrElse[DispatcherSelector](DispatcherDefault()) match { case _: DispatcherDefault ⇒ props