Use Wordspec for all tests

It was discussed and team agreed we'd rather have consistent tests even
if the compilation is slightly slower.
This commit is contained in:
Christopher Batey 2017-12-18 15:37:30 +00:00
parent c394ee7aaa
commit 97180eb6ed
34 changed files with 1390 additions and 1360 deletions

View file

@ -13,7 +13,7 @@ import java.util.Optional;
import static junit.framework.TestCase.assertSame; import static junit.framework.TestCase.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
class ExtensionsTest extends JUnitSuite { public class ExtensionsTest extends JUnitSuite {
public static class MyExtImpl implements Extension { public static class MyExtImpl implements Extension {
} }
@ -45,7 +45,7 @@ class ExtensionsTest extends JUnitSuite {
Behavior.empty(), Behavior.empty(),
"loadJavaExtensionsFromConfig", "loadJavaExtensionsFromConfig",
Optional.empty(), Optional.empty(),
Optional.of(ConfigFactory.parseString("akka.typed.extensions += \"akka.typed.ExtensionsTest$MyExtension\"").resolve()), Optional.of(ConfigFactory.parseString("akka.typed.extensions += \"akka.actor.typed.ExtensionsTest$MyExtension\"").resolve()),
Optional.empty(), Optional.empty(),
Optional.empty() Optional.empty()
); );

View file

@ -1,7 +1,7 @@
/** /**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com> * Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/ */
package jdocs.akka.actor.typed; package jdocs.akka.typed;
//#imports //#imports
import akka.actor.typed.ActorRef; import akka.actor.typed.ActorRef;

View file

@ -259,7 +259,7 @@ object ActorContextSpec {
} }
class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( abstract class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
"""|akka { """|akka {
| loglevel = WARNING | loglevel = WARNING
| actor.debug { | actor.debug {
@ -268,93 +268,94 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
| } | }
| typed.loggers = ["akka.typed.testkit.TestEventListener"] | typed.loggers = ["akka.typed.testkit.TestEventListener"]
|}""".stripMargin)) { |}""".stripMargin)) {
import ActorContextSpec._ import ActorContextSpec._
val expectTimeout = 3.seconds val expectTimeout = 3.seconds
trait Tests { /**
/** * The name for the set of tests to be instantiated, used for keeping the test case actors names unique.
* The name for the set of tests to be instantiated, used for keeping the test case actors names unique. */
*/ def suite: String
def suite: String
/** /**
* The behavior against which to run all the tests. * The behavior against which to run all the tests.
*/ */
def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command]
implicit def system: ActorSystem[TypedSpec.Command] private def mySuite: String = suite + "Adapted"
private def mySuite: String = suite + "Adapted" def setup(name: String, wrapper: Option[Behavior[Command] Behavior[Command]] = None, ignorePostStop: Boolean = true)(
proc: (scaladsl.ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) StepWise.Steps[Event, _]): Future[TypedSpec.Status] =
runTest(s"$mySuite-$name")(StepWise[Event] { (ctx, startWith)
val b = behavior(ctx, ignorePostStop)
val props = wrapper.map(_(b)).getOrElse(b)
val steps = startWith.withKeepTraces(true)(ctx.spawn(props, "subject"))
def setup(name: String, wrapper: Option[Behavior[Command] Behavior[Command]] = None, ignorePostStop: Boolean = true)( proc(ctx, steps)
proc: (scaladsl.ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) StepWise.Steps[Event, _]): Future[TypedSpec.Status] =
runTest(s"$mySuite-$name")(StepWise[Event] { (ctx, startWith)
val b = behavior(ctx, ignorePostStop)
val props = wrapper.map(_(b)).getOrElse(b)
val steps = startWith.withKeepTraces(true)(ctx.spawn(props, "subject"))
proc(ctx, steps)
})
private implicit class MkC(val startWith: StepWise.Steps[Event, ActorRef[Command]]) {
/**
* Ask the subject to create a child actor, setting its behavior to inert if requested.
* The latter is very useful in order to avoid disturbances with GotSignal(PostStop) in
* test procedures that stop this child.
*/
def mkChild(
name: Option[String],
monitor: ActorRef[Event],
self: ActorRef[Event],
inert: Boolean = false): StepWise.Steps[Event, (ActorRef[Command], ActorRef[Command])] = {
val s =
startWith.keep { subj
subj ! MkChild(name, monitor, self)
}.expectMessage(expectTimeout) { (msg, subj)
val Created(child) = msg
(subj, child)
}
if (!inert) s
else
s.keep {
case (subj, child)
child ! BecomeInert(self)
}.expectMessageKeep(expectTimeout) { (msg, _)
msg should ===(BecameInert)
}
}
}
private implicit class MessageStep[T](val startWith: StepWise.Steps[Event, T]) {
def stimulate(f: T Unit, ev: T Event, timeout: FiniteDuration = expectTimeout): StepWise.Steps[Event, T] =
startWith.keep(f).expectMessageKeep(timeout) { (msg, v)
msg should ===(ev(v))
}
}
protected def stop(ref: ActorRef[Command]) = ref ! Stop
def `00 must canonicalize behaviors`(): Unit = sync(setup("ctx00") { (ctx, startWith)
val self = ctx.self
startWith.keep { subj
subj ! Ping(self)
}.expectMessageKeep(expectTimeout) { (msg, subj)
msg should ===(Pong1)
subj ! Miss(self)
}.expectMessageKeep(expectTimeout) { (msg, subj)
msg should ===(Missed)
subj ! Renew(self)
}.expectMessage(expectTimeout) { (msg, subj)
msg should ===(Renewed)
subj ! Ping(self)
}.expectMessage(expectTimeout) { (msg, _)
msg should ===(Pong1)
}
}) })
def `01 must correctly wire the lifecycle hooks`(): Unit = private implicit class MkC(val startWith: StepWise.Steps[Event, ActorRef[Command]]) {
/**
* Ask the subject to create a child actor, setting its behavior to inert if requested.
* The latter is very useful in order to avoid disturbances with GotSignal(PostStop) in
* test procedures that stop this child.
*/
def mkChild(
name: Option[String],
monitor: ActorRef[Event],
self: ActorRef[Event],
inert: Boolean = false): StepWise.Steps[Event, (ActorRef[Command], ActorRef[Command])] = {
val s =
startWith.keep { subj
subj ! MkChild(name, monitor, self)
}.expectMessage(expectTimeout) { (msg, subj)
val Created(child) = msg
(subj, child)
}
if (!inert) s
else
s.keep {
case (subj, child)
child ! BecomeInert(self)
}.expectMessageKeep(expectTimeout) { (msg, _)
msg should ===(BecameInert)
}
}
}
private implicit class MessageStep[T](val startWith: StepWise.Steps[Event, T]) {
def stimulate(f: T Unit, ev: T Event, timeout: FiniteDuration = expectTimeout): StepWise.Steps[Event, T] =
startWith.keep(f).expectMessageKeep(timeout) { (msg, v)
msg should ===(ev(v))
}
}
protected def stop(ref: ActorRef[Command]) = ref ! Stop
"An ActorContext" must {
"canonicalize behaviors" in {
sync(setup("ctx00") { (ctx, startWith)
val self = ctx.self
startWith.keep { subj
subj ! Ping(self)
}.expectMessageKeep(expectTimeout) { (msg, subj)
msg should ===(Pong1)
subj ! Miss(self)
}.expectMessageKeep(expectTimeout) { (msg, subj)
msg should ===(Missed)
subj ! Renew(self)
}.expectMessage(expectTimeout) { (msg, subj)
msg should ===(Renewed)
subj ! Ping(self)
}.expectMessage(expectTimeout) { (msg, _)
msg should ===(Pong1)
}
})
}
"correctly wire the lifecycle hooks" in {
sync(setup("ctx01", Some(b Actor.supervise(b).onFailure[Throwable](SupervisorStrategy.restart)), ignorePostStop = false) { (ctx, startWith) sync(setup("ctx01", Some(b Actor.supervise(b).onFailure[Throwable](SupervisorStrategy.restart)), ignorePostStop = false) { (ctx, startWith)
val self = ctx.self val self = ctx.self
val ex = new Exception("KABOOM1") val ex = new Exception("KABOOM1")
@ -371,296 +372,337 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
msg should ===(GotSignal(PostStop)) msg should ===(GotSignal(PostStop))
} }
}) })
}
def `02 must signal PostStop after voluntary termination`(): Unit = sync(setup("ctx02", ignorePostStop = false) { (ctx, startWith) "signal PostStop after voluntary termination" in {
startWith.keep { subj sync(setup("ctx02", ignorePostStop = false) { (ctx, startWith)
stop(subj) startWith.keep { subj
}.expectMessage(expectTimeout) {
case (msg, _)
msg should ===(GotSignal(PostStop))
}
})
def `03 must restart and stop a child actor`(): Unit = sync(setup("ctx03") { (ctx, startWith)
val self = ctx.self
val ex = new Exception("KABOOM2")
startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self) {
case (subj, child)
val log = muteExpectedException[Exception]("KABOOM2", occurrences = 1)
child ! Throw(ex)
(subj, child, log)
}.expectMessage(expectTimeout) {
case (msg, (subj, child, log))
msg should ===(ChildEvent(GotSignal(PreRestart)))
log.assertDone(expectTimeout)
child ! BecomeInert(self) // necessary to avoid PostStop/Terminated interference
(subj, child)
}.expectMessageKeep(expectTimeout) {
case (msg, (subj, child))
msg should ===(BecameInert)
stop(subj) stop(subj)
ctx.watch(child) }.expectMessage(expectTimeout) {
ctx.watch(subj) case (msg, _)
}.expectTermination(expectTimeout) { msg should ===(GotSignal(PostStop))
case (t, (subj, child))
if (t.ref === child) subj
else if (t.ref === subj) child
else fail(s"expected termination of either $subj or $child but got $t")
}.expectTermination(expectTimeout) { (t, subj)
t.ref should ===(subj)
}
})
def `04 must stop a child actor`(): Unit = sync(setup("ctx04") { (ctx, startWith)
val self = ctx.self
startWith.mkChild(Some("A"), ctx.spawnAdapter(ChildEvent), self, inert = true) {
case (subj, child)
subj ! Kill(child, self)
child
}.expectMessageKeep(expectTimeout) { (msg, child)
msg should ===(Killed)
ctx.watch(child)
}.expectTermination(expectTimeout) { (t, child)
t.ref should ===(child)
}
})
def `05 must reset behavior upon Restart`(): Unit = sync(setup("ctx05", Some(Actor.supervise(_).onFailure(SupervisorStrategy.restart))) { (ctx, startWith)
val self = ctx.self
val ex = new Exception("KABOOM05")
startWith
.stimulate(_ ! BecomeInert(self), _ BecameInert)
.stimulate(_ ! Ping(self), _ Pong2) { subj
val log = muteExpectedException[Exception]("KABOOM05")
subj ! Throw(ex)
subj
} }
.stimulate(_ ! Ping(self), _ Pong1) })
}) }
def `06 must not reset behavior upon Resume`(): Unit = sync(setup( "restart and stop a child actor" in {
"ctx06", sync(setup("ctx03") { (ctx, startWith)
Some(b Actor.supervise(b).onFailure(SupervisorStrategy.resume))) { (ctx, startWith)
val self = ctx.self val self = ctx.self
val ex = new Exception("KABOOM06") val ex = new Exception("KABOOM2")
startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self) {
case (subj, child)
val log = muteExpectedException[Exception]("KABOOM2", occurrences = 1)
child ! Throw(ex)
(subj, child, log)
}.expectMessage(expectTimeout) {
case (msg, (subj, child, log))
msg should ===(ChildEvent(GotSignal(PreRestart)))
log.assertDone(expectTimeout)
child ! BecomeInert(self) // necessary to avoid PostStop/Terminated interference
(subj, child)
}.expectMessageKeep(expectTimeout) {
case (msg, (subj, child))
msg should ===(BecameInert)
stop(subj)
ctx.watch(child)
ctx.watch(subj)
}.expectTermination(expectTimeout) {
case (t, (subj, child))
if (t.ref === child) subj
else if (t.ref === subj) child
else fail(s"expected termination of either $subj or $child but got $t")
}.expectTermination(expectTimeout) { (t, subj)
t.ref should ===(subj)
}
})
}
"stop a child actor" in {
sync(setup("ctx04") { (ctx, startWith)
val self = ctx.self
startWith.mkChild(Some("A"), ctx.spawnAdapter(ChildEvent), self, inert = true) {
case (subj, child)
subj ! Kill(child, self)
child
}.expectMessageKeep(expectTimeout) { (msg, child)
msg should ===(Killed)
ctx.watch(child)
}.expectTermination(expectTimeout) { (t, child)
t.ref should ===(child)
}
})
}
"reset behavior upon Restart" in {
sync(setup("ctx05", Some(Actor.supervise(_).onFailure(SupervisorStrategy.restart))) { (ctx, startWith)
val self = ctx.self
val ex = new Exception("KABOOM05")
startWith startWith
.stimulate(_ ! BecomeInert(self), _ BecameInert) .stimulate(_ ! BecomeInert(self), _ BecameInert)
.stimulate(_ ! Ping(self), _ Pong2).keep { subj .stimulate(_ ! Ping(self), _ Pong2) { subj
muteExpectedException[Exception]("KABOOM06", occurrences = 1) muteExpectedException[Exception]("KABOOM05")
subj ! Throw(ex) subj ! Throw(ex)
}.stimulate(_ ! Ping(self), _ Pong2) subj
}
.stimulate(_ ! Ping(self), _ Pong1)
}) })
}
def `07 must stop upon Stop`(): Unit = sync(setup("ctx07", ignorePostStop = false) { (ctx, startWith) "not reset behavior upon Resume" in {
val self = ctx.self sync(setup(
val ex = new Exception("KABOOM07") "ctx06",
startWith Some(b Actor.supervise(b).onFailure(SupervisorStrategy.resume))) { (ctx, startWith)
.stimulate(_ ! Ping(self), _ Pong1).keep { subj val self = ctx.self
muteExpectedException[Exception]("KABOOM07", occurrences = 1) val ex = new Exception("KABOOM06")
subj ! Throw(ex) startWith
ctx.watch(subj) .stimulate(_ ! BecomeInert(self), _ BecameInert)
}.expectMulti(expectTimeout, 2) { (msgs, subj) .stimulate(_ ! Ping(self), _ Pong2).keep { subj
msgs.toSet should ===(Set(Left(Terminated(subj)(null)), Right(GotSignal(PostStop)))) muteExpectedException[Exception]("KABOOM06", occurrences = 1)
} subj ! Throw(ex)
}) }.stimulate(_ ! Ping(self), _ Pong2)
})
}
def `08 must not stop non-child actor`(): Unit = sync(setup("ctx08") { (ctx, startWith) "stop upon Stop" in {
val self = ctx.self sync(setup("ctx07", ignorePostStop = false) { (ctx, startWith)
startWith.mkChild(Some("A"), ctx.spawnAdapter(ChildEvent), self) { val self = ctx.self
case (subj, child) val ex = new Exception("KABOOM07")
val other = ctx.spawn(behavior(ctx, ignorePostStop = true), "A") startWith
subj ! Kill(other, ctx.self) .stimulate(_ ! Ping(self), _ Pong1).keep { subj
child muteExpectedException[Exception]("KABOOM07", occurrences = 1)
}.expectMessageKeep(expectTimeout) { (msg, _) subj ! Throw(ex)
msg should ===(NotKilled) ctx.watch(subj)
}.stimulate(_ ! Ping(self), _ Pong1) }.expectMulti(expectTimeout, 2) { (msgs, subj)
}) msgs.toSet should ===(Set(Left(Terminated(subj)(null)), Right(GotSignal(PostStop))))
}
})
}
def `10 must watch a child actor before its termination`(): Unit = sync(setup("ctx10") { (ctx, startWith) "not stop non-child actor" in {
val self = ctx.self sync(setup("ctx08") { (ctx, startWith)
startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self) { val self = ctx.self
case (subj, child) startWith.mkChild(Some("A"), ctx.spawnAdapter(ChildEvent), self) {
subj ! Watch(child, self) case (subj, child)
child val other = ctx.spawn(behavior(ctx, ignorePostStop = true), "A")
}.expectMessageKeep(expectTimeout) { (msg, child) subj ! Kill(other, ctx.self)
msg should ===(Watched) child
child ! Stop }.expectMessageKeep(expectTimeout) { (msg, _)
}.expectMessage(expectTimeout) { (msg, child) msg should ===(NotKilled)
msg should ===(GotSignal(Terminated(child)(null))) }.stimulate(_ ! Ping(self), _ Pong1)
} })
}) }
def `11 must watch a child actor after its termination`(): Unit = sync(setup("ctx11") { (ctx, startWith) "watch a child actor before its termination" in {
val self = ctx.self sync(setup("ctx10") { (ctx, startWith)
startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self).keep { val self = ctx.self
case (subj, child) startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self) {
ctx.watch(child) case (subj, child)
subj ! Watch(child, self)
child
}.expectMessageKeep(expectTimeout) { (msg, child)
msg should ===(Watched)
child ! Stop child ! Stop
}.expectTermination(expectTimeout) { }.expectMessage(expectTimeout) { (msg, child)
case (t, (subj, child)) msg should ===(GotSignal(Terminated(child)(null)))
}
})
}
"watch a child actor after its termination" in {
sync(setup("ctx11") { (ctx, startWith)
val self = ctx.self
startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self).keep {
case (subj, child)
ctx.watch(child)
child ! Stop
}.expectTermination(expectTimeout) {
case (t, (subj, child))
t should ===(Terminated(child)(null))
subj ! Watch(child, blackhole)
child
}.expectMessage(expectTimeout) { (msg, child)
msg should ===(GotSignal(Terminated(child)(null)))
}
})
}
"unwatch a child actor before its termination" in {
sync(setup("ctx12") { (ctx, startWith)
val self = ctx.self
startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self).keep {
case (subj, child)
subj ! Watch(child, self)
}.expectMessageKeep(expectTimeout) {
case (msg, (subj, child))
msg should ===(Watched)
subj ! Unwatch(child, self)
}.expectMessage(expectTimeout) {
case (msg, (subj, child))
msg should ===(Unwatched)
ctx.watch(child)
child ! Stop
child
}.expectTermination(expectTimeout) { (t, child)
t should ===(Terminated(child)(null)) t should ===(Terminated(child)(null))
subj ! Watch(child, blackhole)
child
}.expectMessage(expectTimeout) { (msg, child)
msg should ===(GotSignal(Terminated(child)(null)))
}
})
def `12 must unwatch a child actor before its termination`(): Unit = sync(setup("ctx12") { (ctx, startWith)
val self = ctx.self
startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self).keep {
case (subj, child)
subj ! Watch(child, self)
}.expectMessageKeep(expectTimeout) {
case (msg, (subj, child))
msg should ===(Watched)
subj ! Unwatch(child, self)
}.expectMessage(expectTimeout) {
case (msg, (subj, child))
msg should ===(Unwatched)
ctx.watch(child)
child ! Stop
child
}.expectTermination(expectTimeout) { (t, child)
t should ===(Terminated(child)(null))
}
})
def `13 must terminate upon not handling Terminated`(): Unit = sync(setup("ctx13", ignorePostStop = false) { (ctx, startWith)
val self = ctx.self
startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self).keep {
case (subj, child)
muteExpectedException[DeathPactException]()
subj ! Watch(child, self)
}.expectMessageKeep(expectTimeout) {
case (msg, (subj, child))
msg should ===(Watched)
subj ! BecomeCareless(self)
}.expectMessageKeep(expectTimeout) {
case (msg, (subj, child))
msg should ===(BecameCareless)
child ! Stop
}.expectMessage(expectTimeout) {
case (msg, (subj, child))
msg should ===(ChildEvent(GotSignal(PostStop)))
}.expectMessage(expectTimeout) {
case (msg, _)
msg should ===(GotSignal(PostStop))
}
})
def `20 must return the right context info`(): Unit = sync(setup("ctx20") { (ctx, startWith)
startWith.keep(_ ! GetInfo(ctx.self))
.expectMessage(expectTimeout) {
case (msg: Info, subj)
msg.self should ===(subj)
msg.system should ===(system)
case (other, _)
fail(s"$other was not an Info(...)")
} }
}) })
}
def `21 must return right info about children`(): Unit = sync(setup("ctx21") { (ctx, startWith) "terminate upon not handling Terminated" in {
val self = ctx.self sync(setup("ctx13", ignorePostStop = false) { (ctx, startWith)
startWith val self = ctx.self
.mkChild(Some("B"), ctx.spawnAdapter(ChildEvent), self) startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self).keep {
.stimulate(_._1 ! GetChild("A", self), _ Child(None)) case (subj, child)
.stimulate(_._1 ! GetChild("B", self), x Child(Some(x._2))) muteExpectedException[DeathPactException]()
.stimulate(_._1 ! GetChildren(self), x Children(Set(x._2))) subj ! Watch(child, self)
}) }.expectMessageKeep(expectTimeout) {
case (msg, (subj, child))
def `30 must set small receive timeout`(): Unit = sync(setup("ctx30") { (ctx, startWith) msg should ===(Watched)
val self = ctx.self subj ! BecomeCareless(self)
startWith }.expectMessageKeep(expectTimeout) {
.stimulate(_ ! SetTimeout(1.nano, self), _ TimeoutSet) case (msg, (subj, child))
.expectMessage(expectTimeout) { (msg, _) msg should ===(BecameCareless)
msg should ===(GotReceiveTimeout) child ! Stop
}.expectMessage(expectTimeout) {
case (msg, (subj, child))
msg should ===(ChildEvent(GotSignal(PostStop)))
}.expectMessage(expectTimeout) {
case (msg, _)
msg should ===(GotSignal(PostStop))
} }
}) })
}
def `31 must set large receive timeout`(): Unit = sync(setup("ctx31") { (ctx, startWith) "return the right context info" in {
val self = ctx.self sync(setup("ctx20") { (ctx, startWith)
startWith startWith.keep(_ ! GetInfo(ctx.self))
.stimulate(_ ! SetTimeout(1.minute, self), _ TimeoutSet) .expectMessage(expectTimeout) {
.stimulate(_ ctx.schedule(1.second, self, Pong2), _ Pong2) case (msg: Info, subj)
.stimulate(_ ! Ping(self), _ Pong1) msg.self should ===(subj)
}) msg.system should ===(system)
case (other, _)
fail(s"$other was not an Info(...)")
}
})
}
def `32 must schedule a message`(): Unit = sync(setup("ctx32") { (ctx, startWith) "return right info about children" in {
startWith(_ ! Schedule(1.nano, ctx.self, Pong2, ctx.self)) sync(setup("ctx21") { (ctx, startWith)
.expectMultipleMessages(expectTimeout, 2) { (msgs, _) val self = ctx.self
msgs should ===(Scheduled :: Pong2 :: Nil) startWith
.mkChild(Some("B"), ctx.spawnAdapter(ChildEvent), self)
.stimulate(_._1 ! GetChild("A", self), _ Child(None))
.stimulate(_._1 ! GetChild("B", self), x Child(Some(x._2)))
.stimulate(_._1 ! GetChildren(self), x Children(Set(x._2)))
})
}
"set small receive timeout" in {
sync(setup("ctx30") { (ctx, startWith)
val self = ctx.self
startWith
.stimulate(_ ! SetTimeout(1.nano, self), _ TimeoutSet)
.expectMessage(expectTimeout) { (msg, _)
msg should ===(GotReceiveTimeout)
}
})
}
"set large receive timeout" in {
sync(setup("ctx31") { (ctx, startWith)
val self = ctx.self
startWith
.stimulate(_ ! SetTimeout(1.minute, self), _ TimeoutSet)
.stimulate(_ ctx.schedule(1.second, self, Pong2), _ Pong2)
.stimulate(_ ! Ping(self), _ Pong1)
})
}
"schedule a message" in {
sync(setup("ctx32") { (ctx, startWith)
startWith(_ ! Schedule(1.nano, ctx.self, Pong2, ctx.self))
.expectMultipleMessages(expectTimeout, 2) { (msgs, _)
msgs should ===(Scheduled :: Pong2 :: Nil)
}
})
}
"create a working adapter" in {
sync(setup("ctx40", ignorePostStop = false) { (ctx, startWith)
startWith.keep { subj
subj ! GetAdapter(ctx.self)
}.expectMessage(expectTimeout) { (msg, subj)
val Adapter(adapter) = msg
ctx.watch(adapter)
adapter ! Ping(ctx.self)
(subj, adapter)
}.expectMessage(expectTimeout) {
case (msg, (subj, adapter))
msg should ===(Pong1)
ctx.stop(subj)
adapter
}.expectMulti(expectTimeout, 2) { (msgs, adapter)
msgs.toSet should ===(Set(Left(Terminated(adapter)(null)), Right(GotSignal(PostStop))))
} }
}) })
}
def `40 must create a working adapter`(): Unit = sync(setup("ctx40", ignorePostStop = false) { (ctx, startWith) "create a named adapter" in {
startWith.keep { subj sync(setup("ctx41") { (ctx, startWith)
subj ! GetAdapter(ctx.self) startWith.keep { subj
}.expectMessage(expectTimeout) { (msg, subj) subj ! GetAdapter(ctx.self, "named")
val Adapter(adapter) = msg }.expectMessage(expectTimeout) { (msg, subj)
ctx.watch(adapter) val Adapter(adapter) = msg
adapter ! Ping(ctx.self) adapter.path.name should include("named")
(subj, adapter)
}.expectMessage(expectTimeout) {
case (msg, (subj, adapter))
msg should ===(Pong1)
ctx.stop(subj)
adapter
}.expectMulti(expectTimeout, 2) { (msgs, adapter)
msgs.toSet should ===(Set(Left(Terminated(adapter)(null)), Right(GotSignal(PostStop))))
}
})
def `41 must create a named adapter`(): Unit = sync(setup("ctx41") { (ctx, startWith)
startWith.keep { subj
subj ! GetAdapter(ctx.self, "named")
}.expectMessage(expectTimeout) { (msg, subj)
val Adapter(adapter) = msg
adapter.path.name should include("named")
}
})
def `42 must not allow null messages`(): Unit = sync(setup("ctx42") { (ctx, startWith)
startWith.keep { subj
intercept[InvalidMessageException] {
subj ! null
} }
} })
}) }
}
trait Normal extends Tests { "not allow null messages" in {
override def suite = "normal" sync(setup("ctx42") { (ctx, startWith)
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] = startWith.keep { subj
subject(ctx.self, ignorePostStop) intercept[InvalidMessageException] {
subj ! null
}
}
})
}
} }
object `An ActorContext (adapted)` extends Normal with AdaptedSystem
trait Widened extends Tests {
import Actor._
override def suite = "widened"
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
subject(ctx.self, ignorePostStop).widen { case x x }
}
object `An ActorContext with widened Behavior (adapted)` extends Widened with AdaptedSystem
trait Deferred extends Tests {
override def suite = "deferred"
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
Actor.deferred(_ subject(ctx.self, ignorePostStop))
}
object `An ActorContext with deferred Behavior (adapted)` extends Deferred with AdaptedSystem
trait NestedDeferred extends Tests {
override def suite = "deferred"
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
Actor.deferred(_ Actor.deferred(_ subject(ctx.self, ignorePostStop)))
}
object `An ActorContext with nested deferred Behavior (adapted)` extends NestedDeferred with AdaptedSystem
trait Tap extends Tests {
override def suite = "tap"
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
Actor.tap((_, _) (), (_, _) (), subject(ctx.self, ignorePostStop))
}
object `An ActorContext with Tap (old-adapted)` extends Tap with AdaptedSystem
} }
import ActorContextSpec._
class NormalActorContextSpec extends ActorContextSpec {
override def suite = "normal"
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
subject(ctx.self, ignorePostStop)
}
class WidenedActorContextSpec extends ActorContextSpec {
import Actor._
override def suite = "widened"
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
subject(ctx.self, ignorePostStop).widen { case x x }
}
class DeferredActorContextSpec extends ActorContextSpec {
override def suite = "deferred"
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
Actor.deferred(_ subject(ctx.self, ignorePostStop))
}
class NestedDeferredActorContextSpec extends ActorContextSpec {
override def suite = "nexted-deferred"
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
Actor.deferred(_ Actor.deferred(_ subject(ctx.self, ignorePostStop)))
}
class TapActorContextSpec extends ActorContextSpec {
override def suite = "tap"
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
Actor.tap((_, _) (), (_, _) (), subject(ctx.self, ignorePostStop))
}

View file

@ -12,56 +12,46 @@ import akka.actor.typed.scaladsl.Actor._
import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.AskPattern._
object AskSpec { object AskSpec {
sealed trait Msg sealed trait Msg
final case class Foo(s: String)(val replyTo: ActorRef[String]) extends Msg final case class Foo(s: String)(val replyTo: ActorRef[String]) extends Msg
final case class Stop(replyTo: ActorRef[Unit]) extends Msg final case class Stop(replyTo: ActorRef[Unit]) extends Msg
} }
class AskSpec extends TypedSpec with ScalaFutures { class AskSpec extends TypedSpec with ScalaFutures {
import AskSpec._ import AskSpec._
trait Common { implicit def executor: ExecutionContext =
system.executionContext
def system: ActorSystem[TypedSpec.Command] val behavior: Behavior[Msg] = immutable[Msg] {
case (_, foo: Foo)
foo.replyTo ! "foo"
same
case (_, Stop(r))
r ! ()
stopped
}
implicit def executor: ExecutionContext = "Ask pattern" must {
system.executionContext "must fail the future if the actor is already terminated" in {
val behavior: Behavior[Msg] = immutable[Msg] {
case (_, foo: Foo)
foo.replyTo ! "foo"
same
case (_, Stop(r))
r ! (())
stopped
}
def `must fail the future if the actor is already terminated`(): Unit = {
val fut = for { val fut = for {
ref system ? TypedSpec.Create(behavior, "test1") ref system ? TypedSpec.Create(behavior, "test1")
_ ref ? Stop _ ref ? Stop
answer ref.?(Foo("bar"))(Timeout(1.second), implicitly) answer ref.?(Foo("bar"))(Timeout(1.second), implicitly)
} yield answer } yield answer
(fut.recover { case _: AskTimeoutException "" }).futureValue should ===("") fut.recover { case _: AskTimeoutException "" }.futureValue should ===("")
} }
def `must succeed when the actor is alive`(): Unit = { "must succeed when the actor is alive" in {
val fut = for { val fut = for {
ref system ? TypedSpec.Create(behavior, "test2") ref system ? TypedSpec.Create(behavior, "test2")
answer ref ? Foo("bar") answer ref ? Foo("bar")
} yield answer } yield answer
fut.futureValue should ===("foo") fut.futureValue should ===("foo")
} }
}
object `Ask pattern (adapted)` extends Common with AdaptedSystem {
import AskSpec._
/** See issue #19947 (MatchError with adapted ActorRef) */ /** See issue #19947 (MatchError with adapted ActorRef) */
def `must fail the future if the actor doesn't exist`(): Unit = { "must fail the future if the actor doesn't exist" in {
val noSuchActor: ActorRef[Msg] = system match { val noSuchActor: ActorRef[Msg] = system match {
case adaptedSys: akka.actor.typed.internal.adapter.ActorSystemAdapter[_] case adaptedSys: akka.actor.typed.internal.adapter.ActorSystemAdapter[_]
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._

View file

@ -12,8 +12,7 @@ import java.util.function.{ Function ⇒ F1 }
import akka.Done import akka.Done
import akka.typed.testkit.{ EffectfulActorContext, Inbox } import akka.typed.testkit.{ EffectfulActorContext, Inbox }
class BehaviorSpec extends TypedSpec { object BehaviorSpec {
sealed trait Command { sealed trait Command {
def expectedResponse(ctx: ActorContext[Command]): Seq[Event] = Nil def expectedResponse(ctx: ActorContext[Command]): Seq[Event] = Nil
} }
@ -50,11 +49,19 @@ class BehaviorSpec extends TypedSpec {
case object Pong extends Event case object Pong extends Event
case object Swapped extends Event case object Swapped extends Event
sealed trait State extends Event { def next: State } sealed trait State extends Event {
val StateA: State = new State { override def toString = "StateA"; override def next = StateB } def next: State
val StateB: State = new State { override def toString = "StateB"; override def next = StateA } }
val StateA: State = new State {
override def toString = "StateA"
override def next = StateB
}
val StateB: State = new State {
override def toString = "StateB"
override def next = StateA
}
trait Common { trait Common extends TypedSpec {
type Aux >: Null <: AnyRef type Aux >: Null <: AnyRef
def system: ActorSystem[TypedSpec.Command] def system: ActorSystem[TypedSpec.Command]
def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux)
@ -130,131 +137,7 @@ class BehaviorSpec extends TypedSpec {
} }
} }
trait Lifecycle extends Common { def mkFull(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] = {
def `must react to PreStart`(): Unit = {
mkCtx(requirePreStart = true)
}
def `must react to PostStop`(): Unit = {
mkCtx().check(PostStop)
}
def `must react to PostStop after a message`(): Unit = {
mkCtx().check(GetSelf).check(PostStop)
}
def `must react to PreRestart`(): Unit = {
mkCtx().check(PreRestart)
}
def `must react to PreRestart after a message`(): Unit = {
mkCtx().check(GetSelf).check(PreRestart)
}
def `must react to Terminated`(): Unit = {
mkCtx().check(Terminated(Inbox("x").ref)(null))
}
def `must react to Terminated after a message`(): Unit = {
mkCtx().check(GetSelf).check(Terminated(Inbox("x").ref)(null))
}
def `must react to a message after Terminated`(): Unit = {
mkCtx().check(Terminated(Inbox("x").ref)(null)).check(GetSelf)
}
}
trait Messages extends Common {
def `must react to two messages`(): Unit = {
mkCtx().check(Ping).check(Ping)
}
def `must react to a message after missing one`(): Unit = {
mkCtx().check(Miss).check(Ping)
}
def `must react to a message after ignoring one`(): Unit = {
mkCtx().check(Ignore).check(Ping)
}
}
trait Unhandled extends Common {
def `must return Unhandled`(): Unit = {
val Setup(ctx, inbox, aux) = mkCtx()
Behavior.interpretMessage(ctx.currentBehavior, ctx, Miss) should be(Behavior.UnhandledBehavior)
inbox.receiveAll() should ===(Missed :: Nil)
checkAux(Miss, aux)
}
}
trait Stoppable extends Common {
def `must stop`(): Unit = {
val Setup(ctx, inbox, aux) = mkCtx()
ctx.run(Stop)
ctx.currentBehavior should be(Behavior.StoppedBehavior)
checkAux(Stop, aux)
}
}
trait Become extends Common with Unhandled {
private implicit val inbox = Inbox[State]("state")
def `must be in state A`(): Unit = {
mkCtx().check(GetState()(StateA))
}
def `must switch to state B`(): Unit = {
mkCtx().check(Swap).check(GetState()(StateB))
}
def `must switch back to state A`(): Unit = {
mkCtx().check(Swap).check(Swap).check(GetState()(StateA))
}
}
trait BecomeWithLifecycle extends Become with Lifecycle {
def `must react to PostStop after swap`(): Unit = {
mkCtx().check(Swap).check(PostStop)
}
def `must react to PostStop after a message after swap`(): Unit = {
mkCtx().check(Swap).check(GetSelf).check(PostStop)
}
def `must react to PreRestart after swap`(): Unit = {
mkCtx().check(Swap).check(PreRestart)
}
def `must react to PreRestart after a message after swap`(): Unit = {
mkCtx().check(Swap).check(GetSelf).check(PreRestart)
}
def `must react to Terminated after swap`(): Unit = {
mkCtx().check(Swap).check(Terminated(Inbox("x").ref)(null))
}
def `must react to Terminated after a message after swap`(): Unit = {
mkCtx().check(Swap).check(GetSelf).check(Terminated(Inbox("x").ref)(null))
}
def `must react to a message after Terminated after swap`(): Unit = {
mkCtx().check(Swap).check(Terminated(Inbox("x").ref)(null)).check(GetSelf)
}
}
/**
* This targets behavior wrappers to ensure that the wrapper does not
* hold on to the changed behavior. Wrappers must be immutable.
*/
trait Reuse extends Common {
def `must be reusable`(): Unit = {
val i = init()
i.mkCtx().check(GetState()(StateA)).check(Swap).check(GetState()(StateB))
i.mkCtx().check(GetState()(StateA)).check(Swap).check(GetState()(StateB))
}
}
private def mkFull(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] = {
SActor.immutable[Command] { SActor.immutable[Command] {
case (ctx, GetSelf) case (ctx, GetSelf)
monitor ! Self(ctx.self) monitor ! Self(ctx.self)
@ -282,191 +165,9 @@ class BehaviorSpec extends TypedSpec {
SActor.same SActor.same
} }
} }
trait FullBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = mkFull(monitor) null
}
object `A Full Behavior (adapted)` extends FullBehavior with AdaptedSystem
trait ImmutableBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
private def behv(monitor: ActorRef[Event], state: State): Behavior[Command] = {
SActor.immutable[Command] {
case (ctx, GetSelf)
monitor ! Self(ctx.self)
SActor.same
case (_, Miss)
monitor ! Missed
SActor.unhandled
case (_, Ignore)
monitor ! Ignored
SActor.same
case (_, Ping)
monitor ! Pong
behv(monitor, state)
case (_, Swap)
monitor ! Swapped
behv(monitor, state.next)
case (_, GetState())
monitor ! state
SActor.same
case (_, Stop) SActor.stopped
case (_, _: AuxPing) SActor.unhandled
} onSignal {
case (ctx, signal)
monitor ! GotSignal(signal)
SActor.same
}
}
}
object `A immutable Behavior (adapted)` extends ImmutableBehavior with AdaptedSystem
trait ImmutableWithSignalScalaBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) null
def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] =
SActor.immutable[Command] {
(ctx, msg)
msg match {
case GetSelf
monitor ! Self(ctx.self)
SActor.same
case Miss
monitor ! Missed
SActor.unhandled
case Ignore
monitor ! Ignored
SActor.same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.same
case Stop SActor.stopped
case _: AuxPing SActor.unhandled
}
} onSignal {
case (ctx, sig)
monitor ! GotSignal(sig)
SActor.same
}
}
object `A ImmutableWithSignal Behavior (scala,adapted)` extends ImmutableWithSignalScalaBehavior with AdaptedSystem
trait ImmutableScalaBehavior extends Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
def behv(monitor: ActorRef[Event], state: State): Behavior[Command] =
SActor.immutable[Command] { (ctx, msg)
msg match {
case GetSelf
monitor ! Self(ctx.self)
SActor.same
case Miss
monitor ! Missed
SActor.unhandled
case Ignore
monitor ! Ignored
SActor.same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.same
case Stop SActor.stopped
case _: AuxPing SActor.unhandled
}
}
}
object `A immutable Behavior (scala,adapted)` extends ImmutableScalaBehavior with AdaptedSystem
trait MutableScalaBehavior extends Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) null
def behv(monitor: ActorRef[Event]): Behavior[Command] =
SActor.mutable[Command] { ctx
new SActor.MutableBehavior[Command] {
private var state: State = StateA
override def onMessage(msg: Command): Behavior[Command] = {
msg match {
case GetSelf
monitor ! Self(ctx.self)
this
case Miss
monitor ! Missed
SActor.unhandled
case Ignore
monitor ! Ignored
SActor.same // this or same works the same way
case Ping
monitor ! Pong
this
case Swap
monitor ! Swapped
state = state.next
this
case GetState()
monitor ! state
this
case Stop SActor.stopped
case _: AuxPing SActor.unhandled
}
}
}
}
}
object `A mutable Behavior (scala,adapted)` extends MutableScalaBehavior with AdaptedSystem
trait WidenedScalaBehavior extends ImmutableWithSignalScalaBehavior with Reuse with Siphon {
import SActor.BehaviorDecorators
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Command]("widenedListener")
super.behavior(monitor)._1.widen[Command] { case c inbox.ref ! c; c } inbox
}
}
object `A widened Behavior (scala,adapted)` extends WidenedScalaBehavior with AdaptedSystem
trait DeferredScalaBehavior extends ImmutableWithSignalScalaBehavior {
override type Aux = Inbox[Done]
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Done]("deferredListener")
(SActor.deferred(ctx {
inbox.ref ! Done
super.behavior(monitor)._1
}), inbox)
}
override def checkAux(signal: Signal, aux: Aux): Unit =
aux.receiveAll() should ===(Done :: Nil)
}
object `A deferred Behavior (scala,adapted)` extends DeferredScalaBehavior with AdaptedSystem
trait TapScalaBehavior extends ImmutableWithSignalScalaBehavior with Reuse with SignalSiphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Either[Signal, Command]]("tapListener")
(SActor.tap((_, msg) inbox.ref ! Right(msg), (_, sig) inbox.ref ! Left(sig), super.behavior(monitor)._1), inbox)
}
}
object `A tap Behavior (scala,adapted)` extends TapScalaBehavior with AdaptedSystem
trait RestarterScalaBehavior extends ImmutableWithSignalScalaBehavior with Reuse {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
SActor.supervise(super.behavior(monitor)._1).onFailure(SupervisorStrategy.restart) null
}
}
object `A restarter Behavior (scala,adapted)` extends RestarterScalaBehavior with AdaptedSystem
/* /*
* function converters for Java, to ease the pain on Scala 2.11 * function converters for Java, to ease the pain on Scala 2.11
*/ */
def fs(f: (JActorContext[Command], Signal) Behavior[Command]) = def fs(f: (JActorContext[Command], Signal) Behavior[Command]) =
new F2[JActorContext[Command], Signal, Behavior[Command]] { new F2[JActorContext[Command], Signal, Behavior[Command]] {
override def apply(ctx: JActorContext[Command], sig: Signal) = f(ctx, sig) override def apply(ctx: JActorContext[Command], sig: Signal) = f(ctx, sig)
@ -496,11 +197,365 @@ class BehaviorSpec extends TypedSpec {
override def apply(in: JActorContext[Command]) = f(in) override def apply(in: JActorContext[Command]) = f(in)
} }
trait ImmutableWithSignalJavaBehavior extends Messages with BecomeWithLifecycle with Stoppable { trait Lifecycle extends Common {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) null "Lifecycle" must {
def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] = "must react to PreStart" in {
JActor.immutable( mkCtx(requirePreStart = true)
fc((ctx, msg) msg match { }
"must react to PostStop" in {
mkCtx().check(PostStop)
}
"must react to PostStop after a message" in {
mkCtx().check(GetSelf).check(PostStop)
}
"must react to PreRestart" in {
mkCtx().check(PreRestart)
}
"must react to PreRestart after a message" in {
mkCtx().check(GetSelf).check(PreRestart)
}
"must react to Terminated" in {
mkCtx().check(Terminated(Inbox("x").ref)(null))
}
"must react to Terminated after a message" in {
mkCtx().check(GetSelf).check(Terminated(Inbox("x").ref)(null))
}
"must react to a message after Terminated" in {
mkCtx().check(Terminated(Inbox("x").ref)(null)).check(GetSelf)
}
}
}
trait Messages extends Common {
"Messages" must {
"react to two messages" in {
mkCtx().check(Ping).check(Ping)
}
"react to a message after missing one" in {
mkCtx().check(Miss).check(Ping)
}
"must react to a message after ignoring one" in {
mkCtx().check(Ignore).check(Ping)
}
}
}
trait Unhandled extends Common {
"Unahndled" must {
"must return Unhandled" in {
val Setup(ctx, inbox, aux) = mkCtx()
Behavior.interpretMessage(ctx.currentBehavior, ctx, Miss) should be(Behavior.UnhandledBehavior)
inbox.receiveAll() should ===(Missed :: Nil)
checkAux(Miss, aux)
}
}
}
trait Stoppable extends Common {
"Stopping" must {
"must stop" in {
val Setup(ctx, inbox, aux) = mkCtx()
ctx.run(Stop)
ctx.currentBehavior should be(Behavior.StoppedBehavior)
checkAux(Stop, aux)
}
}
}
trait Become extends Common with Unhandled {
private implicit val inbox = Inbox[State]("state")
"Becoming" must {
"must be in state A" in {
mkCtx().check(GetState()(StateA))
}
"must switch to state B" in {
mkCtx().check(Swap).check(GetState()(StateB))
}
"must switch back to state A" in {
mkCtx().check(Swap).check(Swap).check(GetState()(StateA))
}
}
}
trait BecomeWithLifecycle extends Become with Lifecycle {
"Become with lifecycle" must {
"react to PostStop after swap" in {
mkCtx().check(Swap).check(PostStop)
}
"react to PostStop after a message after swap" in {
mkCtx().check(Swap).check(GetSelf).check(PostStop)
}
"react to PreRestart after swap" in {
mkCtx().check(Swap).check(PreRestart)
}
"react to PreRestart after a message after swap" in {
mkCtx().check(Swap).check(GetSelf).check(PreRestart)
}
"react to Terminated after swap" in {
mkCtx().check(Swap).check(Terminated(Inbox("x").ref)(null))
}
"react to Terminated after a message after swap" in {
mkCtx().check(Swap).check(GetSelf).check(Terminated(Inbox("x").ref)(null))
}
"react to a message after Terminated after swap" in {
mkCtx().check(Swap).check(Terminated(Inbox("x").ref)(null)).check(GetSelf)
}
}
}
/**
* This targets behavior wrappers to ensure that the wrapper does not
* hold on to the changed behavior. Wrappers must be immutable.
*/
trait Reuse extends Common {
"Reuse" must {
"must be reusable" in {
val i = init()
i.mkCtx().check(GetState()(StateA)).check(Swap).check(GetState()(StateB))
i.mkCtx().check(GetState()(StateA)).check(Swap).check(GetState()(StateB))
}
}
}
}
import BehaviorSpec._
class FullBehaviorSpec extends TypedSpec with Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = mkFull(monitor) null
}
class ImmutableBehaviorSpec extends TypedSpec with Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
private def behv(monitor: ActorRef[Event], state: State): Behavior[Command] = {
SActor.immutable[Command] {
case (ctx, GetSelf)
monitor ! Self(ctx.self)
SActor.same
case (_, Miss)
monitor ! Missed
SActor.unhandled
case (_, Ignore)
monitor ! Ignored
SActor.same
case (_, Ping)
monitor ! Pong
behv(monitor, state)
case (_, Swap)
monitor ! Swapped
behv(monitor, state.next)
case (_, GetState())
monitor ! state
SActor.same
case (_, Stop) SActor.stopped
case (_, _: AuxPing) SActor.unhandled
} onSignal {
case (ctx, signal)
monitor ! GotSignal(signal)
SActor.same
}
}
}
class ImmutableWithSignalScalaBehaviorSpec extends TypedSpec with Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) null
def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] =
SActor.immutable[Command] {
(ctx, msg)
msg match {
case GetSelf
monitor ! Self(ctx.self)
SActor.same
case Miss
monitor ! Missed
SActor.unhandled
case Ignore
monitor ! Ignored
SActor.same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.same
case Stop SActor.stopped
case _: AuxPing SActor.unhandled
}
} onSignal {
case (ctx, sig)
monitor ! GotSignal(sig)
SActor.same
}
}
class ImmutableScalaBehaviorSpec extends TypedSpec with Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
def behv(monitor: ActorRef[Event], state: State): Behavior[Command] =
SActor.immutable[Command] { (ctx, msg)
msg match {
case GetSelf
monitor ! Self(ctx.self)
SActor.same
case Miss
monitor ! Missed
SActor.unhandled
case Ignore
monitor ! Ignored
SActor.same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.same
case Stop SActor.stopped
case _: AuxPing SActor.unhandled
}
}
}
class MutableScalaBehaviorSpec extends TypedSpec with Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) null
def behv(monitor: ActorRef[Event]): Behavior[Command] =
SActor.mutable[Command] { ctx
new SActor.MutableBehavior[Command] {
private var state: State = StateA
override def onMessage(msg: Command): Behavior[Command] = {
msg match {
case GetSelf
monitor ! Self(ctx.self)
this
case Miss
monitor ! Missed
SActor.unhandled
case Ignore
monitor ! Ignored
SActor.same // this or same works the same way
case Ping
monitor ! Pong
this
case Swap
monitor ! Swapped
state = state.next
this
case GetState()
monitor ! state
this
case Stop SActor.stopped
case _: AuxPing SActor.unhandled
}
}
}
}
}
class WidenedScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec with Reuse with Siphon {
import SActor.BehaviorDecorators
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Command]("widenedListener")
super.behavior(monitor)._1.widen[Command] { case c inbox.ref ! c; c } inbox
}
}
class DeferredScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec {
override type Aux = Inbox[Done]
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Done]("deferredListener")
(SActor.deferred(ctx {
inbox.ref ! Done
super.behavior(monitor)._1
}), inbox)
}
override def checkAux(signal: Signal, aux: Aux): Unit =
aux.receiveAll() should ===(Done :: Nil)
}
class TapScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec with Reuse with SignalSiphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Either[Signal, Command]]("tapListener")
(SActor.tap((_, msg) inbox.ref ! Right(msg), (_, sig) inbox.ref ! Left(sig), super.behavior(monitor)._1), inbox)
}
}
class RestarterScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec with Reuse {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
SActor.supervise(super.behavior(monitor)._1).onFailure(SupervisorStrategy.restart) null
}
}
class ImmutableWithSignalJavaBehaviorSpec extends TypedSpec with Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) null
def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] =
JActor.immutable(
fc((ctx, msg) msg match {
case GetSelf
monitor ! Self(ctx.getSelf)
SActor.same
case Miss
monitor ! Missed
SActor.unhandled
case Ignore
monitor ! Ignored
SActor.same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.same
case Stop SActor.stopped
case _: AuxPing SActor.unhandled
}),
fs((ctx, sig) {
monitor ! GotSignal(sig)
SActor.same
}))
}
class ImmutableJavaBehaviorSpec extends TypedSpec with Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
def behv(monitor: ActorRef[Event], state: State): Behavior[Command] =
JActor.immutable {
fc((ctx, msg)
msg match {
case GetSelf case GetSelf
monitor ! Self(ctx.getSelf) monitor ! Self(ctx.getSelf)
SActor.same SActor.same
@ -521,86 +576,48 @@ class BehaviorSpec extends TypedSpec {
SActor.same SActor.same
case Stop SActor.stopped case Stop SActor.stopped
case _: AuxPing SActor.unhandled case _: AuxPing SActor.unhandled
}), })
fs((ctx, sig) {
monitor ! GotSignal(sig)
SActor.same
}))
}
object `A ImmutableWithSignal Behavior (java,adapted)` extends ImmutableWithSignalJavaBehavior with AdaptedSystem
trait ImmutableJavaBehavior extends Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
def behv(monitor: ActorRef[Event], state: State): Behavior[Command] =
JActor.immutable {
fc((ctx, msg)
msg match {
case GetSelf
monitor ! Self(ctx.getSelf)
SActor.same
case Miss
monitor ! Missed
SActor.unhandled
case Ignore
monitor ! Ignored
SActor.same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.same
case Stop SActor.stopped
case _: AuxPing SActor.unhandled
})
}
}
object `A immutable Behavior (java,adapted)` extends ImmutableJavaBehavior with AdaptedSystem
trait WidenedJavaBehavior extends ImmutableWithSignalJavaBehavior with Reuse with Siphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Command]("widenedListener")
JActor.widened(super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x { inbox.ref ! x; x })))) inbox
} }
} }
object `A widened Behavior (java,adapted)` extends WidenedJavaBehavior with AdaptedSystem
class WidenedJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse with Siphon {
trait DeferredJavaBehavior extends ImmutableWithSignalJavaBehavior { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
override type Aux = Inbox[Done] val inbox = Inbox[Command]("widenedListener")
JActor.widened(super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { inbox.ref ! x
val inbox = Inbox[Done]("deferredListener") x
(JActor.deferred(df(ctx { })))) inbox
inbox.ref ! Done }
super.behavior(monitor)._1 }
})), inbox)
} class DeferredJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec {
override type Aux = Inbox[Done]
override def checkAux(signal: Signal, aux: Aux): Unit =
aux.receiveAll() should ===(Done :: Nil) override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
} val inbox = Inbox[Done]("deferredListener")
object `A deferred Behavior (java,adapted)` extends DeferredJavaBehavior with AdaptedSystem (JActor.deferred(df(ctx {
inbox.ref ! Done
trait TapJavaBehavior extends ImmutableWithSignalJavaBehavior with Reuse with SignalSiphon { super.behavior(monitor)._1
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { })), inbox)
val inbox = Inbox[Either[Signal, Command]]("tapListener") }
(JActor.tap(
pc((_, msg) inbox.ref ! Right(msg)), override def checkAux(signal: Signal, aux: Aux): Unit =
ps((_, sig) inbox.ref ! Left(sig)), aux.receiveAll() should ===(Done :: Nil)
super.behavior(monitor)._1), inbox) }
}
} class TapJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse with SignalSiphon {
object `A tap Behavior (java,adapted)` extends TapJavaBehavior with AdaptedSystem override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Either[Signal, Command]]("tapListener")
trait RestarterJavaBehavior extends ImmutableWithSignalJavaBehavior with Reuse { (JActor.tap(
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { pc((_, msg) inbox.ref ! Right(msg)),
JActor.supervise(super.behavior(monitor)._1) ps((_, sig) inbox.ref ! Left(sig)),
.onFailure(classOf[Exception], SupervisorStrategy.restart) null super.behavior(monitor)._1), inbox)
} }
} }
object `A restarter Behavior (java,adapted)` extends RestarterJavaBehavior with AdaptedSystem
class RestarterJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
JActor.supervise(super.behavior(monitor)._1)
.onFailure(classOf[Exception], SupervisorStrategy.restart) null
}
} }

View file

@ -12,9 +12,7 @@ import akka.actor.typed.scaladsl.AskPattern._
import akka.typed.testkit.{ EffectfulActorContext, Inbox, TestKitSettings } import akka.typed.testkit.{ EffectfulActorContext, Inbox, TestKitSettings }
import akka.typed.testkit.scaladsl._ import akka.typed.testkit.scaladsl._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) object DeferredSpec {
class DeferredSpec extends TypedSpec {
sealed trait Command sealed trait Command
case object Ping extends Command case object Ping extends Command
@ -28,44 +26,15 @@ class DeferredSpec extends TypedSpec {
monitor ! Pong monitor ! Pong
Actor.same Actor.same
}) })
}
trait StubbedTests { class DeferredSpec extends TypedSpec with StartSupport {
def system: ActorSystem[TypedSpec.Command]
def mkCtx(behv: Behavior[Command]): EffectfulActorContext[Command] = import DeferredSpec._
new EffectfulActorContext("ctx", behv, 1000, system) implicit val testSettings = TestKitSettings(system)
def `must create underlying deferred behavior immediately`(): Unit = { "Deferred behaviour" must {
val inbox = Inbox[Event]("evt") "must create underlying" in {
val behv = Actor.deferred[Command] { _
inbox.ref ! Started
target(inbox.ref)
}
val ctx = mkCtx(behv)
// it's supposed to be created immediately (not waiting for first message)
inbox.receiveMsg() should ===(Started)
}
def `must stop when exception from factory`(): Unit = {
val inbox = Inbox[Event]("evt")
val exc = new RuntimeException("simulated exc from factory") with NoStackTrace
val behv = Actor.deferred[Command] { _
inbox.ref ! Started
throw exc
}
intercept[RuntimeException] {
mkCtx(behv)
} should ===(exc)
inbox.receiveMsg() should ===(Started)
}
}
trait RealTests extends StartSupport {
implicit def system: ActorSystem[TypedSpec.Command]
implicit val testSettings = TestKitSettings(system)
def `must create underlying`(): Unit = {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = Actor.deferred[Command] { _ val behv = Actor.deferred[Command] { _
probe.ref ! Started probe.ref ! Started
@ -77,7 +46,7 @@ class DeferredSpec extends TypedSpec {
probe.expectMsg(Started) probe.expectMsg(Started)
} }
def `must stop when exception from factory`(): Unit = { "must stop when exception from factory" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = Actor.deferred[Command] { ctx val behv = Actor.deferred[Command] { ctx
val child = ctx.spawnAnonymous(Actor.deferred[Command] { _ val child = ctx.spawnAnonymous(Actor.deferred[Command] { _
@ -96,7 +65,7 @@ class DeferredSpec extends TypedSpec {
probe.expectMsg(Pong) probe.expectMsg(Pong)
} }
def `must stop when deferred result it Stopped`(): Unit = { "must stop when deferred result it Stopped" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = Actor.deferred[Command] { ctx val behv = Actor.deferred[Command] { ctx
val child = ctx.spawnAnonymous(Actor.deferred[Command](_ Actor.stopped)) val child = ctx.spawnAnonymous(Actor.deferred[Command](_ Actor.stopped))
@ -111,7 +80,7 @@ class DeferredSpec extends TypedSpec {
probe.expectMsg(Pong) probe.expectMsg(Pong)
} }
def `must create underlying when nested`(): Unit = { "must create underlying when nested" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = Actor.deferred[Command] { _ val behv = Actor.deferred[Command] { _
Actor.deferred[Command] { _ Actor.deferred[Command] { _
@ -123,7 +92,7 @@ class DeferredSpec extends TypedSpec {
probe.expectMsg(Started) probe.expectMsg(Started)
} }
def `must undefer underlying when wrapped by widen`(): Unit = { "must undefer underlying when wrapped by widen" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = Actor.deferred[Command] { _ val behv = Actor.deferred[Command] { _
probe.ref ! Started probe.ref ! Started
@ -139,7 +108,7 @@ class DeferredSpec extends TypedSpec {
probe.expectMsg(Pong) probe.expectMsg(Pong)
} }
def `must undefer underlying when wrapped by monitor`(): Unit = { "must undefer underlying when wrapped by monitor" in {
// monitor is implemented with tap, so this is testing both // monitor is implemented with tap, so this is testing both
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val monitorProbe = TestProbe[Command]("monitor") val monitorProbe = TestProbe[Command]("monitor")
@ -155,10 +124,40 @@ class DeferredSpec extends TypedSpec {
monitorProbe.expectMsg(Ping) monitorProbe.expectMsg(Ping)
probe.expectMsg(Pong) probe.expectMsg(Pong)
} }
} }
object `A DeferredBehavior (stubbed, adapted)` extends StubbedTests with AdaptedSystem }
object `A DeferredBehavior (real, adapted)` extends RealTests with AdaptedSystem
class DeferredStubbedSpec extends TypedSpec {
import DeferredSpec._
def mkCtx(behv: Behavior[Command]): EffectfulActorContext[Command] =
new EffectfulActorContext("ctx", behv, 1000, system)
"must create underlying deferred behavior immediately" in {
val inbox = Inbox[Event]("evt")
val behv = Actor.deferred[Command] { _
inbox.ref ! Started
target(inbox.ref)
}
val ctx = mkCtx(behv)
// it's supposed to be created immediately (not waiting for first message)
inbox.receiveMsg() should ===(Started)
}
"must stop when exception from factory" in {
val inbox = Inbox[Event]("evt")
val exc = new RuntimeException("simulated exc from factory") with NoStackTrace
val behv = Actor.deferred[Command] { _
inbox.ref ! Started
throw exc
}
intercept[RuntimeException] {
mkCtx(behv)
} should ===(exc)
inbox.receiveMsg() should ===(Started)
}
} }

View file

@ -45,9 +45,8 @@ object InstanceCountingExtension extends ExtensionId[DummyExtension1] {
class ExtensionsSpec extends TypedSpecSetup { class ExtensionsSpec extends TypedSpecSetup {
object `The extensions subsystem` { "The extensions subsystem" must {
"return the same instance for the same id" in
def `01 should return the same instance for the same id`(): Unit =
withEmptyActorSystem("ExtensionsSpec01") { system withEmptyActorSystem("ExtensionsSpec01") { system
val instance1 = system.registerExtension(DummyExtension1) val instance1 = system.registerExtension(DummyExtension1)
val instance2 = system.registerExtension(DummyExtension1) val instance2 = system.registerExtension(DummyExtension1)
@ -61,7 +60,7 @@ class ExtensionsSpec extends TypedSpecSetup {
instance4 should be theSameInstanceAs instance3 instance4 should be theSameInstanceAs instance3
} }
def `02 should return the same instance for the same id concurrently`(): Unit = "return the same instance for the same id concurrently" in
withEmptyActorSystem("ExtensionsSpec02") { system withEmptyActorSystem("ExtensionsSpec02") { system
// not exactly water tight but better than nothing // not exactly water tight but better than nothing
import system.executionContext import system.executionContext
@ -79,7 +78,7 @@ class ExtensionsSpec extends TypedSpecSetup {
} }
} }
def `03 should load extensions from the configuration`(): Unit = "load extensions from the configuration" in
withEmptyActorSystem("ExtensionsSpec03", Some(ConfigFactory.parseString( withEmptyActorSystem("ExtensionsSpec03", Some(ConfigFactory.parseString(
""" """
akka.typed.extensions = ["akka.actor.typed.DummyExtension1$", "akka.actor.typed.SlowExtension$"] akka.typed.extensions = ["akka.actor.typed.DummyExtension1$", "akka.actor.typed.SlowExtension$"]
@ -92,7 +91,7 @@ class ExtensionsSpec extends TypedSpecSetup {
system.extension(SlowExtension) shouldBe a[SlowExtension] system.extension(SlowExtension) shouldBe a[SlowExtension]
} }
def `04 handle extensions that fail to initialize`(): Unit = { "handle extensions that fail to initialize" in {
def create(): Unit = { def create(): Unit = {
ActorSystem[Any](Behavior.EmptyBehavior, "ExtensionsSpec04", config = Some(ConfigFactory.parseString( ActorSystem[Any](Behavior.EmptyBehavior, "ExtensionsSpec04", config = Some(ConfigFactory.parseString(
""" """
@ -109,7 +108,7 @@ class ExtensionsSpec extends TypedSpecSetup {
} }
} }
def `05 support multiple instances of the same type of extension (with different ids)`(): Unit = "support multiple instances of the same type of extension (with different ids)" in
withEmptyActorSystem("ExtensionsSpec06") { system withEmptyActorSystem("ExtensionsSpec06") { system
val id1 = new MultiExtensionId(1) val id1 = new MultiExtensionId(1)
val id2 = new MultiExtensionId(2) val id2 = new MultiExtensionId(2)
@ -119,7 +118,7 @@ class ExtensionsSpec extends TypedSpecSetup {
system.registerExtension(id1).n should ===(1) system.registerExtension(id1).n should ===(1)
} }
def `06 allow for auto-loading of library-extensions`(): Unit = "allow for auto-loading of library-extensions" in
withEmptyActorSystem("ExtensionsSpec06") { system withEmptyActorSystem("ExtensionsSpec06") { system
val listedExtensions = system.settings.config.getStringList("akka.typed.library-extensions") val listedExtensions = system.settings.config.getStringList("akka.typed.library-extensions")
listedExtensions.size should be > 0 listedExtensions.size should be > 0
@ -127,23 +126,23 @@ class ExtensionsSpec extends TypedSpecSetup {
InstanceCountingExtension.createCount.get() should be > 0 InstanceCountingExtension.createCount.get() should be > 0
} }
def `07 fail the system if a library-extension cannot be loaded`(): Unit = "fail the system if a library-extension cannot be loaded" in
intercept[RuntimeException] { intercept[RuntimeException] {
withEmptyActorSystem( withEmptyActorSystem(
"ExtensionsSpec07", "ExtensionsSpec07",
Some(ConfigFactory.parseString("""akka.typed.library-extensions += "akka.actor.typed.FailingToLoadExtension$" """)) Some(ConfigFactory.parseString("""akka.typed.library-extensions += "akka.actor.typed.FailingToLoadExtension$""""))
) { _ () } ) { _ () }
} }
def `08 fail the system if a library-extension cannot be loaded`(): Unit = "fail the system if a library-extension is missing" in
intercept[RuntimeException] { intercept[RuntimeException] {
withEmptyActorSystem( withEmptyActorSystem(
"ExtensionsSpec08", "ExtensionsSpec08",
Some(ConfigFactory.parseString("""akka.typed.library-extensions += "akka.actor.typed.MissingExtension" """)) Some(ConfigFactory.parseString("""akka.typed.library-extensions += "akka.actor.typed.MissingExtension""""))
) { _ () } ) { _ () }
} }
def `09 load an extension implemented in Java`(): Unit = "load an extension implemented in Java" in
withEmptyActorSystem("ExtensionsSpec09") { system withEmptyActorSystem("ExtensionsSpec09") { system
// no way to make apply work cleanly with extensions implemented in Java // no way to make apply work cleanly with extensions implemented in Java
val instance1 = ExtensionsTest.MyExtension.get(system) val instance1 = ExtensionsTest.MyExtension.get(system)
@ -152,7 +151,7 @@ class ExtensionsSpec extends TypedSpecSetup {
instance1 should be theSameInstanceAs instance2 instance1 should be theSameInstanceAs instance2
} }
def `10 not create an extension multiple times when using the ActorSystemAdapter`(): Unit = { "not create an extension multiple times when using the ActorSystemAdapter" in {
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
val untypedSystem = akka.actor.ActorSystem() val untypedSystem = akka.actor.ActorSystem()
try { try {
@ -168,11 +167,10 @@ class ExtensionsSpec extends TypedSpecSetup {
untypedSystem.terminate().futureValue untypedSystem.terminate().futureValue
} }
} }
}
def withEmptyActorSystem[T](name: String, config: Option[Config] = None)(f: ActorSystem[_] T): T = { def withEmptyActorSystem[T](name: String, config: Option[Config] = None)(f: ActorSystem[_] T): T = {
val system = ActorSystem[Any](Behavior.EmptyBehavior, name, config = config) val system = ActorSystem[Any](Behavior.EmptyBehavior, name, config = config)
try f(system) finally system.terminate().futureValue try f(system) finally system.terminate().futureValue
}
} }
} }

View file

@ -11,73 +11,94 @@ import akka.util.Timeout
@RunWith(classOf[org.scalatest.junit.JUnitRunner]) @RunWith(classOf[org.scalatest.junit.JUnitRunner])
class PerformanceSpec extends TypedSpec( class PerformanceSpec extends TypedSpec(
ConfigFactory.parseString(""" ConfigFactory.parseString(
"""
# increase this if you do real benchmarking # increase this if you do real benchmarking
akka.actor.typed.PerformanceSpec.iterations=100000 akka.actor.typed.PerformanceSpec.iterations=100000
""")) { """)) {
override def setTimeout = Timeout(20.seconds) override def setTimeout = Timeout(20.seconds)
object `A immutable behavior` { case class Ping(x: Int, pong: ActorRef[Pong], report: ActorRef[Pong])
case class Pong(x: Int, ping: ActorRef[Ping], report: ActorRef[Pong])
case class Ping(x: Int, pong: ActorRef[Pong], report: ActorRef[Pong]) def behavior(pairs: Int, pings: Int, count: Int, executor: String) =
case class Pong(x: Int, ping: ActorRef[Ping], report: ActorRef[Pong]) StepWise[Pong] { (ctx, startWith)
startWith {
def behavior(pairs: Int, pings: Int, count: Int, executor: String) = val pinger = immutable[Ping] { (ctx, msg)
StepWise[Pong] { (ctx, startWith) if (msg.x == 0) {
startWith { msg.report ! Pong(0, ctx.self, msg.report)
val pinger = immutable[Ping] { (ctx, msg)
if (msg.x == 0) {
msg.report ! Pong(0, ctx.self, msg.report)
same
} else {
msg.pong ! Pong(msg.x - 1, ctx.self, msg.report)
same
}
} // FIXME .withDispatcher(executor)
val ponger = immutable[Pong] { (ctx, msg)
msg.ping ! Ping(msg.x, ctx.self, msg.report)
same same
} // FIXME .withDispatcher(executor) } else {
msg.pong ! Pong(msg.x - 1, ctx.self, msg.report)
same
}
} // FIXME .withDispatcher(executor)
val actors = val ponger = immutable[Pong] { (ctx, msg)
for (i 1 to pairs) msg.ping ! Ping(msg.x, ctx.self, msg.report)
yield (ctx.spawn(pinger, s"pinger-$i"), ctx.spawn(ponger, s"ponger-$i")) same
} // FIXME .withDispatcher(executor)
val start = Deadline.now val actors =
for (i 1 to pairs)
yield (ctx.spawn(pinger, s"pinger-$i"), ctx.spawn(ponger, s"ponger-$i"))
for { val start = Deadline.now
(ping, pong) actors
_ 1 to pings
} ping ! Ping(count, pong, ctx.self)
start for {
}.expectMultipleMessages(10.seconds, pairs * pings) { (msgs, start) (ping, pong) actors
val stop = Deadline.now _ 1 to pings
} ping ! Ping(count, pong, ctx.self)
val rate = 2L * count * pairs * pings / (stop - start).toMillis start
info(s"messaging rate was $rate/ms") }.expectMultipleMessages(10.seconds, pairs * pings) { (msgs, start)
} val stop = Deadline.now
val rate = 2L * count * pairs * pings / (stop - start).toMillis
info(s"messaging rate was $rate/ms")
} }
val iterations = system.settings.config.getInt("akka.actor.typed.PerformanceSpec.iterations")
trait CommonTests {
implicit def system: ActorSystem[TypedSpec.Command]
def `01 when warming up`(): Unit = sync(runTest("01")(behavior(1, 1, iterations, "dispatcher-1")))
def `02 when using a single message on a single thread`(): Unit = sync(runTest("02")(behavior(1, 1, iterations, "dispatcher-1")))
def `03 when using a 10 messages on a single thread`(): Unit = sync(runTest("03")(behavior(1, 10, iterations, "dispatcher-1")))
def `04 when using a single message on two threads`(): Unit = sync(runTest("04")(behavior(1, 1, iterations, "dispatcher-2")))
def `05 when using a 10 messages on two threads`(): Unit = sync(runTest("05")(behavior(1, 10, iterations, "dispatcher-2")))
def `06 when using 4 pairs with a single message`(): Unit = sync(runTest("06")(behavior(4, 1, iterations, "dispatcher-8")))
def `07 when using 4 pairs with 10 messages`(): Unit = sync(runTest("07")(behavior(4, 10, iterations, "dispatcher-8")))
def `08 when using 8 pairs with a single message`(): Unit = sync(runTest("08")(behavior(8, 1, iterations, "dispatcher-8")))
def `09 when using 8 pairs with 10 messages`(): Unit = sync(runTest("09")(behavior(8, 10, iterations, "dispatcher-8")))
} }
object `must be fast with ActorSystemAdapter` extends CommonTests with AdaptedSystem val iterations = system.settings.config.getInt("akka.actor.typed.PerformanceSpec.iterations")
"An immutable behaviour" must {
"when warming up" in {
sync(runTest("01")(behavior(1, 1, iterations, "dispatcher-1")))
}
"when using a single message on a single thread" in {
sync(runTest("02")(behavior(1, 1, iterations, "dispatcher-1")))
}
"when using a 10 messages on a single thread" in {
sync(runTest("03")(behavior(1, 10, iterations, "dispatcher-1")))
}
"when using a single message on two threads" in {
sync(runTest("04")(behavior(1, 1, iterations, "dispatcher-2")))
}
"when using a 10 messages on two threads" in {
sync(runTest("05")(behavior(1, 10, iterations, "dispatcher-2")))
}
"when using 4 pairs with a single message" in {
sync(runTest("06")(behavior(4, 1, iterations, "dispatcher-8")))
}
"when using 4 pairs with 10 messages" in {
sync(runTest("07")(behavior(4, 10, iterations, "dispatcher-8")))
}
"when using 8 pairs with a single message" in {
sync(runTest("08")(behavior(8, 1, iterations, "dispatcher-8")))
}
"when using 8 pairs with 10 messages" in {
sync(runTest("09")(behavior(8, 10, iterations, "dispatcher-8")))
}
} }
} }

View file

@ -7,13 +7,13 @@ class PropsSpec extends TypedSpecSetup {
val dispatcherFirst = DispatcherDefault(DispatcherFromConfig("pool")) val dispatcherFirst = DispatcherDefault(DispatcherFromConfig("pool"))
object `A Props` { "A Props" must {
def `must get first dispatcher`(): Unit = { "get first dispatcher" in {
dispatcherFirst.firstOrElse[DispatcherSelector](null) should ===(dispatcherFirst) dispatcherFirst.firstOrElse[DispatcherSelector](null) should ===(dispatcherFirst)
} }
def `must yield all configs of some type`(): Unit = { "yield all configs of some type" in {
dispatcherFirst.allOf[DispatcherSelector] should ===(DispatcherSelector.default() :: DispatcherSelector.fromConfig("pool") :: Nil) dispatcherFirst.allOf[DispatcherSelector] should ===(DispatcherSelector.default() :: DispatcherSelector.fromConfig("pool") :: Nil)
} }
} }

View file

@ -10,8 +10,7 @@ import akka.typed.testkit.{ EffectfulActorContext, Inbox, TestKitSettings }
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.typed.testkit.scaladsl._ import akka.typed.testkit.scaladsl._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) object RestarterSpec {
class RestarterSpec extends TypedSpec {
sealed trait Command sealed trait Command
case object Ping extends Command case object Ping extends Command
@ -63,14 +62,17 @@ class RestarterSpec extends TypedSpec {
same same
} }
} }
}
trait StubbedTests { class RestarterSpec extends TypedSpec {
def system: ActorSystem[TypedSpec.Command]
def mkCtx(behv: Behavior[Command]): EffectfulActorContext[Command] = import RestarterSpec._
new EffectfulActorContext("ctx", behv, 1000, system)
def `must receive message`(): Unit = { def mkCtx(behv: Behavior[Command]): EffectfulActorContext[Command] =
new EffectfulActorContext("ctx", behv, 1000, system)
"A restarter" must {
"receive message" in {
val inbox = Inbox[Event]("evt") val inbox = Inbox[Event]("evt")
val behv = supervise(target(inbox.ref)).onFailure[Throwable](SupervisorStrategy.restart) val behv = supervise(target(inbox.ref)).onFailure[Throwable](SupervisorStrategy.restart)
val ctx = mkCtx(behv) val ctx = mkCtx(behv)
@ -78,7 +80,7 @@ class RestarterSpec extends TypedSpec {
inbox.receiveMsg() should ===(Pong) inbox.receiveMsg() should ===(Pong)
} }
def `must stop when no supervise`(): Unit = { "stop when no supervise" in {
val inbox = Inbox[Event]("evt") val inbox = Inbox[Event]("evt")
val behv = target(inbox.ref) val behv = target(inbox.ref)
val ctx = mkCtx(behv) val ctx = mkCtx(behv)
@ -88,7 +90,7 @@ class RestarterSpec extends TypedSpec {
inbox.receiveMsg() should ===(GotSignal(PostStop)) inbox.receiveMsg() should ===(GotSignal(PostStop))
} }
def `must stop when unhandled exception`(): Unit = { "stop when unhandled exception" in {
val inbox = Inbox[Event]("evt") val inbox = Inbox[Event]("evt")
val behv = supervise(target(inbox.ref)).onFailure[Exc1](SupervisorStrategy.restart) val behv = supervise(target(inbox.ref)).onFailure[Exc1](SupervisorStrategy.restart)
val ctx = mkCtx(behv) val ctx = mkCtx(behv)
@ -98,7 +100,7 @@ class RestarterSpec extends TypedSpec {
inbox.receiveMsg() should ===(GotSignal(PostStop)) inbox.receiveMsg() should ===(GotSignal(PostStop))
} }
def `must restart when handled exception`(): Unit = { "restart when handled exception" in {
val inbox = Inbox[Event]("evt") val inbox = Inbox[Event]("evt")
val behv = supervise(target(inbox.ref)).onFailure[Exc1](SupervisorStrategy.restart) val behv = supervise(target(inbox.ref)).onFailure[Exc1](SupervisorStrategy.restart)
val ctx = mkCtx(behv) val ctx = mkCtx(behv)
@ -112,7 +114,7 @@ class RestarterSpec extends TypedSpec {
inbox.receiveMsg() should ===(State(0, Map.empty)) inbox.receiveMsg() should ===(State(0, Map.empty))
} }
def `must resume when handled exception`(): Unit = { "resume when handled exception" in {
val inbox = Inbox[Event]("evt") val inbox = Inbox[Event]("evt")
val behv = supervise(target(inbox.ref)).onFailure[Exc1](SupervisorStrategy.resume) val behv = supervise(target(inbox.ref)).onFailure[Exc1](SupervisorStrategy.resume)
val ctx = mkCtx(behv) val ctx = mkCtx(behv)
@ -125,7 +127,7 @@ class RestarterSpec extends TypedSpec {
inbox.receiveMsg() should ===(State(1, Map.empty)) inbox.receiveMsg() should ===(State(1, Map.empty))
} }
def `must support nesting to handle different exceptions`(): Unit = { "support nesting to handle different exceptions" in {
val inbox = Inbox[Event]("evt") val inbox = Inbox[Event]("evt")
val behv = val behv =
supervise( supervise(
@ -156,7 +158,7 @@ class RestarterSpec extends TypedSpec {
inbox.receiveMsg() should ===(GotSignal(PostStop)) inbox.receiveMsg() should ===(GotSignal(PostStop))
} }
def `must not catch fatal error`(): Unit = { "not catch fatal error" in {
val inbox = Inbox[Event]("evt") val inbox = Inbox[Event]("evt")
val behv = supervise(target(inbox.ref)).onFailure[Throwable](SupervisorStrategy.restart) val behv = supervise(target(inbox.ref)).onFailure[Throwable](SupervisorStrategy.restart)
val ctx = mkCtx(behv) val ctx = mkCtx(behv)
@ -166,7 +168,7 @@ class RestarterSpec extends TypedSpec {
inbox.receiveAll() should ===(Nil) inbox.receiveAll() should ===(Nil)
} }
def `must stop after restart retries limit`(): Unit = { "stop after restart retries limit" in {
val inbox = Inbox[Event]("evt") val inbox = Inbox[Event]("evt")
val strategy = SupervisorStrategy.restartWithLimit(maxNrOfRetries = 2, withinTimeRange = 1.minute) val strategy = SupervisorStrategy.restartWithLimit(maxNrOfRetries = 2, withinTimeRange = 1.minute)
val behv = supervise(target(inbox.ref)).onFailure[Exc1](strategy) val behv = supervise(target(inbox.ref)).onFailure[Exc1](strategy)
@ -181,7 +183,7 @@ class RestarterSpec extends TypedSpec {
inbox.receiveMsg() should ===(GotSignal(PostStop)) inbox.receiveMsg() should ===(GotSignal(PostStop))
} }
def `must reset retry limit after withinTimeRange`(): Unit = { "reset retry limit after withinTimeRange" in {
val inbox = Inbox[Event]("evt") val inbox = Inbox[Event]("evt")
val withinTimeRange = 2.seconds val withinTimeRange = 2.seconds
val strategy = SupervisorStrategy.restartWithLimit(maxNrOfRetries = 2, withinTimeRange) val strategy = SupervisorStrategy.restartWithLimit(maxNrOfRetries = 2, withinTimeRange)
@ -203,7 +205,7 @@ class RestarterSpec extends TypedSpec {
inbox.receiveMsg() should ===(GotSignal(PostStop)) inbox.receiveMsg() should ===(GotSignal(PostStop))
} }
def `must stop at first exception when restart retries limit is 0`(): Unit = { "stop at first exception when restart retries limit is 0" in {
val inbox = Inbox[Event]("evt") val inbox = Inbox[Event]("evt")
val strategy = SupervisorStrategy.restartWithLimit(maxNrOfRetries = 0, withinTimeRange = 1.minute) val strategy = SupervisorStrategy.restartWithLimit(maxNrOfRetries = 0, withinTimeRange = 1.minute)
val behv = supervise(target(inbox.ref)).onFailure[Exc1](strategy) val behv = supervise(target(inbox.ref)).onFailure[Exc1](strategy)
@ -214,7 +216,7 @@ class RestarterSpec extends TypedSpec {
inbox.receiveMsg() should ===(GotSignal(PostStop)) inbox.receiveMsg() should ===(GotSignal(PostStop))
} }
def `must create underlying deferred behavior immediately`(): Unit = { "create underlying deferred behavior immediately" in {
val inbox = Inbox[Event]("evt") val inbox = Inbox[Event]("evt")
val behv = supervise(deferred[Command] { _ val behv = supervise(deferred[Command] { _
inbox.ref ! Started inbox.ref ! Started
@ -225,13 +227,16 @@ class RestarterSpec extends TypedSpec {
inbox.receiveMsg() should ===(Started) inbox.receiveMsg() should ===(Started)
} }
} }
}
trait RealTests extends StartSupport { class RestarterStubbedSpec extends TypedSpec with StartSupport {
import akka.actor.typed.scaladsl.adapter._
implicit def system: ActorSystem[TypedSpec.Command]
implicit val testSettings = TestKitSettings(system)
def `must receive message`(): Unit = { import RestarterSpec._
implicit val testSettings = TestKitSettings(system)
"A restart (subbed)" must {
"receive message" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = supervise(target(probe.ref)).onFailure[Throwable](SupervisorStrategy.restart) val behv = supervise(target(probe.ref)).onFailure[Throwable](SupervisorStrategy.restart)
val ref = start(behv) val ref = start(behv)
@ -239,7 +244,7 @@ class RestarterSpec extends TypedSpec {
probe.expectMsg(Pong) probe.expectMsg(Pong)
} }
def `must stop when no supervise`(): Unit = { "stop when no supervise" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = target(probe.ref) val behv = target(probe.ref)
val ref = start(behv) val ref = start(behv)
@ -248,7 +253,7 @@ class RestarterSpec extends TypedSpec {
probe.expectMsg(GotSignal(PostStop)) probe.expectMsg(GotSignal(PostStop))
} }
def `must stop when unhandled exception`(): Unit = { "stop when unhandled exception" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = supervise(target(probe.ref)).onFailure[Exc1](SupervisorStrategy.restart) val behv = supervise(target(probe.ref)).onFailure[Exc1](SupervisorStrategy.restart)
val ref = start(behv) val ref = start(behv)
@ -256,7 +261,7 @@ class RestarterSpec extends TypedSpec {
probe.expectMsg(GotSignal(PostStop)) probe.expectMsg(GotSignal(PostStop))
} }
def `must restart when handled exception`(): Unit = { "restart when handled exception" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = supervise(target(probe.ref)).onFailure[Exc1](SupervisorStrategy.restart) val behv = supervise(target(probe.ref)).onFailure[Exc1](SupervisorStrategy.restart)
val ref = start(behv) val ref = start(behv)
@ -270,7 +275,7 @@ class RestarterSpec extends TypedSpec {
probe.expectMsg(State(0, Map.empty)) probe.expectMsg(State(0, Map.empty))
} }
def `must NOT stop children when restarting`(): Unit = { "NOT stop children when restarting" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = supervise(target(probe.ref)).onFailure[Exc1](SupervisorStrategy.restart) val behv = supervise(target(probe.ref)).onFailure[Exc1](SupervisorStrategy.restart)
val ref = start(behv) val ref = start(behv)
@ -289,7 +294,7 @@ class RestarterSpec extends TypedSpec {
probe.expectMsgType[State].children.keySet should contain(childName) probe.expectMsgType[State].children.keySet should contain(childName)
} }
def `must resume when handled exception`(): Unit = { "resume when handled exception" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = supervise(target(probe.ref)).onFailure[Exc1](SupervisorStrategy.resume) val behv = supervise(target(probe.ref)).onFailure[Exc1](SupervisorStrategy.resume)
val ref = start(behv) val ref = start(behv)
@ -302,7 +307,7 @@ class RestarterSpec extends TypedSpec {
probe.expectMsg(State(1, Map.empty)) probe.expectMsg(State(1, Map.empty))
} }
def `must support nesting to handle different exceptions`(): Unit = { "support nesting to handle different exceptions" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = supervise( val behv = supervise(
supervise(target(probe.ref)).onFailure[Exc2](SupervisorStrategy.resume) supervise(target(probe.ref)).onFailure[Exc2](SupervisorStrategy.resume)
@ -328,7 +333,7 @@ class RestarterSpec extends TypedSpec {
probe.expectMsg(GotSignal(PostStop)) probe.expectMsg(GotSignal(PostStop))
} }
def `must restart after exponential backoff`(): Unit = { "restart after exponential backoff" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val startedProbe = TestProbe[Event]("started") val startedProbe = TestProbe[Event]("started")
val minBackoff = 1.seconds val minBackoff = 1.seconds
@ -365,7 +370,7 @@ class RestarterSpec extends TypedSpec {
probe.expectMsg(State(0, Map.empty)) probe.expectMsg(State(0, Map.empty))
} }
def `must reset exponential backoff count after reset timeout`(): Unit = { "reset exponential backoff count after reset timeout" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val minBackoff = 1.seconds val minBackoff = 1.seconds
val strategy = SupervisorStrategy.restartWithBackoff(minBackoff, 10.seconds, 0.0) val strategy = SupervisorStrategy.restartWithBackoff(minBackoff, 10.seconds, 0.0)
@ -395,7 +400,7 @@ class RestarterSpec extends TypedSpec {
probe.expectMsg(State(0, Map.empty)) probe.expectMsg(State(0, Map.empty))
} }
def `must create underlying deferred behavior immediately`(): Unit = { "create underlying deferred behavior immediately" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = supervise(deferred[Command] { _ val behv = supervise(deferred[Command] { _
probe.ref ! Started probe.ref ! Started
@ -407,7 +412,7 @@ class RestarterSpec extends TypedSpec {
probe.expectMsg(Started) probe.expectMsg(Started)
} }
def `must stop when exception from MutableBehavior constructor`(): Unit = { "stop when exception from MutableBehavior constructor" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = supervise(mutable[Command](_ new FailingConstructor(probe.ref))).onFailure[Exception](SupervisorStrategy.restart) val behv = supervise(mutable[Command](_ new FailingConstructor(probe.ref))).onFailure[Exception](SupervisorStrategy.restart)
val ref = start(behv) val ref = start(behv)
@ -415,10 +420,5 @@ class RestarterSpec extends TypedSpec {
ref ! Ping ref ! Ping
probe.expectNoMsg(100.millis) probe.expectNoMsg(100.millis)
} }
} }
object `A restarter (stubbed, adapted)` extends StubbedTests with AdaptedSystem
object `A restarter (real, adapted)` extends RealTests with AdaptedSystem
} }

View file

@ -15,10 +15,10 @@ import akka.actor.typed.scaladsl.TimerScheduler
import akka.typed.testkit.TestKitSettings import akka.typed.testkit.TestKitSettings
import akka.typed.testkit.scaladsl._ import akka.typed.testkit.scaladsl._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class TimerSpec extends TypedSpec(
class TimerSpec extends TypedSpec(""" """
#akka.loglevel = DEBUG #akka.loglevel = DEBUG
""") { """) with StartSupport {
sealed trait Command sealed trait Command
case class Tick(n: Int) extends Command case class Tick(n: Int) extends Command
@ -36,52 +36,51 @@ class TimerSpec extends TypedSpec("""
class Exc extends RuntimeException("simulated exc") with NoStackTrace class Exc extends RuntimeException("simulated exc") with NoStackTrace
trait RealTests extends StartSupport { implicit val testSettings = TestKitSettings(system)
implicit def system: ActorSystem[TypedSpec.Command]
implicit val testSettings = TestKitSettings(system)
val interval = 1.second val interval = 1.second
val dilatedInterval = interval.dilated val dilatedInterval = interval.dilated
def target(monitor: ActorRef[Event], timer: TimerScheduler[Command], bumpCount: Int): Behavior[Command] = { def target(monitor: ActorRef[Event], timer: TimerScheduler[Command], bumpCount: Int): Behavior[Command] = {
def bump(): Behavior[Command] = { def bump(): Behavior[Command] = {
val nextCount = bumpCount + 1 val nextCount = bumpCount + 1
timer.startPeriodicTimer("T", Tick(nextCount), interval) timer.startPeriodicTimer("T", Tick(nextCount), interval)
target(monitor, timer, nextCount) target(monitor, timer, nextCount)
}
Actor.immutable[Command] { (ctx, cmd)
cmd match {
case Tick(n)
monitor ! Tock(n)
Actor.same
case Bump
bump()
case SlowThenBump(latch)
latch.await(10, TimeUnit.SECONDS)
bump()
case End
Actor.stopped
case Cancel
timer.cancel("T")
Actor.same
case Throw(e)
throw e
case SlowThenThrow(latch, e)
latch.await(10, TimeUnit.SECONDS)
throw e
}
} onSignal {
case (ctx, PreRestart)
monitor ! GotPreRestart(timer.isTimerActive("T"))
Actor.same
case (ctx, PostStop)
monitor ! GotPostStop(timer.isTimerActive("T"))
Actor.same
}
} }
def `01 must schedule non-repeated ticks`(): Unit = { Actor.immutable[Command] { (ctx, cmd)
cmd match {
case Tick(n)
monitor ! Tock(n)
Actor.same
case Bump
bump()
case SlowThenBump(latch)
latch.await(10, TimeUnit.SECONDS)
bump()
case End
Actor.stopped
case Cancel
timer.cancel("T")
Actor.same
case Throw(e)
throw e
case SlowThenThrow(latch, e)
latch.await(10, TimeUnit.SECONDS)
throw e
}
} onSignal {
case (ctx, PreRestart)
monitor ! GotPreRestart(timer.isTimerActive("T"))
Actor.same
case (ctx, PostStop)
monitor ! GotPostStop(timer.isTimerActive("T"))
Actor.same
}
}
"A timer" must {
"schedule non-repeated ticks" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = Actor.withTimers[Command] { timer val behv = Actor.withTimers[Command] { timer
timer.startSingleTimer("T", Tick(1), 10.millis) timer.startSingleTimer("T", Tick(1), 10.millis)
@ -96,7 +95,7 @@ class TimerSpec extends TypedSpec("""
probe.expectMsg(GotPostStop(false)) probe.expectMsg(GotPostStop(false))
} }
def `02 must schedule repeated ticks`(): Unit = { "schedule repeated ticks" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = Actor.withTimers[Command] { timer val behv = Actor.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(1), interval) timer.startPeriodicTimer("T", Tick(1), interval)
@ -114,7 +113,7 @@ class TimerSpec extends TypedSpec("""
probe.expectMsg(GotPostStop(false)) probe.expectMsg(GotPostStop(false))
} }
def `03 must replace timer`(): Unit = { "replace timer" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = Actor.withTimers[Command] { timer val behv = Actor.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(1), interval) timer.startPeriodicTimer("T", Tick(1), interval)
@ -134,7 +133,7 @@ class TimerSpec extends TypedSpec("""
probe.expectMsg(GotPostStop(false)) probe.expectMsg(GotPostStop(false))
} }
def `04 must cancel timer`(): Unit = { "cancel timer" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = Actor.withTimers[Command] { timer val behv = Actor.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(1), interval) timer.startPeriodicTimer("T", Tick(1), interval)
@ -150,7 +149,7 @@ class TimerSpec extends TypedSpec("""
probe.expectMsg(GotPostStop(false)) probe.expectMsg(GotPostStop(false))
} }
def `05 must discard timers from old incarnation after restart, alt 1`(): Unit = { "discard timers from old incarnation after restart, alt 1" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val startCounter = new AtomicInteger(0) val startCounter = new AtomicInteger(0)
val behv = Actor.supervise(Actor.withTimers[Command] { timer val behv = Actor.supervise(Actor.withTimers[Command] { timer
@ -174,7 +173,7 @@ class TimerSpec extends TypedSpec("""
probe.expectMsg(GotPostStop(false)) probe.expectMsg(GotPostStop(false))
} }
def `06 must discard timers from old incarnation after restart, alt 2`(): Unit = { "discard timers from old incarnation after restart, alt 2" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = Actor.supervise(Actor.withTimers[Command] { timer val behv = Actor.supervise(Actor.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(1), interval) timer.startPeriodicTimer("T", Tick(1), interval)
@ -200,7 +199,7 @@ class TimerSpec extends TypedSpec("""
probe.expectMsg(GotPostStop(false)) probe.expectMsg(GotPostStop(false))
} }
def `07 must cancel timers when stopped from exception`(): Unit = { "cancel timers when stopped from exception" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = Actor.withTimers[Command] { timer val behv = Actor.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(1), interval) timer.startPeriodicTimer("T", Tick(1), interval)
@ -211,7 +210,7 @@ class TimerSpec extends TypedSpec("""
probe.expectMsg(GotPostStop(false)) probe.expectMsg(GotPostStop(false))
} }
def `08 must cancel timers when stopped voluntarily`(): Unit = { "cancel timers when stopped voluntarily" in {
val probe = TestProbe[Event]("evt") val probe = TestProbe[Event]("evt")
val behv = Actor.withTimers[Command] { timer val behv = Actor.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(1), interval) timer.startPeriodicTimer("T", Tick(1), interval)
@ -222,6 +221,4 @@ class TimerSpec extends TypedSpec("""
probe.expectMsg(GotPostStop(false)) probe.expectMsg(GotPostStop(false))
} }
} }
object `A Restarter (real, adapted)` extends RealTests with AdaptedSystem
} }

View file

@ -3,9 +3,7 @@
*/ */
package akka.actor.typed package akka.actor.typed
import org.scalatest.refspec.RefSpec import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
import org.scalatest.Matchers
import org.scalatest.BeforeAndAfterAll
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import scala.concurrent.Await import scala.concurrent.Await
@ -36,14 +34,29 @@ import org.scalatest.time.Span
/** /**
* Helper class for writing tests for typed Actors with ScalaTest. * Helper class for writing tests for typed Actors with ScalaTest.
*/ */
@RunWith(classOf[org.scalatest.junit.JUnitRunner]) class TypedSpecSetup extends WordSpec with Matchers with BeforeAndAfterAll with ScalaFutures with TypeCheckedTripleEquals {
class TypedSpecSetup extends RefSpec with Matchers with BeforeAndAfterAll with ScalaFutures with TypeCheckedTripleEquals {
// TODO hook this up with config like in akka-testkit/AkkaSpec? // TODO hook this up with config like in akka-testkit/AkkaSpec?
implicit val akkaPatience = PatienceConfig(3.seconds, Span(100, org.scalatest.time.Millis)) implicit val akkaPatience = PatienceConfig(3.seconds, Span(100, org.scalatest.time.Millis))
} }
trait StartSupport {
implicit def system: ActorSystem[TypedSpec.Command]
private implicit def timeout: Timeout = Timeout(1.minute)
private implicit def scheduler = system.scheduler
private val nameCounter = Iterator.from(0)
def nextName(prefix: String = "a"): String = s"$prefix-${nameCounter.next()}"
def start[T](behv: Behavior[T]): ActorRef[T] = {
import akka.actor.typed.scaladsl.AskPattern._
import akka.typed.testkit.scaladsl._
implicit val testSettings = TestKitSettings(system)
Await.result(system ? TypedSpec.Create(behv, nextName()), 3.seconds.dilated)
}
}
/** /**
* Helper class for writing tests against both ActorSystemImpl and ActorSystemAdapter. * Helper class for writing tests against both ActorSystemImpl and ActorSystemAdapter.
*/ */
@ -59,25 +72,11 @@ abstract class TypedSpec(val config: Config) extends TypedSpecSetup {
// extension point // extension point
def setTimeout: Timeout = Timeout(1.minute) def setTimeout: Timeout = Timeout(1.minute)
lazy val system: ActorSystem[TypedSpec.Command] = { implicit lazy val system: ActorSystem[TypedSpec.Command] = {
val sys = ActorSystem(guardian(), AkkaSpec.getCallerName(classOf[TypedSpec]), config = Some(config withFallback AkkaSpec.testConf)) val sys = ActorSystem(guardian(), AkkaSpec.getCallerName(classOf[TypedSpec]), config = Some(config withFallback AkkaSpec.testConf))
sys sys
} }
trait StartSupport {
def system: ActorSystem[TypedSpec.Command]
private val nameCounter = Iterator.from(0)
def nextName(prefix: String = "a"): String = s"$prefix-${nameCounter.next()}"
def start[T](behv: Behavior[T]): ActorRef[T] = {
import akka.actor.typed.scaladsl.AskPattern._
import akka.typed.testkit.scaladsl._
implicit val testSettings = TestKitSettings(system)
Await.result(system ? TypedSpec.Create(behv, nextName()), 3.seconds.dilated)
}
}
trait AdaptedSystem { trait AdaptedSystem {
def system: ActorSystem[TypedSpec.Command] = TypedSpec.this.system def system: ActorSystem[TypedSpec.Command] = TypedSpec.this.system
} }
@ -219,21 +218,14 @@ object TypedSpec {
class TypedSpecSpec extends TypedSpec { class TypedSpecSpec extends TypedSpec {
object `A TypedSpec` { "A TypedSpec" must {
"must report failures" in {
trait CommonTests { a[TypedSpec.SimulatedException] must be thrownBy {
implicit def system: ActorSystem[TypedSpec.Command] sync(runTest("failure")(StepWise[String]((ctx, startWith)
startWith {
def `must report failures`(): Unit = { throw new TypedSpec.SimulatedException("expected")
a[TypedSpec.SimulatedException] must be thrownBy { })))
sync(runTest("failure")(StepWise[String]((ctx, startWith)
startWith {
throw new TypedSpec.SimulatedException("expected")
})))
}
} }
} }
object `when using the adapted implementation` extends CommonTests with AdaptedSystem
} }
} }

View file

@ -7,15 +7,12 @@ import scala.concurrent._
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor.typed.scaladsl.Actor._ import akka.actor.typed.scaladsl.Actor._
import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.AskPattern._
import akka.testkit._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class WatchSpec extends TypedSpec { class WatchSpec extends TypedSpec {
trait Tests { "Actor monitoring" must {
implicit def system: ActorSystem[TypedSpec.Command]
def `get notified of actor termination`(): Unit = { "get notified of actor termination" in {
case object Stop case object Stop
case class StartWatching(watchee: ActorRef[Stop.type]) case class StartWatching(watchee: ActorRef[Stop.type])
@ -37,7 +34,7 @@ class WatchSpec extends TypedSpec {
Await.result(receivedTerminationSignal.future, 3.seconds /*.dilated*/ ) Await.result(receivedTerminationSignal.future, 3.seconds /*.dilated*/ )
} }
def `get notified of actor termination with a custom message`(): Unit = { "get notified of actor termination with a custom message" in {
case object Stop case object Stop
sealed trait Message sealed trait Message
@ -65,6 +62,4 @@ class WatchSpec extends TypedSpec {
Await.result(receivedTerminationSignal.future, 3.seconds /*.dilated*/ ) Await.result(receivedTerminationSignal.future, 3.seconds /*.dilated*/ )
} }
} }
object `Actor monitoring (adapted)` extends Tests with AdaptedSystem
} }

View file

@ -4,13 +4,10 @@
package akka.actor.typed package akka.actor.typed
package internal package internal
import akka.Done
import akka.actor.InvalidMessageException import akka.actor.InvalidMessageException
import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.Actor
import akka.actor.typed.scaladsl.Actor._ import akka.actor.typed.scaladsl.Actor._
import akka.typed.testkit.Inbox import akka.typed.testkit.Inbox
import akka.util.Timeout
import org.junit.runner.RunWith
import org.scalactic.ConversionCheckedTripleEquals import org.scalactic.ConversionCheckedTripleEquals
import org.scalatest._ import org.scalatest._
import org.scalatest.concurrent.{ Eventually, ScalaFutures } import org.scalatest.concurrent.{ Eventually, ScalaFutures }
@ -19,34 +16,34 @@ import scala.concurrent.duration._
import scala.concurrent.{ Future, Promise } import scala.concurrent.{ Future, Promise }
import scala.util.control.NonFatal import scala.util.control.NonFatal
@RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with ScalaFutures with Eventually with ConversionCheckedTripleEquals {
class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with ScalaFutures with Eventually with ConversionCheckedTripleEquals {
override implicit val patienceConfig = PatienceConfig(1.second) override implicit val patienceConfig = PatienceConfig(1.second)
def system[T](behavior: Behavior[T], name: String) = ActorSystem(behavior, name)
def suite = "adapter"
case class Probe(msg: String, replyTo: ActorRef[String]) case class Probe(msg: String, replyTo: ActorRef[String])
trait CommonTests { def withSystem[T](name: String, behavior: Behavior[T], doTerminate: Boolean = true)(block: ActorSystem[T] Unit): Terminated = {
def system[T](behavior: Behavior[T], name: String): ActorSystem[T] val sys = system(behavior, s"$suite-$name")
def suite: String try {
block(sys)
def withSystem[T](name: String, behavior: Behavior[T], doTerminate: Boolean = true)(block: ActorSystem[T] Unit): Terminated = { if (doTerminate) sys.terminate().futureValue else sys.whenTerminated.futureValue
val sys = system(behavior, s"$suite-$name") } catch {
try { case NonFatal(ex)
block(sys) sys.terminate()
if (doTerminate) sys.terminate().futureValue else sys.whenTerminated.futureValue throw ex
} catch {
case NonFatal(ex)
sys.terminate()
throw ex
}
} }
}
def `must start the guardian actor and terminate when it terminates`(): Unit = { "An ActorSystem" must {
"must start the guardian actor and terminate when it terminates" in {
val t = withSystem("a", immutable[Probe] { case (_, p) p.replyTo ! p.msg; stopped }, doTerminate = false) { sys val t = withSystem("a", immutable[Probe] { case (_, p) p.replyTo ! p.msg; stopped }, doTerminate = false) { sys
val inbox = Inbox[String]("a") val inbox = Inbox[String]("a")
sys ! Probe("hello", inbox.ref) sys ! Probe("hello", inbox.ref)
eventually { inbox.hasMessages should ===(true) } eventually {
inbox.hasMessages should ===(true)
}
inbox.receiveAll() should ===("hello" :: Nil) inbox.receiveAll() should ===("hello" :: Nil)
} }
val p = t.ref.path val p = t.ref.path
@ -54,7 +51,7 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca
p.address.system should ===(suite + "-a") p.address.system should ===(suite + "-a")
} }
def `must terminate the guardian actor`(): Unit = { "must terminate the guardian actor" in {
val inbox = Inbox[String]("terminate") val inbox = Inbox[String]("terminate")
val sys = system( val sys = system(
immutable[Probe] { immutable[Probe] {
@ -69,21 +66,25 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca
inbox.receiveAll() should ===("done" :: Nil) inbox.receiveAll() should ===("done" :: Nil)
} }
def `must log to the event stream`(): Unit = pending "must log to the event stream" in {
pending
}
def `must have a name`(): Unit = "must have a name" in {
withSystem("name", Actor.empty[String]) { sys withSystem("name", Actor.empty[String]) { sys
sys.name should ===(suite + "-name") sys.name should ===(suite + "-name")
} }
}
def `must report its uptime`(): Unit = "must report its uptime" in {
withSystem("uptime", Actor.empty[String]) { sys withSystem("uptime", Actor.empty[String]) { sys
sys.uptime should be < 1L sys.uptime should be < 1L
Thread.sleep(1000) Thread.sleep(1000)
sys.uptime should be >= 1L sys.uptime should be >= 1L
} }
}
def `must have a working thread factory`(): Unit = "must have a working thread factory" in {
withSystem("thread", Actor.empty[String]) { sys withSystem("thread", Actor.empty[String]) { sys
val p = Promise[Int] val p = Promise[Int]
sys.threadFactory.newThread(new Runnable { sys.threadFactory.newThread(new Runnable {
@ -91,14 +92,16 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca
}).start() }).start()
p.future.futureValue should ===(42) p.future.futureValue should ===(42)
} }
}
def `must be able to run Futures`(): Unit = "must be able to run Futures" in {
withSystem("futures", Actor.empty[String]) { sys withSystem("futures", Actor.empty[String]) { sys
val f = Future(42)(sys.executionContext) val f = Future(42)(sys.executionContext)
f.futureValue should ===(42) f.futureValue should ===(42)
} }
}
def `must not allow null messages`(): Unit = { "must not allow null messages" in {
withSystem("null-messages", Actor.empty[String]) { sys withSystem("null-messages", Actor.empty[String]) { sys
intercept[InvalidMessageException] { intercept[InvalidMessageException] {
sys ! null sys ! null
@ -106,9 +109,4 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca
} }
} }
} }
object `An ActorSystemAdapter` extends CommonTests {
def system[T](behavior: Behavior[T], name: String) = ActorSystem(behavior, name)
def suite = "adapter"
}
} }

View file

@ -15,7 +15,7 @@ import org.scalatest.concurrent.Eventually
import scala.concurrent.duration._ import scala.concurrent.duration._
class LocalReceptionistSpec extends TypedSpec with Eventually { class LocalReceptionistSpec extends TypedSpec with Eventually with StartSupport {
trait ServiceA trait ServiceA
val ServiceKeyA = Receptionist.ServiceKey[ServiceA]("service-a") val ServiceKeyA = Receptionist.ServiceKey[ServiceA]("service-a")
@ -35,15 +35,15 @@ class LocalReceptionistSpec extends TypedSpec with Eventually {
import akka.actor.typed.internal.receptionist.ReceptionistImpl.{ localOnlyBehavior behavior } import akka.actor.typed.internal.receptionist.ReceptionistImpl.{ localOnlyBehavior behavior }
trait CommonTests extends StartSupport { implicit val testSettings = TestKitSettings(system)
implicit def system: ActorSystem[TypedSpec.Command]
implicit val testSettings = TestKitSettings(system)
abstract class TestSetup { abstract class TestSetup {
val receptionist = start(behavior) val receptionist = start(behavior)
} }
def `must register a service`(): Unit = { "A local receptionist" must {
"must register a service" in {
val ctx = new EffectfulActorContext("register", behavior, 1000, system) val ctx = new EffectfulActorContext("register", behavior, 1000, system)
val a = Inbox[ServiceA]("a") val a = Inbox[ServiceA]("a")
val r = Inbox[Registered[_]]("r") val r = Inbox[Registered[_]]("r")
@ -57,7 +57,7 @@ class LocalReceptionistSpec extends TypedSpec with Eventually {
assertEmpty(a, r, q) assertEmpty(a, r, q)
} }
def `must register two services`(): Unit = { "must register two services" in {
val ctx = new EffectfulActorContext("registertwo", behavior, 1000, system) val ctx = new EffectfulActorContext("registertwo", behavior, 1000, system)
val a = Inbox[ServiceA]("a") val a = Inbox[ServiceA]("a")
val r = Inbox[Registered[_]]("r") val r = Inbox[Registered[_]]("r")
@ -74,7 +74,7 @@ class LocalReceptionistSpec extends TypedSpec with Eventually {
assertEmpty(a, b, r, q) assertEmpty(a, b, r, q)
} }
def `must register two services with the same key`(): Unit = { "must register two services with the same key" in {
val ctx = new EffectfulActorContext("registertwosame", behavior, 1000, system) val ctx = new EffectfulActorContext("registertwosame", behavior, 1000, system)
val a1 = Inbox[ServiceA]("a1") val a1 = Inbox[ServiceA]("a1")
val r = Inbox[Registered[_]]("r") val r = Inbox[Registered[_]]("r")
@ -91,97 +91,102 @@ class LocalReceptionistSpec extends TypedSpec with Eventually {
assertEmpty(a1, a2, r, q) assertEmpty(a1, a2, r, q)
} }
def `must unregister services when they terminate`(): Unit = new TestSetup { "must unregister services when they terminate" in {
val regProbe = TestProbe[Any]("regProbe") new TestSetup {
val regProbe = TestProbe[Any]("regProbe")
val serviceA = start(stoppableBehavior.narrow[ServiceA]) val serviceA = start(stoppableBehavior.narrow[ServiceA])
receptionist ! Register(ServiceKeyA, serviceA, regProbe.ref) receptionist ! Register(ServiceKeyA, serviceA, regProbe.ref)
regProbe.expectMsg(Registered(ServiceKeyA, serviceA)) regProbe.expectMsg(Registered(ServiceKeyA, serviceA))
val serviceB = start(stoppableBehavior.narrow[ServiceB]) val serviceB = start(stoppableBehavior.narrow[ServiceB])
receptionist ! Register(ServiceKeyB, serviceB, regProbe.ref) receptionist ! Register(ServiceKeyB, serviceB, regProbe.ref)
regProbe.expectMsg(Registered(ServiceKeyB, serviceB)) regProbe.expectMsg(Registered(ServiceKeyB, serviceB))
val serviceC = start(stoppableBehavior) val serviceC = start(stoppableBehavior)
receptionist ! Register(ServiceKeyA, serviceC, regProbe.ref) receptionist ! Register(ServiceKeyA, serviceC, regProbe.ref)
receptionist ! Register(ServiceKeyB, serviceC, regProbe.ref) receptionist ! Register(ServiceKeyB, serviceC, regProbe.ref)
regProbe.expectMsg(Registered(ServiceKeyA, serviceC)) regProbe.expectMsg(Registered(ServiceKeyA, serviceC))
regProbe.expectMsg(Registered(ServiceKeyB, serviceC)) regProbe.expectMsg(Registered(ServiceKeyB, serviceC))
receptionist ! Find(ServiceKeyA, regProbe.ref)
regProbe.expectMsg(Listing(ServiceKeyA, Set(serviceA, serviceC)))
receptionist ! Find(ServiceKeyB, regProbe.ref)
regProbe.expectMsg(Listing(ServiceKeyB, Set(serviceB, serviceC)))
serviceC ! Stop
eventually {
receptionist ! Find(ServiceKeyA, regProbe.ref) receptionist ! Find(ServiceKeyA, regProbe.ref)
regProbe.expectMsg(Listing(ServiceKeyA, Set(serviceA))) regProbe.expectMsg(Listing(ServiceKeyA, Set(serviceA, serviceC)))
receptionist ! Find(ServiceKeyB, regProbe.ref) receptionist ! Find(ServiceKeyB, regProbe.ref)
regProbe.expectMsg(Listing(ServiceKeyB, Set(serviceB))) regProbe.expectMsg(Listing(ServiceKeyB, Set(serviceB, serviceC)))
serviceC ! Stop
eventually {
receptionist ! Find(ServiceKeyA, regProbe.ref)
regProbe.expectMsg(Listing(ServiceKeyA, Set(serviceA)))
receptionist ! Find(ServiceKeyB, regProbe.ref)
regProbe.expectMsg(Listing(ServiceKeyB, Set(serviceB)))
}
} }
} }
def `must support subscribing to service changes`(): Unit = new TestSetup { "must support subscribing to service changes" in {
val regProbe = TestProbe[Registered[_]]("regProbe") new TestSetup {
val regProbe = TestProbe[Registered[_]]("regProbe")
val aSubscriber = TestProbe[Listing[ServiceA]]("aUser") val aSubscriber = TestProbe[Listing[ServiceA]]("aUser")
receptionist ! Subscribe(ServiceKeyA, aSubscriber.ref) receptionist ! Subscribe(ServiceKeyA, aSubscriber.ref)
aSubscriber.expectMsg(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]])) aSubscriber.expectMsg(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
val serviceA: ActorRef[ServiceA] = start(stoppableBehavior) val serviceA: ActorRef[ServiceA] = start(stoppableBehavior)
receptionist ! Register(ServiceKeyA, serviceA, regProbe.ref) receptionist ! Register(ServiceKeyA, serviceA, regProbe.ref)
regProbe.expectMsg(Registered(ServiceKeyA, serviceA)) regProbe.expectMsg(Registered(ServiceKeyA, serviceA))
aSubscriber.expectMsg(Listing(ServiceKeyA, Set(serviceA))) aSubscriber.expectMsg(Listing(ServiceKeyA, Set(serviceA)))
val serviceA2: ActorRef[ServiceA] = start(stoppableBehavior) val serviceA2: ActorRef[ServiceA] = start(stoppableBehavior)
receptionist ! Register(ServiceKeyA, serviceA2, regProbe.ref) receptionist ! Register(ServiceKeyA, serviceA2, regProbe.ref)
regProbe.expectMsg(Registered(ServiceKeyA, serviceA2)) regProbe.expectMsg(Registered(ServiceKeyA, serviceA2))
aSubscriber.expectMsg(Listing(ServiceKeyA, Set(serviceA, serviceA2))) aSubscriber.expectMsg(Listing(ServiceKeyA, Set(serviceA, serviceA2)))
serviceA ! Stop serviceA ! Stop
aSubscriber.expectMsg(Listing(ServiceKeyA, Set(serviceA2))) aSubscriber.expectMsg(Listing(ServiceKeyA, Set(serviceA2)))
serviceA2 ! Stop serviceA2 ! Stop
aSubscriber.expectMsg(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]])) aSubscriber.expectMsg(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
}
} }
def `must work with ask`(): Unit = sync(runTest("Receptionist") { "must work with ask" in {
StepWise[Registered[ServiceA]] { (ctx, startWith) sync(runTest("Receptionist") {
val self = ctx.self StepWise[Registered[ServiceA]] { (ctx, startWith)
startWith.withKeepTraces(true) { val self = ctx.self
val r = ctx.spawnAnonymous(behavior) startWith.withKeepTraces(true) {
val s = ctx.spawnAnonymous(behaviorA) val r = ctx.spawnAnonymous(behavior)
val f = r ? Register(ServiceKeyA, s) val s = ctx.spawnAnonymous(behaviorA)
r ! Register(ServiceKeyA, s)(self) val f = r ? Register(ServiceKeyA, s)
(f, s) r ! Register(ServiceKeyA, s)(self)
}.expectMessage(1.second) { (f, s)
case (msg, (f, s)) }.expectMessage(1.second) {
msg should be(Registered(ServiceKeyA, s)) case (msg, (f, s))
f.foreach(self ! _)(system.executionContext) msg should be(Registered(ServiceKeyA, s))
s f.foreach(self ! _)(system.executionContext)
}.expectMessage(1.second) { s
case (msg, s) }.expectMessage(1.second) {
msg should be(Registered(ServiceKeyA, s)) case (msg, s)
msg should be(Registered(ServiceKeyA, s))
}
} }
} })
}) }
def `must be present in the system`(): Unit = sync(runTest("systemReceptionist") { "must be present in the system" in {
StepWise[Listing[ServiceA]] { (ctx, startWith) sync(runTest("systemReceptionist") {
val self = ctx.self StepWise[Listing[ServiceA]] { (ctx, startWith)
startWith.withKeepTraces(true) { val self = ctx.self
system.receptionist ! Find(ServiceKeyA)(self) startWith.withKeepTraces(true) {
}.expectMessage(1.second) { (msg, _) system.receptionist ! Find(ServiceKeyA)(self)
msg.serviceInstances should ===(Set()) }.expectMessage(1.second) { (msg, _)
msg.serviceInstances should ===(Set())
}
} }
} })
}) }
} }
object `A Receptionist (adapted)` extends CommonTests with AdaptedSystem
} }

View file

@ -6,24 +6,15 @@ package scaladsl
import akka.typed.testkit.{ EffectfulActorContext, TestKitSettings } import akka.typed.testkit.{ EffectfulActorContext, TestKitSettings }
import akka.typed.testkit.scaladsl.TestProbe import akka.typed.testkit.scaladsl.TestProbe
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import scala.concurrent.duration.DurationInt import scala.concurrent.duration.DurationInt
@RunWith(classOf[JUnitRunner]) class ImmutablePartialSpec extends TypedSpec with StartSupport {
final class ImmutablePartialSpec extends TypedSpec {
final object `An Actor.immutablePartial behavior (adapted)` private implicit val testSettings = TestKitSettings(system)
extends Tests
with AdaptedSystem
trait Tests extends StartSupport { "An immutable partial" must {
private implicit val testSettings = TestKitSettings(system) "correctly install the message handler" in {
override implicit def system: ActorSystem[TypedSpec.Command]
def `must correctly install the message handler`(): Unit = {
val probe = TestProbe[Command]("probe") val probe = TestProbe[Command]("probe")
val behavior = val behavior =
Actor.immutablePartial[Command] { Actor.immutablePartial[Command] {

View file

@ -7,21 +7,13 @@ package scaladsl
import akka.Done import akka.Done
import akka.typed.testkit.TestKitSettings import akka.typed.testkit.TestKitSettings
import akka.typed.testkit.scaladsl.TestProbe import akka.typed.testkit.scaladsl.TestProbe
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@RunWith(classOf[JUnitRunner]) final class OnSignalSpec extends TypedSpec with StartSupport {
final class OnSignalSpec extends TypedSpec {
final object `An Actor.onSignal behavior (adapted)` extends Tests with AdaptedSystem private implicit val testSettings = TestKitSettings(system)
trait Tests extends StartSupport { "An Actor.OnSignal behavior" must {
"must correctly install the signal handler" in {
private implicit val testSettings = TestKitSettings(system)
override implicit def system: ActorSystem[TypedSpec.Command]
def `must correctly install the signal handler`(): Unit = {
val probe = TestProbe[Done]("probe") val probe = TestProbe[Done]("probe")
val behavior = val behavior =
Actor.deferred[Nothing] { context Actor.deferred[Nothing] { context

View file

@ -74,70 +74,76 @@ object IntroSpec {
} }
class IntroSpec extends TypedSpec { class IntroSpec extends TypedSpec {
import IntroSpec._ import IntroSpec._
def `must say hello`(): Unit = { "Hello world" must {
// TODO Implicits.global is not something we would like to encourage in docs "must say hello" in {
//#hello-world // TODO Implicits.global is not something we would like to encourage in docs
import HelloWorld._ //#hello-world
// using global pool since we want to run tasks after system.terminate import HelloWorld._
import scala.concurrent.ExecutionContext.Implicits.global // using global pool since we want to run tasks after system.terminate
import scala.concurrent.ExecutionContext.Implicits.global
val system: ActorSystem[Greet] = ActorSystem(greeter, "hello") val system: ActorSystem[Greet] = ActorSystem(greeter, "hello")
val future: Future[Greeted] = system ? (Greet("world", _)) val future: Future[Greeted] = system ? (Greet("world", _))
for { for {
greeting future.recover { case ex ex.getMessage } greeting future.recover { case ex ex.getMessage }
done { println(s"result: $greeting"); system.terminate() } done {
} println("system terminated") println(s"result: $greeting")
//#hello-world system.terminate()
}
def `must chat`(): Unit = {
//#chatroom-gabbler
import ChatRoom._
val gabbler =
Actor.immutable[SessionEvent] { (_, msg)
msg match {
//#chatroom-gabbler
// We document that the compiler warns about the missing handler for `SessionDenied`
case SessionDenied(reason)
println(s"cannot start chat room session: $reason")
Actor.stopped
//#chatroom-gabbler
case SessionGranted(handle)
handle ! PostMessage("Hello World!")
Actor.same
case MessagePosted(screenName, message)
println(s"message has been posted by '$screenName': $message")
Actor.stopped
} }
} } println("system terminated")
//#chatroom-gabbler //#hello-world
}
//#chatroom-main "must chat" in {
val main: Behavior[String] = //#chatroom-gabbler
Actor.deferred { ctx import ChatRoom._
val chatRoom = ctx.spawn(ChatRoom.behavior, "chatroom")
val gabblerRef = ctx.spawn(gabbler, "gabbler")
ctx.watch(gabblerRef)
Actor.immutablePartial[String] { val gabbler =
case (_, "go") Actor.immutable[SessionEvent] { (_, msg)
chatRoom ! GetSession("ol Gabbler", gabblerRef) msg match {
Actor.same //#chatroom-gabbler
} onSignal { // We document that the compiler warns about the missing handler for `SessionDenied`
case (_, Terminated(ref)) case SessionDenied(reason)
Actor.stopped println(s"cannot start chat room session: $reason")
Actor.stopped
//#chatroom-gabbler
case SessionGranted(handle)
handle ! PostMessage("Hello World!")
Actor.same
case MessagePosted(screenName, message)
println(s"message has been posted by '$screenName': $message")
Actor.stopped
}
} }
} //#chatroom-gabbler
val system = ActorSystem(main, "ChatRoomDemo") //#chatroom-main
system ! "go" val main: Behavior[String] =
Await.result(system.whenTerminated, 3.seconds) Actor.deferred { ctx
//#chatroom-main val chatRoom = ctx.spawn(ChatRoom.behavior, "chatroom")
val gabblerRef = ctx.spawn(gabbler, "gabbler")
ctx.watch(gabblerRef)
Actor.immutablePartial[String] {
case (_, "go")
chatRoom ! GetSession("ol Gabbler", gabblerRef)
Actor.same
} onSignal {
case (_, Terminated(ref))
Actor.stopped
}
}
val system = ActorSystem(main, "ChatRoomDemo")
system ! "go"
Await.result(system.whenTerminated, 3.seconds)
//#chatroom-main
}
} }
} }

View file

@ -4,7 +4,6 @@
package docs.akka.typed package docs.akka.typed
//#imports //#imports
import akka.NotUsed
import akka.actor.typed._ import akka.actor.typed._
import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.Actor
import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.ActorContext
@ -67,50 +66,52 @@ object MutableIntroSpec {
} }
class MutableIntroSpec extends TypedSpec { class MutableIntroSpec extends TypedSpec {
import MutableIntroSpec._ import MutableIntroSpec._
def `must chat`(): Unit = { "A chat room" must {
//#chatroom-gabbler "chat" in {
import ChatRoom._ //#chatroom-gabbler
import ChatRoom._
val gabbler = val gabbler =
Actor.immutable[SessionEvent] { (_, msg) Actor.immutable[SessionEvent] { (_, msg)
msg match { msg match {
case SessionDenied(reason) case SessionDenied(reason)
println(s"cannot start chat room session: $reason") println(s"cannot start chat room session: $reason")
Actor.stopped Actor.stopped
case SessionGranted(handle) case SessionGranted(handle)
handle ! PostMessage("Hello World!") handle ! PostMessage("Hello World!")
Actor.same Actor.same
case MessagePosted(screenName, message) case MessagePosted(screenName, message)
println(s"message has been posted by '$screenName': $message") println(s"message has been posted by '$screenName': $message")
Actor.stopped Actor.stopped
}
} }
} //#chatroom-gabbler
//#chatroom-gabbler
//#chatroom-main //#chatroom-main
val main: Behavior[String] = val main: Behavior[String] =
Actor.deferred { ctx Actor.deferred { ctx
val chatRoom = ctx.spawn(ChatRoom.behavior(), "chatroom") val chatRoom = ctx.spawn(ChatRoom.behavior(), "chatroom")
val gabblerRef = ctx.spawn(gabbler, "gabbler") val gabblerRef = ctx.spawn(gabbler, "gabbler")
ctx.watch(gabblerRef) ctx.watch(gabblerRef)
Actor.immutablePartial[String] { Actor.immutablePartial[String] {
case (_, "go") case (_, "go")
chatRoom ! GetSession("ol Gabbler", gabblerRef) chatRoom ! GetSession("ol Gabbler", gabblerRef)
Actor.same Actor.same
} onSignal { } onSignal {
case (_, Terminated(ref)) case (_, Terminated(ref))
println("Stopping guardian") println("Stopping guardian")
Actor.stopped Actor.stopped
}
} }
}
val system = ActorSystem(main, "ChatRoomDemo") val system = ActorSystem(main, "ChatRoomDemo")
system ! "go" system ! "go"
Await.result(system.whenTerminated, 1.second) Await.result(system.whenTerminated, 1.second)
//#chatroom-main //#chatroom-main
}
} }
} }

View file

@ -16,7 +16,6 @@ akka.typed {
library-extensions = ${?akka.typed.library-extensions} [] library-extensions = ${?akka.typed.library-extensions} []
} }
# TODO: move these out somewhere else when doing #23632
akka.actor { akka.actor {
serialization-bindings { serialization-bindings {
"akka.actor.typed.ActorRef" = typed-misc "akka.actor.typed.ActorRef" = typed-misc

View file

@ -3,6 +3,7 @@
*/ */
package akka.actor.typed package akka.actor.typed
import akka.annotation.InternalApi
import akka.{ actor a } import akka.{ actor a }
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
@ -61,10 +62,11 @@ object ActorRef {
// FIXME factory methods for below for Java (trait + object) // FIXME factory methods for below for Java (trait + object)
/** /**
* Create an ActorRef from a Future, buffering up to the given number of * INTERNAL API
* messages in while the Future is not fulfilled. *
* FIXME, this isn't really used since we removed the native actor system
*/ */
private[akka] def apply[T](f: Future[ActorRef[T]], bufferSize: Int = 1000): ActorRef[T] = @InternalApi private[akka] def apply[T](f: Future[ActorRef[T]], bufferSize: Int = 1000): ActorRef[T] =
f.value match { f.value match {
// an AdaptedActorSystem will always create refs eagerly, so it will take this path // an AdaptedActorSystem will always create refs eagerly, so it will take this path
case Some(Success(ref)) ref case Some(Success(ref)) ref

View file

@ -143,7 +143,7 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions {
* invocation when asking the guardian. * invocation when asking the guardian.
* *
* The returned Future of [[ActorRef]] may be converted into an [[ActorRef]] * The returned Future of [[ActorRef]] may be converted into an [[ActorRef]]
* to which messages can immediately be sent by using the [[ActorRef.apply[T](s*]] * to which messages can immediately be sent by using the [[ActorRef$.apply[T](s*]]
* method. * method.
*/ */
def systemActorOf[U](behavior: Behavior[U], name: String, props: Props = Props.empty)(implicit timeout: Timeout): Future[ActorRef[U]] def systemActorOf[U](behavior: Behavior[U], name: String, props: Props = Props.empty)(implicit timeout: Timeout): Future[ActorRef[U]]

View file

@ -23,9 +23,11 @@ trait ExtensionsImpl extends Extensions { self: ActorSystem[_] ⇒
private val extensions = new ConcurrentHashMap[ExtensionId[_], AnyRef] private val extensions = new ConcurrentHashMap[ExtensionId[_], AnyRef]
/** /**
* INTERNAL API
*
* Hook for ActorSystem to load extensions on startup * Hook for ActorSystem to load extensions on startup
*/ */
final def loadExtensions(): Unit = { @InternalApi private[akka] def loadExtensions(): Unit = {
/** /**
* @param throwOnLoadFail Throw exception when an extension fails to load (needed for backwards compatibility) * @param throwOnLoadFail Throw exception when an extension fails to load (needed for backwards compatibility)
*/ */

View file

@ -72,11 +72,11 @@ class ClusterShardingPersistenceSpec extends TypedSpec(ClusterShardingPersistenc
implicit val untypedSystem = system.toUntyped implicit val untypedSystem = system.toUntyped
private val untypedCluster = akka.cluster.Cluster(untypedSystem) private val untypedCluster = akka.cluster.Cluster(untypedSystem)
object `Typed cluster sharding with persistent actor` { "Typed cluster sharding with persistent actor" must {
untypedCluster.join(untypedCluster.selfAddress) untypedCluster.join(untypedCluster.selfAddress)
def `01 start persistent actor`(): Unit = { "start persistent actor" in {
ClusterSharding(system).spawn(persistentActor, Props.empty, typeKey, ClusterSharding(system).spawn(persistentActor, Props.empty, typeKey,
ClusterShardingSettings(system), maxNumberOfShards = 100, handOffStopMessage = StopPlz) ClusterShardingSettings(system), maxNumberOfShards = 100, handOffStopMessage = StopPlz)
@ -90,5 +90,4 @@ class ClusterShardingPersistenceSpec extends TypedSpec(ClusterShardingPersistenc
p.expectMsg("a|b|c") p.expectMsg("a|b|c")
} }
} }
} }

View file

@ -60,7 +60,9 @@ object ClusterShardingSpec {
final case class WhoAreYou(replyTo: ActorRef[String]) extends TestProtocol final case class WhoAreYou(replyTo: ActorRef[String]) extends TestProtocol
final case class StopPlz() extends TestProtocol final case class StopPlz() extends TestProtocol
sealed trait IdTestProtocol extends java.io.Serializable { def id: String } sealed trait IdTestProtocol extends java.io.Serializable {
def id: String
}
final case class IdReplyPlz(id: String, toMe: ActorRef[String]) extends IdTestProtocol final case class IdReplyPlz(id: String, toMe: ActorRef[String]) extends IdTestProtocol
final case class IdWhoAreYou(id: String, replyTo: ActorRef[String]) extends IdTestProtocol final case class IdWhoAreYou(id: String, replyTo: ActorRef[String]) extends IdTestProtocol
final case class IdStopPlz(id: String) extends IdTestProtocol final case class IdStopPlz(id: String) extends IdTestProtocol
@ -118,6 +120,7 @@ object ClusterShardingSpec {
} }
class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with ScalaFutures with Eventually { class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with ScalaFutures with Eventually {
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
import ClusterShardingSpec._ import ClusterShardingSpec._
@ -164,9 +167,9 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
Actor.same Actor.same
} }
object `Typed cluster sharding` { "Typed cluster sharding" must {
def `01 must join cluster`(): Unit = { "join cluster" in {
Cluster(system).manager ! Join(Cluster(system).selfMember.address) Cluster(system).manager ! Join(Cluster(system).selfMember.address)
Cluster(system2).manager ! Join(Cluster(system).selfMember.address) Cluster(system2).manager ! Join(Cluster(system).selfMember.address)
@ -181,7 +184,7 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
} }
def `02 must send messsages via cluster sharding, using envelopes`(): Unit = { "send messsages via cluster sharding, using envelopes" in {
val ref = sharding.spawn( val ref = sharding.spawn(
behavior, behavior,
Props.empty, Props.empty,
@ -204,7 +207,8 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
ref ! ShardingEnvelope(s"test$n", StopPlz()) ref ! ShardingEnvelope(s"test$n", StopPlz())
} }
} }
def `03 must send messsages via cluster sharding, without envelopes`(): Unit = {
"send messsages via cluster sharding, without envelopes" in {
val ref = sharding.spawn( val ref = sharding.spawn(
behaviorWithId, behaviorWithId,
Props.empty, Props.empty,
@ -228,7 +232,7 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
} }
} }
// def `04 fail if starting sharding for already used typeName, but with wrong type`(): Unit = { // "04 fail if starting sharding for already used typeName, but with wrong type" in {
// val ex = intercept[Exception] { // val ex = intercept[Exception] {
// sharding.spawn( // sharding.spawn(
// Actor.empty[String], // Actor.empty[String],
@ -243,7 +247,7 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
// ex.getMessage should include("already started") // ex.getMessage should include("already started")
// } // }
def `11 EntityRef - tell`(): Unit = { "EntityRef - tell" in {
val charlieRef = sharding.entityRefFor(typeKey, "charlie") val charlieRef = sharding.entityRefFor(typeKey, "charlie")
val p = TestProbe[String]() val p = TestProbe[String]()
@ -257,7 +261,7 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
charlieRef ! StopPlz() charlieRef ! StopPlz()
} }
def `12 EntityRef - ask`(): Unit = { "EntityRef - ask" in {
val bobRef = sharding.entityRefFor(typeKey, "bob") val bobRef = sharding.entityRefFor(typeKey, "bob")
val charlieRef = sharding.entityRefFor(typeKey, "charlie") val charlieRef = sharding.entityRefFor(typeKey, "charlie")
@ -271,7 +275,5 @@ class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with Sca
bobRef ! StopPlz() bobRef ! StopPlz()
} }
} }
} }

View file

@ -12,7 +12,7 @@ import akka.actor.typed.scaladsl.AskPattern._
class ShardingSerializerSpec extends TypedSpec { class ShardingSerializerSpec extends TypedSpec {
object `The typed ShardingSerializer` { "The typed ShardingSerializer" must {
val serialization = SerializationExtension(ActorSystemAdapter.toUntyped(system)) val serialization = SerializationExtension(ActorSystemAdapter.toUntyped(system))
@ -27,13 +27,12 @@ class ShardingSerializerSpec extends TypedSpec {
} }
} }
def `must serialize and deserialize ShardingEnvelope`(): Unit = { "must serialize and deserialize ShardingEnvelope" in {
checkSerialization(ShardingEnvelope("abc", 42)) checkSerialization(ShardingEnvelope("abc", 42))
} }
def `must serialize and deserialize StartEntity`(): Unit = { "must serialize and deserialize StartEntity" in {
checkSerialization(StartEntity("abc")) checkSerialization(StartEntity("abc"))
} }
} }
} }

View file

@ -4,7 +4,7 @@
package akka.cluster.ddata.typed.scaladsl package akka.cluster.ddata.typed.scaladsl
import akka.actor.Scheduler import akka.actor.Scheduler
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, TypedSpec } import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, StartSupport, TypedSpec }
import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.Actor
import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
@ -22,7 +22,8 @@ import scala.concurrent.duration._
object ReplicatorSpec { object ReplicatorSpec {
val config = ConfigFactory.parseString(""" val config = ConfigFactory.parseString(
"""
akka.actor.provider = "cluster" akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0 akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.port = 0
@ -113,16 +114,17 @@ object ReplicatorSpec {
} }
class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) with Eventually { class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) with Eventually with StartSupport {
import ReplicatorSpec._ import ReplicatorSpec._
trait RealTests extends StartSupport { implicit val testSettings = TestKitSettings(system)
implicit def system: ActorSystem[TypedSpec.Command] val settings = ReplicatorSettings(system)
implicit val testSettings = TestKitSettings(system) implicit val cluster = Cluster(system.toUntyped)
val settings = ReplicatorSettings(system)
implicit val cluster = Cluster(system.toUntyped)
def `have API for Update and Get`(): Unit = { "Replicator" must {
"have API for Update and Get" in {
val replicator = start(Replicator.behavior(settings)) val replicator = start(Replicator.behavior(settings))
val c = start(client(replicator)) val c = start(client(replicator))
@ -132,7 +134,7 @@ class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) with Eventually {
probe.expectMsg(1) probe.expectMsg(1)
} }
def `have API for Subscribe`(): Unit = { "have API for Subscribe" in {
val replicator = start(Replicator.behavior(settings)) val replicator = start(Replicator.behavior(settings))
val c = start(client(replicator)) val c = start(client(replicator))
@ -150,7 +152,7 @@ class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) with Eventually {
} }
} }
def `have an extension`(): Unit = { "have an extension" in {
val replicator = DistributedData(system).replicator val replicator = DistributedData(system).replicator
val c = start(client(replicator)) val c = start(client(replicator))
@ -159,8 +161,6 @@ class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) with Eventually {
c ! GetValue(probe.ref) c ! GetValue(probe.ref)
probe.expectMsg(1) probe.expectMsg(1)
} }
} }
object `A ReplicatorBehavior (real, adapted)` extends RealTests with AdaptedSystem
} }

View file

@ -39,9 +39,9 @@ class ClusterApiSpec extends TypedSpec(ClusterApiSpec.config) with ScalaFutures
val clusterNode1 = Cluster(system) val clusterNode1 = Cluster(system)
val untypedSystem1 = system.toUntyped val untypedSystem1 = system.toUntyped
object `A typed cluster` { "A typed Cluster" must {
def `01 must join a cluster and observe events from both sides`() = { "join a cluster and observe events from both sides" in {
val system2 = akka.actor.ActorSystem(system.name, system.settings.config) val system2 = akka.actor.ActorSystem(system.name, system.settings.config)
val adaptedSystem2 = system2.toTyped val adaptedSystem2 = system2.toTyped

View file

@ -101,9 +101,9 @@ class ClusterSingletonApiSpec extends TypedSpec(ClusterSingletonApiSpec.config)
val adaptedSystem2 = system2.toTyped val adaptedSystem2 = system2.toTyped
val clusterNode2 = Cluster(adaptedSystem2) val clusterNode2 = Cluster(adaptedSystem2)
object `A typed cluster singleton` { "A typed cluster singleton" must {
def `01 must be accessible from two nodes in a cluster`() = { "be accessible from two nodes in a cluster" in {
val node1UpProbe = TestProbe[SelfUp]()(system, implicitly[TestKitSettings]) val node1UpProbe = TestProbe[SelfUp]()(system, implicitly[TestKitSettings])
clusterNode1.subscriptions ! Subscribe(node1UpProbe.ref, classOf[SelfUp]) clusterNode1.subscriptions ! Subscribe(node1UpProbe.ref, classOf[SelfUp])

View file

@ -65,11 +65,11 @@ class ClusterSingletonPersistenceSpec extends TypedSpec(ClusterSingletonPersiste
implicit val untypedSystem = system.toUntyped implicit val untypedSystem = system.toUntyped
private val untypedCluster = akka.cluster.Cluster(untypedSystem) private val untypedCluster = akka.cluster.Cluster(untypedSystem)
object `Typed cluster singleton with persistent actor` { "A typed cluster singleton with persistent actor" must {
untypedCluster.join(untypedCluster.selfAddress) untypedCluster.join(untypedCluster.selfAddress)
def `01 start persistent actor`(): Unit = { "start persistent actor" in {
val ref = ClusterSingleton(system).spawn( val ref = ClusterSingleton(system).spawn(
behavior = persistentActor, behavior = persistentActor,
singletonName = "singleton", singletonName = "singleton",
@ -86,5 +86,4 @@ class ClusterSingletonPersistenceSpec extends TypedSpec(ClusterSingletonPersiste
p.expectMsg("a|b|c") p.expectMsg("a|b|c")
} }
} }
} }

View file

@ -26,10 +26,9 @@ object MiscMessageSerializerSpec {
class MiscMessageSerializerSpec extends TypedSpec(MiscMessageSerializerSpec.config) { class MiscMessageSerializerSpec extends TypedSpec(MiscMessageSerializerSpec.config) {
object `The typed MiscMessageSerializer` { val serialization = SerializationExtension(system.toUntyped)
val serialization = SerializationExtension(system.toUntyped)
"MiscMessageSerializer" must {
def checkSerialization(obj: AnyRef): Unit = { def checkSerialization(obj: AnyRef): Unit = {
serialization.findSerializerFor(obj) match { serialization.findSerializerFor(obj) match {
case serializer: MiscMessageSerializer case serializer: MiscMessageSerializer
@ -41,10 +40,9 @@ class MiscMessageSerializerSpec extends TypedSpec(MiscMessageSerializerSpec.conf
} }
} }
def `must serialize and deserialize typed actor refs `(): Unit = { "must serialize and deserialize typed actor refs" in {
val ref = (system ? Create(Actor.empty[Unit], "some-actor")).futureValue val ref = (system ? Create(Actor.empty[Unit], "some-actor")).futureValue
checkSerialization(ref) checkSerialization(ref)
} }
} }
} }

View file

@ -9,10 +9,8 @@ import akka.actor.ExtendedActorSystem
import akka.cluster.Cluster import akka.cluster.Cluster
import akka.cluster.typed.ActorRefResolver import akka.cluster.typed.ActorRefResolver
import akka.serialization.SerializerWithStringManifest import akka.serialization.SerializerWithStringManifest
import akka.actor.typed.ActorRef import akka.actor.typed.{ ActorRef, ActorSystem, StartSupport, TypedSpec }
import akka.actor.typed.ActorSystem
import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.actor.typed.TypedSpec
import akka.actor.typed.TypedSpec.Command import akka.actor.typed.TypedSpec.Command
import akka.cluster.typed.ActorRefResolver import akka.cluster.typed.ActorRefResolver
import akka.actor.typed.internal.adapter.ActorRefAdapter import akka.actor.typed.internal.adapter.ActorRefAdapter
@ -96,45 +94,47 @@ object ClusterReceptionistSpec {
val PingKey = Receptionist.ServiceKey[PingProtocol]("pingy") val PingKey = Receptionist.ServiceKey[PingProtocol]("pingy")
} }
class ClusterReceptionistSpec extends TypedSpec(ClusterReceptionistSpec.config) { class ClusterReceptionistSpec extends TypedSpec(ClusterReceptionistSpec.config) with StartSupport {
import ClusterReceptionistSpec._ import ClusterReceptionistSpec._
val adaptedSystem = system implicit val testSettings = TestKitSettings(system)
implicit val testSettings = TestKitSettings(adaptedSystem) val untypedSystem1 = ActorSystemAdapter.toUntyped(system)
val untypedSystem1 = ActorSystemAdapter.toUntyped(adaptedSystem)
val clusterNode1 = Cluster(untypedSystem1) val clusterNode1 = Cluster(untypedSystem1)
val system2 = akka.actor.ActorSystem( val system2 = akka.actor.ActorSystem(
adaptedSystem.name, system.name,
adaptedSystem.settings.config) system.settings.config)
val adaptedSystem2 = system2.toTyped val adaptedSystem2 = system2.toTyped
val clusterNode2 = Cluster(system2) val clusterNode2 = Cluster(system2)
clusterNode1.join(clusterNode1.selfAddress) clusterNode1.join(clusterNode1.selfAddress)
clusterNode2.join(clusterNode1.selfAddress) clusterNode2.join(clusterNode1.selfAddress)
object `The ClusterReceptionist` extends StartSupport { import Receptionist._
def system: ActorSystem[Command] = adaptedSystem
import Receptionist._
def `must eventually replicate registrations to the other side`() = new TestSetup { "The cluster receptionist" must {
val regProbe = TestProbe[Any]()(adaptedSystem, testSettings)
val regProbe2 = TestProbe[Any]()(adaptedSystem2, testSettings)
adaptedSystem2.receptionist ! Subscribe(PingKey, regProbe2.ref) "must eventually replicate registrations to the other side" in {
regProbe2.expectMsg(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) new TestSetup {
val regProbe = TestProbe[Any]()(system, testSettings)
val regProbe2 = TestProbe[Any]()(adaptedSystem2, testSettings)
val service = start(pingPong) adaptedSystem2.receptionist ! Subscribe(PingKey, regProbe2.ref)
system.receptionist ! Register(PingKey, service, regProbe.ref) regProbe2.expectMsg(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
regProbe.expectMsg(Registered(PingKey, service))
val Listing(PingKey, remoteServiceRefs) = regProbe2.expectMsgType[Listing[PingProtocol]] val service = start(pingPong)
val theRef = remoteServiceRefs.head system.receptionist ! Register(PingKey, service, regProbe.ref)
theRef ! Ping(regProbe2.ref) regProbe.expectMsg(Registered(PingKey, service))
regProbe2.expectMsg(Pong)
service ! Perish val Listing(PingKey, remoteServiceRefs) = regProbe2.expectMsgType[Listing[PingProtocol]]
regProbe2.expectMsg(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) val theRef = remoteServiceRefs.head
theRef ! Ping(regProbe2.ref)
regProbe2.expectMsg(Pong)
service ! Perish
regProbe2.expectMsg(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
}
} }
} }
@ -143,7 +143,7 @@ class ClusterReceptionistSpec extends TypedSpec(ClusterReceptionistSpec.config)
override def afterAll(): Unit = { override def afterAll(): Unit = {
super.afterAll() super.afterAll()
Await.result(adaptedSystem.terminate(), 3.seconds) Await.result(system.terminate(), 3.seconds)
Await.result(system2.terminate(), 3.seconds) Await.result(system2.terminate(), 3.seconds)
} }
} }

View file

@ -4,10 +4,7 @@
package akka.persistence.typed.scaladsl package akka.persistence.typed.scaladsl
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor.typed.ActorRef import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, StartSupport, SupervisorStrategy, Terminated, TypedSpec }
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.TypedSpec
import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.Actor
import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
@ -17,8 +14,6 @@ import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.Eventually import org.scalatest.concurrent.Eventually
import akka.util.Timeout import akka.util.Timeout
import akka.persistence.typed.scaladsl.PersistentActor._ import akka.persistence.typed.scaladsl.PersistentActor._
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.Terminated
object PersistentActorSpec { object PersistentActorSpec {
@ -113,14 +108,14 @@ object PersistentActorSpec {
} }
class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eventually { class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eventually with StartSupport {
import PersistentActorSpec._ import PersistentActorSpec._
trait RealTests extends StartSupport { implicit val testSettings = TestKitSettings(system)
implicit def system: ActorSystem[TypedSpec.Command]
implicit val testSettings = TestKitSettings(system)
def `persist an event`(): Unit = { "A typed persistent actor" must {
"persist an event" in {
val c = start(counter("c1")) val c = start(counter("c1"))
val probe = TestProbe[State] val probe = TestProbe[State]
@ -129,7 +124,7 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve
probe.expectMsg(State(1, Vector(0))) probe.expectMsg(State(1, Vector(0)))
} }
def `replay stored events`(): Unit = { "replay stored events" in {
val c = start(counter("c2")) val c = start(counter("c2"))
val probe = TestProbe[State] val probe = TestProbe[State]
@ -147,7 +142,7 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve
probe.expectMsg(State(4, Vector(0, 1, 2, 3))) probe.expectMsg(State(4, Vector(0, 1, 2, 3)))
} }
def `handle Terminated signal`(): Unit = { "handle Terminated signal" in {
val c = start(counter("c3")) val c = start(counter("c3"))
val probe = TestProbe[State] val probe = TestProbe[State]
@ -159,7 +154,7 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve
} }
} }
def `handle receive timeout`(): Unit = { "handle receive timeout" in {
val c = start(counter("c4")) val c = start(counter("c4"))
val probe = TestProbe[State] val probe = TestProbe[State]
@ -177,7 +172,7 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve
* Verify that all side-effects callbacks are called (in order) and only once. * Verify that all side-effects callbacks are called (in order) and only once.
* The [[IncrementTwiceAndThenLog]] command will emit two Increment events * The [[IncrementTwiceAndThenLog]] command will emit two Increment events
*/ */
def `chainable side effects with events`(): Unit = { "chainable side effects with events" in {
val loggingProbe = TestProbe[String] val loggingProbe = TestProbe[String]
val c = start(counter("c5", loggingProbe.ref)) val c = start(counter("c5", loggingProbe.ref))
@ -192,7 +187,7 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve
} }
/** Proves that side-effects are called when emitting an empty list of events */ /** Proves that side-effects are called when emitting an empty list of events */
def `chainable side effects without events`(): Unit = { "chainable side effects without events" in {
val loggingProbe = TestProbe[String] val loggingProbe = TestProbe[String]
val c = start(counter("c6", loggingProbe.ref)) val c = start(counter("c6", loggingProbe.ref))
@ -204,7 +199,7 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve
} }
/** Proves that side-effects are called when explicitly calling Effect.none */ /** Proves that side-effects are called when explicitly calling Effect.none */
def `chainable side effects when doing nothing (Effect.none)`(): Unit = { "chainable side effects when doing nothing (Effect.none)" in {
val loggingProbe = TestProbe[String] val loggingProbe = TestProbe[String]
val c = start(counter("c7", loggingProbe.ref)) val c = start(counter("c7", loggingProbe.ref))
@ -215,7 +210,7 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve
loggingProbe.expectMsg(firstLogging) loggingProbe.expectMsg(firstLogging)
} }
def `work when wrapped in other behavior`(): Unit = { "work when wrapped in other behavior" in {
// FIXME This is a major problem with current implementation. Since the // FIXME This is a major problem with current implementation. Since the
// behavior is running as an untyped PersistentActor it's not possible to // behavior is running as an untyped PersistentActor it's not possible to
// wrap it in Actor.deferred or Actor.supervise // wrap it in Actor.deferred or Actor.supervise
@ -224,8 +219,6 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve
.onFailure(SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.1)) .onFailure(SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.1))
val c = start(behavior) val c = start(behavior)
} }
} }
object `A PersistentActor (real, adapted)` extends RealTests with AdaptedSystem
} }

View file

@ -446,10 +446,3 @@ def akkaModule(name: String): Project =
.settings(akka.Formatting.formatSettings) .settings(akka.Formatting.formatSettings)
.enablePlugins(BootstrapGenjavadoc) .enablePlugins(BootstrapGenjavadoc)
lazy val typedTests = taskKey[Unit]("Runs all the typed tests")
typedTests := {
(test in(actorTyped, Test)).value
(test in(actorTypedTests, Test)).value
(test in(clusterTyped, Test)).value
(test in(clusterShardingTyped, Test)).value
}