diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala index 9e70a39492..5846fae439 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala @@ -85,12 +85,14 @@ class ActorFireForgetRequestReplySpec extends WordSpec with MustMatchers with Be } "should shutdown crashed temporary actor" in { - val actor = actorOf[CrashingTemporaryActor].start() - actor.isRunning must be(true) - actor ! "Die" - state.finished.await - sleepFor(1 second) - actor.isShutdown must be(true) + filterEvents(EventFilter[Exception]("Expected")) { + val actor = actorOf[CrashingTemporaryActor].start() + actor.isRunning must be(true) + actor ! "Die" + state.finished.await + sleepFor(1 second) + actor.isShutdown must be(true) + } } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala index ea5b620057..c5f93f831f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala @@ -326,25 +326,27 @@ class ActorRefSpec extends WordSpec with MustMatchers { } "restart when Kill:ed" in { - val latch = new CountDownLatch(2) + filterException[ActorKilledException] { + val latch = new CountDownLatch(2) - val boss = Actor.actorOf(new Actor { - self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), scala.Some(2), scala.Some(1000)) + val boss = Actor.actorOf(new Actor { + self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), scala.Some(2), scala.Some(1000)) - val ref = Actor.actorOf( - new Actor { - def receive = { case _ ⇒ } - override def preRestart(reason: Throwable, msg: Option[Any]) = latch.countDown() - override def postRestart(reason: Throwable) = latch.countDown() - }).start() + val ref = Actor.actorOf( + new Actor { + def receive = { case _ ⇒ } + override def preRestart(reason: Throwable, msg: Option[Any]) = latch.countDown() + override def postRestart(reason: Throwable) = latch.countDown() + }).start() - self link ref + self link ref - protected def receive = { case "sendKill" ⇒ ref ! Kill } - }).start() + protected def receive = { case "sendKill" ⇒ ref ! Kill } + }).start() - boss ! "sendKill" - latch.await(5, TimeUnit.SECONDS) must be === true + boss ! "sendKill" + latch.await(5, TimeUnit.SECONDS) must be === true + } } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala index d2b9a42ee5..94e3b843cc 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala @@ -85,75 +85,85 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo ref.start() } + val expectedEvents = Seq(EventFilter[ActorKilledException], EventFilter[IllegalActorStateException]("expected")) + "An Actor restart" must { "invoke preRestart, preStart, postRestart" in { - val actor = newActor(new Restarter(testActor)) - expectMsg(1 second, ("preStart", 1)) - val supervisor = newActor(new Supervisor) - supervisor link actor - actor ! Kill - within(1 second) { - expectMsg(("preRestart", Some(Kill), 1)) - expectMsg(("preStart", 2)) - expectMsg(("postRestart", 2)) - expectNoMsg + filterEvents(expectedEvents) { + val actor = newActor(new Restarter(testActor)) + expectMsg(1 second, ("preStart", 1)) + val supervisor = newActor(new Supervisor) + supervisor link actor + actor ! Kill + within(1 second) { + expectMsg(("preRestart", Some(Kill), 1)) + expectMsg(("preStart", 2)) + expectMsg(("postRestart", 2)) + expectNoMsg + } } } "support creation of nested actors in freshInstance()" in { - val actor = newActor(new Restarter(testActor)) - expectMsg(1 second, ("preStart", 1)) - val supervisor = newActor(new Supervisor) - supervisor link actor - actor ! Nested - actor ! Kill - within(1 second) { - expectMsg(("preRestart", Some(Kill), 1)) - val (tActor, tRef) = expectMsgType[(Actor, TestActorRef[Actor])] - tRef.underlyingActor must be(tActor) - expectMsg((tActor, tRef)) - tRef.stop() - expectMsg(("preStart", 2)) - expectMsg(("postRestart", 2)) - expectNoMsg + filterEvents(expectedEvents) { + val actor = newActor(new Restarter(testActor)) + expectMsg(1 second, ("preStart", 1)) + val supervisor = newActor(new Supervisor) + supervisor link actor + actor ! Nested + actor ! Kill + within(1 second) { + expectMsg(("preRestart", Some(Kill), 1)) + val (tActor, tRef) = expectMsgType[(Actor, TestActorRef[Actor])] + tRef.underlyingActor must be(tActor) + expectMsg((tActor, tRef)) + tRef.stop() + expectMsg(("preStart", 2)) + expectMsg(("postRestart", 2)) + expectNoMsg + } } } "use freshInstance() if available" in { - val actor = newActor(new Restarter(testActor)) - expectMsg(1 second, ("preStart", 1)) - val supervisor = newActor(new Supervisor) - supervisor link actor - actor ! 42 - actor ! Handover - actor ! Kill - within(1 second) { - expectMsg(("preRestart", Some(Kill), 1)) - expectMsg(("preStart", 2)) - expectMsg(("postRestart", 2)) - expectNoMsg + filterEvents(expectedEvents) { + val actor = newActor(new Restarter(testActor)) + expectMsg(1 second, ("preStart", 1)) + val supervisor = newActor(new Supervisor) + supervisor link actor + actor ! 42 + actor ! Handover + actor ! Kill + within(1 second) { + expectMsg(("preRestart", Some(Kill), 1)) + expectMsg(("preStart", 2)) + expectMsg(("postRestart", 2)) + expectNoMsg + } + actor ! "get" + expectMsg(1 second, 42) } - actor ! "get" - expectMsg(1 second, 42) } "fall back to default factory if freshInstance() fails" in { - val actor = newActor(new Restarter(testActor)) - expectMsg(1 second, ("preStart", 1)) - val supervisor = newActor(new Supervisor) - supervisor link actor - actor ! 42 - actor ! Fail - actor ! Kill - within(1 second) { - expectMsg(("preRestart", Some(Kill), 1)) - expectMsg(("preStart", 2)) - expectMsg(("postRestart", 2)) - expectNoMsg + filterEvents(expectedEvents) { + val actor = newActor(new Restarter(testActor)) + expectMsg(1 second, ("preStart", 1)) + val supervisor = newActor(new Supervisor) + supervisor link actor + actor ! 42 + actor ! Fail + actor ! Kill + within(1 second) { + expectMsg(("preRestart", Some(Kill), 1)) + expectMsg(("preStart", 2)) + expectMsg(("postRestart", 2)) + expectNoMsg + } + actor ! "get" + expectMsg(1 second, 0) } - actor ! "get" - expectMsg(1 second, 0) } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/FSMActorSpec.scala index 028480ddbd..d93e652937 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/FSMActorSpec.scala @@ -8,7 +8,7 @@ import org.scalatest.{ WordSpec, BeforeAndAfterAll, BeforeAndAfterEach } import org.scalatest.matchers.MustMatchers import akka.testkit._ - +import TestEvent.{ Mute, UnMuteAll } import FSM._ import akka.util.Duration import akka.util.duration._ @@ -59,8 +59,8 @@ object FSMActorSpec { whenUnhandled { case Ev(msg) ⇒ { - unhandledLatch.open EventHandler.info(this, "unhandled event " + msg + " in state " + stateName + " with data " + stateData) + unhandledLatch.open stay } } @@ -112,12 +112,18 @@ class FSMActorSpec extends WordSpec with MustMatchers with TestKit with BeforeAn } override def beforeAll { + EventHandler notify Mute(EventFilter[EventHandler.EventHandlerException]("Next state 2 does not exist"), + EventFilter.custom { + case _: EventHandler.Debug ⇒ true + case _ ⇒ false + }) val f = FSM.getClass.getDeclaredField("debugEvent") f.setAccessible(true) f.setBoolean(FSM, true) } override def afterAll { + EventHandler notify UnMuteAll val f = FSM.getClass.getDeclaredField("debugEvent") f.setAccessible(true) f.setBoolean(FSM, false) @@ -151,8 +157,13 @@ class FSMActorSpec extends WordSpec with MustMatchers with TestKit with BeforeAn transitionCallBackLatch.await lockedLatch.await - lock ! "not_handled" - unhandledLatch.await + filterEvents(EventFilter.custom { + case EventHandler.Info(_: Lock, _) ⇒ true + case _ ⇒ false + }) { + lock ! "not_handled" + unhandledLatch.await + } val answerLatch = TestLatch() object Hello diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala index c552345192..2c825f9063 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala @@ -10,6 +10,7 @@ import akka.testkit._ import akka.testkit._ import akka.util.duration._ import akka.config.Supervision._ +import akka.event.EventHandler import FSM._ @@ -79,9 +80,14 @@ class FSMTransitionSpec extends WordSpec with MustMatchers with TestKit { val sup = Actor.actorOf[Supervisor].start() sup link fsm within(300 millis) { - fsm ! SubscribeTransitionCallBack(forward) - fsm ! "reply" - expectMsg("reply") + filterEvents(EventFilter.custom { + case EventHandler.Warning(_: MyFSM, _) ⇒ true + case _ ⇒ false + }) { + fsm ! SubscribeTransitionCallBack(forward) + fsm ! "reply" + expectMsg("reply") + } forward.start() fsm ! SubscribeTransitionCallBack(forward) expectMsg(CurrentState(fsm, 0)) diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/LoggingReceiveSpec.scala index 02c5b13974..b21584dfc0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/LoggingReceiveSpec.scala @@ -5,7 +5,7 @@ package akka.actor import org.scalatest.{ WordSpec, BeforeAndAfterAll, BeforeAndAfterEach } import org.scalatest.matchers.MustMatchers -import akka.testkit.{ TestKit, TestActorRef } +import akka.testkit.{ TestKit, TestActorRef, EventFilter, TestEvent } import akka.event.EventHandler import Actor._ import akka.util.duration._ @@ -22,6 +22,11 @@ class LoggingReceiveSpec val level = EventHandler.level override def beforeAll { + EventHandler.notify(TestEvent.Mute(EventFilter[UnhandledMessageException], + EventFilter[ActorKilledException], EventFilter.custom { + case d: EventHandler.Debug ⇒ true + case _ ⇒ false + })) EventHandler.addListener(testActor) EventHandler.level = EventHandler.DebugLevel } @@ -29,6 +34,7 @@ class LoggingReceiveSpec override def afterAll { EventHandler.removeListener(testActor) EventHandler.level = level + EventHandler.notify(TestEvent.UnMuteAll) } override def afterEach { diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala index 48f2785d01..66f9a09f71 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala @@ -14,6 +14,7 @@ import akka.util.Duration import akka.dispatch.{ Dispatchers, Future, KeptPromise } import java.util.concurrent.atomic.AtomicReference import annotation.tailrec +import akka.testkit.{ EventFilter, filterEvents } object TypedActorSpec { @@ -172,9 +173,11 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach } "throw an IllegalStateExcpetion when TypedActor.self is called in the wrong scope" in { - (intercept[IllegalStateException] { - TypedActor.self[Foo] - }).getMessage must equal("Calling TypedActor.self outside of a TypedActor implementation method!") + filterEvents(EventFilter[IllegalStateException]("Calling")) { + (intercept[IllegalStateException] { + TypedActor.self[Foo] + }).getMessage must equal("Calling TypedActor.self outside of a TypedActor implementation method!") + } } "have access to itself when executing a method call" in { @@ -259,27 +262,29 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach } "be able to handle exceptions when calling methods" in { - val t = newFooBar + filterEvents(EventFilter[IllegalStateException]("expected")) { + val t = newFooBar - t.incr() - t.failingPigdog() - t.read() must be(1) //Make sure state is not reset after failure + t.incr() + t.failingPigdog() + t.read() must be(1) //Make sure state is not reset after failure - t.failingFuturePigdog.await.exception.get.getMessage must be("expected") - t.read() must be(1) //Make sure state is not reset after failure + t.failingFuturePigdog.await.exception.get.getMessage must be("expected") + t.read() must be(1) //Make sure state is not reset after failure - (intercept[IllegalStateException] { - t.failingJOptionPigdog - }).getMessage must be("expected") - t.read() must be(1) //Make sure state is not reset after failure + (intercept[IllegalStateException] { + t.failingJOptionPigdog + }).getMessage must be("expected") + t.read() must be(1) //Make sure state is not reset after failure - (intercept[IllegalStateException] { - t.failingOptionPigdog - }).getMessage must be("expected") + (intercept[IllegalStateException] { + t.failingOptionPigdog + }).getMessage must be("expected") - t.read() must be(1) //Make sure state is not reset after failure + t.read() must be(1) //Make sure state is not reset after failure - mustStop(t) + mustStop(t) + } } "be able to support stacked traits for the interface part" in { diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorMiscSpec.scala index 26e718b86c..4d7731d883 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorMiscSpec.scala @@ -8,72 +8,75 @@ import org.scalatest.matchers.MustMatchers import akka.dispatch.Dispatchers import akka.config.Supervision.{ SupervisorConfig, OneForOneStrategy, Supervise, Permanent } import java.util.concurrent.CountDownLatch +import akka.testkit.{ filterEvents, EventFilter } class SupervisorMiscSpec extends WordSpec with MustMatchers { "A Supervisor" should { "restart a crashing actor and its dispatcher for any dispatcher" in { - val countDownLatch = new CountDownLatch(4) + filterEvents(EventFilter[Exception]("killed")) { + val countDownLatch = new CountDownLatch(4) - val actor1 = Actor.actorOf(new Actor { - self.dispatcher = Dispatchers.newPinnedDispatcher(self) - override def postRestart(cause: Throwable) { countDownLatch.countDown() } + val actor1 = Actor.actorOf(new Actor { + self.dispatcher = Dispatchers.newPinnedDispatcher(self) + override def postRestart(cause: Throwable) { countDownLatch.countDown() } - protected def receive = { - case "kill" ⇒ throw new Exception("killed") - case _ ⇒ println("received unknown message") - } - }).start() + protected def receive = { + case "kill" ⇒ throw new Exception("killed") + case _ ⇒ println("received unknown message") + } + }).start() - val actor2 = Actor.actorOf(new Actor { - self.dispatcher = Dispatchers.newPinnedDispatcher(self) - override def postRestart(cause: Throwable) { countDownLatch.countDown() } + val actor2 = Actor.actorOf(new Actor { + self.dispatcher = Dispatchers.newPinnedDispatcher(self) + override def postRestart(cause: Throwable) { countDownLatch.countDown() } - protected def receive = { - case "kill" ⇒ throw new Exception("killed") - case _ ⇒ println("received unknown message") - } - }).start() + protected def receive = { + case "kill" ⇒ throw new Exception("killed") + case _ ⇒ println("received unknown message") + } + }).start() - val actor3 = Actor.actorOf(new Actor { - self.dispatcher = Dispatchers.newDispatcher("test").build - override def postRestart(cause: Throwable) { countDownLatch.countDown() } + val actor3 = Actor.actorOf(new Actor { + self.dispatcher = Dispatchers.newDispatcher("test").build + override def postRestart(cause: Throwable) { countDownLatch.countDown() } - protected def receive = { - case "kill" ⇒ throw new Exception("killed") - case _ ⇒ println("received unknown message") - } - }).start() + protected def receive = { + case "kill" ⇒ throw new Exception("killed") + case _ ⇒ println("received unknown message") + } + }).start() - val actor4 = Actor.actorOf(new Actor { - self.dispatcher = Dispatchers.newPinnedDispatcher(self) - override def postRestart(cause: Throwable) { countDownLatch.countDown() } + val actor4 = Actor.actorOf(new Actor { + self.dispatcher = Dispatchers.newPinnedDispatcher(self) + override def postRestart(cause: Throwable) { countDownLatch.countDown() } - protected def receive = { - case "kill" ⇒ throw new Exception("killed") - case _ ⇒ println("received unknown message") - } - }).start() + protected def receive = { + case "kill" ⇒ throw new Exception("killed") + case _ ⇒ println("received unknown message") + } + }).start() - val sup = Supervisor( - SupervisorConfig( - OneForOneStrategy(List(classOf[Exception]), 3, 5000), - Supervise(actor1, Permanent) :: - Supervise(actor2, Permanent) :: - Supervise(actor3, Permanent) :: - Supervise(actor4, Permanent) :: - Nil)) + val sup = Supervisor( + SupervisorConfig( + OneForOneStrategy(List(classOf[Exception]), 3, 5000), + Supervise(actor1, Permanent) :: + Supervise(actor2, Permanent) :: + Supervise(actor3, Permanent) :: + Supervise(actor4, Permanent) :: + Nil)) - actor1 ! "kill" - actor2 ! "kill" - actor3 ! "kill" - actor4 ! "kill" + actor1 ! "kill" + actor2 ! "kill" + actor3 ! "kill" + actor4 ! "kill" - countDownLatch.await() - assert(!actor1.isShutdown, "actor1 is shutdown") - assert(!actor2.isShutdown, "actor2 is shutdown") - assert(!actor3.isShutdown, "actor3 is shutdown") - assert(!actor4.isShutdown, "actor4 is shutdown") + countDownLatch.await() + assert(!actor1.isShutdown, "actor1 is shutdown") + assert(!actor2.isShutdown, "actor2 is shutdown") + assert(!actor3.isShutdown, "actor3 is shutdown") + assert(!actor4.isShutdown, "actor4 is shutdown") + } } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala index dcdcb47a98..dc5b20d325 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala @@ -70,6 +70,7 @@ object SupervisorSpec { override def receive = { case Die ⇒ (temp.?(Die, TimeoutMillis)).get + case _: MaximumNumberOfRestartsWithinTimeRangeReached ⇒ } } @@ -200,7 +201,8 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach override def beforeAll() = { EventHandler notify Mute(EventFilter[Exception]("Die"), - EventFilter[IllegalStateException]("Don't wanna!")) + EventFilter[IllegalStateException]("Don't wanna!"), + EventFilter[RuntimeException]("Expected")) } override def afterAll() = { diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala index 4c5deb8b51..86591fd2c8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala @@ -8,6 +8,7 @@ import org.scalatest.matchers.MustMatchers import akka.util.duration._ import akka.testkit.Testing.sleepFor +import akka.testkit.{ EventFilter, filterEvents, filterException } import akka.dispatch.Dispatchers import akka.config.Supervision.{ SupervisorConfig, OneForOneStrategy, Supervise, Permanent } import Actor._ @@ -33,15 +34,17 @@ class SupervisorTreeSpec extends WordSpec with MustMatchers { "In a 3 levels deep supervisor tree (linked in the constructor) we" must { "be able to kill the middle actor and see itself and its child restarted" in { - log = "INIT" + filterException[Exception] { + log = "INIT" - val lastActor = actorOf(new Chainer, "lastActor").start - val middleActor = actorOf(new Chainer(Some(lastActor)), "middleActor").start - val headActor = actorOf(new Chainer(Some(middleActor)), "headActor").start + val lastActor = actorOf(new Chainer, "lastActor").start + val middleActor = actorOf(new Chainer(Some(lastActor)), "middleActor").start + val headActor = actorOf(new Chainer(Some(middleActor)), "headActor").start - middleActor ! Die - sleepFor(500 millis) - log must equal("INITmiddleActorlastActor") + middleActor ! Die + sleepFor(500 millis) + log must equal("INITmiddleActorlastActor") + } } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala index 3596c96b40..d10e6f1052 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala @@ -7,7 +7,7 @@ import java.util.concurrent.{ CountDownLatch, TimeUnit } import akka.actor._ import akka.config.Supervision._ - +import akka.testkit.{ filterEvents, EventFilter } import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.matchers.MustMatchers @@ -23,30 +23,33 @@ class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll { "A supervised actor with lifecycle PERMANENT" should { "be able to reply on failure during preRestart" in { + filterEvents(EventFilter[Exception]("test")) { + val latch = new CountDownLatch(1) + val sender = Actor.actorOf(new Sender(latch)).start() - val latch = new CountDownLatch(1) - val sender = Actor.actorOf(new Sender(latch)).start() + val supervised = Actor.actorOf[Supervised] + val supervisor = Supervisor(SupervisorConfig( + AllForOneStrategy(List(classOf[Exception]), 5, 10000), + Supervise(supervised, Permanent) :: Nil)) - val supervised = Actor.actorOf[Supervised] - val supervisor = Supervisor(SupervisorConfig( - AllForOneStrategy(List(classOf[Exception]), 5, 10000), - Supervise(supervised, Permanent) :: Nil)) - - supervised.!("test")(Some(sender)) - latch.await(5, TimeUnit.SECONDS) must be(true) + supervised.!("test")(Some(sender)) + latch.await(5, TimeUnit.SECONDS) must be(true) + } } "be able to reply on failure during postStop" in { - val latch = new CountDownLatch(1) - val sender = Actor.actorOf(new Sender(latch)).start() + filterEvents(EventFilter[Exception]("test")) { + val latch = new CountDownLatch(1) + val sender = Actor.actorOf(new Sender(latch)).start() - val supervised = Actor.actorOf[Supervised] - val supervisor = Supervisor(SupervisorConfig( - AllForOneStrategy(List(classOf[Exception]), 5, 10000), - Supervise(supervised, Temporary) :: Nil)) + val supervised = Actor.actorOf[Supervised] + val supervisor = Supervisor(SupervisorConfig( + AllForOneStrategy(List(classOf[Exception]), 5, 10000), + Supervise(supervised, Temporary) :: Nil)) - supervised.!("test")(Some(sender)) - latch.await(5, TimeUnit.SECONDS) must be(true) + supervised.!("test")(Some(sender)) + latch.await(5, TimeUnit.SECONDS) must be(true) + } } } } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala index 3fa4d16f2e..aeb219f1cd 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -5,7 +5,7 @@ package akka.actor.dispatch import org.scalatest.junit.JUnitSuite import org.scalatest.Assertions._ -import akka.testkit.Testing +import akka.testkit.{ Testing, filterEvents, EventFilter } import akka.dispatch._ import akka.actor.Actor._ import java.util.concurrent.atomic.AtomicLong @@ -330,15 +330,17 @@ abstract class ActorModelSpec extends JUnitSuite { @Test def dispatcherShouldSuspendAndResumeAFailingNonSupervisedPermanentActor { - implicit val dispatcher = newInterceptedDispatcher - val a = newTestActor.start() - val done = new CountDownLatch(1) - a ! Restart - a ! CountDown(done) - assertCountDown(done, Testing.testTime(3000), "Should be suspended+resumed and done with next message within 3 seconds") - a.stop() - assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 2, - msgsProcessed = 2, suspensions = 1, resumes = 1) + filterEvents(EventFilter[Exception]("Restart")) { + implicit val dispatcher = newInterceptedDispatcher + val a = newTestActor.start() + val done = new CountDownLatch(1) + a ! Restart + a ! CountDown(done) + assertCountDown(done, Testing.testTime(3000), "Should be suspended+resumed and done with next message within 3 seconds") + a.stop() + assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 2, + msgsProcessed = 2, suspensions = 1, resumes = 1) + } } @Test @@ -397,48 +399,52 @@ abstract class ActorModelSpec extends JUnitSuite { @Test def dispatcherShouldContinueToProcessMessagesWhenAThreadGetsInterrupted { - implicit val dispatcher = newInterceptedDispatcher - val a = newTestActor.start() - val f1 = a ? Reply("foo") - val f2 = a ? Reply("bar") - val f3 = a ? Interrupt - val f4 = a ? Reply("foo2") - val f5 = a ? Interrupt - val f6 = a ? Reply("bar2") + filterEvents(EventFilter[InterruptedException]("Ping!"), EventFilter[akka.event.EventHandler.EventHandlerException]) { + implicit val dispatcher = newInterceptedDispatcher + val a = newTestActor.start() + val f1 = a ? Reply("foo") + val f2 = a ? Reply("bar") + val f3 = a ? Interrupt + val f4 = a ? Reply("foo2") + val f5 = a ? Interrupt + val f6 = a ? Reply("bar2") - assert(f1.get === "foo") - assert(f2.get === "bar") - assert((intercept[InterruptedException] { - f3.get - }).getMessage === "Ping!") - assert(f4.get === "foo2") - assert((intercept[InterruptedException] { - f5.get - }).getMessage === "Ping!") - assert(f6.get === "bar2") + assert(f1.get === "foo") + assert(f2.get === "bar") + assert((intercept[InterruptedException] { + f3.get + }).getMessage === "Ping!") + assert(f4.get === "foo2") + assert((intercept[InterruptedException] { + f5.get + }).getMessage === "Ping!") + assert(f6.get === "bar2") + } } @Test def dispatcherShouldContinueToProcessMessagesWhenExceptionIsThrown { - implicit val dispatcher = newInterceptedDispatcher - val a = newTestActor.start() - val f1 = a ? Reply("foo") - val f2 = a ? Reply("bar") - val f3 = a ? new ThrowException(new IndexOutOfBoundsException("IndexOutOfBoundsException")) - val f4 = a ? Reply("foo2") - val f5 = a ? new ThrowException(new RemoteException("RemoteException")) - val f6 = a ? Reply("bar2") + filterEvents(EventFilter[IndexOutOfBoundsException], EventFilter[RemoteException]) { + implicit val dispatcher = newInterceptedDispatcher + val a = newTestActor.start() + val f1 = a ? Reply("foo") + val f2 = a ? Reply("bar") + val f3 = a ? new ThrowException(new IndexOutOfBoundsException("IndexOutOfBoundsException")) + val f4 = a ? Reply("foo2") + val f5 = a ? new ThrowException(new RemoteException("RemoteException")) + val f6 = a ? Reply("bar2") - assert(f1.get === "foo") - assert(f2.get === "bar") - assert((intercept[IndexOutOfBoundsException] { - f3.get - }).getMessage === "IndexOutOfBoundsException") - assert(f4.get === "foo2") - assert((intercept[RemoteException] { - f5.get - }).getMessage === "RemoteException") - assert(f6.get === "bar2") + assert(f1.get === "foo") + assert(f2.get === "bar") + assert((intercept[IndexOutOfBoundsException] { + f3.get + }).getMessage === "IndexOutOfBoundsException") + assert(f4.get === "foo2") + assert((intercept[RemoteException] { + f5.get + }).getMessage === "RemoteException") + assert(f6.get === "bar2") + } } } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/DispatcherActorSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/DispatcherActorSpec.scala index 75f984065c..2d5b3ecb16 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/DispatcherActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/DispatcherActorSpec.scala @@ -7,6 +7,7 @@ import akka.dispatch.{ Dispatchers, Dispatcher } import akka.actor.Actor import Actor._ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } +import akka.testkit.{ filterEvents, EventFilter } object DispatcherActorSpec { class TestActor extends Actor { @@ -35,7 +36,7 @@ class DispatcherActorSpec extends JUnitSuite { private val unit = TimeUnit.MILLISECONDS @Test - def shouldSendOneWay = { + def shouldTell = { val actor = actorOf[OneWayTestActor].start() val result = actor ! "OneWay" assert(OneWayTestActor.oneWay.await(1, TimeUnit.SECONDS)) @@ -60,15 +61,17 @@ class DispatcherActorSpec extends JUnitSuite { @Test def shouldSendReceiveException = { - val actor = actorOf[TestActor].start() - try { - (actor ? "Failure").get - fail("Should have thrown an exception") - } catch { - case e ⇒ - assert("Expected exception; to test fault-tolerance" === e.getMessage()) + filterEvents(EventFilter[RuntimeException]("Expected")) { + val actor = actorOf[TestActor].start() + try { + (actor ? "Failure").get + fail("Should have thrown an exception") + } catch { + case e ⇒ + assert("Expected exception; to test fault-tolerance" === e.getMessage()) + } + actor.stop() } - actor.stop() } @Test diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 8811f1a7da..b8e855272a 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -12,9 +12,7 @@ import org.scalacheck.Gen._ import akka.actor.{ Actor, ActorRef } import Actor._ -import akka.event.EventHandler -import akka.testkit.TestEvent._ -import akka.testkit.EventFilter +import akka.testkit.{ EventFilter, filterEvents, filterException } import org.multiverse.api.latches.StandardLatch import java.util.concurrent.{ TimeUnit, CountDownLatch } @@ -47,14 +45,6 @@ class JavaFutureSpec extends JavaFutureTests with JUnitSuite class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAndAfterAll { import FutureSpec._ - override def beforeAll() { - EventHandler.notify(Mute(EventFilter[RuntimeException])) - } - - override def afterAll() { - EventHandler.notify(UnMuteAll) - } - "A Promise" when { "never completed" must { behave like emptyFuture(_(Promise())) @@ -108,17 +98,19 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } "has actions applied" must { "pass checks" in { - check({ (future: Future[Int], actions: List[FutureAction]) ⇒ - val result = (future /: actions)(_ /: _) - val expected = (future.await.value.get /: actions)(_ /: _) - ((result.await.value.get, expected) match { - case (Right(a), Right(b)) ⇒ a == b - case (Left(a), Left(b)) if a.toString == b.toString ⇒ true - case (Left(a), Left(b)) if a.getStackTrace.isEmpty || b.getStackTrace.isEmpty ⇒ - a.getClass.toString == b.getClass.toString - case _ ⇒ false - }) :| result.value.get.toString + " is expected to be " + expected.toString - }, minSuccessful(10000), workers(4)) + filterException[ArithmeticException] { + check({ (future: Future[Int], actions: List[FutureAction]) ⇒ + val result = (future /: actions)(_ /: _) + val expected = (future.await.value.get /: actions)(_ /: _) + ((result.await.value.get, expected) match { + case (Right(a), Right(b)) ⇒ a == b + case (Left(a), Left(b)) if a.toString == b.toString ⇒ true + case (Left(a), Left(b)) if a.getStackTrace.isEmpty || b.getStackTrace.isEmpty ⇒ + a.getClass.toString == b.getClass.toString + case _ ⇒ false + }) :| result.value.get.toString + " is expected to be " + expected.toString + }, minSuccessful(10000), workers(4)) + } } } } @@ -136,12 +128,14 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } "throws an exception" must { behave like futureWithException[RuntimeException] { test ⇒ - val actor = actorOf[TestActor] - actor.start() - val future = actor ? "Failure" - future.await - test(future, "Expected exception; to test fault-tolerance") - actor.stop() + filterException[RuntimeException] { + val actor = actorOf[TestActor] + actor.start() + val future = actor ? "Failure" + future.await + test(future, "Expected exception; to test fault-tolerance") + actor.stop() + } } } } @@ -160,13 +154,15 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } "will throw an exception" must { behave like futureWithException[ArithmeticException] { test ⇒ - val actor1 = actorOf[TestActor].start() - val actor2 = actorOf(new Actor { def receive = { case s: String ⇒ self reply (s.length / 0) } }).start() - val future = actor1 ? "Hello" flatMap { _ match { case s: String ⇒ actor2 ? s } } - future.await - test(future, "/ by zero") - actor1.stop() - actor2.stop() + filterException[ArithmeticException] { + val actor1 = actorOf[TestActor].start() + val actor2 = actorOf(new Actor { def receive = { case s: String ⇒ self reply (s.length / 0) } }).start() + val future = actor1 ? "Hello" flatMap { _ match { case s: String ⇒ actor2 ? s } } + future.await + test(future, "/ by zero") + actor1.stop() + actor2.stop() + } } } } @@ -174,102 +170,109 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd "being tested" must { "compose with for-comprehensions" in { - val actor = actorOf(new Actor { - def receive = { - case s: String ⇒ self reply s.length - case i: Int ⇒ self reply (i * 2).toString - } - }).start() + filterException[ClassCastException] { + val actor = actorOf(new Actor { + def receive = { + case s: String ⇒ self reply s.length + case i: Int ⇒ self reply (i * 2).toString + } + }).start() - val future0 = actor ? "Hello" + val future0 = actor ? "Hello" - val future1 = for { - a: Int ← future0.mapTo[Int] // returns 5 - b: String ← (actor ? a).mapTo[String] // returns "10" - c: String ← (actor ? 7).mapTo[String] // returns "14" - } yield b + "-" + c + val future1 = for { + a ← future0.mapTo[Int] // returns 5 + b ← (actor ? a).mapTo[String] // returns "10" + c ← (actor ? 7).mapTo[String] // returns "14" + } yield b + "-" + c - val future2 = for { - a: Int ← future0.mapTo[Int] - b: Int ← (actor ? a).mapTo[Int] - c: String ← (actor ? 7).mapTo[String] - } yield b + "-" + c + val future2 = for { + a ← future0.mapTo[Int] + b ← (actor ? a).mapTo[Int] + c ← (actor ? 7).mapTo[String] + } yield b + "-" + c - future1.get must be("10-14") - intercept[ClassCastException] { future2.get } - actor.stop() + future1.get must be("10-14") + assert(checkType(future1, manifest[String])) + intercept[ClassCastException] { future2.get } + actor.stop() + } } "support pattern matching within a for-comprehension" in { - case class Req[T](req: T) - case class Res[T](res: T) - val actor = actorOf(new Actor { - def receive = { - case Req(s: String) ⇒ self reply Res(s.length) - case Req(i: Int) ⇒ self reply Res((i * 2).toString) - } - }).start() + filterException[MatchError] { + case class Req[T](req: T) + case class Res[T](res: T) + val actor = actorOf(new Actor { + def receive = { + case Req(s: String) ⇒ self reply Res(s.length) + case Req(i: Int) ⇒ self reply Res((i * 2).toString) + } + }).start() - val future1 = for { - Res(a: Int) ← actor ? Req("Hello") - Res(b: String) ← actor ? Req(a) - Res(c: String) ← actor ? Req(7) - } yield b + "-" + c + val future1 = for { + Res(a: Int) ← actor ? Req("Hello") + Res(b: String) ← actor ? Req(a) + Res(c: String) ← actor ? Req(7) + } yield b + "-" + c - val future2 = for { - Res(a: Int) ← actor ? Req("Hello") - Res(b: Int) ← actor ? Req(a) - Res(c: Int) ← actor ? Req(7) - } yield b + "-" + c + val future2 = for { + Res(a: Int) ← actor ? Req("Hello") + Res(b: Int) ← actor ? Req(a) + Res(c: Int) ← actor ? Req(7) + } yield b + "-" + c - future1.get must be("10-14") - intercept[MatchError] { future2.get } - actor.stop() + future1.get must be("10-14") + intercept[MatchError] { future2.get } + actor.stop() + } } "recover from exceptions" in { - val future1 = Future(5) - val future2 = future1 map (_ / 0) - val future3 = future2 map (_.toString) + filterException[RuntimeException] { + val future1 = Future(5) + val future2 = future1 map (_ / 0) + val future3 = future2 map (_.toString) - val future4 = future1 recover { - case e: ArithmeticException ⇒ 0 - } map (_.toString) + val future4 = future1 recover { + case e: ArithmeticException ⇒ 0 + } map (_.toString) - val future5 = future2 recover { - case e: ArithmeticException ⇒ 0 - } map (_.toString) + val future5 = future2 recover { + case e: ArithmeticException ⇒ 0 + } map (_.toString) - val future6 = future2 recover { - case e: MatchError ⇒ 0 - } map (_.toString) + val future6 = future2 recover { + case e: MatchError ⇒ 0 + } map (_.toString) - val future7 = future3 recover { case e: ArithmeticException ⇒ "You got ERROR" } + val future7 = future3 recover { case e: ArithmeticException ⇒ "You got ERROR" } - val actor = actorOf[TestActor].start() + val actor = actorOf[TestActor].start() - val future8 = actor ? "Failure" - val future9 = actor ? "Failure" recover { - case e: RuntimeException ⇒ "FAIL!" + val future8 = actor ? "Failure" + val future9 = actor ? "Failure" recover { + case e: RuntimeException ⇒ "FAIL!" + } + val future10 = actor ? "Hello" recover { + case e: RuntimeException ⇒ "FAIL!" + } + val future11 = actor ? "Failure" recover { case _ ⇒ "Oops!" } + + future1.get must be(5) + intercept[ArithmeticException] { future2.get } + intercept[ArithmeticException] { future3.get } + future4.get must be("5") + future5.get must be("0") + intercept[ArithmeticException] { future6.get } + future7.get must be("You got ERROR") + intercept[RuntimeException] { future8.get } + future9.get must be("FAIL!") + future10.get must be("World") + future11.get must be("Oops!") + + actor.stop() } - val future10 = actor ? "Hello" recover { - case e: RuntimeException ⇒ "FAIL!" - } - val future11 = actor ? "Failure" recover { case _ ⇒ "Oops!" } - - future1.get must be(5) - intercept[ArithmeticException] { future2.get } - intercept[ArithmeticException] { future3.get } - future4.get must be("5") - future5.get must be("0") - intercept[ArithmeticException] { future6.get } - future7.get must be("You got ERROR") - intercept[RuntimeException] { future8.get } - future9.get must be("FAIL!") - future10.get must be("World") - future11.get must be("Oops!") - - actor.stop() } "fold" in { @@ -294,19 +297,21 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } "fold with an exception" in { - val actors = (1 to 10).toList map { _ ⇒ - actorOf(new Actor { - def receive = { - case (add: Int, wait: Int) ⇒ - Thread.sleep(wait) - if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected") - self tryReply add - } - }).start() + filterException[IllegalArgumentException] { + val actors = (1 to 10).toList map { _ ⇒ + actorOf(new Actor { + def receive = { + case (add: Int, wait: Int) ⇒ + Thread.sleep(wait) + if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected") + self tryReply add + } + }).start() + } + val timeout = 10000 + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } + Futures.fold(0, timeout)(futures)(_ + _).await.exception.get.getMessage must be("shouldFoldResultsWithException: expected") } - val timeout = 10000 - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } - Futures.fold(0, timeout)(futures)(_ + _).await.exception.get.getMessage must be("shouldFoldResultsWithException: expected") } /* @Test @@ -341,23 +346,27 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } "shouldReduceResultsWithException" in { - val actors = (1 to 10).toList map { _ ⇒ - actorOf(new Actor { - def receive = { - case (add: Int, wait: Int) ⇒ - Thread.sleep(wait) - if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected") - self tryReply add - } - }).start() + filterException[IllegalArgumentException] { + val actors = (1 to 10).toList map { _ ⇒ + actorOf(new Actor { + def receive = { + case (add: Int, wait: Int) ⇒ + Thread.sleep(wait) + if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected") + self tryReply add + } + }).start() + } + val timeout = 10000 + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } + assert(Futures.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected") } - val timeout = 10000 - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } - assert(Futures.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected") } "shouldReduceThrowIAEOnEmptyInput" in { - intercept[UnsupportedOperationException] { Futures.reduce(List[Future[Int]]())(_ + _).get } + filterException[IllegalArgumentException] { + intercept[UnsupportedOperationException] { Futures.reduce(List[Future[Int]]())(_ + _).get } + } } "receiveShouldExecuteOnComplete" in { @@ -389,28 +398,30 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd "shouldHandleThrowables" in { class ThrowableTest(m: String) extends Throwable(m) - val f1 = Future { throw new ThrowableTest("test") } - f1.await - intercept[ThrowableTest] { f1.get } + filterException[ThrowableTest] { + val f1 = Future { throw new ThrowableTest("test") } + f1.await + intercept[ThrowableTest] { f1.get } - val latch = new StandardLatch - val f2 = Future { latch.tryAwait(5, TimeUnit.SECONDS); "success" } - f2 foreach (_ ⇒ throw new ThrowableTest("dispatcher foreach")) - f2 onResult { case _ ⇒ throw new ThrowableTest("dispatcher receive") } - val f3 = f2 map (s ⇒ s.toUpperCase) - latch.open - f2.await - assert(f2.get === "success") - f2 foreach (_ ⇒ throw new ThrowableTest("current thread foreach")) - f2 onResult { case _ ⇒ throw new ThrowableTest("current thread receive") } - f3.await - assert(f3.get === "SUCCESS") + val latch = new StandardLatch + val f2 = Future { latch.tryAwait(5, TimeUnit.SECONDS); "success" } + f2 foreach (_ ⇒ throw new ThrowableTest("dispatcher foreach")) + f2 onResult { case _ ⇒ throw new ThrowableTest("dispatcher receive") } + val f3 = f2 map (s ⇒ s.toUpperCase) + latch.open + f2.await + assert(f2.get === "success") + f2 foreach (_ ⇒ throw new ThrowableTest("current thread foreach")) + f2 onResult { case _ ⇒ throw new ThrowableTest("current thread receive") } + f3.await + assert(f3.get === "SUCCESS") - // give time for all callbacks to execute - Thread sleep 100 + // give time for all callbacks to execute + Thread sleep 100 - // make sure all futures are completed in dispatcher - assert(Dispatchers.defaultGlobalDispatcher.pendingTasks === 0) + // make sure all futures are completed in dispatcher + assert(Dispatchers.defaultGlobalDispatcher.tasks === 0) + } } "shouldBlockUntilResult" in { @@ -424,8 +435,10 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd assert(f2.get === 10) val f3 = Future({ Thread.sleep(10); 5 }, 10) - intercept[FutureTimeoutException] { - f3.get + filterException[FutureTimeoutException] { + intercept[FutureTimeoutException] { + f3.get + } } } @@ -445,40 +458,46 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } "futureComposingWithContinuationsFailureDivideZero" in { - import Future.flow + filterException[ArithmeticException] { + import Future.flow - val x = Future("Hello") - val y = x map (_.length) + val x = Future("Hello") + val y = x map (_.length) - val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply, 100) + val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply, 100) - intercept[java.lang.ArithmeticException](r.get) + intercept[java.lang.ArithmeticException](r.get) + } } "futureComposingWithContinuationsFailureCastInt" in { - import Future.flow + filterException[ClassCastException] { + import Future.flow - val actor = actorOf[TestActor].start + val actor = actorOf[TestActor].start - val x = Future(3) - val y = (actor ? "Hello").mapTo[Int] + val x = Future(3) + val y = (actor ? "Hello").mapTo[Int] - val r = flow(x() + y(), 100) + val r = flow(x() + y(), 100) - intercept[ClassCastException](r.get) + intercept[ClassCastException](r.get) + } } "futureComposingWithContinuationsFailureCastNothing" in { - import Future.flow + filterException[ClassCastException] { + import Future.flow - val actor = actorOf[TestActor].start + val actor = actorOf[TestActor].start - val x = Future("Hello") - val y = actor ? "Hello" mapTo manifest[Nothing] + val x = Future("Hello") + val y = actor ? "Hello" mapTo manifest[Nothing] - val r = flow(x() + y()) + val r = flow(x() + y()) - intercept[ClassCastException](r.get) + intercept[ClassCastException](r.get) + } } "futureCompletingWithContinuations" in { @@ -522,7 +541,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd Thread.sleep(100) // make sure all futures are completed in dispatcher - assert(Dispatchers.defaultGlobalDispatcher.pendingTasks === 0) + assert(Dispatchers.defaultGlobalDispatcher.tasks === 0) } "shouldNotAddOrRunCallbacksAfterFailureToBeCompletedBeforeExpiry" in { @@ -615,29 +634,31 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } "futureCompletingWithContinuationsFailure" in { - import Future.flow + filterException[ArithmeticException] { + import Future.flow - val x, y, z = Promise[Int]() - val ly, lz = new StandardLatch + val x, y, z = Promise[Int]() + val ly, lz = new StandardLatch - val result = flow { - y << x - ly.open - val oops = 1 / 0 - z << x - lz.open - z() + y() + oops + val result = flow { + y << x + ly.open + val oops = 1 / 0 + z << x + lz.open + z() + y() + oops + } + + assert(!ly.tryAwaitUninterruptible(100, TimeUnit.MILLISECONDS)) + assert(!lz.tryAwaitUninterruptible(100, TimeUnit.MILLISECONDS)) + + flow { x << 5 } + + assert(y.get === 5) + intercept[java.lang.ArithmeticException](result.get) + assert(z.value === None) + assert(!lz.isOpen) } - - assert(!ly.tryAwaitUninterruptible(100, TimeUnit.MILLISECONDS)) - assert(!lz.tryAwaitUninterruptible(100, TimeUnit.MILLISECONDS)) - - flow { x << 5 } - - assert(y.get === 5) - intercept[java.lang.ArithmeticException](result.get) - assert(z.value === None) - assert(!lz.isOpen) } "futureContinuationsShouldNotBlock" in { @@ -663,8 +684,6 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd "futureFlowShouldBeTypeSafe" in { import Future.flow - def checkType[A: Manifest, B](in: Future[A], refmanifest: Manifest[B]): Boolean = manifest[A] == refmanifest - val rString = flow { val x = Future(5) x().toString @@ -728,54 +747,66 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } "ticket812FutureDispatchCleanup" in { - implicit val dispatcher = new Dispatcher("ticket812FutureDispatchCleanup") - assert(dispatcher.pendingTasks === 0) - val future = Future({ Thread.sleep(100); "Done" }, 10) - intercept[FutureTimeoutException] { future.await } - assert(dispatcher.pendingTasks === 1) - Thread.sleep(200) - assert(dispatcher.pendingTasks === 0) + filterException[FutureTimeoutException] { + implicit val dispatcher = new Dispatcher("ticket812FutureDispatchCleanup") + assert(dispatcher.tasks === 0) + val future = Future({ Thread.sleep(100); "Done" }, 10) + intercept[FutureTimeoutException] { future.await } + assert(dispatcher.tasks === 1) + Thread.sleep(200) + assert(dispatcher.tasks === 0) + } } "run callbacks async" in { - val l1, l2, l3, l4, l5, l6 = new StandardLatch + val latch = Vector.fill(10)(new StandardLatch) - val f1 = Future { l1.await; "Hello" } - val f2 = f1 map { s ⇒ l2.await; s.length } + val f1 = Future { latch(0).open; latch(1).await; "Hello" } + val f2 = f1 map { s ⇒ latch(2).open; latch(3).await; s.length } + f2 foreach (_ ⇒ latch(4).open) + + latch(0).await f1 must not be ('completed) f2 must not be ('completed) - l1.open + latch(1).open + latch(2).await - f1.await must be('completed) + f1 must be('completed) f2 must not be ('completed) - val f3 = f1 map { s ⇒ l2.await; s.length * 2 } + val f3 = f1 map { s ⇒ latch(5).open; latch(6).await; s.length * 2 } + f3 foreach (_ ⇒ latch(3).open) + + latch(5).await - f2 must not be ('completed) f3 must not be ('completed) - l2.open + latch(6).open + latch(4).await - f2.await must be('completed) - f3.await must be('completed) + f2 must be('completed) + f3 must be('completed) val p1 = Promise[String]() - val f4 = p1 map { s ⇒ l3.await; s.length } + val f4 = p1 map { s ⇒ latch(7).open; latch(8).await; s.length } + f4 foreach (_ ⇒ latch(9).open) p1 must not be ('completed) f4 must not be ('completed) p1 complete Right("Hello") + latch(7).await + p1 must be('completed) f4 must not be ('completed) - l3.open + latch(8).open + latch(9).await f4.await must be('completed) - } } } @@ -887,4 +918,5 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } + def checkType[A: Manifest, B](in: Future[A], refmanifest: Manifest[B]): Boolean = manifest[A] == refmanifest } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PinnedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PinnedActorSpec.scala index c2b6f93ec2..48918d994d 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PinnedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PinnedActorSpec.scala @@ -41,7 +41,7 @@ class PinnedActorSpec extends JUnitSuite { } @Test - def shouldSendOneWay { + def shouldTell { var oneWay = new CountDownLatch(1) val actor = actorOf(new Actor { self.dispatcher = Dispatchers.newPinnedDispatcher(self) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala index 0ac7680da1..cce7a14d11 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala @@ -7,7 +7,7 @@ import java.util.concurrent.atomic.AtomicInteger import akka.actor.Actor._ import akka.testkit.Testing._ import akka.actor.{ TypedActor, Actor } -import akka.testkit.TestLatch +import akka.testkit.{ TestLatch, filterEvents, EventFilter, filterException } import akka.util.duration._ object ActorPoolSpec { @@ -357,210 +357,214 @@ class ActorPoolSpec extends WordSpec with MustMatchers { } "provide default supervision of pooled actors" in { - import akka.config.Supervision._ - val pingCount = new AtomicInteger(0) - val deathCount = new AtomicInteger(0) - var keepDying = false + filterException[RuntimeException] { + import akka.config.Supervision._ + val pingCount = new AtomicInteger(0) + val deathCount = new AtomicInteger(0) + var keepDying = false - val pool1 = actorOf( - new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { - def lowerBound = 2 - def upperBound = 5 - def rampupRate = 0.1 - def backoffRate = 0.1 - def backoffThreshold = 0.5 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - def pressureThreshold = 1 - def factory = actorOf(new Actor { - if (deathCount.get > 5) deathCount.set(0) - if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } - def receive = { - case akka.Die ⇒ - if (keepDying) deathCount.incrementAndGet - throw new RuntimeException - case _ ⇒ pingCount.incrementAndGet - } + val pool1 = actorOf( + new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { + def lowerBound = 2 + def upperBound = 5 + def rampupRate = 0.1 + def backoffRate = 0.1 + def backoffThreshold = 0.5 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + def pressureThreshold = 1 + def factory = actorOf(new Actor { + if (deathCount.get > 5) deathCount.set(0) + if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } + def receive = { + case akka.Die ⇒ + if (keepDying) deathCount.incrementAndGet + throw new RuntimeException + case _ ⇒ pingCount.incrementAndGet + } + }).start() }).start() - }).start() - val pool2 = actorOf( - new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { - def lowerBound = 2 - def upperBound = 5 - def rampupRate = 0.1 - def backoffRate = 0.1 - def backoffThreshold = 0.5 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - def pressureThreshold = 1 - def factory = actorOf(new Actor { - self.lifeCycle = Permanent - if (deathCount.get > 5) deathCount.set(0) - if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } - def receive = { - case akka.Die ⇒ - if (keepDying) deathCount.incrementAndGet - throw new RuntimeException - case _ ⇒ pingCount.incrementAndGet - } + val pool2 = actorOf( + new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { + def lowerBound = 2 + def upperBound = 5 + def rampupRate = 0.1 + def backoffRate = 0.1 + def backoffThreshold = 0.5 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + def pressureThreshold = 1 + def factory = actorOf(new Actor { + self.lifeCycle = Permanent + if (deathCount.get > 5) deathCount.set(0) + if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } + def receive = { + case akka.Die ⇒ + if (keepDying) deathCount.incrementAndGet + throw new RuntimeException + case _ ⇒ pingCount.incrementAndGet + } + }).start() }).start() - }).start() - val pool3 = actorOf( - new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter { - def lowerBound = 2 - def upperBound = 5 - def rampupRate = 0.1 - def backoffRate = 0.1 - def backoffThreshold = 0.5 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - def pressureThreshold = 1 - def factory = actorOf(new Actor { - self.lifeCycle = Temporary - if (deathCount.get > 5) deathCount.set(0) - if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } - def receive = { - case akka.Die ⇒ - if (keepDying) deathCount.incrementAndGet - throw new RuntimeException - case _ ⇒ pingCount.incrementAndGet - } + val pool3 = actorOf( + new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter { + def lowerBound = 2 + def upperBound = 5 + def rampupRate = 0.1 + def backoffRate = 0.1 + def backoffThreshold = 0.5 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + def pressureThreshold = 1 + def factory = actorOf(new Actor { + self.lifeCycle = Temporary + if (deathCount.get > 5) deathCount.set(0) + if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } + def receive = { + case akka.Die ⇒ + if (keepDying) deathCount.incrementAndGet + throw new RuntimeException + case _ ⇒ pingCount.incrementAndGet + } + }).start() }).start() - }).start() - // default lifecycle - // actor comes back right away - pingCount.set(0) - keepDying = false - pool1 ! "ping" - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pool1 ! akka.Die - sleepFor(2 seconds) - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pingCount.get must be(1) + // default lifecycle + // actor comes back right away + pingCount.set(0) + keepDying = false + pool1 ! "ping" + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pool1 ! akka.Die + sleepFor(2 seconds) + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pingCount.get must be(1) - // default lifecycle - // actor dies completely - pingCount.set(0) - keepDying = true - pool1 ! "ping" - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pool1 ! akka.Die - sleepFor(2 seconds) - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1) - pool1 ! "ping" - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pingCount.get must be(2) + // default lifecycle + // actor dies completely + pingCount.set(0) + keepDying = true + pool1 ! "ping" + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pool1 ! akka.Die + sleepFor(2 seconds) + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1) + pool1 ! "ping" + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pingCount.get must be(2) - // permanent lifecycle - // actor comes back right away - pingCount.set(0) - keepDying = false - pool2 ! "ping" - (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pool2 ! akka.Die - sleepFor(2 seconds) - (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pingCount.get must be(1) + // permanent lifecycle + // actor comes back right away + pingCount.set(0) + keepDying = false + pool2 ! "ping" + (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pool2 ! akka.Die + sleepFor(2 seconds) + (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pingCount.get must be(1) - // permanent lifecycle - // actor dies completely - pingCount.set(0) - keepDying = true - pool2 ! "ping" - (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pool2 ! akka.Die - sleepFor(2 seconds) - (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1) - pool2 ! "ping" - (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pingCount.get must be(2) + // permanent lifecycle + // actor dies completely + pingCount.set(0) + keepDying = true + pool2 ! "ping" + (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pool2 ! akka.Die + sleepFor(2 seconds) + (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1) + pool2 ! "ping" + (pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pingCount.get must be(2) - // temporary lifecycle - pingCount.set(0) - keepDying = false - pool3 ! "ping" - (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pool3 ! akka.Die - sleepFor(2 seconds) - (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1) - pool3 ! "ping" - pool3 ! "ping" - pool3 ! "ping" - (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pingCount.get must be(4) + // temporary lifecycle + pingCount.set(0) + keepDying = false + pool3 ! "ping" + (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pool3 ! akka.Die + sleepFor(2 seconds) + (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1) + pool3 ! "ping" + pool3 ! "ping" + pool3 ! "ping" + (pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pingCount.get must be(4) + } } "support customizable supervision config of pooled actors" in { - import akka.config.Supervision._ - val pingCount = new AtomicInteger(0) - val deathCount = new AtomicInteger(0) - var keepDying = false + filterEvents(EventFilter[IllegalStateException], EventFilter[RuntimeException]) { + import akka.config.Supervision._ + val pingCount = new AtomicInteger(0) + val deathCount = new AtomicInteger(0) + var keepDying = false - trait LimitedTrapSupervisionConfig extends ActorPoolSupervisionConfig { - def poolFaultHandler = OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000) - } + trait LimitedTrapSupervisionConfig extends ActorPoolSupervisionConfig { + def poolFaultHandler = OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000) + } - object BadState + object BadState - val pool1 = actorOf( - new Actor with DefaultActorPool with LimitedTrapSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { - def lowerBound = 2 - def upperBound = 5 - def rampupRate = 0.1 - def backoffRate = 0.1 - def backoffThreshold = 0.5 - def partialFill = true - def selectionCount = 1 - def instance = factory - def receive = _route - def pressureThreshold = 1 - def factory = actorOf(new Actor { - if (deathCount.get > 5) deathCount.set(0) - if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } - def receive = { - case BadState ⇒ - if (keepDying) deathCount.incrementAndGet - throw new IllegalStateException - case akka.Die ⇒ - throw new RuntimeException - case _ ⇒ pingCount.incrementAndGet - } + val pool1 = actorOf( + new Actor with DefaultActorPool with LimitedTrapSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { + def lowerBound = 2 + def upperBound = 5 + def rampupRate = 0.1 + def backoffRate = 0.1 + def backoffThreshold = 0.5 + def partialFill = true + def selectionCount = 1 + def instance = factory + def receive = _route + def pressureThreshold = 1 + def factory = actorOf(new Actor { + if (deathCount.get > 5) deathCount.set(0) + if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } + def receive = { + case BadState ⇒ + if (keepDying) deathCount.incrementAndGet + throw new IllegalStateException + case akka.Die ⇒ + throw new RuntimeException + case _ ⇒ pingCount.incrementAndGet + } + }).start() }).start() - }).start() - // actor comes back right away - pingCount.set(0) - keepDying = false - pool1 ! "ping" - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pool1 ! BadState - sleepFor(2 seconds) - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pingCount.get must be(1) + // actor comes back right away + pingCount.set(0) + keepDying = false + pool1 ! "ping" + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pool1 ! BadState + sleepFor(2 seconds) + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pingCount.get must be(1) - // actor dies completely - pingCount.set(0) - keepDying = true - pool1 ! "ping" - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pool1 ! BadState - sleepFor(2 seconds) - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1) - pool1 ! "ping" - (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) - pingCount.get must be(2) + // actor dies completely + pingCount.set(0) + keepDying = true + pool1 ! "ping" + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pool1 ! BadState + sleepFor(2 seconds) + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1) + pool1 ! "ping" + (pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2) + pingCount.get must be(2) - // kill it - intercept[RuntimeException](pool1.?(akka.Die).get) + // kill it + intercept[RuntimeException](pool1.?(akka.Die).get) + } } } -} \ No newline at end of file +} diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 962eba6183..1460c81e03 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -29,13 +29,13 @@ class RoutingSpec extends WordSpec with MustMatchers { "be started when constructed" in { val actor1 = Actor.actorOf[TestActor].start - val actor = Routing.newRoutedActorRef("foo", List(actor1), RouterType.Direct) + val actor = Routing.actorOf("foo", List(actor1), RouterType.Direct) actor.isRunning must be(true) } "throw IllegalArgumentException at construction when no connections" in { try { - Routing.newRoutedActorRef("foo", List(), RouterType.Direct) + Routing.actorOf("foo", List(), RouterType.Direct) fail() } catch { case e: IllegalArgumentException ⇒ @@ -53,7 +53,7 @@ class RoutingSpec extends WordSpec with MustMatchers { } }).start() - val routedActor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.Direct) + val routedActor = Routing.actorOf("foo", List(connection1), RouterType.Direct) routedActor ! "hello" routedActor ! "end" @@ -73,7 +73,7 @@ class RoutingSpec extends WordSpec with MustMatchers { } }).start() - val actor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.Direct) + val actor = Routing.actorOf("foo", List(connection1), RouterType.Direct) actor ! Broadcast(1) actor ! "end" @@ -89,13 +89,13 @@ class RoutingSpec extends WordSpec with MustMatchers { "be started when constructed" in { val actor1 = Actor.actorOf[TestActor].start - val actor = Routing.newRoutedActorRef("foo", List(actor1), RouterType.RoundRobin) + val actor = Routing.actorOf("foo", List(actor1), RouterType.RoundRobin) actor.isRunning must be(true) } "throw IllegalArgumentException at construction when no connections" in { try { - Routing.newRoutedActorRef("foo", List(), RouterType.RoundRobin) + Routing.actorOf("foo", List(), RouterType.RoundRobin) fail() } catch { case e: IllegalArgumentException ⇒ @@ -127,7 +127,7 @@ class RoutingSpec extends WordSpec with MustMatchers { } //create the routed actor. - val actor = Routing.newRoutedActorRef("foo", connections, RouterType.RoundRobin) + val actor = Routing.actorOf("foo", connections, RouterType.RoundRobin) //send messages to the actor. for (i ← 0 until iterationCount) { @@ -165,7 +165,7 @@ class RoutingSpec extends WordSpec with MustMatchers { } }).start() - val actor = Routing.newRoutedActorRef("foo", List(connection1, connection2), RouterType.RoundRobin) + val actor = Routing.actorOf("foo", List(connection1, connection2), RouterType.RoundRobin) actor ! Broadcast(1) actor ! Broadcast("end") @@ -187,7 +187,7 @@ class RoutingSpec extends WordSpec with MustMatchers { } }).start() - val actor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.RoundRobin) + val actor = Routing.actorOf("foo", List(connection1), RouterType.RoundRobin) try { actor ? Broadcast(1) @@ -208,13 +208,13 @@ class RoutingSpec extends WordSpec with MustMatchers { val actor1 = Actor.actorOf[TestActor].start - val actor = Routing.newRoutedActorRef("foo", List(actor1), RouterType.Random) + val actor = Routing.actorOf("foo", List(actor1), RouterType.Random) actor.isRunning must be(true) } "throw IllegalArgumentException at construction when no connections" in { try { - Routing.newRoutedActorRef("foo", List(), RouterType.Random) + Routing.actorOf("foo", List(), RouterType.Random) fail() } catch { case e: IllegalArgumentException ⇒ @@ -244,7 +244,7 @@ class RoutingSpec extends WordSpec with MustMatchers { } }).start() - val actor = Routing.newRoutedActorRef("foo", List(connection1, connection2), RouterType.Random) + val actor = Routing.actorOf("foo", List(connection1, connection2), RouterType.Random) actor ! Broadcast(1) actor ! Broadcast("end") @@ -266,7 +266,7 @@ class RoutingSpec extends WordSpec with MustMatchers { } }).start() - val actor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.Random) + val actor = Routing.actorOf("foo", List(connection1), RouterType.Random) try { actor ? Broadcast(1) @@ -286,7 +286,7 @@ class RoutingSpec extends WordSpec with MustMatchers { val actor1 = Actor.actorOf[TestActor].start try { - Routing.newRoutedActorRef("foo", List(actor1), RouterType.LeastCPU) + Routing.actorOf("foo", List(actor1), RouterType.LeastCPU) } catch { case e: IllegalArgumentException ⇒ } @@ -298,7 +298,7 @@ class RoutingSpec extends WordSpec with MustMatchers { val actor1 = Actor.actorOf[TestActor].start try { - Routing.newRoutedActorRef("foo", List(actor1), RouterType.LeastRAM) + Routing.actorOf("foo", List(actor1), RouterType.LeastRAM) } catch { case e: IllegalArgumentException ⇒ } @@ -310,7 +310,7 @@ class RoutingSpec extends WordSpec with MustMatchers { val actor1 = Actor.actorOf[TestActor].start try { - Routing.newRoutedActorRef("foo", List(actor1), RouterType.LeastMessages) + Routing.actorOf("foo", List(actor1), RouterType.LeastMessages) } catch { case e: IllegalArgumentException ⇒ } diff --git a/akka-actor/src/main/java/akka/actor/Actors.java b/akka-actor/src/main/java/akka/actor/Actors.java index 3ef7bc254b..147585cd79 100644 --- a/akka-actor/src/main/java/akka/actor/Actors.java +++ b/akka-actor/src/main/java/akka/actor/Actors.java @@ -46,7 +46,7 @@ public class Actors { * } * }, "my-actor-address"); * actor.start(); - * actor.sendOneWay(message, context); + * actor.tell(message, context); * actor.stop(); * */ @@ -70,7 +70,7 @@ public class Actors { * } * }); * actor.start(); - * actor.sendOneWay(message, context); + * actor.tell(message, context); * actor.stop(); * */ @@ -84,7 +84,7 @@ public class Actors { *
* ActorRef actor = Actors.actorOf(MyUntypedActor.class, "my-actor-address");
* actor.start();
- * actor.sendOneWay(message, context);
+ * actor.tell(message, context);
* actor.stop();
*
* You can create and start the actor in one statement like this:
@@ -102,7 +102,7 @@ public class Actors {
*
* ActorRef actor = Actors.actorOf(MyUntypedActor.class, "my-actor-address");
* actor.start();
- * actor.sendOneWay(message, context);
+ * actor.tell(message, context);
* actor.stop();
*
* You can create and start the actor in one statement like this:
@@ -130,7 +130,7 @@ public class Actors {
/**
* The message that when sent to an Actor kills it by throwing an exception.
*
- * actor.sendOneWay(kill());
+ * actor.tell(kill());
*
* @return the single instance of Kill
*/
@@ -142,7 +142,7 @@ public class Actors {
/**
* The message that when sent to an Actor shuts it down by calling 'stop'.
*
- * actor.sendOneWay(poisonPill());
+ * actor.tell(poisonPill());
*
* @return the single instance of PoisonPill
*/
diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala
index 3a5f906d89..7fec89b4c2 100644
--- a/akka-actor/src/main/scala/akka/actor/Actor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Actor.scala
@@ -108,6 +108,9 @@ class InvalidMessageException private[akka] (message: String, cause: Throwable =
* This message is thrown by default when an Actors behavior doesn't match a message
*/
case class UnhandledMessageException(msg: Any, ref: ActorRef = null) extends Exception {
+
+ def this(msg: String) = this(msg, null)
+
// constructor with 'null' ActorRef needed to work with client instantiation of remote exception
override def getMessage =
if (ref ne null) "Actor %s does not handle [%s]".format(ref, msg)
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index 32b779c466..e2bf4d55f5 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -71,7 +71,7 @@ private[akka] object ActorRefInternals {
*
* @author Jonas Bonér
*/
-trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Comparable[ActorRef] with Serializable {
+abstract class ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Comparable[ActorRef] with Serializable {
scalaRef: ScalaActorRef ⇒
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile
@@ -195,6 +195,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor, else None.
*/
+ @deprecated("will be removed in 2.0, use channel instead", "1.2")
def getSender: Option[ActorRef] = sender
/**
@@ -202,6 +203,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* The reference sender future of the last received message.
* Is defined if the message was sent with sent with '?'/'ask', else None.
*/
+ @deprecated("will be removed in 2.0, use channel instead", "1.2")
def getSenderFuture: Option[Promise[Any]] = senderFuture
/**
@@ -260,7 +262,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* Sends a message asynchronously returns a future holding the eventual reply message.
*
* NOTE:
- * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to
+ * Use this method with care. In most cases it is better to use 'tell' together with the 'getContext().getSender()' to
* implement request/response message exchanges.
*
* If you are sending messages using ask then you have to use getContext().reply(..)
diff --git a/akka-actor/src/main/scala/akka/actor/Channel.scala b/akka-actor/src/main/scala/akka/actor/Channel.scala
index d5e3ce7159..df35725bfc 100644
--- a/akka-actor/src/main/scala/akka/actor/Channel.scala
+++ b/akka-actor/src/main/scala/akka/actor/Channel.scala
@@ -59,10 +59,10 @@ trait Channel[-T] {
* Java API.
* Sends the specified message to the channel, i.e. fire-and-forget semantics.
*
- * actor.sendOneWay(message);
+ * actor.tell(message);
*
*/
- def sendOneWay(msg: T): Unit = this.!(msg)
+ def tell(msg: T): Unit = this.!(msg)
/**
* Java API.
@@ -70,19 +70,19 @@ trait Channel[-T] {
* semantics, including the sender reference if possible (not supported on
* all channels).
*
- * actor.sendOneWay(message, context);
+ * actor.tell(message, context);
*
*/
- def sendOneWay(msg: T, sender: UntypedChannel): Unit = this.!(msg)(sender)
+ def tell(msg: T, sender: UntypedChannel): Unit = this.!(msg)(sender)
/**
* Java API.
* Try to send the specified message to the channel, i.e. fire-and-forget semantics.
*
- * actor.sendOneWay(message);
+ * actor.tell(message);
*
*/
- def sendOneWaySafe(msg: T): Boolean = this.safe_!(msg)
+ def tellSafe(msg: T): Boolean = this.safe_!(msg)
/**
* Java API.
@@ -90,10 +90,10 @@ trait Channel[-T] {
* semantics, including the sender reference if possible (not supported on
* all channels).
*
- * actor.sendOneWay(message, context);
+ * actor.tell(message, context);
*
*/
- def sendOneWaySafe(msg: T, sender: UntypedChannel): Boolean = this.safe_!(msg)(sender)
+ def tellSafe(msg: T, sender: UntypedChannel): Boolean = this.safe_!(msg)(sender)
}
diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala
index a89a9bccc0..fa93cc0e57 100644
--- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala
+++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala
@@ -25,7 +25,7 @@ import akka.japi.{ Creator, Procedure }
* } else if (msg.equals("UseSender") && getContext().getSender().isDefined()) {
* // Reply to original sender of message using the sender reference
* // also passing along my own reference (the context)
- * getContext().getSender().get().sendOneWay(msg, context);
+ * getContext().getSender().get().tell(msg, context);
*
* } else if (msg.equals("UseSenderFuture") && getContext().getSenderFuture().isDefined()) {
* // Reply to original sender of message using the sender future reference
@@ -33,7 +33,7 @@ import akka.japi.{ Creator, Procedure }
*
* } else if (msg.equals("SendToSelf")) {
* // Send message to the actor itself recursively
- * getContext().sendOneWay(msg)
+ * getContext().tell(msg)
*
* } else if (msg.equals("ForwardMessage")) {
* // Retreive an actor from the ActorRegistry by ID and get an ActorRef back
@@ -46,7 +46,7 @@ import akka.japi.{ Creator, Procedure }
* public static void main(String[] args) {
* ActorRef actor = Actors.actorOf(SampleUntypedActor.class);
* actor.start();
- * actor.sendOneWay("SendToSelf");
+ * actor.tell("SendToSelf");
* actor.stop();
* }
* }
diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala
index 22bfa963df..216b3cdf3f 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Future.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala
@@ -787,6 +787,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
/**
* Must be called inside _lock.lock<->_lock.unlock
+ * Returns true if completed within the timeout
*/
@tailrec
private def awaitUnsafe(waitTimeNanos: Long): Boolean = {
@@ -805,26 +806,23 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
}
def await(atMost: Duration) = {
- _lock.lock()
- if (try { awaitUnsafe(atMost.toNanos min timeLeft()) } finally { _lock.unlock }) this
- else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds")
- }
-
- def await = {
_lock.lock()
try {
- if (timeout.duration.isFinite) {
- if (awaitUnsafe(timeLeft())) this
- else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds")
- } else {
+ if (!atMost.isFinite && !timeout.duration.isFinite) { //If wait until infinity
while (_value.isEmpty) { _signal.await }
this
+ } else { //Limited wait
+ val time = if (!atMost.isFinite) timeLeft() //If atMost is infinity, use preset timeout
+ else if (!timeout.duration.isFinite) atMost.toNanos //If preset timeout is infinite, use atMost
+ else atMost.toNanos min timeLeft() //Otherwise use the smallest of them
+ if (awaitUnsafe(time)) this
+ else throw new FutureTimeoutException("Future timed out after [" + NANOS.toMillis(time) + "] ms")
}
- } finally {
- _lock.unlock
- }
+ } finally { _lock.unlock }
}
+ def await = await(timeout.duration)
+
def isExpired: Boolean = if (timeout.duration.isFinite) timeLeft() <= 0 else false
def value: Option[Either[Throwable, T]] = {
@@ -954,7 +952,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
try {
func(this)
} catch {
- case e ⇒ EventHandler notify EventHandler.Error(e, this)
+ case e ⇒ EventHandler.error(e, this, "Future onComplete-callback raised an exception")
}
}
diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala
index 7f4437ed4c..4081bf5c57 100644
--- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala
@@ -49,12 +49,12 @@ object MessageDispatcher {
/**
* @author Jonas Bonér
*/
-trait MessageDispatcher {
+abstract class MessageDispatcher {
import MessageDispatcher._
protected val uuids = new ConcurrentSkipListSet[Uuid]
- protected val tasks = new AtomicLong(0L)
+ protected val _tasks = new AtomicLong(0L)
protected val guard = new ReentrantGuard
protected val active = new Switch(false)
@@ -93,7 +93,7 @@ trait MessageDispatcher {
}
private[akka] final def dispatchTask(block: () ⇒ Unit): Unit = {
- tasks.getAndIncrement()
+ _tasks.getAndIncrement()
try {
if (active.isOff)
guard withGuard {
@@ -104,15 +104,15 @@ trait MessageDispatcher {
executeTask(TaskInvocation(block, taskCleanup))
} catch {
case e ⇒
- tasks.decrementAndGet
+ _tasks.decrementAndGet
throw e
}
}
private val taskCleanup: () ⇒ Unit =
- () ⇒ if (tasks.decrementAndGet() == 0) {
+ () ⇒ if (_tasks.decrementAndGet() == 0) {
guard withGuard {
- if (tasks.get == 0 && uuids.isEmpty) {
+ if (_tasks.get == 0 && uuids.isEmpty) {
shutdownSchedule match {
case UNSCHEDULED ⇒
shutdownSchedule = SCHEDULED
@@ -149,7 +149,7 @@ trait MessageDispatcher {
if (uuids remove actorRef.uuid) {
cleanUpMailboxFor(actorRef)
actorRef.mailbox = null
- if (uuids.isEmpty && tasks.get == 0) {
+ if (uuids.isEmpty && _tasks.get == 0) {
shutdownSchedule match {
case UNSCHEDULED ⇒
shutdownSchedule = SCHEDULED
@@ -190,7 +190,7 @@ trait MessageDispatcher {
shutdownSchedule = SCHEDULED
Scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED ⇒
- if (uuids.isEmpty && tasks.get == 0) {
+ if (uuids.isEmpty && _tasks.get == 0) {
active switchOff {
shutdown() // shut down in the dispatcher's references is zero
}
@@ -248,7 +248,7 @@ trait MessageDispatcher {
/**
* Returns the amount of tasks queued for execution
*/
- def pendingTasks: Long = tasks.get
+ def tasks: Long = _tasks.get
}
/**
diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala
index 4b6597393e..a0d5eadbb0 100644
--- a/akka-actor/src/main/scala/akka/routing/Routing.scala
+++ b/akka-actor/src/main/scala/akka/routing/Routing.scala
@@ -73,7 +73,7 @@ object Routing {
* @throws IllegalArgumentException if the number of connections is zero, or if it depends on the actual router implementation
* how many connections it can handle.
*/
- def newRoutedActorRef(actorAddress: String, connections: Iterable[ActorRef], routerType: RouterType): RoutedActorRef = {
+ def actorOf(actorAddress: String, connections: Iterable[ActorRef], routerType: RouterType): ActorRef = {
if (connections.size == 0)
throw new IllegalArgumentException("To create a routed actor ref, at least one connection is required")
@@ -91,8 +91,8 @@ object Routing {
ref.start()
}
- def newRoundRobinActorRef(actorAddress: String, connections: Iterable[ActorRef]): RoutedActorRef = {
- newRoutedActorRef(actorAddress, connections, RoundRobin)
+ def actorOfWithRoundRobin(actorAddress: String, connections: Iterable[ActorRef]): ActorRef = {
+ actorOf(actorAddress, connections, RoundRobin)
}
}
@@ -100,7 +100,7 @@ object Routing {
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to
* on (or more) of these actors.
*/
-class RoutedActorRef(val address: String, val cons: Iterable[ActorRef]) extends UnsupportedActorRef with ScalaActorRef {
+class RoutedActorRef(val address: String, val cons: Iterable[ActorRef]) extends UnsupportedActorRef {
this: Router ⇒
def connections: Iterable[ActorRef] = cons
diff --git a/akka-camel/src/test/scala/akka/camel/UntypedProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/UntypedProducerFeatureTest.scala
index 96510ad415..4a5dfdca0c 100644
--- a/akka-camel/src/test/scala/akka/camel/UntypedProducerFeatureTest.scala
+++ b/akka-camel/src/test/scala/akka/camel/UntypedProducerFeatureTest.scala
@@ -69,7 +69,7 @@ class UntypedProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with
when("a test message is sent to the producer with !")
mockEndpoint.expectedBodiesReceived("received test")
- val result = producer.sendOneWay(Message("test"), producer)
+ val result = producer.tell(Message("test"), producer)
then("a normal response should have been sent")
mockEndpoint.assertIsSatisfied
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 348b11195e..37737280ae 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -1430,6 +1430,7 @@ class DefaultClusterNode private[akka] (
}
private[cluster] def failOverClusterActorRefConnections(from: InetSocketAddress, to: InetSocketAddress) {
+ EventHandler.info(this, "failOverClusterActorRef from %s to %s".format(from, to))
clusterActorRefs.values(from) foreach (_.failOver(from, to))
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
index 87b120cf30..6745eb1370 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
@@ -15,6 +15,7 @@ import com.eaio.uuid.UUID
import collection.immutable.Map
import annotation.tailrec
import akka.routing.Router
+import akka.event.EventHandler
/**
* ActorRef representing a one or many instances of a clustered, load-balanced and sometimes replicated actor
@@ -25,7 +26,7 @@ import akka.routing.Router
class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
val address: String,
_timeout: Long)
- extends UnsupportedActorRef with ScalaActorRef {
+ extends UnsupportedActorRef {
this: Router ⇒
timeout = _timeout
@@ -57,6 +58,8 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
}
private[akka] def failOver(from: InetSocketAddress, to: InetSocketAddress): Unit = {
+ EventHandler.info(this, "ClusterActorRef. %s failover from %s to %s".format(address, from, to))
+
@tailrec
def doFailover(from: InetSocketAddress, to: InetSocketAddress): Unit = {
val oldValue = inetSocketAddressToActorRefMap.get
@@ -93,6 +96,8 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
}
def signalDeadActor(ref: ActorRef): Unit = {
+ EventHandler.info(this, "ClusterActorRef %s signalDeadActor %s".format(address, ref.address))
+
//since the number remote actor refs for a clustered actor ref is quite low, we can deal with the O(N) complexity
//of the following removal.
val map = inetSocketAddressToActorRefMap.get
diff --git a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
index 015ccde474..b5ebf69062 100644
--- a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
+++ b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
@@ -283,7 +283,7 @@ abstract class RemoteClient private[akka] (
while (pendingRequest ne null) {
val (isOneWay, futureUuid, message) = pendingRequest
if (isOneWay) {
- // sendOneWay
+ // tell
val future = currentChannel.write(RemoteEncoder.encode(message))
future.awaitUninterruptibly()
if (!future.isCancelled && !future.isSuccess) {
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/bad_address/BadAddressDirectRoutingMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/bad_address/BadAddressDirectRoutingMultiJvmSpec.scala
deleted file mode 100644
index 75bc03309d..0000000000
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/bad_address/BadAddressDirectRoutingMultiJvmSpec.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-package akka.cluster.routing.direct.bad_address
-
-import akka.cluster.{ Cluster, MasterClusterTestNode }
-import akka.actor.Actor
-import akka.config.Config
-
-object BadAddressDirectRoutingMultiJvmSpec {
-
- val NrOfNodes = 1
-
- class SomeActor extends Actor with Serializable {
- println("---------------------------------------------------------------------------")
- println("SomeActor has been created on node [" + Config.nodename + "]")
- println("---------------------------------------------------------------------------")
-
- def receive = {
- case "identify" ⇒ {
- println("The node received the 'identify' command: " + Config.nodename)
- self.reply(Config.nodename)
- }
- }
- }
-
-}
-
-class BadAddressDirectRoutingMultiJvmNode1 extends MasterClusterTestNode {
-
- import BadAddressDirectRoutingMultiJvmSpec._
-
- val testNodes = NrOfNodes
-
- "node" must {
- "participate in cluster" in {
- Cluster.node.start()
-
- Cluster.barrier("waiting-for-begin", NrOfNodes).await()
- Cluster.barrier("waiting-to-end", NrOfNodes).await()
- Cluster.node.shutdown()
- }
- }
-}
-
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/FailoverDirectRoutingMultiJvmNode1.conf
similarity index 65%
rename from akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmNode2.conf
rename to akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/FailoverDirectRoutingMultiJvmNode1.conf
index b60f6a3b5c..602bc41489 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmNode2.conf
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/FailoverDirectRoutingMultiJvmNode1.conf
@@ -1,3 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"
+akka.actor.deployment.service-hello.clustered.home = "node:node2"
\ No newline at end of file
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/bad_address/BadAddressDirectRoutingMultiJvmNode1.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/FailoverDirectRoutingMultiJvmNode1.opts
similarity index 100%
rename from akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/bad_address/BadAddressDirectRoutingMultiJvmNode1.opts
rename to akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/FailoverDirectRoutingMultiJvmNode1.opts
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/FailoverDirectRoutingMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/FailoverDirectRoutingMultiJvmNode2.conf
new file mode 100644
index 0000000000..602bc41489
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/FailoverDirectRoutingMultiJvmNode2.conf
@@ -0,0 +1,4 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "WARNING"
+akka.actor.deployment.service-hello.router = "direct"
+akka.actor.deployment.service-hello.clustered.home = "node:node2"
\ No newline at end of file
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmNode2.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/FailoverDirectRoutingMultiJvmNode2.opts
similarity index 100%
rename from akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmNode2.opts
rename to akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/FailoverDirectRoutingMultiJvmNode2.opts
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/FailoverDirectRoutingMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/FailoverDirectRoutingMultiJvmSpec.scala
new file mode 100644
index 0000000000..df41e93fd9
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/failover/FailoverDirectRoutingMultiJvmSpec.scala
@@ -0,0 +1,77 @@
+package akka.cluster.routing.direct.failover
+
+import akka.config.Config
+import scala.Predef._
+import akka.cluster.{ ClusterActorRef, Cluster, MasterClusterTestNode, ClusterTestNode }
+import akka.actor.{ ActorInitializationException, Actor }
+
+object FailoverDirectRoutingMultiJvmSpec {
+
+ val NrOfNodes = 2
+
+ class SomeActor extends Actor with Serializable {
+ println("---------------------------------------------------------------------------")
+ println("SomeActor has been created on node [" + Config.nodename + "]")
+ println("---------------------------------------------------------------------------")
+
+ def receive = {
+ case "identify" ⇒
+ println("The node received the 'identify' command: " + Config.nodename)
+ self.reply(Config.nodename)
+ case "die" ⇒
+ println("The node received the 'die' command: " + Config.nodename)
+ Cluster.node.shutdown
+ }
+ }
+
+}
+
+class FailoverDirectRoutingMultiJvmNode1 extends MasterClusterTestNode {
+
+ import FailoverDirectRoutingMultiJvmSpec._
+
+ val testNodes = NrOfNodes
+
+ "Direct Router" must {
+ "not yet be able to failover to another node" in {
+
+ println("==================================================================================================")
+ println(" FAILOVER DIRECT ROUTING")
+ println("==================================================================================================")
+
+ Cluster.node.start()
+
+ Cluster.barrier("waiting-for-begin", NrOfNodes).await()
+ val actor = Actor.actorOf[SomeActor]("service-hello").start().asInstanceOf[ClusterActorRef]
+
+ println("retrieved identity was: " + (actor ? "identify").get)
+ (actor ? "identify").get must equal("node2")
+
+ actor ! "die"
+
+ Thread.sleep(4000)
+
+ try {
+ actor ! "identify"
+ fail()
+ } catch {
+ case e: ActorInitializationException ⇒
+ }
+ }
+ }
+}
+
+class FailoverDirectRoutingMultiJvmNode2 extends ClusterTestNode {
+
+ import FailoverDirectRoutingMultiJvmSpec._
+
+ "___" must {
+ "___" in {
+ Cluster.node.start()
+ Cluster.barrier("waiting-for-begin", NrOfNodes).await()
+
+ Thread.sleep(30 * 1000)
+ }
+ }
+}
+
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/homenode/HomeNode1MultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/homenode/HomeNode1MultiJvmSpec.scala
new file mode 100644
index 0000000000..1a8805c337
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/homenode/HomeNode1MultiJvmSpec.scala
@@ -0,0 +1,59 @@
+package akka.cluster.routing.direct.homenode
+
+import akka.config.Config
+import akka.actor.Actor
+import akka.cluster.{ ClusterTestNode, MasterClusterTestNode, Cluster }
+import Cluster._
+
+object HomeNodeMultiJvmSpec {
+
+ val NrOfNodes = 2
+
+ class SomeActor extends Actor with Serializable {
+ def receive = {
+ case "identify" ⇒ {
+ self.reply(Config.nodename)
+ }
+ }
+ }
+
+}
+
+class HomeNodeMultiJvmNode1 extends MasterClusterTestNode {
+
+ import HomeNodeMultiJvmSpec._
+
+ val testNodes = NrOfNodes
+
+ "___" must {
+ "___" in {
+ node.start()
+ barrier("waiting-for-begin", NrOfNodes).await()
+ barrier("waiting-for-end", NrOfNodes).await()
+ node.shutdown()
+ }
+ }
+}
+
+class HomeNodeMultiJvmNode2 extends ClusterTestNode {
+
+ import HomeNodeMultiJvmSpec._
+
+ "Direct Router: A Direct Router" must {
+ "obey 'home-node' config option when instantiated actor in cluster" in {
+ node.start()
+ barrier("waiting-for-begin", NrOfNodes).await()
+
+ val actorNode1 = Actor.actorOf[SomeActor]("service-node1").start()
+ val name1 = (actorNode1 ? "identify").get.asInstanceOf[String]
+ name1 must equal("node1")
+
+ val actorNode2 = Actor.actorOf[SomeActor]("service-node2").start()
+ val name2 = (actorNode2 ? "identify").get.asInstanceOf[String]
+ name2 must equal("node2")
+
+ barrier("waiting-for-end", NrOfNodes).await()
+ node.shutdown()
+ }
+ }
+}
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/homenode/HomeNodeMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/homenode/HomeNodeMultiJvmNode1.conf
new file mode 100644
index 0000000000..cfe103b985
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/homenode/HomeNodeMultiJvmNode1.conf
@@ -0,0 +1,6 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "WARNING"
+akka.actor.deployment.service-node1.router = "direct"
+akka.actor.deployment.service-node1.clustered.preferred-nodes = ["node:node1"]
+akka.actor.deployment.service-node2.router = "direct"
+akka.actor.deployment.service-node2.clustered.preferred-nodes = ["node:node2"]
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode1.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/homenode/HomeNodeMultiJvmNode1.opts
similarity index 100%
rename from akka-cluster/src/multi-jvm/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode1.opts
rename to akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/homenode/HomeNodeMultiJvmNode1.opts
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/homenode/HomeNodeMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/homenode/HomeNodeMultiJvmNode2.conf
new file mode 100644
index 0000000000..cfe103b985
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/homenode/HomeNodeMultiJvmNode2.conf
@@ -0,0 +1,6 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "WARNING"
+akka.actor.deployment.service-node1.router = "direct"
+akka.actor.deployment.service-node1.clustered.preferred-nodes = ["node:node1"]
+akka.actor.deployment.service-node2.router = "direct"
+akka.actor.deployment.service-node2.clustered.preferred-nodes = ["node:node2"]
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode2.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/homenode/HomeNodeMultiJvmNode2.opts
similarity index 100%
rename from akka-cluster/src/multi-jvm/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode2.opts
rename to akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/homenode/HomeNodeMultiJvmNode2.opts
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmNode1.conf
deleted file mode 100644
index 40fcfa5d51..0000000000
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmNode1.conf
+++ /dev/null
@@ -1,3 +0,0 @@
-akka.enabled-modules = ["cluster"]
-akka.event-handler-level = "WARNING"
-akka.actor.deployment.service-hello.router = "direct"
\ No newline at end of file
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmSpec.scala
deleted file mode 100644
index ca1f87503b..0000000000
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmSpec.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-package akka.cluster.routing.direct.multiple_replicas
-
-import akka.actor.Actor
-import akka.cluster.{ MasterClusterTestNode, Cluster, ClusterTestNode }
-import akka.config.Config
-
-object MultiReplicaDirectRoutingMultiJvmSpec {
- val NrOfNodes = 2
-
- class SomeActor extends Actor with Serializable {
- println("---------------------------------------------------------------------------")
- println("SomeActor has been created on node [" + Config.nodename + "]")
- println("---------------------------------------------------------------------------")
-
- def receive = {
- case "identify" ⇒ {
- println("The node received the 'identify' command: " + Config.nodename)
- self.reply(Config.nodename)
- }
- }
- }
-
-}
-
-class MultiReplicaDirectRoutingMultiJvmNode2 extends ClusterTestNode {
-
- import MultiReplicaDirectRoutingMultiJvmSpec._
-
- "when node send message to existing node using direct routing it" must {
- "communicate with that node" in {
- Cluster.node.start()
- Cluster.barrier("waiting-for-begin", NrOfNodes).await()
-
- //Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
-
- val actor = Actor.actorOf[SomeActor]("service-hello")
- actor.start()
-
- //actor.start()
- val name: String = (actor ? "identify").get.asInstanceOf[String]
-
- println("The name of the actor was " + name)
-
- Cluster.barrier("waiting-to-end", NrOfNodes).await()
- Cluster.node.shutdown()
- }
- }
-}
-
-class MultiReplicaDirectRoutingMultiJvmNode1 extends MasterClusterTestNode {
-
- import MultiReplicaDirectRoutingMultiJvmSpec._
-
- val testNodes = NrOfNodes
-
- "node" must {
- "participate in cluster" in {
- Cluster.node.start()
-
- Cluster.barrier("waiting-for-begin", NrOfNodes).await()
- Cluster.barrier("waiting-to-end", NrOfNodes).await()
- Cluster.node.shutdown()
- }
- }
-}
-
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/normalusage/SingleReplicaDirectRoutingMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/normalusage/SingleReplicaDirectRoutingMultiJvmNode1.conf
new file mode 100644
index 0000000000..e6de1b42f8
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/normalusage/SingleReplicaDirectRoutingMultiJvmNode1.conf
@@ -0,0 +1,4 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "WARNING"
+akka.actor.deployment.service-hello.router = "direct"
+akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmNode1.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/normalusage/SingleReplicaDirectRoutingMultiJvmNode1.opts
similarity index 100%
rename from akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmNode1.opts
rename to akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/normalusage/SingleReplicaDirectRoutingMultiJvmNode1.opts
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/normalusage/SingleReplicaDirectRoutingMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/normalusage/SingleReplicaDirectRoutingMultiJvmNode2.conf
new file mode 100644
index 0000000000..e6de1b42f8
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/normalusage/SingleReplicaDirectRoutingMultiJvmNode2.conf
@@ -0,0 +1,4 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "WARNING"
+akka.actor.deployment.service-hello.router = "direct"
+akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmNode2.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/normalusage/SingleReplicaDirectRoutingMultiJvmNode2.opts
similarity index 100%
rename from akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmNode2.opts
rename to akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/normalusage/SingleReplicaDirectRoutingMultiJvmNode2.opts
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/normalusage/SingleReplicaDirectRoutingMultiJvmSpec.scala
similarity index 82%
rename from akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmSpec.scala
rename to akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/normalusage/SingleReplicaDirectRoutingMultiJvmSpec.scala
index 35009b6d47..9bc9681263 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/normalusage/SingleReplicaDirectRoutingMultiJvmSpec.scala
@@ -1,8 +1,8 @@
-package akka.cluster.routing.direct.single_replica
+package akka.cluster.routing.direct.normalusage
import akka.actor.Actor
import akka.config.Config
-import akka.cluster.{ ClusterTestNode, MasterClusterTestNode, Cluster }
+import akka.cluster.{ ClusterActorRef, ClusterTestNode, MasterClusterTestNode, Cluster }
object SingleReplicaDirectRoutingMultiJvmSpec {
val NrOfNodes = 2
@@ -19,7 +19,6 @@ object SingleReplicaDirectRoutingMultiJvmSpec {
}
}
}
-
}
class SingleReplicaDirectRoutingMultiJvmNode1 extends MasterClusterTestNode {
@@ -28,24 +27,6 @@ class SingleReplicaDirectRoutingMultiJvmNode1 extends MasterClusterTestNode {
val testNodes = NrOfNodes
- "when node send message to existing node using direct routing it" must {
- "communicate with that node" in {
- Cluster.node.start()
- Cluster.barrier("waiting-for-begin", NrOfNodes).await()
-
- val actor = Actor.actorOf[SomeActor]("service-hello").start()
- actor.isRunning must be(true)
-
- Cluster.barrier("waiting-to-end", NrOfNodes).await()
- Cluster.node.shutdown()
- }
- }
-}
-
-class SingleReplicaDirectRoutingMultiJvmNode2 extends ClusterTestNode {
-
- import SingleReplicaDirectRoutingMultiJvmSpec._
-
"___" must {
"___" in {
Cluster.node.start()
@@ -57,3 +38,24 @@ class SingleReplicaDirectRoutingMultiJvmNode2 extends ClusterTestNode {
}
}
+class SingleReplicaDirectRoutingMultiJvmNode2 extends ClusterTestNode {
+
+ import SingleReplicaDirectRoutingMultiJvmSpec._
+
+ "Direct Router: when node send message to existing node it" must {
+ "communicate with that node" in {
+ Cluster.node.start()
+ Cluster.barrier("waiting-for-begin", NrOfNodes).await()
+
+ val actor = Actor.actorOf[SomeActor]("service-hello").start().asInstanceOf[ClusterActorRef]
+ actor.isRunning must be(true)
+
+ val result = (actor ? "identify").get
+ result must equal("node1")
+
+ Cluster.barrier("waiting-to-end", NrOfNodes).await()
+ Cluster.node.shutdown()
+ }
+ }
+}
+
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmNode2.conf
deleted file mode 100644
index 40fcfa5d51..0000000000
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmNode2.conf
+++ /dev/null
@@ -1,3 +0,0 @@
-akka.enabled-modules = ["cluster"]
-akka.event-handler-level = "WARNING"
-akka.actor.deployment.service-hello.router = "direct"
\ No newline at end of file
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode1.conf
similarity index 71%
rename from akka-cluster/src/multi-jvm/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode1.conf
rename to akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode1.conf
index 366cf9111c..c75c38be44 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode1.conf
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode1.conf
@@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
-akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
-akka.actor.deployment.service-hello.clustered.replication-factor = 1
\ No newline at end of file
+akka.actor.deployment.service-hello.router = "random"
+akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node2"]
+akka.actor.deployment.service-hello.clustered.replication-factor = 2
\ No newline at end of file
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmNode1.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode1.opts
similarity index 100%
rename from akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/multiple_replicas/MultiReplicaDirectRoutingMultiJvmNode1.opts
rename to akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode1.opts
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode4.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode2.conf
similarity index 71%
rename from akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode4.conf
rename to akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode2.conf
index 366cf9111c..c75c38be44 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_failover/RoundRobinFailoverMultiJvmNode4.conf
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode2.conf
@@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
-akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
-akka.actor.deployment.service-hello.clustered.replication-factor = 1
\ No newline at end of file
+akka.actor.deployment.service-hello.router = "random"
+akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node2"]
+akka.actor.deployment.service-hello.clustered.replication-factor = 2
\ No newline at end of file
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmNode2.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode2.opts
similarity index 100%
rename from akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmNode2.opts
rename to akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode2.opts
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode3.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode3.conf
new file mode 100644
index 0000000000..c75c38be44
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode3.conf
@@ -0,0 +1,5 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "WARNING"
+akka.actor.deployment.service-hello.router = "random"
+akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node2"]
+akka.actor.deployment.service-hello.clustered.replication-factor = 2
\ No newline at end of file
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_3_replicas/RoundRobin3ReplicasMultiJvmNode3.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode3.opts
similarity index 100%
rename from akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_3_replicas/RoundRobin3ReplicasMultiJvmNode3.opts
rename to akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode3.opts
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmSpec.scala
new file mode 100644
index 0000000000..27dc35a8f6
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmSpec.scala
@@ -0,0 +1,119 @@
+package akka.cluster.routing.random.failover
+
+import akka.config.Config
+import akka.cluster._
+import akka.actor.{ ActorRef, Actor }
+import java.util.{ Collections, Set ⇒ JSet }
+
+object RandomFailoverMultiJvmSpec {
+
+ val NrOfNodes = 3
+
+ class SomeActor extends Actor with Serializable {
+ println("---------------------------------------------------------------------------")
+ println("SomeActor has been created on node [" + Config.nodename + "]")
+ println("---------------------------------------------------------------------------")
+
+ def receive = {
+ case "identify" ⇒ {
+ println("The node received the 'identify' command")
+ self.reply(Config.nodename)
+ }
+ case "shutdown" ⇒ {
+ println("The node received the 'shutdown' command")
+ Cluster.node.shutdown()
+ }
+ }
+ }
+
+}
+
+class RandomFailoverMultiJvmNode1 extends MasterClusterTestNode {
+
+ import RandomFailoverMultiJvmSpec._
+
+ def testNodes = NrOfNodes
+
+ def sleepSome() {
+ println("Starting sleep")
+ Thread.sleep(1000) //nasty.. but ok for now.
+ println("Finished doing sleep")
+ }
+
+ "Random: when routing fails" must {
+ "jump to another replica" in {
+ Cluster.node.start()
+ Cluster.barrier("waiting-for-begin", NrOfNodes).await()
+
+ // ============= the real testing =================
+ val actor = Actor.actorOf[SomeActor]("service-hello").asInstanceOf[ClusterActorRef]
+
+ val oldFoundConnections = identifyConnections(actor)
+ println("---------------------------- oldFoundConnections ------------------------")
+ println(oldFoundConnections)
+
+ //since we have replication factor 2
+ oldFoundConnections.size() must be(2)
+
+ //terminate a node
+ actor ! "shutdown"
+
+ sleepSome()
+
+ //this is where the system behaves unpredictable. From time to time it works... from time to time there
+ //all kinds of connection timeouts. So this test shows that there are problems. For the time being
+ //the test code has been deactivated to prevent causing problems.
+
+ val newFoundConnections = identifyConnections(actor)
+ println("---------------------------- newFoundConnections ------------------------")
+ println(newFoundConnections)
+
+ //it still must be 2 since a different node should have been used to failover to
+ newFoundConnections.size() must be(2)
+ //they are not disjoint since, there must be a single element that is in both
+ Collections.disjoint(newFoundConnections, oldFoundConnections) must be(false)
+ //but they should not be equal since the shutdown-node has been replaced by another one.
+ newFoundConnections.equals(oldFoundConnections) must be(false)
+
+ Cluster.node.shutdown()
+ }
+ }
+
+ def identifyConnections(actor: ActorRef): JSet[String] = {
+ val set = new java.util.HashSet[String]
+ for (i ← 0 until NrOfNodes * 10) {
+ val value = (actor ? "identify").get.asInstanceOf[String]
+ set.add(value)
+ }
+ set
+ }
+}
+
+class RandomFailoverMultiJvmNode2 extends ClusterTestNode {
+
+ import RandomFailoverMultiJvmSpec._
+
+ "___" must {
+ "___" in {
+ Cluster.node.start()
+ Cluster.barrier("waiting-for-begin", NrOfNodes).await()
+
+ Thread.sleep(30 * 1000)
+ }
+ }
+}
+
+class RandomFailoverMultiJvmNode3 extends ClusterTestNode {
+
+ import RandomFailoverMultiJvmSpec._
+
+ "___" must {
+ "___" in {
+ Cluster.node.start()
+ Cluster.barrier("waiting-for-begin", NrOfNodes).await()
+
+ Thread.sleep(30 * 1000)
+ }
+ }
+}
+
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/homenode/HomeNodeMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/homenode/HomeNodeMultiJvmNode1.conf
new file mode 100644
index 0000000000..74ca26985f
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/homenode/HomeNodeMultiJvmNode1.conf
@@ -0,0 +1,8 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "WARNING"
+akka.actor.deployment.service-node1.router = "random"
+akka.actor.deployment.service-node1.clustered.preferred-nodes = ["node:node1"]
+akka.actor.deployment.service-node1.clustered.replication-factor = 1
+akka.actor.deployment.service-node2.router = "random"
+akka.actor.deployment.service-node2.clustered.preferred-nodes = ["node:node2"]
+akka.actor.deployment.service-node2.clustered.replication-factor = 1
\ No newline at end of file
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode1.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/homenode/HomeNodeMultiJvmNode1.opts
similarity index 100%
rename from akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode1.opts
rename to akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/homenode/HomeNodeMultiJvmNode1.opts
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/homenode/HomeNodeMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/homenode/HomeNodeMultiJvmNode2.conf
new file mode 100644
index 0000000000..74ca26985f
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/homenode/HomeNodeMultiJvmNode2.conf
@@ -0,0 +1,8 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "WARNING"
+akka.actor.deployment.service-node1.router = "random"
+akka.actor.deployment.service-node1.clustered.preferred-nodes = ["node:node1"]
+akka.actor.deployment.service-node1.clustered.replication-factor = 1
+akka.actor.deployment.service-node2.router = "random"
+akka.actor.deployment.service-node2.clustered.preferred-nodes = ["node:node2"]
+akka.actor.deployment.service-node2.clustered.replication-factor = 1
\ No newline at end of file
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_3_replicas/RoundRobin3ReplicasMultiJvmNode2.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/homenode/HomeNodeMultiJvmNode2.opts
similarity index 100%
rename from akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_3_replicas/RoundRobin3ReplicasMultiJvmNode2.opts
rename to akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/homenode/HomeNodeMultiJvmNode2.opts
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/homenode/HomeNodeMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/homenode/HomeNodeMultiJvmSpec.scala
new file mode 100644
index 0000000000..504eb594d3
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/homenode/HomeNodeMultiJvmSpec.scala
@@ -0,0 +1,59 @@
+package akka.cluster.routing.random.homenode
+
+import akka.config.Config
+import akka.actor.Actor
+import akka.cluster.{ ClusterTestNode, MasterClusterTestNode, Cluster }
+import Cluster._
+
+object HomeNodeMultiJvmSpec {
+
+ val NrOfNodes = 2
+
+ class SomeActor extends Actor with Serializable {
+ def receive = {
+ case "identify" ⇒ {
+ self.reply(Config.nodename)
+ }
+ }
+ }
+
+}
+
+class HomeNodeMultiJvmNode1 extends MasterClusterTestNode {
+
+ import HomeNodeMultiJvmSpec._
+
+ val testNodes = NrOfNodes
+
+ "___" must {
+ "___" in {
+ node.start()
+ barrier("waiting-for-begin", NrOfNodes).await()
+ barrier("waiting-for-end", NrOfNodes).await()
+ node.shutdown()
+ }
+ }
+}
+
+class HomeNodeMultiJvmNode2 extends ClusterTestNode {
+
+ import HomeNodeMultiJvmSpec._
+
+ "Random Router: A Random Router" must {
+ "obey 'home-node' config option when instantiated actor in cluster" in {
+ node.start()
+ barrier("waiting-for-begin", NrOfNodes).await()
+
+ val actorNode1 = Actor.actorOf[SomeActor]("service-node1")
+ val nameNode1 = (actorNode1 ? "identify").get.asInstanceOf[String]
+ nameNode1 must equal("node1")
+
+ val actorNode2 = Actor.actorOf[SomeActor]("service-node2")
+ val nameNode2 = (actorNode2 ? "identify").get.asInstanceOf[String]
+ nameNode2 must equal("node2")
+
+ barrier("waiting-for-end", NrOfNodes).await()
+ node.shutdown()
+ }
+ }
+}
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/replicationfactor_1/Random1ReplicaMultiJvmNode1.conf
similarity index 72%
rename from akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmNode1.conf
rename to akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/replicationfactor_1/Random1ReplicaMultiJvmNode1.conf
index 0a47105d03..4140cf7450 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/direct/single_replica/SingleReplicaDirectRoutingMultiJvmNode1.conf
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/replicationfactor_1/Random1ReplicaMultiJvmNode1.conf
@@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
-akka.actor.deployment.service-hello.router = "direct"
+akka.actor.deployment.service-hello.router = "random"
akka.actor.deployment.service-hello.clustered.replication-factor = 1
\ No newline at end of file
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmNode1.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/replicationfactor_1/Random1ReplicaMultiJvmNode1.opts
similarity index 100%
rename from akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmNode1.opts
rename to akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/replicationfactor_1/Random1ReplicaMultiJvmNode1.opts
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/replicationfactor_1/Random1ReplicaMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/replicationfactor_1/Random1ReplicaMultiJvmSpec.scala
new file mode 100644
index 0000000000..ce5a603555
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/replicationfactor_1/Random1ReplicaMultiJvmSpec.scala
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc.