Merge branch 'master' into derekjw-1054

Conflicts:
	akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala
This commit is contained in:
Derek Williams 2011-08-02 11:21:48 -06:00
commit 8db226f663
135 changed files with 1696 additions and 1243 deletions

View file

@ -85,12 +85,14 @@ class ActorFireForgetRequestReplySpec extends WordSpec with MustMatchers with Be
}
"should shutdown crashed temporary actor" in {
val actor = actorOf[CrashingTemporaryActor].start()
actor.isRunning must be(true)
actor ! "Die"
state.finished.await
sleepFor(1 second)
actor.isShutdown must be(true)
filterEvents(EventFilter[Exception]("Expected")) {
val actor = actorOf[CrashingTemporaryActor].start()
actor.isRunning must be(true)
actor ! "Die"
state.finished.await
sleepFor(1 second)
actor.isShutdown must be(true)
}
}
}
}

View file

@ -326,25 +326,27 @@ class ActorRefSpec extends WordSpec with MustMatchers {
}
"restart when Kill:ed" in {
val latch = new CountDownLatch(2)
filterException[ActorKilledException] {
val latch = new CountDownLatch(2)
val boss = Actor.actorOf(new Actor {
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), scala.Some(2), scala.Some(1000))
val boss = Actor.actorOf(new Actor {
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), scala.Some(2), scala.Some(1000))
val ref = Actor.actorOf(
new Actor {
def receive = { case _ }
override def preRestart(reason: Throwable, msg: Option[Any]) = latch.countDown()
override def postRestart(reason: Throwable) = latch.countDown()
}).start()
val ref = Actor.actorOf(
new Actor {
def receive = { case _ }
override def preRestart(reason: Throwable, msg: Option[Any]) = latch.countDown()
override def postRestart(reason: Throwable) = latch.countDown()
}).start()
self link ref
self link ref
protected def receive = { case "sendKill" ref ! Kill }
}).start()
protected def receive = { case "sendKill" ref ! Kill }
}).start()
boss ! "sendKill"
latch.await(5, TimeUnit.SECONDS) must be === true
boss ! "sendKill"
latch.await(5, TimeUnit.SECONDS) must be === true
}
}
}
}

View file

@ -85,75 +85,85 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo
ref.start()
}
val expectedEvents = Seq(EventFilter[ActorKilledException], EventFilter[IllegalActorStateException]("expected"))
"An Actor restart" must {
"invoke preRestart, preStart, postRestart" in {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
filterEvents(expectedEvents) {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
}
}
}
"support creation of nested actors in freshInstance()" in {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! Nested
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
val (tActor, tRef) = expectMsgType[(Actor, TestActorRef[Actor])]
tRef.underlyingActor must be(tActor)
expectMsg((tActor, tRef))
tRef.stop()
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
filterEvents(expectedEvents) {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! Nested
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
val (tActor, tRef) = expectMsgType[(Actor, TestActorRef[Actor])]
tRef.underlyingActor must be(tActor)
expectMsg((tActor, tRef))
tRef.stop()
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
}
}
}
"use freshInstance() if available" in {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! 42
actor ! Handover
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
filterEvents(expectedEvents) {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! 42
actor ! Handover
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
}
actor ! "get"
expectMsg(1 second, 42)
}
actor ! "get"
expectMsg(1 second, 42)
}
"fall back to default factory if freshInstance() fails" in {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! 42
actor ! Fail
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
filterEvents(expectedEvents) {
val actor = newActor(new Restarter(testActor))
expectMsg(1 second, ("preStart", 1))
val supervisor = newActor(new Supervisor)
supervisor link actor
actor ! 42
actor ! Fail
actor ! Kill
within(1 second) {
expectMsg(("preRestart", Some(Kill), 1))
expectMsg(("preStart", 2))
expectMsg(("postRestart", 2))
expectNoMsg
}
actor ! "get"
expectMsg(1 second, 0)
}
actor ! "get"
expectMsg(1 second, 0)
}
}

View file

@ -8,7 +8,7 @@ import org.scalatest.{ WordSpec, BeforeAndAfterAll, BeforeAndAfterEach }
import org.scalatest.matchers.MustMatchers
import akka.testkit._
import TestEvent.{ Mute, UnMuteAll }
import FSM._
import akka.util.Duration
import akka.util.duration._
@ -59,8 +59,8 @@ object FSMActorSpec {
whenUnhandled {
case Ev(msg) {
unhandledLatch.open
EventHandler.info(this, "unhandled event " + msg + " in state " + stateName + " with data " + stateData)
unhandledLatch.open
stay
}
}
@ -112,12 +112,18 @@ class FSMActorSpec extends WordSpec with MustMatchers with TestKit with BeforeAn
}
override def beforeAll {
EventHandler notify Mute(EventFilter[EventHandler.EventHandlerException]("Next state 2 does not exist"),
EventFilter.custom {
case _: EventHandler.Debug true
case _ false
})
val f = FSM.getClass.getDeclaredField("debugEvent")
f.setAccessible(true)
f.setBoolean(FSM, true)
}
override def afterAll {
EventHandler notify UnMuteAll
val f = FSM.getClass.getDeclaredField("debugEvent")
f.setAccessible(true)
f.setBoolean(FSM, false)
@ -151,8 +157,13 @@ class FSMActorSpec extends WordSpec with MustMatchers with TestKit with BeforeAn
transitionCallBackLatch.await
lockedLatch.await
lock ! "not_handled"
unhandledLatch.await
filterEvents(EventFilter.custom {
case EventHandler.Info(_: Lock, _) true
case _ false
}) {
lock ! "not_handled"
unhandledLatch.await
}
val answerLatch = TestLatch()
object Hello

View file

@ -10,6 +10,7 @@ import akka.testkit._
import akka.testkit._
import akka.util.duration._
import akka.config.Supervision._
import akka.event.EventHandler
import FSM._
@ -79,9 +80,14 @@ class FSMTransitionSpec extends WordSpec with MustMatchers with TestKit {
val sup = Actor.actorOf[Supervisor].start()
sup link fsm
within(300 millis) {
fsm ! SubscribeTransitionCallBack(forward)
fsm ! "reply"
expectMsg("reply")
filterEvents(EventFilter.custom {
case EventHandler.Warning(_: MyFSM, _) true
case _ false
}) {
fsm ! SubscribeTransitionCallBack(forward)
fsm ! "reply"
expectMsg("reply")
}
forward.start()
fsm ! SubscribeTransitionCallBack(forward)
expectMsg(CurrentState(fsm, 0))

View file

@ -5,7 +5,7 @@ package akka.actor
import org.scalatest.{ WordSpec, BeforeAndAfterAll, BeforeAndAfterEach }
import org.scalatest.matchers.MustMatchers
import akka.testkit.{ TestKit, TestActorRef }
import akka.testkit.{ TestKit, TestActorRef, EventFilter, TestEvent }
import akka.event.EventHandler
import Actor._
import akka.util.duration._
@ -22,6 +22,11 @@ class LoggingReceiveSpec
val level = EventHandler.level
override def beforeAll {
EventHandler.notify(TestEvent.Mute(EventFilter[UnhandledMessageException],
EventFilter[ActorKilledException], EventFilter.custom {
case d: EventHandler.Debug true
case _ false
}))
EventHandler.addListener(testActor)
EventHandler.level = EventHandler.DebugLevel
}
@ -29,6 +34,7 @@ class LoggingReceiveSpec
override def afterAll {
EventHandler.removeListener(testActor)
EventHandler.level = level
EventHandler.notify(TestEvent.UnMuteAll)
}
override def afterEach {

View file

@ -14,6 +14,7 @@ import akka.util.Duration
import akka.dispatch.{ Dispatchers, Future, KeptPromise }
import java.util.concurrent.atomic.AtomicReference
import annotation.tailrec
import akka.testkit.{ EventFilter, filterEvents }
object TypedActorSpec {
@ -172,9 +173,11 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
}
"throw an IllegalStateExcpetion when TypedActor.self is called in the wrong scope" in {
(intercept[IllegalStateException] {
TypedActor.self[Foo]
}).getMessage must equal("Calling TypedActor.self outside of a TypedActor implementation method!")
filterEvents(EventFilter[IllegalStateException]("Calling")) {
(intercept[IllegalStateException] {
TypedActor.self[Foo]
}).getMessage must equal("Calling TypedActor.self outside of a TypedActor implementation method!")
}
}
"have access to itself when executing a method call" in {
@ -259,27 +262,29 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
}
"be able to handle exceptions when calling methods" in {
val t = newFooBar
filterEvents(EventFilter[IllegalStateException]("expected")) {
val t = newFooBar
t.incr()
t.failingPigdog()
t.read() must be(1) //Make sure state is not reset after failure
t.incr()
t.failingPigdog()
t.read() must be(1) //Make sure state is not reset after failure
t.failingFuturePigdog.await.exception.get.getMessage must be("expected")
t.read() must be(1) //Make sure state is not reset after failure
t.failingFuturePigdog.await.exception.get.getMessage must be("expected")
t.read() must be(1) //Make sure state is not reset after failure
(intercept[IllegalStateException] {
t.failingJOptionPigdog
}).getMessage must be("expected")
t.read() must be(1) //Make sure state is not reset after failure
(intercept[IllegalStateException] {
t.failingJOptionPigdog
}).getMessage must be("expected")
t.read() must be(1) //Make sure state is not reset after failure
(intercept[IllegalStateException] {
t.failingOptionPigdog
}).getMessage must be("expected")
(intercept[IllegalStateException] {
t.failingOptionPigdog
}).getMessage must be("expected")
t.read() must be(1) //Make sure state is not reset after failure
t.read() must be(1) //Make sure state is not reset after failure
mustStop(t)
mustStop(t)
}
}
"be able to support stacked traits for the interface part" in {

View file

@ -8,72 +8,75 @@ import org.scalatest.matchers.MustMatchers
import akka.dispatch.Dispatchers
import akka.config.Supervision.{ SupervisorConfig, OneForOneStrategy, Supervise, Permanent }
import java.util.concurrent.CountDownLatch
import akka.testkit.{ filterEvents, EventFilter }
class SupervisorMiscSpec extends WordSpec with MustMatchers {
"A Supervisor" should {
"restart a crashing actor and its dispatcher for any dispatcher" in {
val countDownLatch = new CountDownLatch(4)
filterEvents(EventFilter[Exception]("killed")) {
val countDownLatch = new CountDownLatch(4)
val actor1 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
val actor1 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
protected def receive = {
case "kill" throw new Exception("killed")
case _ println("received unknown message")
}
}).start()
protected def receive = {
case "kill" throw new Exception("killed")
case _ println("received unknown message")
}
}).start()
val actor2 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
val actor2 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
protected def receive = {
case "kill" throw new Exception("killed")
case _ println("received unknown message")
}
}).start()
protected def receive = {
case "kill" throw new Exception("killed")
case _ println("received unknown message")
}
}).start()
val actor3 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newDispatcher("test").build
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
val actor3 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newDispatcher("test").build
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
protected def receive = {
case "kill" throw new Exception("killed")
case _ println("received unknown message")
}
}).start()
protected def receive = {
case "kill" throw new Exception("killed")
case _ println("received unknown message")
}
}).start()
val actor4 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
val actor4 = Actor.actorOf(new Actor {
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
protected def receive = {
case "kill" throw new Exception("killed")
case _ println("received unknown message")
}
}).start()
protected def receive = {
case "kill" throw new Exception("killed")
case _ println("received unknown message")
}
}).start()
val sup = Supervisor(
SupervisorConfig(
OneForOneStrategy(List(classOf[Exception]), 3, 5000),
Supervise(actor1, Permanent) ::
Supervise(actor2, Permanent) ::
Supervise(actor3, Permanent) ::
Supervise(actor4, Permanent) ::
Nil))
val sup = Supervisor(
SupervisorConfig(
OneForOneStrategy(List(classOf[Exception]), 3, 5000),
Supervise(actor1, Permanent) ::
Supervise(actor2, Permanent) ::
Supervise(actor3, Permanent) ::
Supervise(actor4, Permanent) ::
Nil))
actor1 ! "kill"
actor2 ! "kill"
actor3 ! "kill"
actor4 ! "kill"
actor1 ! "kill"
actor2 ! "kill"
actor3 ! "kill"
actor4 ! "kill"
countDownLatch.await()
assert(!actor1.isShutdown, "actor1 is shutdown")
assert(!actor2.isShutdown, "actor2 is shutdown")
assert(!actor3.isShutdown, "actor3 is shutdown")
assert(!actor4.isShutdown, "actor4 is shutdown")
countDownLatch.await()
assert(!actor1.isShutdown, "actor1 is shutdown")
assert(!actor2.isShutdown, "actor2 is shutdown")
assert(!actor3.isShutdown, "actor3 is shutdown")
assert(!actor4.isShutdown, "actor4 is shutdown")
}
}
}
}

View file

@ -70,6 +70,7 @@ object SupervisorSpec {
override def receive = {
case Die (temp.?(Die, TimeoutMillis)).get
case _: MaximumNumberOfRestartsWithinTimeRangeReached
}
}
@ -200,7 +201,8 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach
override def beforeAll() = {
EventHandler notify Mute(EventFilter[Exception]("Die"),
EventFilter[IllegalStateException]("Don't wanna!"))
EventFilter[IllegalStateException]("Don't wanna!"),
EventFilter[RuntimeException]("Expected"))
}
override def afterAll() = {

View file

@ -8,6 +8,7 @@ import org.scalatest.matchers.MustMatchers
import akka.util.duration._
import akka.testkit.Testing.sleepFor
import akka.testkit.{ EventFilter, filterEvents, filterException }
import akka.dispatch.Dispatchers
import akka.config.Supervision.{ SupervisorConfig, OneForOneStrategy, Supervise, Permanent }
import Actor._
@ -33,15 +34,17 @@ class SupervisorTreeSpec extends WordSpec with MustMatchers {
"In a 3 levels deep supervisor tree (linked in the constructor) we" must {
"be able to kill the middle actor and see itself and its child restarted" in {
log = "INIT"
filterException[Exception] {
log = "INIT"
val lastActor = actorOf(new Chainer, "lastActor").start
val middleActor = actorOf(new Chainer(Some(lastActor)), "middleActor").start
val headActor = actorOf(new Chainer(Some(middleActor)), "headActor").start
val lastActor = actorOf(new Chainer, "lastActor").start
val middleActor = actorOf(new Chainer(Some(lastActor)), "middleActor").start
val headActor = actorOf(new Chainer(Some(middleActor)), "headActor").start
middleActor ! Die
sleepFor(500 millis)
log must equal("INITmiddleActorlastActor")
middleActor ! Die
sleepFor(500 millis)
log must equal("INITmiddleActorlastActor")
}
}
}
}

View file

@ -7,7 +7,7 @@ import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.actor._
import akka.config.Supervision._
import akka.testkit.{ filterEvents, EventFilter }
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
import org.scalatest.matchers.MustMatchers
@ -23,30 +23,33 @@ class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll {
"A supervised actor with lifecycle PERMANENT" should {
"be able to reply on failure during preRestart" in {
filterEvents(EventFilter[Exception]("test")) {
val latch = new CountDownLatch(1)
val sender = Actor.actorOf(new Sender(latch)).start()
val latch = new CountDownLatch(1)
val sender = Actor.actorOf(new Sender(latch)).start()
val supervised = Actor.actorOf[Supervised]
val supervisor = Supervisor(SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 5, 10000),
Supervise(supervised, Permanent) :: Nil))
val supervised = Actor.actorOf[Supervised]
val supervisor = Supervisor(SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 5, 10000),
Supervise(supervised, Permanent) :: Nil))
supervised.!("test")(Some(sender))
latch.await(5, TimeUnit.SECONDS) must be(true)
supervised.!("test")(Some(sender))
latch.await(5, TimeUnit.SECONDS) must be(true)
}
}
"be able to reply on failure during postStop" in {
val latch = new CountDownLatch(1)
val sender = Actor.actorOf(new Sender(latch)).start()
filterEvents(EventFilter[Exception]("test")) {
val latch = new CountDownLatch(1)
val sender = Actor.actorOf(new Sender(latch)).start()
val supervised = Actor.actorOf[Supervised]
val supervisor = Supervisor(SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 5, 10000),
Supervise(supervised, Temporary) :: Nil))
val supervised = Actor.actorOf[Supervised]
val supervisor = Supervisor(SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 5, 10000),
Supervise(supervised, Temporary) :: Nil))
supervised.!("test")(Some(sender))
latch.await(5, TimeUnit.SECONDS) must be(true)
supervised.!("test")(Some(sender))
latch.await(5, TimeUnit.SECONDS) must be(true)
}
}
}
}

View file

@ -5,7 +5,7 @@ package akka.actor.dispatch
import org.scalatest.junit.JUnitSuite
import org.scalatest.Assertions._
import akka.testkit.Testing
import akka.testkit.{ Testing, filterEvents, EventFilter }
import akka.dispatch._
import akka.actor.Actor._
import java.util.concurrent.atomic.AtomicLong
@ -330,15 +330,17 @@ abstract class ActorModelSpec extends JUnitSuite {
@Test
def dispatcherShouldSuspendAndResumeAFailingNonSupervisedPermanentActor {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
val done = new CountDownLatch(1)
a ! Restart
a ! CountDown(done)
assertCountDown(done, Testing.testTime(3000), "Should be suspended+resumed and done with next message within 3 seconds")
a.stop()
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 2,
msgsProcessed = 2, suspensions = 1, resumes = 1)
filterEvents(EventFilter[Exception]("Restart")) {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
val done = new CountDownLatch(1)
a ! Restart
a ! CountDown(done)
assertCountDown(done, Testing.testTime(3000), "Should be suspended+resumed and done with next message within 3 seconds")
a.stop()
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 2,
msgsProcessed = 2, suspensions = 1, resumes = 1)
}
}
@Test
@ -397,48 +399,52 @@ abstract class ActorModelSpec extends JUnitSuite {
@Test
def dispatcherShouldContinueToProcessMessagesWhenAThreadGetsInterrupted {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = a ? Interrupt
val f4 = a ? Reply("foo2")
val f5 = a ? Interrupt
val f6 = a ? Reply("bar2")
filterEvents(EventFilter[InterruptedException]("Ping!"), EventFilter[akka.event.EventHandler.EventHandlerException]) {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = a ? Interrupt
val f4 = a ? Reply("foo2")
val f5 = a ? Interrupt
val f6 = a ? Reply("bar2")
assert(f1.get === "foo")
assert(f2.get === "bar")
assert((intercept[InterruptedException] {
f3.get
}).getMessage === "Ping!")
assert(f4.get === "foo2")
assert((intercept[InterruptedException] {
f5.get
}).getMessage === "Ping!")
assert(f6.get === "bar2")
assert(f1.get === "foo")
assert(f2.get === "bar")
assert((intercept[InterruptedException] {
f3.get
}).getMessage === "Ping!")
assert(f4.get === "foo2")
assert((intercept[InterruptedException] {
f5.get
}).getMessage === "Ping!")
assert(f6.get === "bar2")
}
}
@Test
def dispatcherShouldContinueToProcessMessagesWhenExceptionIsThrown {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = a ? new ThrowException(new IndexOutOfBoundsException("IndexOutOfBoundsException"))
val f4 = a ? Reply("foo2")
val f5 = a ? new ThrowException(new RemoteException("RemoteException"))
val f6 = a ? Reply("bar2")
filterEvents(EventFilter[IndexOutOfBoundsException], EventFilter[RemoteException]) {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = a ? new ThrowException(new IndexOutOfBoundsException("IndexOutOfBoundsException"))
val f4 = a ? Reply("foo2")
val f5 = a ? new ThrowException(new RemoteException("RemoteException"))
val f6 = a ? Reply("bar2")
assert(f1.get === "foo")
assert(f2.get === "bar")
assert((intercept[IndexOutOfBoundsException] {
f3.get
}).getMessage === "IndexOutOfBoundsException")
assert(f4.get === "foo2")
assert((intercept[RemoteException] {
f5.get
}).getMessage === "RemoteException")
assert(f6.get === "bar2")
assert(f1.get === "foo")
assert(f2.get === "bar")
assert((intercept[IndexOutOfBoundsException] {
f3.get
}).getMessage === "IndexOutOfBoundsException")
assert(f4.get === "foo2")
assert((intercept[RemoteException] {
f5.get
}).getMessage === "RemoteException")
assert(f6.get === "bar2")
}
}
}

View file

@ -7,6 +7,7 @@ import akka.dispatch.{ Dispatchers, Dispatcher }
import akka.actor.Actor
import Actor._
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import akka.testkit.{ filterEvents, EventFilter }
object DispatcherActorSpec {
class TestActor extends Actor {
@ -35,7 +36,7 @@ class DispatcherActorSpec extends JUnitSuite {
private val unit = TimeUnit.MILLISECONDS
@Test
def shouldSendOneWay = {
def shouldTell = {
val actor = actorOf[OneWayTestActor].start()
val result = actor ! "OneWay"
assert(OneWayTestActor.oneWay.await(1, TimeUnit.SECONDS))
@ -60,15 +61,17 @@ class DispatcherActorSpec extends JUnitSuite {
@Test
def shouldSendReceiveException = {
val actor = actorOf[TestActor].start()
try {
(actor ? "Failure").get
fail("Should have thrown an exception")
} catch {
case e
assert("Expected exception; to test fault-tolerance" === e.getMessage())
filterEvents(EventFilter[RuntimeException]("Expected")) {
val actor = actorOf[TestActor].start()
try {
(actor ? "Failure").get
fail("Should have thrown an exception")
} catch {
case e
assert("Expected exception; to test fault-tolerance" === e.getMessage())
}
actor.stop()
}
actor.stop()
}
@Test

View file

@ -12,9 +12,7 @@ import org.scalacheck.Gen._
import akka.actor.{ Actor, ActorRef }
import Actor._
import akka.event.EventHandler
import akka.testkit.TestEvent._
import akka.testkit.EventFilter
import akka.testkit.{ EventFilter, filterEvents, filterException }
import org.multiverse.api.latches.StandardLatch
import java.util.concurrent.{ TimeUnit, CountDownLatch }
@ -47,14 +45,6 @@ class JavaFutureSpec extends JavaFutureTests with JUnitSuite
class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAndAfterAll {
import FutureSpec._
override def beforeAll() {
EventHandler.notify(Mute(EventFilter[RuntimeException]))
}
override def afterAll() {
EventHandler.notify(UnMuteAll)
}
"A Promise" when {
"never completed" must {
behave like emptyFuture(_(Promise()))
@ -108,17 +98,19 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
}
"has actions applied" must {
"pass checks" in {
check({ (future: Future[Int], actions: List[FutureAction])
val result = (future /: actions)(_ /: _)
val expected = (future.await.value.get /: actions)(_ /: _)
((result.await.value.get, expected) match {
case (Right(a), Right(b)) a == b
case (Left(a), Left(b)) if a.toString == b.toString true
case (Left(a), Left(b)) if a.getStackTrace.isEmpty || b.getStackTrace.isEmpty
a.getClass.toString == b.getClass.toString
case _ false
}) :| result.value.get.toString + " is expected to be " + expected.toString
}, minSuccessful(10000), workers(4))
filterException[ArithmeticException] {
check({ (future: Future[Int], actions: List[FutureAction])
val result = (future /: actions)(_ /: _)
val expected = (future.await.value.get /: actions)(_ /: _)
((result.await.value.get, expected) match {
case (Right(a), Right(b)) a == b
case (Left(a), Left(b)) if a.toString == b.toString true
case (Left(a), Left(b)) if a.getStackTrace.isEmpty || b.getStackTrace.isEmpty
a.getClass.toString == b.getClass.toString
case _ false
}) :| result.value.get.toString + " is expected to be " + expected.toString
}, minSuccessful(10000), workers(4))
}
}
}
}
@ -136,12 +128,14 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
}
"throws an exception" must {
behave like futureWithException[RuntimeException] { test
val actor = actorOf[TestActor]
actor.start()
val future = actor ? "Failure"
future.await
test(future, "Expected exception; to test fault-tolerance")
actor.stop()
filterException[RuntimeException] {
val actor = actorOf[TestActor]
actor.start()
val future = actor ? "Failure"
future.await
test(future, "Expected exception; to test fault-tolerance")
actor.stop()
}
}
}
}
@ -160,13 +154,15 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
}
"will throw an exception" must {
behave like futureWithException[ArithmeticException] { test
val actor1 = actorOf[TestActor].start()
val actor2 = actorOf(new Actor { def receive = { case s: String self reply (s.length / 0) } }).start()
val future = actor1 ? "Hello" flatMap { _ match { case s: String actor2 ? s } }
future.await
test(future, "/ by zero")
actor1.stop()
actor2.stop()
filterException[ArithmeticException] {
val actor1 = actorOf[TestActor].start()
val actor2 = actorOf(new Actor { def receive = { case s: String self reply (s.length / 0) } }).start()
val future = actor1 ? "Hello" flatMap { _ match { case s: String actor2 ? s } }
future.await
test(future, "/ by zero")
actor1.stop()
actor2.stop()
}
}
}
}
@ -174,102 +170,109 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
"being tested" must {
"compose with for-comprehensions" in {
val actor = actorOf(new Actor {
def receive = {
case s: String self reply s.length
case i: Int self reply (i * 2).toString
}
}).start()
filterException[ClassCastException] {
val actor = actorOf(new Actor {
def receive = {
case s: String self reply s.length
case i: Int self reply (i * 2).toString
}
}).start()
val future0 = actor ? "Hello"
val future0 = actor ? "Hello"
val future1 = for {
a: Int future0.mapTo[Int] // returns 5
b: String (actor ? a).mapTo[String] // returns "10"
c: String (actor ? 7).mapTo[String] // returns "14"
} yield b + "-" + c
val future1 = for {
a future0.mapTo[Int] // returns 5
b (actor ? a).mapTo[String] // returns "10"
c (actor ? 7).mapTo[String] // returns "14"
} yield b + "-" + c
val future2 = for {
a: Int future0.mapTo[Int]
b: Int (actor ? a).mapTo[Int]
c: String (actor ? 7).mapTo[String]
} yield b + "-" + c
val future2 = for {
a future0.mapTo[Int]
b (actor ? a).mapTo[Int]
c (actor ? 7).mapTo[String]
} yield b + "-" + c
future1.get must be("10-14")
intercept[ClassCastException] { future2.get }
actor.stop()
future1.get must be("10-14")
assert(checkType(future1, manifest[String]))
intercept[ClassCastException] { future2.get }
actor.stop()
}
}
"support pattern matching within a for-comprehension" in {
case class Req[T](req: T)
case class Res[T](res: T)
val actor = actorOf(new Actor {
def receive = {
case Req(s: String) self reply Res(s.length)
case Req(i: Int) self reply Res((i * 2).toString)
}
}).start()
filterException[MatchError] {
case class Req[T](req: T)
case class Res[T](res: T)
val actor = actorOf(new Actor {
def receive = {
case Req(s: String) self reply Res(s.length)
case Req(i: Int) self reply Res((i * 2).toString)
}
}).start()
val future1 = for {
Res(a: Int) actor ? Req("Hello")
Res(b: String) actor ? Req(a)
Res(c: String) actor ? Req(7)
} yield b + "-" + c
val future1 = for {
Res(a: Int) actor ? Req("Hello")
Res(b: String) actor ? Req(a)
Res(c: String) actor ? Req(7)
} yield b + "-" + c
val future2 = for {
Res(a: Int) actor ? Req("Hello")
Res(b: Int) actor ? Req(a)
Res(c: Int) actor ? Req(7)
} yield b + "-" + c
val future2 = for {
Res(a: Int) actor ? Req("Hello")
Res(b: Int) actor ? Req(a)
Res(c: Int) actor ? Req(7)
} yield b + "-" + c
future1.get must be("10-14")
intercept[MatchError] { future2.get }
actor.stop()
future1.get must be("10-14")
intercept[MatchError] { future2.get }
actor.stop()
}
}
"recover from exceptions" in {
val future1 = Future(5)
val future2 = future1 map (_ / 0)
val future3 = future2 map (_.toString)
filterException[RuntimeException] {
val future1 = Future(5)
val future2 = future1 map (_ / 0)
val future3 = future2 map (_.toString)
val future4 = future1 recover {
case e: ArithmeticException 0
} map (_.toString)
val future4 = future1 recover {
case e: ArithmeticException 0
} map (_.toString)
val future5 = future2 recover {
case e: ArithmeticException 0
} map (_.toString)
val future5 = future2 recover {
case e: ArithmeticException 0
} map (_.toString)
val future6 = future2 recover {
case e: MatchError 0
} map (_.toString)
val future6 = future2 recover {
case e: MatchError 0
} map (_.toString)
val future7 = future3 recover { case e: ArithmeticException "You got ERROR" }
val future7 = future3 recover { case e: ArithmeticException "You got ERROR" }
val actor = actorOf[TestActor].start()
val actor = actorOf[TestActor].start()
val future8 = actor ? "Failure"
val future9 = actor ? "Failure" recover {
case e: RuntimeException "FAIL!"
val future8 = actor ? "Failure"
val future9 = actor ? "Failure" recover {
case e: RuntimeException "FAIL!"
}
val future10 = actor ? "Hello" recover {
case e: RuntimeException "FAIL!"
}
val future11 = actor ? "Failure" recover { case _ "Oops!" }
future1.get must be(5)
intercept[ArithmeticException] { future2.get }
intercept[ArithmeticException] { future3.get }
future4.get must be("5")
future5.get must be("0")
intercept[ArithmeticException] { future6.get }
future7.get must be("You got ERROR")
intercept[RuntimeException] { future8.get }
future9.get must be("FAIL!")
future10.get must be("World")
future11.get must be("Oops!")
actor.stop()
}
val future10 = actor ? "Hello" recover {
case e: RuntimeException "FAIL!"
}
val future11 = actor ? "Failure" recover { case _ "Oops!" }
future1.get must be(5)
intercept[ArithmeticException] { future2.get }
intercept[ArithmeticException] { future3.get }
future4.get must be("5")
future5.get must be("0")
intercept[ArithmeticException] { future6.get }
future7.get must be("You got ERROR")
intercept[RuntimeException] { future8.get }
future9.get must be("FAIL!")
future10.get must be("World")
future11.get must be("Oops!")
actor.stop()
}
"fold" in {
@ -294,19 +297,21 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
}
"fold with an exception" in {
val actors = (1 to 10).toList map { _
actorOf(new Actor {
def receive = {
case (add: Int, wait: Int)
Thread.sleep(wait)
if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
self tryReply add
}
}).start()
filterException[IllegalArgumentException] {
val actors = (1 to 10).toList map { _
actorOf(new Actor {
def receive = {
case (add: Int, wait: Int)
Thread.sleep(wait)
if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
self tryReply add
}
}).start()
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 100), timeout).mapTo[Int] }
Futures.fold(0, timeout)(futures)(_ + _).await.exception.get.getMessage must be("shouldFoldResultsWithException: expected")
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 100), timeout).mapTo[Int] }
Futures.fold(0, timeout)(futures)(_ + _).await.exception.get.getMessage must be("shouldFoldResultsWithException: expected")
}
/* @Test
@ -341,23 +346,27 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
}
"shouldReduceResultsWithException" in {
val actors = (1 to 10).toList map { _
actorOf(new Actor {
def receive = {
case (add: Int, wait: Int)
Thread.sleep(wait)
if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
self tryReply add
}
}).start()
filterException[IllegalArgumentException] {
val actors = (1 to 10).toList map { _
actorOf(new Actor {
def receive = {
case (add: Int, wait: Int)
Thread.sleep(wait)
if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
self tryReply add
}
}).start()
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 100), timeout).mapTo[Int] }
assert(Futures.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected")
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 100), timeout).mapTo[Int] }
assert(Futures.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected")
}
"shouldReduceThrowIAEOnEmptyInput" in {
intercept[UnsupportedOperationException] { Futures.reduce(List[Future[Int]]())(_ + _).get }
filterException[IllegalArgumentException] {
intercept[UnsupportedOperationException] { Futures.reduce(List[Future[Int]]())(_ + _).get }
}
}
"receiveShouldExecuteOnComplete" in {
@ -389,28 +398,30 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
"shouldHandleThrowables" in {
class ThrowableTest(m: String) extends Throwable(m)
val f1 = Future { throw new ThrowableTest("test") }
f1.await
intercept[ThrowableTest] { f1.get }
filterException[ThrowableTest] {
val f1 = Future { throw new ThrowableTest("test") }
f1.await
intercept[ThrowableTest] { f1.get }
val latch = new StandardLatch
val f2 = Future { latch.tryAwait(5, TimeUnit.SECONDS); "success" }
f2 foreach (_ throw new ThrowableTest("dispatcher foreach"))
f2 onResult { case _ throw new ThrowableTest("dispatcher receive") }
val f3 = f2 map (s s.toUpperCase)
latch.open
f2.await
assert(f2.get === "success")
f2 foreach (_ throw new ThrowableTest("current thread foreach"))
f2 onResult { case _ throw new ThrowableTest("current thread receive") }
f3.await
assert(f3.get === "SUCCESS")
val latch = new StandardLatch
val f2 = Future { latch.tryAwait(5, TimeUnit.SECONDS); "success" }
f2 foreach (_ throw new ThrowableTest("dispatcher foreach"))
f2 onResult { case _ throw new ThrowableTest("dispatcher receive") }
val f3 = f2 map (s s.toUpperCase)
latch.open
f2.await
assert(f2.get === "success")
f2 foreach (_ throw new ThrowableTest("current thread foreach"))
f2 onResult { case _ throw new ThrowableTest("current thread receive") }
f3.await
assert(f3.get === "SUCCESS")
// give time for all callbacks to execute
Thread sleep 100
// give time for all callbacks to execute
Thread sleep 100
// make sure all futures are completed in dispatcher
assert(Dispatchers.defaultGlobalDispatcher.pendingTasks === 0)
// make sure all futures are completed in dispatcher
assert(Dispatchers.defaultGlobalDispatcher.tasks === 0)
}
}
"shouldBlockUntilResult" in {
@ -424,8 +435,10 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
assert(f2.get === 10)
val f3 = Future({ Thread.sleep(10); 5 }, 10)
intercept[FutureTimeoutException] {
f3.get
filterException[FutureTimeoutException] {
intercept[FutureTimeoutException] {
f3.get
}
}
}
@ -445,40 +458,46 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
}
"futureComposingWithContinuationsFailureDivideZero" in {
import Future.flow
filterException[ArithmeticException] {
import Future.flow
val x = Future("Hello")
val y = x map (_.length)
val x = Future("Hello")
val y = x map (_.length)
val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply, 100)
val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply, 100)
intercept[java.lang.ArithmeticException](r.get)
intercept[java.lang.ArithmeticException](r.get)
}
}
"futureComposingWithContinuationsFailureCastInt" in {
import Future.flow
filterException[ClassCastException] {
import Future.flow
val actor = actorOf[TestActor].start
val actor = actorOf[TestActor].start
val x = Future(3)
val y = (actor ? "Hello").mapTo[Int]
val x = Future(3)
val y = (actor ? "Hello").mapTo[Int]
val r = flow(x() + y(), 100)
val r = flow(x() + y(), 100)
intercept[ClassCastException](r.get)
intercept[ClassCastException](r.get)
}
}
"futureComposingWithContinuationsFailureCastNothing" in {
import Future.flow
filterException[ClassCastException] {
import Future.flow
val actor = actorOf[TestActor].start
val actor = actorOf[TestActor].start
val x = Future("Hello")
val y = actor ? "Hello" mapTo manifest[Nothing]
val x = Future("Hello")
val y = actor ? "Hello" mapTo manifest[Nothing]
val r = flow(x() + y())
val r = flow(x() + y())
intercept[ClassCastException](r.get)
intercept[ClassCastException](r.get)
}
}
"futureCompletingWithContinuations" in {
@ -522,7 +541,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
Thread.sleep(100)
// make sure all futures are completed in dispatcher
assert(Dispatchers.defaultGlobalDispatcher.pendingTasks === 0)
assert(Dispatchers.defaultGlobalDispatcher.tasks === 0)
}
"shouldNotAddOrRunCallbacksAfterFailureToBeCompletedBeforeExpiry" in {
@ -615,29 +634,31 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
}
"futureCompletingWithContinuationsFailure" in {
import Future.flow
filterException[ArithmeticException] {
import Future.flow
val x, y, z = Promise[Int]()
val ly, lz = new StandardLatch
val x, y, z = Promise[Int]()
val ly, lz = new StandardLatch
val result = flow {
y << x
ly.open
val oops = 1 / 0
z << x
lz.open
z() + y() + oops
val result = flow {
y << x
ly.open
val oops = 1 / 0
z << x
lz.open
z() + y() + oops
}
assert(!ly.tryAwaitUninterruptible(100, TimeUnit.MILLISECONDS))
assert(!lz.tryAwaitUninterruptible(100, TimeUnit.MILLISECONDS))
flow { x << 5 }
assert(y.get === 5)
intercept[java.lang.ArithmeticException](result.get)
assert(z.value === None)
assert(!lz.isOpen)
}
assert(!ly.tryAwaitUninterruptible(100, TimeUnit.MILLISECONDS))
assert(!lz.tryAwaitUninterruptible(100, TimeUnit.MILLISECONDS))
flow { x << 5 }
assert(y.get === 5)
intercept[java.lang.ArithmeticException](result.get)
assert(z.value === None)
assert(!lz.isOpen)
}
"futureContinuationsShouldNotBlock" in {
@ -663,8 +684,6 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
"futureFlowShouldBeTypeSafe" in {
import Future.flow
def checkType[A: Manifest, B](in: Future[A], refmanifest: Manifest[B]): Boolean = manifest[A] == refmanifest
val rString = flow {
val x = Future(5)
x().toString
@ -728,54 +747,66 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
}
"ticket812FutureDispatchCleanup" in {
implicit val dispatcher = new Dispatcher("ticket812FutureDispatchCleanup")
assert(dispatcher.pendingTasks === 0)
val future = Future({ Thread.sleep(100); "Done" }, 10)
intercept[FutureTimeoutException] { future.await }
assert(dispatcher.pendingTasks === 1)
Thread.sleep(200)
assert(dispatcher.pendingTasks === 0)
filterException[FutureTimeoutException] {
implicit val dispatcher = new Dispatcher("ticket812FutureDispatchCleanup")
assert(dispatcher.tasks === 0)
val future = Future({ Thread.sleep(100); "Done" }, 10)
intercept[FutureTimeoutException] { future.await }
assert(dispatcher.tasks === 1)
Thread.sleep(200)
assert(dispatcher.tasks === 0)
}
}
"run callbacks async" in {
val l1, l2, l3, l4, l5, l6 = new StandardLatch
val latch = Vector.fill(10)(new StandardLatch)
val f1 = Future { l1.await; "Hello" }
val f2 = f1 map { s l2.await; s.length }
val f1 = Future { latch(0).open; latch(1).await; "Hello" }
val f2 = f1 map { s latch(2).open; latch(3).await; s.length }
f2 foreach (_ latch(4).open)
latch(0).await
f1 must not be ('completed)
f2 must not be ('completed)
l1.open
latch(1).open
latch(2).await
f1.await must be('completed)
f1 must be('completed)
f2 must not be ('completed)
val f3 = f1 map { s l2.await; s.length * 2 }
val f3 = f1 map { s latch(5).open; latch(6).await; s.length * 2 }
f3 foreach (_ latch(3).open)
latch(5).await
f2 must not be ('completed)
f3 must not be ('completed)
l2.open
latch(6).open
latch(4).await
f2.await must be('completed)
f3.await must be('completed)
f2 must be('completed)
f3 must be('completed)
val p1 = Promise[String]()
val f4 = p1 map { s l3.await; s.length }
val f4 = p1 map { s latch(7).open; latch(8).await; s.length }
f4 foreach (_ latch(9).open)
p1 must not be ('completed)
f4 must not be ('completed)
p1 complete Right("Hello")
latch(7).await
p1 must be('completed)
f4 must not be ('completed)
l3.open
latch(8).open
latch(9).await
f4.await must be('completed)
}
}
}
@ -887,4 +918,5 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
}
def checkType[A: Manifest, B](in: Future[A], refmanifest: Manifest[B]): Boolean = manifest[A] == refmanifest
}

View file

@ -41,7 +41,7 @@ class PinnedActorSpec extends JUnitSuite {
}
@Test
def shouldSendOneWay {
def shouldTell {
var oneWay = new CountDownLatch(1)
val actor = actorOf(new Actor {
self.dispatcher = Dispatchers.newPinnedDispatcher(self)

View file

@ -7,7 +7,7 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Actor._
import akka.testkit.Testing._
import akka.actor.{ TypedActor, Actor }
import akka.testkit.TestLatch
import akka.testkit.{ TestLatch, filterEvents, EventFilter, filterException }
import akka.util.duration._
object ActorPoolSpec {
@ -357,210 +357,214 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
}
"provide default supervision of pooled actors" in {
import akka.config.Supervision._
val pingCount = new AtomicInteger(0)
val deathCount = new AtomicInteger(0)
var keepDying = false
filterException[RuntimeException] {
import akka.config.Supervision._
val pingCount = new AtomicInteger(0)
val deathCount = new AtomicInteger(0)
var keepDying = false
val pool1 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ pingCount.incrementAndGet
}
val pool1 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ pingCount.incrementAndGet
}
}).start()
}).start()
}).start()
val pool2 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
self.lifeCycle = Permanent
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ pingCount.incrementAndGet
}
val pool2 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
self.lifeCycle = Permanent
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ pingCount.incrementAndGet
}
}).start()
}).start()
}).start()
val pool3 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
self.lifeCycle = Temporary
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ pingCount.incrementAndGet
}
val pool3 = actorOf(
new Actor with DefaultActorPool with DefaultActorPoolSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with RoundRobinSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
self.lifeCycle = Temporary
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case akka.Die
if (keepDying) deathCount.incrementAndGet
throw new RuntimeException
case _ pingCount.incrementAndGet
}
}).start()
}).start()
}).start()
// default lifecycle
// actor comes back right away
pingCount.set(0)
keepDying = false
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
// default lifecycle
// actor comes back right away
pingCount.set(0)
keepDying = false
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
// default lifecycle
// actor dies completely
pingCount.set(0)
keepDying = true
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(2)
// default lifecycle
// actor dies completely
pingCount.set(0)
keepDying = true
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! akka.Die
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(2)
// permanent lifecycle
// actor comes back right away
pingCount.set(0)
keepDying = false
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
// permanent lifecycle
// actor comes back right away
pingCount.set(0)
keepDying = false
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
// permanent lifecycle
// actor dies completely
pingCount.set(0)
keepDying = true
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(2)
// permanent lifecycle
// actor dies completely
pingCount.set(0)
keepDying = true
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool2 ! akka.Die
sleepFor(2 seconds)
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool2 ! "ping"
(pool2 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(2)
// temporary lifecycle
pingCount.set(0)
keepDying = false
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool3 ! akka.Die
sleepFor(2 seconds)
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool3 ! "ping"
pool3 ! "ping"
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(4)
// temporary lifecycle
pingCount.set(0)
keepDying = false
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool3 ! akka.Die
sleepFor(2 seconds)
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool3 ! "ping"
pool3 ! "ping"
pool3 ! "ping"
(pool3 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(4)
}
}
"support customizable supervision config of pooled actors" in {
import akka.config.Supervision._
val pingCount = new AtomicInteger(0)
val deathCount = new AtomicInteger(0)
var keepDying = false
filterEvents(EventFilter[IllegalStateException], EventFilter[RuntimeException]) {
import akka.config.Supervision._
val pingCount = new AtomicInteger(0)
val deathCount = new AtomicInteger(0)
var keepDying = false
trait LimitedTrapSupervisionConfig extends ActorPoolSupervisionConfig {
def poolFaultHandler = OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000)
}
trait LimitedTrapSupervisionConfig extends ActorPoolSupervisionConfig {
def poolFaultHandler = OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000)
}
object BadState
object BadState
val pool1 = actorOf(
new Actor with DefaultActorPool with LimitedTrapSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case BadState
if (keepDying) deathCount.incrementAndGet
throw new IllegalStateException
case akka.Die
throw new RuntimeException
case _ pingCount.incrementAndGet
}
val pool1 = actorOf(
new Actor with DefaultActorPool with LimitedTrapSupervisionConfig with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter {
def lowerBound = 2
def upperBound = 5
def rampupRate = 0.1
def backoffRate = 0.1
def backoffThreshold = 0.5
def partialFill = true
def selectionCount = 1
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
case BadState
if (keepDying) deathCount.incrementAndGet
throw new IllegalStateException
case akka.Die
throw new RuntimeException
case _ pingCount.incrementAndGet
}
}).start()
}).start()
}).start()
// actor comes back right away
pingCount.set(0)
keepDying = false
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
// actor comes back right away
pingCount.set(0)
keepDying = false
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(1)
// actor dies completely
pingCount.set(0)
keepDying = true
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(2)
// actor dies completely
pingCount.set(0)
keepDying = true
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pool1 ! BadState
sleepFor(2 seconds)
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(1)
pool1 ! "ping"
(pool1 ? ActorPool.Stat).as[ActorPool.Stats].get.size must be(2)
pingCount.get must be(2)
// kill it
intercept[RuntimeException](pool1.?(akka.Die).get)
// kill it
intercept[RuntimeException](pool1.?(akka.Die).get)
}
}
}
}
}

View file

@ -29,13 +29,13 @@ class RoutingSpec extends WordSpec with MustMatchers {
"be started when constructed" in {
val actor1 = Actor.actorOf[TestActor].start
val actor = Routing.newRoutedActorRef("foo", List(actor1), RouterType.Direct)
val actor = Routing.actorOf("foo", List(actor1), RouterType.Direct)
actor.isRunning must be(true)
}
"throw IllegalArgumentException at construction when no connections" in {
try {
Routing.newRoutedActorRef("foo", List(), RouterType.Direct)
Routing.actorOf("foo", List(), RouterType.Direct)
fail()
} catch {
case e: IllegalArgumentException
@ -53,7 +53,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
}
}).start()
val routedActor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.Direct)
val routedActor = Routing.actorOf("foo", List(connection1), RouterType.Direct)
routedActor ! "hello"
routedActor ! "end"
@ -73,7 +73,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
}
}).start()
val actor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.Direct)
val actor = Routing.actorOf("foo", List(connection1), RouterType.Direct)
actor ! Broadcast(1)
actor ! "end"
@ -89,13 +89,13 @@ class RoutingSpec extends WordSpec with MustMatchers {
"be started when constructed" in {
val actor1 = Actor.actorOf[TestActor].start
val actor = Routing.newRoutedActorRef("foo", List(actor1), RouterType.RoundRobin)
val actor = Routing.actorOf("foo", List(actor1), RouterType.RoundRobin)
actor.isRunning must be(true)
}
"throw IllegalArgumentException at construction when no connections" in {
try {
Routing.newRoutedActorRef("foo", List(), RouterType.RoundRobin)
Routing.actorOf("foo", List(), RouterType.RoundRobin)
fail()
} catch {
case e: IllegalArgumentException
@ -127,7 +127,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
}
//create the routed actor.
val actor = Routing.newRoutedActorRef("foo", connections, RouterType.RoundRobin)
val actor = Routing.actorOf("foo", connections, RouterType.RoundRobin)
//send messages to the actor.
for (i 0 until iterationCount) {
@ -165,7 +165,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
}
}).start()
val actor = Routing.newRoutedActorRef("foo", List(connection1, connection2), RouterType.RoundRobin)
val actor = Routing.actorOf("foo", List(connection1, connection2), RouterType.RoundRobin)
actor ! Broadcast(1)
actor ! Broadcast("end")
@ -187,7 +187,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
}
}).start()
val actor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.RoundRobin)
val actor = Routing.actorOf("foo", List(connection1), RouterType.RoundRobin)
try {
actor ? Broadcast(1)
@ -208,13 +208,13 @@ class RoutingSpec extends WordSpec with MustMatchers {
val actor1 = Actor.actorOf[TestActor].start
val actor = Routing.newRoutedActorRef("foo", List(actor1), RouterType.Random)
val actor = Routing.actorOf("foo", List(actor1), RouterType.Random)
actor.isRunning must be(true)
}
"throw IllegalArgumentException at construction when no connections" in {
try {
Routing.newRoutedActorRef("foo", List(), RouterType.Random)
Routing.actorOf("foo", List(), RouterType.Random)
fail()
} catch {
case e: IllegalArgumentException
@ -244,7 +244,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
}
}).start()
val actor = Routing.newRoutedActorRef("foo", List(connection1, connection2), RouterType.Random)
val actor = Routing.actorOf("foo", List(connection1, connection2), RouterType.Random)
actor ! Broadcast(1)
actor ! Broadcast("end")
@ -266,7 +266,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
}
}).start()
val actor = Routing.newRoutedActorRef("foo", List(connection1), RouterType.Random)
val actor = Routing.actorOf("foo", List(connection1), RouterType.Random)
try {
actor ? Broadcast(1)
@ -286,7 +286,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val actor1 = Actor.actorOf[TestActor].start
try {
Routing.newRoutedActorRef("foo", List(actor1), RouterType.LeastCPU)
Routing.actorOf("foo", List(actor1), RouterType.LeastCPU)
} catch {
case e: IllegalArgumentException
}
@ -298,7 +298,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val actor1 = Actor.actorOf[TestActor].start
try {
Routing.newRoutedActorRef("foo", List(actor1), RouterType.LeastRAM)
Routing.actorOf("foo", List(actor1), RouterType.LeastRAM)
} catch {
case e: IllegalArgumentException
}
@ -310,7 +310,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
val actor1 = Actor.actorOf[TestActor].start
try {
Routing.newRoutedActorRef("foo", List(actor1), RouterType.LeastMessages)
Routing.actorOf("foo", List(actor1), RouterType.LeastMessages)
} catch {
case e: IllegalArgumentException
}

View file

@ -46,7 +46,7 @@ public class Actors {
* }
* }, "my-actor-address");
* actor.start();
* actor.sendOneWay(message, context);
* actor.tell(message, context);
* actor.stop();
* </pre>
*/
@ -70,7 +70,7 @@ public class Actors {
* }
* });
* actor.start();
* actor.sendOneWay(message, context);
* actor.tell(message, context);
* actor.stop();
* </pre>
*/
@ -84,7 +84,7 @@ public class Actors {
* <pre>
* ActorRef actor = Actors.actorOf(MyUntypedActor.class, "my-actor-address");
* actor.start();
* actor.sendOneWay(message, context);
* actor.tell(message, context);
* actor.stop();
* </pre>
* You can create and start the actor in one statement like this:
@ -102,7 +102,7 @@ public class Actors {
* <pre>
* ActorRef actor = Actors.actorOf(MyUntypedActor.class, "my-actor-address");
* actor.start();
* actor.sendOneWay(message, context);
* actor.tell(message, context);
* actor.stop();
* </pre>
* You can create and start the actor in one statement like this:
@ -130,7 +130,7 @@ public class Actors {
/**
* The message that when sent to an Actor kills it by throwing an exception.
* <pre>
* actor.sendOneWay(kill());
* actor.tell(kill());
* </pre>
* @return the single instance of Kill
*/
@ -142,7 +142,7 @@ public class Actors {
/**
* The message that when sent to an Actor shuts it down by calling 'stop'.
* <pre>
* actor.sendOneWay(poisonPill());
* actor.tell(poisonPill());
* </pre>
* @return the single instance of PoisonPill
*/

View file

@ -108,6 +108,9 @@ class InvalidMessageException private[akka] (message: String, cause: Throwable =
* This message is thrown by default when an Actors behavior doesn't match a message
*/
case class UnhandledMessageException(msg: Any, ref: ActorRef = null) extends Exception {
def this(msg: String) = this(msg, null)
// constructor with 'null' ActorRef needed to work with client instantiation of remote exception
override def getMessage =
if (ref ne null) "Actor %s does not handle [%s]".format(ref, msg)

View file

@ -71,7 +71,7 @@ private[akka] object ActorRefInternals {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Comparable[ActorRef] with Serializable {
abstract class ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Comparable[ActorRef] with Serializable {
scalaRef: ScalaActorRef
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile
@ -195,6 +195,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor, else None.
*/
@deprecated("will be removed in 2.0, use channel instead", "1.2")
def getSender: Option[ActorRef] = sender
/**
@ -202,6 +203,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* The reference sender future of the last received message.
* Is defined if the message was sent with sent with '?'/'ask', else None.
*/
@deprecated("will be removed in 2.0, use channel instead", "1.2")
def getSenderFuture: Option[Promise[Any]] = senderFuture
/**
@ -260,7 +262,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* Sends a message asynchronously returns a future holding the eventual reply message.
* <p/>
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to
* Use this method with care. In most cases it is better to use 'tell' together with the 'getContext().getSender()' to
* implement request/response message exchanges.
* <p/>
* If you are sending messages using <code>ask</code> then you <b>have to</b> use <code>getContext().reply(..)</code>

View file

@ -59,10 +59,10 @@ trait Channel[-T] {
* Java API.<p/>
* Sends the specified message to the channel, i.e. fire-and-forget semantics.<p/>
* <pre>
* actor.sendOneWay(message);
* actor.tell(message);
* </pre>
*/
def sendOneWay(msg: T): Unit = this.!(msg)
def tell(msg: T): Unit = this.!(msg)
/**
* Java API. <p/>
@ -70,19 +70,19 @@ trait Channel[-T] {
* semantics, including the sender reference if possible (not supported on
* all channels).<p/>
* <pre>
* actor.sendOneWay(message, context);
* actor.tell(message, context);
* </pre>
*/
def sendOneWay(msg: T, sender: UntypedChannel): Unit = this.!(msg)(sender)
def tell(msg: T, sender: UntypedChannel): Unit = this.!(msg)(sender)
/**
* Java API.<p/>
* Try to send the specified message to the channel, i.e. fire-and-forget semantics.<p/>
* <pre>
* actor.sendOneWay(message);
* actor.tell(message);
* </pre>
*/
def sendOneWaySafe(msg: T): Boolean = this.safe_!(msg)
def tellSafe(msg: T): Boolean = this.safe_!(msg)
/**
* Java API. <p/>
@ -90,10 +90,10 @@ trait Channel[-T] {
* semantics, including the sender reference if possible (not supported on
* all channels).<p/>
* <pre>
* actor.sendOneWay(message, context);
* actor.tell(message, context);
* </pre>
*/
def sendOneWaySafe(msg: T, sender: UntypedChannel): Boolean = this.safe_!(msg)(sender)
def tellSafe(msg: T, sender: UntypedChannel): Boolean = this.safe_!(msg)(sender)
}

View file

@ -25,7 +25,7 @@ import akka.japi.{ Creator, Procedure }
* } else if (msg.equals("UseSender") && getContext().getSender().isDefined()) {
* // Reply to original sender of message using the sender reference
* // also passing along my own reference (the context)
* getContext().getSender().get().sendOneWay(msg, context);
* getContext().getSender().get().tell(msg, context);
*
* } else if (msg.equals("UseSenderFuture") && getContext().getSenderFuture().isDefined()) {
* // Reply to original sender of message using the sender future reference
@ -33,7 +33,7 @@ import akka.japi.{ Creator, Procedure }
*
* } else if (msg.equals("SendToSelf")) {
* // Send message to the actor itself recursively
* getContext().sendOneWay(msg)
* getContext().tell(msg)
*
* } else if (msg.equals("ForwardMessage")) {
* // Retreive an actor from the ActorRegistry by ID and get an ActorRef back
@ -46,7 +46,7 @@ import akka.japi.{ Creator, Procedure }
* public static void main(String[] args) {
* ActorRef actor = Actors.actorOf(SampleUntypedActor.class);
* actor.start();
* actor.sendOneWay("SendToSelf");
* actor.tell("SendToSelf");
* actor.stop();
* }
* }

View file

@ -787,6 +787,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
/**
* Must be called inside _lock.lock<->_lock.unlock
* Returns true if completed within the timeout
*/
@tailrec
private def awaitUnsafe(waitTimeNanos: Long): Boolean = {
@ -805,26 +806,23 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
}
def await(atMost: Duration) = {
_lock.lock()
if (try { awaitUnsafe(atMost.toNanos min timeLeft()) } finally { _lock.unlock }) this
else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds")
}
def await = {
_lock.lock()
try {
if (timeout.duration.isFinite) {
if (awaitUnsafe(timeLeft())) this
else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds")
} else {
if (!atMost.isFinite && !timeout.duration.isFinite) { //If wait until infinity
while (_value.isEmpty) { _signal.await }
this
} else { //Limited wait
val time = if (!atMost.isFinite) timeLeft() //If atMost is infinity, use preset timeout
else if (!timeout.duration.isFinite) atMost.toNanos //If preset timeout is infinite, use atMost
else atMost.toNanos min timeLeft() //Otherwise use the smallest of them
if (awaitUnsafe(time)) this
else throw new FutureTimeoutException("Future timed out after [" + NANOS.toMillis(time) + "] ms")
}
} finally {
_lock.unlock
}
} finally { _lock.unlock }
}
def await = await(timeout.duration)
def isExpired: Boolean = if (timeout.duration.isFinite) timeLeft() <= 0 else false
def value: Option[Either[Throwable, T]] = {
@ -954,7 +952,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
try {
func(this)
} catch {
case e EventHandler notify EventHandler.Error(e, this)
case e EventHandler.error(e, this, "Future onComplete-callback raised an exception")
}
}

View file

@ -49,12 +49,12 @@ object MessageDispatcher {
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait MessageDispatcher {
abstract class MessageDispatcher {
import MessageDispatcher._
protected val uuids = new ConcurrentSkipListSet[Uuid]
protected val tasks = new AtomicLong(0L)
protected val _tasks = new AtomicLong(0L)
protected val guard = new ReentrantGuard
protected val active = new Switch(false)
@ -93,7 +93,7 @@ trait MessageDispatcher {
}
private[akka] final def dispatchTask(block: () Unit): Unit = {
tasks.getAndIncrement()
_tasks.getAndIncrement()
try {
if (active.isOff)
guard withGuard {
@ -104,15 +104,15 @@ trait MessageDispatcher {
executeTask(TaskInvocation(block, taskCleanup))
} catch {
case e
tasks.decrementAndGet
_tasks.decrementAndGet
throw e
}
}
private val taskCleanup: () Unit =
() if (tasks.decrementAndGet() == 0) {
() if (_tasks.decrementAndGet() == 0) {
guard withGuard {
if (tasks.get == 0 && uuids.isEmpty) {
if (_tasks.get == 0 && uuids.isEmpty) {
shutdownSchedule match {
case UNSCHEDULED
shutdownSchedule = SCHEDULED
@ -149,7 +149,7 @@ trait MessageDispatcher {
if (uuids remove actorRef.uuid) {
cleanUpMailboxFor(actorRef)
actorRef.mailbox = null
if (uuids.isEmpty && tasks.get == 0) {
if (uuids.isEmpty && _tasks.get == 0) {
shutdownSchedule match {
case UNSCHEDULED
shutdownSchedule = SCHEDULED
@ -190,7 +190,7 @@ trait MessageDispatcher {
shutdownSchedule = SCHEDULED
Scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED
if (uuids.isEmpty && tasks.get == 0) {
if (uuids.isEmpty && _tasks.get == 0) {
active switchOff {
shutdown() // shut down in the dispatcher's references is zero
}
@ -248,7 +248,7 @@ trait MessageDispatcher {
/**
* Returns the amount of tasks queued for execution
*/
def pendingTasks: Long = tasks.get
def tasks: Long = _tasks.get
}
/**

View file

@ -73,7 +73,7 @@ object Routing {
* @throws IllegalArgumentException if the number of connections is zero, or if it depends on the actual router implementation
* how many connections it can handle.
*/
def newRoutedActorRef(actorAddress: String, connections: Iterable[ActorRef], routerType: RouterType): RoutedActorRef = {
def actorOf(actorAddress: String, connections: Iterable[ActorRef], routerType: RouterType): ActorRef = {
if (connections.size == 0)
throw new IllegalArgumentException("To create a routed actor ref, at least one connection is required")
@ -91,8 +91,8 @@ object Routing {
ref.start()
}
def newRoundRobinActorRef(actorAddress: String, connections: Iterable[ActorRef]): RoutedActorRef = {
newRoutedActorRef(actorAddress, connections, RoundRobin)
def actorOfWithRoundRobin(actorAddress: String, connections: Iterable[ActorRef]): ActorRef = {
actorOf(actorAddress, connections, RoundRobin)
}
}
@ -100,7 +100,7 @@ object Routing {
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to
* on (or more) of these actors.
*/
class RoutedActorRef(val address: String, val cons: Iterable[ActorRef]) extends UnsupportedActorRef with ScalaActorRef {
class RoutedActorRef(val address: String, val cons: Iterable[ActorRef]) extends UnsupportedActorRef {
this: Router
def connections: Iterable[ActorRef] = cons

View file

@ -69,7 +69,7 @@ class UntypedProducerFeatureTest extends FeatureSpec with BeforeAndAfterAll with
when("a test message is sent to the producer with !")
mockEndpoint.expectedBodiesReceived("received test")
val result = producer.sendOneWay(Message("test"), producer)
val result = producer.tell(Message("test"), producer)
then("a normal response should have been sent")
mockEndpoint.assertIsSatisfied

View file

@ -1430,6 +1430,7 @@ class DefaultClusterNode private[akka] (
}
private[cluster] def failOverClusterActorRefConnections(from: InetSocketAddress, to: InetSocketAddress) {
EventHandler.info(this, "failOverClusterActorRef from %s to %s".format(from, to))
clusterActorRefs.values(from) foreach (_.failOver(from, to))
}

View file

@ -15,6 +15,7 @@ import com.eaio.uuid.UUID
import collection.immutable.Map
import annotation.tailrec
import akka.routing.Router
import akka.event.EventHandler
/**
* ActorRef representing a one or many instances of a clustered, load-balanced and sometimes replicated actor
@ -25,7 +26,7 @@ import akka.routing.Router
class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
val address: String,
_timeout: Long)
extends UnsupportedActorRef with ScalaActorRef {
extends UnsupportedActorRef {
this: Router
timeout = _timeout
@ -57,6 +58,8 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
}
private[akka] def failOver(from: InetSocketAddress, to: InetSocketAddress): Unit = {
EventHandler.info(this, "ClusterActorRef. %s failover from %s to %s".format(address, from, to))
@tailrec
def doFailover(from: InetSocketAddress, to: InetSocketAddress): Unit = {
val oldValue = inetSocketAddressToActorRefMap.get
@ -93,6 +96,8 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
}
def signalDeadActor(ref: ActorRef): Unit = {
EventHandler.info(this, "ClusterActorRef %s signalDeadActor %s".format(address, ref.address))
//since the number remote actor refs for a clustered actor ref is quite low, we can deal with the O(N) complexity
//of the following removal.
val map = inetSocketAddressToActorRefMap.get

View file

@ -283,7 +283,7 @@ abstract class RemoteClient private[akka] (
while (pendingRequest ne null) {
val (isOneWay, futureUuid, message) = pendingRequest
if (isOneWay) {
// sendOneWay
// tell
val future = currentChannel.write(RemoteEncoder.encode(message))
future.awaitUninterruptibly()
if (!future.isCancelled && !future.isSuccess) {

View file

@ -1,42 +0,0 @@
package akka.cluster.routing.direct.bad_address
import akka.cluster.{ Cluster, MasterClusterTestNode }
import akka.actor.Actor
import akka.config.Config
object BadAddressDirectRoutingMultiJvmSpec {
val NrOfNodes = 1
class SomeActor extends Actor with Serializable {
println("---------------------------------------------------------------------------")
println("SomeActor has been created on node [" + Config.nodename + "]")
println("---------------------------------------------------------------------------")
def receive = {
case "identify" {
println("The node received the 'identify' command: " + Config.nodename)
self.reply(Config.nodename)
}
}
}
}
class BadAddressDirectRoutingMultiJvmNode1 extends MasterClusterTestNode {
import BadAddressDirectRoutingMultiJvmSpec._
val testNodes = NrOfNodes
"node" must {
"participate in cluster" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Cluster.barrier("waiting-to-end", NrOfNodes).await()
Cluster.node.shutdown()
}
}
}

View file

@ -1,3 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"
akka.actor.deployment.service-hello.clustered.home = "node:node2"

View file

@ -0,0 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"
akka.actor.deployment.service-hello.clustered.home = "node:node2"

View file

@ -0,0 +1,77 @@
package akka.cluster.routing.direct.failover
import akka.config.Config
import scala.Predef._
import akka.cluster.{ ClusterActorRef, Cluster, MasterClusterTestNode, ClusterTestNode }
import akka.actor.{ ActorInitializationException, Actor }
object FailoverDirectRoutingMultiJvmSpec {
val NrOfNodes = 2
class SomeActor extends Actor with Serializable {
println("---------------------------------------------------------------------------")
println("SomeActor has been created on node [" + Config.nodename + "]")
println("---------------------------------------------------------------------------")
def receive = {
case "identify"
println("The node received the 'identify' command: " + Config.nodename)
self.reply(Config.nodename)
case "die"
println("The node received the 'die' command: " + Config.nodename)
Cluster.node.shutdown
}
}
}
class FailoverDirectRoutingMultiJvmNode1 extends MasterClusterTestNode {
import FailoverDirectRoutingMultiJvmSpec._
val testNodes = NrOfNodes
"Direct Router" must {
"not yet be able to failover to another node" in {
println("==================================================================================================")
println(" FAILOVER DIRECT ROUTING")
println("==================================================================================================")
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
val actor = Actor.actorOf[SomeActor]("service-hello").start().asInstanceOf[ClusterActorRef]
println("retrieved identity was: " + (actor ? "identify").get)
(actor ? "identify").get must equal("node2")
actor ! "die"
Thread.sleep(4000)
try {
actor ! "identify"
fail()
} catch {
case e: ActorInitializationException
}
}
}
}
class FailoverDirectRoutingMultiJvmNode2 extends ClusterTestNode {
import FailoverDirectRoutingMultiJvmSpec._
"___" must {
"___" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Thread.sleep(30 * 1000)
}
}
}

View file

@ -0,0 +1,59 @@
package akka.cluster.routing.direct.homenode
import akka.config.Config
import akka.actor.Actor
import akka.cluster.{ ClusterTestNode, MasterClusterTestNode, Cluster }
import Cluster._
object HomeNodeMultiJvmSpec {
val NrOfNodes = 2
class SomeActor extends Actor with Serializable {
def receive = {
case "identify" {
self.reply(Config.nodename)
}
}
}
}
class HomeNodeMultiJvmNode1 extends MasterClusterTestNode {
import HomeNodeMultiJvmSpec._
val testNodes = NrOfNodes
"___" must {
"___" in {
node.start()
barrier("waiting-for-begin", NrOfNodes).await()
barrier("waiting-for-end", NrOfNodes).await()
node.shutdown()
}
}
}
class HomeNodeMultiJvmNode2 extends ClusterTestNode {
import HomeNodeMultiJvmSpec._
"Direct Router: A Direct Router" must {
"obey 'home-node' config option when instantiated actor in cluster" in {
node.start()
barrier("waiting-for-begin", NrOfNodes).await()
val actorNode1 = Actor.actorOf[SomeActor]("service-node1").start()
val name1 = (actorNode1 ? "identify").get.asInstanceOf[String]
name1 must equal("node1")
val actorNode2 = Actor.actorOf[SomeActor]("service-node2").start()
val name2 = (actorNode2 ? "identify").get.asInstanceOf[String]
name2 must equal("node2")
barrier("waiting-for-end", NrOfNodes).await()
node.shutdown()
}
}
}

View file

@ -0,0 +1,6 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-node1.router = "direct"
akka.actor.deployment.service-node1.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-node2.router = "direct"
akka.actor.deployment.service-node2.clustered.preferred-nodes = ["node:node2"]

View file

@ -0,0 +1,6 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-node1.router = "direct"
akka.actor.deployment.service-node1.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-node2.router = "direct"
akka.actor.deployment.service-node2.clustered.preferred-nodes = ["node:node2"]

View file

@ -1,3 +0,0 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"

View file

@ -1,66 +0,0 @@
package akka.cluster.routing.direct.multiple_replicas
import akka.actor.Actor
import akka.cluster.{ MasterClusterTestNode, Cluster, ClusterTestNode }
import akka.config.Config
object MultiReplicaDirectRoutingMultiJvmSpec {
val NrOfNodes = 2
class SomeActor extends Actor with Serializable {
println("---------------------------------------------------------------------------")
println("SomeActor has been created on node [" + Config.nodename + "]")
println("---------------------------------------------------------------------------")
def receive = {
case "identify" {
println("The node received the 'identify' command: " + Config.nodename)
self.reply(Config.nodename)
}
}
}
}
class MultiReplicaDirectRoutingMultiJvmNode2 extends ClusterTestNode {
import MultiReplicaDirectRoutingMultiJvmSpec._
"when node send message to existing node using direct routing it" must {
"communicate with that node" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
//Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
val actor = Actor.actorOf[SomeActor]("service-hello")
actor.start()
//actor.start()
val name: String = (actor ? "identify").get.asInstanceOf[String]
println("The name of the actor was " + name)
Cluster.barrier("waiting-to-end", NrOfNodes).await()
Cluster.node.shutdown()
}
}
}
class MultiReplicaDirectRoutingMultiJvmNode1 extends MasterClusterTestNode {
import MultiReplicaDirectRoutingMultiJvmSpec._
val testNodes = NrOfNodes
"node" must {
"participate in cluster" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Cluster.barrier("waiting-to-end", NrOfNodes).await()
Cluster.node.shutdown()
}
}
}

View file

@ -0,0 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]

View file

@ -0,0 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]

View file

@ -1,8 +1,8 @@
package akka.cluster.routing.direct.single_replica
package akka.cluster.routing.direct.normalusage
import akka.actor.Actor
import akka.config.Config
import akka.cluster.{ ClusterTestNode, MasterClusterTestNode, Cluster }
import akka.cluster.{ ClusterActorRef, ClusterTestNode, MasterClusterTestNode, Cluster }
object SingleReplicaDirectRoutingMultiJvmSpec {
val NrOfNodes = 2
@ -19,7 +19,6 @@ object SingleReplicaDirectRoutingMultiJvmSpec {
}
}
}
}
class SingleReplicaDirectRoutingMultiJvmNode1 extends MasterClusterTestNode {
@ -28,24 +27,6 @@ class SingleReplicaDirectRoutingMultiJvmNode1 extends MasterClusterTestNode {
val testNodes = NrOfNodes
"when node send message to existing node using direct routing it" must {
"communicate with that node" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
val actor = Actor.actorOf[SomeActor]("service-hello").start()
actor.isRunning must be(true)
Cluster.barrier("waiting-to-end", NrOfNodes).await()
Cluster.node.shutdown()
}
}
}
class SingleReplicaDirectRoutingMultiJvmNode2 extends ClusterTestNode {
import SingleReplicaDirectRoutingMultiJvmSpec._
"___" must {
"___" in {
Cluster.node.start()
@ -57,3 +38,24 @@ class SingleReplicaDirectRoutingMultiJvmNode2 extends ClusterTestNode {
}
}
class SingleReplicaDirectRoutingMultiJvmNode2 extends ClusterTestNode {
import SingleReplicaDirectRoutingMultiJvmSpec._
"Direct Router: when node send message to existing node it" must {
"communicate with that node" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
val actor = Actor.actorOf[SomeActor]("service-hello").start().asInstanceOf[ClusterActorRef]
actor.isRunning must be(true)
val result = (actor ? "identify").get
result must equal("node1")
Cluster.barrier("waiting-to-end", NrOfNodes).await()
Cluster.node.shutdown()
}
}
}

View file

@ -1,3 +0,0 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replication-factor = 1
akka.actor.deployment.service-hello.router = "random"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-hello.clustered.replication-factor = 2

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replication-factor = 1
akka.actor.deployment.service-hello.router = "random"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-hello.clustered.replication-factor = 2

View file

@ -0,0 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "random"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-hello.clustered.replication-factor = 2

View file

@ -0,0 +1,119 @@
package akka.cluster.routing.random.failover
import akka.config.Config
import akka.cluster._
import akka.actor.{ ActorRef, Actor }
import java.util.{ Collections, Set JSet }
object RandomFailoverMultiJvmSpec {
val NrOfNodes = 3
class SomeActor extends Actor with Serializable {
println("---------------------------------------------------------------------------")
println("SomeActor has been created on node [" + Config.nodename + "]")
println("---------------------------------------------------------------------------")
def receive = {
case "identify" {
println("The node received the 'identify' command")
self.reply(Config.nodename)
}
case "shutdown" {
println("The node received the 'shutdown' command")
Cluster.node.shutdown()
}
}
}
}
class RandomFailoverMultiJvmNode1 extends MasterClusterTestNode {
import RandomFailoverMultiJvmSpec._
def testNodes = NrOfNodes
def sleepSome() {
println("Starting sleep")
Thread.sleep(1000) //nasty.. but ok for now.
println("Finished doing sleep")
}
"Random: when routing fails" must {
"jump to another replica" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
// ============= the real testing =================
val actor = Actor.actorOf[SomeActor]("service-hello").asInstanceOf[ClusterActorRef]
val oldFoundConnections = identifyConnections(actor)
println("---------------------------- oldFoundConnections ------------------------")
println(oldFoundConnections)
//since we have replication factor 2
oldFoundConnections.size() must be(2)
//terminate a node
actor ! "shutdown"
sleepSome()
//this is where the system behaves unpredictable. From time to time it works... from time to time there
//all kinds of connection timeouts. So this test shows that there are problems. For the time being
//the test code has been deactivated to prevent causing problems.
val newFoundConnections = identifyConnections(actor)
println("---------------------------- newFoundConnections ------------------------")
println(newFoundConnections)
//it still must be 2 since a different node should have been used to failover to
newFoundConnections.size() must be(2)
//they are not disjoint since, there must be a single element that is in both
Collections.disjoint(newFoundConnections, oldFoundConnections) must be(false)
//but they should not be equal since the shutdown-node has been replaced by another one.
newFoundConnections.equals(oldFoundConnections) must be(false)
Cluster.node.shutdown()
}
}
def identifyConnections(actor: ActorRef): JSet[String] = {
val set = new java.util.HashSet[String]
for (i 0 until NrOfNodes * 10) {
val value = (actor ? "identify").get.asInstanceOf[String]
set.add(value)
}
set
}
}
class RandomFailoverMultiJvmNode2 extends ClusterTestNode {
import RandomFailoverMultiJvmSpec._
"___" must {
"___" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Thread.sleep(30 * 1000)
}
}
}
class RandomFailoverMultiJvmNode3 extends ClusterTestNode {
import RandomFailoverMultiJvmSpec._
"___" must {
"___" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Thread.sleep(30 * 1000)
}
}
}

View file

@ -0,0 +1,8 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-node1.router = "random"
akka.actor.deployment.service-node1.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-node1.clustered.replication-factor = 1
akka.actor.deployment.service-node2.router = "random"
akka.actor.deployment.service-node2.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-node2.clustered.replication-factor = 1

View file

@ -0,0 +1,8 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-node1.router = "random"
akka.actor.deployment.service-node1.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-node1.clustered.replication-factor = 1
akka.actor.deployment.service-node2.router = "random"
akka.actor.deployment.service-node2.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-node2.clustered.replication-factor = 1

View file

@ -0,0 +1,59 @@
package akka.cluster.routing.random.homenode
import akka.config.Config
import akka.actor.Actor
import akka.cluster.{ ClusterTestNode, MasterClusterTestNode, Cluster }
import Cluster._
object HomeNodeMultiJvmSpec {
val NrOfNodes = 2
class SomeActor extends Actor with Serializable {
def receive = {
case "identify" {
self.reply(Config.nodename)
}
}
}
}
class HomeNodeMultiJvmNode1 extends MasterClusterTestNode {
import HomeNodeMultiJvmSpec._
val testNodes = NrOfNodes
"___" must {
"___" in {
node.start()
barrier("waiting-for-begin", NrOfNodes).await()
barrier("waiting-for-end", NrOfNodes).await()
node.shutdown()
}
}
}
class HomeNodeMultiJvmNode2 extends ClusterTestNode {
import HomeNodeMultiJvmSpec._
"Random Router: A Random Router" must {
"obey 'home-node' config option when instantiated actor in cluster" in {
node.start()
barrier("waiting-for-begin", NrOfNodes).await()
val actorNode1 = Actor.actorOf[SomeActor]("service-node1")
val nameNode1 = (actorNode1 ? "identify").get.asInstanceOf[String]
nameNode1 must equal("node1")
val actorNode2 = Actor.actorOf[SomeActor]("service-node2")
val nameNode2 = (actorNode2 ? "identify").get.asInstanceOf[String]
nameNode2 must equal("node2")
barrier("waiting-for-end", NrOfNodes).await()
node.shutdown()
}
}
}

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"
akka.actor.deployment.service-hello.router = "random"
akka.actor.deployment.service-hello.clustered.replication-factor = 1

View file

@ -0,0 +1,50 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing.random.replicationfactor_1
import akka.cluster._
import akka.cluster.Cluster._
import akka.actor._
import akka.config.Config
/**
* Test that if a single node is used with a random router with replication factor then the actor is instantiated
* on the single node.
*/
object Random1ReplicaMultiJvmSpec {
class HelloWorld extends Actor with Serializable {
def receive = {
case "Hello"
self.reply("World from node [" + Config.nodename + "]")
}
}
}
class Random1ReplicaMultiJvmNode1 extends MasterClusterTestNode {
import Random1ReplicaMultiJvmSpec._
val testNodes = 1
"Random Router: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
node.start()
var hello = Actor.actorOf[HelloWorld]("service-hello")
hello must not equal (null)
hello.address must equal("service-hello")
hello.isInstanceOf[ClusterActorRef] must be(true)
hello must not equal (null)
val reply = (hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1"))
reply must equal("World from node [node1]")
node.shutdown()
}
}
}

View file

@ -1,5 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.home = "node:node1"
akka.actor.deployment.service-hello.clustered.replication-factor = 1
akka.actor.deployment.service-hello.router = "random"
akka.actor.deployment.service-hello.clustered.replication-factor = 3

View file

@ -0,0 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "random"
akka.actor.deployment.service-hello.clustered.repliction-factor = 3

View file

@ -0,0 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "random"
akka.actor.deployment.service-hello.clustered.replication-factor = 3

View file

@ -0,0 +1,111 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing.random.replicationfactor_3
import akka.cluster._
import akka.actor._
import akka.config.Config
import Cluster._
/**
* When a MultiJvmNode is started, will it automatically be part of the cluster (so will it automatically be eligible
* for running actors, or will it be just a 'client' talking to the cluster.
*/
object Random3ReplicasMultiJvmSpec {
val NrOfNodes = 3
class HelloWorld extends Actor with Serializable {
def receive = {
case "Hello"
self.reply("World from node [" + Config.nodename + "]")
}
}
}
/**
* What is the purpose of this node? Is this just a node for the cluster to make use of?
*/
class Random3ReplicasMultiJvmNode1 extends MasterClusterTestNode {
import Random3ReplicasMultiJvmSpec._
def testNodes: Int = NrOfNodes
"___" must {
"___" in {
node.start()
//wait till node 1 has started.
barrier("begin", NrOfNodes).await()
barrier("end", NrOfNodes).await()
node.shutdown()
}
}
}
class Random3ReplicasMultiJvmNode2 extends ClusterTestNode {
import Random3ReplicasMultiJvmSpec._
import Cluster._
"Random: A cluster" must {
"distribute requests randomly" in {
node.start()
//wait till node 1 has started.
barrier("begin", NrOfNodes).await()
//check if the actorRef is the expected remoteActorRef.
var hello: ActorRef = null
hello = Actor.actorOf[HelloWorld]("service-hello")
hello must not equal (null)
hello.address must equal("service-hello")
hello.isInstanceOf[ClusterActorRef] must be(true)
//todo: is there a reason to check for null again since it already has been done in the previous block.
hello must not equal (null)
val replies = collection.mutable.Map.empty[String, Int]
def count(reply: String) = {
if (replies.get(reply).isEmpty) replies.put(reply, 1)
else replies.put(reply, replies(reply) + 1)
}
for (i 0 until 1000) {
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from a node")))
}
assert(replies("World from node [node1]") > 100)
assert(replies("World from node [node2]") > 100)
assert(replies("World from node [node3]") > 100)
barrier("end", NrOfNodes).await()
node.shutdown()
}
}
}
class Random3ReplicasMultiJvmNode3 extends ClusterTestNode {
import Random3ReplicasMultiJvmSpec._
import Cluster._
"___" must {
"___" in {
node.start()
//wait till node 1 has started.
barrier("begin", NrOfNodes).await()
barrier("end", NrOfNodes).await()
node.shutdown()
}
}
}

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replication-factor = 1
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-hello.clustered.replication-factor = 2

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replication-factor = 1
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-hello.clustered.replication-factor = 2

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replication-factor = 1
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-hello.clustered.replication-factor = 2

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993

View file

@ -0,0 +1,120 @@
package akka.cluster.routing.roundrobin.failover
import akka.config.Config
import akka.cluster._
import akka.actor.{ ActorRef, Actor }
import java.util.{ Collections, Set JSet }
object RoundRobinFailoverMultiJvmSpec {
val NrOfNodes = 3
class SomeActor extends Actor with Serializable {
println("---------------------------------------------------------------------------")
println("SomeActor has been created on node [" + Config.nodename + "]")
println("---------------------------------------------------------------------------")
def receive = {
case "identify" {
println("The node received the 'identify' command")
self.reply(Config.nodename)
}
case "shutdown" {
println("The node received the 'shutdown' command")
Cluster.node.shutdown()
}
}
}
}
class RoundRobinFailoverMultiJvmNode1 extends MasterClusterTestNode {
import RoundRobinFailoverMultiJvmSpec._
def testNodes = NrOfNodes
def sleepSome() {
println("Starting sleep")
Thread.sleep(1000) //nasty.. but ok for now.
println("Finished doing sleep")
}
"Round Robin: when round robin fails" must {
"jump to another replica" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
// ============= the real testing =================
val actor = Actor.actorOf[SomeActor]("service-hello").asInstanceOf[ClusterActorRef]
val oldFoundConnections = identifyConnections(actor)
println("---------------------------- oldFoundConnections ------------------------")
println(oldFoundConnections)
//since we have replication factor 2
oldFoundConnections.size() must be(2)
//terminate a node
actor ! "shutdown"
sleepSome()
//this is where the system behaves unpredictable. From time to time it works... from time to time there
//all kinds of connection timeouts. So this test shows that there are problems. For the time being
//the test code has been deactivated to prevent causing problems.
/*
val newFoundConnections = identifyConnections(actor)
println("---------------------------- newFoundConnections ------------------------")
println(newFoundConnections)
//it still must be 2 since a different node should have been used to failover to
newFoundConnections.size() must be(2)
//they are not disjoint since, there must be a single element that is in both
Collections.disjoint(newFoundConnections, oldFoundConnections) must be(false)
//but they should not be equal since the shutdown-node has been replaced by another one.
newFoundConnections.equals(oldFoundConnections) must be(false)
*/
Cluster.node.shutdown()
}
}
def identifyConnections(actor: ActorRef): JSet[String] = {
val set = new java.util.HashSet[String]
for (i 0 until NrOfNodes * 2) {
val value = (actor ? "identify").get.asInstanceOf[String]
set.add(value)
}
set
}
}
class RoundRobinFailoverMultiJvmNode2 extends ClusterTestNode {
import RoundRobinFailoverMultiJvmSpec._
"___" must {
"___" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Thread.sleep(30 * 1000)
}
}
}
class RoundRobinFailoverMultiJvmNode3 extends ClusterTestNode {
import RoundRobinFailoverMultiJvmSpec._
"___" must {
"___" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Thread.sleep(30 * 1000)
}
}
}

View file

@ -0,0 +1,8 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-node1.router = "round-robin"
akka.actor.deployment.service-node1.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-node1.clustered.replication-factor = 1
akka.actor.deployment.service-node2.router = "round-robin"
akka.actor.deployment.service-node2.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-node2.clustered.replication-factor = 1

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992

View file

@ -1,7 +1,7 @@
package akka.cluster.routing.homenode
package akka.cluster.routing.roundrobin.homenode
import akka.config.Config
import akka.actor.{ ActorRef, Actor }
import akka.actor.Actor
import akka.cluster.{ ClusterTestNode, MasterClusterTestNode, Cluster }
import Cluster._
@ -16,6 +16,7 @@ object HomeNodeMultiJvmSpec {
}
}
}
}
class HomeNodeMultiJvmNode1 extends MasterClusterTestNode {
@ -24,13 +25,12 @@ class HomeNodeMultiJvmNode1 extends MasterClusterTestNode {
val testNodes = NrOfNodes
"A Router" must {
"obey 'home-node' config option when instantiated actor in cluster" in {
"___" must {
"___" in {
node.start()
barrier("waiting-for-begin", NrOfNodes).await()
barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
barrier("waiting-for-end", NrOfNodes).await()
node.shutdown()
}
@ -41,18 +41,21 @@ class HomeNodeMultiJvmNode2 extends ClusterTestNode {
import HomeNodeMultiJvmSpec._
"A Router" must {
"Round Robin: A Router" must {
"obey 'home-node' config option when instantiated actor in cluster" in {
node.start()
barrier("waiting-for-begin", NrOfNodes).await()
barrier("get-ref-to-actor-on-node2", NrOfNodes) {
val actor = Actor.actorOf[SomeActor]("service-hello")
val name = (actor ? "identify").get.asInstanceOf[String]
name must equal("node1")
}
val actorNode1 = Actor.actorOf[SomeActor]("service-node1")
val name1 = (actorNode1 ? "identify").get.asInstanceOf[String]
name1 must equal("node1")
val actorNode2 = Actor.actorOf[SomeActor]("service-node2")
val name2 = (actorNode2 ? "identify").get.asInstanceOf[String]
name2 must equal("node2")
barrier("waiting-for-end", NrOfNodes).await()
node.shutdown()
}
}

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing.roundrobin_1_replica
package akka.cluster.routing.roundrobin.replicationfactor_1
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
@ -32,13 +32,12 @@ class RoundRobin1ReplicaMultiJvmNode1 extends MasterClusterTestNode {
val testNodes = 1
"A cluster" must {
"Round Robin: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
node.start()
var hello: ActorRef = null
hello = Actor.actorOf[HelloWorld]("service-hello")
var hello = Actor.actorOf[HelloWorld]("service-hello")
hello must not equal (null)
hello.address must equal("service-hello")
hello.isInstanceOf[ClusterActorRef] must be(true)

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing.roundrobin_2_replicas
package akka.cluster.routing.roundrobin.replicationfactor_2
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
@ -39,7 +39,7 @@ object RoundRobin2ReplicasMultiJvmSpec {
class RoundRobin2ReplicasMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
import RoundRobin2ReplicasMultiJvmSpec._
"A cluster" must {
"Round Robin: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
System.getProperty("akka.cluster.nodename", "") must be("node1")
@ -51,16 +51,16 @@ class RoundRobin2ReplicasMultiJvmNode1 extends WordSpec with MustMatchers with B
}
//wait till ndoe 2 has started.
barrier("start-node2", NrOfNodes) {}
barrier("start-node2", NrOfNodes).await()
//wait till node 3 has started.
barrier("start-node3", NrOfNodes) {}
barrier("start-node3", NrOfNodes).await()
//wait till an actor reference on node 2 has become available.
barrier("get-ref-to-actor-on-node2", NrOfNodes) {}
barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
//wait till the node 2 has send a message to the replica's.
barrier("send-message-from-node2-to-replicas", NrOfNodes) {}
barrier("send-message-from-node2-to-replicas", NrOfNodes).await()
node.shutdown()
}
@ -78,14 +78,14 @@ class RoundRobin2ReplicasMultiJvmNode1 extends WordSpec with MustMatchers with B
class RoundRobin2ReplicasMultiJvmNode2 extends WordSpec with MustMatchers {
import RoundRobin2ReplicasMultiJvmSpec._
"A cluster" must {
"Round Robin: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
System.getProperty("akka.cluster.nodename", "") must be("node2")
System.getProperty("akka.cluster.port", "") must be("9992")
//wait till node 1 has started.
barrier("start-node1", NrOfNodes) {}
barrier("start-node1", NrOfNodes).await()
//wait till node 2 has started.
barrier("start-node2", NrOfNodes) {
@ -93,7 +93,7 @@ class RoundRobin2ReplicasMultiJvmNode2 extends WordSpec with MustMatchers {
}
//wait till node 3 has started.
barrier("start-node3", NrOfNodes) {}
barrier("start-node3", NrOfNodes).await()
//check if the actorRef is the expected remoteActorRef.
var hello: ActorRef = null

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992

View file

@ -0,0 +1 @@
-Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing.roundrobin_3_replicas
package akka.cluster.routing.roundrobin.replicationfactor_3
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
@ -35,7 +35,7 @@ object RoundRobin3ReplicasMultiJvmSpec {
class RoundRobin3ReplicasMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
import RoundRobin3ReplicasMultiJvmSpec._
"A cluster" must {
"Round Robin: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" ignore {
@ -45,16 +45,16 @@ class RoundRobin3ReplicasMultiJvmNode1 extends WordSpec with MustMatchers with B
}
//wait till ndoe 2 has started.
barrier("start-node2", NrOfNodes) {}
barrier("start-node2", NrOfNodes).await()
//wait till node 3 has started.
barrier("start-node3", NrOfNodes) {}
barrier("start-node3", NrOfNodes).await()
//wait till an actor reference on node 2 has become available.
barrier("get-ref-to-actor-on-node2", NrOfNodes) {}
barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
//wait till the node 2 has send a message to the replica's.
barrier("send-message-from-node2-to-replicas", NrOfNodes) {}
barrier("send-message-from-node2-to-replicas", NrOfNodes).await()
node.shutdown()
}
@ -73,12 +73,12 @@ class RoundRobin3ReplicasMultiJvmNode2 extends WordSpec with MustMatchers {
import RoundRobin3ReplicasMultiJvmSpec._
import Cluster._
"A cluster" must {
"Round Robin: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" ignore {
//wait till node 1 has started.
barrier("start-node1", NrOfNodes) {}
barrier("start-node1", NrOfNodes).await()
//wait till node 2 has started.
barrier("start-node2", NrOfNodes) {
@ -86,7 +86,7 @@ class RoundRobin3ReplicasMultiJvmNode2 extends WordSpec with MustMatchers {
}
//wait till node 3 has started.
barrier("start-node3", NrOfNodes) {}
barrier("start-node3", NrOfNodes).await()
//check if the actorRef is the expected remoteActorRef.
var hello: ActorRef = null
@ -134,20 +134,20 @@ class RoundRobin3ReplicasMultiJvmNode3 extends WordSpec with MustMatchers {
import RoundRobin3ReplicasMultiJvmSpec._
import Cluster._
"A cluster" must {
"Round Robin: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" ignore {
barrier("start-node1", NrOfNodes) {}
barrier("start-node1", NrOfNodes).await()
barrier("start-node2", NrOfNodes) {}
barrier("start-node2", NrOfNodes).await()
barrier("start-node3", NrOfNodes) {
node.start()
}
barrier("get-ref-to-actor-on-node2", NrOfNodes) {}
barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
barrier("send-message-from-node2-to-replicas", NrOfNodes) {}
barrier("send-message-from-node2-to-replicas", NrOfNodes).await()
node.shutdown()
}

View file

@ -1 +0,0 @@
-Dakka.cluster.nodename=node4 -Dakka.cluster.port=9994

View file

@ -1,144 +0,0 @@
package akka.cluster.routing.roundrobin_failover
import akka.config.Config
import akka.cluster._
import akka.actor.{ ActorRef, Actor }
object RoundRobinFailoverMultiJvmSpec {
val NrOfNodes = 2
class SomeActor extends Actor with Serializable {
println("---------------------------------------------------------------------------")
println("SomeActor has been created on node [" + Config.nodename + "]")
println("---------------------------------------------------------------------------")
def receive = {
case "identify" {
println("The node received the 'identify' command")
self.reply(Config.nodename)
}
case "shutdown" {
println("The node received the 'shutdown' command")
Cluster.node.shutdown()
}
}
}
}
class RoundRobinFailoverMultiJvmNode1 extends MasterClusterTestNode {
import RoundRobinFailoverMultiJvmSpec._
val testNodes = NrOfNodes
"foo" must {
"bla" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
println("Getting reference to service-hello actor")
var hello: ActorRef = null
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {
hello = Actor.actorOf[SomeActor]("service-hello")
}
println("Successfully acquired reference")
Cluster.barrier("waiting-to-end", NrOfNodes).await()
Cluster.node.shutdown()
}
}
}
class RoundRobinFailoverMultiJvmNode2 extends ClusterTestNode {
import RoundRobinFailoverMultiJvmSpec._
"foo" must {
"bla" in {
println("Started Zookeeper Node")
Cluster.node.start()
println("Waiting to begin")
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
println("Begin!")
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
// ============= the real testing =================
/*
val actor = Actor.actorOf[SomeActor]("service-hello")
val firstTimeResult = (actor ? "identify").get
val secondTimeResult = (actor ? "identify").get
//since there are only 2 nodes, the identity should not have changed.
assert(firstTimeResult == secondTimeResult)
//if we now terminate the node that
actor ! "shutdown"
//todo: do some waiting
println("Doing some sleep")
try {
Thread.sleep(4000) //nasty.. but ok for now.
println("Finished doing sleep")
} finally {
println("Ended the Thread.sleep method somehow..")
}
//now we should get a different node that responds to us since there was a failover.
val thirdTimeResult = (actor ? "identify").get
assert(!(firstTimeResult == thirdTimeResult)) */
// ==================================================
println("Waiting to end")
Cluster.barrier("waiting-to-end", NrOfNodes).await()
println("Shutting down ClusterNode")
Cluster.node.shutdown()
}
}
}
/*
class RoundRobinFailoverMultiJvmNode3 extends SlaveNode {
import RoundRobinFailoverMultiJvmSpec._
"foo" must {
"bla" in {
println("Started Zookeeper Node")
Cluster.node.start()
println("Waiting to begin")
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
println("Begin!")
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
println("Waiting to end")
Cluster.barrier("waiting-to-end", NrOfNodes).await()
println("Shutting down ClusterNode")
Cluster.node.shutdown()
}
}
}
class RoundRobinFailoverMultiJvmNode4 extends SlaveNode {
import RoundRobinFailoverMultiJvmSpec._
"foo" must {
"bla" in {
println("Started Zookeeper Node")
Cluster.node.start()
println("Waiting to begin")
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
println("Begin!")
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
println("Waiting to end")
Cluster.barrier("waiting-to-end", NrOfNodes).await()
println("Shutting down ClusterNode")
Cluster.node.shutdown()
}
}
} */

View file

@ -1,6 +0,0 @@
What does clustered home mean?
akka.actor.deployment.service-hello.clustered.home = "node:node1"
If a node fails, it should transparently be redeployed on a different node. So actors imho are homeless.. they run
wherever the grid deploys them.

Some files were not shown because too many files have changed in this diff Show more