fix typed ActorSystemSpec #26614

This commit is contained in:
Patrik Nordwall 2019-03-27 08:30:42 +01:00 committed by Johan Andrén
parent 38930aa318
commit 076b16485b
3 changed files with 97 additions and 53 deletions

View file

@ -5,16 +5,18 @@
package akka.actor.typed
package internal
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.control.NonFatal
import akka.Done
import akka.actor.InvalidMessageException
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.testkit.typed.scaladsl.TestInbox
import akka.actor.typed.scaladsl.Behaviors
import org.scalatest._
import org.scalatest.concurrent.{ Eventually, ScalaFutures }
import scala.concurrent.duration._
import scala.concurrent.{ Future, Promise }
import scala.util.control.NonFatal
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.ScalaFutures
class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with ScalaFutures with Eventually {
@ -39,10 +41,10 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
"An ActorSystem" must {
"start the guardian actor and terminate when it terminates" in {
val t = withSystem(
"a",
Behaviors.receive[Probe] { case (_, p) => p.replyTo ! p.message; Behaviors.stopped },
doTerminate = false) { sys =>
val t = withSystem("a", Behaviors.receiveMessage[Probe] { p =>
p.replyTo ! p.message
Behaviors.stopped
}, doTerminate = false) { sys =>
val inbox = TestInbox[String]("a")
sys ! Probe("hello", inbox.ref)
eventually {
@ -58,8 +60,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
// see issue #24172
"shutdown if guardian shuts down immediately" in {
val stoppable =
Behaviors.receive[Done] {
case (context, Done) => Behaviors.stopped
Behaviors.receiveMessage[Done] { _ =>
Behaviors.stopped
}
withSystem("shutdown", stoppable, doTerminate = false) { sys: ActorSystem[Done] =>
sys ! Done
@ -69,21 +71,35 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
"terminate the guardian actor" in {
val inbox = TestInbox[String]("terminate")
val sys = system(
Behaviors
.receive[Probe] {
case (_, _) => Behaviors.unhandled
}
.receiveSignal {
case (_, PostStop) =>
inbox.ref ! "done"
Behaviors.same
},
"terminate")
val sys = system(Behaviors.setup[Any] { _ =>
inbox.ref ! "started"
Behaviors.receiveSignal {
case (_, PostStop) =>
inbox.ref ! "done"
Behaviors.same
}
}, "terminate")
eventually {
inbox.hasMessages should ===(true)
}
inbox.receiveAll() should ===("started" :: Nil)
// now we know that the guardian has started, and should receive PostStop
sys.terminate().futureValue
inbox.receiveAll() should ===("done" :: Nil)
}
"be able to terminate immediately" in {
val sys = system(Behaviors.receiveMessage[Probe] { _ =>
Behaviors.unhandled
}, "terminate")
// for this case the guardian might not have been started before
// the system terminates and then it will not receive PostStop, which
// is OK since it wasn't really started yet
sys.terminate().futureValue
}
"log to the event stream" in {
pending
}

View file

@ -236,7 +236,7 @@ object ActorSystem {
appConfig,
cl,
executionContext,
Some(PropsAdapter(() => new GuardianStartupBehavior(guardianBehavior), guardianProps)),
Some(PropsAdapter[Any](() => new GuardianStartupBehavior(guardianBehavior), guardianProps)),
setup)
system.start()

View file

@ -4,28 +4,38 @@
package akka.cluster.typed
import akka.actor.InvalidMessageException
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, PostStop, Terminated }
import akka.actor.testkit.typed.scaladsl.TestInbox
import com.typesafe.config.ConfigFactory
import org.scalatest._
import org.scalatest.concurrent.{ Eventually, ScalaFutures }
import scala.concurrent.{ Future, Promise }
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.control.NonFatal
import akka.Done
import akka.actor.InvalidMessageException
import akka.actor.testkit.typed.scaladsl.TestInbox
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.PostStop
import akka.actor.typed.Terminated
import akka.actor.typed.scaladsl.Behaviors
import com.typesafe.config.ConfigFactory
import org.scalatest._
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.ScalaFutures
class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with ScalaFutures with Eventually {
override implicit val patienceConfig = PatienceConfig(1.second)
val config = ConfigFactory.parseString("""
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
""").withFallback(ConfigFactory.load())
akka.actor.provider = cluster
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1
""")
def system[T](behavior: Behavior[T], name: String) = ActorSystem(behavior, name, config)
def suite = "adapter"
case class Probe(msg: String, replyTo: ActorRef[String])
case class Probe(message: String, replyTo: ActorRef[String])
def withSystem[T](name: String, behavior: Behavior[T], doTerminate: Boolean = true)(
block: ActorSystem[T] => Unit): Terminated = {
@ -42,10 +52,10 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
"An ActorSystem" must {
"start the guardian actor and terminate when it terminates" in {
val t = withSystem(
"a",
Behaviors.receive[Probe] { case (_, p) => p.replyTo ! p.msg; Behaviors.stopped },
doTerminate = false) { sys =>
val t = withSystem("a", Behaviors.receiveMessage[Probe] { p =>
p.replyTo ! p.message
Behaviors.stopped
}, doTerminate = false) { sys =>
val inbox = TestInbox[String]("a")
sys ! Probe("hello", inbox.ref)
eventually {
@ -60,29 +70,47 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
// see issue #24172
"shutdown if guardian shuts down immediately" in {
pending
withSystem("shutdown", Behaviors.stopped[String], doTerminate = false) { sys: ActorSystem[String] =>
val stoppable =
Behaviors.receiveMessage[Done] { _ =>
Behaviors.stopped
}
withSystem("shutdown", stoppable, doTerminate = false) { sys: ActorSystem[Done] =>
sys ! Done
sys.whenTerminated.futureValue
}
}
"terminate the guardian actor" in {
val inbox = TestInbox[String]("terminate")
val sys = system(
Behaviors
.receive[Probe] {
case (_, _) => Behaviors.unhandled
}
.receiveSignal {
case (_, PostStop) =>
inbox.ref ! "done"
Behaviors.same
},
"terminate")
val sys = system(Behaviors.setup[Any] { _ =>
inbox.ref ! "started"
Behaviors.receiveSignal {
case (_, PostStop) =>
inbox.ref ! "done"
Behaviors.same
}
}, "terminate")
eventually {
inbox.hasMessages should ===(true)
}
inbox.receiveAll() should ===("started" :: Nil)
// now we know that the guardian has started, and should receive PostStop
sys.terminate().futureValue
inbox.receiveAll() should ===("done" :: Nil)
}
"be able to terminate immediately" in {
val sys = system(Behaviors.receiveMessage[Probe] { _ =>
Behaviors.unhandled
}, "terminate")
// for this case the guardian might not have been started before
// the system terminates and then it will not receive PostStop, which
// is OK since it wasn't really started yet
sys.terminate().futureValue
}
"log to the event stream" in {
pending
}