=typ #24279 introduce special startup state for typed guardian actor

This allows the untyped actorsystem to be initialized completely before starting up
the typed guardian code.
This commit is contained in:
Johannes Rudolph 2018-01-24 15:28:22 +01:00
parent c6965edc21
commit 79a111d64b
No known key found for this signature in database
GPG key ID: 4D293A24CCD39E19
5 changed files with 88 additions and 16 deletions

View file

@ -4,6 +4,7 @@
package akka.actor.typed
package internal
import akka.Done
import akka.actor.InvalidMessageException
import akka.actor.typed.scaladsl.Behaviors
import akka.testkit.typed.TestInbox
@ -54,7 +55,12 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll
// see issue #24172
"shutdown if guardian shuts down immediately" in {
withSystem("shutdown", Behaviors.stopped[String], doTerminate = false) { sys: ActorSystem[String]
val stoppable =
Behaviors.immutable[Done] {
case (ctx, Done) Behaviors.stopped
}
withSystem("shutdown", stoppable, doTerminate = false) { sys: ActorSystem[Done]
sys ! Done
sys.whenTerminated.futureValue
}
}

View file

@ -162,7 +162,7 @@ class AdapterSpec extends AkkaSpec {
for { _ 0 to 10 } {
var system: akka.actor.typed.ActorSystem[NotUsed] = null
try {
system = ActorSystem.create(Behavior.stopped[NotUsed], "AdapterSpec-stopping-guardian")
system = ActorSystem.create(Behaviors.deferred[NotUsed](_ Behavior.stopped[NotUsed]), "AdapterSpec-stopping-guardian")
} finally if (system != null) shutdown(system.toUntyped)
}
}

View file

@ -4,7 +4,7 @@
package akka.actor.typed
import scala.concurrent.ExecutionContext
import akka.{ actor a, event e }
import akka.{ actor a }
import java.util.concurrent.{ CompletionStage, ThreadFactory }
import akka.actor.setup.ActorSystemSetup
@ -18,6 +18,7 @@ import akka.annotation.ApiMayChange
import java.util.Optional
import akka.actor.BootstrapSetup
import akka.actor.typed.internal.adapter.GuardianActorAdapter
import akka.actor.typed.receptionist.Receptionist
/**
@ -211,15 +212,16 @@ object ActorSystem {
executionContext: Option[ExecutionContext] = None): ActorSystem[T] = {
Behavior.validateAsInitial(guardianBehavior)
require(Behavior.isAlive(guardianBehavior))
val cl = classLoader.getOrElse(akka.actor.ActorSystem.findClassLoader())
val appConfig = config.getOrElse(ConfigFactory.load(cl))
val setup = ActorSystemSetup(BootstrapSetup(classLoader, config, executionContext))
val untyped = new a.ActorSystemImpl(name, appConfig, cl, executionContext,
Some(PropsAdapter(() guardianBehavior, guardianProps)), setup)
Some(PropsAdapter(() guardianBehavior, guardianProps, isGuardian = true)), setup)
untyped.start()
val adapter: ActorSystemAdapter.AdapterExtension = ActorSystemAdapter.AdapterExtension(untyped)
adapter.adapter
untyped.guardian ! GuardianActorAdapter.Start
ActorSystemAdapter.AdapterExtension(untyped).adapter
}
/**

View file

@ -6,7 +6,6 @@ package internal
package adapter
import scala.annotation.tailrec
import akka.{ actor a }
import akka.annotation.InternalApi
import akka.util.OptionVal
@ -20,13 +19,16 @@ import akka.util.OptionVal
var behavior: Behavior[T] = _initialBehavior
if (!isAlive(behavior)) context.stop(self)
val ctx = new ActorContextAdapter[T](context)
var _ctx: ActorContextAdapter[T] = _
def ctx: ActorContextAdapter[T] =
if (_ctx ne null) _ctx
else throw new IllegalStateException("Context was accessed before typed actor was started.")
var failures: Map[a.ActorRef, Throwable] = Map.empty
def receive = {
def receive = running
def running: Receive = {
case a.Terminated(ref)
val msg =
if (failures contains ref) {
@ -109,7 +111,15 @@ import akka.util.OptionVal
a.SupervisorStrategy.Stop
}
override def preStart(): Unit = {
override def preStart(): Unit =
if (!isAlive(behavior))
context.stop(self)
else
start()
protected def start(): Unit = {
context.become(running)
initializeContext()
behavior = validateAsInitial(undefer(behavior, ctx))
if (!isAlive(behavior)) context.stop(self)
}
@ -120,6 +130,7 @@ import akka.util.OptionVal
}
override def postRestart(reason: Throwable): Unit = {
initializeContext()
behavior = validateAsInitial(undefer(behavior, ctx))
if (!isAlive(behavior)) context.stop(self)
}
@ -135,6 +146,57 @@ import akka.util.OptionVal
}
case b Behavior.interpretSignal(b, ctx, PostStop)
}
behavior = Behavior.stopped
}
protected def initializeContext(): Unit = {
_ctx = new ActorContextAdapter[T](context)
}
}
/**
* INTERNAL API
*
* A special adapter for the guardian which will defer processing until a special `Start` signal has been received.
* That will allow to defer typed processing until the untyped ActorSystem has completely started up.
*/
@InternalApi
private[typed] class GuardianActorAdapter[T](_initialBehavior: Behavior[T]) extends ActorAdapter[T](_initialBehavior) {
import Behavior._
override def preStart(): Unit =
if (!isAlive(behavior))
context.stop(self)
else
context.become(waitingForStart(Nil))
def waitingForStart(stashed: List[Any]): Receive = {
case GuardianActorAdapter.Start
start()
stashed.reverse.foreach(receive)
case other
// unlikely to happen but not impossible
context.become(waitingForStart(other :: stashed))
}
override def postRestart(reason: Throwable): Unit = {
initializeContext()
super.postRestart(reason)
}
override def postStop(): Unit = {
initializeContext()
super.postStop()
}
}
/**
* INTERNAL API
*/
@InternalApi private[typed] object GuardianActorAdapter {
case object Start
}

View file

@ -12,10 +12,12 @@ import akka.annotation.InternalApi
* INTERNAL API
*/
@InternalApi private[akka] object PropsAdapter {
def apply[T](behavior: () Behavior[T], deploy: Props = Props.empty): akka.actor.Props = {
val props = akka.actor.Props(new ActorAdapter(behavior()))
def apply[T](behavior: () Behavior[T], deploy: Props = Props.empty, isGuardian: Boolean = false): akka.actor.Props = {
val props =
if (isGuardian)
akka.actor.Props(new GuardianActorAdapter(behavior()))
else
akka.actor.Props(new ActorAdapter(behavior()))
(deploy.firstOrElse[DispatcherSelector](DispatcherDefault()) match {
case _: DispatcherDefault props