From 44385d1092abc64c0d41f84a2987aa7a86a6ba8c Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Thu, 31 Mar 2011 20:32:04 +1300 Subject: [PATCH 01/11] Rework the tests in actor/actor --- .../ActorFireForgetRequestReplySpec.scala | 82 +++++---- .../scala/akka/actor/actor/ActorRefSpec.scala | 50 +++--- .../scala/akka/actor/actor/FSMActorSpec.scala | 107 ++++++----- .../akka/actor/actor/FSMTimingSpec.scala | 12 +- .../akka/actor/actor/ForwardActorSpec.scala | 49 ++--- .../scala/akka/actor/actor/HotSwapSpec.scala | 16 +- .../akka/actor/actor/ReceiveTimeoutSpec.scala | 168 ++++++++++-------- .../scala/akka/dispatch/ActorModelSpec.scala | 18 +- .../test/scala/akka/routing/RoutingSpec.scala | 2 +- .../test/scala/akka/testing/TestBarrier.scala | 40 +++++ .../test/scala/akka/testing/TestLatch.scala | 55 ++++++ .../scala/akka/{ => testing}/Testing.scala | 14 +- .../scala/remote/RemoteTypedActorSpec.scala | 6 +- .../typed-actor/TypedActorLifecycleSpec.scala | 6 +- 14 files changed, 373 insertions(+), 252 deletions(-) create mode 100644 akka-actor/src/test/scala/akka/testing/TestBarrier.scala create mode 100644 akka-actor/src/test/scala/akka/testing/TestLatch.scala rename akka-actor/src/test/scala/akka/{ => testing}/Testing.scala (57%) diff --git a/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala index 1eef7f068c..3c35b42b9a 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala @@ -1,18 +1,25 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + package akka.actor -import java.util.concurrent.{TimeUnit, CyclicBarrier, TimeoutException} -import akka.config.Supervision._ -import org.scalatest.junit.JUnitSuite -import org.junit.Test +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import org.scalatest.BeforeAndAfterEach + +import akka.testing._ +import akka.testing.Testing.sleepFor +import akka.util.duration._ -import akka.dispatch.Dispatchers import Actor._ +import akka.config.Supervision._ +import akka.dispatch.Dispatchers -import akka.Testing object ActorFireForgetRequestReplySpec { - class ReplyActor extends Actor { + class ReplyActor extends Actor { def receive = { case "Send" => self.reply("Reply") @@ -32,7 +39,6 @@ object ActorFireForgetRequestReplySpec { } class SenderActor(replyActor: ActorRef) extends Actor { - def receive = { case "Init" => replyActor ! "Send" @@ -50,44 +56,42 @@ object ActorFireForgetRequestReplySpec { object state { var s = "NIL" - val finished = new CyclicBarrier(2) + val finished = TestBarrier(2) } } -class ActorFireForgetRequestReplySpec extends JUnitSuite { +class ActorFireForgetRequestReplySpec extends WordSpec with MustMatchers with BeforeAndAfterEach { import ActorFireForgetRequestReplySpec._ - @Test - def shouldReplyToBangMessageUsingReply = { + override def beforeEach() = { state.finished.reset - val replyActor = actorOf[ReplyActor].start - val senderActor = actorOf(new SenderActor(replyActor)).start - senderActor ! "Init" - try { state.finished.await(1L, TimeUnit.SECONDS) } - catch { case e: TimeoutException => fail("Never got the message") } - assert("Reply" === state.s) - } + } + + "An Actor" should { - @Test - def shouldReplyToBangMessageUsingImplicitSender = { - state.finished.reset - val replyActor = actorOf[ReplyActor].start - val senderActor = actorOf(new SenderActor(replyActor)).start - senderActor ! "InitImplicit" - try { state.finished.await(1L, TimeUnit.SECONDS) } - catch { case e: TimeoutException => fail("Never got the message") } - assert("ReplyImplicit" === state.s) - } + "reply to bang message using reply" in { + val replyActor = actorOf[ReplyActor].start + val senderActor = actorOf(new SenderActor(replyActor)).start + senderActor ! "Init" + state.finished.await + state.s must be ("Reply") + } - @Test - def shouldShutdownCrashedTemporaryActor = { - state.finished.reset - val actor = actorOf[CrashingTemporaryActor].start - assert(actor.isRunning) - actor ! "Die" - try { state.finished.await(10L, TimeUnit.SECONDS) } - catch { case e: TimeoutException => fail("Never got the message") } - Thread.sleep(Testing.time(500)) - assert(actor.isShutdown) + "reply to bang message using implicit sender" in { + val replyActor = actorOf[ReplyActor].start + val senderActor = actorOf(new SenderActor(replyActor)).start + senderActor ! "InitImplicit" + state.finished.await + state.s must be ("ReplyImplicit") + } + + "should shutdown crashed temporary actor" in { + val actor = actorOf[CrashingTemporaryActor].start + actor.isRunning must be (true) + actor ! "Die" + state.finished.await + sleepFor(1 second) + actor.isShutdown must be (true) + } } } diff --git a/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala index d5d6b28edc..c9ea06cde8 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala @@ -4,19 +4,20 @@ package akka.actor -import org.scalatest.Spec -import org.scalatest.matchers.ShouldMatchers -import org.scalatest.BeforeAndAfterAll -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers + +import akka.testing._ +import akka.util.duration._ +import akka.testing.Testing.sleepFor import akka.actor._ import akka.dispatch.Future -import java.util.concurrent.{CountDownLatch, TimeUnit} + object ActorRefSpec { - var latch = new CountDownLatch(4) + val latch = TestLatch(4) class ReplyActor extends Actor { var replyTo: Channel[Any] = null @@ -49,7 +50,7 @@ object ActorRefSpec { } private def work { - Thread.sleep(1000) + sleepFor(1 second) } } @@ -69,16 +70,12 @@ object ActorRefSpec { } } -@RunWith(classOf[JUnitRunner]) -class ActorRefSpec extends - Spec with - ShouldMatchers with - BeforeAndAfterAll { - +class ActorRefSpec extends WordSpec with MustMatchers { import ActorRefSpec._ - describe("ActorRef") { - it("should support to reply via channel") { + "An ActorRef" should { + + "support reply via channel" in { val serverRef = Actor.actorOf[ReplyActor].start val clientRef = Actor.actorOf(new SenderActor(serverRef)).start @@ -86,18 +83,23 @@ class ActorRefSpec extends clientRef ! "simple" clientRef ! "simple" clientRef ! "simple" - assert(latch.await(4L, TimeUnit.SECONDS)) - latch = new CountDownLatch(4) + + latch.await + + latch.reset + clientRef ! "complex2" clientRef ! "simple" clientRef ! "simple" clientRef ! "simple" - assert(latch.await(4L, TimeUnit.SECONDS)) + + latch.await + clientRef.stop serverRef.stop } - it("should stop when sent a poison pill") { + "stop when sent a poison pill" in { val ref = Actor.actorOf( new Actor { def receive = { @@ -115,11 +117,11 @@ class ActorRefSpec extends fail("shouldn't get here") } - assert(ffive.resultOrException.get == "five") - assert(fnull.resultOrException.get == "null") + ffive.resultOrException.get must be ("five") + fnull.resultOrException.get must be ("null") - assert(ref.isRunning == false) - assert(ref.isShutdown == true) + ref.isRunning must be (false) + ref.isShutdown must be (true) } } } diff --git a/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala index cf910925c8..4f3e65caec 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala @@ -4,34 +4,31 @@ package akka.actor -import org.scalatest.junit.JUnitSuite -import org.junit.Test +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers + +import akka.testing._ + import FSM._ - -import org.multiverse.api.latches.StandardLatch - -import java.util.concurrent.TimeUnit - +import akka.util.Duration import akka.util.duration._ -import akka.Testing object FSMActorSpec { - - val unlockedLatch = new StandardLatch - val lockedLatch = new StandardLatch - val unhandledLatch = new StandardLatch - val terminatedLatch = new StandardLatch - val transitionLatch = new StandardLatch - val initialStateLatch = new StandardLatch - val transitionCallBackLatch = new StandardLatch + val unlockedLatch = TestLatch() + val lockedLatch = TestLatch() + val unhandledLatch = TestLatch() + val terminatedLatch = TestLatch() + val transitionLatch = TestLatch() + val initialStateLatch = TestLatch() + val transitionCallBackLatch = TestLatch() sealed trait LockState case object Locked extends LockState case object Open extends LockState - class Lock(code: String, timeout: (Long, TimeUnit)) extends Actor with FSM[LockState, CodeState] { + class Lock(code: String, timeout: Duration) extends Actor with FSM[LockState, CodeState] { startWith(Locked, CodeState("", code)) @@ -94,53 +91,53 @@ object FSMActorSpec { case class CodeState(soFar: String, code: String) } -class FSMActorSpec extends JUnitSuite { +class FSMActorSpec extends WordSpec with MustMatchers { import FSMActorSpec._ + "An FSM Actor" should { - @Test - def unlockTheLock = { + "unlock the lock" in { + + // lock that locked after being open for 1 sec + val lock = Actor.actorOf(new Lock("33221", 1 second)).start - // lock that locked after being open for 1 sec - val lock = Actor.actorOf(new Lock("33221", (Testing.time(1), TimeUnit.SECONDS))).start + val transitionTester = Actor.actorOf(new Actor { def receive = { + case Transition(_, _, _) => transitionCallBackLatch.open + case CurrentState(_, Locked) => initialStateLatch.open + }}).start - val transitionTester = Actor.actorOf(new Actor { def receive = { - case Transition(_, _, _) => transitionCallBackLatch.open - case CurrentState(_, Locked) => initialStateLatch.open - }}).start + lock ! SubscribeTransitionCallBack(transitionTester) + initialStateLatch.await - lock ! SubscribeTransitionCallBack(transitionTester) - assert(initialStateLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS)) + lock ! '3' + lock ! '3' + lock ! '2' + lock ! '2' + lock ! '1' - lock ! '3' - lock ! '3' - lock ! '2' - lock ! '2' - lock ! '1' + unlockedLatch.await + transitionLatch.await + transitionCallBackLatch.await + lockedLatch.await - assert(unlockedLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS)) - assert(transitionLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS)) - assert(transitionCallBackLatch.tryAwait(Testing.time(1), TimeUnit.SECONDS)) - assert(lockedLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS)) + lock ! "not_handled" + unhandledLatch.await + val answerLatch = TestLatch() + object Hello + object Bye + val tester = Actor.actorOf(new Actor { + protected def receive = { + case Hello => lock ! "hello" + case "world" => answerLatch.open + case Bye => lock ! "bye" + } + }).start + tester ! Hello + answerLatch.await - lock ! "not_handled" - assert(unhandledLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS)) - - val answerLatch = new StandardLatch - object Hello - object Bye - val tester = Actor.actorOf(new Actor { - protected def receive = { - case Hello => lock ! "hello" - case "world" => answerLatch.open - case Bye => lock ! "bye" - } - }).start - tester ! Hello - assert(answerLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS)) - - tester ! Bye - assert(terminatedLatch.tryAwait(Testing.time(2), TimeUnit.SECONDS)) + tester ! Bye + terminatedLatch.await + } } } diff --git a/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala index a59785ab7a..2ea9525a3d 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala @@ -1,16 +1,13 @@ package akka.actor -import akka.testkit.TestKit -import akka.util.duration._ - import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers -class FSMTimingSpec - extends WordSpec - with MustMatchers - with TestKit { +import akka.testkit.TestKit +import akka.util.duration._ + +class FSMTimingSpec extends WordSpec with MustMatchers with TestKit { import FSMTimingSpec._ import FSM._ @@ -140,4 +137,3 @@ object FSMTimingSpec { } -// vim: set ts=2 sw=2 et: diff --git a/akka-actor/src/test/scala/akka/actor/actor/ForwardActorSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ForwardActorSpec.scala index 3a1efe1fe8..d9670bf471 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/ForwardActorSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/ForwardActorSpec.scala @@ -1,18 +1,25 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + package akka.actor -import java.util.concurrent.{TimeUnit, CountDownLatch} -import org.scalatest.junit.JUnitSuite -import org.junit.Test +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers + +import akka.testing._ +import akka.util.duration._ import Actor._ + object ForwardActorSpec { object ForwardState { var sender: Option[ActorRef] = None } class ReceiverActor extends Actor { - val latch = new CountDownLatch(1) + val latch = TestLatch() def receive = { case "SendBang" => { ForwardState.sender = self.sender @@ -42,7 +49,7 @@ object ForwardActorSpec { } class BangBangSenderActor extends Actor { - val latch = new CountDownLatch(1) + val latch = TestLatch() val forwardActor = actorOf[ForwardActor] forwardActor.start (forwardActor !! "SendBangBang") match { @@ -55,27 +62,27 @@ object ForwardActorSpec { } } -class ForwardActorSpec extends JUnitSuite { +class ForwardActorSpec extends WordSpec with MustMatchers { import ForwardActorSpec._ - @Test - def shouldForwardActorReferenceWhenInvokingForwardOnBang { - val senderActor = actorOf[BangSenderActor] - val latch = senderActor.actor.asInstanceOf[BangSenderActor] + "A Forward Actor" should { + "forward actor reference when invoking forward on bang" in { + val senderActor = actorOf[BangSenderActor] + val latch = senderActor.actor.asInstanceOf[BangSenderActor] .forwardActor.actor.asInstanceOf[ForwardActor] .receiverActor.actor.asInstanceOf[ReceiverActor] .latch - senderActor.start - assert(latch.await(1L, TimeUnit.SECONDS)) - assert(ForwardState.sender ne null) - assert(senderActor.toString === ForwardState.sender.get.toString) - } + senderActor.start + latch.await + ForwardState.sender must not be (null) + senderActor.toString must be (ForwardState.sender.get.toString) + } - @Test - def shouldForwardActorReferenceWhenInvokingForwardOnBangBang { - val senderActor = actorOf[BangBangSenderActor] - senderActor.start - val latch = senderActor.actor.asInstanceOf[BangBangSenderActor].latch - assert(latch.await(1L, TimeUnit.SECONDS)) + "forward actor reference when invoking forward on bang bang" in { + val senderActor = actorOf[BangBangSenderActor] + senderActor.start + val latch = senderActor.actor.asInstanceOf[BangBangSenderActor].latch + latch.await + } } } diff --git a/akka-actor/src/test/scala/akka/actor/actor/HotSwapSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/HotSwapSpec.scala index 011141c746..8632da8f6e 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/HotSwapSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/HotSwapSpec.scala @@ -1,17 +1,23 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + package akka.actor import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers + +import akka.testing._ + import Actor._ -import java.util.concurrent.CyclicBarrier class HotSwapSpec extends WordSpec with MustMatchers { "An Actor" should { "be able to hotswap its behavior with HotSwap(..)" in { - val barrier = new CyclicBarrier(2) + val barrier = TestBarrier(2) @volatile var _log = "" val a = actorOf( new Actor { def receive = { case _ => _log += "default" } @@ -27,7 +33,7 @@ class HotSwapSpec extends WordSpec with MustMatchers { } "be able to hotswap its behavior with become(..)" in { - val barrier = new CyclicBarrier(2) + val barrier = TestBarrier(2) @volatile var _log = "" val a = actorOf(new Actor { def receive = { @@ -55,7 +61,7 @@ class HotSwapSpec extends WordSpec with MustMatchers { } "be able to revert hotswap its behavior with RevertHotSwap(..)" in { - val barrier = new CyclicBarrier(2) + val barrier = TestBarrier(2) @volatile var _log = "" val a = actorOf( new Actor { def receive = { @@ -100,7 +106,7 @@ class HotSwapSpec extends WordSpec with MustMatchers { } "be able to revert hotswap its behavior with unbecome" in { - val barrier = new CyclicBarrier(2) + val barrier = TestBarrier(2) @volatile var _log = "" val a = actorOf(new Actor { def receive = { diff --git a/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala index 9e5fba863e..87b08862d3 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala @@ -1,108 +1,120 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + package akka.actor -import org.scalatest.junit.JUnitSuite -import org.junit.Test +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers + +import akka.testing._ +import akka.util.duration._ -import java.util.concurrent.TimeUnit -import org.multiverse.api.latches.StandardLatch import Actor._ import java.util.concurrent.atomic.AtomicInteger -class ReceiveTimeoutSpec extends JUnitSuite { - @Test def receiveShouldGetTimeout= { +class ReceiveTimeoutSpec extends WordSpec with MustMatchers { + import Actor._ - val timeoutLatch = new StandardLatch + "An actor with receive timeout" should { - val timeoutActor = actorOf(new Actor { - self.receiveTimeout = Some(500L) + "get timeout" in { + val timeoutLatch = TestLatch() - protected def receive = { - case ReceiveTimeout => timeoutLatch.open - } - }).start + val timeoutActor = actorOf(new Actor { + self.receiveTimeout = Some(500L) - assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS)) - timeoutActor.stop - } + protected def receive = { + case ReceiveTimeout => timeoutLatch.open + } + }).start - @Test def swappedReceiveShouldAlsoGetTimout = { - val timeoutLatch = new StandardLatch + timeoutLatch.await + timeoutActor.stop + } - val timeoutActor = actorOf(new Actor { - self.receiveTimeout = Some(500L) + "get timeout when swapped" in { + val timeoutLatch = TestLatch() - protected def receive = { - case ReceiveTimeout => timeoutLatch.open - } - }).start + val timeoutActor = actorOf(new Actor { + self.receiveTimeout = Some(500L) - // after max 1 second the timeout should already been sent - assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS)) + protected def receive = { + case ReceiveTimeout => timeoutLatch.open + } + }).start - val swappedLatch = new StandardLatch - timeoutActor ! HotSwap(self => { - case ReceiveTimeout => swappedLatch.open - }) + timeoutLatch.await - assert(swappedLatch.tryAwait(3, TimeUnit.SECONDS)) - timeoutActor.stop - } + val swappedLatch = TestLatch() - @Test def timeoutShouldBeRescheduledAfterRegularReceive = { + timeoutActor ! HotSwap(self => { + case ReceiveTimeout => swappedLatch.open + }) - val timeoutLatch = new StandardLatch - case object Tick - val timeoutActor = actorOf(new Actor { - self.receiveTimeout = Some(500L) + swappedLatch.await + timeoutActor.stop + } + + "reschedule timeout after regular receive" in { + val timeoutLatch = TestLatch() + case object Tick - protected def receive = { - case Tick => () - case ReceiveTimeout => timeoutLatch.open - } - }).start - timeoutActor ! Tick + val timeoutActor = actorOf(new Actor { + self.receiveTimeout = Some(500L) - assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == true) - timeoutActor.stop - } + protected def receive = { + case Tick => () + case ReceiveTimeout => timeoutLatch.open + } + }).start - @Test def timeoutShouldBeTurnedOffIfDesired = { - val count = new AtomicInteger(0) - val timeoutLatch = new StandardLatch - case object Tick - val timeoutActor = actorOf(new Actor { - self.receiveTimeout = Some(500L) + timeoutActor ! Tick - protected def receive = { - case Tick => () - case ReceiveTimeout => - count.incrementAndGet - timeoutLatch.open - self.receiveTimeout = None - } - }).start - timeoutActor ! Tick + timeoutLatch.await + timeoutActor.stop + } - assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == true) - assert(count.get === 1) - timeoutActor.stop - } + "be able to turn off timeout if desired" in { + val count = new AtomicInteger(0) + val timeoutLatch = TestLatch() + case object Tick - @Test def timeoutShouldNotBeSentWhenNotSpecified = { - val timeoutLatch = new StandardLatch - val timeoutActor = actorOf(new Actor { + val timeoutActor = actorOf(new Actor { + self.receiveTimeout = Some(500L) - protected def receive = { - case ReceiveTimeout => timeoutLatch.open - } - }).start + protected def receive = { + case Tick => () + case ReceiveTimeout => + count.incrementAndGet + timeoutLatch.open + self.receiveTimeout = None + } + }).start - assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS) == false) - timeoutActor.stop - } + timeoutActor ! Tick - @Test def ActorsReceiveTimeoutShouldBeReceiveTimeout { - assert(akka.actor.Actors.receiveTimeout() eq ReceiveTimeout) + timeoutLatch.await + count.get must be (1) + timeoutActor.stop + } + + "not receive timeout message when not specified" in { + val timeoutLatch = TestLatch() + + val timeoutActor = actorOf(new Actor { + protected def receive = { + case ReceiveTimeout => timeoutLatch.open + } + }).start + + timeoutLatch.awaitTimeout(1 second) // timeout expected + timeoutActor.stop + } + + "have ReceiveTimeout eq to Actors ReceiveTimeout" in { + akka.actor.Actors.receiveTimeout() must be theSameInstanceAs (ReceiveTimeout) + } } } diff --git a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala index 6b154b42a9..e875ac87e0 100644 --- a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -6,6 +6,7 @@ package akka.actor.dispatch import org.scalatest.junit.JUnitSuite import org.junit.Test import org.scalatest.Assertions._ +import akka.testing._ import akka.dispatch._ import akka.actor.{ActorRef, Actor} import akka.actor.Actor._ @@ -13,7 +14,6 @@ import java.util.concurrent.atomic.AtomicLong import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit} import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor import akka.util.{Duration, Switch} -import akka.Testing object ActorModelSpec { @@ -225,13 +225,13 @@ abstract class ActorModelSpec extends JUnitSuite { a.start a ! CountDown(start) - assertCountDown(start, Testing.time(3000), "Should process first message within 3 seconds") + assertCountDown(start, Testing.testTime(3000), "Should process first message within 3 seconds") assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1) a ! Wait(1000) a ! CountDown(oneAtATime) // in case of serialization violation, restart would happen instead of count down - assertCountDown(oneAtATime, Testing.time(1500) ,"Processed message when allowed") + assertCountDown(oneAtATime, Testing.testTime(1500) ,"Processed message when allowed") assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3) a.stop @@ -246,7 +246,7 @@ abstract class ActorModelSpec extends JUnitSuite { def start = spawn { for (i <- 1 to 20) { a ! WaitAck(1, counter) } } for (i <- 1 to 10) { start } - assertCountDown(counter, Testing.time(3000), "Should process 200 messages") + assertCountDown(counter, Testing.testTime(3000), "Should process 200 messages") assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200) a.stop @@ -264,10 +264,10 @@ abstract class ActorModelSpec extends JUnitSuite { val aStart,aStop,bParallel = new CountDownLatch(1) a ! Meet(aStart,aStop) - assertCountDown(aStart, Testing.time(3000), "Should process first message within 3 seconds") + assertCountDown(aStart, Testing.testTime(3000), "Should process first message within 3 seconds") b ! CountDown(bParallel) - assertCountDown(bParallel, Testing.time(3000), "Should process other actors in parallel") + assertCountDown(bParallel, Testing.testTime(3000), "Should process other actors in parallel") aStop.countDown() a.stop @@ -282,7 +282,7 @@ abstract class ActorModelSpec extends JUnitSuite { val done = new CountDownLatch(1) a ! Restart a ! CountDown(done) - assertCountDown(done, Testing.time(3000), "Should be suspended+resumed and done with next message within 3 seconds") + assertCountDown(done, Testing.testTime(3000), "Should be suspended+resumed and done with next message within 3 seconds") a.stop assertRefDefaultZero(a)(registers = 1,unregisters = 1, msgsReceived = 2, msgsProcessed = 2, suspensions = 1, resumes = 1) @@ -298,7 +298,7 @@ abstract class ActorModelSpec extends JUnitSuite { assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1) dispatcher.resume(a) - assertCountDown(done, Testing.time(3000), "Should resume processing of messages when resumed") + assertCountDown(done, Testing.testTime(3000), "Should resume processing of messages when resumed") assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1, suspensions = 1, resumes = 1) @@ -315,7 +315,7 @@ abstract class ActorModelSpec extends JUnitSuite { (1 to num) foreach { _ => newTestActor.start ! cachedMessage } - assertCountDown(cachedMessage.latch, Testing.time(10000), "Should process " + num + " countdowns") + assertCountDown(cachedMessage.latch, Testing.testTime(10000), "Should process " + num + " countdowns") } for(run <- 1 to 3) { flood(10000) diff --git a/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala index 09e618e24c..975ef1ae52 100644 --- a/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor/src/test/scala/akka/routing/RoutingSpec.scala @@ -468,7 +468,7 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers val pool2 = actorOf(new TestPool2).start pool2 ! "a" pool2 ! "b" - done = latch.await(1,TimeUnit.SECONDS) + done = latch.await(1, TimeUnit.SECONDS) done must be (true) delegates.size must be (2) pool2 stop diff --git a/akka-actor/src/test/scala/akka/testing/TestBarrier.scala b/akka-actor/src/test/scala/akka/testing/TestBarrier.scala new file mode 100644 index 0000000000..650ef7de79 --- /dev/null +++ b/akka-actor/src/test/scala/akka/testing/TestBarrier.scala @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.testing + +import akka.util.Duration +import java.util.concurrent.{CyclicBarrier, TimeUnit, TimeoutException} + + +class TestBarrierTimeoutException(message: String) extends RuntimeException(message) + +/** + * A cyclic barrier wrapper for use in testing. + * It always uses a timeout when waiting and timeouts are specified as durations. + * Timeouts will always throw an exception. The default timeout is 5 seconds. + * Timeouts are multiplied by the testing time factor for Jenkins builds. + */ +object TestBarrier { + val DefaultTimeout = Duration(5, TimeUnit.SECONDS) + + def apply(count: Int) = new TestBarrier(count) +} + +class TestBarrier(count: Int) { + private val barrier = new CyclicBarrier(count) + + def await(): Unit = await(TestBarrier.DefaultTimeout) + + def await(timeout: Duration): Unit = { + try { + barrier.await(Testing.testTime(timeout.toNanos), TimeUnit.NANOSECONDS) + } catch { + case e: TimeoutException => + throw new TestBarrierTimeoutException("Timeout of %s and time factor of %s" format (timeout.toString, Testing.timeFactor)) + } + } + + def reset = barrier.reset +} diff --git a/akka-actor/src/test/scala/akka/testing/TestLatch.scala b/akka-actor/src/test/scala/akka/testing/TestLatch.scala new file mode 100644 index 0000000000..b975145963 --- /dev/null +++ b/akka-actor/src/test/scala/akka/testing/TestLatch.scala @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.testing + +import akka.util.Duration +import java.util.concurrent.{CountDownLatch, TimeUnit} + + +class TestLatchTimeoutException(message: String) extends RuntimeException(message) +class TestLatchNoTimeoutException(message: String) extends RuntimeException(message) + +/** + * A count down latch wrapper for use in testing. + * It always uses a timeout when waiting and timeouts are specified as durations. + * There's a default timeout of 5 seconds and the default count is 1. + * Timeouts will always throw an exception (no need to wrap in assert in tests). + * Timeouts are multiplied by the testing time factor for Jenkins builds. + */ +object TestLatch { + val DefaultTimeout = Duration(5, TimeUnit.SECONDS) + + def apply(count: Int = 1) = new TestLatch(count) +} + +class TestLatch(count: Int = 1) { + private var latch = new CountDownLatch(count) + + def countDown = latch.countDown + + def open = countDown + + def await(): Boolean = await(TestLatch.DefaultTimeout) + + def await(timeout: Duration): Boolean = { + val opened = latch.await(Testing.testTime(timeout.toNanos), TimeUnit.NANOSECONDS) + if (!opened) throw new TestLatchTimeoutException( + "Timeout of %s with time factor of %s" format (timeout.toString, Testing.timeFactor)) + opened + } + + /** + * Timeout is expected. Throws exception if latch is opened before timeout. + */ + def awaitTimeout(timeout: Duration = TestLatch.DefaultTimeout) = { + val opened = latch.await(Testing.testTime(timeout.toNanos), TimeUnit.NANOSECONDS) + if (opened) throw new TestLatchNoTimeoutException( + "Latch opened before timeout of %s with time factor of %s" format (timeout.toString, Testing.timeFactor)) + opened + } + + def reset = latch = new CountDownLatch(count) +} + diff --git a/akka-actor/src/test/scala/akka/Testing.scala b/akka-actor/src/test/scala/akka/testing/Testing.scala similarity index 57% rename from akka-actor/src/test/scala/akka/Testing.scala rename to akka-actor/src/test/scala/akka/testing/Testing.scala index afc0c4a05a..d957b26af7 100644 --- a/akka-actor/src/test/scala/akka/Testing.scala +++ b/akka-actor/src/test/scala/akka/testing/Testing.scala @@ -2,7 +2,9 @@ * Copyright (C) 2009-2011 Scalable Solutions AB */ -package akka +package akka.testing + +import akka.util.Duration /** * Multiplying numbers used in test timeouts by a factor, set by system property. @@ -18,8 +20,10 @@ object Testing { } } - def time(t: Int): Int = (timeFactor * t).toInt - def time(t: Long): Long = (timeFactor * t).toLong - def time(t: Float): Float = (timeFactor * t).toFloat - def time(t: Double): Double = timeFactor * t + def testTime(t: Int): Int = (timeFactor * t).toInt + def testTime(t: Long): Long = (timeFactor * t).toLong + def testTime(t: Float): Float = (timeFactor * t).toFloat + def testTime(t: Double): Double = timeFactor * t + + def sleepFor(duration: Duration) = Thread.sleep(testTime(duration.toMillis)) } diff --git a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala index c91565eec7..988236b85b 100644 --- a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala @@ -10,7 +10,7 @@ import akka.actor._ import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue} import akka.config. {RemoteAddress, Config, TypedActorConfigurator} -import akka.Testing +import akka.testing._ object RemoteTypedActorLog { val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String] @@ -39,13 +39,13 @@ class RemoteTypedActorSpec extends AkkaRemoteTest { classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], Permanent, - Testing.time(10000), + Testing.testTime(20000), RemoteAddress(host,port)), new SuperviseTypedActor( classOf[RemoteTypedActorTwo], classOf[RemoteTypedActorTwoImpl], Permanent, - Testing.time(10000), + Testing.testTime(20000), RemoteAddress(host,port)) ).toArray).supervise } diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala index 0e27557607..a9c7c694bf 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala @@ -12,8 +12,6 @@ import akka.config.Supervision._ import java.util.concurrent.CountDownLatch import akka.config.TypedActorConfigurator -import akka.Testing - /** * @author Martin Krasser */ @@ -97,7 +95,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft } it("should be stopped when supervision cannot handle the problem in") { - val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), Testing.time(30000)) + val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), 30000) val conf = new TypedActorConfigurator().configure(OneForOneStrategy(Nil, 3, 500000), Array(actorSupervision)).inject.supervise try { val first = conf.getInstance(classOf[TypedActorFailer]) @@ -123,7 +121,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft } it("should be restarted when supervision handles the problem in") { - val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), Testing.time(30000)) + val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(), 30000) val conf = new TypedActorConfigurator().configure(OneForOneStrategy(classOf[Throwable] :: Nil, 3, 500000), Array(actorSupervision)).inject.supervise try { val first = conf.getInstance(classOf[TypedActorFailer]) From 72dfe4770f314e34fdb756b7ad5a9b307338d9d3 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Thu, 31 Mar 2011 20:34:36 +1300 Subject: [PATCH 02/11] Should should be must, should must be must --- .../akka/actor/actor/ActorFireForgetRequestReplySpec.scala | 2 +- akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala | 2 +- akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala | 2 +- .../src/test/scala/akka/actor/actor/ForwardActorSpec.scala | 2 +- akka-actor/src/test/scala/akka/actor/actor/HotSwapSpec.scala | 2 +- .../src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala | 2 +- akka-actor/src/test/scala/akka/ticket/Ticket001Spec.scala | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala index 3c35b42b9a..ea31d57287 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala @@ -67,7 +67,7 @@ class ActorFireForgetRequestReplySpec extends WordSpec with MustMatchers with Be state.finished.reset } - "An Actor" should { + "An Actor" must { "reply to bang message using reply" in { val replyActor = actorOf[ReplyActor].start diff --git a/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala index c9ea06cde8..2967bf8821 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala @@ -73,7 +73,7 @@ object ActorRefSpec { class ActorRefSpec extends WordSpec with MustMatchers { import ActorRefSpec._ - "An ActorRef" should { + "An ActorRef" must { "support reply via channel" in { val serverRef = Actor.actorOf[ReplyActor].start diff --git a/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala index 4f3e65caec..91e6f92873 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/FSMActorSpec.scala @@ -94,7 +94,7 @@ object FSMActorSpec { class FSMActorSpec extends WordSpec with MustMatchers { import FSMActorSpec._ - "An FSM Actor" should { + "An FSM Actor" must { "unlock the lock" in { diff --git a/akka-actor/src/test/scala/akka/actor/actor/ForwardActorSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ForwardActorSpec.scala index d9670bf471..b52b7c6b14 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/ForwardActorSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/ForwardActorSpec.scala @@ -65,7 +65,7 @@ object ForwardActorSpec { class ForwardActorSpec extends WordSpec with MustMatchers { import ForwardActorSpec._ - "A Forward Actor" should { + "A Forward Actor" must { "forward actor reference when invoking forward on bang" in { val senderActor = actorOf[BangSenderActor] val latch = senderActor.actor.asInstanceOf[BangSenderActor] diff --git a/akka-actor/src/test/scala/akka/actor/actor/HotSwapSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/HotSwapSpec.scala index 8632da8f6e..e985d383b6 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/HotSwapSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/HotSwapSpec.scala @@ -14,7 +14,7 @@ import Actor._ class HotSwapSpec extends WordSpec with MustMatchers { - "An Actor" should { + "An Actor" must { "be able to hotswap its behavior with HotSwap(..)" in { val barrier = TestBarrier(2) diff --git a/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala index 87b08862d3..c48fbd6f9d 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala @@ -17,7 +17,7 @@ import java.util.concurrent.atomic.AtomicInteger class ReceiveTimeoutSpec extends WordSpec with MustMatchers { import Actor._ - "An actor with receive timeout" should { + "An actor with receive timeout" must { "get timeout" in { val timeoutLatch = TestLatch() diff --git a/akka-actor/src/test/scala/akka/ticket/Ticket001Spec.scala b/akka-actor/src/test/scala/akka/ticket/Ticket001Spec.scala index d4de2675fb..e1a862e03c 100644 --- a/akka-actor/src/test/scala/akka/ticket/Ticket001Spec.scala +++ b/akka-actor/src/test/scala/akka/ticket/Ticket001Spec.scala @@ -5,7 +5,7 @@ import org.scalatest.matchers.MustMatchers class Ticket001Spec extends WordSpec with MustMatchers { - "An XXX" should { + "An XXX" must { "do YYY" in { 1 must be (1) } From cdbde3f661b957b3456f29dc24618dd17cf5ef99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Thu, 31 Mar 2011 15:55:30 +0200 Subject: [PATCH 03/11] Added comment about broken TypedActor remoting behavior --- .../scala/akka/remoteinterface/RemoteInterface.scala | 10 +++++++--- .../scala/akka/remote/netty/NettyRemoteSupport.scala | 11 +++++++++-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 185f0d2799..d168361bd4 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -17,7 +17,7 @@ import java.util.concurrent.ConcurrentHashMap import java.io.{PrintWriter, PrintStream} trait RemoteModule { - val UUID_PREFIX = "uuid:" + val UUID_PREFIX = "uuid:".intern def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope protected[akka] def notifyListeners(message: => Any): Unit @@ -84,7 +84,6 @@ case class RemoteClientWriteFailed( @BeanProperty client: RemoteClientModule, @BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent - /** * Life-cycle events for RemoteServer. */ @@ -120,7 +119,12 @@ class RemoteClientException private[akka] ( val remoteAddress: InetSocketAddress) extends AkkaException(message) /** - * Returned when a remote exception sent over the wire cannot be loaded and instantiated + * Thrown when the remote server actor dispatching fails for some reason. + */ +class RemoteServerException private[akka] (message: String) extends AkkaException(message) + +/** + * Thrown when a remote exception sent over the wire cannot be loaded and instantiated */ case class CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException private[akka] (cause: Throwable, originalClassName: String, originalMessage: String) extends AkkaException("\nParsingError[%s]\nOriginalException[%s]\nOriginalMessage[%s]" diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index dd7a22df52..d9ded4b0e2 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -1049,7 +1049,7 @@ class RemoteServerHandler( throw firstException targetMethod - } + } try { val messageReceiver = resolveMethod(typedActor.getClass, ownerTypeHint, typedActorInfo.getMethod, argClasses) @@ -1230,7 +1230,14 @@ class RemoteServerHandler( server.findTypedActorByIdOrUuid(actorInfo.getId, parseUuid(uuid).toString) match { case null => // the actor has not been registered globally. See if we have it in the session createTypedSessionActor(actorInfo, channel) match { - case null => createClientManagedTypedActor(actorInfo) //Maybe client managed actor? + case null => + // FIXME this is broken, if a user tries to get a server-managed typed actor and that is not registered then a client-managed typed actor is created, but just throwing an exception here causes client-managed typed actors to fail + +/* val e = new RemoteServerException("Can't load remote Typed Actor for [" + actorInfo.getId + "]") + EventHandler.error(e, this, e.getMessage) + server.notifyListeners(RemoteServerError(e, server)) + throw e +*/ createClientManagedTypedActor(actorInfo) // client-managed actor case sessionActor => sessionActor } case typedActor => typedActor From e7c13253630c0c8c4361b66ebc16a5902a216397 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 1 Apr 2011 12:00:10 +0200 Subject: [PATCH 04/11] Adding Kill message, synonymous with Restart(new ActorKilledException(msg)) --- .../src/main/scala/akka/actor/Actor.scala | 3 +++ .../scala/akka/actor/actor/ActorRefSpec.scala | 27 +++++++++++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index e70b4a98ae..16dfbeba4a 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -61,6 +61,8 @@ case class UnlinkAndStop(child: ActorRef) extends AutoReceivedMessage with LifeC case object PoisonPill extends AutoReceivedMessage with LifeCycleMessage +case object Kill extends AutoReceivedMessage with LifeCycleMessage + case object ReceiveTimeout extends LifeCycleMessage case class MaximumNumberOfRestartsWithinTimeRangeReached( @@ -465,6 +467,7 @@ trait Actor { case Unlink(child) => self.unlink(child) case UnlinkAndStop(child) => self.unlink(child); child.stop case Restart(reason) => throw reason + case Kill => throw new ActorKilledException("Kill") case PoisonPill => val f = self.senderFuture self.stop diff --git a/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala index 2967bf8821..d379f3f98c 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/ActorRefSpec.scala @@ -10,10 +10,10 @@ import org.scalatest.matchers.MustMatchers import akka.testing._ import akka.util.duration._ import akka.testing.Testing.sleepFor - +import akka.config.Supervision.{OneForOneStrategy} import akka.actor._ import akka.dispatch.Future - +import java.util.concurrent.{TimeUnit, CountDownLatch} object ActorRefSpec { @@ -123,5 +123,28 @@ class ActorRefSpec extends WordSpec with MustMatchers { ref.isRunning must be (false) ref.isShutdown must be (true) } + + "restart when Kill:ed" in { + val latch = new CountDownLatch(2) + + val boss = Actor.actorOf(new Actor{ + self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), scala.Some(2), scala.Some(1000)) + + val ref = Actor.actorOf( + new Actor { + def receive = { case _ => } + override def preRestart(reason: Throwable) = latch.countDown + override def postRestart(reason: Throwable) = latch.countDown + } + ).start + + self link ref + + protected def receive = { case "sendKill" => ref ! Kill } + }).start + + boss ! "sendKill" + latch.await(5, TimeUnit.SECONDS) must be === true + } } } From 8f4dcfe22ae468dcfc3cce905064442049aa21a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 1 Apr 2011 14:28:08 +0200 Subject: [PATCH 05/11] Added first tutorial based on Scala and SBT --- .../project/build.properties | 5 + .../project/plugins/Plugins.scala | 6 ++ .../src/main/scala/Pi.scala | 101 ++++++++++++++++++ 3 files changed, 112 insertions(+) create mode 100644 akka-tutorials/akka-tutorial-pi-sbt/project/build.properties create mode 100644 akka-tutorials/akka-tutorial-pi-sbt/project/plugins/Plugins.scala create mode 100644 akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala diff --git a/akka-tutorials/akka-tutorial-pi-sbt/project/build.properties b/akka-tutorials/akka-tutorial-pi-sbt/project/build.properties new file mode 100644 index 0000000000..14f82f14d0 --- /dev/null +++ b/akka-tutorials/akka-tutorial-pi-sbt/project/build.properties @@ -0,0 +1,5 @@ +project.organization=se.scalablesolutions.akka +project.name=Akka Tutorial 1 SBT +project.version=1.0 +build.scala.versions=2.9.0.RC1 +sbt.version=0.7.5.RC1 diff --git a/akka-tutorials/akka-tutorial-pi-sbt/project/plugins/Plugins.scala b/akka-tutorials/akka-tutorial-pi-sbt/project/plugins/Plugins.scala new file mode 100644 index 0000000000..74a3d6b705 --- /dev/null +++ b/akka-tutorials/akka-tutorial-pi-sbt/project/plugins/Plugins.scala @@ -0,0 +1,6 @@ +import sbt._ + +class Plugins(info: ProjectInfo) extends PluginDefinition(info) { + val akkaRepo = "Akka Repo" at "http://akka.io/repository" + val akkaPlugin = "se.scalablesolutions.akka" % "akka-sbt-plugin" % "1.1-SNAPSHOT" +} diff --git a/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala new file mode 100644 index 0000000000..fb7dcfcc93 --- /dev/null +++ b/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala @@ -0,0 +1,101 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.tutorial.sbt.pi + +import akka.actor.{Actor, ActorRef} +import Actor._ +import akka.routing.{Routing, CyclicIterator} +import akka.event.EventHandler +import akka.dispatch.Dispatchers + +import System.{currentTimeMillis => now} +import java.util.concurrent.CountDownLatch + +object Main extends App { + Pi.calculate +} + +/* + Pi estimate: 3.1415926435897883 + + === 8 workers (with custom dispatcher 4/4) + Calculation time: 5163 millis + + === 8 workers (with default dispatcher) + Calculation time: 6789 millis + + === 4 workers + Calculation time: 5438 millis + + === 2 workers + Calculation time: 6002 millis + + === 1 workers + Calculation time: 8173 millis +*/ +object Pi { + val nrOfWorkers = 4 + val nrOfMessages = 10000 + val lengthOfCalculationRange = 10000 + + // ===== Messages ===== + sealed trait PiMessage + case class Work(arg: Int, fun: (Int) => Double) extends PiMessage + case class Result(value: Double) extends PiMessage + + // ===== Worker ===== + class Worker extends Actor { + def receive = { + case Work(arg, fun) => self.reply(Result(fun(arg))) + } + } + + // ===== Master ===== + class Master(nrOfMessages: Int, latch: CountDownLatch) extends Actor { + var pi: Double = _ + var count: Int = _ + var start: Long = _ + + def receive = { + case Result(value) => + pi += value + count += 1 + if (count == nrOfMessages) self.stop + } + + override def preStart = start = now + + override def postStop = { + EventHandler.info(this, "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start))) + Actor.registry.shutdownAll // shut down all workers + latch.countDown + } + } + + def calculate = { + val latch = new CountDownLatch(1) + + // create the master + val master = actorOf(new Master(nrOfMessages, latch)).start + + // the master ref is also the 'implicit sender' that the workers should reply to + implicit val replyTo = Option(master) + + // create the workers + val workers = new Array[ActorRef](nrOfWorkers) + for (i <- 0 until nrOfWorkers) workers(i) = actorOf[Worker].start + + // wrap them with a load-balancing router + val router = Routing.loadBalancerActor(CyclicIterator(workers)).start + + val fun = (x: Int) => (for (k <- (x * lengthOfCalculationRange) to ((x + 1) * lengthOfCalculationRange - 1)) yield (4 * math.pow(-1, k) / (2 * k + 1))).sum + + // schedule work + for (arg <- 0 until nrOfMessages) router ! Work(arg, fun) + + latch.await + } +} + From 827c678b49ba7596e7655604292df9eb631b3175 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 1 Apr 2011 14:29:26 +0200 Subject: [PATCH 06/11] Changed *Iterator to take a Seq instead of a List --- akka-actor/src/main/scala/akka/actor/Actor.scala | 10 +--------- .../scala/akka/remoteinterface/RemoteInterface.scala | 2 +- akka-actor/src/main/scala/akka/routing/Iterators.scala | 10 +++++----- akka-actor/src/main/scala/akka/routing/Routers.scala | 3 +-- 4 files changed, 8 insertions(+), 17 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index e70b4a98ae..0a549473a6 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -6,15 +6,10 @@ package akka.actor import akka.dispatch._ import akka.config.Config._ -import akka.config.Supervision._ -import akka.config.ConfigurationException import akka.util.Helpers.{narrow, narrowSilently} import akka.util.ListenerManagement import akka.AkkaException -import java.util.concurrent.TimeUnit -import java.net.InetSocketAddress - import scala.reflect.BeanProperty import akka.util. {ReflectiveAccess, Duration} import akka.remoteinterface.RemoteSupport @@ -277,9 +272,6 @@ object Actor extends ListenerManagement { * } * * - *

- * The Actor trait also has a 'log' member field that can be used for logging within the Actor. - * * @author Jonas Bonér */ trait Actor { @@ -353,7 +345,7 @@ trait Actor { *

* Example code: *

-   *   def receive =  {
+   *   def receive = {
    *     case Ping =>
    *       println("got a 'Ping' message")
    *       self.reply("pong")
diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala
index d168361bd4..3ccc60505d 100644
--- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala
+++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala
@@ -488,4 +488,4 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule =>
 
   @deprecated("Will be removed after 1.1")
   private[akka] def unregisterClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit
-}
\ No newline at end of file
+}
diff --git a/akka-actor/src/main/scala/akka/routing/Iterators.scala b/akka-actor/src/main/scala/akka/routing/Iterators.scala
index a01cc6fe2d..1643c6a6b0 100644
--- a/akka-actor/src/main/scala/akka/routing/Iterators.scala
+++ b/akka-actor/src/main/scala/akka/routing/Iterators.scala
@@ -15,10 +15,10 @@ trait InfiniteIterator[T] extends Iterator[T]
 /**
  * CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List.
  */
-class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
-  def this(items: java.util.List[T]) = this(items.toList)
+case class CyclicIterator[T](items: Seq[T]) extends InfiniteIterator[T] {
+  def this(items: java.util.List[T]) = this(items.toSeq)
 
-  @volatile private[this] var current: List[T] = items
+  @volatile private[this] var current: Seq[T] = items
 
   def hasNext = items != Nil
 
@@ -36,8 +36,8 @@ class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
  * This InfiniteIterator always returns the Actor that has the currently smallest mailbox
  * useful for work-stealing.
  */
-class SmallestMailboxFirstIterator(items : List[ActorRef]) extends InfiniteIterator[ActorRef] {
-  def this(items: java.util.List[ActorRef]) = this(items.toList)
+case class SmallestMailboxFirstIterator(items : Seq[ActorRef]) extends InfiniteIterator[ActorRef] {
+  def this(items: java.util.List[ActorRef]) = this(items.toSeq)
   def hasNext = items != Nil
 
   def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2)
diff --git a/akka-actor/src/main/scala/akka/routing/Routers.scala b/akka-actor/src/main/scala/akka/routing/Routers.scala
index b0283ce77d..2dcf3d0ccc 100644
--- a/akka-actor/src/main/scala/akka/routing/Routers.scala
+++ b/akka-actor/src/main/scala/akka/routing/Routers.scala
@@ -39,8 +39,7 @@ abstract class UntypedDispatcher extends UntypedActor {
   @throws(classOf[Exception])
   def onReceive(msg: Any): Unit = {
     val r = route(msg)
-    if(r eq null)
-      throw new IllegalStateException("No route for " + msg + " defined!")
+    if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!")
     if (isSenderDefined) r.forward(transform(msg))(someSelf)
     else r.!(transform(msg))(None)
   }

From 8e7c212481f3de208319780968575401aa06fe6b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jonas=20Bone=CC=81r?= 
Date: Fri, 1 Apr 2011 14:31:53 +0200
Subject: [PATCH 07/11] Fixed bug with not shutting down remote event handler
 listener properly

---
 .../src/main/scala/akka/remoteinterface/RemoteInterface.scala    | 1 +
 1 file changed, 1 insertion(+)

diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala
index 3ccc60505d..e9e4168995 100644
--- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala
+++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala
@@ -144,6 +144,7 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule
   }
 
   def shutdown {
+    eventHandler.stop
     removeListener(eventHandler)
     this.shutdownClientModule
     this.shutdownServerModule

From 384332da392bc3fb0cbc71a9b2051da1073fbc83 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jonas=20Bone=CC=81r?= 
Date: Fri, 1 Apr 2011 14:48:04 +0200
Subject: [PATCH 08/11] removed JARs added by mistake

---
 .../src/main/scala/Pi.scala                   | 57 +++++++++++--------
 1 file changed, 34 insertions(+), 23 deletions(-)

diff --git a/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala
index fb7dcfcc93..787842e59e 100644
--- a/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala
+++ b/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala
@@ -38,7 +38,7 @@ object Main extends App {
 object Pi  {
   val nrOfWorkers              = 4
   val nrOfMessages             = 10000
-  val lengthOfCalculationRange = 10000
+  val nrOfElements = 10000
 
   // ===== Messages =====
   sealed trait PiMessage
@@ -53,48 +53,59 @@ object Pi  {
   }
 
   // ===== Master =====
-  class Master(nrOfMessages: Int, latch: CountDownLatch) extends Actor {
+  class Master(latch: CountDownLatch) extends Actor {
     var pi: Double = _
-    var count: Int = _
+    var nrOfResults: Int = _
     var start: Long = _
 
+    // create the workers
+    val workers = {
+      val ws = new Array[ActorRef](nrOfWorkers)
+      for (i <- 0 until nrOfWorkers) ws(i) = actorOf[Worker].start
+      ws
+    }
+
+    // wrap them with a load-balancing router
+    val router = Routing.loadBalancerActor(CyclicIterator(workers)).start
+
     def receive = {
+      case Calculate(nrOfMessages, nrOfElements) =>
+        // define the work
+        val fun = (i: Int) => {
+          val range = (i * nrOfElements) to ((i + 1) * nrOfElements - 1)
+          val results = for (j <- range) yield (4 * math.pow(-1, j) / (2 * j + 1))
+          results.sum
+        }
+        // schedule work
+        for (arg <- 0 until nrOfMessages) router ! Work(arg, fun)
+
+        // send a PoisonPill to all workers telling them to shut down themselves
+        router broadcast PoisonPill
+
       case Result(value) =>
         pi += value
-        count += 1
-        if (count == nrOfMessages) self.stop
+        nrOfResults += 1
+        if (nrOfResults == nrOfMessages) self.stop
     }
 
     override def preStart = start = now
 
     override def postStop = {
       EventHandler.info(this, "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start)))
-      Actor.registry.shutdownAll // shut down all workers
-      latch.countDown
+      latch.nrOfResultsDown
     }
   }
 
   def calculate = {
-    val latch = new CountDownLatch(1)
+    val latch = new nrOfResultsDownLatch(1)
 
     // create the master
-    val master = actorOf(new Master(nrOfMessages, latch)).start
+    val master = actorOf(new Master(latch)).start
 
-    // the master ref is also the 'implicit sender' that the workers should reply to
-    implicit val replyTo = Option(master)
-
-    // create the workers
-    val workers = new Array[ActorRef](nrOfWorkers)
-    for (i <- 0 until nrOfWorkers) workers(i) = actorOf[Worker].start
-
-    // wrap them with a load-balancing router
-    val router = Routing.loadBalancerActor(CyclicIterator(workers)).start
-
-    val fun = (x: Int) => (for (k <- (x * lengthOfCalculationRange) to ((x + 1) * lengthOfCalculationRange - 1)) yield (4 * math.pow(-1, k) / (2 * k + 1))).sum
-
-    // schedule work
-    for (arg <- 0 until nrOfMessages) router ! Work(arg, fun)
+    // start the calculation
+    master ! Calculate(nrOfMessages, nrOfElements)
 
+    // wait for master to shut down
     latch.await
   }
 }

From d97b8fbd9c367aaf0b55ceb47a1aa3f7ff7b8792 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jonas=20Bone=CC=81r?= 
Date: Fri, 1 Apr 2011 15:33:46 +0200
Subject: [PATCH 09/11] Added Routing.Broadcast message and handling to be able
 to broadcast a message to all the actors a load-balancer represents

---
 .gitignore                                    |  1 +
 .../main/scala/akka/routing/Iterators.scala   |  9 ++--
 .../src/main/scala/akka/routing/Routers.scala | 25 ++++++++--
 .../src/main/scala/akka/routing/Routing.scala |  9 ++--
 .../src/main/scala/Pi.scala                   | 49 +++++++------------
 5 files changed, 49 insertions(+), 44 deletions(-)

diff --git a/.gitignore b/.gitignore
index c0fa0f10b4..8613a0ba79 100755
--- a/.gitignore
+++ b/.gitignore
@@ -45,3 +45,4 @@ run-codefellow
 multiverse.log
 .eprj
 .*.swp
+akka-tutorials/akka-tutorial-pi-sbt/project/boot/
\ No newline at end of file
diff --git a/akka-actor/src/main/scala/akka/routing/Iterators.scala b/akka-actor/src/main/scala/akka/routing/Iterators.scala
index 1643c6a6b0..f076ea00c1 100644
--- a/akka-actor/src/main/scala/akka/routing/Iterators.scala
+++ b/akka-actor/src/main/scala/akka/routing/Iterators.scala
@@ -10,12 +10,14 @@ import scala.collection.JavaConversions._
 /**
  * An Iterator that is either always empty or yields an infinite number of Ts.
  */
-trait InfiniteIterator[T] extends Iterator[T]
+trait InfiniteIterator[T] extends Iterator[T] {
+  val items: Seq[T]
+}
 
 /**
  * CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List.
  */
-case class CyclicIterator[T](items: Seq[T]) extends InfiniteIterator[T] {
+case class CyclicIterator[T](val items: Seq[T]) extends InfiniteIterator[T] {
   def this(items: java.util.List[T]) = this(items.toSeq)
 
   @volatile private[this] var current: Seq[T] = items
@@ -29,14 +31,13 @@ case class CyclicIterator[T](items: Seq[T]) extends InfiniteIterator[T] {
   }
 
   override def exists(f: T => Boolean): Boolean = items.exists(f)
-
 }
 
 /**
  * This InfiniteIterator always returns the Actor that has the currently smallest mailbox
  * useful for work-stealing.
  */
-case class SmallestMailboxFirstIterator(items : Seq[ActorRef]) extends InfiniteIterator[ActorRef] {
+case class SmallestMailboxFirstIterator(val items : Seq[ActorRef]) extends InfiniteIterator[ActorRef] {
   def this(items: java.util.List[ActorRef]) = this(items.toSeq)
   def hasNext = items != Nil
 
diff --git a/akka-actor/src/main/scala/akka/routing/Routers.scala b/akka-actor/src/main/scala/akka/routing/Routers.scala
index 2dcf3d0ccc..57511076e8 100644
--- a/akka-actor/src/main/scala/akka/routing/Routers.scala
+++ b/akka-actor/src/main/scala/akka/routing/Routers.scala
@@ -15,7 +15,11 @@ trait Dispatcher { this: Actor =>
 
   protected def routes: PartialFunction[Any, ActorRef]
 
+  protected def broadcast(message: Any) {}
+
   protected def dispatch: Receive = {
+    case Routing.Broadcast(message) =>
+      broadcast(message)
     case a if routes.isDefinedAt(a) =>
       if (isSenderDefined) routes(a).forward(transform(a))(someSelf)
       else routes(a).!(transform(a))(None)
@@ -34,14 +38,19 @@ abstract class UntypedDispatcher extends UntypedActor {
 
   protected def route(msg: Any): ActorRef
 
+  protected def broadcast(message: Any) {}
+
   private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
 
   @throws(classOf[Exception])
   def onReceive(msg: Any): Unit = {
-    val r = route(msg)
-    if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!")
-    if (isSenderDefined) r.forward(transform(msg))(someSelf)
-    else r.!(transform(msg))(None)
+    if (msg.isInstanceOf[Routing.Broadcast]) broadcast(msg.asInstanceOf[Routing.Broadcast].message)
+    else {
+      val r = route(msg)
+      if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!")
+      if (isSenderDefined) r.forward(transform(msg))(someSelf)
+      else r.!(transform(msg))(None)
+    }
   }
 }
 
@@ -52,7 +61,11 @@ abstract class UntypedDispatcher extends UntypedActor {
 trait LoadBalancer extends Dispatcher { self: Actor =>
   protected def seq: InfiniteIterator[ActorRef]
 
-  protected def routes = { case x if seq.hasNext => seq.next }
+  protected def routes = {
+    case x if seq.hasNext => seq.next
+  }
+
+  override def broadcast(message: Any) = seq.items.foreach(_ ! message)
 
   override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) )
 }
@@ -68,5 +81,7 @@ abstract class UntypedLoadBalancer extends UntypedDispatcher {
     if (seq.hasNext) seq.next
     else null
 
+  override def broadcast(message: Any) = seq.items.foreach(_ ! message)
+
   override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) )
 }
diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala
index 1d43950f8b..2e041a4e35 100644
--- a/akka-actor/src/main/scala/akka/routing/Routing.scala
+++ b/akka-actor/src/main/scala/akka/routing/Routing.scala
@@ -9,6 +9,9 @@ import akka.actor.Actor._
 
 object Routing {
 
+  sealed trait RoutingMessage
+  case class Broadcast(message: Any) extends RoutingMessage
+
   type PF[A, B] = PartialFunction[A, B]
 
   /**
@@ -31,7 +34,7 @@ object Routing {
   /**
    * Creates a LoadBalancer from the thunk-supplied InfiniteIterator.
    */
-   def loadBalancerActor(actors: => InfiniteIterator[ActorRef]): ActorRef =
+  def loadBalancerActor(actors: => InfiniteIterator[ActorRef]): ActorRef =
     actorOf(new Actor with LoadBalancer {
       val seq = actors
     }).start
@@ -39,7 +42,7 @@ object Routing {
   /**
    * Creates a Dispatcher given a routing and a message-transforming function.
    */
-   def dispatcherActor(routing: PF[Any, ActorRef], msgTransformer: (Any) => Any): ActorRef =
+  def dispatcherActor(routing: PF[Any, ActorRef], msgTransformer: (Any) => Any): ActorRef =
     actorOf(new Actor with Dispatcher {
       override def transform(msg: Any) = msgTransformer(msg)
       def routes = routing
@@ -48,7 +51,7 @@ object Routing {
   /**
    * Creates a Dispatcher given a routing.
    */
-   def dispatcherActor(routing: PF[Any, ActorRef]): ActorRef = actorOf(new Actor with Dispatcher {
+  def dispatcherActor(routing: PF[Any, ActorRef]): ActorRef = actorOf(new Actor with Dispatcher {
     def routes = routing
   }).start
 
diff --git a/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala
index 787842e59e..ba4c960ff4 100644
--- a/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala
+++ b/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala
@@ -4,9 +4,10 @@
 
 package akka.tutorial.sbt.pi
 
-import akka.actor.{Actor, ActorRef}
+import akka.actor.{Actor, ActorRef, PoisonPill}
 import Actor._
 import akka.routing.{Routing, CyclicIterator}
+import Routing._
 import akka.event.EventHandler
 import akka.dispatch.Dispatchers
 
@@ -17,38 +18,21 @@ object Main extends App {
   Pi.calculate
 }
 
-/*
-  Pi estimate:    3.1415926435897883
-
-  === 8 workers (with custom dispatcher 4/4)
-  Calculation time:   5163 millis
-
-  === 8 workers (with default dispatcher)
-  Calculation time:   6789 millis
-
-  === 4 workers
-  Calculation time:   5438 millis
-
-  === 2 workers
-  Calculation time:   6002 millis
-
-  === 1 workers
-  Calculation time:   8173 millis
-*/
 object Pi  {
-  val nrOfWorkers              = 4
-  val nrOfMessages             = 10000
+  val nrOfWorkers  = 4
+  val nrOfMessages = 10000
   val nrOfElements = 10000
 
   // ===== Messages =====
   sealed trait PiMessage
+  case class Calculate(nrOfMessages: Int, nrOfElements: Int) extends PiMessage
   case class Work(arg: Int, fun: (Int) => Double) extends PiMessage
   case class Result(value: Double) extends PiMessage
 
   // ===== Worker =====
   class Worker extends Actor {
     def receive = {
-      case Work(arg, fun) => self.reply(Result(fun(arg)))
+      case Work(arg, fun) => self reply Result(fun(arg))
     }
   }
 
@@ -68,19 +52,20 @@ object Pi  {
     // wrap them with a load-balancing router
     val router = Routing.loadBalancerActor(CyclicIterator(workers)).start
 
+    // define the work
+    val algorithm = (i: Int) => {
+      val range = (i * nrOfElements) to ((i + 1) * nrOfElements - 1)
+      val results = for (j <- range) yield (4 * math.pow(-1, j) / (2 * j + 1))
+      results.sum
+    }
+
     def receive = {
       case Calculate(nrOfMessages, nrOfElements) =>
-        // define the work
-        val fun = (i: Int) => {
-          val range = (i * nrOfElements) to ((i + 1) * nrOfElements - 1)
-          val results = for (j <- range) yield (4 * math.pow(-1, j) / (2 * j + 1))
-          results.sum
-        }
         // schedule work
-        for (arg <- 0 until nrOfMessages) router ! Work(arg, fun)
+        for (arg <- 0 until nrOfMessages) router ! Work(arg, algorithm)
 
         // send a PoisonPill to all workers telling them to shut down themselves
-        router broadcast PoisonPill
+        router ! Broadcast(PoisonPill)
 
       case Result(value) =>
         pi += value
@@ -92,12 +77,12 @@ object Pi  {
 
     override def postStop = {
       EventHandler.info(this, "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start)))
-      latch.nrOfResultsDown
+      latch.countDown
     }
   }
 
   def calculate = {
-    val latch = new nrOfResultsDownLatch(1)
+    val latch = new CountDownLatch(1)
 
     // create the master
     val master = actorOf(new Master(latch)).start

From 0149cb3feeba0b304f8883b032042844920fdfdb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jonas=20Bone=CC=81r?= 
Date: Fri, 1 Apr 2011 15:49:33 +0200
Subject: [PATCH 10/11] Changed logging level in ReflectiveAccess and set the
 default Akka log level to INFO

---
 .../scala/akka/util/ReflectiveAccess.scala    | 63 +++++++++----------
 config/akka-reference.conf                    |  2 +-
 2 files changed, 32 insertions(+), 33 deletions(-)

diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
index 41d1106818..f4ceba6ebe 100644
--- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
+++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
@@ -6,7 +6,6 @@ package akka.util
 
 import akka.dispatch.{Future, CompletableFuture, MessageInvocation}
 import akka.config.{Config, ModuleNotAvailableException}
-import akka.AkkaException
 
 import java.net.InetSocketAddress
 import akka.remoteinterface.RemoteSupport
@@ -45,13 +44,13 @@ object ReflectiveAccess {
     def ensureEnabled = if (!isEnabled) {
       val e = new ModuleNotAvailableException(
         "Can't load the remoting module, make sure that akka-remote.jar is on the classpath")
-      EventHandler.warning(this, e.toString)
+      EventHandler.debug(this, e.toString)
       throw e
     }
     val remoteSupportClass: Option[Class[_ <: RemoteSupport]] = getClassFor(TRANSPORT)
 
-    protected[akka] val defaultRemoteSupport: Option[() => RemoteSupport] = 
-      remoteSupportClass map { remoteClass => 
+    protected[akka] val defaultRemoteSupport: Option[() => RemoteSupport] =
+      remoteSupportClass map { remoteClass =>
         () => createInstance[RemoteSupport](
           remoteClass,
           Array[Class[_]](),
@@ -59,7 +58,7 @@ object ReflectiveAccess {
         ) getOrElse {
           val e = new ModuleNotAvailableException(
             "Can't instantiate [%s] - make sure that akka-remote.jar is on the classpath".format(remoteClass.getName))
-          EventHandler.warning(this, e.toString)
+          EventHandler.debug(this, e.toString)
           throw e
         }
     }
@@ -135,7 +134,7 @@ object ReflectiveAccess {
     Some(ctor.newInstance(args: _*).asInstanceOf[T])
   } catch {
     case e: Exception =>
-      EventHandler.warning(this, e.toString)
+      EventHandler.debug(this, e.toString)
       None
   }
 
@@ -154,7 +153,7 @@ object ReflectiveAccess {
     }
   } catch {
     case e: Exception =>
-      EventHandler.warning(this, e.toString)
+      EventHandler.debug(this, e.toString)
       None
   }
 
@@ -168,7 +167,7 @@ object ReflectiveAccess {
     }
   } catch {
     case e: ExceptionInInitializerError =>
-      EventHandler.warning(this, e.toString)
+      EventHandler.debug(this, e.toString)
       throw e
   }
 
@@ -176,23 +175,23 @@ object ReflectiveAccess {
     assert(fqn ne null)
 
     // First, use the specified CL
-    val first = try { 
-      Option(classloader.loadClass(fqn).asInstanceOf[Class[T]]) 
-    } catch { 
-      case c: ClassNotFoundException => 
-        EventHandler.warning(this, c.toString)
-        None 
-    } 
+    val first = try {
+      Option(classloader.loadClass(fqn).asInstanceOf[Class[T]])
+    } catch {
+      case c: ClassNotFoundException =>
+        EventHandler.debug(this, c.toString)
+        None
+    }
 
     if (first.isDefined) first
-    else { 
+    else {
       // Second option is to use the ContextClassLoader
-      val second = try { 
-        Option(Thread.currentThread.getContextClassLoader.loadClass(fqn).asInstanceOf[Class[T]]) 
-      } catch { 
-        case c: ClassNotFoundException => 
-          EventHandler.warning(this, c.toString)
-          None 
+      val second = try {
+        Option(Thread.currentThread.getContextClassLoader.loadClass(fqn).asInstanceOf[Class[T]])
+      } catch {
+        case c: ClassNotFoundException =>
+          EventHandler.debug(this, c.toString)
+          None
       }
 
       if (second.isDefined) second
@@ -201,22 +200,22 @@ object ReflectiveAccess {
            // Don't try to use "loader" if we got the default "classloader" parameter
            if (classloader ne loader) Option(loader.loadClass(fqn).asInstanceOf[Class[T]])
           else None
-        } catch { 
-          case c: ClassNotFoundException => 
-            EventHandler.warning(this, c.toString)
-            None 
+        } catch {
+          case c: ClassNotFoundException =>
+            EventHandler.debug(this, c.toString)
+            None
         }
 
         if (third.isDefined) third
         else {
           // Last option is Class.forName
-          try { 
+          try {
             Option(Class.forName(fqn).asInstanceOf[Class[T]])
-          } catch { 
-            case c: ClassNotFoundException => 
-              EventHandler.warning(this, c.toString)
-              None 
-          } 
+          } catch {
+            case c: ClassNotFoundException =>
+              EventHandler.debug(this, c.toString)
+              None
+          }
         }
       }
     }
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 1c3676ad31..df2c2c3e0d 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -13,7 +13,7 @@ akka {
   time-unit = "seconds"      # Time unit for all timeout properties throughout the config
 
   event-handlers = ["akka.event.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT)
-  event-handler-level = "DEBUG" # Options: ERROR, WARNING, INFO, DEBUG
+  event-handler-level = "INFO" # Options: ERROR, WARNING, INFO, DEBUG
 
   # These boot classes are loaded (and created) automatically when the Akka Microkernel boots up
   #     Can be used to bootstrap your application(s)

From d859a9cb1372ed88f037aa3182c1b2e45ae8e5d8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jonas=20Bone=CC=81r?= 
Date: Fri, 1 Apr 2011 16:02:55 +0200
Subject: [PATCH 11/11] Added some comments and scaladoc to Pi tutorial sample

---
 .../src/main/scala/Pi.scala                   | 38 +++++++++++++++++--
 1 file changed, 34 insertions(+), 4 deletions(-)

diff --git a/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala
index ba4c960ff4..1b84ae5008 100644
--- a/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala
+++ b/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala
@@ -14,29 +14,48 @@ import akka.dispatch.Dispatchers
 import System.{currentTimeMillis => now}
 import java.util.concurrent.CountDownLatch
 
-object Main extends App {
-  Pi.calculate
-}
-
+/**
+ * Sample for Akka, SBT an Scala tutorial.
+ * 

+ * Calculates Pi. + *

+ * Run it in SBT: + *

+ *   $ sbt
+ *   > update
+ *   > console
+ *   > akka.tutorial.sbt.pi.Pi.calculate
+ *   > ...
+ *   > :quit
+ * 
+ * + * @author Jonas Bonér + */ object Pi { val nrOfWorkers = 4 val nrOfMessages = 10000 val nrOfElements = 10000 + // ==================== // ===== Messages ===== + // ==================== sealed trait PiMessage case class Calculate(nrOfMessages: Int, nrOfElements: Int) extends PiMessage case class Work(arg: Int, fun: (Int) => Double) extends PiMessage case class Result(value: Double) extends PiMessage + // ================== // ===== Worker ===== + // ================== class Worker extends Actor { def receive = { case Work(arg, fun) => self reply Result(fun(arg)) } } + // ================== // ===== Master ===== + // ================== class Master(latch: CountDownLatch) extends Actor { var pi: Double = _ var nrOfResults: Int = _ @@ -59,6 +78,7 @@ object Pi { results.sum } + // message handler def receive = { case Calculate(nrOfMessages, nrOfElements) => // schedule work @@ -68,6 +88,7 @@ object Pi { router ! Broadcast(PoisonPill) case Result(value) => + // handle result from the worker pi += value nrOfResults += 1 if (nrOfResults == nrOfMessages) self.stop @@ -76,12 +97,17 @@ object Pi { override def preStart = start = now override def postStop = { + // tell the world that the calculation is complete EventHandler.info(this, "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start))) latch.countDown } } + // ================== + // ===== Run it ===== + // ================== def calculate = { + // this latch is only plumbing to know when the calculation is completed val latch = new CountDownLatch(1) // create the master @@ -95,3 +121,7 @@ object Pi { } } +// To be able to run it as a main application +object Main extends App { + Pi.calculate +}