Test output cleaned up in akka-actor-tests and akka-testkit

This commit is contained in:
Derek Williams 2011-07-29 15:22:11 -06:00
parent 02aeec6b57
commit 50bb14d9c3
16 changed files with 484 additions and 430 deletions

View file

@ -85,12 +85,14 @@ class ActorFireForgetRequestReplySpec extends WordSpec with MustMatchers with Be
}
"should shutdown crashed temporary actor" in {
val actor = actorOf[CrashingTemporaryActor].start()
actor.isRunning must be(true)
actor ! "Die"
state.finished.await
sleepFor(1 second)
actor.isShutdown must be(true)
filterEvents(EventFilter[Exception]("Expected")) {
val actor = actorOf[CrashingTemporaryActor].start()
actor.isRunning must be(true)
actor ! "Die"
state.finished.await
sleepFor(1 second)
actor.isShutdown must be(true)
}
}
}
}

View file

@ -326,25 +326,27 @@ class ActorRefSpec extends WordSpec with MustMatchers {
}
"restart when Kill:ed" in {
val latch = new CountDownLatch(2)
filterEvents(EventFilter[ActorKilledException]) {
val latch = new CountDownLatch(2)
val boss = Actor.actorOf(new Actor {
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), scala.Some(2), scala.Some(1000))
val boss = Actor.actorOf(new Actor {
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), scala.Some(2), scala.Some(1000))
val ref = Actor.actorOf(
new Actor {
def receive = { case _ }
override def preRestart(reason: Throwable, msg: Option[Any]) = latch.countDown()
override def postRestart(reason: Throwable) = latch.countDown()
}).start()
val ref = Actor.actorOf(
new Actor {
def receive = { case _ }
override def preRestart(reason: Throwable, msg: Option[Any]) = latch.countDown()
override def postRestart(reason: Throwable) = latch.countDown()
}).start()
self link ref
self link ref
protected def receive = { case "sendKill" ref ! Kill }
}).start()
protected def receive = { case "sendKill" ref ! Kill }
}).start()
boss ! "sendKill"
latch.await(5, TimeUnit.SECONDS) must be === true
boss ! "sendKill"
latch.await(5, TimeUnit.SECONDS) must be === true
}
}
}
}

View file

@ -85,75 +85,85 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo
ref.start()
}
val expectedEvents = Seq(EventFilter[ActorKilledException], EventFilter[IllegalActorStateException]("expected"))
"An Actor restart" must {
"invoke preRestart, preStart, postRestart" in {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
filterEvents(expectedEvents) {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
}
}
}
"support creation of nested actors in freshInstance()" in {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! Nested
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
val (tActor, tRef) = expectMsgType[(Actor, TestActorRef[Actor])]
tRef.underlyingActor must be(tActor)
expectMsg((tActor, tRef))
tRef.stop()
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
filterEvents(expectedEvents) {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! Nested
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
val (tActor, tRef) = expectMsgType[(Actor, TestActorRef[Actor])]
tRef.underlyingActor must be(tActor)
expectMsg((tActor, tRef))
tRef.stop()
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
}
}
}
"use freshInstance() if available" in {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! 42
actor ! Handover
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
filterEvents(expectedEvents) {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! 42
actor ! Handover
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
}
actor ! "get"
expectMsg(1 second, 42)
}
actor ! "get"
expectMsg(1 second, 42)
}
"fall back to default factory if freshInstance() fails" in {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! 42
actor ! Fail
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
filterEvents(expectedEvents) {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! 42
actor ! Fail
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
}
actor ! "get"
expectMsg(1 second, 0)
}
actor ! "get"
expectMsg(1 second, 0)
}
}

View file

