=typ #24279 Add tests for ActorSystem startup / shutdown scenarios
This commit is contained in:
parent
4af523a012
commit
c6965edc21
8 changed files with 202 additions and 36 deletions
|
|
@ -54,7 +54,6 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll
|
|||
|
||||
// see issue #24172
|
||||
"shutdown if guardian shuts down immediately" in {
|
||||
pending
|
||||
withSystem("shutdown", Behaviors.stopped[String], doTerminate = false) { sys: ActorSystem[String] ⇒
|
||||
sys.whenTerminated.futureValue
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,15 +5,15 @@ package akka.actor.typed.scaladsl.adapter
|
|||
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.control.NoStackTrace
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Terminated }
|
||||
import akka.actor.{ InvalidMessageException, Props }
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.Terminated
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.{ actor ⇒ untyped }
|
||||
import akka.{ Done, NotUsed, actor ⇒ untyped }
|
||||
import akka.testkit._
|
||||
import akka.actor.typed.Behavior.UntypedBehavior
|
||||
|
||||
import scala.concurrent.Await
|
||||
|
||||
object AdapterSpec {
|
||||
val untyped1: untyped.Props = untyped.Props(new Untyped1)
|
||||
|
||||
|
|
@ -157,6 +157,31 @@ class AdapterSpec extends AkkaSpec {
|
|||
|
||||
typed1 should be theSameInstanceAs typed2
|
||||
}
|
||||
|
||||
"not crash if guardian is stopped" in {
|
||||
for { _ ← 0 to 10 } {
|
||||
var system: akka.actor.typed.ActorSystem[NotUsed] = null
|
||||
try {
|
||||
system = ActorSystem.create(Behavior.stopped[NotUsed], "AdapterSpec-stopping-guardian")
|
||||
} finally if (system != null) shutdown(system.toUntyped)
|
||||
}
|
||||
}
|
||||
|
||||
"not crash if guardian is stopped very quickly" in {
|
||||
for { _ ← 0 to 10 } {
|
||||
var system: akka.actor.typed.ActorSystem[Done] = null
|
||||
try {
|
||||
system = ActorSystem.create(Behaviors.immutable[Done] { (ctx, msg) ⇒
|
||||
ctx.self ! Done
|
||||
msg match {
|
||||
case Done ⇒ Behaviors.stopped
|
||||
}
|
||||
|
||||
}, "AdapterSpec-stopping-guardian-2")
|
||||
|
||||
} finally if (system != null) shutdown(system.toUntyped)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"Adapted actors" must {
|
||||
|
|
|
|||
|
|
@ -7,16 +7,14 @@ package docs.akka.typed
|
|||
import java.net.URLEncoder
|
||||
import java.nio.charset.StandardCharsets
|
||||
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.ActorSystem
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.Terminated
|
||||
import akka.NotUsed
|
||||
import akka.actor.typed.scaladsl.AskPattern._
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Terminated }
|
||||
import akka.testkit.typed.TestKit
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.Future
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{ Await, Future }
|
||||
//#imports
|
||||
|
||||
import akka.actor.typed.TypedAkkaSpecWithShutdown
|
||||
|
|
@ -107,7 +105,7 @@ class IntroSpec extends TestKit with TypedAkkaSpecWithShutdown {
|
|||
import IntroSpec._
|
||||
|
||||
"Hello world" must {
|
||||
"must say hello" in {
|
||||
"say hello" in {
|
||||
// TODO Implicits.global is not something we would like to encourage in docs
|
||||
//#hello-world
|
||||
import HelloWorld._
|
||||
|
|
@ -128,7 +126,7 @@ class IntroSpec extends TestKit with TypedAkkaSpecWithShutdown {
|
|||
//#hello-world
|
||||
}
|
||||
|
||||
"must chat" in {
|
||||
"chat" in {
|
||||
//#chatroom-gabbler
|
||||
import ChatRoom._
|
||||
|
||||
|
|
@ -152,24 +150,20 @@ class IntroSpec extends TestKit with TypedAkkaSpecWithShutdown {
|
|||
//#chatroom-gabbler
|
||||
|
||||
//#chatroom-main
|
||||
val main: Behavior[String] =
|
||||
val main: Behavior[NotUsed] =
|
||||
Behaviors.deferred { ctx ⇒
|
||||
val chatRoom = ctx.spawn(ChatRoom.behavior, "chatroom")
|
||||
val gabblerRef = ctx.spawn(gabbler, "gabbler")
|
||||
ctx.watch(gabblerRef)
|
||||
chatRoom ! GetSession("ol’ Gabbler", gabblerRef)
|
||||
|
||||
Behaviors.immutablePartial[String] {
|
||||
case (_, "go") ⇒
|
||||
chatRoom ! GetSession("ol’ Gabbler", gabblerRef)
|
||||
Behaviors.same
|
||||
} onSignal {
|
||||
Behaviors.onSignal {
|
||||
case (_, Terminated(ref)) ⇒
|
||||
Behaviors.stopped
|
||||
}
|
||||
}
|
||||
|
||||
val system = ActorSystem(main, "ChatRoomDemo")
|
||||
system ! "go"
|
||||
Await.result(system.whenTerminated, 3.seconds)
|
||||
//#chatroom-main
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import scala.util.control.{ ControlThrowable, NonFatal }
|
|||
import java.util.Optional
|
||||
|
||||
import akka.actor.setup.{ ActorSystemSetup, Setup }
|
||||
import akka.annotation.InternalApi
|
||||
|
||||
import scala.compat.java8.FutureConverters
|
||||
import scala.compat.java8.OptionConverters._
|
||||
|
|
@ -643,6 +644,10 @@ abstract class ExtendedActorSystem extends ActorSystem {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal API
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] class ActorSystemImpl(
|
||||
val name: String,
|
||||
applicationConfig: Config,
|
||||
|
|
|
|||
|
|
@ -10,11 +10,15 @@ import akka.cluster.typed.*;
|
|||
import akka.testkit.typed.javadsl.TestProbe;
|
||||
import docs.akka.cluster.typed.BasicClusterManualSpec;
|
||||
|
||||
//FIXME make these tests
|
||||
public class BasicClusterExampleTest {
|
||||
// FIXME these tests are awaiting typed Java testkit to be able to await cluster forming like in BasicClusterExampleSpec
|
||||
public class BasicClusterExampleTest { // extends JUnitSuite {
|
||||
|
||||
// @Test
|
||||
public void clusterApiExample() {
|
||||
ActorSystem<Object> system = ActorSystem.create(Behaviors.empty(), "ClusterSystem", BasicClusterManualSpec.clusterConfig());
|
||||
ActorSystem<Object> system2 = ActorSystem.create(Behaviors.empty(), "ClusterSystem", BasicClusterManualSpec.clusterConfig());
|
||||
ActorSystem<Object> system = ActorSystem.create(Behaviors.empty(), "ClusterSystem",
|
||||
BasicClusterManualSpec.noPort().withFallback(BasicClusterManualSpec.clusterConfig()));
|
||||
ActorSystem<Object> system2 = ActorSystem.create(Behaviors.empty(), "ClusterSystem",
|
||||
BasicClusterManualSpec.noPort().withFallback(BasicClusterManualSpec.clusterConfig()));
|
||||
|
||||
try {
|
||||
//#cluster-create
|
||||
|
|
@ -26,18 +30,28 @@ public class BasicClusterExampleTest {
|
|||
cluster.manager().tell(Join.create(cluster.selfMember().address()));
|
||||
//#cluster-join
|
||||
|
||||
cluster2.manager().tell(Join.create(cluster.selfMember().address()));
|
||||
|
||||
// TODO wait for/verify cluster to form
|
||||
|
||||
//#cluster-leave
|
||||
cluster2.manager().tell(Leave.create(cluster2.selfMember().address()));
|
||||
//#cluster-leave
|
||||
|
||||
// TODO wait for/verify node 2 leaving
|
||||
|
||||
} finally {
|
||||
system.terminate();
|
||||
system2.terminate();
|
||||
}
|
||||
}
|
||||
|
||||
// @Test
|
||||
public void clusterLeave() throws Exception {
|
||||
ActorSystem<Object> system = ActorSystem.create(Behaviors.empty(), "ClusterSystem", BasicClusterManualSpec.clusterConfig());
|
||||
ActorSystem<Object> system2 = ActorSystem.create(Behaviors.empty(), "ClusterSystem", BasicClusterManualSpec.clusterConfig());
|
||||
ActorSystem<Object> system = ActorSystem.create(Behaviors.empty(), "ClusterSystem",
|
||||
BasicClusterManualSpec.noPort().withFallback(BasicClusterManualSpec.clusterConfig()));
|
||||
ActorSystem<Object> system2 = ActorSystem.create(Behaviors.empty(), "ClusterSystem",
|
||||
BasicClusterManualSpec.noPort().withFallback(BasicClusterManualSpec.clusterConfig()));
|
||||
|
||||
try {
|
||||
Cluster cluster = Cluster.get(system);
|
||||
|
|
|
|||
|
|
@ -6,12 +6,14 @@ import akka.actor.typed.Behavior;
|
|||
import akka.actor.typed.javadsl.Behaviors;
|
||||
import akka.actor.typed.receptionist.Receptionist;
|
||||
import akka.actor.typed.receptionist.ServiceKey;
|
||||
import org.junit.Test;
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
import scala.concurrent.Await;
|
||||
import scala.concurrent.duration.Duration;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class ReceptionistExampleTest {
|
||||
public class ReceptionistExampleTest extends JUnitSuite {
|
||||
|
||||
public static class PingPongExample {
|
||||
//#ping-service
|
||||
|
|
@ -68,6 +70,7 @@ public class ReceptionistExampleTest {
|
|||
//#pinger-guardian
|
||||
}
|
||||
|
||||
@Test
|
||||
public void workPlease() throws Exception {
|
||||
ActorSystem<Receptionist.Listing<PingPongExample.Ping>> system =
|
||||
ActorSystem.create(PingPongExample.guardian(), "ReceptionistExample");
|
||||
|
|
|
|||
|
|
@ -0,0 +1,131 @@
|
|||
/**
|
||||
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package akka.cluster.typed
|
||||
|
||||
import akka.actor.InvalidMessageException
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.ActorSystem
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.{ PostStop, Terminated }
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.testkit.typed.TestInbox
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest._
|
||||
import org.scalatest.concurrent.Eventually
|
||||
import org.scalatest.concurrent.ScalaFutures
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.Promise
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll
|
||||
with ScalaFutures with Eventually {
|
||||
|
||||
override implicit val patienceConfig = PatienceConfig(1.second)
|
||||
val config = ConfigFactory.parseString(
|
||||
"""
|
||||
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
""").withFallback(ConfigFactory.load())
|
||||
def system[T](behavior: Behavior[T], name: String) = ActorSystem(behavior, name, config)
|
||||
def suite = "adapter"
|
||||
|
||||
case class Probe(msg: String, replyTo: ActorRef[String])
|
||||
|
||||
def withSystem[T](name: String, behavior: Behavior[T], doTerminate: Boolean = true)(block: ActorSystem[T] ⇒ Unit): Terminated = {
|
||||
val sys = system(behavior, s"$suite-$name")
|
||||
try {
|
||||
block(sys)
|
||||
if (doTerminate) sys.terminate().futureValue else sys.whenTerminated.futureValue
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒
|
||||
sys.terminate()
|
||||
throw ex
|
||||
}
|
||||
}
|
||||
|
||||
"An ActorSystem" must {
|
||||
"start the guardian actor and terminate when it terminates" in {
|
||||
val t = withSystem(
|
||||
"a",
|
||||
Behaviors.immutable[Probe] { case (_, p) ⇒ p.replyTo ! p.msg; Behaviors.stopped }, doTerminate = false) { sys ⇒
|
||||
val inbox = TestInbox[String]("a")
|
||||
sys ! Probe("hello", inbox.ref)
|
||||
eventually {
|
||||
inbox.hasMessages should ===(true)
|
||||
}
|
||||
inbox.receiveAll() should ===("hello" :: Nil)
|
||||
}
|
||||
val p = t.ref.path
|
||||
p.name should ===("/")
|
||||
p.address.system should ===(suite + "-a")
|
||||
}
|
||||
|
||||
// see issue #24172
|
||||
"shutdown if guardian shuts down immediately" in {
|
||||
pending
|
||||
withSystem("shutdown", Behaviors.stopped[String], doTerminate = false) { sys: ActorSystem[String] ⇒
|
||||
sys.whenTerminated.futureValue
|
||||
}
|
||||
}
|
||||
|
||||
"terminate the guardian actor" in {
|
||||
val inbox = TestInbox[String]("terminate")
|
||||
val sys = system(
|
||||
Behaviors.immutable[Probe] {
|
||||
case (_, _) ⇒ Behaviors.unhandled
|
||||
} onSignal {
|
||||
case (_, PostStop) ⇒
|
||||
inbox.ref ! "done"
|
||||
Behaviors.same
|
||||
},
|
||||
"terminate")
|
||||
sys.terminate().futureValue
|
||||
inbox.receiveAll() should ===("done" :: Nil)
|
||||
}
|
||||
|
||||
"log to the event stream" in {
|
||||
pending
|
||||
}
|
||||
|
||||
"have a name" in {
|
||||
withSystem("name", Behaviors.empty[String]) { sys ⇒
|
||||
sys.name should ===(suite + "-name")
|
||||
}
|
||||
}
|
||||
|
||||
"report its uptime" in {
|
||||
withSystem("uptime", Behaviors.empty[String]) { sys ⇒
|
||||
sys.uptime should be < 1L
|
||||
Thread.sleep(2000)
|
||||
sys.uptime should be >= 1L
|
||||
}
|
||||
}
|
||||
|
||||
"have a working thread factory" in {
|
||||
withSystem("thread", Behaviors.empty[String]) { sys ⇒
|
||||
val p = Promise[Int]
|
||||
sys.threadFactory.newThread(new Runnable {
|
||||
def run(): Unit = p.success(42)
|
||||
}).start()
|
||||
p.future.futureValue should ===(42)
|
||||
}
|
||||
}
|
||||
|
||||
"be able to run Futures" in {
|
||||
withSystem("futures", Behaviors.empty[String]) { sys ⇒
|
||||
val f = Future(42)(sys.executionContext)
|
||||
f.futureValue should ===(42)
|
||||
}
|
||||
}
|
||||
|
||||
"not allow null messages" in {
|
||||
withSystem("null-messages", Behaviors.empty[String]) { sys ⇒
|
||||
intercept[InvalidMessageException] {
|
||||
sys ! null
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -197,16 +197,11 @@ class ReceptionistExampleSpec extends WordSpec with ScalaFutures {
|
|||
|
||||
"A remote basic example" must {
|
||||
"show register" in {
|
||||
// FIXME cannot use guardian as it touches receptionist #24279
|
||||
import scaladsl.adapter._
|
||||
val system1 = akka.actor.ActorSystem("PingPongExample", clusterConfig)
|
||||
val system2 = akka.actor.ActorSystem("PingPongExample", clusterConfig)
|
||||
val system1 = ActorSystem(guardianJustPingService, "PingPongExample", clusterConfig)
|
||||
val system2 = ActorSystem(guardianJustPinger, "PingPongExample", clusterConfig)
|
||||
|
||||
system1.spawnAnonymous(guardianJustPingService)
|
||||
system2.spawnAnonymous(guardianJustPinger)
|
||||
|
||||
val cluster1 = Cluster(system1.toTyped)
|
||||
val cluster2 = Cluster(system2.toTyped)
|
||||
val cluster1 = Cluster(system1)
|
||||
val cluster2 = Cluster(system2)
|
||||
|
||||
cluster1.manager ! Join(cluster1.selfMember.address)
|
||||
cluster1.manager ! Join(cluster2.selfMember.address)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue