diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala index 4f8588e803..8778c2dd5e 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala @@ -4,865 +4,600 @@ package akka.actor.typed -import akka.actor.typed.scaladsl.Behaviors._ -import akka.actor.typed.scaladsl.{ AskPattern, Behaviors } -import akka.actor.{ ActorInitializationException, DeadLetterSuppression, InvalidMessageException } -import akka.testkit.AkkaSpec -import akka.testkit.TestEvent.Mute -import akka.util.Timeout -import com.typesafe.config.ConfigFactory -import org.scalactic.CanEqual +import akka.actor.InvalidMessageException +import akka.actor.typed.scaladsl.Behaviors +import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe } -import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ -import scala.language.existentials -import scala.reflect.ClassTag -import scala.util.control.{ NoStackTrace, NonFatal } -object ActorContextSpec { +object ActorSpecMessages { sealed trait Command + sealed trait Event - sealed trait Monitor extends Event - final case class GotSignal(signal: Signal) extends Monitor with DeadLetterSuppression - final case object GotReceiveTimeout extends Monitor + case object Ping extends Command - final case object ReceiveTimeout extends Command + object Pong extends Event - final case class Ping(replyTo: ActorRef[Pong]) extends Command - sealed trait Pong extends Event - case object Pong1 extends Pong - case object Pong2 extends Pong + case class Renew(replyTo: ActorRef[Renewed.type]) extends Command - final case class Miss(replyTo: ActorRef[Missed.type]) extends Command - case object Missed extends Event - - final case class Renew(replyTo: ActorRef[Renewed.type]) extends Command case object Renewed extends Event - final case class Throw(ex: Exception) extends Command + case object Miss extends Command - final case class MkChild(name: Option[String], monitor: ActorRef[Monitor], replyTo: ActorRef[Created]) extends Command - final case class Created(ref: ActorRef[Command]) extends Event + case object Missed extends Event - final case class SetTimeout(duration: FiniteDuration, replyTo: ActorRef[TimeoutSet.type]) extends Command - case object TimeoutSet extends Event - - final case class Schedule[T](delay: FiniteDuration, target: ActorRef[T], msg: T, replyTo: ActorRef[Scheduled.type]) extends Command - case object Scheduled extends Event + case object Fail extends Command case object Stop extends Command - final case class Kill(ref: ActorRef[Nothing], replyTo: ActorRef[KillResult]) extends Command - sealed trait KillResult extends Event - case object Killed extends KillResult - case object NotKilled extends KillResult + case class StopRef[T](ref: ActorRef[T]) extends Command - final case class Watch(ref: ActorRef[Nothing], replyTo: ActorRef[Watched.type]) extends Command - case object Watched extends Event + case class GotSignal(signal: Signal) extends Event - final case class Unwatch(ref: ActorRef[Nothing], replyTo: ActorRef[Unwatched.type]) extends Command - case object Unwatched extends Event + case class GotChildSignal(signal: Signal) extends Event - final case class GetInfo(replyTo: ActorRef[Info]) extends Command - final case class Info(self: ActorRef[Command], system: ActorSystem[Nothing]) extends Event + case class ChildMade(ref: ActorRef[Command]) extends Event - final case class GetChild(name: String, replyTo: ActorRef[Child]) extends Command - final case class Child(c: Option[ActorRef[Nothing]]) extends Event + case object Inert extends Command - final case class GetChildren(replyTo: ActorRef[Children]) extends Command - final case class Children(c: Set[ActorRef[Nothing]]) extends Event + case object InertEvent extends Event - final case class ChildEvent(event: Event) extends Event + case class Watch(ref: ActorRef[Command]) extends Command - final case class BecomeInert(replyTo: ActorRef[BecameInert.type]) extends Command - case object BecameInert extends Event + case class UnWatch(ref: ActorRef[Command]) extends Command - final case class BecomeCareless(replyTo: ActorRef[BecameCareless.type]) extends Command - case object BecameCareless extends Event + case object TimeoutSet extends Event - final case class GetAdapter(replyTo: ActorRef[Adapter], name: String = "") extends Command - final case class Adapter(a: ActorRef[Command]) extends Event + case object ReceiveTimeout extends Command - def subject(monitor: ActorRef[Monitor], ignorePostStop: Boolean): Behavior[Command] = - Behaviors.immutable[Command] { - (ctx, message) ⇒ - message match { - case ReceiveTimeout ⇒ - monitor ! GotReceiveTimeout - Behaviors.same - case Ping(replyTo) ⇒ - replyTo ! Pong1 - Behaviors.same - case Miss(replyTo) ⇒ - replyTo ! Missed - Behaviors.unhandled - case Renew(replyTo) ⇒ - replyTo ! Renewed - subject(monitor, ignorePostStop) - case Throw(ex) ⇒ - throw ex - case MkChild(name, mon, replyTo) ⇒ - val child = name match { - case None ⇒ ctx.spawnAnonymous(Behaviors.supervise(subject(mon, ignorePostStop)).onFailure(SupervisorStrategy.restart)) - case Some(n) ⇒ ctx.spawn(Behaviors.supervise(subject(mon, ignorePostStop)).onFailure(SupervisorStrategy.restart), n) - } - replyTo ! Created(child) - Behaviors.same - case SetTimeout(d, replyTo) ⇒ - d match { - case f: FiniteDuration ⇒ ctx.setReceiveTimeout(f, ReceiveTimeout) - case _ ⇒ ctx.cancelReceiveTimeout() - } - replyTo ! TimeoutSet - Behaviors.same - case Schedule(delay, target, msg, replyTo) ⇒ - replyTo ! Scheduled - ctx.schedule(delay, target, msg) - Behaviors.same - case Stop ⇒ - Behaviors.stopped - case Kill(ref, replyTo) ⇒ - try { - ctx.stop(ref) - replyTo ! Killed - } catch { - case ex: IllegalArgumentException ⇒ replyTo ! NotKilled - } - Behaviors.same - case Watch(ref, replyTo) ⇒ - ctx.watch(ref) - replyTo ! Watched - Behaviors.same - case Unwatch(ref, replyTo) ⇒ - ctx.unwatch(ref) - replyTo ! Unwatched - Behaviors.same - case GetInfo(replyTo) ⇒ - replyTo ! Info(ctx.self, ctx.system) - Behaviors.same - case GetChild(name, replyTo) ⇒ - replyTo ! Child(ctx.child(name)) - Behaviors.same - case GetChildren(replyTo) ⇒ - replyTo ! Children(ctx.children.toSet) - Behaviors.same - case BecomeInert(replyTo) ⇒ - replyTo ! BecameInert - Behaviors.immutable { - case (_, Ping(replyTo)) ⇒ - replyTo ! Pong2 - Behaviors.same - case (_, Throw(ex)) ⇒ - throw ex - case _ ⇒ Behaviors.unhandled - } - case BecomeCareless(replyTo) ⇒ - replyTo ! BecameCareless - Behaviors.immutable[Command] { - case (_, _) ⇒ Behaviors.unhandled - } onSignal { - case (_, PostStop) if ignorePostStop ⇒ Behaviors.same // ignore PostStop here - case (_, Terminated(_)) ⇒ Behaviors.unhandled - case (_, sig) ⇒ - monitor ! GotSignal(sig) - Behaviors.same - } - case GetAdapter(replyTo, name) ⇒ - replyTo ! Adapter(ctx.spawnMessageAdapter(identity, name)) - Behaviors.same - } - } onSignal { - case (_, PostStop) if ignorePostStop ⇒ Behaviors.same // ignore PostStop here - case (ctx, signal) ⇒ monitor ! GotSignal(signal); Behaviors.same - } + case class SetTimeout(duration: FiniteDuration) extends Command - def oldSubject(monitor: ActorRef[Monitor], ignorePostStop: Boolean): Behavior[Command] = { - Behaviors.immutable[Command] { - case (ctx, message) ⇒ message match { - case ReceiveTimeout ⇒ - monitor ! GotReceiveTimeout - Behaviors.same - case Ping(replyTo) ⇒ - replyTo ! Pong1 - Behaviors.same - case Miss(replyTo) ⇒ - replyTo ! Missed - Behaviors.unhandled - case Renew(replyTo) ⇒ - replyTo ! Renewed - subject(monitor, ignorePostStop) - case Throw(ex) ⇒ - throw ex - case MkChild(name, mon, replyTo) ⇒ - val child = name match { - case None ⇒ ctx.spawnAnonymous(Behaviors.supervise(subject(mon, ignorePostStop)).onFailure[Throwable](SupervisorStrategy.restart)) - case Some(n) ⇒ ctx.spawn(Behaviors.supervise(subject(mon, ignorePostStop)).onFailure[Throwable](SupervisorStrategy.restart), n) - } - replyTo ! Created(child) - Behaviors.same - case SetTimeout(d, replyTo) ⇒ - d match { - case f: FiniteDuration ⇒ ctx.setReceiveTimeout(f, ReceiveTimeout) - case _ ⇒ ctx.cancelReceiveTimeout() - } - replyTo ! TimeoutSet - Behaviors.same - case Schedule(delay, target, msg, replyTo) ⇒ - replyTo ! Scheduled - ctx.schedule(delay, target, msg) - Behaviors.same - case Stop ⇒ - Behaviors.stopped - case Kill(ref, replyTo) ⇒ - try { - ctx.stop(ref) - replyTo ! Killed - } catch { - case ex: IllegalArgumentException ⇒ replyTo ! NotKilled - } - Behaviors.same - case Watch(ref, replyTo) ⇒ - ctx.watch(ref) - replyTo ! Watched - Behaviors.same - case Unwatch(ref, replyTo) ⇒ - ctx.unwatch(ref) - replyTo ! Unwatched - Behaviors.same - case GetInfo(replyTo) ⇒ - replyTo ! Info(ctx.self, ctx.system) - Behaviors.same - case GetChild(name, replyTo) ⇒ - replyTo ! Child(ctx.child(name)) - Behaviors.same - case GetChildren(replyTo) ⇒ - replyTo ! Children(ctx.children.toSet) - Behaviors.same - case BecomeInert(replyTo) ⇒ - replyTo ! BecameInert - Behaviors.immutable[Command] { - case (_, Ping(r)) ⇒ - r ! Pong2 - Behaviors.same - case (_, Throw(ex)) ⇒ - throw ex - case _ ⇒ Behaviors.same - } - case BecomeCareless(replyTo) ⇒ - replyTo ! BecameCareless - Behaviors.immutable[Command] { - case _ ⇒ Behaviors.unhandled - } onSignal { - case (_, PostStop) if ignorePostStop ⇒ Behaviors.same // ignore PostStop here - case (_, Terminated(_)) ⇒ Behaviors.unhandled - case (_, sig) ⇒ - monitor ! GotSignal(sig) - Behaviors.same - } - case GetAdapter(replyTo, name) ⇒ - replyTo ! Adapter(ctx.spawnMessageAdapter(identity, name)) - Behaviors.same - } - } onSignal { - case (_, PostStop) if ignorePostStop ⇒ Behaviors.same // ignore PostStop here - case (_, signal) ⇒ - monitor ! GotSignal(signal) - Behaviors.same - } - } + case object GotReceiveTimeout extends Event - sealed abstract class Start - case object Start extends Start - - sealed trait GuardianCommand - case class RunTest[T](name: String, behavior: Behavior[T], replyTo: ActorRef[Status], timeout: FiniteDuration) extends GuardianCommand - case class Terminate(reply: ActorRef[Status]) extends GuardianCommand - case class Create[T](behavior: Behavior[T], name: String)(val replyTo: ActorRef[ActorRef[T]]) extends GuardianCommand - - sealed trait Status - case object Success extends Status - case class Failed(thr: Throwable) extends Status - case object Timedout extends Status - - class SimulatedException(message: String) extends RuntimeException(message) with NoStackTrace - - def guardian(outstanding: Map[ActorRef[_], ActorRef[Status]] = Map.empty): Behavior[GuardianCommand] = - Behaviors.immutable[GuardianCommand] { - case (ctx, r: RunTest[t]) ⇒ - val test = ctx.spawn(r.behavior, r.name) - ctx.schedule(r.timeout, r.replyTo, Timedout) - ctx.watch(test) - guardian(outstanding + ((test, r.replyTo))) - case (_, Terminate(reply)) ⇒ - reply ! Success - stopped - case (ctx, c: Create[t]) ⇒ - c.replyTo ! ctx.spawn(c.behavior, c.name) - same - } onSignal { - case (ctx, t @ Terminated(test)) ⇒ - outstanding get test match { - case Some(reply) ⇒ - t.failure match { - case None ⇒ reply ! Success - case Some(ex) ⇒ reply ! Failed(ex) - } - guardian(outstanding - test) - case None ⇒ same - } - case _ ⇒ same - } } -abstract class ActorContextSpec extends TypedAkkaSpec { - import ActorContextSpec._ +abstract class ActorContextSpec extends ActorTestKit with TypedAkkaSpecWithShutdown { - implicit val timeout = Timeout(3.seconds) + import ActorSpecMessages._ - val config = ConfigFactory.parseString( - """ - akka { - loglevel = WARNING - loggers = ["akka.testkit.TestEventListener"] - actor.debug { - lifecycle = off - autoreceive = off - } - }""") + def decoration[T]: Behavior[T] ⇒ Behavior[T] - implicit lazy val system: ActorSystem[GuardianCommand] = - ActorSystem(guardian(), AkkaSpec.getCallerName(classOf[ActorContextSpec]), config = Some(config withFallback AkkaSpec.testConf)) - - val expectTimeout = 3.seconds - import AskPattern._ - - implicit def scheduler = system.scheduler - - lazy val blackhole = await(system ? Create(immutable[Any] { case _ ⇒ same }, "blackhole")) - - override def afterAll(): Unit = { - Await.result(system.terminate, timeout.duration) + implicit class BehaviorDecorator[T](behavior: Behavior[T]) { + def decorate: Behavior[T] = decoration(behavior) } - // TODO remove after basing on ScalaTest 3 with async support - import akka.testkit._ - - def await[T](f: Future[T]): T = Await.result(f, timeout.duration * 1.1) - - /** - * Run an Actor-based test. The test procedure is most conveniently - * formulated using the [[StepWise]] behavior type. - */ - def runTest[T: ClassTag](name: String)(behavior: Behavior[T])(implicit system: ActorSystem[GuardianCommand]): Future[Status] = - system ? (RunTest(name, behavior, _, timeout.duration)) - - // TODO remove after basing on ScalaTest 3 with async support - def sync(f: Future[Status])(implicit system: ActorSystem[GuardianCommand]): Unit = { - def unwrap(ex: Throwable): Throwable = ex match { - case ActorInitializationException(_, _, ex) ⇒ ex - case other ⇒ other - } - - try await(f) match { - case Success ⇒ () - case Failed(ex) ⇒ - unwrap(ex) match { - case ex2: SimulatedException ⇒ - throw ex2 - case _ ⇒ - println(system.printTree) - throw unwrap(ex) - } - case Timedout ⇒ - println(system.printTree) - fail("test timed out") - } catch { - case ex: SimulatedException ⇒ - throw ex - case NonFatal(ex) ⇒ - println(system.printTree) - throw ex - } - } - - def muteExpectedException[T <: Exception: ClassTag]( - message: String = null, - source: String = null, - start: String = "", - pattern: String = null, - occurrences: Int = Int.MaxValue)(implicit system: ActorSystem[GuardianCommand]): EventFilter = { - val filter = EventFilter(message, source, start, pattern, occurrences) - import scaladsl.adapter._ - system.toUntyped.eventStream.publish(Mute(filter)) - filter - } - - // for ScalaTest === compare of Class objects - implicit def classEqualityConstraint[A, B]: CanEqual[Class[A], Class[B]] = - new CanEqual[Class[A], Class[B]] { - def areEqual(a: Class[A], b: Class[B]) = a == b - } - - implicit def setEqualityConstraint[A, T <: Set[_ <: A]]: CanEqual[Set[A], T] = - new CanEqual[Set[A], T] { - def areEqual(a: Set[A], b: T) = a == b - } - - /** - * The name for the set of tests to be instantiated, used for keeping the test case actors’ names unique. - */ - def suite: String - - /** - * The behavior against which to run all the tests. - */ - def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] - - private def mySuite: String = suite + "Adapted" - - def setup(name: String, wrapper: Option[Behavior[Command] ⇒ Behavior[Command]] = None, ignorePostStop: Boolean = true)( - proc: (scaladsl.ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) ⇒ StepWise.Steps[Event, _]): Future[Status] = - runTest(s"$mySuite-$name")(StepWise[Event] { (ctx, startWith) ⇒ - val b = behavior(ctx, ignorePostStop) - val props = wrapper.map(_(b)).getOrElse(b) - val steps = startWith.withKeepTraces(true)(ctx.spawn(props, "subject")) - - proc(ctx, steps) - }) - - private implicit class MkC(val startWith: StepWise.Steps[Event, ActorRef[Command]]) { - /** - * Ask the subject to create a child actor, setting its behavior to “inert” if requested. - * The latter is very useful in order to avoid disturbances with GotSignal(PostStop) in - * test procedures that stop this child. - */ - def mkChild( - name: Option[String], - monitor: ActorRef[Event], - self: ActorRef[Event], - inert: Boolean = false): StepWise.Steps[Event, (ActorRef[Command], ActorRef[Command])] = { - val s = - startWith.keep { subj ⇒ - subj ! MkChild(name, monitor, self) - }.expectMessage(expectTimeout) { (msg, subj) ⇒ - val Created(child) = msg - (subj, child) - } - - if (!inert) s - else - s.keep { - case (subj, child) ⇒ - child ! BecomeInert(self) - }.expectMessageKeep(expectTimeout) { (msg, _) ⇒ - msg should ===(BecameInert) - } - } - } - - private implicit class MessageStep[T](val startWith: StepWise.Steps[Event, T]) { - def stimulate(f: T ⇒ Unit, ev: T ⇒ Event, timeout: FiniteDuration = expectTimeout): StepWise.Steps[Event, T] = - startWith.keep(f).expectMessageKeep(timeout) { (msg, v) ⇒ - msg should ===(ev(v)) - } - } - - protected def stop(ref: ActorRef[Command]) = ref ! Stop - "An ActorContext" must { + "canonicalize behaviors" in { - sync(setup("ctx00") { (ctx, startWith) ⇒ - val self = ctx.self - startWith.keep { subj ⇒ - subj ! Ping(self) - }.expectMessageKeep(expectTimeout) { (msg, subj) ⇒ - msg should ===(Pong1) - subj ! Miss(self) - }.expectMessageKeep(expectTimeout) { (msg, subj) ⇒ - msg should ===(Missed) - subj ! Renew(self) - }.expectMessage(expectTimeout) { (msg, subj) ⇒ - msg should ===(Renewed) - subj ! Ping(self) - }.expectMessage(expectTimeout) { (msg, _) ⇒ - msg should ===(Pong1) + val probe = TestProbe[Event]() + + lazy val behavior: Behavior[Command] = Behaviors.immutable[Command] { (_, message) ⇒ + message match { + case Ping ⇒ + probe.ref ! Pong + Behaviors.same + case Miss ⇒ + probe.ref ! Missed + Behaviors.unhandled + case Renew(ref) ⇒ + ref ! Renewed + behavior } - }) + }.decorate + + val actor = spawn(behavior) + actor ! Ping + probe.expectMessage(Pong) + actor ! Miss + probe.expectMessage(Missed) + actor ! Renew(probe.ref) + probe.expectMessage(Renewed) + actor ! Ping + probe.expectMessage(Pong) } - "correctly wire the lifecycle hooks" in { - sync(setup("ctx01", Some(b ⇒ Behaviors.supervise(b).onFailure[Throwable](SupervisorStrategy.restart)), ignorePostStop = false) { (ctx, startWith) ⇒ - val self = ctx.self - val ex = new Exception("KABOOM1") - startWith { subj ⇒ - val log = muteExpectedException[Exception]("KABOOM1", occurrences = 1) - subj ! Throw(ex) - (subj, log) - }.expectMessage(expectTimeout) { - case (msg, (subj, log)) ⇒ - msg should ===(GotSignal(PreRestart)) - log.assertDone(expectTimeout) - ctx.stop(subj) - }.expectMessage(expectTimeout) { (msg, _) ⇒ - msg should ===(GotSignal(PostStop)) - } - }) + "correctly wire the lifecycle hook" in { + val probe = TestProbe[Event]() + + val internal = (Behaviors.immutablePartial[Command] { + case (_, Fail) ⇒ + throw new TestException("Boom") + } onSignal { + case (_, signal) ⇒ + probe.ref ! GotSignal(signal) + Behaviors.same + }).decorate + + val behavior = Behaviors.supervise(internal).onFailure(SupervisorStrategy.restart) + val actor = spawn(behavior) + actor ! Fail + probe.expectMessage(GotSignal(PreRestart)) } - "signal PostStop after voluntary termination" in { - sync(setup("ctx02", ignorePostStop = false) { (ctx, startWith) ⇒ - startWith.keep { subj ⇒ - stop(subj) - }.expectMessage(expectTimeout) { - case (msg, _) ⇒ - msg should ===(GotSignal(PostStop)) - } - }) + "signal post stop after voluntary termination" in { + val probe = TestProbe[Event]() + + val behavior: Behavior[Command] = ( + Behaviors.immutablePartial[Command] { + case (_, Stop) ⇒ Behaviors.stopped + } onSignal { + case (_, signal) ⇒ + probe.ref ! GotSignal(signal) + Behaviors.same + }).decorate + + val actor = spawn(behavior) + actor ! Stop + probe.expectMessage(GotSignal(PostStop)) } "restart and stop a child actor" in { - sync(setup("ctx03") { (ctx, startWith) ⇒ - val self = ctx.self - val ex = new Exception("KABOOM2") - startWith.mkChild(None, ctx.spawnMessageAdapter(ChildEvent), self) { - case (subj, child) ⇒ - val log = muteExpectedException[Exception]("KABOOM2", occurrences = 1) - child ! Throw(ex) - (subj, child, log) - }.expectMessage(expectTimeout) { - case (msg, (subj, child, log)) ⇒ - msg should ===(ChildEvent(GotSignal(PreRestart))) - log.assertDone(expectTimeout) - child ! BecomeInert(self) // necessary to avoid PostStop/Terminated interference - (subj, child) - }.expectMessageKeep(expectTimeout) { - case (msg, (subj, child)) ⇒ - msg should ===(BecameInert) - stop(subj) - ctx.watch(child) - ctx.watch(subj) - }.expectTermination(expectTimeout) { - case (t, (subj, child)) ⇒ - if (t.ref === child) subj - else if (t.ref === subj) child - else fail(s"expected termination of either $subj or $child but got $t") - }.expectTermination(expectTimeout) { (t, subj) ⇒ - t.ref should ===(subj) - } + val probe = TestProbe[Event]() + + val child: Behavior[Command] = (Behaviors.immutablePartial[Command] { + case (_, Fail) ⇒ throw new TestException("Boom") + case (_, Ping) ⇒ + probe.ref ! Pong + Behaviors.same + } onSignal { + case (_, signal) ⇒ + probe.ref ! GotChildSignal(signal) + Behavior.stopped + }).decorate + + val parent: Behavior[Command] = Behaviors.setup[Command](ctx ⇒ { + val childRef = ctx.spawnAnonymous( + Behaviors.supervise(child).onFailure(SupervisorStrategy.restart) + ) + ctx.watch(childRef) + probe.ref ! ChildMade(childRef) + + (Behaviors.immutablePartial[Command] { + case (ctx, StopRef(ref)) ⇒ + ctx.stop(ref) + Behavior.same + } onSignal { + case (_, signal) ⇒ + probe.ref ! GotSignal(signal) + Behavior.stopped + }).decorate }) + + val parentRef = spawn(parent) + val childRef = probe.expectMessageType[ChildMade].ref + childRef ! Fail + probe.expectMessage(GotChildSignal(PreRestart)) + childRef ! Ping + probe.expectMessage(Pong) + parentRef ! StopRef(childRef) + probe.expectTerminated(childRef, timeout.duration) } "stop a child actor" in { - sync(setup("ctx04") { (ctx, startWith) ⇒ - val self = ctx.self - startWith.mkChild(Some("A"), ctx.spawnMessageAdapter(ChildEvent), self, inert = true) { - case (subj, child) ⇒ - subj ! Kill(child, self) - child - }.expectMessageKeep(expectTimeout) { (msg, child) ⇒ - msg should ===(Killed) - ctx.watch(child) - }.expectTermination(expectTimeout) { (t, child) ⇒ - t.ref should ===(child) + val probe = TestProbe[Event]() + + val child: Behavior[Command] = Behaviors.empty[Command].decorate + val parent: Behavior[Command] = Behaviors.setup[Command](ctx ⇒ { + val childRef = ctx.spawnAnonymous(child) + ctx.watch(childRef) + probe.ref ! ChildMade(childRef) + Behaviors.immutablePartial[Command] { + case (ctx, StopRef(ref)) ⇒ + ctx.stop(ref) + Behaviors.same + } onSignal { + case (_, signal) ⇒ + probe.ref ! GotSignal(signal) + Behavior.stopped } - }) + }).decorate + val parentRef = spawn(parent) + val childRef = probe.expectMessageType[ChildMade].ref + parentRef ! StopRef(childRef) + probe.expectTerminated(childRef, timeout.duration) } - "reset behavior upon Restart" in { - sync(setup("ctx05", Some(Behaviors.supervise(_).onFailure(SupervisorStrategy.restart))) { (ctx, startWith) ⇒ - val self = ctx.self - val ex = new Exception("KABOOM05") - startWith - .stimulate(_ ! BecomeInert(self), _ ⇒ BecameInert) - .stimulate(_ ! Ping(self), _ ⇒ Pong2) { subj ⇒ - muteExpectedException[Exception]("KABOOM05") - subj ! Throw(ex) - subj - } - .stimulate(_ ! Ping(self), _ ⇒ Pong1) - }) + "reset behavior upon restart" in { + val probe = TestProbe[Int]() + val internal = Behaviors.setup[Command](_ ⇒ { + var counter = 0 + Behaviors.immutablePartial[Command] { + case (_, Ping) ⇒ + counter += 1 + probe.ref ! counter + Behavior.same + case (_, Fail) ⇒ + throw new TestException("Boom") + } + }).decorate + val behavior = Behaviors.supervise(internal).onFailure(SupervisorStrategy.restart) + val actor = spawn(behavior) + actor ! Ping + probe.expectMessage(1) + actor ! Fail + actor ! Ping + probe.expectMessage(1) } - "not reset behavior upon Resume" in { - sync(setup( - "ctx06", - Some(b ⇒ Behaviors.supervise(b).onFailure(SupervisorStrategy.resume))) { (ctx, startWith) ⇒ - val self = ctx.self - val ex = new Exception("KABOOM06") - startWith - .stimulate(_ ! BecomeInert(self), _ ⇒ BecameInert) - .stimulate(_ ! Ping(self), _ ⇒ Pong2).keep { subj ⇒ - muteExpectedException[Exception]("KABOOM06", occurrences = 1) - subj ! Throw(ex) - }.stimulate(_ ! Ping(self), _ ⇒ Pong2) - }) + "not reset behavior upon resume" in { + val probe = TestProbe[Int]() + val internal = Behaviors.setup[Command](_ ⇒ { + var counter = 0 + Behaviors.immutablePartial[Command] { + case (_, Ping) ⇒ + counter += 1 + probe.ref ! counter + Behavior.same + case (_, Fail) ⇒ + throw new TestException("Boom") + } + }).decorate + val behavior = Behaviors.supervise(internal).onFailure(SupervisorStrategy.resume) + val actor = spawn(behavior) + actor ! Ping + probe.expectMessage(1) + actor ! Fail + actor ! Ping + probe.expectMessage(2) } - "stop upon Stop" in { - sync(setup("ctx07", ignorePostStop = false) { (ctx, startWith) ⇒ - val self = ctx.self - val ex = new Exception("KABOOM07") - startWith - .stimulate(_ ! Ping(self), _ ⇒ Pong1).keep { subj ⇒ - muteExpectedException[Exception]("KABOOM07", occurrences = 1) - subj ! Throw(ex) - ctx.watch(subj) - }.expectMulti(expectTimeout, 2) { (msgs, subj) ⇒ - msgs.toSet should ===(Set(Left(Terminated(subj)(null)), Right(GotSignal(PostStop)))) - } - }) + "stop upon stop" in { + val probe = TestProbe[Event]() + val behavior = (Behaviors.immutablePartial[Command] { + case (_, Ping) ⇒ + probe.ref ! Pong + Behaviors.same + case (_, Fail) ⇒ + throw new TestException("boom") + } onSignal { + case (_, PostStop) ⇒ + probe.ref ! GotSignal(PostStop) + Behavior.same + }).decorate + val actorToWatch = spawn(behavior) + val watcher: ActorRef[Command] = spawn(( + Behaviors.immutablePartial[Any] { + case (ctx, Ping) ⇒ + ctx.watch(actorToWatch) + probe.ref ! Pong + Behavior.same + } onSignal { + case (_, signal) ⇒ + probe.ref ! GotSignal(signal) + Behavior.same + } + ).decorate) + actorToWatch ! Ping + probe.expectMessage(Pong) + watcher ! Ping + probe.expectMessage(Pong) + actorToWatch ! Fail + probe.expectMessage(GotSignal(PostStop)) + probe.expectTerminated(actorToWatch, timeout.duration) } "not stop non-child actor" in { - sync(setup("ctx08") { (ctx, startWith) ⇒ - val self = ctx.self - startWith.mkChild(Some("A"), ctx.spawnMessageAdapter(ChildEvent), self) { - case (subj, child) ⇒ - val other = ctx.spawn(behavior(ctx, ignorePostStop = true), "A") - subj ! Kill(other, ctx.self) - child - }.expectMessageKeep(expectTimeout) { (msg, _) ⇒ - msg should ===(NotKilled) - }.stimulate(_ ! Ping(self), _ ⇒ Pong1) - }) + val probe = TestProbe[Event]() + val victim = spawn(Behaviors.empty[Command]) + val actor = spawn(Behaviors.immutablePartial[Command] { + case (_, Ping) ⇒ + probe.ref ! Pong + Behaviors.same + case (ctx, StopRef(ref)) ⇒ + assertThrows[IllegalArgumentException] { + ctx.stop(ref) + probe.ref ! Pong + } + probe.ref ! Missed + Behaviors.same + }.decorate) + actor ! Ping + probe.expectMessage(Pong) + actor ! StopRef(victim) + probe.expectMessage(Missed) + actor ! Ping + probe.expectMessage(Pong) } "watch a child actor before its termination" in { - sync(setup("ctx10") { (ctx, startWith) ⇒ - val self = ctx.self - startWith.mkChild(None, ctx.spawnMessageAdapter(ChildEvent), self) { - case (subj, child) ⇒ - subj ! Watch(child, self) - child - }.expectMessageKeep(expectTimeout) { (msg, child) ⇒ - msg should ===(Watched) - child ! Stop - }.expectMessage(expectTimeout) { (msg, child) ⇒ - msg should ===(GotSignal(Terminated(child)(null))) - } - }) + val probe = TestProbe[Event]() + val child = Behaviors.immutablePartial[Command] { + case (_, Stop) ⇒ + Behaviors.stopped + }.decorate + val actor: ActorRef[Command] = spawn( + Behaviors.setup[Command](ctx ⇒ { + val childRef = ctx.spawn(child, "A") + ctx.watch(childRef) + probe.ref ! ChildMade(childRef) + Behaviors.immutablePartial[Command] { + case (_, Ping) ⇒ + probe.ref ! Pong + Behaviors.same + } onSignal { + case (_, signal) ⇒ + probe.ref ! GotSignal(signal) + Behaviors.same + } + }).decorate + ) + val childRef = probe.expectMessageType[ChildMade].ref + childRef ! Stop + probe.expectTerminated(childRef, timeout.duration) } "watch a child actor after its termination" in { - sync(setup("ctx11") { (ctx, startWith) ⇒ - val self = ctx.self - startWith.mkChild(None, ctx.spawnMessageAdapter(ChildEvent), self).keep { - case (subj, child) ⇒ - ctx.watch(child) - child ! Stop - }.expectTermination(expectTimeout) { - case (t, (subj, child)) ⇒ - t should ===(Terminated(child)(null)) - subj ! Watch(child, blackhole) - child - }.expectMessage(expectTimeout) { (msg, child) ⇒ - msg should ===(GotSignal(Terminated(child)(null))) - } - }) + val probe = TestProbe[Event]() + val child = Behaviors.immutablePartial[Command] { + case (_, Stop) ⇒ + Behaviors.stopped + }.decorate + val actor = spawn( + Behaviors.setup[Command](ctx ⇒ { + val childRef = ctx.spawn(child, "A") + probe.ref ! ChildMade(childRef) + Behaviors.immutablePartial[Command] { + case (ctx, Watch(ref)) ⇒ + ctx.watch(ref) + probe.ref ! Pong + Behaviors.same + } onSignal { + case (_, signal) ⇒ + probe.ref ! GotSignal(signal) + Behaviors.same + } + }).decorate + ) + val childRef = probe.expectMessageType[ChildMade].ref + actor ! Watch(childRef) + probe.expectMessage(Pong) + childRef ! Stop + probe.expectTerminated(childRef, timeout.duration) + actor ! Watch(childRef) + probe.expectTerminated(childRef, timeout.duration) } "unwatch a child actor before its termination" in { - sync(setup("ctx12") { (ctx, startWith) ⇒ - val self = ctx.self - startWith.mkChild(None, ctx.spawnMessageAdapter(ChildEvent), self).keep { - case (subj, child) ⇒ - subj ! Watch(child, self) - }.expectMessageKeep(expectTimeout) { - case (msg, (subj, child)) ⇒ - msg should ===(Watched) - subj ! Unwatch(child, self) - }.expectMessage(expectTimeout) { - case (msg, (subj, child)) ⇒ - msg should ===(Unwatched) - ctx.watch(child) - child ! Stop - child - }.expectTermination(expectTimeout) { (t, child) ⇒ - t should ===(Terminated(child)(null)) - } - }) + val probe = TestProbe[Event]() + val child = Behaviors.immutablePartial[Command] { + case (_, Stop) ⇒ + Behaviors.stopped + }.decorate + val actor = spawn( + Behaviors.setup[Command](ctx ⇒ { + val childRef = ctx.spawn(child, "A") + probe.ref ! ChildMade(childRef) + Behaviors.immutablePartial[Command] { + case (ctx, Watch(ref)) ⇒ + ctx.watch(ref) + probe.ref ! Pong + Behaviors.same + case (ctx, UnWatch(ref)) ⇒ + ctx.unwatch(ref) + probe.ref ! Pong + Behaviors.same + } onSignal { + case (_, signal) ⇒ + probe.ref ! GotSignal(signal) + Behaviors.same + } + }).decorate + ) + val childRef = probe.expectMessageType[ChildMade].ref + actor ! Watch(childRef) + probe.expectMessage(Pong) + actor ! UnWatch(childRef) + probe.expectMessage(Pong) + childRef ! Stop + probe.expectNoMessage() } "terminate upon not handling Terminated" in { - sync(setup("ctx13", ignorePostStop = false) { (ctx, startWith) ⇒ - val self = ctx.self - startWith.mkChild(None, ctx.spawnMessageAdapter(ChildEvent), self).keep { - case (subj, child) ⇒ - muteExpectedException[DeathPactException]() - subj ! Watch(child, self) - }.expectMessageKeep(expectTimeout) { - case (msg, (subj, child)) ⇒ - msg should ===(Watched) - subj ! BecomeCareless(self) - }.expectMessageKeep(expectTimeout) { - case (msg, (subj, child)) ⇒ - msg should ===(BecameCareless) - child ! Stop - }.expectMessage(expectTimeout) { - case (msg, (subj, child)) ⇒ - msg should ===(ChildEvent(GotSignal(PostStop))) - }.expectMessage(expectTimeout) { - case (msg, _) ⇒ - msg should ===(GotSignal(PostStop)) - } - }) + val probe = TestProbe[Event]() + val child = (Behaviors.immutablePartial[Command] { + case (_, Stop) ⇒ + Behaviors.stopped + } onSignal { + case (_, signal) ⇒ + probe.ref ! GotChildSignal(signal) + Behavior.same + }).decorate + val actor = spawn( + Behaviors.setup[Command](ctx ⇒ { + val childRef = ctx.spawn(child, "A") + ctx.watch(childRef) + probe.ref ! ChildMade(childRef) + Behaviors.immutablePartial[Command] { + case (_, Inert) ⇒ + probe.ref ! InertEvent + Behaviors.immutable[Command] { + case (_, _) ⇒ Behaviors.unhandled + } onSignal { + case (_, Terminated(_)) ⇒ Behaviors.unhandled + case (_, signal) ⇒ + probe.ref ! GotSignal(signal) + Behaviors.same + } + } onSignal { + case (_, signal) ⇒ + probe.ref ! GotSignal(signal) + Behaviors.same + } + }).decorate + ) + val childRef = probe.expectMessageType[ChildMade].ref + actor ! Inert + probe.expectMessage(InertEvent) + childRef ! Stop + probe.expectMessage(GotChildSignal(PostStop)) + probe.expectMessage(GotSignal(PostStop)) + probe.expectTerminated(actor, timeout.duration) } "return the right context info" in { - sync(setup("ctx20") { (ctx, startWith) ⇒ - startWith.keep(_ ! GetInfo(ctx.self)) - .expectMessage(expectTimeout) { - case (msg: Info, subj) ⇒ - msg.self should ===(subj) - msg.system should ===(system) - case (other, _) ⇒ - fail(s"$other was not an Info(...)") - } - }) + type Info = (ActorSystem[Nothing], ActorRef[String]) + val probe = TestProbe[Info] + val actor = spawn(Behaviors.immutablePartial[String] { + case (ctx, "info") ⇒ + probe.ref ! (ctx.system → ctx.self) + Behaviors.same + }.decorate) + actor ! "info" + probe.expectMessage((system, actor)) } "return right info about children" in { - sync(setup("ctx21") { (ctx, startWith) ⇒ - val self = ctx.self - startWith - .mkChild(Some("B"), ctx.spawnMessageAdapter(ChildEvent), self) - .stimulate(_._1 ! GetChild("A", self), _ ⇒ Child(None)) - .stimulate(_._1 ! GetChild("B", self), x ⇒ Child(Some(x._2))) - .stimulate(_._1 ! GetChildren(self), x ⇒ Children(Set(x._2))) - }) + type Children = Seq[ActorRef[Nothing]] + val probe = TestProbe[Children]() + val actor = spawn(Behaviors.immutablePartial[String] { + case (ctx, "create") ⇒ + ctx.spawn(Behaviors.empty, "B") + probe.ref ! ctx.child("B").toSeq + Behaviors.same + case (ctx, "all") ⇒ + probe.ref ! ctx.children.toSeq + Behaviors.same + case (ctx, get) ⇒ + probe.ref ! ctx.child(get).toSeq + Behaviors.same + }.decorate) + actor ! "create" + val children = probe.expectMessageType[Children] + actor ! "A" + probe.expectMessage(Seq.empty) + actor ! "all" + probe.expectMessage(children) + children.size shouldBe 1 + children.head.path.name shouldBe "B" } "set small receive timeout" in { - sync(setup("ctx30") { (ctx, startWith) ⇒ - val self = ctx.self - startWith - .stimulate(_ ! SetTimeout(1.nano, self), _ ⇒ TimeoutSet) - .expectMessage(expectTimeout) { (msg, _) ⇒ - msg should ===(GotReceiveTimeout) - } - }) + val probe = TestProbe[Event]() + val actor = spawn(Behaviors.immutablePartial[Command] { + case (_, ReceiveTimeout) ⇒ + probe.ref ! GotReceiveTimeout + Behaviors.same + case (ctx, SetTimeout(duration)) ⇒ + ctx.setReceiveTimeout(duration, ReceiveTimeout) + probe.ref ! TimeoutSet + Behaviors.same + }.decorate) + actor ! SetTimeout(1.nano) + probe.expectMessage(TimeoutSet) + probe.expectMessage(GotReceiveTimeout) } "set large receive timeout" in { - sync(setup("ctx31") { (ctx, startWith) ⇒ - val self = ctx.self - startWith - .stimulate(_ ! SetTimeout(1.minute, self), _ ⇒ TimeoutSet) - .stimulate(_ ⇒ ctx.schedule(1.second, self, Pong2), _ ⇒ Pong2) - .stimulate(_ ! Ping(self), _ ⇒ Pong1) - - }) + val probe = TestProbe[String]() + val actor = spawn(Behaviors.immutablePartial[String] { + case (ctx, "schedule") ⇒ + ctx.schedule(1.second, probe.ref, "scheduled") + Behaviors.same + case (_, "ping") ⇒ + probe.ref ! "pong" + Behaviors.same + case (_, "receive timeout") ⇒ + probe.ref ! "received timeout" + Behaviors.same + case (ctx, duration) ⇒ + ctx.setReceiveTimeout(Duration(duration).asInstanceOf[FiniteDuration], "receive timeout") + probe.ref ! "timeout set" + Behaviors.same + }.decorate) + actor ! "1 minute" + probe.expectMessage("timeout set") + actor ! "schedule" + probe.expectMessage("scheduled") + actor ! "ping" + probe.expectMessage("pong") } "schedule a message" in { - sync(setup("ctx32") { (ctx, startWith) ⇒ - startWith(_ ! Schedule(1.nano, ctx.self, Pong2, ctx.self)) - .expectMultipleMessages(expectTimeout, 2) { (msgs, _) ⇒ - msgs should ===(Scheduled :: Pong2 :: Nil) - } - }) - } - - "create a working adapter" in { - sync(setup("ctx40", ignorePostStop = false) { (ctx, startWith) ⇒ - startWith.keep { subj ⇒ - subj ! GetAdapter(ctx.self) - }.expectMessage(expectTimeout) { (msg, subj) ⇒ - val Adapter(adapter) = msg - ctx.watch(adapter) - adapter ! Ping(ctx.self) - (subj, adapter) - }.expectMessage(expectTimeout) { - case (msg, (subj, adapter)) ⇒ - msg should ===(Pong1) - ctx.stop(subj) - adapter - }.expectMulti(expectTimeout, 2) { (msgs, adapter) ⇒ - msgs.toSet should ===(Set(Left(Terminated(adapter)(null)), Right(GotSignal(PostStop)))) - } - }) + val probe = TestProbe[Event]() + val actor = spawn(Behaviors.immutablePartial[Command] { + case (ctx, Ping) ⇒ + ctx.schedule(1.nano, probe.ref, Pong) + Behaviors.same + }.decorate) + actor ! Ping + probe.expectMessage(Pong) } "create a named adapter" in { - sync(setup("ctx41") { (ctx, startWith) ⇒ - startWith.keep { subj ⇒ - subj ! GetAdapter(ctx.self, "named") - }.expectMessage(expectTimeout) { (msg, subj) ⇒ - val Adapter(adapter) = msg - adapter.path.name should include("named") - } - }) + type Envelope = (ActorRef[String], String) + val messages = TestProbe[Envelope]() + val probe = TestProbe[ActorRef[String]]() + val actor = spawn(Behaviors.immutablePartial[String] { + case (ctx, "message") ⇒ + messages.ref ! (ctx.self, "received message") + Behaviors.same + case (ctx, name) ⇒ + probe.ref ! ctx.spawnMessageAdapter(identity, name) + Behaviors.same + }.decorate) + val adapterName = "hello" + actor ! adapterName + val adapter = probe.expectMessageType[ActorRef[String]] + adapter.path.name should include(adapterName) + adapter ! "message" + messages.expectMessage(actor → "received message") } "not allow null messages" in { - sync(setup("ctx42") { (ctx, startWith) ⇒ - startWith.keep { subj ⇒ - intercept[InvalidMessageException] { - subj ! null - } - } - }) + val actor = spawn(Behaviors.empty[Null].decorate) + intercept[InvalidMessageException] { + actor ! null + } } "not have problems stopping already stopped child" in { - sync(setup("ctx45", ignorePostStop = false) { (ctx, startWith) ⇒ - val self = ctx.self - startWith.mkChild(Some("A"), ctx.spawnMessageAdapter(ChildEvent), self, inert = true) { - case (subj, child) ⇒ - subj ! Kill(child, self) - (subj, child) - }.expectMessageKeep(expectTimeout) { - case (msg, (subj, child)) ⇒ - msg should ===(Killed) - (subj, ctx.watch(child)) - }.expectTermination(expectTimeout) { - case (t, (subj, child)) ⇒ - t.ref should ===(child) - subj ! Kill(child, self) - child - }.expectMessage(expectTimeout) { - case (msg, _) ⇒ - msg should ===(Killed) - } - }) + val probe = TestProbe[Event]() + val actor = spawn( + Behaviors.setup[Command](ctx ⇒ { + val child = ctx.spawnAnonymous(Behaviors.empty[Command]) + probe.ref ! ChildMade(child) + Behaviors.immutablePartial[Command] { + case (ctx, StopRef(ref)) ⇒ + ctx.stop(ref) + probe.ref ! Pong + Behaviors.same + } + }) + ) + val child = probe.expectMessageType[ChildMade].ref + actor ! StopRef(child) + probe.expectMessage(Pong) + actor ! StopRef(child) + probe.expectMessage(Pong) } - } -} -import akka.actor.typed.ActorContextSpec._ + override def afterAll(): Unit = shutdownTestKit() +} class NormalActorContextSpec extends ActorContextSpec { - override def suite = "normal" - override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] = - subject(ctx.self, ignorePostStop) + + override def decoration[T] = x ⇒ x } -class WidenedActorContextSpec extends ActorContextSpec { +class WidenActorContextSpec extends ActorContextSpec { - import Behaviors._ - - override def suite = "widened" - override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] = - subject(ctx.self, ignorePostStop).widen { case x ⇒ x } + override def decoration[T] = b ⇒ b.widen { case x ⇒ x } } class DeferredActorContextSpec extends ActorContextSpec { - override def suite = "deferred" - override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] = - Behaviors.setup(_ ⇒ subject(ctx.self, ignorePostStop)) + + override def decoration[T] = b ⇒ Behaviors.setup(_ ⇒ b) } class NestedDeferredActorContextSpec extends ActorContextSpec { - override def suite = "nexted-deferred" - override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] = - Behaviors.setup(_ ⇒ Behaviors.setup(_ ⇒ subject(ctx.self, ignorePostStop))) + + override def decoration[T] = b ⇒ Behaviors.setup(_ ⇒ Behaviors.setup(_ ⇒ b)) } class TapActorContextSpec extends ActorContextSpec { - override def suite = "tap" - override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] = - Behaviors.tap((_, _) ⇒ (), (_, _) ⇒ (), subject(ctx.self, ignorePostStop)) + + override def decoration[T] = b ⇒ Behaviors.tap((_, _) ⇒ (), (_, _) ⇒ (), b) } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/StepWise.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/StepWise.scala deleted file mode 100644 index c4a9aa59f1..0000000000 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/StepWise.scala +++ /dev/null @@ -1,216 +0,0 @@ -/** - * Copyright (C) 2014-2018 Lightbend Inc. - */ - -package akka.actor.typed - -import scala.concurrent.duration.FiniteDuration -import java.util.concurrent.TimeoutException - -import akka.actor.typed.scaladsl.Behaviors._ - -import scala.concurrent.duration.Deadline - -/** - * This object contains tools for building step-wise behaviors for formulating - * a linearly progressing protocol. - * - * Example: - * {{{ - * import scala.concurrent.duration._ - * - * StepWise[Command] { (ctx, startWith) => - * startWith { - * val child = ctx.spawn(...) - * child ! msg - * child - * }.expectMessage(100.millis) { (reply, child) => - * target ! GotReply(reply) - * } - * } - * }}} - * - * State can be passed from one step to the next by returning it as is - * demonstrated with the `child` ActorRef in the example. - * - * This way of writing Actors can be very useful when writing Actor-based - * test procedures for actor systems, hence also the possibility to expect - * failures (see [[StepWise.Steps#expectFailure]]). - */ -@deprecated("to be replaced by process DSL", "2.4-M2") -object StepWise { - - sealed trait AST - private final case class Thunk(f: () ⇒ Any) extends AST - private final case class ThunkV(f: Any ⇒ Any) extends AST - private final case class Message(timeout: FiniteDuration, f: (Any, Any) ⇒ Any, trace: Trace) extends AST - private final case class MultiMessage(timeout: FiniteDuration, count: Int, f: (Seq[Any], Any) ⇒ Any, trace: Trace) extends AST - private final case class Termination(timeout: FiniteDuration, f: (Terminated, Any) ⇒ Any, trace: Trace) extends AST - private final case class Multi(timeout: FiniteDuration, count: Int, f: (Seq[Either[Signal, Any]], Any) ⇒ Any, trace: Trace) extends AST - - private case object ReceiveTimeout - - private sealed trait Trace { - def getStackTrace: Array[StackTraceElement] - protected def getFrames: Array[StackTraceElement] = - Thread.currentThread.getStackTrace.dropWhile { elem ⇒ - val name = elem.getClassName - name.startsWith("java.lang.Thread") || name.startsWith("akka.actor.typed.StepWise") - } - } - private class WithTrace extends Trace { - private val trace = getFrames - def getStackTrace = trace - } - private object WithoutTrace extends Trace { - def getStackTrace = getFrames - } - - final case class Steps[T, U](ops: List[AST], keepTraces: Boolean) { - private def getTrace(): Trace = - if (keepTraces) new WithTrace - else WithoutTrace - - def apply[V](thunk: U ⇒ V): Steps[T, V] = - copy(ops = ThunkV(thunk.asInstanceOf[Any ⇒ Any]) :: ops) - - def keep(thunk: U ⇒ Unit): Steps[T, U] = - copy(ops = ThunkV(value ⇒ { thunk.asInstanceOf[Any ⇒ Any](value); value }) :: ops) - - def expectMessage[V](timeout: FiniteDuration)(f: (T, U) ⇒ V): Steps[T, V] = - copy(ops = Message(timeout, f.asInstanceOf[(Any, Any) ⇒ Any], getTrace()) :: ops) - - def expectMultipleMessages[V](timeout: FiniteDuration, count: Int)(f: (Seq[T], U) ⇒ V): Steps[T, V] = - copy(ops = MultiMessage(timeout, count, f.asInstanceOf[(Seq[Any], Any) ⇒ Any], getTrace()) :: ops) - - def expectTermination[V](timeout: FiniteDuration)(f: (Terminated, U) ⇒ V): Steps[T, V] = - copy(ops = Termination(timeout, f.asInstanceOf[(Terminated, Any) ⇒ Any], getTrace()) :: ops) - - def expectMulti[V](timeout: FiniteDuration, count: Int)(f: (Seq[Either[Signal, T]], U) ⇒ V): Steps[T, V] = - copy(ops = Multi(timeout, count, f.asInstanceOf[(Seq[Either[Signal, Any]], Any) ⇒ Any], getTrace()) :: ops) - - def expectMessageKeep(timeout: FiniteDuration)(f: (T, U) ⇒ Unit): Steps[T, U] = - copy(ops = Message(timeout, (msg, value) ⇒ { f.asInstanceOf[(Any, Any) ⇒ Any](msg, value); value }, getTrace()) :: ops) - - def expectMultipleMessagesKeep(timeout: FiniteDuration, count: Int)(f: (Seq[T], U) ⇒ Unit): Steps[T, U] = - copy(ops = MultiMessage(timeout, count, (msgs, value) ⇒ { f.asInstanceOf[(Seq[Any], Any) ⇒ Any](msgs, value); value }, getTrace()) :: ops) - - def expectTerminationKeep(timeout: FiniteDuration)(f: (Terminated, U) ⇒ Unit): Steps[T, U] = - copy(ops = Termination(timeout, (t, value) ⇒ { f.asInstanceOf[(Terminated, Any) ⇒ Any](t, value); value }, getTrace()) :: ops) - - def expectMultiKeep(timeout: FiniteDuration, count: Int)(f: (Seq[Either[Signal, T]], U) ⇒ Unit): Steps[T, U] = - copy(ops = Multi(timeout, count, (msgs, value) ⇒ { f.asInstanceOf[(Seq[Either[Signal, Any]], Any) ⇒ Any](msgs, value); value }, getTrace()) :: ops) - - def withKeepTraces(b: Boolean): Steps[T, U] = copy(keepTraces = b) - } - - class StartWith[T](keepTraces: Boolean) { - def apply[U](thunk: ⇒ U): Steps[T, U] = Steps(Thunk(() ⇒ thunk) :: Nil, keepTraces) - def withKeepTraces(b: Boolean): StartWith[T] = new StartWith(b) - } - - def apply[T](f: (scaladsl.ActorContext[T], StartWith[T]) ⇒ Steps[T, _]): Behavior[T] = - setup[Any] { ctx ⇒ - run(ctx, f(ctx.asInstanceOf[scaladsl.ActorContext[T]], new StartWith(keepTraces = false)).ops.reverse, ()) - }.narrow - - private def throwTimeout(trace: Trace, message: String): Nothing = - throw new TimeoutException(message) { - override def fillInStackTrace(): Throwable = { - setStackTrace(trace.getStackTrace) - this - } - } - - private def throwIllegalState(trace: Trace, message: String): Nothing = - throw new IllegalStateException(message) { - override def fillInStackTrace(): Throwable = { - setStackTrace(trace.getStackTrace) - this - } - } - - private def run[T](ctx: scaladsl.ActorContext[Any], ops: List[AST], value: Any): Behavior[Any] = - ops match { - case Thunk(f) :: tail ⇒ run(ctx, tail, f()) - case ThunkV(f) :: tail ⇒ run(ctx, tail, f(value)) - case Message(t, f, trace) :: tail ⇒ - ctx.setReceiveTimeout(t, ReceiveTimeout) - immutable[Any] { - case (_, ReceiveTimeout) ⇒ throwTimeout(trace, s"timeout of $t expired while waiting for a message") - case (_, msg) ⇒ - ctx.cancelReceiveTimeout() - run(ctx, tail, f(msg, value)) - } onSignal { - case (_, PostStop) ⇒ - // ignore PostStop here - run(ctx, ops, value) - case (_, other) ⇒ throwIllegalState(trace, s"unexpected $other while waiting for a message") - } - case MultiMessage(t, c, f, trace) :: tail ⇒ - val deadline = Deadline.now + t - def behavior(count: Int, acc: List[Any]): Behavior[Any] = { - ctx.setReceiveTimeout(deadline.timeLeft, ReceiveTimeout) - immutable[Any] { - case (_, ReceiveTimeout) ⇒ - throwTimeout(trace, s"timeout of $t expired while waiting for $c messages (got only $count)") - case (_, msg) ⇒ - val nextCount = count + 1 - if (nextCount == c) { - ctx.cancelReceiveTimeout() - run(ctx, tail, f((msg :: acc).reverse, value)) - } else behavior(nextCount, msg :: acc) - } onSignal { - case (_, PostStop) ⇒ - // ignore PostStop here - run(ctx, ops, value) - case (_, other) ⇒ throwIllegalState(trace, s"unexpected $other while waiting for $c messages (got $count valid ones)") - } - } - behavior(0, Nil) - case Multi(t, c, f, trace) :: tail ⇒ - val deadline = Deadline.now + t - def behavior(count: Int, acc: List[Either[Signal, Any]]): Behavior[Any] = { - ctx.setReceiveTimeout(deadline.timeLeft, ReceiveTimeout) - immutable[Any] { - case (_, ReceiveTimeout) ⇒ - throwTimeout(trace, s"timeout of $t expired while waiting for $c messages (got only $count)") - case (_, msg) ⇒ - val nextCount = count + 1 - if (nextCount == c) { - ctx.cancelReceiveTimeout() - run(ctx, tail, f((Right(msg) :: acc).reverse, value)) - } else behavior(nextCount, Right(msg) :: acc) - } onSignal { - case (_, PostStop) ⇒ - // ignore PostStop here - run(ctx, ops, value) - case (_, other) ⇒ - val nextCount = count + 1 - if (nextCount == c) { - ctx.cancelReceiveTimeout() - run(ctx, tail, f((Left(other) :: acc).reverse, value)) - } else behavior(nextCount, Left(other) :: acc) - } - } - behavior(0, Nil) - case Termination(t, f, trace) :: tail ⇒ - ctx.setReceiveTimeout(t, ReceiveTimeout) - immutable[Any] { - case (_, ReceiveTimeout) ⇒ throwTimeout(trace, s"timeout of $t expired while waiting for termination") - case other ⇒ throwIllegalState(trace, s"unexpected $other while waiting for termination") - } onSignal { - case (_, PostStop) ⇒ - // ignore PostStop here - run(ctx, ops, value) - case (_, t: Terminated) ⇒ - ctx.cancelReceiveTimeout() - run(ctx, tail, f(t, value)) - case other ⇒ throwIllegalState(trace, s"unexpected $other while waiting for termination") - } - case Nil ⇒ - stopped - } -} - -abstract class StepWise