@ -8,7 +8,7 @@ import org.scalatest.{ WordSpec, BeforeAndAfterAll, BeforeAndAfterEach }
import org.scalatest.matchers.MustMatchers
import akka.testkit._
import TestEvent.{ Mute, UnMuteAll }
import FSM._
import akka.util.Duration
import akka.util.duration._
@ -112,12 +112,14 @@ class FSMActorSpec extends WordSpec with MustMatchers with TestKit with BeforeAn
}
override def beforeAll {
EventHandler notify Mute(EventFilter[EventHandler.EventHandlerException]("Next state 2 does not exist"))
val f = FSM.getClass.getDeclaredField("debugEvent")
f.setAccessible(true)
f.setBoolean(FSM, true)
}
override def afterAll {
EventHandler notify UnMuteAll
val f = FSM.getClass.getDeclaredField("debugEvent")
f.setAccessible(true)
f.setBoolean(FSM, false)

View file

@ -5,7 +5,7 @@ package akka.actor
import org.scalatest.{ WordSpec, BeforeAndAfterAll, BeforeAndAfterEach }
import org.scalatest.matchers.MustMatchers
import akka.testkit.{ TestKit, TestActorRef }
import akka.testkit.{ TestKit, TestActorRef, EventFilter, TestEvent }
import akka.event.EventHandler
import Actor._
import akka.util.duration._
@ -22,6 +22,8 @@ class LoggingReceiveSpec
val level = EventHandler.level
override def beforeAll {
EventHandler.notify(TestEvent.Mute(EventFilter[UnhandledMessageException],
EventFilter[ActorKilledException]))
EventHandler.addListener(testActor)
EventHandler.level = EventHandler.DebugLevel
}
@ -29,6 +31,7 @@ class LoggingReceiveSpec
override def afterAll {
EventHandler.removeListener(testActor)
EventHandler.level = level
EventHandler.notify(TestEvent.UnMuteAll)
}
override def afterEach {

View file

@ -14,6 +14,7 @@ import akka.util.Duration
import akka.dispatch.{ Dispatchers, Future, KeptPromise }
import java.util.concurrent.atomic.AtomicReference
import annotation.tailrec
import akka.testkit.{ EventFilter, filterEvents }
object TypedActorSpec {
@ -172,9 +173,11 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
}
"throw an IllegalStateExcpetion when TypedActor.self is called in the wrong scope" in {
(intercept[IllegalStateException] {
TypedActor.self[Foo]
}).getMessage must equal("Calling TypedActor.self outside of a TypedActor implementation method!")
filterEvents(EventFilter[IllegalStateException]("Calling")) {
(intercept[IllegalStateException] {
TypedActor.self[Foo]
}).getMessage must equal("Calling TypedActor.self outside of a TypedActor implementation method!")
}
}
"have access to itself when executing a method call" in {
@ -259,27 +262,29 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
}
"be able to handle exceptions when calling methods" in {
val t = newFooBar
filterEvents(EventFilter[IllegalStateException]("expected")) {
val t = newFooBar
t.incr()
t.failingPigdog()
t.read() must be(1) //Make sure state is not reset after failure
t.incr()
t.failingPigdog()
t.read() must be(1) //Make sure state is not reset after failure
t.failingFuturePigdog.await.exception.get.getMessage must be("expected")
t.read() must be(1) //Make sure state is not reset after failure
t.failingFuturePigdog.await.exception.get.getMessage must be("expected")
t.read() must be(1) //Make sure state is not reset after failure
(intercept[IllegalStateException] {
t.failingJOptionPigdog
}).getMessage must be("expected")
t.read() must be(1) //Make sure state is not reset after failure
(intercept[IllegalStateException] {
t.failingJOptionPigdog
}).getMessage must be("expected")
t.read() must be(1) //Make sure state is not reset after failure
(intercept[IllegalStateException] {
t.failingOptionPigdog
}).getMessage must be("expected")
(intercept[IllegalStateException] {
t.failingOptionPigdog
}).getMessage must be("expected")
t.read() must be(1) //Make sure state is not reset after failure
t.read() must be(1) //Make sure state is not reset after failure
mustStop(t)
mustStop(t)
}
}
"be able to support stacked traits for the interface part" in {

View file

@ -8,72 +8,75 @@ import org.scalatest.matchers.MustMatchers
import akka.dispatch.Dispatchers
import akka.config.Supervision.{ SupervisorConfig, OneForOneStrategy, Supervise, Permanent }
import java.util.concurrent.CountDownLatch
import akka.testkit.{ filterEvents, EventFilter }
class SupervisorMiscSpec extends WordSpec with MustMatchers {
"A Supervisor" should {
"restart a crashing actor and its dispatcher for any dispatcher" in {
val countDownLatch = new CountDownLatch(4)
filterEvents(EventFilter[Exception]("killed")) {
val countDownLatch = new CountDownLatch(4)
val actor1 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
val actor1 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
protected def receive = {
case "kill" throw new Exception("killed")
case _ println("received unknown message")
}
}).start()
protected def receive = {
case "kill" throw new Exception("killed")
case _ println("received unknown message")
}
}).start()
val actor2 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
val actor2 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
protected def receive = {
case "kill" throw new Exception("killed")
case _ println("received unknown message")
}
}).start()
protected def receive = {
case "kill" throw new Exception("killed")
case _ println("received unknown message")
}
}).start()
val actor3 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newDispatcher("test").build
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
val actor3 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newDispatcher("test").build
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
protected def receive = {
case "kill" throw new Exception("killed")
case _ println("received unknown message")
}
}).start()
protected def receive = {
case "kill" throw new Exception("killed")
case _ println("received unknown message")
}
}).start()
val actor4 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
val actor4 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
protected def receive = {
case "kill" throw new Exception("killed")
case _ println("received unknown message")
}
}).start()
protected def receive = {
case "kill" throw new Exception("killed")
case _ println("received unknown message")
}
}).start()
val sup = Supervisor(
SupervisorConfig(
OneForOneStrategy(List(classOf[Exception]), 3, 5000),
Supervise(actor1, Permanent) ::
Supervise(actor2, Permanent) ::
Supervise(actor3, Permanent) ::
Supervise(actor4, Permanent) ::
Nil))
val sup = Supervisor(
SupervisorConfig(
OneForOneStrategy(List(classOf[Exception]), 3, 5000),
Supervise(actor1, Permanent) ::
Supervise(actor2, Permanent) ::
Supervise(actor3, Permanent) ::
Supervise(actor4, Permanent) ::
Nil))
actor1 ! "kill"
actor2 ! "kill"
actor3 ! "kill"
actor4 ! "kill"
actor1 ! "kill"
actor2 ! "kill"
actor3 ! "kill"
actor4 ! "kill"
countDownLatch.await()
assert(!actor1.isShutdown, "actor1 is shutdown")
assert(!actor2.isShutdown, "actor2 is shutdown")
assert(!actor3.isShutdown, "actor3 is shutdown")
assert(!actor4.isShutdown, "actor4 is shutdown")
countDownLatch.await()
assert(!actor1.isShutdown, "actor1 is shutdown")
assert(!actor2.isShutdown, "actor2 is shutdown")
assert(!actor3.isShutdown, "actor3 is shutdown")
assert(!actor4.isShutdown, "actor4 is shutdown")
}
}
}
}

View file

@ -70,6 +70,7 @@ object SupervisorSpec {
override def receive = {
case Die (temp.?(Die, TimeoutMillis)).get
case _: MaximumNumberOfRestartsWithinTimeRangeReached
}
}
@ -200,7 +201,8 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
override def beforeAll() = {
EventHandler notify Mute(EventFilter[Exception]("Die"),
EventFilter[IllegalStateException]("Don't wanna!"))
EventFilter[IllegalStateException]("Don't wanna!"),
EventFilter[RuntimeException]("Expected"))
}
override def afterAll() = {

View file

@ -8,6 +8,7 @@ import org.scalatest.matchers.MustMatchers
import akka.util.duration._
import akka.testkit.Testing.sleepFor
import akka.testkit.{ EventFilter, filterEvents }
import akka.dispatch.Dispatchers
import akka.config.Supervision.{ SupervisorConfig, OneForOneStrategy, Supervise, Permanent }
import Actor._
@ -33,15 +34,17 @@ class SupervisorTreeSpec extends WordSpec with MustMatchers {
"In a 3 levels deep supervisor tree (linked in the constructor) we" must {
"be able to kill the middle actor and see itself and its child restarted" in {
log = "INIT"
filterEvents(EventFilter[Exception]) {
log = "INIT"
val lastActor = actorOf(new Chainer, "lastActor").start
val middleActor = actorOf(new Chainer(Some(lastActor)), "middleActor").start
val headActor = actorOf(new Chainer(Some(middleActor)), "headActor").start
val lastActor = actorOf(new Chainer, "lastActor").start
val middleActor = actorOf(new Chainer(Some(lastActor)), "middleActor").start
val headActor = actorOf(new Chainer(Some(middleActor)), "headActor").start
middleActor ! Die
sleepFor(500 millis)
log must equal("INITmiddleActorlastActor")
middleActor ! Die
sleepFor(500 millis)
log must equal("INITmiddleActorlastActor")
}
}
}
}

View file

@ -7,7 +7,7 @@ import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.actor._
import akka.config.Supervision._
import akka.testkit.{ filterEvents, EventFilter }
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
import org.scalatest.matchers.MustMatchers
@ -23,30 +23,33 @@ class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll {
"A supervised actor with lifecycle PERMANENT" should {
"be able to reply on failure during preRestart" in {
filterEvents(EventFilter[Exception]("test")) {
val latch = new CountDownLatch(1)
val sender = Actor.actorOf(new Sender(latch)).start()
val latch = new CountDownLatch(1)
val sender = Actor.actorOf(new Sender(latch)).start()
val supervised = Actor.actorOf[Supervised]
val supervisor = Supervisor(SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 5, 10000),
Supervise(supervised, Permanent) :: Nil))
val supervised = Actor.actorOf[Supervised]
val supervisor = Supervisor(SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 5, 10000),
Supervise(supervised, Permanent) :: Nil))
supervised.!("test")(Some(sender))
latch.await(5, TimeUnit.SECONDS) must be(true)
supervised.!("test")(Some(sender))
latch.await(5, TimeUnit.SECONDS) must be(true)
}
}
"be able to reply on failure during postStop" in {
val latch = new CountDownLatch(1)
val sender = Actor.actorOf(new Sender(latch)).start()
filterEvents(EventFilter[Exception]("test")) {
val latch = new CountDownLatch(1)
val sender = Actor.actorOf(new Sender(latch)).start()
val supervised = Actor.actorOf[Supervised]
val supervisor = Supervisor(SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 5, 10000),
Supervise(supervised, Temporary) :: Nil))
val supervised = Actor.actorOf[Supervised]
val supervisor = Supervisor(SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 5, 10000),
Supervise(supervised, Temporary) :: Nil))
supervised.!("test")(Some(sender))
latch.await(5, TimeUnit.SECONDS) must be(true)
supervised.!("test")(Some(sender))
latch.await(5, TimeUnit.SECONDS) must be(true)
}
}
}
}

