From bd2a3de88a840d27dfaa2e704bb181d1cb883fbb Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Thu, 4 Jan 2018 15:09:38 +0000 Subject: [PATCH] Typed async testkit (#24203) * Typed async testkit * Pull out useful parts from internal TypedSpec into a public TestKit * Port internal tests to use the external testkit --- .../testing/async/BasicAsyncTestingTest.java | 19 +- .../akka/actor/typed/ActorContextSpec.scala | 146 ++++++++++- .../test/scala/akka/actor/typed/AskSpec.scala | 46 ++-- .../scala/akka/actor/typed/BehaviorSpec.scala | 17 +- .../scala/akka/actor/typed/DeferredSpec.scala | 26 +- .../akka/actor/typed/ExtensionsSpec.scala | 2 +- .../akka/actor/typed/PerformanceSpec.scala | 104 -------- .../scala/akka/actor/typed/PropsSpec.scala | 2 +- ...tarterSpec.scala => SupervisionSpec.scala} | 235 ++++++++++-------- .../scala/akka/actor/typed/TimerSpec.scala | 29 ++- .../akka/actor/typed/TypedAkkaSpec.scala | 31 +++ .../scala/akka/actor/typed/TypedSpec.scala | 228 ----------------- .../scala/akka/actor/typed/WatchSpec.scala | 77 +++--- .../internal/MiscMessageSerializerSpec.scala | 10 +- .../receptionist/LocalReceptionistSpec.scala | 104 +++----- .../typed/scaladsl/ImmutablePartialSpec.scala | 17 +- .../actor/typed/scaladsl/OnSignalSpec.scala | 8 +- .../scala/docs/akka/typed/IntroSpec.scala | 4 +- .../docs/akka/typed/MutableIntroSpec.scala | 5 +- .../testing/async/BasicAsyncTestingSpec.scala | 17 +- .../akka/actor/typed/SupervisorStrategy.scala | 2 +- .../akka/actor/typed/scaladsl/Actor.scala | 8 +- .../ClusterShardingPersistenceSpec.scala | 22 +- .../sharding/typed/ClusterShardingSpec.scala | 40 ++- .../typed/ShardingSerializerSpec.scala | 9 +- ...nstantRateEntityRecoveryStrategySpec.scala | 5 +- .../ddata/typed/javadsl/ReplicatorTest.java | 22 +- .../ddata/typed/scaladsl/ReplicatorSpec.scala | 24 +- .../akka/cluster/typed/ClusterApiSpec.scala | 9 +- .../typed/ClusterSingletonApiSpec.scala | 11 +- .../ClusterSingletonPersistenceSpec.scala | 13 +- .../ClusterReceptionistSpec.scala | 16 +- akka-docs/src/main/paradox/actors-typed.md | 6 +- .../main/paradox/cluster-sharding-typed.md | 8 +- .../src/main/paradox/fault-tolerance-typed.md | 20 ++ akka-docs/src/main/paradox/index-typed.md | 1 + akka-docs/src/main/paradox/testing-typed.md | 28 ++- .../typed/scaladsl/PersistentActorSpec.scala | 26 +- .../MiscMessageSerializerSpec.scala | 2 +- .../akka/testkit/typed/BehaviourTestkit.scala | 4 +- .../testkit/typed/StubbedActorContext.scala | 1 + .../scala/akka/testkit/typed/TestKit.scala | 101 ++++++-- .../testkit/typed/BehaviorTestkitSpec.scala | 47 ++-- .../typed/testkit/BehaviorTestkitSpec.scala | 0 44 files changed, 716 insertions(+), 836 deletions(-) delete mode 100644 akka-actor-typed-tests/src/test/scala/akka/actor/typed/PerformanceSpec.scala rename akka-actor-typed-tests/src/test/scala/akka/actor/typed/{RestarterSpec.scala => SupervisionSpec.scala} (63%) create mode 100644 akka-actor-typed-tests/src/test/scala/akka/actor/typed/TypedAkkaSpec.scala delete mode 100644 akka-actor-typed-tests/src/test/scala/akka/actor/typed/TypedSpec.scala create mode 100644 akka-docs/src/main/paradox/fault-tolerance-typed.md create mode 100644 akka-testkit-typed/src/test/scala/akka/typed/testkit/BehaviorTestkitSpec.scala diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java index 7bbbe8d367..220ca7dc55 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java @@ -4,7 +4,6 @@ package jdocs.akka.typed.testing.async; import akka.actor.typed.ActorRef; -import akka.actor.typed.ActorSystem; import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.Actor; import akka.testkit.typed.javadsl.TestProbe; @@ -15,7 +14,7 @@ import org.junit.Test; //#test-header public class BasicAsyncTestingTest extends TestKit { public BasicAsyncTestingTest() { - super(ActorSystem.create(Actor.empty(), "BasicAsyncTestingTest")); + super("BasicAsyncTestingTest"); } //#test-header @@ -52,11 +51,21 @@ public class BasicAsyncTestingTest extends TestKit { @Test public void testVerifyingAResponse() { - //#test + //#test-spawn TestProbe probe = new TestProbe<>(system(), testkitSettings()); - ActorRef pinger = actorOf(echoActor, "ping"); + ActorRef pinger = spawn(echoActor, "ping"); pinger.tell(new Ping("hello", probe.ref())); probe.expectMsg(new Pong("hello")); - //#test + //#test-spawn + } + + @Test + public void testVerifyingAResponseAnonymous() { + //#test-spawn-anonymous + TestProbe probe = new TestProbe<>(system(), testkitSettings()); + ActorRef pinger = spawn(echoActor); + pinger.tell(new Ping("hello", probe.ref())); + probe.expectMsg(new Pong("hello")); + //#test-spawn-anonymous } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala index 18ecbcae0d..73f53ca513 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala @@ -3,13 +3,19 @@ */ package akka.actor.typed -import scala.concurrent.duration._ -import scala.concurrent.Future +import akka.actor.typed.scaladsl.Actor._ +import akka.actor.typed.scaladsl.{ Actor, AskPattern } +import akka.actor.{ ActorInitializationException, DeadLetterSuppression, InvalidMessageException } +import akka.testkit.AkkaSpec +import akka.testkit.TestEvent.Mute import com.typesafe.config.ConfigFactory -import akka.actor.{ DeadLetterSuppression, InvalidMessageException } -import akka.actor.typed.scaladsl.Actor +import org.scalactic.CanEqual +import scala.concurrent.{ Await, Future } +import scala.concurrent.duration._ import scala.language.existentials +import scala.reflect.ClassTag +import scala.util.control.{ NoStackTrace, NonFatal } object ActorContextSpec { @@ -227,8 +233,8 @@ object ActorContextSpec { case BecomeInert(replyTo) ⇒ replyTo ! BecameInert Actor.immutable[Command] { - case (_, Ping(replyTo)) ⇒ - replyTo ! Pong2 + case (_, Ping(r)) ⇒ + r ! Pong2 Actor.same case (_, Throw(ex)) ⇒ throw ex @@ -257,21 +263,136 @@ object ActorContextSpec { } } + sealed abstract class Start + case object Start extends Start + + sealed trait GuardianCommand + case class RunTest[T](name: String, behavior: Behavior[T], replyTo: ActorRef[Status], timeout: FiniteDuration) extends GuardianCommand + case class Terminate(reply: ActorRef[Status]) extends GuardianCommand + case class Create[T](behavior: Behavior[T], name: String)(val replyTo: ActorRef[ActorRef[T]]) extends GuardianCommand + + sealed trait Status + case object Success extends Status + case class Failed(thr: Throwable) extends Status + case object Timedout extends Status + + class SimulatedException(message: String) extends RuntimeException(message) with NoStackTrace + + def guardian(outstanding: Map[ActorRef[_], ActorRef[Status]] = Map.empty): Behavior[GuardianCommand] = + Actor.immutable[GuardianCommand] { + case (ctx, r: RunTest[t]) ⇒ + val test = ctx.spawn(r.behavior, r.name) + ctx.schedule(r.timeout, r.replyTo, Timedout) + ctx.watch(test) + guardian(outstanding + ((test, r.replyTo))) + case (_, Terminate(reply)) ⇒ + reply ! Success + stopped + case (ctx, c: Create[t]) ⇒ + c.replyTo ! ctx.spawn(c.behavior, c.name) + same + } onSignal { + case (ctx, t @ Terminated(test)) ⇒ + outstanding get test match { + case Some(reply) ⇒ + if (t.failure eq null) reply ! Success + else reply ! Failed(t.failure) + guardian(outstanding - test) + case None ⇒ same + } + case _ ⇒ same + } } -abstract class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( - """|akka { +abstract class ActorContextSpec extends TypedAkkaSpec { + import ActorContextSpec._ + + val config = ConfigFactory.parseString( + """|akka { | loglevel = WARNING | actor.debug { | lifecycle = off | autoreceive = off | } | typed.loggers = ["akka.testkit.typed.TestEventListener"] - |}""".stripMargin)) { + |}""".stripMargin) - import ActorContextSpec._ + implicit lazy val system: ActorSystem[GuardianCommand] = + ActorSystem(guardian(), AkkaSpec.getCallerName(classOf[ActorContextSpec]), config = Some(config withFallback AkkaSpec.testConf)) val expectTimeout = 3.seconds + import AskPattern._ + + implicit def scheduler = system.scheduler + + lazy val blackhole = await(system ? Create(immutable[Any] { case _ ⇒ same }, "blackhole")) + + override def afterAll(): Unit = { + Await.result(system.terminate, timeout.duration) + } + + // TODO remove after basing on ScalaTest 3 with async support + import akka.testkit._ + + def await[T](f: Future[T]): T = Await.result(f, timeout.duration * 1.1) + + /** + * Run an Actor-based test. The test procedure is most conveniently + * formulated using the [[StepWise]] behavior type. + */ + def runTest[T: ClassTag](name: String)(behavior: Behavior[T])(implicit system: ActorSystem[GuardianCommand]): Future[Status] = + system ? (RunTest(name, behavior, _, timeout.duration)) + + // TODO remove after basing on ScalaTest 3 with async support + def sync(f: Future[Status])(implicit system: ActorSystem[GuardianCommand]): Unit = { + def unwrap(ex: Throwable): Throwable = ex match { + case ActorInitializationException(_, _, ex) ⇒ ex + case other ⇒ other + } + + try await(f) match { + case Success ⇒ () + case Failed(ex) ⇒ + unwrap(ex) match { + case ex2: SimulatedException ⇒ + throw ex2 + case _ ⇒ + println(system.printTree) + throw unwrap(ex) + } + case Timedout ⇒ + println(system.printTree) + fail("test timed out") + } catch { + case ex: SimulatedException ⇒ + throw ex + case NonFatal(ex) ⇒ + println(system.printTree) + throw ex + } + } + + def muteExpectedException[T <: Exception: ClassTag]( + message: String = null, + source: String = null, + start: String = "", + pattern: String = null, + occurrences: Int = Int.MaxValue)(implicit system: ActorSystem[GuardianCommand]): EventFilter = { + val filter = EventFilter(message, source, start, pattern, occurrences) + system.eventStream.publish(Mute(filter)) + filter + } + + // for ScalaTest === compare of Class objects + implicit def classEqualityConstraint[A, B]: CanEqual[Class[A], Class[B]] = + new CanEqual[Class[A], Class[B]] { + def areEqual(a: Class[A], b: Class[B]) = a == b + } + + implicit def setEqualityConstraint[A, T <: Set[_ <: A]]: CanEqual[Set[A], T] = + new CanEqual[Set[A], T] { + def areEqual(a: Set[A], b: T) = a == b + } /** * The name for the set of tests to be instantiated, used for keeping the test case actors’ names unique. @@ -286,7 +407,7 @@ abstract class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( private def mySuite: String = suite + "Adapted" def setup(name: String, wrapper: Option[Behavior[Command] ⇒ Behavior[Command]] = None, ignorePostStop: Boolean = true)( - proc: (scaladsl.ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) ⇒ StepWise.Steps[Event, _]): Future[TypedSpec.Status] = + proc: (scaladsl.ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) ⇒ StepWise.Steps[Event, _]): Future[Status] = runTest(s"$mySuite-$name")(StepWise[Event] { (ctx, startWith) ⇒ val b = behavior(ctx, ignorePostStop) val props = wrapper.map(_(b)).getOrElse(b) @@ -671,7 +792,7 @@ abstract class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( } } -import ActorContextSpec._ +import akka.actor.typed.ActorContextSpec._ class NormalActorContextSpec extends ActorContextSpec { override def suite = "normal" @@ -705,4 +826,3 @@ class TapActorContextSpec extends ActorContextSpec { override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] = Actor.tap((_, _) ⇒ (), (_, _) ⇒ (), subject(ctx.self, ignorePostStop)) } - diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala index e8b24c2d13..427e93ba69 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala @@ -3,13 +3,15 @@ */ package akka.actor.typed -import scala.concurrent.ExecutionContext -import scala.concurrent.duration._ -import org.scalatest.concurrent.ScalaFutures -import akka.util.Timeout -import akka.pattern.AskTimeoutException +import akka.actor.typed.internal.adapter.ActorSystemAdapter +import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.Actor._ import akka.actor.typed.scaladsl.AskPattern._ +import akka.pattern.AskTimeoutException +import akka.testkit.typed.TestKit +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.ExecutionContext object AskSpec { sealed trait Msg @@ -17,7 +19,8 @@ object AskSpec { final case class Stop(replyTo: ActorRef[Unit]) extends Msg } -class AskSpec extends TypedSpec with ScalaFutures { +class AskSpec extends TestKit("AskSpec") with TypedAkkaSpec with ScalaFutures { + import AskSpec._ implicit def executor: ExecutionContext = @@ -26,43 +29,38 @@ class AskSpec extends TypedSpec with ScalaFutures { val behavior: Behavior[Msg] = immutable[Msg] { case (_, foo: Foo) ⇒ foo.replyTo ! "foo" - same + Actor.same case (_, Stop(r)) ⇒ r ! () - stopped + Actor.stopped } "Ask pattern" must { "must fail the future if the actor is already terminated" in { - val fut = for { - ref ← system ? TypedSpec.Create(behavior, "test1") - _ ← ref ? Stop - answer ← ref.?(Foo("bar"))(Timeout(1.second), implicitly) - } yield answer - fut.recover { case _: AskTimeoutException ⇒ "" }.futureValue should ===("") + val ref = spawn(behavior) + (ref ? Stop).futureValue + val answer = ref ? Foo("bar") + answer.recover { case _: AskTimeoutException ⇒ "ask" }.futureValue should ===("ask") } "must succeed when the actor is alive" in { - val fut = for { - ref ← system ? TypedSpec.Create(behavior, "test2") - answer ← ref ? Foo("bar") - } yield answer - fut.futureValue should ===("foo") + val ref = spawn(behavior) + val response = ref ? Foo("bar") + response.futureValue should ===("foo") } /** See issue #19947 (MatchError with adapted ActorRef) */ "must fail the future if the actor doesn't exist" in { val noSuchActor: ActorRef[Msg] = system match { - case adaptedSys: akka.actor.typed.internal.adapter.ActorSystemAdapter[_] ⇒ + case adaptedSys: ActorSystemAdapter[_] ⇒ import akka.actor.typed.scaladsl.adapter._ adaptedSys.untyped.provider.resolveActorRef("/foo/bar") case _ ⇒ fail("this test must only run in an adapted actor system") } - val fut = for { - answer ← noSuchActor.?(Foo("bar"))(Timeout(1.second), implicitly) - } yield answer - (fut.recover { case _: AskTimeoutException ⇒ "" }).futureValue should ===("") + + val answer = noSuchActor ? Foo("bar") + answer.recover { case _: AskTimeoutException ⇒ "ask" }.futureValue should ===("ask") } } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala index 85a7f9a6ba..eb8cbfe344 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala @@ -61,9 +61,8 @@ object BehaviorSpec { override def next = StateA } - trait Common extends TypedSpec { + trait Common extends TypedAkkaSpec { type Aux >: Null <: AnyRef - def system: ActorSystem[TypedSpec.Command] def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) def checkAux(signal: Signal, aux: Aux): Unit = () def checkAux(command: Command, aux: Aux): Unit = () @@ -339,11 +338,11 @@ object BehaviorSpec { import BehaviorSpec._ -class FullBehaviorSpec extends TypedSpec with Messages with BecomeWithLifecycle with Stoppable { +class FullBehaviorSpec extends TypedAkkaSpec with Messages with BecomeWithLifecycle with Stoppable { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = mkFull(monitor) → null } -class ImmutableBehaviorSpec extends TypedSpec with Messages with BecomeWithLifecycle with Stoppable { +class ImmutableBehaviorSpec extends Messages with BecomeWithLifecycle with Stoppable { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) → null private def behv(monitor: ActorRef[Event], state: State): Behavior[Command] = { SActor.immutable[Command] { @@ -375,7 +374,7 @@ class ImmutableBehaviorSpec extends TypedSpec with Messages with BecomeWithLifec } } -class ImmutableWithSignalScalaBehaviorSpec extends TypedSpec with Messages with BecomeWithLifecycle with Stoppable { +class ImmutableWithSignalScalaBehaviorSpec extends TypedAkkaSpec with Messages with BecomeWithLifecycle with Stoppable { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) → null @@ -411,7 +410,7 @@ class ImmutableWithSignalScalaBehaviorSpec extends TypedSpec with Messages with } } -class ImmutableScalaBehaviorSpec extends TypedSpec with Messages with Become with Stoppable { +class ImmutableScalaBehaviorSpec extends Messages with Become with Stoppable { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) → null @@ -442,7 +441,7 @@ class ImmutableScalaBehaviorSpec extends TypedSpec with Messages with Become wit } } -class MutableScalaBehaviorSpec extends TypedSpec with Messages with Become with Stoppable { +class MutableScalaBehaviorSpec extends Messages with Become with Stoppable { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) → null @@ -518,7 +517,7 @@ class RestarterScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec wi } } -class ImmutableWithSignalJavaBehaviorSpec extends TypedSpec with Messages with BecomeWithLifecycle with Stoppable { +class ImmutableWithSignalJavaBehaviorSpec extends Messages with BecomeWithLifecycle with Stoppable { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) → null def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] = JActor.immutable( @@ -550,7 +549,7 @@ class ImmutableWithSignalJavaBehaviorSpec extends TypedSpec with Messages with B })) } -class ImmutableJavaBehaviorSpec extends TypedSpec with Messages with Become with Stoppable { +class ImmutableJavaBehaviorSpec extends Messages with Become with Stoppable { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) → null def behv(monitor: ActorRef[Event], state: State): Behavior[Command] = JActor.immutable { diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/DeferredSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/DeferredSpec.scala index e3ef98ddac..2ccc321bb0 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/DeferredSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/DeferredSpec.scala @@ -7,7 +7,7 @@ import scala.concurrent.duration._ import scala.util.control.NoStackTrace import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.Actor.BehaviorDecorators -import akka.testkit.typed.{ BehaviorTestkit, TestInbox, TestKitSettings } +import akka.testkit.typed.{ BehaviorTestkit, TestInbox, TestKit, TestKitSettings } import akka.testkit.typed.scaladsl._ object DeferredSpec { @@ -26,7 +26,7 @@ object DeferredSpec { }) } -class DeferredSpec extends TypedSpec with StartSupport { +class DeferredSpec extends TestKit with TypedAkkaSpec { import DeferredSpec._ implicit val testSettings = TestKitSettings(system) @@ -39,7 +39,7 @@ class DeferredSpec extends TypedSpec with StartSupport { target(probe.ref) } probe.expectNoMsg(100.millis) // not yet - start(behv) + spawn(behv) // it's supposed to be created immediately (not waiting for first message) probe.expectMsg(Started) } @@ -58,7 +58,7 @@ class DeferredSpec extends TypedSpec with StartSupport { Actor.stopped } } - start(behv) + spawn(behv) probe.expectMsg(Started) probe.expectMsg(Pong) } @@ -74,7 +74,7 @@ class DeferredSpec extends TypedSpec with StartSupport { Actor.stopped } } - start(behv) + spawn(behv) probe.expectMsg(Pong) } @@ -86,7 +86,7 @@ class DeferredSpec extends TypedSpec with StartSupport { target(probe.ref) } } - start(behv) + spawn(behv) probe.expectMsg(Started) } @@ -99,7 +99,7 @@ class DeferredSpec extends TypedSpec with StartSupport { case m ⇒ m } probe.expectNoMsg(100.millis) // not yet - val ref = start(behv) + val ref = spawn(behv) // it's supposed to be created immediately (not waiting for first message) probe.expectMsg(Started) ref ! Ping @@ -115,7 +115,7 @@ class DeferredSpec extends TypedSpec with StartSupport { target(probe.ref) }) probe.expectNoMsg(100.millis) // not yet - val ref = start(behv) + val ref = spawn(behv) // it's supposed to be created immediately (not waiting for first message) probe.expectMsg(Started) ref ! Ping @@ -123,23 +123,19 @@ class DeferredSpec extends TypedSpec with StartSupport { probe.expectMsg(Pong) } } - } -class DeferredStubbedSpec extends TypedSpec { +class DeferredStubbedSpec extends TypedAkkaSpec { import DeferredSpec._ - def mkCtx(behv: Behavior[Command]): BehaviorTestkit[Command] = - BehaviorTestkit(behv, "ctx") - "must create underlying deferred behavior immediately" in { val inbox = TestInbox[Event]("evt") val behv = Actor.deferred[Command] { _ ⇒ inbox.ref ! Started target(inbox.ref) } - mkCtx(behv) + BehaviorTestkit(behv) // it's supposed to be created immediately (not waiting for first message) inbox.receiveMsg() should ===(Started) } @@ -152,7 +148,7 @@ class DeferredStubbedSpec extends TypedSpec { throw exc } intercept[RuntimeException] { - mkCtx(behv) + BehaviorTestkit(behv) } should ===(exc) inbox.receiveMsg() should ===(Started) } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala index 5cf2083700..3f93c70b30 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala @@ -43,7 +43,7 @@ object InstanceCountingExtension extends ExtensionId[DummyExtension1] { } } -class ExtensionsSpec extends TypedSpecSetup { +class ExtensionsSpec extends TypedAkkaSpec { "The extensions subsystem" must { "return the same instance for the same id" in diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/PerformanceSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/PerformanceSpec.scala deleted file mode 100644 index 4c4bb288ed..0000000000 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/PerformanceSpec.scala +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Copyright (C) 2014-2017 Lightbend Inc. - */ -package akka.actor.typed - -import scala.concurrent.duration._ -import com.typesafe.config.ConfigFactory -import org.junit.runner.RunWith -import akka.actor.typed.scaladsl.Actor._ -import akka.util.Timeout - -@RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class PerformanceSpec extends TypedSpec( - ConfigFactory.parseString( - """ - # increase this if you do real benchmarking - akka.actor.typed.PerformanceSpec.iterations=100000 - """)) { - - override def setTimeout = Timeout(20.seconds) - - case class Ping(x: Int, pong: ActorRef[Pong], report: ActorRef[Pong]) - case class Pong(x: Int, ping: ActorRef[Ping], report: ActorRef[Pong]) - - def behavior(pairs: Int, pings: Int, count: Int, executor: String) = - StepWise[Pong] { (ctx, startWith) ⇒ - startWith { - - val pinger = immutable[Ping] { (ctx, msg) ⇒ - if (msg.x == 0) { - msg.report ! Pong(0, ctx.self, msg.report) - same - } else { - msg.pong ! Pong(msg.x - 1, ctx.self, msg.report) - same - } - } // FIXME .withDispatcher(executor) - - val ponger = immutable[Pong] { (ctx, msg) ⇒ - msg.ping ! Ping(msg.x, ctx.self, msg.report) - same - } // FIXME .withDispatcher(executor) - - val actors = - for (i ← 1 to pairs) - yield (ctx.spawn(pinger, s"pinger-$i"), ctx.spawn(ponger, s"ponger-$i")) - - val start = Deadline.now - - for { - (ping, pong) ← actors - _ ← 1 to pings - } ping ! Ping(count, pong, ctx.self) - - start - }.expectMultipleMessages(10.seconds, pairs * pings) { (msgs, start) ⇒ - val stop = Deadline.now - - val rate = 2L * count * pairs * pings / (stop - start).toMillis - info(s"messaging rate was $rate/ms") - } - } - - val iterations = system.settings.config.getInt("akka.actor.typed.PerformanceSpec.iterations") - - "An immutable behaviour" must { - "when warming up" in { - sync(runTest("01")(behavior(1, 1, iterations, "dispatcher-1"))) - } - - "when using a single message on a single thread" in { - sync(runTest("02")(behavior(1, 1, iterations, "dispatcher-1"))) - } - - "when using a 10 messages on a single thread" in { - sync(runTest("03")(behavior(1, 10, iterations, "dispatcher-1"))) - } - - "when using a single message on two threads" in { - sync(runTest("04")(behavior(1, 1, iterations, "dispatcher-2"))) - } - - "when using a 10 messages on two threads" in { - sync(runTest("05")(behavior(1, 10, iterations, "dispatcher-2"))) - } - - "when using 4 pairs with a single message" in { - sync(runTest("06")(behavior(4, 1, iterations, "dispatcher-8"))) - } - - "when using 4 pairs with 10 messages" in { - sync(runTest("07")(behavior(4, 10, iterations, "dispatcher-8"))) - } - - "when using 8 pairs with a single message" in { - sync(runTest("08")(behavior(8, 1, iterations, "dispatcher-8"))) - } - - "when using 8 pairs with 10 messages" in { - sync(runTest("09")(behavior(8, 10, iterations, "dispatcher-8"))) - } - - } -} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/PropsSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/PropsSpec.scala index 7416ff4a4e..8699d568ae 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/PropsSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/PropsSpec.scala @@ -3,7 +3,7 @@ */ package akka.actor.typed -class PropsSpec extends TypedSpecSetup { +class PropsSpec extends TypedAkkaSpec { val dispatcherFirst = DispatcherDefault(DispatcherFromConfig("pool")) diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/RestarterSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala similarity index 63% rename from akka-actor-typed-tests/src/test/scala/akka/actor/typed/RestarterSpec.scala rename to akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index 8dbdcd442e..3b0f401f97 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/RestarterSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -3,19 +3,22 @@ */ package akka.actor.typed +import akka.actor.typed.scaladsl.Actor + import scala.concurrent.duration._ import akka.actor.typed.scaladsl.Actor._ -import akka.testkit.typed.{ BehaviorTestkit, TestInbox, TestKitSettings } +import akka.testkit.typed.{ BehaviorTestkit, TestInbox, TestKit, TestKitSettings } import scala.util.control.NoStackTrace import akka.testkit.typed.scaladsl._ +import org.scalatest.{ Matchers, WordSpec, fixture } -object RestarterSpec { +object SupervisionSpec { sealed trait Command case object Ping extends Command case class Throw(e: Throwable) extends Command - case object NextState extends Command + case object IncrementState extends Command case object GetState extends Command case class CreateChild[T](behavior: Behavior[T], name: String) extends Command @@ -29,28 +32,28 @@ object RestarterSpec { class Exc2 extends Exc1("exc-2") class Exc3(msg: String = "exc-3") extends RuntimeException(msg) with NoStackTrace - def target(monitor: ActorRef[Event], state: State = State(0, Map.empty)): Behavior[Command] = + def targetBehavior(monitor: ActorRef[Event], state: State = State(0, Map.empty)): Behavior[Command] = immutable[Command] { (ctx, cmd) ⇒ cmd match { case Ping ⇒ monitor ! Pong - same - case NextState ⇒ - target(monitor, state.copy(n = state.n + 1)) + Actor.same + case IncrementState ⇒ + targetBehavior(monitor, state.copy(n = state.n + 1)) case GetState ⇒ val reply = state.copy(children = ctx.children.map(c ⇒ c.path.name → c.upcast[Command]).toMap) monitor ! reply - same + Actor.same case CreateChild(childBehv, childName) ⇒ ctx.spawn(childBehv, childName) - same + Actor.same case Throw(e) ⇒ throw e } } onSignal { - case (ctx, sig) ⇒ + case (_, sig) ⇒ monitor ! GotSignal(sig) - same + Actor.same } class FailingConstructor(monitor: ActorRef[Event]) extends MutableBehavior[Command] { @@ -59,71 +62,71 @@ object RestarterSpec { override def onMessage(msg: Command): Behavior[Command] = { monitor ! Pong - same + Actor.same } } } -class RestarterSpec extends TypedSpec { +class StubbedSupervisionSpec extends WordSpec with Matchers { - import RestarterSpec._ + import SupervisionSpec._ - def mkCtx(behv: Behavior[Command]): BehaviorTestkit[Command] = - BehaviorTestkit(behv, "ctx") + def mkTestkit(behv: Behavior[Command]): BehaviorTestkit[Command] = + BehaviorTestkit(behv) - "A restarter" must { + "A restarter (stubbed)" must { "receive message" in { val inbox = TestInbox[Event]("evt") - val behv = supervise(target(inbox.ref)).onFailure[Throwable](SupervisorStrategy.restart) - val ctx = mkCtx(behv) - ctx.run(Ping) + val behv = supervise(targetBehavior(inbox.ref)).onFailure[Throwable](SupervisorStrategy.restart) + val testkit = BehaviorTestkit(behv) + testkit.run(Ping) inbox.receiveMsg() should ===(Pong) } "stop when no supervise" in { val inbox = TestInbox[Event]("evt") - val behv = target(inbox.ref) - val ctx = mkCtx(behv) + val behv = targetBehavior(inbox.ref) + val testkit = BehaviorTestkit(behv) intercept[Exc3] { - ctx.run(Throw(new Exc3)) + testkit.run(Throw(new Exc3)) } inbox.receiveMsg() should ===(GotSignal(PostStop)) } "stop when unhandled exception" in { val inbox = TestInbox[Event]("evt") - val behv = supervise(target(inbox.ref)).onFailure[Exc1](SupervisorStrategy.restart) - val ctx = mkCtx(behv) + val behv = supervise(targetBehavior(inbox.ref)).onFailure[Exc1](SupervisorStrategy.restart) + val testkit = BehaviorTestkit(behv) intercept[Exc3] { - ctx.run(Throw(new Exc3)) + testkit.run(Throw(new Exc3)) } inbox.receiveMsg() should ===(GotSignal(PostStop)) } "restart when handled exception" in { val inbox = TestInbox[Event]("evt") - val behv = supervise(target(inbox.ref)).onFailure[Exc1](SupervisorStrategy.restart) - val ctx = mkCtx(behv) - ctx.run(NextState) - ctx.run(GetState) + val behv = supervise(targetBehavior(inbox.ref)).onFailure[Exc1](SupervisorStrategy.restart) + val testkit = BehaviorTestkit(behv) + testkit.run(IncrementState) + testkit.run(GetState) inbox.receiveMsg() should ===(State(1, Map.empty)) - ctx.run(Throw(new Exc2)) + testkit.run(Throw(new Exc2)) inbox.receiveMsg() should ===(GotSignal(PreRestart)) - ctx.run(GetState) + testkit.run(GetState) inbox.receiveMsg() should ===(State(0, Map.empty)) } "resume when handled exception" in { val inbox = TestInbox[Event]("evt") - val behv = supervise(target(inbox.ref)).onFailure[Exc1](SupervisorStrategy.resume) - val ctx = mkCtx(behv) - ctx.run(NextState) - ctx.run(GetState) + val behv = supervise(targetBehavior(inbox.ref)).onFailure[Exc1](SupervisorStrategy.resume) + val testkit = BehaviorTestkit(behv) + testkit.run(IncrementState) + testkit.run(GetState) inbox.receiveMsg() should ===(State(1, Map.empty)) - ctx.run(Throw(new Exc2)) - ctx.run(GetState) + testkit.run(Throw(new Exc2)) + testkit.run(GetState) inbox.receiveMsg() should ===(State(1, Map.empty)) } @@ -132,38 +135,39 @@ class RestarterSpec extends TypedSpec { val behv = supervise( supervise( - target(inbox.ref) + targetBehavior(inbox.ref) ).onFailure[Exc2](SupervisorStrategy.resume) ).onFailure[Exc3](SupervisorStrategy.restart) - val ctx = mkCtx(behv) - ctx.run(NextState) - ctx.run(GetState) + val testkit = BehaviorTestkit(behv) + testkit.run(IncrementState) + testkit.run(GetState) inbox.receiveMsg() should ===(State(1, Map.empty)) // resume - ctx.run(Throw(new Exc2)) - ctx.run(GetState) + testkit.run(Throw(new Exc2)) + testkit.run(GetState) inbox.receiveMsg() should ===(State(1, Map.empty)) // restart - ctx.run(Throw(new Exc3)) + testkit.run(Throw(new Exc3)) inbox.receiveMsg() should ===(GotSignal(PreRestart)) - ctx.run(GetState) + testkit.run(GetState) inbox.receiveMsg() should ===(State(0, Map.empty)) // stop intercept[Exc1] { - ctx.run(Throw(new Exc1)) + testkit.run(Throw(new Exc1)) } inbox.receiveMsg() should ===(GotSignal(PostStop)) } "not catch fatal error" in { - val inbox = TestInbox[Event]("evt") - val behv = supervise(target(inbox.ref)).onFailure[Throwable](SupervisorStrategy.restart) - val ctx = mkCtx(behv) + val inbox = TestInbox[Event]() + val behv = Actor.supervise(targetBehavior(inbox.ref)) + .onFailure[Throwable](SupervisorStrategy.restart) + val testkit = BehaviorTestkit(behv) intercept[StackOverflowError] { - ctx.run(Throw(new StackOverflowError)) + testkit.run(Throw(new StackOverflowError)) } inbox.receiveAll() should ===(Nil) } @@ -171,14 +175,14 @@ class RestarterSpec extends TypedSpec { "stop after restart retries limit" in { val inbox = TestInbox[Event]("evt") val strategy = SupervisorStrategy.restartWithLimit(maxNrOfRetries = 2, withinTimeRange = 1.minute) - val behv = supervise(target(inbox.ref)).onFailure[Exc1](strategy) - val ctx = mkCtx(behv) - ctx.run(Throw(new Exc1)) + val behv = supervise(targetBehavior(inbox.ref)).onFailure[Exc1](strategy) + val testkit = BehaviorTestkit(behv) + testkit.run(Throw(new Exc1)) inbox.receiveMsg() should ===(GotSignal(PreRestart)) - ctx.run(Throw(new Exc1)) + testkit.run(Throw(new Exc1)) inbox.receiveMsg() should ===(GotSignal(PreRestart)) intercept[Exc1] { - ctx.run(Throw(new Exc1)) + testkit.run(Throw(new Exc1)) } inbox.receiveMsg() should ===(GotSignal(PostStop)) } @@ -187,20 +191,20 @@ class RestarterSpec extends TypedSpec { val inbox = TestInbox[Event]("evt") val withinTimeRange = 2.seconds val strategy = SupervisorStrategy.restartWithLimit(maxNrOfRetries = 2, withinTimeRange) - val behv = supervise(target(inbox.ref)).onFailure[Exc1](strategy) - val ctx = mkCtx(behv) - ctx.run(Throw(new Exc1)) + val behv = supervise(targetBehavior(inbox.ref)).onFailure[Exc1](strategy) + val testkit = BehaviorTestkit(behv) + testkit.run(Throw(new Exc1)) inbox.receiveMsg() should ===(GotSignal(PreRestart)) - ctx.run(Throw(new Exc1)) + testkit.run(Throw(new Exc1)) inbox.receiveMsg() should ===(GotSignal(PreRestart)) Thread.sleep((2.seconds + 100.millis).toMillis) - ctx.run(Throw(new Exc1)) + testkit.run(Throw(new Exc1)) inbox.receiveMsg() should ===(GotSignal(PreRestart)) - ctx.run(Throw(new Exc1)) + testkit.run(Throw(new Exc1)) inbox.receiveMsg() should ===(GotSignal(PreRestart)) intercept[Exc1] { - ctx.run(Throw(new Exc1)) + testkit.run(Throw(new Exc1)) } inbox.receiveMsg() should ===(GotSignal(PostStop)) } @@ -208,10 +212,11 @@ class RestarterSpec extends TypedSpec { "stop at first exception when restart retries limit is 0" in { val inbox = TestInbox[Event]("evt") val strategy = SupervisorStrategy.restartWithLimit(maxNrOfRetries = 0, withinTimeRange = 1.minute) - val behv = supervise(target(inbox.ref)).onFailure[Exc1](strategy) - val ctx = mkCtx(behv) + val behv = supervise(targetBehavior(inbox.ref)) + .onFailure[Exc1](strategy) + val testkit = BehaviorTestkit(behv) intercept[Exc1] { - ctx.run(Throw(new Exc1)) + testkit.run(Throw(new Exc1)) } inbox.receiveMsg() should ===(GotSignal(PostStop)) } @@ -220,34 +225,38 @@ class RestarterSpec extends TypedSpec { val inbox = TestInbox[Event]("evt") val behv = supervise(deferred[Command] { _ ⇒ inbox.ref ! Started - target(inbox.ref) + targetBehavior(inbox.ref) }).onFailure[Exc1](SupervisorStrategy.restart) - mkCtx(behv) + mkTestkit(behv) // it's supposed to be created immediately (not waiting for first message) inbox.receiveMsg() should ===(Started) } } } -class RestarterStubbedSpec extends TypedSpec with StartSupport { +class SupervisionSpec extends TestKit("SupervisionSpec") with TypedAkkaSpecWithShutdown { - import RestarterSpec._ + import SupervisionSpec._ + private val nameCounter = Iterator.from(0) + private def nextName(prefix: String = "a"): String = s"$prefix-${nameCounter.next()}" + private val waitTime = 50.millis implicit val testSettings = TestKitSettings(system) - "A restart (subbed)" must { + "A supervised actor" must { "receive message" in { val probe = TestProbe[Event]("evt") - val behv = supervise(target(probe.ref)).onFailure[Throwable](SupervisorStrategy.restart) - val ref = start(behv) + val behv = Actor.supervise(targetBehavior(probe.ref)) + .onFailure[Throwable](SupervisorStrategy.restart) + val ref = spawn(behv) ref ! Ping probe.expectMsg(Pong) } - "stop when no supervise" in { + "stop when not supervised" in { val probe = TestProbe[Event]("evt") - val behv = target(probe.ref) - val ref = start(behv) + val behv = targetBehavior(probe.ref) + val ref = spawn(behv) ref ! Throw(new Exc3) probe.expectMsg(GotSignal(PostStop)) @@ -255,17 +264,19 @@ class RestarterStubbedSpec extends TypedSpec with StartSupport { "stop when unhandled exception" in { val probe = TestProbe[Event]("evt") - val behv = supervise(target(probe.ref)).onFailure[Exc1](SupervisorStrategy.restart) - val ref = start(behv) + val behv = Actor.supervise(targetBehavior(probe.ref)) + .onFailure[Exc1](SupervisorStrategy.restart) + val ref = spawn(behv) ref ! Throw(new Exc3) probe.expectMsg(GotSignal(PostStop)) } "restart when handled exception" in { val probe = TestProbe[Event]("evt") - val behv = supervise(target(probe.ref)).onFailure[Exc1](SupervisorStrategy.restart) - val ref = start(behv) - ref ! NextState + val behv = Actor.supervise(targetBehavior(probe.ref)) + .onFailure[Exc1](SupervisorStrategy.restart) + val ref = spawn(behv) + ref ! IncrementState ref ! GetState probe.expectMsg(State(1, Map.empty)) @@ -276,29 +287,31 @@ class RestarterStubbedSpec extends TypedSpec with StartSupport { } "NOT stop children when restarting" in { - val probe = TestProbe[Event]("evt") - val behv = supervise(target(probe.ref)).onFailure[Exc1](SupervisorStrategy.restart) - val ref = start(behv) + val parentProbe = TestProbe[Event]("evt") + val behv = Actor.supervise(targetBehavior(parentProbe.ref)) + .onFailure[Exc1](SupervisorStrategy.restart) + val ref = spawn(behv) val childProbe = TestProbe[Event]("childEvt") val childName = nextName() - ref ! CreateChild(target(childProbe.ref), childName) + ref ! CreateChild(targetBehavior(childProbe.ref), childName) ref ! GetState - probe.expectMsgType[State].children.keySet should contain(childName) + parentProbe.expectMsgType[State].children.keySet should contain(childName) ref ! Throw(new Exc1) - probe.expectMsg(GotSignal(PreRestart)) + parentProbe.expectMsg(GotSignal(PreRestart)) ref ! GetState // TODO document this difference compared to classic actors, and that // children can be stopped if needed in PreRestart - probe.expectMsgType[State].children.keySet should contain(childName) + parentProbe.expectMsgType[State].children.keySet should contain(childName) + childProbe.expectNoMsg(waitTime) } "resume when handled exception" in { val probe = TestProbe[Event]("evt") - val behv = supervise(target(probe.ref)).onFailure[Exc1](SupervisorStrategy.resume) - val ref = start(behv) - ref ! NextState + val behv = supervise(targetBehavior(probe.ref)).onFailure[Exc1](SupervisorStrategy.resume) + val ref = spawn(behv) + ref ! IncrementState ref ! GetState probe.expectMsg(State(1, Map.empty)) @@ -309,16 +322,18 @@ class RestarterStubbedSpec extends TypedSpec with StartSupport { "support nesting to handle different exceptions" in { val probe = TestProbe[Event]("evt") - val behv = supervise( - supervise(target(probe.ref)).onFailure[Exc2](SupervisorStrategy.resume) + val behv = Actor.supervise( + Actor.supervise(targetBehavior(probe.ref)) + .onFailure[Exc2](SupervisorStrategy.resume) ).onFailure[Exc3](SupervisorStrategy.restart) - val ref = start(behv) - ref ! NextState + val ref = spawn(behv) + ref ! IncrementState ref ! GetState probe.expectMsg(State(1, Map.empty)) // resume ref ! Throw(new Exc2) + probe.expectNoMsg(waitTime) ref ! GetState probe.expectMsg(State(1, Map.empty)) @@ -337,16 +352,17 @@ class RestarterStubbedSpec extends TypedSpec with StartSupport { val probe = TestProbe[Event]("evt") val startedProbe = TestProbe[Event]("started") val minBackoff = 1.seconds - val strategy = SupervisorStrategy.restartWithBackoff(minBackoff, 10.seconds, 0.0) + val strategy = SupervisorStrategy + .restartWithBackoff(minBackoff, 10.seconds, 0.0) .withResetBackoffAfter(10.seconds) - val behv = supervise(deferred[Command] { _ ⇒ + val behv = Actor.supervise(Actor.deferred[Command] { _ ⇒ startedProbe.ref ! Started - target(probe.ref) + targetBehavior(probe.ref) }).onFailure[Exception](strategy) - val ref = start(behv) + val ref = spawn(behv) startedProbe.expectMsg(Started) - ref ! NextState + ref ! IncrementState ref ! Throw(new Exc1) probe.expectMsg(GotSignal(PreRestart)) ref ! Ping // dropped due to backoff @@ -358,7 +374,7 @@ class RestarterStubbedSpec extends TypedSpec with StartSupport { probe.expectMsg(State(0, Map.empty)) // one more time - ref ! NextState + ref ! IncrementState ref ! Throw(new Exc1) probe.expectMsg(GotSignal(PreRestart)) ref ! Ping // dropped due to backoff @@ -375,10 +391,10 @@ class RestarterStubbedSpec extends TypedSpec with StartSupport { val minBackoff = 1.seconds val strategy = SupervisorStrategy.restartWithBackoff(minBackoff, 10.seconds, 0.0) .withResetBackoffAfter(100.millis) - val behv = supervise(target(probe.ref)).onFailure[Exc1](strategy) - val ref = start(behv) + val behv = supervise(targetBehavior(probe.ref)).onFailure[Exc1](strategy) + val ref = spawn(behv) - ref ! NextState + ref ! IncrementState ref ! Throw(new Exc1) probe.expectMsg(GotSignal(PreRestart)) ref ! Ping // dropped due to backoff @@ -389,7 +405,7 @@ class RestarterStubbedSpec extends TypedSpec with StartSupport { // one more time after the reset timeout probe.expectNoMsg(strategy.resetBackoffAfter + 100.millis) - ref ! NextState + ref ! IncrementState ref ! Throw(new Exc1) probe.expectMsg(GotSignal(PreRestart)) ref ! Ping // dropped due to backoff @@ -404,18 +420,19 @@ class RestarterStubbedSpec extends TypedSpec with StartSupport { val probe = TestProbe[Event]("evt") val behv = supervise(deferred[Command] { _ ⇒ probe.ref ! Started - target(probe.ref) + targetBehavior(probe.ref) }).onFailure[Exception](SupervisorStrategy.restart) probe.expectNoMsg(100.millis) // not yet - start(behv) + spawn(behv) // it's supposed to be created immediately (not waiting for first message) probe.expectMsg(Started) } "stop when exception from MutableBehavior constructor" in { val probe = TestProbe[Event]("evt") - val behv = supervise(mutable[Command](_ ⇒ new FailingConstructor(probe.ref))).onFailure[Exception](SupervisorStrategy.restart) - val ref = start(behv) + val behv = supervise(mutable[Command](_ ⇒ new FailingConstructor(probe.ref))) + .onFailure[Exception](SupervisorStrategy.restart) + val ref = spawn(behv) probe.expectMsg(Started) ref ! Ping probe.expectNoMsg(100.millis) diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala index d501fdbc3c..643a9b86aa 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala @@ -9,16 +9,15 @@ import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.duration._ import scala.util.control.NoStackTrace - import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.TimerScheduler import akka.testkit.typed.TestKitSettings +import akka.testkit.typed.TestKit import akka.testkit.typed.scaladsl._ +import org.scalatest.WordSpecLike -class TimerSpec extends TypedSpec( - """ - #akka.loglevel = DEBUG - """) with StartSupport { +class TimerSpec extends TestKit("TimerSpec") + with WordSpecLike { sealed trait Command case class Tick(n: Int) extends Command @@ -87,7 +86,7 @@ class TimerSpec extends TypedSpec( target(probe.ref, timer, 1) } - val ref = start(behv) + val ref = spawn(behv) probe.expectMsg(Tock(1)) probe.expectNoMsg(100.millis) @@ -102,7 +101,7 @@ class TimerSpec extends TypedSpec( target(probe.ref, timer, 1) } - val ref = start(behv) + val ref = spawn(behv) probe.within((interval * 4) - 100.millis) { probe.expectMsg(Tock(1)) probe.expectMsg(Tock(1)) @@ -120,7 +119,7 @@ class TimerSpec extends TypedSpec( target(probe.ref, timer, 1) } - val ref = start(behv) + val ref = spawn(behv) probe.expectMsg(Tock(1)) val latch = new CountDownLatch(1) // next Tock(1) enqueued in mailboxed, but should be discarded because of new timer @@ -140,7 +139,7 @@ class TimerSpec extends TypedSpec( target(probe.ref, timer, 1) } - val ref = start(behv) + val ref = spawn(behv) probe.expectMsg(Tock(1)) ref ! Cancel probe.expectNoMsg(dilatedInterval + 100.millis) @@ -157,7 +156,7 @@ class TimerSpec extends TypedSpec( target(probe.ref, timer, 1) }).onFailure[Exception](SupervisorStrategy.restart) - val ref = start(behv) + val ref = spawn(behv) probe.expectMsg(Tock(1)) val latch = new CountDownLatch(1) @@ -180,7 +179,7 @@ class TimerSpec extends TypedSpec( target(probe.ref, timer, 1) }).onFailure[Exception](SupervisorStrategy.restart) - val ref = start(behv) + val ref = spawn(behv) probe.expectMsg(Tock(1)) // change state so that we see that the restart starts over again ref ! Bump @@ -200,23 +199,23 @@ class TimerSpec extends TypedSpec( } "cancel timers when stopped from exception" in { - val probe = TestProbe[Event]("evt") + val probe = TestProbe[Event]() val behv = Actor.withTimers[Command] { timer ⇒ timer.startPeriodicTimer("T", Tick(1), interval) target(probe.ref, timer, 1) } - val ref = start(behv) + val ref = spawn(behv) ref ! Throw(new Exc) probe.expectMsg(GotPostStop(false)) } "cancel timers when stopped voluntarily" in { - val probe = TestProbe[Event]("evt") + val probe = TestProbe[Event]() val behv = Actor.withTimers[Command] { timer ⇒ timer.startPeriodicTimer("T", Tick(1), interval) target(probe.ref, timer, 1) } - val ref = start(behv) + val ref = spawn(behv) ref ! End probe.expectMsg(GotPostStop(false)) } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TypedAkkaSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TypedAkkaSpec.scala new file mode 100644 index 0000000000..a3fdc822b8 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TypedAkkaSpec.scala @@ -0,0 +1,31 @@ +package akka.actor.typed + +import akka.testkit.typed.{ TestInbox, TestKit } +import akka.util.Timeout +import org.scalactic.TypeCheckedTripleEquals +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.time.Span +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } + +import scala.concurrent.duration._ + +/** + * Helper trait to include standard traits for typed tests + */ +trait TypedAkkaSpec extends WordSpecLike with Matchers with BeforeAndAfterAll with ScalaFutures with TypeCheckedTripleEquals { + implicit val akkaPatience = PatienceConfig(3.seconds, Span(100, org.scalatest.time.Millis)) + implicit val timeout = Timeout(3.seconds) + + def assertEmpty(inboxes: TestInbox[_]*): Unit = { + inboxes foreach (i ⇒ withClue(s"inbox $i had messages")(i.hasMessages should be(false))) + } + +} + +/** + * Helper that also shuts down the actor system if using [[TestKit]] + */ +trait TypedAkkaSpecWithShutdown extends TypedAkkaSpec { + self: TestKit ⇒ + override protected def afterAll(): Unit = shutdown() +} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TypedSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TypedSpec.scala deleted file mode 100644 index 907446c11d..0000000000 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TypedSpec.scala +++ /dev/null @@ -1,228 +0,0 @@ -/** - * Copyright (C) 2014-2017 Lightbend Inc. - */ -package akka.actor.typed - -import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } -import akka.testkit.AkkaSpec - -import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.concurrent.Future -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory -import akka.util.Timeout - -import scala.reflect.ClassTag -import akka.actor.ActorInitializationException - -import language.existentials -import akka.testkit.TestEvent.Mute -import akka.actor.typed.scaladsl.Actor._ -import org.scalatest.concurrent.ScalaFutures -import org.scalactic.TypeCheckedTripleEquals -import org.scalactic.CanEqual - -import scala.util.control.NonFatal -import akka.actor.typed.scaladsl.AskPattern - -import scala.util.control.NoStackTrace -import akka.testkit.typed.{ TestInbox, TestKitSettings } -import org.scalatest.time.Span - -/** - * Helper class for writing tests for typed Actors with ScalaTest. - */ -class TypedSpecSetup extends WordSpec with Matchers with BeforeAndAfterAll with ScalaFutures with TypeCheckedTripleEquals { - - // TODO hook this up with config like in akka-testkit/AkkaSpec? - implicit val akkaPatience = PatienceConfig(3.seconds, Span(100, org.scalatest.time.Millis)) - -} - -trait StartSupport { - implicit def system: ActorSystem[TypedSpec.Command] - private implicit def timeout: Timeout = Timeout(1.minute) - private implicit def scheduler = system.scheduler - - private val nameCounter = Iterator.from(0) - - def nextName(prefix: String = "a"): String = s"$prefix-${nameCounter.next()}" - - def start[T](behv: Behavior[T]): ActorRef[T] = { - import akka.actor.typed.scaladsl.AskPattern._ - import akka.testkit.typed.scaladsl._ - implicit val testSettings = TestKitSettings(system) - Await.result(system ? TypedSpec.Create(behv, nextName()), 3.seconds.dilated) - } -} -/** - * Helper class for writing tests against both ActorSystemImpl and ActorSystemAdapter. - */ -abstract class TypedSpec(val config: Config) extends TypedSpecSetup { - - import TypedSpec._ - import AskPattern._ - - def this() = this(ConfigFactory.empty) - - def this(config: String) = this(ConfigFactory.parseString(config)) - - // extension point - def setTimeout: Timeout = Timeout(1.minute) - - implicit lazy val system: ActorSystem[TypedSpec.Command] = { - val sys = ActorSystem(guardian(), AkkaSpec.getCallerName(classOf[TypedSpec]), config = Some(config withFallback AkkaSpec.testConf)) - sys - } - - trait AdaptedSystem { - def system: ActorSystem[TypedSpec.Command] = TypedSpec.this.system - } - - implicit val timeout = setTimeout - implicit def scheduler = system.scheduler - - lazy val blackhole = await(system ? Create(immutable[Any] { case _ ⇒ same }, "blackhole")) - - override def afterAll(): Unit = { - Await.result(system.terminate, timeout.duration) - } - - // TODO remove after basing on ScalaTest 3 with async support - import akka.testkit._ - - def await[T](f: Future[T]): T = Await.result(f, timeout.duration * 1.1) - - /** - * Run an Actor-based test. The test procedure is most conveniently - * formulated using the [[StepWise]] behavior type. - */ - def runTest[T: ClassTag](name: String)(behavior: Behavior[T])(implicit system: ActorSystem[Command]): Future[Status] = - system ? (RunTest(name, behavior, _, timeout.duration)) - - // TODO remove after basing on ScalaTest 3 with async support - def sync(f: Future[Status])(implicit system: ActorSystem[Command]): Unit = { - def unwrap(ex: Throwable): Throwable = ex match { - case ActorInitializationException(_, _, ex) ⇒ ex - case other ⇒ other - } - - try await(f) match { - case Success ⇒ () - case Failed(ex) ⇒ - unwrap(ex) match { - case ex2: TypedSpec.SimulatedException ⇒ - throw ex2 - case _ ⇒ - println(system.printTree) - throw unwrap(ex) - } - case Timedout ⇒ - println(system.printTree) - fail("test timed out") - } catch { - case ex: TypedSpec.SimulatedException ⇒ - throw ex - case NonFatal(ex) ⇒ - println(system.printTree) - throw ex - } - } - - def muteExpectedException[T <: Exception: ClassTag]( - message: String = null, - source: String = null, - start: String = "", - pattern: String = null, - occurrences: Int = Int.MaxValue)(implicit system: ActorSystem[Command]): EventFilter = { - val filter = EventFilter(message, source, start, pattern, occurrences) - system.eventStream.publish(Mute(filter)) - filter - } - - /** - * Group assertion that ensures that the given inboxes are empty. - */ - def assertEmpty(inboxes: TestInbox[_]*): Unit = { - inboxes foreach (i ⇒ withClue(s"inbox $i had messages")(i.hasMessages should be(false))) - } - - // for ScalaTest === compare of Class objects - implicit def classEqualityConstraint[A, B]: CanEqual[Class[A], Class[B]] = - new CanEqual[Class[A], Class[B]] { - def areEqual(a: Class[A], b: Class[B]) = a == b - } - - implicit def setEqualityConstraint[A, T <: Set[_ <: A]]: CanEqual[Set[A], T] = - new CanEqual[Set[A], T] { - def areEqual(a: Set[A], b: T) = a == b - } -} - -object TypedSpec { - - sealed abstract class Start - case object Start extends Start - - sealed trait Command - case class RunTest[T](name: String, behavior: Behavior[T], replyTo: ActorRef[Status], timeout: FiniteDuration) extends Command - case class Terminate(reply: ActorRef[Status]) extends Command - case class Create[T](behavior: Behavior[T], name: String)(val replyTo: ActorRef[ActorRef[T]]) extends Command - - sealed trait Status - case object Success extends Status - case class Failed(thr: Throwable) extends Status - case object Timedout extends Status - - class SimulatedException(message: String) extends RuntimeException(message) with NoStackTrace - - def guardian(outstanding: Map[ActorRef[_], ActorRef[Status]] = Map.empty): Behavior[Command] = - immutable[Command] { - case (ctx, r: RunTest[t]) ⇒ - val test = ctx.spawn(r.behavior, r.name) - ctx.schedule(r.timeout, r.replyTo, Timedout) - ctx.watch(test) - guardian(outstanding + ((test, r.replyTo))) - case (_, Terminate(reply)) ⇒ - reply ! Success - stopped - case (ctx, c: Create[t]) ⇒ - c.replyTo ! ctx.spawn(c.behavior, c.name) - same - } onSignal { - case (ctx, t @ Terminated(test)) ⇒ - outstanding get test match { - case Some(reply) ⇒ - if (t.failure eq null) reply ! Success - else reply ! Failed(t.failure) - guardian(outstanding - test) - case None ⇒ same - } - case _ ⇒ same - } - - def getCallerName(clazz: Class[_]): String = { - val s = (Thread.currentThread.getStackTrace map (_.getClassName) drop 1) - .dropWhile(_ matches "(java.lang.Thread|.*TypedSpec.?$)") - val reduced = s.lastIndexWhere(_ == clazz.getName) match { - case -1 ⇒ s - case z ⇒ s drop (z + 1) - } - reduced.head.replaceFirst(""".*\.""", "").replaceAll("[^a-zA-Z_0-9]", "_") - } -} - -class TypedSpecSpec extends TypedSpec { - - "A TypedSpec" must { - "must report failures" in { - a[TypedSpec.SimulatedException] must be thrownBy { - sync(runTest("failure")(StepWise[String]((ctx, startWith) ⇒ - startWith { - throw new TypedSpec.SimulatedException("expected") - }))) - } - } - } -} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala index 0006591b34..93d7e0b2fb 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala @@ -3,63 +3,68 @@ */ package akka.actor.typed -import scala.concurrent._ -import scala.concurrent.duration._ -import akka.actor.typed.scaladsl.Actor._ -import akka.actor.typed.scaladsl.AskPattern._ +import akka.actor.typed.scaladsl.Actor -class WatchSpec extends TypedSpec { +import scala.concurrent._ +import akka.testkit.typed.TestKit + +object WatchSpec { + case object Stop + + val terminatorBehavior = + Actor.immutable[Stop.type] { + case (_, Stop) ⇒ Actor.stopped + } + + sealed trait Message + case object CustomTerminationMessage extends Message + case class StartWatchingWith(watchee: ActorRef[Stop.type], msg: CustomTerminationMessage.type) extends Message +} + +class WatchSpec extends TestKit("WordSpec") + with TypedAkkaSpecWithShutdown { + + import WatchSpec._ "Actor monitoring" must { - "get notified of actor termination" in { - case object Stop case class StartWatching(watchee: ActorRef[Stop.type]) + val terminator = systemActor(terminatorBehavior) + val receivedTerminationSignal: Promise[ActorRef[Nothing]] = Promise() - val terminator = Await.result(system ? TypedSpec.Create(immutable[Stop.type] { - case (ctx, `Stop`) ⇒ stopped - }, "t1"), 3.seconds /*.dilated*/ ) - - val receivedTerminationSignal: Promise[Unit] = Promise() - - val watcher = Await.result(system ? TypedSpec.Create(immutable[StartWatching] { - case (ctx, StartWatching(watchee)) ⇒ ctx.watch(watchee); same + val watcher = systemActor(Actor.immutable[StartWatching] { + case (ctx, StartWatching(watchee)) ⇒ + ctx.watch(watchee) + Actor.same }.onSignal { - case (ctx, Terminated(_)) ⇒ receivedTerminationSignal.success(()); stopped - }, "w1"), 3.seconds /*.dilated*/ ) + case (_, Terminated(stopped)) ⇒ + receivedTerminationSignal.success(stopped) + Actor.stopped + }) watcher ! StartWatching(terminator) terminator ! Stop - Await.result(receivedTerminationSignal.future, 3.seconds /*.dilated*/ ) + receivedTerminationSignal.future.futureValue shouldEqual terminator } "get notified of actor termination with a custom message" in { - case object Stop + val terminator = systemActor(terminatorBehavior) + val receivedTerminationSignal: Promise[Message] = Promise() - sealed trait Message - case object CustomTerminationMessage extends Message - case class StartWatchingWith(watchee: ActorRef[Stop.type], msg: CustomTerminationMessage.type) extends Message - - val terminator = Await.result(system ? TypedSpec.Create(immutable[Stop.type] { - case (ctx, `Stop`) ⇒ stopped - }, "t2"), 3.seconds /*.dilated*/ ) - - val receivedTerminationSignal: Promise[Unit] = Promise() - - val watcher = Await.result(system ? TypedSpec.Create(immutable[Message] { + val watcher = systemActor(Actor.immutable[Message] { case (ctx, StartWatchingWith(watchee, msg)) ⇒ ctx.watchWith(watchee, msg) - same - case (ctx, `CustomTerminationMessage`) ⇒ - receivedTerminationSignal.success(()) - stopped - }, "w2"), 3.seconds /*.dilated*/ ) + Actor.same + case (_, msg) ⇒ + receivedTerminationSignal.success(msg) + Actor.stopped + }) watcher ! StartWatchingWith(terminator, CustomTerminationMessage) terminator ! Stop - Await.result(receivedTerminationSignal.future, 3.seconds /*.dilated*/ ) + receivedTerminationSignal.future.futureValue shouldEqual CustomTerminationMessage } } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/MiscMessageSerializerSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/MiscMessageSerializerSpec.scala index d130bf9d2d..57f794a694 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/MiscMessageSerializerSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/MiscMessageSerializerSpec.scala @@ -3,12 +3,11 @@ */ package akka.actor.typed.internal -import akka.actor.typed.TypedSpec -import akka.actor.typed.TypedSpec.{ Create ⇒ TCreate } +import akka.actor.typed.TypedAkkaSpecWithShutdown import akka.actor.typed.scaladsl.Actor -import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.adapter._ import akka.serialization.SerializationExtension +import akka.testkit.typed.TestKit import com.typesafe.config.ConfigFactory object MiscMessageSerializerSpec { @@ -23,7 +22,7 @@ object MiscMessageSerializerSpec { """) } -class MiscMessageSerializerSpec extends TypedSpec(MiscMessageSerializerSpec.config) { +class MiscMessageSerializerSpec extends TestKit(MiscMessageSerializerSpec.config) with TypedAkkaSpecWithShutdown { val serialization = SerializationExtension(system.toUntyped) @@ -40,8 +39,7 @@ class MiscMessageSerializerSpec extends TypedSpec(MiscMessageSerializerSpec.conf } "must serialize and deserialize typed actor refs" in { - val ref = (system ? TCreate(Actor.empty[Unit], "some-actor")).futureValue - println(ref.getClass) + val ref = spawn(Actor.empty[Unit]) checkSerialization(ref) } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/LocalReceptionistSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/LocalReceptionistSpec.scala index 1cdf09f726..86c7eddf35 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/LocalReceptionistSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/LocalReceptionistSpec.scala @@ -7,15 +7,13 @@ import akka.actor.typed._ import akka.actor.typed.receptionist.Receptionist._ import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.AskPattern._ -import akka.testkit.typed.BehaviorTestkit -import akka.testkit.typed.TestInbox -import akka.testkit.typed.TestKitSettings +import akka.testkit.typed.{ BehaviorTestkit, TestInbox, TestKit, TestKitSettings } import akka.testkit.typed.scaladsl.TestProbe import org.scalatest.concurrent.Eventually -import scala.concurrent.duration._ +import scala.concurrent.Future -class LocalReceptionistSpec extends TypedSpec with Eventually with StartSupport { +class LocalReceptionistSpec extends TestKit with TypedAkkaSpecWithShutdown with Eventually { trait ServiceA val ServiceKeyA = Receptionist.ServiceKey[ServiceA]("service-a") @@ -33,77 +31,77 @@ class LocalReceptionistSpec extends TypedSpec with Eventually with StartSupport } } - import akka.actor.typed.internal.receptionist.ReceptionistImpl.{ localOnlyBehavior ⇒ behavior } + import akka.actor.typed.internal.receptionist.ReceptionistImpl.{ localOnlyBehavior ⇒ receptionistBehavior } implicit val testSettings = TestKitSettings(system) abstract class TestSetup { - val receptionist = start(behavior) + val receptionist = spawn(receptionistBehavior) } "A local receptionist" must { - "must register a service" in { - val ctx = new BehaviorTestkit("register", behavior) + "register a service" in { + val testkit = BehaviorTestkit(receptionistBehavior) val a = TestInbox[ServiceA]("a") val r = TestInbox[Registered[_]]("r") - ctx.run(Register(ServiceKeyA, a.ref)(r.ref)) - ctx.retrieveEffect() // watching however that is implemented + testkit.run(Register(ServiceKeyA, a.ref)(r.ref)) + testkit.retrieveEffect() // watching however that is implemented r.receiveMsg() should be(Registered(ServiceKeyA, a.ref)) val q = TestInbox[Listing[ServiceA]]("q") - ctx.run(Find(ServiceKeyA)(q.ref)) - ctx.retrieveAllEffects() should be(Nil) + testkit.run(Find(ServiceKeyA)(q.ref)) + testkit.retrieveAllEffects() should be(Nil) q.receiveMsg() should be(Listing(ServiceKeyA, Set(a.ref))) assertEmpty(a, r, q) } - "must register two services" in { - val ctx = new BehaviorTestkit("registertwo", behavior) + "register two services" in { + val testkit = BehaviorTestkit(receptionistBehavior) val a = TestInbox[ServiceA]("a") val r = TestInbox[Registered[_]]("r") - ctx.run(Register(ServiceKeyA, a.ref)(r.ref)) + testkit.run(Register(ServiceKeyA, a.ref)(r.ref)) r.receiveMsg() should be(Registered(ServiceKeyA, a.ref)) val b = TestInbox[ServiceB]("b") - ctx.run(Register(ServiceKeyB, b.ref)(r.ref)) + testkit.run(Register(ServiceKeyB, b.ref)(r.ref)) r.receiveMsg() should be(Registered(ServiceKeyB, b.ref)) val q = TestInbox[Listing[_]]("q") - ctx.run(Find(ServiceKeyA)(q.ref)) + testkit.run(Find(ServiceKeyA)(q.ref)) q.receiveMsg() should be(Listing(ServiceKeyA, Set(a.ref))) - ctx.run(Find(ServiceKeyB)(q.ref)) + testkit.run(Find(ServiceKeyB)(q.ref)) q.receiveMsg() should be(Listing(ServiceKeyB, Set(b.ref))) assertEmpty(a, b, r, q) } - "must register two services with the same key" in { - val ctx = new BehaviorTestkit("registertwosame", behavior) + "register two services with the same key" in { + val testkit = BehaviorTestkit(receptionistBehavior) val a1 = TestInbox[ServiceA]("a1") val r = TestInbox[Registered[_]]("r") - ctx.run(Register(ServiceKeyA, a1.ref)(r.ref)) + testkit.run(Register(ServiceKeyA, a1.ref)(r.ref)) r.receiveMsg() should be(Registered(ServiceKeyA, a1.ref)) val a2 = TestInbox[ServiceA]("a2") - ctx.run(Register(ServiceKeyA, a2.ref)(r.ref)) + testkit.run(Register(ServiceKeyA, a2.ref)(r.ref)) r.receiveMsg() should be(Registered(ServiceKeyA, a2.ref)) val q = TestInbox[Listing[_]]("q") - ctx.run(Find(ServiceKeyA)(q.ref)) + testkit.run(Find(ServiceKeyA)(q.ref)) q.receiveMsg() should be(Listing(ServiceKeyA, Set(a1.ref, a2.ref))) - ctx.run(Find(ServiceKeyB)(q.ref)) + testkit.run(Find(ServiceKeyB)(q.ref)) q.receiveMsg() should be(Listing(ServiceKeyB, Set.empty[ActorRef[ServiceB]])) assertEmpty(a1, a2, r, q) } - "must unregister services when they terminate" in { + "unregister services when they terminate" in { new TestSetup { val regProbe = TestProbe[Any]("regProbe") - val serviceA = start(stoppableBehavior.narrow[ServiceA]) + val serviceA = spawn(stoppableBehavior.narrow[ServiceA]) receptionist ! Register(ServiceKeyA, serviceA, regProbe.ref) regProbe.expectMsg(Registered(ServiceKeyA, serviceA)) - val serviceB = start(stoppableBehavior.narrow[ServiceB]) + val serviceB = spawn(stoppableBehavior.narrow[ServiceB]) receptionist ! Register(ServiceKeyB, serviceB, regProbe.ref) regProbe.expectMsg(Registered(ServiceKeyB, serviceB)) - val serviceC = start(stoppableBehavior) + val serviceC = spawn(stoppableBehavior) receptionist ! Register(ServiceKeyA, serviceC, regProbe.ref) receptionist ! Register(ServiceKeyB, serviceC, regProbe.ref) regProbe.expectMsg(Registered(ServiceKeyA, serviceC)) @@ -125,7 +123,7 @@ class LocalReceptionistSpec extends TypedSpec with Eventually with StartSupport } } - "must support subscribing to service changes" in { + "support subscribing to service changes" in { new TestSetup { val regProbe = TestProbe[Registered[_]]("regProbe") @@ -134,13 +132,13 @@ class LocalReceptionistSpec extends TypedSpec with Eventually with StartSupport aSubscriber.expectMsg(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]])) - val serviceA: ActorRef[ServiceA] = start(stoppableBehavior) + val serviceA: ActorRef[ServiceA] = spawn(stoppableBehavior) receptionist ! Register(ServiceKeyA, serviceA, regProbe.ref) regProbe.expectMsg(Registered(ServiceKeyA, serviceA)) aSubscriber.expectMsg(Listing(ServiceKeyA, Set(serviceA))) - val serviceA2: ActorRef[ServiceA] = start(stoppableBehavior) + val serviceA2: ActorRef[ServiceA] = spawn(stoppableBehavior) receptionist ! Register(ServiceKeyA, serviceA2, regProbe.ref) regProbe.expectMsg(Registered(ServiceKeyA, serviceA2)) @@ -153,40 +151,18 @@ class LocalReceptionistSpec extends TypedSpec with Eventually with StartSupport } } - "must work with ask" in { - sync(runTest("Receptionist") { - StepWise[Registered[ServiceA]] { (ctx, startWith) ⇒ - val self = ctx.self - startWith.withKeepTraces(true) { - val r = ctx.spawnAnonymous(behavior) - val s = ctx.spawnAnonymous(behaviorA) - val f = r ? Register(ServiceKeyA, s) - r ! Register(ServiceKeyA, s)(self) - (f, s) - }.expectMessage(1.second) { - case (msg, (f, s)) ⇒ - msg should be(Registered(ServiceKeyA, s)) - f.foreach(self ! _)(system.executionContext) - s - }.expectMessage(1.second) { - case (msg, s) ⇒ - msg should be(Registered(ServiceKeyA, s)) - } - } - }) + "work with ask" in { + val receptionist = spawn(receptionistBehavior) + val serviceA = spawn(behaviorA) + val f: Future[Registered[ServiceA]] = receptionist ? Register(ServiceKeyA, serviceA) + f.futureValue should be(Registered(ServiceKeyA, serviceA)) } - "must be present in the system" in { - sync(runTest("systemReceptionist") { - StepWise[Listing[ServiceA]] { (ctx, startWith) ⇒ - val self = ctx.self - startWith.withKeepTraces(true) { - system.receptionist ! Find(ServiceKeyA)(self) - }.expectMessage(1.second) { (msg, _) ⇒ - msg.serviceInstances should ===(Set()) - } - } - }) + "be present in the system" in { + val probe = TestProbe[Receptionist.Listing[_]]() + system.receptionist ! Find(ServiceKeyA)(probe.ref) + val listing: Listing[_] = probe.expectMsgType[Listing[_]] + listing.serviceInstances should be(Set()) } } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutablePartialSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutablePartialSpec.scala index 29b0a296a9..1e4a7a6147 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutablePartialSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutablePartialSpec.scala @@ -4,13 +4,12 @@ package akka.actor.typed package scaladsl -import akka.testkit.typed.{ BehaviorTestkit, TestKitSettings } +import akka.testkit.typed.{ BehaviorTestkit, TestKit, TestKitSettings } import akka.testkit.typed.scaladsl.TestProbe + import scala.concurrent.duration.DurationInt -class ImmutablePartialSpec extends TypedSpec with StartSupport { - - private implicit val testSettings = TestKitSettings(system) +class ImmutablePartialSpec extends TestKit with TypedAkkaSpecWithShutdown { "An immutable partial" must { @@ -22,14 +21,14 @@ class ImmutablePartialSpec extends TypedSpec with StartSupport { probe.ref ! Command2 Actor.same } - val context = new BehaviorTestkit("ctx", behavior) + val testkit = BehaviorTestkit(behavior) - context.run(Command1) - context.currentBehavior shouldBe behavior + testkit.run(Command1) + testkit.currentBehavior shouldBe behavior probe.expectNoMsg(100.milliseconds) - context.run(Command2) - context.currentBehavior shouldBe behavior + testkit.run(Command2) + testkit.currentBehavior shouldBe behavior probe.expectMsg(Command2) } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/OnSignalSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/OnSignalSpec.scala index 1f9e0008d0..409fe784f3 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/OnSignalSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/OnSignalSpec.scala @@ -5,12 +5,10 @@ package akka.actor.typed package scaladsl import akka.Done -import akka.testkit.typed.TestKitSettings +import akka.testkit.typed.TestKit import akka.testkit.typed.scaladsl.TestProbe -final class OnSignalSpec extends TypedSpec with StartSupport { - - private implicit val testSettings = TestKitSettings(system) +final class OnSignalSpec extends TestKit with TypedAkkaSpecWithShutdown { "An Actor.OnSignal behavior" must { "must correctly install the signal handler" in { @@ -25,7 +23,7 @@ final class OnSignalSpec extends TypedSpec with StartSupport { Actor.stopped } } - start[Nothing](behavior) + spawn[Nothing](behavior) probe.expectMsg(Done) } } diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala index bb3b13959c..9d54b5933b 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala @@ -7,6 +7,8 @@ package docs.akka.typed import akka.actor.typed._ import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.AskPattern._ +import akka.testkit.typed.TestKit + import scala.concurrent.Future import scala.concurrent.duration._ import scala.concurrent.Await @@ -73,7 +75,7 @@ object IntroSpec { } -class IntroSpec extends TypedSpec { +class IntroSpec extends TestKit with TypedAkkaSpecWithShutdown { import IntroSpec._ diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/MutableIntroSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/MutableIntroSpec.scala index 03ebef981a..4bc50aa4c8 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/MutableIntroSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/MutableIntroSpec.scala @@ -7,6 +7,7 @@ package docs.akka.typed import akka.actor.typed._ import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.ActorContext +import akka.testkit.typed.TestKit import scala.concurrent.duration._ import scala.concurrent.Await @@ -65,7 +66,7 @@ object MutableIntroSpec { } -class MutableIntroSpec extends TypedSpec { +class MutableIntroSpec extends TestKit with TypedAkkaSpecWithShutdown { import MutableIntroSpec._ @@ -102,7 +103,7 @@ class MutableIntroSpec extends TypedSpec { chatRoom ! GetSession("ol’ Gabbler", gabblerRef) Actor.same } onSignal { - case (_, Terminated(ref)) ⇒ + case (_, Terminated(_)) ⇒ println("Stopping guardian") Actor.stopped } diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/testing/async/BasicAsyncTestingSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/testing/async/BasicAsyncTestingSpec.scala index 96ed4053ec..9df4006d82 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/testing/async/BasicAsyncTestingSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/testing/async/BasicAsyncTestingSpec.scala @@ -22,7 +22,7 @@ object BasicAsyncTestingSpec { } //#test-header -class BasicAsyncTestingSpec extends TestKit(ActorSystem(Actor.empty, "BasicTestingSpec")) +class BasicAsyncTestingSpec extends TestKit("BasicTestingSpec") with WordSpecLike with BeforeAndAfterAll { //#test-header @@ -30,12 +30,21 @@ class BasicAsyncTestingSpec extends TestKit(ActorSystem(Actor.empty, "BasicTesti "A testkit" must { "support verifying a response" in { - //#test + //#test-spawn val probe = TestProbe[Pong]() - val pinger = actorOf(echoActor, "ping") + val pinger = spawn(echoActor, "ping") pinger ! Ping("hello", probe.ref) probe.expectMsg(Pong("hello")) - //#test + //#test-spawn + } + + "support verifying a response - anonymous" in { + //#test-spawn-anonymous + val probe = TestProbe[Pong]() + val pinger = spawn(echoActor) + pinger ! Ping("hello", probe.ref) + probe.expectMsg(Pong("hello")) + //#test-spawn-anonymous } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala index 231fa676db..6b110a57c3 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala @@ -61,7 +61,7 @@ object SupervisorStrategy { minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double): BackoffSupervisorStrategy = - new Backoff(minBackoff, maxBackoff, randomFactor, resetBackoffAfter = minBackoff, loggingEnabled = true) + Backoff(minBackoff, maxBackoff, randomFactor, resetBackoffAfter = minBackoff, loggingEnabled = true) /** * INTERNAL API diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Actor.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Actor.scala index 500900b223..ac84065ca3 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Actor.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Actor.scala @@ -218,9 +218,9 @@ object Actor { tap((_, msg) ⇒ monitor ! msg, unitFunction, behavior) /** - * Wrap the given behavior such that it is restarted (i.e. reset to its - * initial state) whenever it throws an exception of the given class or a - * subclass thereof. Exceptions that are not subtypes of `Thr` will not be + * Wrap the given behavior with the given [[SupervisorStrategy]] for + * the given exception. + * Exceptions that are not subtypes of `Thr` will not be * caught and thus lead to the termination of the actor. * * It is possible to specify different supervisor strategies, such as restart, @@ -247,7 +247,7 @@ object Actor { private final val NothingClassTag = ClassTag(classOf[Nothing]) private final val ThrowableClassTag = ClassTag(classOf[Throwable]) final class Supervise[T] private[akka] (val wrapped: Behavior[T]) extends AnyVal { - /** Specify the [[SupervisorStrategy]] to be invoked when the wrapped behaior throws. */ + /** Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. */ def onFailure[Thr <: Throwable: ClassTag](strategy: SupervisorStrategy): Behavior[T] = { val tag = implicitly[ClassTag[Thr]] val effectiveTag = if (tag == NothingClassTag) ThrowableClassTag else tag diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingPersistenceSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingPersistenceSpec.scala index 89cf849d7c..c934e6b9ee 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingPersistenceSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingPersistenceSpec.scala @@ -1,19 +1,13 @@ /* * Copyright (C) 2017 Lightbend Inc. */ - package akka.cluster.sharding.typed -import akka.actor.typed.{ ActorRef, Props, TypedSpec } -import com.typesafe.config.ConfigFactory -import org.scalatest.concurrent.ScalaFutures - -import scala.concurrent.duration._ -import akka.actor.typed.Behavior -import akka.testkit.typed.TestKitSettings -import akka.testkit.typed.scaladsl.TestProbe +import akka.actor.typed.{ ActorRef, Behavior, Props, TypedAkkaSpecWithShutdown } import akka.persistence.typed.scaladsl.PersistentActor -import akka.persistence.typed.scaladsl.PersistentActor.PersistNothing +import akka.testkit.typed.TestKit +import akka.testkit.typed.scaladsl.TestProbe +import com.typesafe.config.ConfigFactory object ClusterShardingPersistenceSpec { val config = ConfigFactory.parseString( @@ -48,7 +42,7 @@ object ClusterShardingPersistenceSpec { PersistentActor.persistentEntity[Command, String, String]( persistenceIdFromActorName = name ⇒ "Test-" + name, initialState = "", - commandHandler = CommandHandler((ctx, state, cmd) ⇒ cmd match { + commandHandler = CommandHandler((_, state, cmd) ⇒ cmd match { case Add(s) ⇒ Effect.persist(s) case Get(replyTo) ⇒ replyTo ! state @@ -61,12 +55,12 @@ object ClusterShardingPersistenceSpec { } -class ClusterShardingPersistenceSpec extends TypedSpec(ClusterShardingPersistenceSpec.config) with ScalaFutures { - import akka.actor.typed.scaladsl.adapter._ +class ClusterShardingPersistenceSpec extends TestKit("ClusterShardingPersistenceSPec", ClusterShardingPersistenceSpec.config) + with TypedAkkaSpecWithShutdown { import ClusterShardingPersistenceSpec._ + import akka.actor.typed.scaladsl.adapter._ implicit val s = system - implicit val testkitSettings = TestKitSettings(system) val sharding = ClusterSharding(system) implicit val untypedSystem = system.toUntyped diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingSpec.scala index a889cc0e78..6e3a835b78 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingSpec.scala @@ -4,25 +4,21 @@ package akka.cluster.sharding.typed -import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy -import akka.actor.typed.{ ActorRef, ActorRefResolver, ActorSystem, Props, TypedSpec } -import akka.cluster.typed.Cluster -import akka.actor.typed.internal.adapter.ActorSystemAdapter +import java.nio.charset.StandardCharsets + +import akka.actor.ExtendedActorSystem +import akka.actor.typed.{ ActorRef, ActorRefResolver, Props, TypedAkkaSpecWithShutdown } import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.adapter._ -import akka.testkit.typed.TestKitSettings +import akka.cluster.MemberStatus +import akka.cluster.typed.{ Cluster, Join } +import akka.serialization.SerializerWithStringManifest +import akka.testkit.typed.TestKit import akka.testkit.typed.scaladsl.TestProbe import com.typesafe.config.ConfigFactory -import org.scalatest.concurrent.ScalaFutures +import org.scalatest.concurrent.{ Eventually, ScalaFutures } import scala.concurrent.duration._ -import scala.concurrent.Await -import akka.cluster.typed.Join -import org.scalatest.concurrent.Eventually -import akka.cluster.MemberStatus -import akka.actor.ExtendedActorSystem -import akka.serialization.SerializerWithStringManifest -import java.nio.charset.StandardCharsets object ClusterShardingSpec { val config = ConfigFactory.parseString( @@ -118,13 +114,13 @@ object ClusterShardingSpec { } -class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with ScalaFutures with Eventually { +class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterShardingSpec.config) + with TypedAkkaSpecWithShutdown with ScalaFutures with Eventually { - import akka.actor.typed.scaladsl.adapter._ import ClusterShardingSpec._ + import akka.actor.typed.scaladsl.adapter._ implicit val s = system - implicit val testkitSettings = TestKitSettings(system) val sharding = ClusterSharding(system) implicit val untypedSystem = system.toUntyped @@ -134,7 +130,7 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca val sharding2 = ClusterSharding(system2) override def afterAll(): Unit = { - Await.result(system2.terminate, timeout.duration) + system2.terminate().futureValue super.afterAll() } @@ -173,17 +169,17 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca Cluster(system2).manager ! Join(Cluster(system).selfMember.address) eventually { - Cluster(system).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + Cluster(system).state.members.map(_.status) should ===(Set[MemberStatus](MemberStatus.Up)) Cluster(system).state.members.size should ===(2) } eventually { - Cluster(system2).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + Cluster(system2).state.members.map(_.status) should ===(Set[MemberStatus](MemberStatus.Up)) Cluster(system2).state.members.size should ===(2) } } - "send messsages via cluster sharding, using envelopes" in { + "send messages via cluster sharding, using envelopes" in { val ref = sharding.spawn( behavior, Props.empty, @@ -207,7 +203,7 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca } } - "send messsages via cluster sharding, without envelopes" in { + "send messages via cluster sharding, without envelopes" in { val ref = sharding.spawn( behaviorWithId, Props.empty, @@ -264,8 +260,6 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca val bobRef = sharding.entityRefFor(typeKey, "bob") val charlieRef = sharding.entityRefFor(typeKey, "charlie") - val p = TestProbe[String]() - val reply1 = bobRef ? WhoAreYou // TODO document that WhoAreYou(_) would not work reply1.futureValue should ===("I'm bob") diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ShardingSerializerSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ShardingSerializerSpec.scala index 2b5d44a82e..6e922b5231 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ShardingSerializerSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ShardingSerializerSpec.scala @@ -3,14 +3,13 @@ */ package akka.cluster.sharding.typed +import akka.actor.typed.TypedAkkaSpecWithShutdown +import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.cluster.sharding.typed.internal.ShardingSerializer import akka.serialization.SerializationExtension -import akka.actor.typed.TypedSpec -import akka.cluster.sharding.typed.internal.ShardingSerializer -import akka.actor.typed.internal.adapter.ActorSystemAdapter -import akka.actor.typed.scaladsl.AskPattern._ +import akka.testkit.typed.TestKit -class ShardingSerializerSpec extends TypedSpec { +class ShardingSerializerSpec extends TestKit with TypedAkkaSpecWithShutdown { "The typed ShardingSerializer" must { diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ConstantRateEntityRecoveryStrategySpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ConstantRateEntityRecoveryStrategySpec.scala index fd278dc264..762da01566 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ConstantRateEntityRecoveryStrategySpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ConstantRateEntityRecoveryStrategySpec.scala @@ -12,13 +12,12 @@ class ConstantRateEntityRecoveryStrategySpec extends AkkaSpec { import system.dispatcher val strategy = EntityRecoveryStrategy.constantStrategy(system, 1.second, 2) - "ConstantRateEntityRecoveryStrategy" must { - "recover entities" taggedAs (TimingTest) in { + "recover entities" taggedAs TimingTest in { val entities = Set[EntityId]("1", "2", "3", "4", "5") val startTime = System.nanoTime() val resultWithTimes = strategy.recoverEntities(entities).map( - _.map(entityIds ⇒ (entityIds → (System.nanoTime() - startTime).nanos))) + _.map(entityIds ⇒ entityIds → (System.nanoTime() - startTime).nanos)) val result = Await.result(Future.sequence(resultWithTimes), 6.seconds) .toVector.sortBy { case (_, duration) ⇒ duration } diff --git a/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java b/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java index 3fcc6ee8cb..5534619875 100644 --- a/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java +++ b/akka-cluster-typed/src/test/java/akka/cluster/ddata/typed/javadsl/ReplicatorTest.java @@ -31,7 +31,7 @@ import akka.actor.typed.javadsl.ActorContext; public class ReplicatorTest extends JUnitSuite { - static interface ClientCommand { + interface ClientCommand { } static final class Increment implements ClientCommand { @@ -53,13 +53,13 @@ public class ReplicatorTest extends JUnitSuite { } } - static interface InternalMsg extends ClientCommand { + interface InternalMsg extends ClientCommand { } static final class InternalUpdateResponse implements InternalMsg { final Replicator.UpdateResponse rsp; - public InternalUpdateResponse(Replicator.UpdateResponse rsp) { + InternalUpdateResponse(Replicator.UpdateResponse rsp) { this.rsp = rsp; } } @@ -67,7 +67,7 @@ public class ReplicatorTest extends JUnitSuite { static final class InternalGetResponse implements InternalMsg { final Replicator.GetResponse rsp; - public InternalGetResponse(Replicator.GetResponse rsp) { + InternalGetResponse(Replicator.GetResponse rsp) { this.rsp = rsp; } } @@ -75,7 +75,7 @@ public class ReplicatorTest extends JUnitSuite { static final class InternalChanged implements InternalMsg { final Replicator.Changed chg; - public InternalChanged(Replicator.Changed chg) { + InternalChanged(Replicator.Changed chg) { this.chg = chg; } } @@ -91,15 +91,15 @@ public class ReplicatorTest extends JUnitSuite { private int cachedValue = 0; - public Client(ActorRef replicator, Cluster node, ActorContext ctx) { + Client(ActorRef replicator, Cluster node, ActorContext ctx) { this.replicator = replicator; this.node = node; - updateResponseAdapter = ctx.spawnAdapter(m -> new InternalUpdateResponse<>(m)); + updateResponseAdapter = ctx.spawnAdapter(InternalUpdateResponse::new); - getResponseAdapter = ctx.spawnAdapter(m -> new InternalGetResponse<>(m)); + getResponseAdapter = ctx.spawnAdapter(InternalGetResponse::new); - changedAdapter = ctx.spawnAdapter(m -> new InternalChanged<>(m)); + changedAdapter = ctx.spawnAdapter(InternalChanged::new); replicator.tell(new Replicator.Subscribe<>(Key, changedAdapter)); } @@ -113,7 +113,7 @@ public class ReplicatorTest extends JUnitSuite { return receiveBuilder() .onMessage(Increment.class, cmd -> { replicator.tell( - new Replicator.Update(Key, GCounter.empty(), Replicator.writeLocal(), updateResponseAdapter, + new Replicator.Update<>(Key, GCounter.empty(), Replicator.writeLocal(), updateResponseAdapter, curr -> curr.increment(node, 1))); return this; }) @@ -122,7 +122,7 @@ public class ReplicatorTest extends JUnitSuite { }) .onMessage(GetValue.class, cmd -> { replicator.tell( - new Replicator.Get(Key, Replicator.readLocal(), getResponseAdapter, Optional.of(cmd.replyTo))); + new Replicator.Get<>(Key, Replicator.readLocal(), getResponseAdapter, Optional.of(cmd.replyTo))); return this; }) .onMessage(GetCachedValue.class, cmd -> { diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala index bdc21c2cb1..69770db6ed 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala @@ -4,15 +4,15 @@ package akka.cluster.ddata.typed.scaladsl import akka.actor.Scheduler -import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, StartSupport, TypedSpec } -import akka.actor.typed.scaladsl.Actor +import akka.actor.typed.{ ActorRef, Behavior, TypedAkkaSpecWithShutdown } import akka.actor.typed.scaladsl.AskPattern._ +import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.adapter._ -import akka.testkit.typed.TestKitSettings -import akka.testkit.typed.scaladsl._ import akka.cluster.Cluster -import akka.cluster.ddata.{ GCounter, GCounterKey, ReplicatedData } import akka.cluster.ddata.typed.scaladsl.Replicator._ +import akka.cluster.ddata.{ GCounter, GCounterKey, ReplicatedData } +import akka.testkit.typed.scaladsl._ +import akka.testkit.typed.{ TestKit, TestKitSettings } import akka.util.Timeout import com.typesafe.config.ConfigFactory import org.scalatest.concurrent.Eventually @@ -107,14 +107,14 @@ object ReplicatorSpec { val reply4: Future[ReplicaCount] = replicator ? Replicator.GetReplicaCount() - // supress unused compiler warnings + // suppress unused compiler warnings println("" + reply1 + reply2 + reply3 + reply4) } } } -class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) with Eventually with StartSupport { +class ReplicatorSpec extends TestKit(ReplicatorSpec.config) with TypedAkkaSpecWithShutdown with Eventually { import ReplicatorSpec._ @@ -125,8 +125,8 @@ class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) with Eventually wi "Replicator" must { "have API for Update and Get" in { - val replicator = start(Replicator.behavior(settings)) - val c = start(client(replicator)) + val replicator = spawn(Replicator.behavior(settings)) + val c = spawn(client(replicator)) val probe = TestProbe[Int] c ! Increment @@ -135,8 +135,8 @@ class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) with Eventually wi } "have API for Subscribe" in { - val replicator = start(Replicator.behavior(settings)) - val c = start(client(replicator)) + val replicator = spawn(Replicator.behavior(settings)) + val c = spawn(client(replicator)) val probe = TestProbe[Int] c ! Increment @@ -154,7 +154,7 @@ class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) with Eventually wi "have an extension" in { val replicator = DistributedData(system).replicator - val c = start(client(replicator)) + val c = spawn(client(replicator)) val probe = TestProbe[Int] c ! Increment diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterApiSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterApiSpec.scala index 7abefc7289..9db2a8a56b 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterApiSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterApiSpec.scala @@ -3,13 +3,12 @@ */ package akka.cluster.typed +import akka.actor.typed.TypedAkkaSpecWithShutdown +import akka.actor.typed.scaladsl.adapter._ import akka.cluster.ClusterEvent._ import akka.cluster.MemberStatus -import akka.actor.typed.TypedSpec -import akka.actor.typed.internal.adapter.ActorSystemAdapter -import akka.actor.typed.scaladsl.adapter._ -import akka.testkit.typed.TestKitSettings import akka.testkit.typed.scaladsl.TestProbe +import akka.testkit.typed.{ TestKit, TestKitSettings } import com.typesafe.config.ConfigFactory import org.scalatest.concurrent.ScalaFutures @@ -33,7 +32,7 @@ object ClusterApiSpec { """) } -class ClusterApiSpec extends TypedSpec(ClusterApiSpec.config) with ScalaFutures { +class ClusterApiSpec extends TestKit("ClusterApiSpec", ClusterApiSpec.config) with TypedAkkaSpecWithShutdown with ScalaFutures { val testSettings = TestKitSettings(system) val clusterNode1 = Cluster(system) diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala index 9e35683009..2e1fc24a07 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala @@ -6,14 +6,13 @@ package akka.cluster.typed import java.nio.charset.StandardCharsets import akka.actor.ExtendedActorSystem -import akka.serialization.SerializerWithStringManifest import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.adapter._ -import akka.testkit.typed.TestKitSettings +import akka.testkit.typed.{ TestKit, TestKitSettings } import akka.testkit.typed.scaladsl.TestProbe -import akka.actor.typed.{ ActorRef, ActorRefResolver, Props, TypedSpec } +import akka.actor.typed.{ ActorRef, ActorRefResolver, Props, TypedAkkaSpecWithShutdown } +import akka.serialization.SerializerWithStringManifest import com.typesafe.config.ConfigFactory -import org.scalatest.concurrent.ScalaFutures import scala.concurrent.Await import scala.concurrent.duration._ @@ -49,7 +48,7 @@ object ClusterSingletonApiSpec { case object Perish extends PingProtocol - val pingPong = Actor.immutable[PingProtocol] { (ctx, msg) ⇒ + val pingPong = Actor.immutable[PingProtocol] { (_, msg) ⇒ msg match { case Ping(respondTo) ⇒ @@ -84,7 +83,7 @@ object ClusterSingletonApiSpec { } } -class ClusterSingletonApiSpec extends TypedSpec(ClusterSingletonApiSpec.config) with ScalaFutures { +class ClusterSingletonApiSpec extends TestKit("ClusterSingletonApiSpec", ClusterSingletonApiSpec.config) with TypedAkkaSpecWithShutdown { import ClusterSingletonApiSpec._ implicit val testSettings = TestKitSettings(system) diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala index cef9a29b29..a387804e39 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala @@ -4,16 +4,12 @@ package akka.cluster.typed -import akka.actor.typed.ActorRef -import akka.actor.typed.Behavior -import akka.actor.typed.Props -import akka.actor.typed.TypedSpec +import akka.actor.typed.{ ActorRef, Behavior, Props, TypedAkkaSpecWithShutdown } import akka.persistence.typed.scaladsl.PersistentActor import akka.persistence.typed.scaladsl.PersistentActor.{ CommandHandler, Effect } -import akka.testkit.typed.TestKitSettings +import akka.testkit.typed.TestKit import akka.testkit.typed.scaladsl.TestProbe import com.typesafe.config.ConfigFactory -import org.scalatest.concurrent.ScalaFutures object ClusterSingletonPersistenceSpec { val config = ConfigFactory.parseString( @@ -44,7 +40,7 @@ object ClusterSingletonPersistenceSpec { PersistentActor.immutable[Command, String, String]( persistenceId = "TheSingleton", initialState = "", - commandHandler = CommandHandler((ctx, state, cmd) ⇒ cmd match { + commandHandler = CommandHandler((_, state, cmd) ⇒ cmd match { case Add(s) ⇒ Effect.persist(s) case Get(replyTo) ⇒ replyTo ! state @@ -55,12 +51,11 @@ object ClusterSingletonPersistenceSpec { } -class ClusterSingletonPersistenceSpec extends TypedSpec(ClusterSingletonPersistenceSpec.config) with ScalaFutures { +class ClusterSingletonPersistenceSpec extends TestKit(ClusterSingletonPersistenceSpec.config) with TypedAkkaSpecWithShutdown { import ClusterSingletonPersistenceSpec._ import akka.actor.typed.scaladsl.adapter._ implicit val s = system - implicit val testkitSettings = TestKitSettings(system) implicit val untypedSystem = system.toUntyped private val untypedCluster = akka.cluster.Cluster(untypedSystem) diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala index 66d069f32d..4d114580f0 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala @@ -6,14 +6,14 @@ package akka.cluster.typed.internal.receptionist import java.nio.charset.StandardCharsets import akka.actor.ExtendedActorSystem -import akka.cluster.Cluster -import akka.serialization.SerializerWithStringManifest -import akka.actor.typed.{ ActorRef, ActorRefResolver, StartSupport, TypedSpec } +import akka.actor.typed.{ ActorRef, ActorRefResolver, TypedAkkaSpecWithShutdown } import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.actor.typed.receptionist.Receptionist import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.adapter._ -import akka.testkit.typed.TestKitSettings +import akka.cluster.Cluster +import akka.serialization.SerializerWithStringManifest +import akka.testkit.typed.{ TestKit, TestKitSettings } import akka.testkit.typed.scaladsl.TestProbe import com.typesafe.config.ConfigFactory @@ -51,7 +51,7 @@ object ClusterReceptionistSpec { case object Perish extends PingProtocol - val pingPong = Actor.immutable[PingProtocol] { (ctx, msg) ⇒ + val pingPong = Actor.immutable[PingProtocol] { (_, msg) ⇒ msg match { case Ping(respondTo) ⇒ @@ -61,7 +61,6 @@ object ClusterReceptionistSpec { case Perish ⇒ Actor.stopped } - } class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { @@ -88,7 +87,8 @@ object ClusterReceptionistSpec { val PingKey = Receptionist.ServiceKey[PingProtocol]("pingy") } -class ClusterReceptionistSpec extends TypedSpec(ClusterReceptionistSpec.config) with StartSupport { +class ClusterReceptionistSpec extends TestKit("ClusterReceptionistSpec", ClusterReceptionistSpec.config) + with TypedAkkaSpecWithShutdown { import ClusterReceptionistSpec._ @@ -117,7 +117,7 @@ class ClusterReceptionistSpec extends TypedSpec(ClusterReceptionistSpec.config) adaptedSystem2.receptionist ! Subscribe(PingKey, regProbe2.ref) regProbe2.expectMsg(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) - val service = start(pingPong) + val service = spawn(pingPong) system.receptionist ! Register(PingKey, service, regProbe.ref) regProbe.expectMsg(Registered(PingKey, service)) diff --git a/akka-docs/src/main/paradox/actors-typed.md b/akka-docs/src/main/paradox/actors-typed.md index 6b279d8357..5979c7b78e 100644 --- a/akka-docs/src/main/paradox/actors-typed.md +++ b/akka-docs/src/main/paradox/actors-typed.md @@ -15,13 +15,11 @@ This module is currently marked as @ref:[may change](common/may-change.md) in th * `Inbox` has been renamed to `TestInbox` to allign with `TestProbe` * Separated into modules e.g. `akka-actor-typed` `akka-persistence-typed` along with matching package names -### Dependency - -To use typed actors add the following dependency: +To use Akka Typed add the following dependency: @@dependency [sbt,Maven,Gradle] { group=com.typesafe.akka - artifact=akka-actor-typed_2.11 + artifact=akka-actor-typed_2.12 version=$version$ } diff --git a/akka-docs/src/main/paradox/cluster-sharding-typed.md b/akka-docs/src/main/paradox/cluster-sharding-typed.md index 2d9c352a52..3c45dc7576 100644 --- a/akka-docs/src/main/paradox/cluster-sharding-typed.md +++ b/akka-docs/src/main/paradox/cluster-sharding-typed.md @@ -1,11 +1,9 @@ # Sharding -TODO - -### Dependency - @@dependency [sbt,Maven,Gradle] { group=com.typesafe.akka - artifact=akka-cluster-sharding-typed_2.11 + artifact=akka-cluster-sharding-typed_2.12 version=$version$ } + +TODO diff --git a/akka-docs/src/main/paradox/fault-tolerance-typed.md b/akka-docs/src/main/paradox/fault-tolerance-typed.md new file mode 100644 index 0000000000..c01a36508f --- /dev/null +++ b/akka-docs/src/main/paradox/fault-tolerance-typed.md @@ -0,0 +1,20 @@ +# Fault Tolerance + +As explained in @ref:[Actor Systems](general/actor-systems.md) each actor is the supervisor of its +children, and as such each actor defines fault handling supervisor strategy. +This strategy cannot be changed afterwards as it is an integral part of the +actor system’s structure. + +## Creating a Supervisor Strategy + +TODO + +### Default Supervisor Strategy + +### Restart Supervisor Strategy + +### Stopping Supervisor Strategy + +### Logging of Actor Failures + + diff --git a/akka-docs/src/main/paradox/index-typed.md b/akka-docs/src/main/paradox/index-typed.md index 77fe500ee4..130faec70d 100644 --- a/akka-docs/src/main/paradox/index-typed.md +++ b/akka-docs/src/main/paradox/index-typed.md @@ -5,6 +5,7 @@ @@@ index * [actors](actors-typed.md) +* [fault-tolerance-typed.md](fault-tolerance-typed.md) * [coexisting](coexisting.md) * [cluster](cluster-typed.md) * [cluster-sharding](cluster-sharding-typed.md) diff --git a/akka-docs/src/main/paradox/testing-typed.md b/akka-docs/src/main/paradox/testing-typed.md index 333446eeab..9367d159db 100644 --- a/akka-docs/src/main/paradox/testing-typed.md +++ b/akka-docs/src/main/paradox/testing-typed.md @@ -10,12 +10,11 @@ This module is currently marked as @ref:[may change](common/may-change.md) in th @@@ -To use the testkit add the following dependency: To use the testkit add the following dependency: @@dependency [sbt,Maven,Gradle] { group=com.typesafe.akka - artifact=akka-testkit-typed_2.11 + artifact=akka-testkit-typed_2.12 version=$version$ scope=test } @@ -63,7 +62,7 @@ Scala Java : @@snip [BasicSyncTestingTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/sync/BasicSyncTestingTest.java) { #child } -All of the tests make use of the `BehaviourTestkit` to avoid the need for a real `ActorContext`. Some of the tests +All of the tests make use of the `BehaviorTestkit` to avoid the need for a real `ActorContext`. Some of the tests make use of the `TestInbox` which allows the creation of an `ActorRef` that can be used for synchronous testing, similar to the `TestProbe` used for asynchronous testing. @@ -124,11 +123,11 @@ The `BehaviorTestkit` keeps track other effects you can verify, look at the sub- * Unwatched * Scheduled -See the other public methods and API documentation on `BehaviourTestkit` for other types of verification. +See the other public methods and API documentation on `BehaviorTestkit` for other types of verification. ## Asynchronous testing -Asynchronous testing uses a real `ActorSystem` that allows you to test your Actors in a realistic environment. +Asynchronous testing uses a real `ActorSystem` that allows you to test your Actors in a more realistic environment. The minimal setup consists of the test procedure, which provides the desired stimuli, the actor under test, and an actor receiving replies. Bigger systems replace the actor under test with a network of actors, apply stimuli @@ -145,7 +144,10 @@ Scala Java : @@snip [BasicAsyncTestingTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java) { #under-test } -Tests can optionally extend `TestKit` or include the `TestKitBase`. +Tests extend `TestKit` or include the `TestKitBase`. This provides access to +* An ActorSystem +* Methods for spawning Actors. These are created under the root guardian +* Methods for creating system actors Scala : @@snip [BasicTestingSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/testing/async/BasicAsyncTestingSpec.scala) { #test-header } @@ -163,13 +165,21 @@ Java The following demonstrates: -* Creating a typed actor from the `TestKit`'s system using `actorOf` +* Creating a typed actor from the `TestKit`'s system using `spawn` * Creating a typed `TestProbe` * Verifying that the actor under test responds via the `TestProbe` Scala -: @@snip [BasicTestingSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/testing/async/BasicAsyncTestingSpec.scala) { #test } +: @@snip [BasicTestingSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/testing/async/BasicAsyncTestingSpec.scala) { #test-spawn } Java -: @@snip [BasicAsyncTestingTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java) { #test } +: @@snip [BasicAsyncTestingTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java) { #test-spawn } + +Actors can also be spawned anonymously: + +Scala +: @@snip [BasicTestingSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/testing/async/BasicAsyncTestingSpec.scala) { #test-spawn-anonymous } + +Java +: @@snip [BasicAsyncTestingTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java) { #test-spawn-anonymous } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorSpec.scala index 5c330f7a0f..83a4da2158 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorSpec.scala @@ -4,9 +4,10 @@ package akka.persistence.typed.scaladsl import scala.concurrent.duration._ -import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, StartSupport, SupervisorStrategy, Terminated, TypedSpec } +import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, SupervisorStrategy, Terminated, TypedAkkaSpecWithShutdown } import akka.actor.typed.scaladsl.Actor import akka.testkit.typed.TestKitSettings +import akka.testkit.typed.TestKit import akka.testkit.typed.scaladsl._ import com.typesafe.config.ConfigFactory import org.scalatest.concurrent.Eventually @@ -39,7 +40,7 @@ object PersistentActorSpec { val firstLogging = "first logging" val secondLogging = "second logging" - def counter(persistenceId: String)(implicit actorSystem: ActorSystem[TypedSpec.Command], testSettings: TestKitSettings): Behavior[Command] = + def counter(persistenceId: String)(implicit actorSystem: ActorSystem[_], testSettings: TestKitSettings): Behavior[Command] = counter(persistenceId, TestProbe[String].ref) def counter(persistenceId: String, loggingActor: ActorRef[String]): Behavior[Command] = { @@ -106,8 +107,7 @@ object PersistentActorSpec { } -class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eventually with StartSupport { - +class PersistentActorSpec extends TestKit(PersistentActorSpec.config) with Eventually with TypedAkkaSpecWithShutdown { import PersistentActorSpec._ implicit val testSettings = TestKitSettings(system) @@ -115,7 +115,7 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve "A typed persistent actor" must { "persist an event" in { - val c = start(counter("c1")) + val c = spawn(counter("c1")) val probe = TestProbe[State] c ! Increment @@ -124,7 +124,7 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve } "replay stored events" in { - val c = start(counter("c2")) + val c = spawn(counter("c2")) val probe = TestProbe[State] c ! Increment @@ -133,7 +133,7 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve c ! GetValue(probe.ref) probe.expectMsg(State(3, Vector(0, 1, 2))) - val c2 = start(counter("c2")) + val c2 = spawn(counter("c2")) c2 ! GetValue(probe.ref) probe.expectMsg(State(3, Vector(0, 1, 2))) c2 ! Increment @@ -142,7 +142,7 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve } "handle Terminated signal" in { - val c = start(counter("c3")) + val c = spawn(counter("c3")) val probe = TestProbe[State] c ! Increment @@ -154,7 +154,7 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve } "handle receive timeout" in { - val c = start(counter("c4")) + val c = spawn(counter("c4")) val probe = TestProbe[State] c ! Increment @@ -173,7 +173,7 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve */ "chainable side effects with events" in { val loggingProbe = TestProbe[String] - val c = start(counter("c5", loggingProbe.ref)) + val c = spawn(counter("c5", loggingProbe.ref)) val probe = TestProbe[State] @@ -188,7 +188,7 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve /** Proves that side-effects are called when emitting an empty list of events */ "chainable side effects without events" in { val loggingProbe = TestProbe[String] - val c = start(counter("c6", loggingProbe.ref)) + val c = spawn(counter("c6", loggingProbe.ref)) val probe = TestProbe[State] c ! EmptyEventsListAndThenLog @@ -200,7 +200,7 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve /** Proves that side-effects are called when explicitly calling Effect.none */ "chainable side effects when doing nothing (Effect.none)" in { val loggingProbe = TestProbe[String] - val c = start(counter("c7", loggingProbe.ref)) + val c = spawn(counter("c7", loggingProbe.ref)) val probe = TestProbe[State] c ! DoNothingAndThenLog @@ -217,7 +217,7 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve val probe = TestProbe[State] val behavior = Actor.supervise[Command](counter("c13")) .onFailure(SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.1)) - val c = start(behavior) + val c = spawn(behavior) c ! Increment c ! GetValue(probe.ref) probe.expectMsg(State(1, Vector(0))) diff --git a/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala index 786cb6548c..e6f9b0031c 100644 --- a/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala @@ -5,7 +5,7 @@ package akka.remote.serialization import akka.actor._ -import akka.remote.{ MessageSerializer, RemoteScope, RemoteWatcher } +import akka.remote.{ RemoteScope, RemoteWatcher } import akka.serialization.SerializationExtension import akka.testkit.AkkaSpec import com.typesafe.config.ConfigFactory diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/BehaviourTestkit.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/BehaviourTestkit.scala index 050500e0db..a7b5cfa86a 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/BehaviourTestkit.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/BehaviourTestkit.scala @@ -114,7 +114,7 @@ object BehaviorTestkit { * JAVA API */ def create[T](initialBehavior: Behavior[T]): BehaviorTestkit[T] = - apply(initialBehavior, "ctx") + apply(initialBehavior, "testkit") } /** @@ -122,7 +122,7 @@ object BehaviorTestkit { * watching and offers access to what effects have taken place. */ @ApiMayChange -class BehaviorTestkit[T](_name: String, _initialBehavior: Behavior[T]) { +class BehaviorTestkit[T] private (_name: String, _initialBehavior: Behavior[T]) { import Effect._ diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/StubbedActorContext.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/StubbedActorContext.scala index c1181fdb0e..d07915c5b8 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/StubbedActorContext.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/StubbedActorContext.scala @@ -65,6 +65,7 @@ private[typed] abstract class WatchableRef[-T](override val path: a.ActorPath) e protected def terminate(): Unit type S = Set[ActorRefImpl[Nothing]] + @volatile private[this] var _watchedBy: S = Set.empty protected def isAlive: Boolean = _watchedBy != null diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala index 4f46ffdbc7..fa9be369e7 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala @@ -1,51 +1,104 @@ package akka.testkit.typed +import akka.actor.typed.scaladsl.Actor +import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.{ ActorRef, ActorSystem, Behavior } import akka.annotation.ApiMayChange +import akka.testkit.typed.TestKit._ import akka.util.Timeout +import com.typesafe.config.Config import scala.concurrent.duration._ import scala.concurrent.{ Await, TimeoutException } +object TestKit { + + private[akka] sealed trait TestKitCommand + private[akka] case class SpawnActor[T](name: String, behavior: Behavior[T], replyTo: ActorRef[ActorRef[T]]) extends TestKitCommand + private[akka] case class SpawnActorAnonymous[T](behavior: Behavior[T], replyTo: ActorRef[ActorRef[T]]) extends TestKitCommand + + private val testKitGuardian = Actor.immutable[TestKitCommand] { + case (ctx, SpawnActor(name, behavior, reply)) ⇒ + reply ! ctx.spawn(behavior, name) + Actor.same + case (ctx, SpawnActorAnonymous(behavior, reply)) ⇒ + reply ! ctx.spawnAnonymous(behavior) + Actor.same + } + + private def getCallerName(clazz: Class[_]): String = { + val s = (Thread.currentThread.getStackTrace map (_.getClassName) drop 1) + .dropWhile(_ matches "(java.lang.Thread|.*\\.Abstract.*)") + val reduced = s.lastIndexWhere(_ == clazz.getName) match { + case -1 ⇒ s + case z ⇒ s drop (z + 1) + } + reduced.head.replaceFirst(""".*\.""", "").replaceAll("[^a-zA-Z_0-9]", "_") + } + + def shutdown( + system: ActorSystem[_], + duration: Duration, + verifySystemShutdown: Boolean = false): Unit = { + system.terminate() + try Await.ready(system.whenTerminated, duration) catch { + case _: TimeoutException ⇒ + val msg = "Failed to stop [%s] within [%s] \n%s".format(system.name, duration, + system.printTree) + if (verifySystemShutdown) throw new RuntimeException(msg) + else println(msg) + } + } + +} + /** * Testkit for typed actors. Extending this removes some boiler plate when testing * typed actors. * * If a test can't extend then use the [[TestKitBase]] trait - * - * @param _system The [ActorSystem] for the test */ @ApiMayChange -class TestKit(_system: ActorSystem[_]) extends TestKitBase { - implicit val system = _system +class TestKit(name: String, config: Option[Config]) extends TestKitBase { + def this() = this(TestKit.getCallerName(classOf[TestKit]), None) + def this(name: String) = this(name, None) + def this(config: Config) = this(TestKit.getCallerName(classOf[TestKit]), Some(config)) + def this(name: String, config: Config) = this(name, Some(config)) + import TestKit._ + implicit val system = ActorSystem(testKitGuardian, name, config = config) } @ApiMayChange trait TestKitBase { - def system: ActorSystem[_] + def system: ActorSystem[TestKitCommand] implicit def testkitSettings = TestKitSettings(system) + implicit def scheduler = system.scheduler + private val childName: Iterator[String] = Iterator.from(0).map(_.toString) + // FIXME testkit config + private val timeoutDuration = 5.seconds + implicit private val timeout = Timeout(timeoutDuration) def shutdown(): Unit = { - shutdown(system, 5.seconds) + TestKit.shutdown(system, timeoutDuration) } - def shutdown( - actorSystem: ActorSystem[_], - duration: Duration, - verifySystemShutdown: Boolean = false): Unit = { - system.terminate() - try Await.ready(actorSystem.whenTerminated, duration) catch { - case _: TimeoutException ⇒ - val msg = "Failed to stop [%s] within [%s] \n%s".format(actorSystem.name, duration, - actorSystem.printTree) - if (verifySystemShutdown) throw new RuntimeException(msg) - else println(msg) - } - } + /** + * Spawn the given behavior. This is created as a child of the test kit + * guardian + */ + def spawn[T](behavior: Behavior[T]): ActorRef[T] = + Await.result(system ? (SpawnActorAnonymous(behavior, _)), timeoutDuration) - // The only current impl of a typed actor system returns a Future.successful currently - // hence the hardcoded timeouts - def actorOf[T](behaviour: Behavior[T], name: String): ActorRef[T] = - Await.result(system.systemActorOf(behaviour, name)(Timeout(20.seconds)), 21.seconds) + /** + * Spawn the given behavior. This is created as a child of the test kit + * guardian + */ + def spawn[T](behavior: Behavior[T], name: String): ActorRef[T] = + Await.result(system ? (SpawnActor(name, behavior, _)), timeoutDuration) + + def systemActor[T](behaviour: Behavior[T], name: String): ActorRef[T] = + Await.result(system.systemActorOf(behaviour, name), timeoutDuration) + + def systemActor[T](behaviour: Behavior[T]): ActorRef[T] = + Await.result(system.systemActorOf(behaviour, childName.next()), timeoutDuration) } - diff --git a/akka-testkit-typed/src/test/scala/akka/testkit/typed/BehaviorTestkitSpec.scala b/akka-testkit-typed/src/test/scala/akka/testkit/typed/BehaviorTestkitSpec.scala index c8579d614c..c37c4d2708 100644 --- a/akka-testkit-typed/src/test/scala/akka/testkit/typed/BehaviorTestkitSpec.scala +++ b/akka-testkit-typed/src/test/scala/akka/testkit/typed/BehaviorTestkitSpec.scala @@ -82,56 +82,51 @@ class BehaviorTestkitSpec extends WordSpec with Matchers { private val props = Props.empty - "BehaviourTestkit's spawn" should { + "BehaviorTestkit's spawn" must { "create children when no props specified" in { - val ctx = BehaviorTestkit[Father.Command](Father.init()) - - ctx.run(SpawnChildren(2)) - val effects = ctx.retrieveAllEffects() + val testkit = BehaviorTestkit[Father.Command](Father.init()) + testkit.run(SpawnChildren(2)) + val effects = testkit.retrieveAllEffects() effects should contain only (Spawned(Child.initial, "child0"), Spawned(Child.initial, "child1", Props.empty)) } "create children when props specified and record effects" in { - val ctx = BehaviorTestkit[Father.Command](Father.init()) - - ctx.run(SpawnChildrenWithProps(2, props)) - val effects = ctx.retrieveAllEffects() + val testkit = BehaviorTestkit[Father.Command](Father.init()) + testkit.run(SpawnChildrenWithProps(2, props)) + val effects = testkit.retrieveAllEffects() effects should contain only (Spawned(Child.initial, "child0", props), Spawned(Child.initial, "child1", props)) } } - "BehaviourTestkit's spawnAnonymous" should { + "BehaviorTestkit's spawnAnonymous" must { "create children when no props specified and record effects" in { - val ctx = BehaviorTestkit[Father.Command](Father.init()) - - ctx.run(SpawnAnonymous(2)) - val effects = ctx.retrieveAllEffects() + val testkit = BehaviorTestkit[Father.Command](Father.init()) + testkit.run(SpawnAnonymous(2)) + val effects = testkit.retrieveAllEffects() effects shouldBe Seq(SpawnedAnonymous(Child.initial, Props.empty), SpawnedAnonymous(Child.initial, Props.empty)) } "create children when props specified and record effects" in { - val ctx = BehaviorTestkit[Father.Command](Father.init()) + val testkit = BehaviorTestkit[Father.Command](Father.init()) - ctx.run(SpawnAnonymousWithProps(2, props)) - val effects = ctx.retrieveAllEffects() + testkit.run(SpawnAnonymousWithProps(2, props)) + val effects = testkit.retrieveAllEffects() effects shouldBe Seq(SpawnedAnonymous(Child.initial, props), SpawnedAnonymous(Child.initial, props)) } } - "BehaviourTestkit's spawnAdapter" should { + "BehaviorTestkit's spawnAdapter" must { "create adapters without name and record effects" in { - val ctx = BehaviorTestkit[Father.Command](Father.init()) - - ctx.run(SpawnAdapter) - val effects = ctx.retrieveAllEffects() + val testkit = BehaviorTestkit[Father.Command](Father.init()) + testkit.run(SpawnAdapter) + val effects = testkit.retrieveAllEffects() effects shouldBe Seq(SpawnedAdapter) } "create adapters with name and record effects" in { - val ctx = BehaviorTestkit[Father.Command](Father.init()) - - ctx.run(SpawnAdapterWithName("adapter")) - val effects = ctx.retrieveAllEffects() + val testkit = BehaviorTestkit[Father.Command](Father.init()) + testkit.run(SpawnAdapterWithName("adapter")) + val effects = testkit.retrieveAllEffects() effects shouldBe Seq(SpawnedAdapter) } } diff --git a/akka-testkit-typed/src/test/scala/akka/typed/testkit/BehaviorTestkitSpec.scala b/akka-testkit-typed/src/test/scala/akka/typed/testkit/BehaviorTestkitSpec.scala new file mode 100644 index 0000000000..e69de29bb2