Merge pull request #24406 from jrudolph/jr/review-24302

Introduce special starting state in ActorAdapter for guardian actor #24279
This commit is contained in:
Johannes Rudolph 2018-02-01 17:20:02 +01:00 committed by GitHub
commit e4397db44a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 305 additions and 51 deletions

View file

@ -4,6 +4,7 @@
package akka.actor.typed
package internal
import akka.Done
import akka.actor.InvalidMessageException
import akka.actor.typed.scaladsl.Behaviors
import akka.testkit.typed.TestInbox
@ -54,8 +55,12 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll
// see issue #24172
"shutdown if guardian shuts down immediately" in {
pending
withSystem("shutdown", Behaviors.stopped[String], doTerminate = false) { sys: ActorSystem[String]
val stoppable =
Behaviors.immutable[Done] {
case (ctx, Done) Behaviors.stopped
}
withSystem("shutdown", stoppable, doTerminate = false) { sys: ActorSystem[Done]
sys ! Done
sys.whenTerminated.futureValue
}
}

View file

@ -5,15 +5,15 @@ package akka.actor.typed.scaladsl.adapter
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.actor.typed.ActorRef
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Terminated }
import akka.actor.{ InvalidMessageException, Props }
import akka.actor.typed.Behavior
import akka.actor.typed.Terminated
import akka.actor.typed.scaladsl.Behaviors
import akka.{ actor untyped }
import akka.{ Done, NotUsed, actor untyped }
import akka.testkit._
import akka.actor.typed.Behavior.UntypedBehavior
import scala.concurrent.Await
object AdapterSpec {
val untyped1: untyped.Props = untyped.Props(new Untyped1)
@ -157,6 +157,31 @@ class AdapterSpec extends AkkaSpec {
typed1 should be theSameInstanceAs typed2
}
"not crash if guardian is stopped" in {
for { _ 0 to 10 } {
var system: akka.actor.typed.ActorSystem[NotUsed] = null
try {
system = ActorSystem.create(Behaviors.deferred[NotUsed](_ Behavior.stopped[NotUsed]), "AdapterSpec-stopping-guardian")
} finally if (system != null) shutdown(system.toUntyped)
}
}
"not crash if guardian is stopped very quickly" in {
for { _ 0 to 10 } {
var system: akka.actor.typed.ActorSystem[Done] = null
try {
system = ActorSystem.create(Behaviors.immutable[Done] { (ctx, msg)
ctx.self ! Done
msg match {
case Done Behaviors.stopped
}
}, "AdapterSpec-stopping-guardian-2")
} finally if (system != null) shutdown(system.toUntyped)
}
}
}
"Adapted actors" must {

View file

@ -7,16 +7,14 @@ package docs.akka.typed
import java.net.URLEncoder
import java.nio.charset.StandardCharsets
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Terminated
import akka.NotUsed
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Terminated }
import akka.testkit.typed.TestKit
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future }
//#imports
import akka.actor.typed.TypedAkkaSpecWithShutdown
@ -107,7 +105,7 @@ class IntroSpec extends TestKit with TypedAkkaSpecWithShutdown {
import IntroSpec._
"Hello world" must {
"must say hello" in {
"say hello" in {
// TODO Implicits.global is not something we would like to encourage in docs
//#hello-world
import HelloWorld._
@ -128,7 +126,7 @@ class IntroSpec extends TestKit with TypedAkkaSpecWithShutdown {
//#hello-world
}
"must chat" in {
"chat" in {
//#chatroom-gabbler
import ChatRoom._
@ -152,24 +150,20 @@ class IntroSpec extends TestKit with TypedAkkaSpecWithShutdown {
//#chatroom-gabbler
//#chatroom-main
val main: Behavior[String] =
val main: Behavior[NotUsed] =
Behaviors.deferred { ctx
val chatRoom = ctx.spawn(ChatRoom.behavior, "chatroom")
val gabblerRef = ctx.spawn(gabbler, "gabbler")
ctx.watch(gabblerRef)
chatRoom ! GetSession("ol Gabbler", gabblerRef)
Behaviors.immutablePartial[String] {
case (_, "go")
chatRoom ! GetSession("ol Gabbler", gabblerRef)
Behaviors.same
} onSignal {
Behaviors.onSignal {
case (_, Terminated(ref))
Behaviors.stopped
}
}
val system = ActorSystem(main, "ChatRoomDemo")
system ! "go"
Await.result(system.whenTerminated, 3.seconds)
//#chatroom-main
}

View file

@ -4,7 +4,7 @@
package akka.actor.typed
import scala.concurrent.ExecutionContext
import akka.{ actor a, event e }
import akka.{ actor a }
import java.util.concurrent.{ CompletionStage, ThreadFactory }
import akka.actor.setup.ActorSystemSetup
@ -18,6 +18,7 @@ import akka.annotation.ApiMayChange
import java.util.Optional
import akka.actor.BootstrapSetup
import akka.actor.typed.internal.adapter.GuardianActorAdapter
import akka.actor.typed.receptionist.Receptionist
/**
@ -211,15 +212,16 @@ object ActorSystem {
executionContext: Option[ExecutionContext] = None): ActorSystem[T] = {
Behavior.validateAsInitial(guardianBehavior)
require(Behavior.isAlive(guardianBehavior))
val cl = classLoader.getOrElse(akka.actor.ActorSystem.findClassLoader())
val appConfig = config.getOrElse(ConfigFactory.load(cl))
val setup = ActorSystemSetup(BootstrapSetup(classLoader, config, executionContext))
val untyped = new a.ActorSystemImpl(name, appConfig, cl, executionContext,
Some(PropsAdapter(() guardianBehavior, guardianProps)), setup)
Some(PropsAdapter(() guardianBehavior, guardianProps, isGuardian = true)), setup)
untyped.start()
val adapter: ActorSystemAdapter.AdapterExtension = ActorSystemAdapter.AdapterExtension(untyped)
adapter.adapter
untyped.guardian ! GuardianActorAdapter.Start
ActorSystemAdapter.AdapterExtension(untyped).adapter
}
/**

View file

@ -6,7 +6,6 @@ package internal
package adapter
import scala.annotation.tailrec
import akka.{ actor a }
import akka.annotation.InternalApi
import akka.util.OptionVal
@ -20,13 +19,16 @@ import akka.util.OptionVal
var behavior: Behavior[T] = _initialBehavior
if (!isAlive(behavior)) context.stop(self)
val ctx = new ActorContextAdapter[T](context)
var _ctx: ActorContextAdapter[T] = _
def ctx: ActorContextAdapter[T] =
if (_ctx ne null) _ctx
else throw new IllegalStateException("Context was accessed before typed actor was started.")
var failures: Map[a.ActorRef, Throwable] = Map.empty
def receive = {
def receive = running
def running: Receive = {
case a.Terminated(ref)
val msg =
if (failures contains ref) {
@ -109,7 +111,15 @@ import akka.util.OptionVal
a.SupervisorStrategy.Stop
}
override def preStart(): Unit = {
override def preStart(): Unit =
if (!isAlive(behavior))
context.stop(self)
else
start()
protected def start(): Unit = {
context.become(running)
initializeContext()
behavior = validateAsInitial(undefer(behavior, ctx))
if (!isAlive(behavior)) context.stop(self)
}
@ -120,6 +130,7 @@ import akka.util.OptionVal
}
override def postRestart(reason: Throwable): Unit = {
initializeContext()
behavior = validateAsInitial(undefer(behavior, ctx))
if (!isAlive(behavior)) context.stop(self)
}
@ -135,6 +146,57 @@ import akka.util.OptionVal
}
case b Behavior.interpretSignal(b, ctx, PostStop)
}
behavior = Behavior.stopped
}
protected def initializeContext(): Unit = {
_ctx = new ActorContextAdapter[T](context)
}
}
/**
* INTERNAL API
*
* A special adapter for the guardian which will defer processing until a special `Start` signal has been received.
* That will allow to defer typed processing until the untyped ActorSystem has completely started up.
*/
@InternalApi
private[typed] class GuardianActorAdapter[T](_initialBehavior: Behavior[T]) extends ActorAdapter[T](_initialBehavior) {
import Behavior._
override def preStart(): Unit =
if (!isAlive(behavior))
context.stop(self)
else
context.become(waitingForStart(Nil))
def waitingForStart(stashed: List[Any]): Receive = {
case GuardianActorAdapter.Start
start()
stashed.reverse.foreach(receive)
case other
// unlikely to happen but not impossible
context.become(waitingForStart(other :: stashed))
}
override def postRestart(reason: Throwable): Unit = {
initializeContext()
super.postRestart(reason)
}
override def postStop(): Unit = {
initializeContext()
super.postStop()
}
}
/**
* INTERNAL API
*/
@InternalApi private[typed] object GuardianActorAdapter {
case object Start
}

View file

@ -29,6 +29,7 @@ import scala.compat.java8.FutureConverters
*/
@InternalApi private[akka] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl)
extends ActorSystem[T] with ActorRef[T] with internal.ActorRefImpl[T] with ExtensionsImpl {
untyped.assertInitialized()
import ActorRefAdapter.sendSystemMessage

View file

@ -12,10 +12,12 @@ import akka.annotation.InternalApi
* INTERNAL API
*/
@InternalApi private[akka] object PropsAdapter {
def apply[T](behavior: () Behavior[T], deploy: Props = Props.empty): akka.actor.Props = {
val props = akka.actor.Props(new ActorAdapter(behavior()))
def apply[T](behavior: () Behavior[T], deploy: Props = Props.empty, isGuardian: Boolean = false): akka.actor.Props = {
val props =
if (isGuardian)
akka.actor.Props(new GuardianActorAdapter(behavior()))
else
akka.actor.Props(new ActorAdapter(behavior()))
(deploy.firstOrElse[DispatcherSelector](DispatcherDefault()) match {
case _: DispatcherDefault props

View file

@ -24,6 +24,7 @@ import scala.util.control.{ ControlThrowable, NonFatal }
import java.util.Optional
import akka.actor.setup.{ ActorSystemSetup, Setup }
import akka.annotation.InternalApi
import scala.compat.java8.FutureConverters
import scala.compat.java8.OptionConverters._
@ -643,6 +644,10 @@ abstract class ExtendedActorSystem extends ActorSystem {
}
/**
* Internal API
*/
@InternalApi
private[akka] class ActorSystemImpl(
val name: String,
applicationConfig: Config,
@ -803,10 +808,25 @@ private[akka] class ActorSystemImpl(
def /(actorName: String): ActorPath = guardian.path / actorName
def /(path: Iterable[String]): ActorPath = guardian.path / path
@volatile private var _initialized = false
/**
* Asserts that the ActorSystem has been fully initialized. Can be used to guard code blocks that might accidentally
* be run during initialization but require a fully initialized ActorSystem before proceeding.
*/
def assertInitialized(): Unit =
if (!_initialized)
throw new IllegalStateException(
"The calling code expected that the ActorSystem was initialized but it wasn't yet. " +
"This is probably a bug in the ActorSystem initialization sequence often related to initialization of extensions. " +
"Please report at https://github.com/akka/akka/issues."
)
private lazy val _start: this.type = try {
registerOnTermination(stopScheduler())
// the provider is expected to start default loggers, LocalActorRefProvider does this
provider.init(this)
// at this point it should be initialized "enough" for most extensions that we might want to guard against otherwise
_initialized = true
if (settings.LogDeadLetters > 0)
logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener"))
eventStream.startUnsubscriber()

View file

@ -10,11 +10,15 @@ import akka.cluster.typed.*;
import akka.testkit.typed.javadsl.TestProbe;
import docs.akka.cluster.typed.BasicClusterManualSpec;
//FIXME make these tests
public class BasicClusterExampleTest {
// FIXME these tests are awaiting typed Java testkit to be able to await cluster forming like in BasicClusterExampleSpec
public class BasicClusterExampleTest { // extends JUnitSuite {
// @Test
public void clusterApiExample() {
ActorSystem<Object> system = ActorSystem.create(Behaviors.empty(), "ClusterSystem", BasicClusterManualSpec.clusterConfig());
ActorSystem<Object> system2 = ActorSystem.create(Behaviors.empty(), "ClusterSystem", BasicClusterManualSpec.clusterConfig());
ActorSystem<Object> system = ActorSystem.create(Behaviors.empty(), "ClusterSystem",
BasicClusterManualSpec.noPort().withFallback(BasicClusterManualSpec.clusterConfig()));
ActorSystem<Object> system2 = ActorSystem.create(Behaviors.empty(), "ClusterSystem",
BasicClusterManualSpec.noPort().withFallback(BasicClusterManualSpec.clusterConfig()));
try {
//#cluster-create
@ -26,18 +30,28 @@ public class BasicClusterExampleTest {
cluster.manager().tell(Join.create(cluster.selfMember().address()));
//#cluster-join
cluster2.manager().tell(Join.create(cluster.selfMember().address()));
// TODO wait for/verify cluster to form
//#cluster-leave
cluster2.manager().tell(Leave.create(cluster2.selfMember().address()));
//#cluster-leave
// TODO wait for/verify node 2 leaving
} finally {
system.terminate();
system2.terminate();
}
}
// @Test
public void clusterLeave() throws Exception {
ActorSystem<Object> system = ActorSystem.create(Behaviors.empty(), "ClusterSystem", BasicClusterManualSpec.clusterConfig());
ActorSystem<Object> system2 = ActorSystem.create(Behaviors.empty(), "ClusterSystem", BasicClusterManualSpec.clusterConfig());
ActorSystem<Object> system = ActorSystem.create(Behaviors.empty(), "ClusterSystem",
BasicClusterManualSpec.noPort().withFallback(BasicClusterManualSpec.clusterConfig()));
ActorSystem<Object> system2 = ActorSystem.create(Behaviors.empty(), "ClusterSystem",
BasicClusterManualSpec.noPort().withFallback(BasicClusterManualSpec.clusterConfig()));
try {
Cluster cluster = Cluster.get(system);

View file

@ -6,12 +6,14 @@ import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.receptionist.Receptionist;
import akka.actor.typed.receptionist.ServiceKey;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
public class ReceptionistExampleTest {
public class ReceptionistExampleTest extends JUnitSuite {
public static class PingPongExample {
//#ping-service
@ -68,6 +70,7 @@ public class ReceptionistExampleTest {
//#pinger-guardian
}
@Test
public void workPlease() throws Exception {
ActorSystem<Receptionist.Listing<PingPongExample.Ping>> system =
ActorSystem.create(PingPongExample.guardian(), "ReceptionistExample");

View file

@ -0,0 +1,131 @@
/**
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.cluster.typed
import akka.actor.InvalidMessageException
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.{ PostStop, Terminated }
import akka.actor.typed.scaladsl.Behaviors
import akka.testkit.typed.TestInbox
import com.typesafe.config.ConfigFactory
import org.scalatest._
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.duration._
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.control.NonFatal
class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll
with ScalaFutures with Eventually {
override implicit val patienceConfig = PatienceConfig(1.second)
val config = ConfigFactory.parseString(
"""
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
""").withFallback(ConfigFactory.load())
def system[T](behavior: Behavior[T], name: String) = ActorSystem(behavior, name, config)
def suite = "adapter"
case class Probe(msg: String, replyTo: ActorRef[String])
def withSystem[T](name: String, behavior: Behavior[T], doTerminate: Boolean = true)(block: ActorSystem[T] Unit): Terminated = {
val sys = system(behavior, s"$suite-$name")
try {
block(sys)
if (doTerminate) sys.terminate().futureValue else sys.whenTerminated.futureValue
} catch {
case NonFatal(ex)
sys.terminate()
throw ex
}
}
"An ActorSystem" must {
"start the guardian actor and terminate when it terminates" in {
val t = withSystem(
"a",
Behaviors.immutable[Probe] { case (_, p) p.replyTo ! p.msg; Behaviors.stopped }, doTerminate = false) { sys
val inbox = TestInbox[String]("a")
sys ! Probe("hello", inbox.ref)
eventually {
inbox.hasMessages should ===(true)
}
inbox.receiveAll() should ===("hello" :: Nil)
}
val p = t.ref.path
p.name should ===("/")
p.address.system should ===(suite + "-a")
}
// see issue #24172
"shutdown if guardian shuts down immediately" in {
pending
withSystem("shutdown", Behaviors.stopped[String], doTerminate = false) { sys: ActorSystem[String]
sys.whenTerminated.futureValue
}
}
"terminate the guardian actor" in {
val inbox = TestInbox[String]("terminate")
val sys = system(
Behaviors.immutable[Probe] {
case (_, _) Behaviors.unhandled
} onSignal {
case (_, PostStop)
inbox.ref ! "done"
Behaviors.same
},
"terminate")
sys.terminate().futureValue
inbox.receiveAll() should ===("done" :: Nil)
}
"log to the event stream" in {
pending
}
"have a name" in {
withSystem("name", Behaviors.empty[String]) { sys
sys.name should ===(suite + "-name")
}
}
"report its uptime" in {
withSystem("uptime", Behaviors.empty[String]) { sys
sys.uptime should be < 1L
Thread.sleep(2000)
sys.uptime should be >= 1L
}
}
"have a working thread factory" in {
withSystem("thread", Behaviors.empty[String]) { sys
val p = Promise[Int]
sys.threadFactory.newThread(new Runnable {
def run(): Unit = p.success(42)
}).start()
p.future.futureValue should ===(42)
}
}
"be able to run Futures" in {
withSystem("futures", Behaviors.empty[String]) { sys
val f = Future(42)(sys.executionContext)
f.futureValue should ===(42)
}
}
"not allow null messages" in {
withSystem("null-messages", Behaviors.empty[String]) { sys
intercept[InvalidMessageException] {
sys ! null
}
}
}
}
}

View file

@ -197,16 +197,11 @@ class ReceptionistExampleSpec extends WordSpec with ScalaFutures {
"A remote basic example" must {
"show register" in {
// FIXME cannot use guardian as it touches receptionist #24279
import scaladsl.adapter._
val system1 = akka.actor.ActorSystem("PingPongExample", clusterConfig)
val system2 = akka.actor.ActorSystem("PingPongExample", clusterConfig)
val system1 = ActorSystem(guardianJustPingService, "PingPongExample", clusterConfig)
val system2 = ActorSystem(guardianJustPinger, "PingPongExample", clusterConfig)
system1.spawnAnonymous(guardianJustPingService)
system2.spawnAnonymous(guardianJustPinger)
val cluster1 = Cluster(system1.toTyped)
val cluster2 = Cluster(system2.toTyped)
val cluster1 = Cluster(system1)
val cluster2 = Cluster(system2)
cluster1.manager ! Join(cluster1.selfMember.address)
cluster1.manager ! Join(cluster2.selfMember.address)