View file

@ -5,7 +5,7 @@ package akka.actor.dispatch
import org.scalatest.junit.JUnitSuite
import org.scalatest.Assertions._
import akka.testkit.Testing
import akka.testkit.{ Testing, filterEvents, EventFilter }
import akka.dispatch._
import akka.actor.Actor._
import java.util.concurrent.atomic.AtomicLong
@ -330,15 +330,17 @@ abstract class ActorModelSpec extends JUnitSuite {
@Test
def dispatcherShouldSuspendAndResumeAFailingNonSupervisedPermanentActor {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
val done = new CountDownLatch(1)
a ! Restart
a ! CountDown(done)
assertCountDown(done, Testing.testTime(3000), "Should be suspended+resumed and done with next message within 3 seconds")
a.stop()
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 2,
msgsProcessed = 2, suspensions = 1, resumes = 1)
filterEvents(EventFilter[Exception]("Restart")) {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
val done = new CountDownLatch(1)
a ! Restart
a ! CountDown(done)
assertCountDown(done, Testing.testTime(3000), "Should be suspended+resumed and done with next message within 3 seconds")
a.stop()
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 2,
msgsProcessed = 2, suspensions = 1, resumes = 1)
}
}
@Test
@ -397,48 +399,52 @@ abstract class ActorModelSpec extends JUnitSuite {
@Test
def dispatcherShouldContinueToProcessMessagesWhenAThreadGetsInterrupted {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = a ? Interrupt
val f4 = a ? Reply("foo2")
val f5 = a ? Interrupt
val f6 = a ? Reply("bar2")
filterEvents(EventFilter[InterruptedException]("Ping!"), EventFilter[akka.event.EventHandler.EventHandlerException]) {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = a ? Interrupt
val f4 = a ? Reply("foo2")
val f5 = a ? Interrupt
val f6 = a ? Reply("bar2")
assert(f1.get === "foo")
assert(f2.get === "bar")
assert((intercept[InterruptedException] {
f3.get
}).getMessage === "Ping!")
assert(f4.get === "foo2")
assert((intercept[InterruptedException] {
f5.get
}).getMessage === "Ping!")
assert(f6.get === "bar2")
assert(f1.get === "foo")
assert(f2.get === "bar")
assert((intercept[InterruptedException] {
f3.get
}).getMessage === "Ping!")
assert(f4.get === "foo2")
assert((intercept[InterruptedException] {
f5.get
}).getMessage === "Ping!")
assert(f6.get === "bar2")
}
}
@Test
def dispatcherShouldContinueToProcessMessagesWhenExceptionIsThrown {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = a ? new ThrowException(new IndexOutOfBoundsException("IndexOutOfBoundsException"))
val f4 = a ? Reply("foo2")
val f5 = a ? new ThrowException(new RemoteException("RemoteException"))
val f6 = a ? Reply("bar2")
filterEvents(EventFilter[IndexOutOfBoundsException], EventFilter[RemoteException]) {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = a ? new ThrowException(new IndexOutOfBoundsException("IndexOutOfBoundsException"))
val f4 = a ? Reply("foo2")
val f5 = a ? new ThrowException(new RemoteException("RemoteException"))
val f6 = a ? Reply("bar2")
assert(f1.get === "foo")
assert(f2.get === "bar")
assert((intercept[IndexOutOfBoundsException] {
f3.get
}).getMessage === "IndexOutOfBoundsException")
assert(f4.get === "foo2")
assert((intercept[RemoteException] {
f5.get
}).getMessage === "RemoteException")
assert(f6.get === "bar2")
assert(f1.get === "foo")
assert(f2.get === "bar")
assert((intercept[IndexOutOfBoundsException] {
f3.get
}).getMessage === "IndexOutOfBoundsException")
assert(f4.get === "foo2")
assert((intercept[RemoteException] {
f5.get
}).getMessage === "RemoteException")
assert(f6.get === "bar2")
}
}
}

View file

@ -7,6 +7,7 @@ import akka.dispatch.{ Dispatchers, Dispatcher }
import akka.actor.Actor
import Actor._
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import akka.testkit.{ filterEvents, EventFilter }
object DispatcherActorSpec {
class TestActor extends Actor {
@ -60,15 +61,17 @@ class DispatcherActorSpec extends JUnitSuite {
@Test
def shouldSendReceiveException = {
val actor = actorOf[TestActor].start()
try {
(actor ? "Failure").get
fail("Should have thrown an exception")
} catch {
case e
assert("Expected exception; to test fault-tolerance" === e.getMessage())
filterEvents(EventFilter[RuntimeException]("Expected")) {
val actor = actorOf[TestActor].start()
try {
(actor ? "Failure").get
fail("Should have thrown an exception")
} catch {
case e
assert("Expected exception; to test fault-tolerance" === e.getMessage())
}
actor.stop()
}
actor.stop()
}
@Test

View file

@ -7,7 +7,7 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Actor._
import akka.testkit.Testing._
import akka.actor.{ TypedActor, Actor }
import akka.testkit.TestLatch
import akka.testkit.{ TestLatch, filterEvents, EventFilter }
import akka.util.duration._
object ActorPoolSpec {
@ -357,210 +357,214 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
}
"provide default supervision of pooled actors" in {
import akka.config.Supervision._
val pingCount = new AtomicInteger(0)
val deathCount = new AtomicInteger(0)
var keepDying = false
filterEvents(EventFilter[RuntimeException]) {
import akka.config.Supervision._
val pingCount = new AtomicInteger(0)
val deathCount = new AtomicInteger(0)
var keepDying = false
val pool1 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ pingCount.incrementAndGet
}
val pool1 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ pingCount.incrementAndGet
}
}).start()
}).start()
}).start()
val pool2 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
self.lifeCycle = Permanent
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ pingCount.incrementAndGet
}
val pool2 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
self.lifeCycle = Permanent
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ pingCount.incrementAndGet
}
}).start()
}).start()
}).start()
val pool3 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
self.lifeCycle = Temporary
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ pingCount.incrementAndGet
}
val pool3 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
self.lifeCycle = Temporary
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ pingCount.incrementAndGet
}
}).start()
}).start()
}).start()
// default lifecycle
// actor comes back right away
pingCount.set(0)
keepDying = false
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
// default lifecycle
// actor comes back right away
pingCount.set(0)
keepDying = false
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
// default lifecycle
// actor dies completely
pingCount.set(0)
keepDying = true
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(2)
// default lifecycle
// actor dies completely
pingCount.set(0)
keepDying = true
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(2)
// permanent lifecycle
// actor comes back right away
pingCount.set(0)
keepDying = false
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
// permanent lifecycle
// actor comes back right away
pingCount.set(0)
keepDying = false
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
// permanent lifecycle
// actor dies completely
pingCount.set(0)
keepDying = true
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(2)
// permanent lifecycle
// actor dies completely
pingCount.set(0)
keepDying = true
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(2)
// temporary lifecycle
pingCount.set(0)
keepDying = false
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool3 ! akka.Die
sleepFor(2 seconds)
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool3 ! "ping"
pool3 ! "ping"
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(4)
// temporary lifecycle
pingCount.set(0)
keepDying = false
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool3 ! akka.Die
sleepFor(2 seconds)
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool3 ! "ping"
pool3 ! "ping"
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(4)
}
}
"support customizable supervision config of pooled actors" in {
import akka.config.Supervision._
val pingCount = new AtomicInteger(0)
val deathCount = new AtomicInteger(0)
var keepDying = false
filterEvents(EventFilter[IllegalStateException], EventFilter[RuntimeException]) {
import akka.config.Supervision._
val pingCount = new AtomicInteger(0)
val deathCount = new AtomicInteger(0)
var keepDying = false
trait LimitedTrapSupervisionConfig extends ActorPoolSupervisionConfig {
def poolFaultHandler = OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000)
}
trait LimitedTrapSupervisionConfig extends ActorPoolSupervisionConfig {
def poolFaultHandler = OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000)
}
object BadState
object BadState
val pool1 = actorOf(
new Actor with DefaultActorPool with LimitedTrapSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case BadState
if (keepDying) deathCount.incrementAndGet
throw new IllegalStateException
case akka.Die
throw new RuntimeException
case _ pingCount.incrementAndGet
}
val pool1 = actorOf(
new Actor with DefaultActorPool with LimitedTrapSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case BadState
if (keepDying) deathCount.incrementAndGet
throw new IllegalStateException
case akka.Die
throw new RuntimeException
case _ pingCount.incrementAndGet
}
}).start()
}).start()
}).start()
// actor comes back right away
pingCount.set(0)
keepDying = false
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
// actor comes back right away
pingCount.set(0)
keepDying = false
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
// actor dies completely
pingCount.set(0)
keepDying = true
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(2)
// actor dies completely
pingCount.set(0)
keepDying = true
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(2)
// kill it
intercept[RuntimeException](pool1.?(akka.Die).get)
// kill it
intercept[RuntimeException](pool1.?(akka.Die).get)
}
}
}
}
}

View file

@ -3,7 +3,7 @@ package akka
import akka.event.EventHandler
package object testkit {
def filterEvents[T](eventFilters: EventFilter*)(block: T): T = {
def filterEvents[T](eventFilters: Iterable[EventFilter])(block: T): T = {
EventHandler.notify(TestEvent.Mute(eventFilters.toSeq))
try {
block
@ -11,4 +11,6 @@ package object testkit {
EventHandler.notify(TestEvent.UnMute(eventFilters.toSeq))
}
}
def filterEvents[T](eventFilters: EventFilter*)(block: T): T = filterEvents(eventFilters.toSeq)(block)
}

View file

@ -158,34 +158,38 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
}
"stop when sent a poison pill" in {
val a = TestActorRef[WorkerActor].start()
intercept[ActorKilledException] {
(a ? PoisonPill).get
filterEvents(EventFilter[ActorKilledException]) {
val a = TestActorRef[WorkerActor].start()
intercept[ActorKilledException] {
(a ? PoisonPill).get
}
a must not be ('running)
a must be('shutdown)
assertThread
}
a must not be ('running)
a must be('shutdown)
assertThread
}
"restart when Kill:ed" in {
counter = 2
filterEvents(EventFilter[ActorKilledException]) {
counter = 2
val boss = TestActorRef(new TActor {
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), Some(2), Some(1000))
val ref = TestActorRef(new TActor {
def receiveT = { case _ }
override def preRestart(reason: Throwable, msg: Option[Any]) { counter -= 1 }
override def postRestart(reason: Throwable) { counter -= 1 }
val boss = TestActorRef(new TActor {
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), Some(2), Some(1000))
val ref = TestActorRef(new TActor {
def receiveT = { case _ }
override def preRestart(reason: Throwable, msg: Option[Any]) { counter -= 1 }
override def postRestart(reason: Throwable) { counter -= 1 }
}).start()
self.dispatcher = CallingThreadDispatcher.global
self link ref
def receiveT = { case "sendKill" ref ! Kill }
}).start()
self.dispatcher = CallingThreadDispatcher.global
self link ref
def receiveT = { case "sendKill" ref ! Kill }
}).start()
boss ! "sendKill"
boss ! "sendKill"
counter must be(0)
assertThread
counter must be(0)
assertThread
}
}
"support futures" in {

View file

@ -228,7 +228,7 @@ object AkkaBuild extends Build {
lazy val camelSample = Project(
id = "akka-sample-camel",
base = file("akka-samples/akka-sample-camel"),
dependencies = Seq(actor, camelTyped),
dependencies = Seq(actor, camelTyped, testkit % "test"),
settings = defaultSettings ++ Seq(
ivyXML := Dependencies.sampleCamelXML,
libraryDependencies ++= Dependencies.sampleCamel