diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala index 3f2f8e57db..c4e8ade76e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala @@ -8,8 +8,8 @@ import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import org.scalatest.BeforeAndAfterEach -import akka.testing._ -import akka.testing.Testing.sleepFor +import akka.testkit._ +import akka.testkit.Testing.sleepFor import akka.util.duration._ import Actor._ diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala index f0dda16f9b..7864ea67e7 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala @@ -7,9 +7,9 @@ package akka.actor import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers -import akka.testing._ +import akka.testkit._ import akka.util.duration._ -import akka.testing.Testing.sleepFor +import akka.testkit.Testing.sleepFor import akka.config.Supervision.{ OneForOneStrategy } import akka.dispatch.Future import java.util.concurrent.{ TimeUnit, CountDownLatch } diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorTimeoutSpec.scala index 707fb8f54c..4ff21e2a61 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorTimeoutSpec.scala @@ -29,7 +29,7 @@ class ActorTimeoutSpec "use the global default timeout if no implicit in scope" in { echo.timeout = 12 - within((Actor.TIMEOUT - 100).millis, (Actor.TIMEOUT + 300).millis) { + within((Actor.TIMEOUT - 100).millis, (Actor.TIMEOUT + 400).millis) { val f = echo ? "hallo" intercept[FutureTimeoutException] { f.await } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/FSMActorSpec.scala index 8e97f05500..3415d37702 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/FSMActorSpec.scala @@ -4,14 +4,15 @@ package akka.actor -import org.scalatest.WordSpec +import org.scalatest.{ WordSpec, BeforeAndAfterAll, BeforeAndAfterEach } import org.scalatest.matchers.MustMatchers -import akka.testing._ +import akka.testkit._ import FSM._ import akka.util.Duration import akka.util.duration._ +import akka.event._ object FSMActorSpec { @@ -57,8 +58,9 @@ object FSMActorSpec { } whenUnhandled { - case Event(_, stateData) ⇒ { + case Ev(msg) ⇒ { unhandledLatch.open + EventHandler.info(this, "unhandled event " + msg + " in state " + stateName + " with data " + stateData) stay } } @@ -95,9 +97,32 @@ object FSMActorSpec { case class CodeState(soFar: String, code: String) } -class FSMActorSpec extends WordSpec with MustMatchers { +class FSMActorSpec extends WordSpec with MustMatchers with TestKit with BeforeAndAfterAll with BeforeAndAfterEach { import FSMActorSpec._ + val eh_level = EventHandler.level + var logger: ActorRef = _ + + override def afterEach { + EventHandler.level = eh_level + if (logger ne null) { + EventHandler.removeListener(logger) + logger = null + } + } + + override def beforeAll { + val f = FSM.getClass.getDeclaredField("debugEvent") + f.setAccessible(true) + f.setBoolean(FSM, true) + } + + override def afterAll { + val f = FSM.getClass.getDeclaredField("debugEvent") + f.setAccessible(true) + f.setBoolean(FSM, false) + } + "An FSM Actor" must { "unlock the lock" in { @@ -145,5 +170,98 @@ class FSMActorSpec extends WordSpec with MustMatchers { tester ! Bye terminatedLatch.await } + + "log termination" in { + val fsm = TestActorRef(new Actor with FSM[Int, Null] { + startWith(1, null) + when(1) { + case Ev("go") ⇒ goto(2) + } + }).start() + logger = Actor.actorOf(new Actor { + def receive = { + case x ⇒ testActor forward x + } + }) + EventHandler.addListener(logger) + fsm ! "go" + expectMsgPF(1 second) { + case EventHandler.Error(_: EventHandler.EventHandlerException, ref, "Next state 2 does not exist") if ref eq fsm.underlyingActor ⇒ true + } + } + + "run onTermination upon ActorRef.stop()" in { + lazy val fsm = new Actor with FSM[Int, Null] { + startWith(1, null) + when(1) { NullFunction } + onTermination { + case x ⇒ testActor ! x + } + } + val ref = Actor.actorOf(fsm).start() + ref.stop() + expectMsg(1 second, fsm.StopEvent(Shutdown, 1, null)) + } + + "log events and transitions if asked to do so" in { + val fsmref = TestActorRef(new Actor with LoggingFSM[Int, Null] { + startWith(1, null) + when(1) { + case Ev("go") ⇒ + setTimer("t", Shutdown, 1.5 seconds, false) + goto(2) + } + when(2) { + case Ev("stop") ⇒ + cancelTimer("t") + stop + } + onTermination { + case StopEvent(r, _, _) ⇒ testActor ! r + } + }).start() + val fsm = fsmref.underlyingActor + logger = Actor.actorOf(new Actor { + def receive = { + case x ⇒ testActor forward x + } + }) + EventHandler.addListener(logger) + EventHandler.level = EventHandler.DebugLevel + fsmref ! "go" + expectMsgPF(1 second) { + case EventHandler.Debug(`fsm`, s: String) if s.startsWith("processing Event(go,null) from Actor[testActor") ⇒ true + } + expectMsg(1 second, EventHandler.Debug(fsm, "setting timer 't'/1500 milliseconds: Shutdown")) + expectMsg(1 second, EventHandler.Debug(fsm, "transition 1 -> 2")) + fsmref ! "stop" + expectMsgPF(1 second) { + case EventHandler.Debug(`fsm`, s: String) if s.startsWith("processing Event(stop,null) from Actor[testActor") ⇒ true + } + expectMsgAllOf(1 second, EventHandler.Debug(fsm, "canceling timer 't'"), Normal) + expectNoMsg(1 second) + } + + "fill rolling event log and hand it out" in { + val fsmref = TestActorRef(new Actor with LoggingFSM[Int, Int] { + override def logDepth = 3 + startWith(1, 0) + when(1) { + case Event("count", c) ⇒ stay using (c + 1) + case Event("log", _) ⇒ stay replying getLog + } + }).start() + fsmref ! "log" + val fsm = fsmref.underlyingActor + expectMsg(1 second, IndexedSeq(LogEntry(1, 0, "log"))) + fsmref ! "count" + fsmref ! "log" + expectMsg(1 second, IndexedSeq(LogEntry(1, 0, "log"), LogEntry(1, 0, "count"), LogEntry(1, 1, "log"))) + fsmref ! "count" + fsmref ! "log" + expectMsg(1 second, IndexedSeq(LogEntry(1, 1, "log"), LogEntry(1, 1, "count"), LogEntry(1, 2, "log"))) + } + } + } diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTimingSpec.scala index 277329cbe7..1c09abb227 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTimingSpec.scala @@ -26,7 +26,7 @@ class FSMTimingSpec extends WordSpec with MustMatchers with TestKit { "A Finite State Machine" must { "receive StateTimeout" in { - within(50 millis, 150 millis) { + within(50 millis, 250 millis) { fsm ! TestStateTimeout expectMsg(Transition(fsm, TestStateTimeout, Initial)) expectNoMsg @@ -54,6 +54,17 @@ class FSMTimingSpec extends WordSpec with MustMatchers with TestKit { } } + "correctly cancel a named timer" in { + fsm ! TestCancelTimer + within(100 millis, 200 millis) { + fsm ! Tick + expectMsg(Tick) + expectMsg(Tock) + } + fsm ! Cancel + expectMsg(Transition(fsm, TestCancelTimer, Initial)) + } + "receive and cancel a repeated timer" in { fsm ! TestRepeatedTimer val seq = receiveWhile(600 millis) { @@ -68,21 +79,21 @@ class FSMTimingSpec extends WordSpec with MustMatchers with TestKit { "notify unhandled messages" in { fsm ! TestUnhandled - within(100 millis) { + within(200 millis) { fsm ! Tick expectNoMsg } - within(100 millis) { + within(200 millis) { fsm ! SetHandler fsm ! Tick expectMsg(Unhandled(Tick)) expectNoMsg } - within(100 millis) { + within(200 millis) { fsm ! Unhandled("test") expectNoMsg } - within(100 millis) { + within(200 millis) { fsm ! Cancel expectMsg(Transition(fsm, TestUnhandled, Initial)) } @@ -101,8 +112,10 @@ object FSMTimingSpec { case object TestSingleTimer extends State case object TestRepeatedTimer extends State case object TestUnhandled extends State + case object TestCancelTimer extends State case object Tick + case object Tock case object Cancel case object SetHandler @@ -132,6 +145,21 @@ object FSMTimingSpec { tester ! Tick goto(Initial) } + when(TestCancelTimer) { + case Ev(Tick) ⇒ + tester ! Tick + setTimer("hallo", Tock, 1 milli, false) + Thread.sleep(10); + cancelTimer("hallo") + setTimer("hallo", Tock, 100 millis, false) + stay + case Ev(Tock) ⇒ + tester ! Tock + stay + case Ev(Cancel) ⇒ + cancelTimer("hallo") + goto(Initial) + } when(TestRepeatedTimer) { case Event(Tick, remaining) ⇒ tester ! Tick @@ -151,6 +179,7 @@ object FSMTimingSpec { } stay case Ev(Cancel) ⇒ + whenUnhandled(NullFunction) goto(Initial) } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala index c9307ce02f..0cbaddd8e6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala @@ -6,7 +6,7 @@ package akka.actor import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers -import akka.testing._ +import akka.testkit._ import akka.testkit._ import akka.util.duration._ import akka.config.Supervision._ diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ForwardActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ForwardActorSpec.scala index eea3355b74..78a1fbfd0a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ForwardActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ForwardActorSpec.scala @@ -7,7 +7,7 @@ package akka.actor import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers -import akka.testing._ +import akka.testkit._ import akka.util.duration._ import Actor._ diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/HotSwapSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/HotSwapSpec.scala index 2e5babcc34..53ce1fbbfd 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/HotSwapSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/HotSwapSpec.scala @@ -7,7 +7,7 @@ package akka.actor import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers -import akka.testing._ +import akka.testkit._ import Actor._ diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/LoggingReceiveSpec.scala new file mode 100644 index 0000000000..748ed13941 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/LoggingReceiveSpec.scala @@ -0,0 +1,169 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +package akka.actor + +import org.scalatest.{ WordSpec, BeforeAndAfterAll, BeforeAndAfterEach } +import org.scalatest.matchers.MustMatchers +import akka.testkit.{ TestKit, TestActorRef } +import akka.event.EventHandler +import Actor._ +import akka.util.duration._ +import akka.config.Config.config +import akka.config.Supervision._ + +class LoggingReceiveSpec + extends WordSpec + with BeforeAndAfterEach + with BeforeAndAfterAll + with MustMatchers + with TestKit { + + val level = EventHandler.level + + override def beforeAll { + EventHandler.addListener(testActor) + EventHandler.level = EventHandler.DebugLevel + } + + override def afterAll { + EventHandler.removeListener(testActor) + EventHandler.level = level + } + + override def afterEach { + val f1 = Actor.getClass.getDeclaredField("addLoggingReceive") + f1.setAccessible(true) + f1.setBoolean(Actor, false) + val f2 = Actor.getClass.getDeclaredField("debugAutoReceive") + f2.setAccessible(true) + f2.setBoolean(Actor, false) + val f3 = Actor.getClass.getDeclaredField("debugLifecycle") + f3.setAccessible(true) + f3.setBoolean(Actor, false) + } + + ignoreMsg { + case EventHandler.Debug(_, s: String) ⇒ + !s.startsWith("received") && s != "started" && s != "stopping" && s != "restarting" && + s != "restarted" && !s.startsWith("now supervising") && !s.startsWith("stopped supervising") + case EventHandler.Debug(_, _) ⇒ true + case EventHandler.Error(_: UnhandledMessageException, _, _) ⇒ false + case _: EventHandler.Error ⇒ true + } + + "A LoggingReceive" must { + + "decorate a Receive" in { + val r: Receive = { + case null ⇒ + } + val log = LoggingReceive(this, r) + log.isDefinedAt("hallo") + expectMsg(1 second, EventHandler.Debug(this, "received unhandled message hallo")) + } + + "be added on Actor if requested" in { + val f = Actor.getClass.getDeclaredField("addLoggingReceive") + f.setAccessible(true) + f.setBoolean(Actor, true) + val actor = TestActorRef(new Actor { + def receive = loggable(this) { + case _ ⇒ self reply "x" + } + }).start() + actor ! "buh" + within(1 second) { + expectMsg(EventHandler.Debug(actor.underlyingActor, "received handled message buh")) + expectMsg("x") + } + val r: Receive = { + case null ⇒ + } + actor ! HotSwap(_ ⇒ r, false) + actor ! "bah" + within(300 millis) { + expectMsgPF() { + case EventHandler.Error(ex: UnhandledMessageException, ref, "bah") if ref eq actor ⇒ true + } + } + actor.stop() + } + + "not duplicate logging" in { + val f = Actor.getClass.getDeclaredField("addLoggingReceive") + f.setAccessible(true) + f.setBoolean(Actor, true) + val actor = TestActorRef(new Actor { + def receive = loggable(this)(loggable(this) { + case _ ⇒ self reply "x" + }) + }).start() + actor ! "buh" + within(1 second) { + expectMsg(EventHandler.Debug(actor.underlyingActor, "received handled message buh")) + expectMsg("x") + } + } + + } + + "An Actor" must { + + "log AutoReceiveMessages if requested" in { + val f = Actor.getClass.getDeclaredField("debugAutoReceive") + f.setAccessible(true) + f.setBoolean(Actor, true) + val actor = TestActorRef(new Actor { + def receive = { + case _ ⇒ + } + }).start() + actor ! PoisonPill + expectMsg(300 millis, EventHandler.Debug(actor.underlyingActor, "received AutoReceiveMessage PoisonPill")) + } + + "log LifeCycle changes if requested" in { + within(2 seconds) { + val supervisor = TestActorRef(new Actor { + self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), 5, 5000) + def receive = { + case _ ⇒ + } + }).start() + val f = Actor.getClass.getDeclaredField("debugLifecycle") + f.setAccessible(true) + f.setBoolean(Actor, true) + val actor = TestActorRef(new Actor { + def receive = { + case _ ⇒ + } + }).start() + val actor1 = actor.underlyingActor + expectMsg(EventHandler.Debug(actor1, "started")) + supervisor link actor + expectMsgPF() { + case EventHandler.Debug(ref, msg: String) ⇒ + ref == supervisor.underlyingActor && msg.startsWith("now supervising") + } + actor ! Kill + expectMsg(EventHandler.Debug(actor1, "restarting")) + awaitCond(msgAvailable) + val actor2 = actor.underlyingActor + expectMsgPF() { + case EventHandler.Debug(ref, "restarted") if ref eq actor2 ⇒ true + } + supervisor unlink actor + expectMsgPF() { + case EventHandler.Debug(ref, msg: String) ⇒ + ref == supervisor.underlyingActor && msg.startsWith("stopped supervising") + } + actor.stop() + expectMsg(EventHandler.Debug(actor2, "stopping")) + supervisor.stop() + } + } + + } + +} diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala index 491d37a6a3..a03193d4a6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala @@ -7,7 +7,7 @@ package akka.actor import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers -import akka.testing._ +import akka.testkit._ import akka.util.duration._ import Actor._ diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala index 8c5f397387..d3f3d65c27 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala @@ -8,8 +8,8 @@ import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import org.scalatest.BeforeAndAfterEach -import akka.testing._ -import akka.testing.Testing.{ testMillis, sleepFor } +import akka.testkit._ +import akka.testkit.Testing.sleepFor import akka.util.duration._ import akka.config.Supervision._ import akka.{ Die, Ping } @@ -20,7 +20,7 @@ import java.util.concurrent.LinkedBlockingQueue object SupervisorSpec { val Timeout = 5 seconds - val TimeoutMillis = testMillis(Timeout).toInt + val TimeoutMillis = Timeout.dilated.toMillis.toInt // ===================================================== // Message logs @@ -57,7 +57,7 @@ object SupervisorSpec { } class Master extends Actor { - self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, testMillis(1 second).toInt) + self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, (1 second).dilated.toMillis.toInt) val temp = { val a = actorOf[TemporaryActor] diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala index f55f8940cf..d8aaa9d0e4 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala @@ -7,7 +7,7 @@ import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import akka.util.duration._ -import akka.testing.Testing.sleepFor +import akka.testkit.Testing.sleepFor import akka.dispatch.Dispatchers import akka.config.Supervision.{ SupervisorConfig, OneForOneStrategy, Supervise, Permanent } import Actor._ diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala index a6e24302f6..61d0da3555 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -6,7 +6,7 @@ package akka.actor.dispatch import org.scalatest.junit.JUnitSuite import org.junit.Test import org.scalatest.Assertions._ -import akka.testing._ +import akka.testkit.Testing import akka.dispatch._ import akka.actor.Actor._ import java.util.concurrent.atomic.AtomicLong diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 9c1704b7f5..3544767453 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -3,8 +3,8 @@ package akka.actor.routing import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers -import akka.testing._ -import akka.testing.Testing.{ sleepFor, testMillis } +import akka.testkit._ +import akka.testkit.Testing.sleepFor import akka.util.duration._ import akka.actor._ @@ -55,7 +55,7 @@ class RoutingSpec extends WordSpec with MustMatchers { case Test3 ⇒ t2 }.start() - implicit val timeout = Actor.Timeout(testMillis(5 seconds)) + implicit val timeout = Actor.Timeout((5 seconds).dilated) val result = for { a ← (d ? (Test1)).as[Int] b ← (d ? (Test2)).as[Int] @@ -118,10 +118,12 @@ class RoutingSpec extends WordSpec with MustMatchers { for (i ← 1 to 500) d ! i - latch.await(10 seconds) - - // because t1 is much slower and thus has a bigger mailbox all the time - t1Count.get must be < (t2Count.get) + try { + latch.await(10 seconds) + } finally { + // because t1 is much slower and thus has a bigger mailbox all the time + t1Count.get must be < (t2Count.get) + } for (a ← List(t1, t2, d)) a.stop() } diff --git a/akka-actor/src/main/scala/akka/AkkaException.scala b/akka-actor/src/main/scala/akka/AkkaException.scala index 71da1fa417..77fa99d0f9 100644 --- a/akka-actor/src/main/scala/akka/AkkaException.scala +++ b/akka-actor/src/main/scala/akka/AkkaException.scala @@ -11,7 +11,8 @@ import java.net.{ InetAddress, UnknownHostException } * Akka base Exception. Each Exception gets: * * * @author Jonas Bonér @@ -20,6 +21,9 @@ class AkkaException(message: String = "", cause: Throwable = null) extends Runti val uuid = "%s_%s".format(AkkaException.hostname, newUuid) override lazy val toString = + "%s: %s\n[%s]".format(getClass.getName, message, uuid) + + lazy val toLongString = "%s: %s\n[%s]\n%s".format(getClass.getName, message, uuid, stackTraceToString) def stackTraceToString = { diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index f8f09c5e56..2023d95bfd 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -178,7 +178,45 @@ object Actor extends ListenerManagement { private[akka] lazy val remote: RemoteSupport = cluster.remoteService /** - * Creates an ActorRef out of the Actor with type T. + * This decorator adds invocation logging to a Receive function. + */ + class LoggingReceive(source: AnyRef, r: Receive) extends Receive { + def isDefinedAt(o: Any) = { + val handled = r.isDefinedAt(o) + EventHandler.debug(source, "received " + (if (handled) "handled" else "unhandled") + " message " + o) + handled + } + def apply(o: Any): Unit = r(o) + } + object LoggingReceive { + def apply(source: AnyRef, r: Receive): Receive = r match { + case _: LoggingReceive ⇒ r + case _ ⇒ new LoggingReceive(source, r) + } + } + + /** + * Wrap a Receive partial function in a logging enclosure, which sends a + * debug message to the EventHandler each time before a message is matched. + * This includes messages which are not handled. + * + *

+   * def receive = loggable {
+   *   case x => ...
+   * }
+   * 
+ * + * This method does NOT modify the given Receive unless + * akka.actor.debug.receive is set within akka.conf. + */ + def loggable(self: AnyRef)(r: Receive): Receive = if (addLoggingReceive) LoggingReceive(self, r) else r + + private[akka] val addLoggingReceive = config.getBool("akka.actor.debug.receive", false) + private[akka] val debugAutoReceive = config.getBool("akka.actor.debug.autoreceive", false) + private[akka] val debugLifecycle = config.getBool("akka.actor.debug.lifecycle", false) + + /** + * Creates an ActorRef out of the Actor with type T. *
    *   import Actor._
    *   val actor = actorOf[MyActor]
@@ -508,6 +546,8 @@ object Actor extends ListenerManagement {
  */
 trait Actor {
 
+  import Actor.{ addLoggingReceive, debugAutoReceive, LoggingReceive }
+
   /**
    * Type alias because traits cannot have companion objects.
    */
@@ -675,6 +715,8 @@ trait Actor {
   }
 
   private final def autoReceiveMessage(msg: AutoReceivedMessage) {
+    if (debugAutoReceive)
+      EventHandler.debug(this, "received AutoReceiveMessage " + msg)
     msg match {
       case HotSwap(code, discardOld) ⇒ become(code(self), discardOld)
       case RevertHotSwap             ⇒ unbecome()
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index 821053219d..1f9e87d50b 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -293,7 +293,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
   /**
    * Starts up the actor and its message queue.
    */
-  def start(): ActorRef
+  def start(): this.type
 
   /**
    * Shuts down the actor its dispatcher and message queue.
@@ -542,7 +542,7 @@ class LocalActorRef private[akka] (
   /**
    * Starts up the actor and its message queue.
    */
-  def start(): ActorRef = guard.withGuard {
+  def start(): this.type = guard.withGuard[this.type] {
     if (isShutdown) throw new ActorStartException(
       "Can't restart an actor that has been shut down with 'stop' or 'exit'")
     if (!isRunning) {
@@ -570,7 +570,9 @@ class LocalActorRef private[akka] (
         dispatcher.detach(this)
         _status = ActorRefInternals.SHUTDOWN
         try {
-          actor.postStop()
+          val a = actor
+          if (Actor.debugLifecycle) EventHandler.debug(a, "stopping")
+          a.postStop()
         } finally {
           currentMessage = null
           Actor.registry.unregister(this)
@@ -610,6 +612,7 @@ class LocalActorRef private[akka] (
         actorRef.supervisor = Some(this)
       }
     }
+    if (Actor.debugLifecycle) EventHandler.debug(actor, "now supervising " + actorRef)
   }
 
   /**
@@ -622,6 +625,7 @@ class LocalActorRef private[akka] (
       if (_linkedActors.remove(actorRef.uuid) eq null)
         throw new IllegalActorStateException("Actor [" + actorRef + "] is not a linked actor, can't unlink")
       actorRef.supervisor = None
+      if (Actor.debugLifecycle) EventHandler.debug(actor, "stopped supervising " + actorRef)
     }
   }
 
@@ -700,7 +704,7 @@ class LocalActorRef private[akka] (
           }
         } catch {
           case e ⇒
-            EventHandler.error(e, this, messageHandle.message.toString)
+            EventHandler.error(e, actor, messageHandle.message.toString)
             throw e
         }
       }
@@ -761,12 +765,14 @@ class LocalActorRef private[akka] (
   protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
     def performRestart() {
       val failedActor = actorInstance.get
+      if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting")
       failedActor.preRestart(reason)
       val freshActor = newActor
       setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor
       actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call
       freshActor.preStart()
       freshActor.postRestart(reason)
+      if (Actor.debugLifecycle) EventHandler.debug(freshActor, "restarted")
     }
 
     def tooManyRestarts() {
@@ -924,7 +930,9 @@ class LocalActorRef private[akka] (
   }
 
   private def initializeActorInstance() {
-    actor.preStart() // run actor preStart
+    val a = actor
+    if (Actor.debugLifecycle) EventHandler.debug(a, "started")
+    a.preStart() // run actor preStart
     Actor.registry.register(this)
   }
 
@@ -996,7 +1004,7 @@ private[akka] case class RemoteActorRef private[akka] (
     else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
   }
 
-  def start(): ActorRef = synchronized {
+  def start(): this.type = synchronized[this.type] {
     _status = ActorRefInternals.RUNNING
     this
   }
diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala
index 8317fc5fe3..b7e94e864e 100644
--- a/akka-actor/src/main/scala/akka/actor/FSM.scala
+++ b/akka-actor/src/main/scala/akka/actor/FSM.scala
@@ -5,12 +5,18 @@ package akka.actor
 
 import akka.util._
 import akka.event.EventHandler
+import akka.config.Config.config
 
 import scala.collection.mutable
 import java.util.concurrent.ScheduledFuture
 
 object FSM {
 
+  object NullFunction extends PartialFunction[Any, Nothing] {
+    def isDefinedAt(o: Any) = false
+    def apply(o: Any) = sys.error("undefined")
+  }
+
   case class CurrentState[S](fsmRef: ActorRef, state: S)
   case class Transition[S](fsmRef: ActorRef, from: S, to: S)
   case class SubscribeTransitionCallBack(actorRef: ActorRef)
@@ -24,7 +30,7 @@ object FSM {
   case object StateTimeout
   case class TimeoutMarker(generation: Long)
 
-  case class Timer(name: String, msg: AnyRef, repeat: Boolean) {
+  case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int) {
     private var ref: Option[ScheduledFuture[AnyRef]] = _
 
     def schedule(actor: ActorRef, timeout: Duration) {
@@ -57,6 +63,44 @@ object FSM {
   * for derived classes.
   */
   implicit def d2od(d: Duration): Option[Duration] = Some(d)
+
+  val debugEvent = config.getBool("akka.actor.debug.fsm", false)
+
+  case class LogEntry[S, D](stateName: S, stateData: D, event: Any)
+
+  case class State[S, D](stateName: S, stateData: D, timeout: Option[Duration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil) {
+
+    /**
+     * Modify state transition descriptor to include a state timeout for the
+     * next state. This timeout overrides any default timeout set for the next
+     * state.
+     */
+    def forMax(timeout: Duration): State[S, D] = {
+      copy(timeout = Some(timeout))
+    }
+
+    /**
+     * Send reply to sender of the current message, if available.
+     *
+     * @return this state transition descriptor
+     */
+    def replying(replyValue: Any): State[S, D] = {
+      copy(replies = replyValue :: replies)
+    }
+
+    /**
+     * Modify state transition descriptor with new state data. The data will be
+     * set when transitioning to the new state.
+     */
+    def using(nextStateDate: D): State[S, D] = {
+      copy(stateData = nextStateDate)
+    }
+
+    private[akka] def withStopReason(reason: Reason): State[S, D] = {
+      copy(stopReason = Some(reason))
+    }
+  }
+
 }
 
 /**
@@ -143,7 +187,8 @@ trait FSM[S, D] extends ListenerManagement {
 
   import FSM._
 
-  type StateFunction = scala.PartialFunction[Event[D], State]
+  type State = FSM.State[S, D]
+  type StateFunction = scala.PartialFunction[Event, State]
   type Timeout = Option[Duration]
   type TransitionHandler = PartialFunction[(S, S), Unit]
 
@@ -177,7 +222,7 @@ trait FSM[S, D] extends ListenerManagement {
   protected final def startWith(stateName: S,
                                 stateData: D,
                                 timeout: Timeout = None) = {
-    currentState = State(stateName, stateData, timeout)
+    currentState = FSM.State(stateName, stateData, timeout)
   }
 
   /**
@@ -188,7 +233,7 @@ trait FSM[S, D] extends ListenerManagement {
    * @return state transition descriptor
    */
   protected final def goto(nextStateName: S): State = {
-    State(nextStateName, currentState.stateData)
+    FSM.State(nextStateName, currentState.stateData)
   }
 
   /**
@@ -231,11 +276,11 @@ trait FSM[S, D] extends ListenerManagement {
    * @param repeat send once if false, scheduleAtFixedRate if true
    * @return current state descriptor
    */
-  protected final def setTimer(name: String, msg: AnyRef, timeout: Duration, repeat: Boolean): State = {
+  protected[akka] def setTimer(name: String, msg: Any, timeout: Duration, repeat: Boolean): State = {
     if (timers contains name) {
       timers(name).cancel
     }
-    val timer = Timer(name, msg, repeat)
+    val timer = Timer(name, msg, repeat, timerGen.next)
     timer.schedule(self, timeout)
     timers(name) = timer
     stay
@@ -245,7 +290,7 @@ trait FSM[S, D] extends ListenerManagement {
    * Cancel named timer, ensuring that the message is not subsequently delivered (no race).
    * @param name of the timer to cancel
    */
-  protected final def cancelTimer(name: String) = {
+  protected[akka] def cancelTimer(name: String) = {
     if (timers contains name) {
       timers(name).cancel
       timers -= name
@@ -257,7 +302,7 @@ trait FSM[S, D] extends ListenerManagement {
    * timer does not exist, has previously been canceled or if it was a
    * single-shot timer whose message was already received.
    */
-  protected final def timerActive_?(name: String) = timers contains name
+  protected[akka] final def timerActive_?(name: String) = timers contains name
 
   /**
    * Set state timeout explicitly. This method can safely be used from within a
@@ -301,7 +346,7 @@ trait FSM[S, D] extends ListenerManagement {
    * function literal. To be used with onTransition.
    */
   implicit protected final def total2pf(transitionHandler: (S, S) ⇒ Unit) =
-    new PartialFunction[(S, S), Unit] {
+    new TransitionHandler {
       def isDefinedAt(in: (S, S)) = true
       def apply(in: (S, S)) { transitionHandler(in._1, in._2) }
     }
@@ -324,11 +369,21 @@ trait FSM[S, D] extends ListenerManagement {
    * Verify existence of initial state and setup timers. This should be the
    * last call within the constructor.
    */
-  def initialize {
+  protected final def initialize {
     makeTransition(currentState)
   }
 
   /**
+   * Return current state name (i.e. object of type S)
+   */
+  protected[akka] def stateName: S = currentState.stateName
+
+  /**
+   * Return current state data (i.e. object of type D)
+   */
+  protected[akka] def stateData: D = currentState.stateData
+
+  /*
    * ****************************************************************
    *                PRIVATE IMPLEMENTATION DETAILS
    * ****************************************************************
@@ -345,6 +400,7 @@ trait FSM[S, D] extends ListenerManagement {
    * Timer handling
    */
   private val timers = mutable.Map[String, Timer]()
+  private val timerGen = Iterator from 0
 
   /*
    * State definitions
@@ -374,10 +430,7 @@ trait FSM[S, D] extends ListenerManagement {
   /*
    * termination handling
    */
-  private var terminateEvent: PartialFunction[StopEvent[S, D], Unit] = {
-    case StopEvent(Failure(cause), _, _) ⇒
-    case StopEvent(reason, _, _)         ⇒
-  }
+  private var terminateEvent: PartialFunction[StopEvent[S, D], Unit] = NullFunction
 
   /*
    * transition handling
@@ -391,7 +444,7 @@ trait FSM[S, D] extends ListenerManagement {
   // ListenerManagement shall not start() or stop() listener actors
   override protected val manageLifeCycleOfListeners = false
 
-  /**
+  /*
    * *******************************************
    *       Main actor receive() method
    * *******************************************
@@ -399,11 +452,11 @@ trait FSM[S, D] extends ListenerManagement {
   override final protected def receive: Receive = {
     case TimeoutMarker(gen) ⇒
       if (generation == gen) {
-        processEvent(StateTimeout)
+        processMsg(StateTimeout, "state timeout")
       }
-    case t@Timer(name, msg, repeat) ⇒
-      if (timerActive_?(name)) {
-        processEvent(msg)
+    case t@Timer(name, msg, repeat, generation) ⇒
+      if ((timers contains name) && (timers(name).generation == generation)) {
+        processMsg(msg, t)
         if (!repeat) {
           timers -= name
         }
@@ -425,12 +478,16 @@ trait FSM[S, D] extends ListenerManagement {
         timeoutFuture = None
       }
       generation += 1
-      processEvent(value)
+      processMsg(value, self.channel)
     }
   }
 
-  private def processEvent(value: Any) = {
+  private def processMsg(value: Any, source: AnyRef): Unit = {
     val event = Event(value, currentState.stateData)
+    processEvent(event, source)
+  }
+
+  private[akka] def processEvent(event: Event, source: AnyRef): Unit = {
     val stateFunc = stateFunctions(currentState.stateName)
     val nextState = if (stateFunc isDefinedAt event) {
       stateFunc(event)
@@ -438,81 +495,140 @@ trait FSM[S, D] extends ListenerManagement {
       // handleEventDefault ensures that this is always defined
       handleEvent(event)
     }
+    applyState(nextState)
+  }
+
+  private[akka] def applyState(nextState: State): Unit = {
     nextState.stopReason match {
-      case Some(reason) ⇒ terminate(reason)
-      case None         ⇒ makeTransition(nextState)
+      case None ⇒ makeTransition(nextState)
+      case _ ⇒
+        nextState.replies.reverse foreach (self reply _)
+        terminate(nextState)
+        self.stop()
     }
   }
 
-  private def makeTransition(nextState: State) = {
+  private[akka] def makeTransition(nextState: State): Unit = {
     if (!stateFunctions.contains(nextState.stateName)) {
-      terminate(Failure("Next state %s does not exist".format(nextState.stateName)))
+      terminate(stay withStopReason Failure("Next state %s does not exist".format(nextState.stateName)))
     } else {
+      nextState.replies.reverse foreach (self reply _)
       if (currentState.stateName != nextState.stateName) {
         handleTransition(currentState.stateName, nextState.stateName)
         notifyListeners(Transition(self, currentState.stateName, nextState.stateName))
       }
-      applyState(nextState)
-    }
-  }
-
-  private def applyState(nextState: State) = {
-    currentState = nextState
-    val timeout = if (currentState.timeout.isDefined) currentState.timeout else stateTimeouts(currentState.stateName)
-    if (timeout.isDefined) {
-      val t = timeout.get
-      if (t.finite_? && t.length >= 0) {
-        timeoutFuture = Some(Scheduler.scheduleOnce(self, TimeoutMarker(generation), t.length, t.unit))
+      currentState = nextState
+      val timeout = if (currentState.timeout.isDefined) currentState.timeout else stateTimeouts(currentState.stateName)
+      if (timeout.isDefined) {
+        val t = timeout.get
+        if (t.finite_? && t.length >= 0) {
+          timeoutFuture = Some(Scheduler.scheduleOnce(self, TimeoutMarker(generation), t.length, t.unit))
+        }
       }
     }
   }
 
-  private def terminate(reason: Reason) = {
-    terminateEvent.apply(StopEvent(reason, currentState.stateName, currentState.stateData))
-    self.stop()
+  override def postStop { terminate(stay withStopReason Shutdown) }
+
+  private def terminate(nextState: State): Unit = {
+    if (!currentState.stopReason.isDefined) {
+      val reason = nextState.stopReason.get
+      reason match {
+        case Failure(ex: Throwable) ⇒ EventHandler.error(ex, this, "terminating due to Failure")
+        case Failure(msg)           ⇒ EventHandler.error(this, msg)
+        case _                      ⇒
+      }
+      val stopEvent = StopEvent(reason, currentState.stateName, currentState.stateData)
+      if (terminateEvent.isDefinedAt(stopEvent))
+        terminateEvent(stopEvent)
+      currentState = nextState
+    }
   }
 
-  case class Event[D](event: Any, stateData: D)
+  case class Event(event: Any, stateData: D)
   object Ev {
-    def unapply[D](e: Event[D]): Option[Any] = Some(e.event)
-  }
-
-  case class State(stateName: S, stateData: D, timeout: Timeout = None) {
-
-    /**
-     * Modify state transition descriptor to include a state timeout for the
-     * next state. This timeout overrides any default timeout set for the next
-     * state.
-     */
-    def forMax(timeout: Duration): State = {
-      copy(timeout = Some(timeout))
-    }
-
-    /**
-     * Send reply to sender of the current message, if available.
-     *
-     * @return this state transition descriptor
-     */
-    def replying(replyValue: Any): State = {
-      self.channel safe_! replyValue
-      this
-    }
-
-    /**
-     * Modify state transition descriptor with new state data. The data will be
-     * set when transitioning to the new state.
-     */
-    def using(nextStateDate: D): State = {
-      copy(stateData = nextStateDate)
-    }
-
-    private[akka] var stopReason: Option[Reason] = None
-
-    private[akka] def withStopReason(reason: Reason): State = {
-      stopReason = Some(reason)
-      this
-    }
+    def unapply[D](e: Event): Option[Any] = Some(e.event)
   }
 
   case class StopEvent[S, D](reason: Reason, currentState: S, stateData: D)
 }
+
+/**
+ * Stackable trait for FSM which adds a rolling event log.
+ *
+ * @author Roland Kuhn
+ * @since 1.2
+ */
+trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒
+
+  import FSM._
+
+  def logDepth: Int = 0
+
+  private val events = new Array[Event](logDepth)
+  private val states = new Array[AnyRef](logDepth)
+  private var pos = 0
+  private var full = false
+
+  private def advance() {
+    val n = pos + 1
+    if (n == logDepth) {
+      full = true
+      pos = 0
+    } else {
+      pos = n
+    }
+  }
+
+  protected[akka] abstract override def setTimer(name: String, msg: Any, timeout: Duration, repeat: Boolean): State = {
+    if (debugEvent)
+      EventHandler.debug(this, "setting " + (if (repeat) "repeating " else "") + "timer '" + name + "'/" + timeout + ": " + msg)
+    super.setTimer(name, msg, timeout, repeat)
+  }
+
+  protected[akka] abstract override def cancelTimer(name: String) = {
+    if (debugEvent)
+      EventHandler.debug(this, "canceling timer '" + name + "'")
+    super.cancelTimer(name)
+  }
+
+  private[akka] abstract override def processEvent(event: Event, source: AnyRef) {
+    if (debugEvent) {
+      val srcstr = source match {
+        case s: String            ⇒ s
+        case Timer(name, _, _, _) ⇒ "timer " + name
+        case c: UntypedChannel    ⇒ c.toString
+        case _                    ⇒ "unknown"
+      }
+      EventHandler.debug(this, "processing " + event + " from " + srcstr)
+    }
+
+    if (logDepth > 0) {
+      states(pos) = stateName.asInstanceOf[AnyRef]
+      events(pos) = event
+      advance()
+    }
+
+    val oldState = stateName
+    super.processEvent(event, source)
+    val newState = stateName
+
+    if (debugEvent && oldState != newState)
+      EventHandler.debug(this, "transition " + oldState + " -> " + newState)
+  }
+
+  /**
+   * Retrieve current rolling log in oldest-first order. The log is filled with
+   * each incoming event before processing by the user supplied state handler.
+   */
+  protected def getLog: IndexedSeq[LogEntry[S, D]] = {
+    val log = events zip states filter (_._1 ne null) map (x ⇒ LogEntry(x._2.asInstanceOf[S], x._1.stateData, x._1.event))
+    if (full) {
+      IndexedSeq() ++ log.drop(pos) ++ log.take(pos)
+    } else {
+      IndexedSeq() ++ log
+    }
+  }
+
+}
+
diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala
index 39fcf0bb2a..038cd9f959 100644
--- a/akka-actor/src/main/scala/akka/event/EventHandler.scala
+++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala
@@ -93,7 +93,8 @@ object EventHandler extends ListenerManagement {
 
   implicit object defaultListenerFormat extends StatelessActorFormat[DefaultListener]
 
-  val level: Int = config.getString("akka.event-handler-level", "INFO") match {
+  @volatile
+  var level: Int = config.getString("akka.event-handler-level", "INFO") match {
     case "ERROR"   ⇒ ErrorLevel
     case "WARNING" ⇒ WarningLevel
     case "INFO"    ⇒ InfoLevel
diff --git a/akka-actor/src/main/scala/akka/util/Duration.scala b/akka-actor/src/main/scala/akka/util/Duration.scala
index 2e4b69756b..6e00ef0c17 100644
--- a/akka-actor/src/main/scala/akka/util/Duration.scala
+++ b/akka-actor/src/main/scala/akka/util/Duration.scala
@@ -90,6 +90,15 @@ object Duration {
     case "ns" | "nano" | "nanos" | "nanosecond" | "nanoseconds"     ⇒ NANOSECONDS
   }
 
+  /*
+   * Testing facilities
+   */
+  val timeFactor: Double = {
+    val factor = System.getProperty("akka.test.timefactor", "1.0")
+    try { factor.toDouble }
+    catch { case e: java.lang.NumberFormatException ⇒ 1.0 }
+  }
+
   val Zero: Duration = new FiniteDuration(0, NANOSECONDS)
 
   trait Infinite {
@@ -234,6 +243,9 @@ abstract class Duration {
   def /(other: Duration): Double
   def unary_- : Duration
   def finite_? : Boolean
+  def dilated: Duration = this * Duration.timeFactor
+  def min(other: Duration): Duration = if (this < other) this else other
+  def max(other: Duration): Duration = if (this > other) this else other
 
   // Java API
   def lt(other: Duration) = this < other
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
index 494fa42fe1..2b98b153c5 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
@@ -73,7 +73,7 @@ class ClusterActorRef private[akka] (
     RemoteActorRef(inetSocketAddress, actorAddress, Actor.TIMEOUT, None)
   }
 
-  def start(): ActorRef = synchronized {
+  def start(): this.type = synchronized[this.type] {
     _status = ActorRefInternals.RUNNING
     this
   }
diff --git a/akka-docs/Makefile b/akka-docs/Makefile
index 9811732058..3c0041537d 100644
--- a/akka-docs/Makefile
+++ b/akka-docs/Makefile
@@ -23,6 +23,12 @@ else
 endif
 export PYTHONPATH
 
+ifeq (,$(QUICK))
+	SPHINXFLAGS = -a
+else
+	SPHINXFLAGS =
+endif
+
 .PHONY: help clean pygments html singlehtml latex pdf
 
 help:
@@ -48,7 +54,7 @@ $(LOCALPACKAGES):
 	$(MAKE) pygments
 
 html: $(LOCALPACKAGES)
-	$(SPHINXBUILD) -a -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
+	$(SPHINXBUILD) $(SPHINXFLAGS) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
 	@echo
 	@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
 
diff --git a/akka-docs/dev/team.rst b/akka-docs/dev/team.rst
index 53991eb260..8f636ddafa 100644
--- a/akka-docs/dev/team.rst
+++ b/akka-docs/dev/team.rst
@@ -25,4 +25,5 @@ Hiram Chirino        Committer
 Scott Clasen         Committer    
 Roland Kuhn          Committer
 Patrik Nordwall      Committer                   patrik DOT nordwall AT gmail DOT com
+Derek Williams       Committer                   derek AT nebvin DOT ca
 ===================  ==========================  ====================================
\ No newline at end of file
diff --git a/akka-docs/intro/getting-started-first-java.rst b/akka-docs/intro/getting-started-first-java.rst
index 57bfd01db9..e0720570fc 100644
--- a/akka-docs/intro/getting-started-first-java.rst
+++ b/akka-docs/intro/getting-started-first-java.rst
@@ -435,7 +435,7 @@ Here is the master actor::
 
 A couple of things are worth explaining further.
 
-First, we are passing in a ``java.util.concurrent.CountDownLatch`` to the ``Master`` actor. This latch is only used for plumbing (in this specific tutorial), to have a simple way of letting the outside world knowing when the master can deliver the result and shut down. In more idiomatic Akka code, as we will see in part two of this tutorial series, we would not use a latch but other abstractions and functions like ``Channel``, ``Future`` and ``?`` to achieve the same thing in a non-blocking way. But for simplicity let's stick to a ``CountDownLatch`` for now.
+First, we are passing in a ``java.util.concurrent.CountDownLatch`` to the ``Master`` actor. This latch is only used for plumbing (in this specific tutorial), to have a simple way of letting the outside world knowing when the master can deliver the result and shut down. In more idiomatic Akka code, as we will see in part two of this tutorial series, we would not use a latch but other abstractions and functions like ``Channel``, ``Future`` and ``ask()`` to achieve the same thing in a non-blocking way. But for simplicity let's stick to a ``CountDownLatch`` for now.
 
 Second, we are adding a couple of life-cycle callback methods; ``preStart`` and ``postStop``. In the ``preStart`` callback we are recording the time when the actor is started and in the ``postStop`` callback we are printing out the result (the approximation of Pi) and the time it took to calculate it. In this call we also invoke ``latch.countDown()`` to tell the outside world that we are done.
 
diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst
index db698b1a4f..3e95cf4aae 100644
--- a/akka-docs/java/untyped-actors.rst
+++ b/akka-docs/java/untyped-actors.rst
@@ -12,7 +12,7 @@ Module stability: **SOLID**
 The `Actor Model `_ provides a higher level of abstraction for writing concurrent and distributed systems. It alleviates the developer from having to deal with explicit locking and thread management, making it easier to write correct concurrent and parallel systems. Actors were defined in the 1973 paper by Carl Hewitt but have been popularized by the Erlang language, and used for example at Ericsson with great success to build highly concurrent and reliable telecom systems.
 
 Defining an Actor class
-^^^^^^^^^^^^^^^^^^^^^^^
+-----------------------
 
 Actors in Java are created either by extending the 'UntypedActor' class and implementing the 'onReceive' method. This method takes the message as a parameter.
 
@@ -238,9 +238,9 @@ which you do by Channel.sendOneWay(msg)
   public void onReceive(Object message) throws Exception {
     if (message instanceof String) {
       String msg = (String)message;
-      if (msg.equals("Hello") && getContext().getSenderFuture().isDefined()) {
+      if (msg.equals("Hello")) {
         // Reply to original sender of message using the channel
-        getContext().channel().sendOneWay(msg + " from " + getContext().getUuid());
+        getContext().channel().sendOneWaySafe(msg + " from " + getContext().getUuid());
       }
     }
   }
@@ -281,61 +281,36 @@ The 'replyUnsafe' method throws an 'IllegalStateException' if unable to determin
     }
   }
 
-Reply using the sender reference
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-If the sender reference (the sender's 'ActorRef') is passed into one of the 'send*' methods it will be implicitly passed along together with the message and will be available in the 'Option getSender()' method on the 'ActorRef. This means that you can use this field to send a message back to the sender.
-
-On this 'Option' you can invoke 'boolean isDefined()' or 'boolean isEmpty()' to check if the sender is available or not, and if it is call 'get()' to get the reference. It's important to know that 'getSender().get()' will throw an exception if there is no sender in scope. The same pattern holds for using the 'getSenderFuture()' in the section below.
-
-.. code-block:: java
-
-  public void onReceive(Object message) throws Exception {
-    if (message instanceof String) {
-      String msg = (String)message;
-      if (msg.equals("Hello")) {
-        // Reply to original sender of message using the sender reference
-        // also passing along my own reference (the context)
-        if (getContext().getSender().isDefined)
-          getContext().getSender().get().sendOneWay(msg + " from " + getContext().getUuid(), getContext());
-      }
-    }
-  }
-
-Reply using the sender future
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-If a message was sent with the 'sendRequestReply' or 'ask' methods, which both implements request-reply semantics using Future's, then you either have the option of replying using the 'reply' method as above. This method will then resolve the Future. But you can also get a reference to the Future directly and resolve it yourself or if you would like to store it away to resolve it later, or pass it on to some other Actor to resolve it.
-
-The reference to the Future resides in the 'ActorRef' instance and can be retrieved using 'Option getSenderFuture()'.
-
-Promise is a future with methods for 'completing the future:
-* completeWithResult(..)
-* completeWithException(..)
-
-Here is an example of how it can be used:
-
-.. code-block:: java
-
-  public void onReceive(Object message) throws Exception {
-    if (message instanceof String) {
-      String msg = (String)message;
-      if (msg.equals("Hello") && getContext().getSenderFuture().isDefined()) {
-        // Reply to original sender of message using the sender future reference
-        getContext().getSenderFuture().get().completeWithResult(msg + " from " + getContext().getUuid());
-      }
-    }
-  }
-
-
 Summary of reply semantics and options
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-* getContext().reply(...) can be used to reply to an Actor or a Future.
-* getContext().getSender() is a reference to the actor you can reply to, if it exists
-* getContext().getSenderFuture() is a reference to the future you can reply to, if it exists
-* getContext().channel() is a reference providing an abstraction to either self.sender or self.senderFuture if one is set, providing a single reference to store and reply to (the reference equivalent to the 'reply(...)' method).
-* getContext().getSender() and getContext().getSenderFuture() will never be set at the same time, as there can only be one reference to accept a reply.
+* ``getContext().reply(...)`` can be used to reply to an ``Actor`` or a
+  ``Future`` from within an actor; the current actor will be passed as reply
+  channel if the current channel supports this.
+* ``getContext().channel`` is a reference providing an abstraction for the
+  reply channel; this reference may be passed to other actors or used by
+  non-actor code.
+
+.. note::
+
+  There used to be two methods for determining the sending Actor or Future for the current invocation:
+
+  * ``getContext().getSender()`` yielded a :class:`Option[ActorRef]`
+  * ``getContext().getSenderFuture()`` yielded a :class:`Option[CompletableFuture[Any]]`
+
+  These two concepts have been unified into the ``channel``. If you need to
+  know the nature of the channel, you may do so using instance tests::
+
+    if (getContext().channel() instanceof ActorRef) {
+      ...
+    } else if (getContext().channel() instanceof ActorPromise) {
+      ...
+    }
+
+Promise represents the write-side of a Future, enabled by the methods
+
+* completeWithResult(..)
+* completeWithException(..)
 
 Starting actors
 ---------------
diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst
index 3019272e7f..eb25cb2d1a 100644
--- a/akka-docs/scala/actors.rst
+++ b/akka-docs/scala/actors.rst
@@ -122,71 +122,101 @@ Other good messages types are ``scala.Tuple2``, ``scala.List``, ``scala.Map`` wh
 Send messages
 -------------
 
-Messages are sent to an Actor through one of the “bang” methods.
+Messages are sent to an Actor through one of the following methods.
 
-* ! means “fire-and-forget”, e.g. send a message asynchronously and return immediately.
-* !! means “send-and-reply-eventually”, e.g. send a message asynchronously and wait for a reply through aFuture. Here you can specify a timeout. Using timeouts is very important. If no timeout is specified then the actor’s default timeout (set by the this.timeout variable in the actor) is used. This method returns an ``Option[Any]`` which will be either ``Some(result)`` if returning successfully or None if the call timed out.
-* ? sends a message asynchronously and returns a ``Future``.
+* ``!`` means “fire-and-forget”, e.g. send a message asynchronously and return
+  immediately.
+* ``?`` sends a message asynchronously and returns a :class:`Future`
+  representing a possible reply.
 
-You can check if an Actor can handle a specific message by invoking the ``isDefinedAt`` method:
+.. note::
 
-.. code-block:: scala
+  There used to be two more “bang” methods, which are deprecated and will be
+  removed in Akka 2.0:
 
-  if (actor.isDefinedAt(message)) actor ! message
-  else ...
+  * ``!!`` was similar to the current ``(actor ? msg).as[T]``; deprecation
+    followed from the change of timeout handling described below.
+  * ``!!![T]`` was similar to the current ``(actor ? msg).mapTo[T]``, with the
+    same change in the handling of :class:`Future`’s timeout as for ``!!``, but
+    additionally the old method could defer possible type cast problems into
+    seemingly unrelated parts of the code base.
 
 Fire-forget
 ^^^^^^^^^^^
 
-This is the preferred way of sending messages. No blocking waiting for a message. This gives the best concurrency and scalability characteristics.
+This is the preferred way of sending messages. No blocking waiting for a
+message. This gives the best concurrency and scalability characteristics.
 
 .. code-block:: scala
 
   actor ! "Hello"
 
-If invoked from within an Actor, then the sending actor reference will be implicitly passed along with the message and available to the receiving Actor in its ``sender: Option[AnyRef]`` member field. He can use this to reply to the original sender or use the ``reply(message: Any)`` method.
+If invoked from within an Actor, then the sending actor reference will be
+implicitly passed along with the message and available to the receiving Actor
+in its ``channel: UntypedChannel`` member field. The target actor can use this
+to reply to the original sender, e.g. by using the ``self.reply(message: Any)``
+method.
 
-If invoked from an instance that is **not** an Actor there will be no implicit sender passed along the message and you will get an IllegalStateException if you call ``self.reply(..)``.
-
-Send-And-Receive-Eventually
-^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-Using ``!!`` will send a message to the receiving Actor asynchronously but it will wait for a reply on a ``Future``, blocking the sender Actor until either:
-
-* A reply is received, or
-* The Future times out
-
-You can pass an explicit time-out to the ``!!`` method and if none is specified then the default time-out defined in the sender Actor will be used.
-
-The ``!!`` method returns an ``Option[Any]`` which will be either ``Some(result)`` if returning successfully, or ``None`` if the call timed out.
-Here are some examples:
-
-.. code-block:: scala
-
-  val resultOption = actor !! ("Hello", 1000)
-  if (resultOption.isDefined) ... // handle reply
-  else ... // handle timeout
-
-  val result: Option[String] = actor !! "Hello"
-  resultOption match {
-    case Some(reply) => ... // handle reply
-    case None =>        ... // handle timeout
-  }
-
-  val result = (actor !! "Hello").getOrElse(throw new RuntimeException("TIMEOUT"))
-
-  (actor !! "Hello").foreach(result => ...) // handle result
+If invoked from an instance that is **not** an Actor there will be no implicit
+sender passed along with the message and you will get an
+IllegalActorStateException when calling ``self.reply(...)``.
 
 Send-And-Receive-Future
 ^^^^^^^^^^^^^^^^^^^^^^^
 
-Using ``?`` will send a message to the receiving Actor asynchronously and will return a 'Future':
+Using ``?`` will send a message to the receiving Actor asynchronously and
+will return a :class:`Future`:
 
 .. code-block:: scala
 
   val future = actor ? "Hello"
 
-See :ref:`futures-scala` for more information.
+The receiving actor should reply to this message, which will complete the
+future with the reply message as value; if the actor throws an exception while
+processing the invocation, this exception will also complete the future. If the
+actor does not complete the future, it will expire after the timeout period,
+which is taken from one of the following three locations in order of
+precedence:
+
+#. explicitly given timeout as in ``actor.?("hello")(timeout = 12 millis)``
+#. implicit argument of type :class:`Actor.Timeout`, e.g.
+
+   ::
+
+     implicit val timeout = Actor.Timeout(12 millis)
+     val future = actor ? "hello"
+
+#. default timeout from ``akka.conf``
+
+See :ref:`futures-scala` for more information on how to await or query a
+future.
+
+Send-And-Receive-Eventually
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The future returned from the ``?`` method can conveniently be passed around or
+chained with further processing steps, but sometimes you just need the value,
+even if that entails waiting for it (but keep in mind that waiting inside an
+actor is prone to dead-locks, e.g. if obtaining the result depends on
+processing another message on this actor).
+
+For this purpose, there is the method :meth:`Future.as[T]` which waits until
+either the future is completed or its timeout expires, whichever comes first.
+The result is then inspected and returned as :class:`Some[T]` if it was
+normally completed and the answer’s runtime type matches the desired type; in
+all other cases :class:`None` is returned.
+
+.. code-block:: scala
+
+  (actor ? msg).as[String] match {
+    case Some(answer) => ...
+    case None         => ...
+  }
+
+  val resultOption = (actor ? msg).as[String]
+  if (resultOption.isDefined) ... else ...
+
+  for (x <- (actor ? msg).as[Int]) yield { 2 * x }
 
 Forward message
 ^^^^^^^^^^^^^^^
@@ -235,7 +265,7 @@ The Actor trait contains almost no member fields or methods to invoke, you just
   #. preRestart
   #. postRestart
 
-The ``Actor`` trait has one single member field (apart from the ``log`` field from the mixed in ``Logging`` trait):
+The ``Actor`` trait has one single member field:
 
 .. code-block:: scala
 
@@ -314,58 +344,29 @@ The ``reply`` method throws an ``IllegalStateException`` if unable to determine
     if (self.reply_?(result)) ...// success
     else ... // handle failure
 
-Reply using the sender reference
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-If the sender is an Actor then its reference will be implicitly passed along together with the message and will end up in the ``sender: Option[ActorRef]`` member field in the ``ActorRef``. This means that you can use this field to send a message back to the sender.
-
-.. code-block:: scala
-
-  // receiver code
-  case request =>
-    val result = process(request)
-    self.sender.get ! result
-
-It's important to know that ``sender.get`` will throw an exception if the ``sender`` is not defined, e.g. the ``Option`` is ``None``. You can check if it is defined by invoking the ``sender.isDefined`` method, but a more elegant solution is to use ``foreach`` which will only be executed if the sender is defined in the ``sender`` member ``Option`` field. If it is not, then the operation in the ``foreach`` method is ignored.
-
-.. code-block:: scala
-
-  // receiver code
-  case request =>
-    val result = process(request)
-    self.sender.foreach(_ ! result)
-
-The same pattern holds for using the ``senderFuture`` in the section below.
-
-Reply using the sender future
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-If a message was sent with the ``!!`` or ``?`` methods, which both implements request-reply semantics using Future's, then you either have the option of replying using the ``reply`` method as above. This method will then resolve the Future. But you can also get a reference to the Future directly and resolve it yourself or if you would like to store it away to resolve it later, or pass it on to some other Actor to resolve it.
-
-The reference to the Future resides in the ``senderFuture: Option[Promise[_]]`` member field in the ``ActorRef`` class.
-
-Here is an example of how it can be used:
-
-.. code-block:: scala
-
-  case request =>
-    try {
-      val result = process(request)
-      self.senderFuture.foreach(_.completeWithResult(result))
-    } catch {
-      case e =>
-        senderFuture.foreach(_.completeWithException(this, e))
-    }
-
-
 Summary of reply semantics and options
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-* ``self.reply(...)`` can be used to reply to an ``Actor`` or a ``Future``.
-* ``self.sender`` is a reference to the ``Actor`` you can reply to, if it exists
-* ``self.senderFuture`` is a reference to the ``Future`` you can reply to, if it exists
-* ``self.channel`` is a reference providing an abstraction to either ``self.sender`` or ``self.senderFuture`` if one is set, providing a single reference to store and reply to (the reference equivalent to the ``reply(...)`` method).
-* ``self.sender`` and ``self.senderFuture`` will never be set at the same time, as there can only be one reference to accept a reply.
+* ``self.reply(...)`` can be used to reply to an ``Actor`` or a ``Future`` from
+  within an actor; the current actor will be passed as reply channel if the
+  current channel supports this.
+* ``self.channel`` is a reference providing an abstraction for the reply
+  channel; this reference may be passed to other actors or used by non-actor
+  code.
+
+.. note::
+
+  There used to be two methods for determining the sending Actor or Future for the current invocation:
+
+  * ``self.sender`` yielded a :class:`Option[ActorRef]`
+  * ``self.senderFuture`` yielded a :class:`Option[CompletableFuture[Any]]`
+
+  These two concepts have been unified into the ``channel``. If you need to know the nature of the channel, you may do so using pattern matching::
+
+    self.channel match {
+      case ref : ActorRef => ...
+      case f : ActorCompletableFuture => ...
+    }
 
 Initial receive timeout
 -----------------------
@@ -438,7 +439,7 @@ PoisonPill
 
 You can also send an actor the ``akka.actor.PoisonPill`` message, which will stop the actor when the message is processed.
 
-If the sender is a ``Future`` (e.g. the message is sent with ``!!`` or ``?``), the ``Future`` will be completed with an ``akka.actor.ActorKilledException("PoisonPill")``.
+If the sender is a ``Future`` (e.g. the message is sent with ``?``), the ``Future`` will be completed with an ``akka.actor.ActorKilledException("PoisonPill")``.
 
 HotSwap
 -------
diff --git a/akka-docs/scala/fsm.rst b/akka-docs/scala/fsm.rst
index 8f4b6187ce..48d716c53b 100644
--- a/akka-docs/scala/fsm.rst
+++ b/akka-docs/scala/fsm.rst
@@ -11,6 +11,8 @@ FSM
    :synopsis: Finite State Machine DSL on top of Actors
 .. moduleauthor:: Irmo Manie, Roland Kuhn
 .. versionadded:: 1.0
+.. versionchanged:: 1.2
+   added Tracing and Logging
 
 Module stability: **STABLE**
 
@@ -283,11 +285,11 @@ Initiating Transitions
 ----------------------
 
 The result of any :obj:`stateFunction` must be a definition of the next state
-unless terminating the FSM, which is described in `Termination`_.  The state
-definition can either be the current state, as described by the :func:`stay`
-directive, or it is a different state as given by :func:`goto(state)`. The
-resulting object allows further qualification by way of the modifiers described
-in the following:
+unless terminating the FSM, which is described in `Termination from Inside`_.
+The state definition can either be the current state, as described by the
+:func:`stay` directive, or it is a different state as given by
+:func:`goto(state)`. The resulting object allows further qualification by way
+of the modifiers described in the following:
 
 :meth:`forMax(duration)`
   This modifier sets a state timeout on the next state. This means that a timer
@@ -430,8 +432,8 @@ queued it. The status of any timer may be inquired with
 These named timers complement state timeouts because they are not affected by
 intervening reception of other messages.
 
-Termination
------------
+Termination from Inside
+-----------------------
 
 The FSM is stopped by specifying the result state as
 
@@ -471,6 +473,79 @@ a :class:`StopEvent(reason, stateName, stateData)` as argument:
 As for the :func:`whenUnhandled` case, this handler is not stacked, so each
 invocation of :func:`onTermination` replaces the previously installed handler.
 
+Termination from Outside
+------------------------
+
+When an :class:`ActorRef` associated to a FSM is stopped using the
+:meth:`stop()` method, its :meth:`postStop` hook will be executed. The default
+implementation by the :class:`FSM` trait is to execute the
+:meth:`onTermination` handler if that is prepared to handle a
+:obj:`StopEvent(Shutdown, ...)`.
+
+.. warning::
+
+  In case you override :meth:`postStop` and want to have your
+  :meth:`onTermination` handler called, do not forget to call
+  ``super.postStop``.
+
+Testing and Debugging Finite State Machines
+===========================================
+
+During development and for trouble shooting FSMs need care just as any other
+actor. There are specialized tools available as described in :ref:`TestFSMRef`
+and in the following.
+
+Event Tracing
+-------------
+
+The setting ``akka.actor.debug.fsm`` in ``akka.conf`` enables logging of an
+event trace by :class:`LoggingFSM` instances::
+
+  class MyFSM extends Actor with LoggingFSM[X, Z] {
+    ...
+  }
+
+This FSM will log at DEBUG level:
+
+  * all processed events, including :obj:`StateTimeout` and scheduled timer
+    messages
+  * every setting and cancellation of named timers
+  * all state transitions
+
+Life cycle changes and special messages can be logged as described for
+:ref:`Actors `.
+
+Rolling Event Log
+-----------------
+
+The :class:`LoggingFSM` trait adds one more feature to the FSM: a rolling event
+log which may be used during debugging (for tracing how the FSM entered a
+certain failure state) or for other creative uses::
+
+  class MyFSM extends Actor with LoggingFSM[X, Z] {
+    override def logDepth = 12
+    onTermination {
+      case StopEvent(Failure(_), state, data) =>
+        EventHandler.warning(this, "Failure in state "+state+" with data "+data+"\n"+
+          "Events leading up to this point:\n\t"+getLog.mkString("\n\t"))
+    }
+    ...
+  }
+
+The :meth:`logDepth` defaults to zero, which turns off the event log.
+
+.. warning::
+
+  The log buffer is allocated during actor creation, which is why the
+  configuration is done using a virtual method call. If you want to override
+  with a ``val``, make sure that its initialization happens before the
+  initializer of :class:`LoggingFSM` runs, and do not change the value returned
+  by ``logDepth`` after the buffer has been allocated.
+
+The contents of the event log are available using method :meth:`getLog`, which
+returns an :class:`IndexedSeq[LogEntry]` where the oldest entry is at index
+zero.
+
 Examples
 ========
 
diff --git a/akka-docs/scala/remote-actors.rst b/akka-docs/scala/remote-actors.rst
index 305783238d..7fcfe34c01 100644
--- a/akka-docs/scala/remote-actors.rst
+++ b/akka-docs/scala/remote-actors.rst
@@ -344,7 +344,7 @@ Client side usage
 .. code-block:: scala
 
   val actor = remote.actorFor("hello-service", "localhost", 2552)
-  val result = actor !! "Hello"
+  val result = (actor ? "Hello").as[String]
 
 There are many variations on the 'remote#actorFor' method. Here are some of them:
 
@@ -394,7 +394,7 @@ Paste in the code below into two sbt concole shells. Then run:
 
     def run() {
       val actor = remote.actorFor("hello-service", "localhost", 2552)
-      val result = actor !! "Hello"
+      val result = (actor ? "Hello").as[AnyRef]
       EventHandler.info("Result from Remote Actor: %s", result)
     }
 
@@ -691,7 +691,7 @@ Using the generated message builder to send the message to a remote actor:
 
 .. code-block:: scala
 
-  val result = actor !! ProtobufPOJO.newBuilder
+  val resultFuture = actor ? ProtobufPOJO.newBuilder
       .setId(11)
       .setStatus(true)
       .setName("Coltrane")
diff --git a/akka-docs/scala/serialization.rst b/akka-docs/scala/serialization.rst
index 1d8276f726..8cb656d750 100644
--- a/akka-docs/scala/serialization.rst
+++ b/akka-docs/scala/serialization.rst
@@ -96,13 +96,13 @@ Step 3: Import the type class module definition and serialize / de-serialize::
     import BinaryFormatMyActor._
 
     val actor1 = actorOf[MyActor].start()
-    (actor1 !! "hello").getOrElse("_") should equal("world 1")
-    (actor1 !! "hello").getOrElse("_") should equal("world 2")
+    (actor1 ? "hello").as[String].getOrElse("_") should equal("world 1")
+    (actor1 ? "hello").as[String].getOrElse("_") should equal("world 2")
 
     val bytes = toBinary(actor1)
     val actor2 = fromBinary(bytes)
     actor2.start()
-    (actor2 !! "hello").getOrElse("_") should equal("world 3")
+    (actor2 ? "hello").as[String].getOrElse("_") should equal("world 3")
   }
 
 Helper Type Class for Stateless Actors
@@ -138,13 +138,13 @@ and use it for serialization::
     import BinaryFormatMyStatelessActor._
 
     val actor1 = actorOf[MyStatelessActor].start()
-    (actor1 !! "hello").getOrElse("_") should equal("world")
-    (actor1 !! "hello").getOrElse("_") should equal("world")
+    (actor1 ? "hello").as[String].getOrElse("_") should equal("world")
+    (actor1 ? "hello").as[String].getOrElse("_") should equal("world")
 
     val bytes = toBinary(actor1)
     val actor2 = fromBinary(bytes)
     actor2.start()
-    (actor2 !! "hello").getOrElse("_") should equal("world")
+    (actor2 ? "hello").as[String].getOrElse("_") should equal("world")
   }
 
 
@@ -189,13 +189,13 @@ and serialize / de-serialize::
     import BinaryFormatMyJavaSerializableActor._
 
     val actor1 = actorOf[MyJavaSerializableActor].start()
-    (actor1 !! "hello").getOrElse("_") should equal("world 1")
-    (actor1 !! "hello").getOrElse("_") should equal("world 2")
+    (actor1 ? "hello").as[String].getOrElse("_") should equal("world 1")
+    (actor1 ? "hello").as[String].getOrElse("_") should equal("world 2")
 
     val bytes = toBinary(actor1)
     val actor2 = fromBinary(bytes)
     actor2.start()
-    (actor2 !! "hello").getOrElse("_") should equal("world 3")
+    (actor2 ? "hello").as[String].getOrElse("_") should equal("world 3")
   }
 
 
@@ -382,7 +382,7 @@ compiler::
 When you compile the spec you will among other things get a message builder. You
 then use this builder to create the messages to send over the wire::
 
-  val result = remoteActor !! ProtobufPOJO.newBuilder
+  val resultFuture = remoteActor ? ProtobufPOJO.newBuilder
       .setId(11)
       .setStatus(true)
       .setName("Coltrane")
diff --git a/akka-docs/scala/testing.rst b/akka-docs/scala/testing.rst
index eee3a2b029..f3bb07c5bb 100644
--- a/akka-docs/scala/testing.rst
+++ b/akka-docs/scala/testing.rst
@@ -18,6 +18,8 @@ Testing Actor Systems
 .. versionadded:: 1.0
 .. versionchanged:: 1.1
    added :class:`TestActorRef`
+.. versionchanged:: 1.2
+   added :class:`TestFSMRef`
 
 As with any piece of software, automated tests are a very important part of the
 development cycle. The actor model presents a different view on how units of
@@ -83,6 +85,53 @@ Since :class:`TestActorRef` is generic in the actor type it returns the
 underlying actor with its proper static type. From this point on you may bring
 any unit testing tool to bear on your actor as usual.
 
+.. _TestFSMRef:
+
+Testing Finite State Machines
+-----------------------------
+
+If your actor under test is a :class:`FSM`, you may use the special
+:class:`TestFSMRef` which offers all features of a normal :class:`TestActorRef`
+and in addition allows access to the internal state::
+
+  import akka.testkit.TestFSMRef
+  import akka.util.duration._
+
+  val fsm = TestFSMRef(new Actor with FSM[Int, String] {
+      startWith(1, "")
+      when (1) {
+        case Ev("go") => goto(2) using "go"
+      }
+      when (2) {
+        case Ev("back") => goto(1) using "back"
+      }
+    }).start()
+  
+  assert (fsm.stateName == 1)
+  assert (fsm.stateData == "")
+  fsm ! "go"                      // being a TestActorRef, this runs also on the CallingThreadDispatcher
+  assert (fsm.stateName == 2)
+  assert (fsm.stateData == "go")
+  
+  fsm.setState(stateName = 1)
+  assert (fsm.stateName == 1)
+
+  assert (fsm.timerActive_?("test") == false)
+  fsm.setTimer("test", 12, 10 millis, true)
+  assert (fsm.timerActive_?("test") == true)
+  fsm.cancelTimer("test")
+  assert (fsm.timerActive_?("test") == false)
+
+Due to a limitation in Scala’s type inference, there is only the factory method
+shown above, so you will probably write code like ``TestFSMRef(new MyFSM)``
+instead of the hypothetical :class:`ActorRef`-inspired ``TestFSMRef[MyFSM]``.
+All methods shown above directly access the FSM state without any
+synchronization; this is perfectly alright if the
+:class:`CallingThreadDispatcher` is used (which is the default for
+:class:`TestFSMRef`) and no other threads are involved, but it may lead to
+surprises if you were to actually exercise timer events, because those are
+executed on the :obj:`Scheduler` thread.
+
 Testing the Actor's Behavior
 ----------------------------
 
@@ -104,8 +153,8 @@ into a :class:`TestActorRef`.
 .. code-block:: scala
 
    val actorRef = TestActorRef(new MyActor)
-   val result = actorRef !! Say42 // hypothetical message stimulating a '42' answer
-   result must be (42)
+   val result = (actorRef ? Say42).as[Int] // hypothetical message stimulating a '42' answer
+   result must be (Some(42))
 
 As the :class:`TestActorRef` is a subclass of :class:`LocalActorRef` with a few
 special extras, also aspects like linking to a supervisor and restarting work
@@ -212,6 +261,125 @@ classes, receiving nothing for some time, etc.
    The test actor shuts itself down by default after 5 seconds (configurable)
    of inactivity, relieving you of the duty of explicitly managing it.
 
+Built-In Assertions
+-------------------
+
+The abovementioned :meth:`expectMsg` is not the only method for formulating
+assertions concerning received messages. Here is the full list:
+
+  * :meth:`expectMsg[T](d: Duration, msg: T): T`
+
+    The given message object must be received within the specified time; the
+    object will be returned.
+
+  * :meth:`expectMsgPF[T](d: Duration)(pf: PartialFunction[Any, T]): T`
+
+    Within the given time period, a message must be received and the given
+    partial function must be defined for that message; the result from applying
+    the partial function to the received message is returned. The duration may
+    be left unspecified (empty parentheses are required in this case) to use
+    the deadline from the innermost enclosing :ref:`within `
+    block instead.
+
+  * :meth:`expectMsgClass[T](d: Duration, c: Class[T]): T`
+
+    An object which is an instance of the given :class:`Class` must be received
+    within the allotted time frame; the object will be returned. Note that this
+    does a conformance check; if you need the class to be equal, have a look at
+    :meth:`expectMsgAllClassOf` with a single given class argument.
+
+  * :meth:`expectMsgAnyOf[T](d: Duration, obj: T*): T`
+
+    An object must be received within the given time, and it must be equal (
+    compared with ``==``) to at least one of the passed reference objects; the
+    received object will be returned.
+
+  * :meth:`expectMsgAnyClassOf[T](d: Duration, obj: Class[_ <: T]*): T`
+
+    An object must be received within the given time, and it must be an
+    instance of at least one of the supplied :class:`Class` objects; the
+    received object will be returned.
+
+  * :meth:`expectMsgAllOf[T](d: Duration, obj: T*): Seq[T]`
+
+    A number of objects matching the size of the supplied object array must be
+    received within the given time, and for each of the given objects there
+    must exist at least one among the received ones which equals (compared with
+    ``==``) it. The full sequence of received objects is returned.
+
+  * :meth:`expectMsgAllClassOf[T](d: Duration, c: Class[_ <: T]*): Seq[T]`
+
+    A number of objects matching the size of the supplied :class:`Class` array
+    must be received within the given time, and for each of the given classes
+    there must exist at least one among the received objects whose class equals
+    (compared with ``==``) it (this is *not* a conformance check). The full
+    sequence of received objects is returned.
+
+  * :meth:`expectMsgAllConformingOf[T](d: Duration, c: Class[_ <: T]*): Seq[T]`
+
+    A number of objects matching the size of the supplied :class:`Class` array
+    must be received within the given time, and for each of the given classes
+    there must exist at least one among the received objects which is an
+    instance of this class. The full sequence of received objects is returned.
+
+  * :meth:`expectNoMsg(d: Duration)`
+
+    No message must be received within the given time. This also fails if a
+    message has been received before calling this method which has not been
+    removed from the queue using one of the other methods.
+
+  * :meth:`receiveN(n: Int, d: Duration): Seq[AnyRef]`
+
+    ``n`` messages must be received within the given time; the received
+    messages are returned.
+
+In addition to message reception assertions there are also methods which help
+with message flows:
+
+  * :meth:`receiveOne(d: Duration): AnyRef`
+
+    Tries to receive one message for at most the given time interval and
+    returns ``null`` in case of failure. If the given Duration is zero, the
+    call is non-blocking (polling mode).
+
+  * :meth:`receiveWhile[T](max: Duration, idle: Duration)(pf: PartialFunction[Any, T]): Seq[T]`
+
+    Collect messages as long as
+    
+    * they are matching the given partial function
+    * the given time interval is not used up
+    * the next message is received within the idle timeout
+      
+    All collected messages are returned. The maximum duration defaults to the
+    time remaining in the innermost enclosing :ref:`within `
+    block and the idle duration defaults to infinity (thereby disabling the
+    idle timeout feature).
+
+  * :meth:`awaitCond(p: => Boolean, max: Duration, interval: Duration)`
+
+    Poll the given condition every :obj:`interval` until it returns ``true`` or
+    the :obj:`max` duration is used up. The interval defaults to 100 ms and the
+    maximum defaults to the time remaining in the innermost enclosing
+    :ref:`within ` block.
+
+  * :meth:`ignoreMsg(pf: PartialFunction[AnyRef, Boolean])`
+    
+    :meth:`ignoreNoMsg`
+
+    The internal :obj:`testActor` contains a partial function for ignoring
+    messages: it will only enqueue messages which do not match the function or
+    for which the function returns ``false``. This function can be set and
+    reset using the methods given above; each invocation replaces the previous
+    function, they are not composed.
+
+    This feature is useful e.g. when testing a logging system, where you want
+    to ignore regular messages and are only interested in your specific ones.
+
+.. _TestKit.within:
+
+Timing Assertions
+-----------------
+
 Another important part of functional testing concerns timing: certain events
 must not happen immediately (like a timer), others need to happen before a
 deadline. Therefore, all examination methods accept an upper time limit within
@@ -259,6 +427,137 @@ Ray Roestenburg has written a great article on using the TestKit:
 ``_.
 His full example is also available :ref:`here `.
 
+Accounting for Slow Test Systems
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The tight timeouts you use during testing on your lightning-fast notebook will
+invariably lead to spurious test failures on the heavily loaded Jenkins server
+(or similar). To account for this situation, all maximum durations are
+internally scaled by a factor taken from ``akka.conf``,
+``akka.test.timefactor``, which defaults to 1.
+
+Resolving Conflicts with Implicit ActorRef
+------------------------------------------
+
+The :class:`TestKit` trait contains an implicit value of type :class:`ActorRef`
+to enable the magic reply handling. This value is named ``self`` so that e.g.
+anonymous actors may be declared within a test class without having to care
+about the ambiguous implicit issues which would otherwise arise. If you find
+yourself in a situation where the implicit you need comes from a different
+trait than :class:`TestKit` and is not named ``self``, then use
+:class:`TestKitLight`, which differs only in not having any implicit members.
+You would then need to make an implicit available in locally confined scopes
+which need it, e.g. different test cases. If this cannot be done, you will need
+to resort to explicitly specifying the sender reference::
+
+  val actor = actorOf[MyWorker].start()
+  actor.!(msg)(testActor)
+
+Using Multiple Probe Actors
+---------------------------
+
+When the actors under test are supposed to send various messages to different
+destinations, it may be difficult distinguishing the message streams arriving
+at the :obj:`testActor` when using the :class:`TestKit` as a mixin. Another
+approach is to use it for creation of simple probe actors to be inserted in the
+message flows. To make this more powerful and convenient, there is a concrete
+implementation called :class:`TestProbe`. The functionality is best explained
+using a small example::
+
+  class MyDoubleEcho extends Actor {
+    var dest1 : ActorRef = _
+    var dest2 : ActorRef = _
+    def receive = {
+      case (d1, d2) =>
+        dest1 = d1
+        dest2 = d2
+      case x =>
+        dest1 ! x
+        dest2 ! x
+    }
+  }
+
+  val probe1 = TestProbe()
+  val probe2 = TestProbe()
+  val actor = Actor.actorOf[MyDoubleEcho].start()
+  actor ! (probe1.ref, probe2.ref)
+  actor ! "hello"
+  probe1.expectMsg(50 millis, "hello")
+  probe2.expectMsg(50 millis, "hello")
+
+Probes may also be equipped with custom assertions to make your test code even
+more concise and clear::
+
+  case class Update(id : Int, value : String)
+
+  val probe = new TestProbe {
+      def expectUpdate(x : Int) = {
+        expectMsg {
+          case Update(id, _) if id == x => true
+        }
+        reply("ACK")
+      }
+    }
+
+You have complete flexibility here in mixing and matching the :class:`TestKit`
+facilities with your own checks and choosing an intuitive name for it. In real
+life your code will probably be a bit more complicated than the example given
+above; just use the power!
+
+Replying to Messages Received by Probes
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The probes keep track of the communications channel for replies, if possible,
+so they can also reply::
+
+  val probe = TestProbe()
+  val future = probe.ref ? "hello"
+  probe.expectMsg(0 millis, "hello") // TestActor runs on CallingThreadDispatcher
+  probe.reply("world")
+  assert (future.isCompleted && future.as[String] == "world")
+
+Forwarding Messages Received by Probes
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Given a destination actor ``dest`` which in the nominal actor network would
+receive a message from actor ``source``. If you arrange for the message to be
+sent to a :class:`TestProbe` ``probe`` instead, you can make assertions
+concerning volume and timing of the message flow while still keeping the
+network functioning::
+
+  val probe = TestProbe()
+  val source = Actor.actorOf(new Source(probe)).start()
+  val dest = Actor.actorOf[Destination].start()
+  source ! "start"
+  probe.expectMsg("work")
+  probe.forward(dest)
+
+The ``dest`` actor will receive the same message invocation as if no test probe
+had intervened.
+
+Caution about Timing Assertions
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The behavior of :meth:`within` blocks when using test probes might be perceived
+as counter-intuitive: you need to remember that the nicely scoped deadline as
+described :ref:`above ` is local to each probe. Hence, probes
+do not react to each other's deadlines or to the deadline set in an enclosing
+:class:`TestKit` instance::
+
+  class SomeTest extends TestKit {
+
+    val probe = TestProbe()
+
+    within(100 millis) {
+      probe.expectMsg("hallo")  // Will hang forever!
+    }
+  }
+
+This test will hang indefinitely, because the :meth:`expectMsg` call does not
+see any deadline. Currently, the only option is to use ``probe.within`` in the
+above code to make it work; later versions may include lexically scoped
+deadlines using implicit arguments.
+
 CallingThreadDispatcher
 =======================
 
@@ -374,3 +673,68 @@ has to offer:
    exception stack traces
  - Exclusion of certain classes of dead-lock scenarios
 
+.. _actor.logging:
+
+Tracing Actor Invocations
+=========================
+
+The testing facilities described up to this point were aiming at formulating
+assertions about a system’s behavior. If a test fails, it is usually your job
+to find the cause, fix it and verify the test again. This process is supported
+by debuggers as well as logging, where the Akka toolkit offers the following
+options:
+
+* *Logging of exceptions thrown within Actor instances*
+  
+  This is always on; in contrast to the other logging mechanisms, this logs at
+  ``ERROR`` level.
+
+* *Logging of message invocations on certain actors*
+
+  This is enabled by a setting in ``akka.conf`` — namely
+  ``akka.actor.debug.receive`` — which enables the :meth:`loggable`
+  statement to be applied to an actor’s :meth:`receive` function::
+
+    def receive = Actor.loggable(this) { // `Actor` unnecessary with import Actor._
+      case msg => ...
+    } 
+
+  The first argument to :meth:`loggable` defines the source to be used in the
+  logging events, which should be the current actor.
+
+  If the abovementioned setting is not given in ``akka.conf``, this method will
+  pass through the given :class:`Receive` function unmodified, meaning that
+  there is no runtime cost unless actually enabled.
+
+  The logging feature is coupled to this specific local mark-up because
+  enabling it uniformly on all actors is not usually what you need, and it
+  would lead to endless loops if it were applied to :class:`EventHandler`
+  listeners.
+
+* *Logging of special messages*
+
+  Actors handle certain special messages automatically, e.g. :obj:`Kill`,
+  :obj:`PoisonPill`, etc. Tracing of these message invocations is enabled by
+  the setting ``akka.actor.debug.autoreceive``, which enables this on all
+  actors.
+
+* *Logging of the actor lifecycle*
+
+  Actor creation, start, restart, link, unlink and stop may be traced by
+  enabling the setting ``akka.actor.debug.lifecycle``; this, too, is enabled
+  uniformly on all actors.
+
+All these messages are logged at ``DEBUG`` level. To summarize, you can enable
+full logging of actor activities using this configuration fragment::
+
+  akka {
+    event-handler-level = "DEBUG"
+    actor {
+      debug {
+        receive = "true"
+        autoreceive = "true"
+        lifecycle = "true"
+      }
+    }
+  }
+
diff --git a/akka-docs/scala/transactors.rst b/akka-docs/scala/transactors.rst
index e4ee824cd3..a3478a9572 100644
--- a/akka-docs/scala/transactors.rst
+++ b/akka-docs/scala/transactors.rst
@@ -71,7 +71,7 @@ Here is an example of coordinating two simple counter Actors so that they both i
 
   ...
 
-  counter1 !! GetCount // Some(1)
+  (counter1 ? GetCount).as[Int] // Some(1)
 
   counter1.stop()
   counter2.stop()
diff --git a/akka-docs/scala/tutorial-chat-server.rst b/akka-docs/scala/tutorial-chat-server.rst
index 1319ee9432..1c10abcae9 100644
--- a/akka-docs/scala/tutorial-chat-server.rst
+++ b/akka-docs/scala/tutorial-chat-server.rst
@@ -155,7 +155,22 @@ Client: Sending messages
 
 Our client wraps each message send in a function, making it a bit easier to use. Here we assume that we have a reference to the chat service so we can communicate with it by sending messages. Messages are sent with the '!' operator (pronounced "bang"). This sends a message of asynchronously and do not wait for a reply.
 
-Sometimes however, there is a need for sequential logic, sending a message and wait for the reply before doing anything else. In Akka we can achieve that using the '!!' ("bangbang") operator. When sending a message with '!!' we do not return immediately but wait for a reply using a `Future `_. A 'Future' is a promise that we will get a result later but with the difference from regular method dispatch that the OS thread we are running on is put to sleep while waiting and that we can set a time-out for how long we wait before bailing out, retrying or doing something else. The '!!' function returns a `scala.Option `_ which implements the `Null Object pattern `_. It has two subclasses; 'None' which means no result and 'Some(value)' which means that we got a reply. The 'Option' class has a lot of great methods to work with the case of not getting a defined result. F.e. as you can see below we are using the 'getOrElse' method which will try to return the result and if there is no result defined invoke the "...OrElse" statement.
+Sometimes however, there is a need for sequential logic, sending a message and
+wait for the reply before doing anything else. In Akka we can achieve that
+using the '?' operator. When sending a message with '?' we get back a `Future
+`_. A 'Future' is a promise
+that we will get a result later but with the difference from regular method
+dispatch that the OS thread we are running on is put to sleep while waiting and
+that we can set a time-out for how long we wait before bailing out, retrying or
+doing something else. This waiting is achieved with the :meth:`Future.as[T]`
+method, which returns a `scala.Option
+`_ which implements
+the `Null Object pattern `_.
+It has two subclasses; 'None' which means no result and 'Some(value)' which
+means that we got a reply. The 'Option' class has a lot of great methods to
+work with the case of not getting a defined result. F.e. as you can see below
+we are using the 'getOrElse' method which will try to return the result and if
+there is no result defined invoke the "...OrElse" statement.
 
 .. code-block:: scala
 
@@ -165,7 +180,7 @@ Sometimes however, there is a need for sequential logic, sending a message and w
     def login                 = chat ! Login(name)
     def logout                = chat ! Logout(name)
     def post(message: String) = chat ! ChatMessage(name, name + ": " + message)
-    def chatLog               = (chat !! GetChatLog(name)).as[ChatLog]
+    def chatLog               = (chat ? GetChatLog(name)).as[ChatLog]
                                   .getOrElse(throw new Exception("Couldn't get the chat log from ChatServer"))
   }
 
diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala
index a940d5ce01..d859f695ea 100644
--- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala
+++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala
@@ -38,14 +38,6 @@ class TestActorRef[T <: Actor](factory: () ⇒ T, address: String)
    */
   def underlyingActor: T = actor.asInstanceOf[T]
 
-  /**
-   * Override to return the more specific static type.
-   */
-  override def start() = {
-    super.start()
-    this
-  }
-
   override def toString = "TestActor[" + address + ":" + uuid + "]"
 
   override def equals(other: Any) =
diff --git a/akka-actor-tests/src/main/scala/akka/testing/TestBarrier.scala b/akka-testkit/src/main/scala/akka/testkit/TestBarrier.scala
similarity index 92%
rename from akka-actor-tests/src/main/scala/akka/testing/TestBarrier.scala
rename to akka-testkit/src/main/scala/akka/testkit/TestBarrier.scala
index 8806bf903e..87c258b255 100644
--- a/akka-actor-tests/src/main/scala/akka/testing/TestBarrier.scala
+++ b/akka-testkit/src/main/scala/akka/testkit/TestBarrier.scala
@@ -2,7 +2,7 @@
  * Copyright (C) 2009-2011 Scalable Solutions AB 
  */
 
-package akka.testing
+package akka.testkit
 
 import akka.util.Duration
 import java.util.concurrent.{ CyclicBarrier, TimeUnit, TimeoutException }
@@ -31,7 +31,7 @@ class TestBarrier(count: Int) {
       barrier.await(Testing.testTime(timeout.toNanos), TimeUnit.NANOSECONDS)
     } catch {
       case e: TimeoutException ⇒
-        throw new TestBarrierTimeoutException("Timeout of %s and time factor of %s" format (timeout.toString, Testing.timeFactor))
+        throw new TestBarrierTimeoutException("Timeout of %s and time factor of %s" format (timeout.toString, Duration.timeFactor))
     }
   }
 
diff --git a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala
new file mode 100644
index 0000000000..f7b064923f
--- /dev/null
+++ b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala
@@ -0,0 +1,86 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB 
+ */
+
+package akka.testkit
+
+import akka.actor._
+import akka.util._
+
+import com.eaio.uuid.UUID
+
+/**
+ * This is a specialised form of the TestActorRef with support for querying and
+ * setting the state of a FSM. Use a LoggingFSM with this class if you also
+ * need to inspect event traces.
+ *
+ * 

+ * val fsm = TestFSMRef(new Actor with LoggingFSM[Int, Null] {
+ *     override def logDepth = 12
+ *     startWith(1, null)
+ *     when(1) {
+ *       case Ev("hello") => goto(2)
+ *     }
+ *     when(2) {
+ *       case Ev("world") => goto(1)
+ *     }
+ *   }
+ * assert (fsm.stateName == 1)
+ * fsm ! "hallo"
+ * assert (fsm.stateName == 2)
+ * assert (fsm.underlyingActor.getLog == IndexedSeq(FSMLogEntry(1, null, "hallo")))
+ * 
+ * + * @author Roland Kuhn + * @since 1.2 + */ +class TestFSMRef[S, D, T <: Actor](factory: () ⇒ T, address: String)(implicit ev: T <:< FSM[S, D]) extends TestActorRef(factory, address) { + + private def fsm = underlyingActor + + /** + * Get current state name of this FSM. + */ + def stateName: S = fsm.stateName + + /** + * Get current state data of this FSM. + */ + def stateData: D = fsm.stateData + + /** + * Change FSM state; any value left out defaults to the current FSM state + * (timeout defaults to None). This method is directly equivalent to a + * corresponding transition initiated from within the FSM, including timeout + * and stop handling. + */ + def setState(stateName: S = fsm.stateName, stateData: D = fsm.stateData, timeout: Option[Duration] = None, stopReason: Option[FSM.Reason] = None) { + fsm.applyState(FSM.State(stateName, stateData, timeout, stopReason)) + } + + /** + * Proxy for FSM.setTimer. + */ + def setTimer(name: String, msg: Any, timeout: Duration, repeat: Boolean) { + fsm.setTimer(name, msg, timeout, repeat) + } + + /** + * Proxy for FSM.cancelTimer. + */ + def cancelTimer(name: String) { fsm.cancelTimer(name) } + + /** + * Proxy for FSM.timerActive_?. + */ + def timerActive_?(name: String) = fsm.timerActive_?(name) + +} + +object TestFSMRef { + + def apply[S, D, T <: Actor](factory: ⇒ T)(implicit ev: T <:< FSM[S, D]): TestFSMRef[S, D, T] = new TestFSMRef(() ⇒ factory, new UUID().toString) + + def apply[S, D, T <: Actor](factory: ⇒ T, address: String)(implicit ev: T <:< FSM[S, D]): TestFSMRef[S, D, T] = new TestFSMRef(() ⇒ factory, address) + +} diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index 25bd87bd3b..0b07e61783 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -8,7 +8,8 @@ import Actor._ import akka.util.Duration import akka.util.duration._ -import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque, TimeUnit } +import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque, TimeUnit, atomic } +import atomic.AtomicInteger import scala.annotation.tailrec @@ -81,10 +82,14 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor with FSM[ * constructor as shown above, which makes this a non-issue, otherwise take * care not to run tests within a single test class instance in parallel. * + * It should be noted that for CI servers and the like all maximum Durations + * are scaled using their Duration.dilated method, which uses the + * Duration.timeFactor settable via akka.conf entry "akka.test.timefactor". + * * @author Roland Kuhn * @since 1.1 */ -trait TestKit { +trait TestKitLight { import TestActor.{ Message, RealMessage, NullMessage } @@ -95,7 +100,7 @@ trait TestKit { * ActorRef of the test actor. Access is provided to enable e.g. * registration as message target. */ - implicit val testActor = localActorOf(new TestActor(queue)).start() + val testActor = localActorOf(new TestActor(queue), "testActor" + TestKit.testActorId.incrementAndGet()).start() /** * Implicit sender reference so that replies are possible for messages sent @@ -145,7 +150,7 @@ trait TestKit { def ignoreNoMsg { testActor ! TestActor.SetIgnore(None) } /** - * Obtain current time (`System.currentTimeMillis`) as Duration. + * Obtain current time (`System.nanoTime`) as Duration. */ def now: Duration = System.nanoTime.nanos @@ -154,12 +159,45 @@ trait TestKit { */ def remaining: Duration = end - now + /** + * Query queue status. + */ + def msgAvailable = !queue.isEmpty + + /** + * Block until the given condition evaluates to `true` or the timeout + * expires, whichever comes first. + * + * If no timeout is given, take it from the innermost enclosing `within` + * block. + * + * Note that the timeout is scaled using Duration.timeFactor. + */ + def awaitCond(p: ⇒ Boolean, max: Duration = Duration.MinusInf, interval: Duration = 100.millis) { + val _max = if (max eq Duration.MinusInf) remaining else max.dilated + val stop = now + _max + + @tailrec + def poll(t: Duration) { + if (!p) { + assert(now < stop, "timeout " + _max + " expired") + Thread.sleep(t.toMillis) + poll((stop - now) min interval) + } + } + + poll(_max min interval) + } + /** * Execute code block while bounding its execution time between `min` and * `max`. `within` blocks may be nested. All methods in this trait which * take maximum wait times are available in a version which implicitly uses * the remaining time governed by the innermost enclosing `within` block. * + * Note that the max Duration is scaled by Duration.timeFactor while the min + * Duration is not. + * *
    * val ret = within(50 millis) {
    *         test ! "ping"
@@ -168,11 +206,12 @@ trait TestKit {
    * 
*/ def within[T](min: Duration, max: Duration)(f: ⇒ T): T = { + val _max = max.dilated val start = now val rem = end - start assert(rem >= min, "required min time " + min + " not possible, only " + format(min.unit, rem) + " left") - val max_diff = if (max < rem) max else rem + val max_diff = _max min rem val prev_end = end end = start + max_diff @@ -184,7 +223,7 @@ trait TestKit { * caution: HACK AHEAD */ if (now - lastSoftTimeout > 5.millis) { - assert(diff <= max_diff, "block took " + format(max.unit, diff) + ", exceeding " + format(max.unit, max_diff)) + assert(diff <= max_diff, "block took " + format(_max.unit, diff) + ", exceeding " + format(_max.unit, max_diff)) } else { lastSoftTimeout -= 5.millis } @@ -206,10 +245,9 @@ trait TestKit { def reply(msg: AnyRef) { lastMessage.channel ! msg } /** - * Same as `expectMsg`, but takes the maximum wait time from the innermost - * enclosing `within` block. + * Same as `expectMsg(remaining, obj)`, but correctly treating the timeFactor. */ - def expectMsg(obj: Any): AnyRef = expectMsg(remaining, obj) + def expectMsg[T](obj: T): T = expectMsg_internal(remaining, obj) /** * Receive one message from the test actor and assert that it equals the @@ -218,18 +256,20 @@ trait TestKit { * * @return the received object */ - def expectMsg(max: Duration, obj: Any): AnyRef = { + def expectMsg[T](max: Duration, obj: T): T = expectMsg_internal(max.dilated, obj) + + private def expectMsg_internal[T](max: Duration, obj: T): T = { val o = receiveOne(max) assert(o ne null, "timeout during expectMsg") assert(obj == o, "expected " + obj + ", found " + o) - o + o.asInstanceOf[T] } /** - * Same as `expectMsg`, but takes the maximum wait time from the innermost - * enclosing `within` block. + * Same as `expectMsg(remaining)(f)`, but correctly treating the timeFactor. */ - def expectMsg[T](f: PartialFunction[Any, T]): T = expectMsg(remaining)(f) + @deprecated("use expectMsgPF instead", "1.2") + def expectMsg[T](f: PartialFunction[Any, T]): T = expectMsgPF()(f) /** * Receive one message from the test actor and assert that the given @@ -241,18 +281,31 @@ trait TestKit { * * @return the received object as transformed by the partial function */ - def expectMsg[T](max: Duration)(f: PartialFunction[Any, T]): T = { - val o = receiveOne(max) + @deprecated("use expectMsgPF instead", "1.2") + def expectMsg[T](max: Duration)(f: PartialFunction[Any, T]): T = expectMsgPF(max)(f) + + /** + * Receive one message from the test actor and assert that the given + * partial function accepts it. Wait time is bounded by the given duration, + * with an AssertionFailure being thrown in case of timeout. + * + * Use this variant to implement more complicated or conditional + * processing. + * + * @return the received object as transformed by the partial function + */ + def expectMsgPF[T](max: Duration = Duration.MinusInf)(f: PartialFunction[Any, T]): T = { + val _max = if (max eq Duration.MinusInf) remaining else max.dilated + val o = receiveOne(_max) assert(o ne null, "timeout during expectMsg") assert(f.isDefinedAt(o), "does not match: " + o) f(o) } /** - * Same as `expectMsgClass`, but takes the maximum wait time from the innermost - * enclosing `within` block. + * Same as `expectMsgClass(remaining, c)`, but correctly treating the timeFactor. */ - def expectMsgClass[C](c: Class[C]): C = expectMsgClass(remaining, c) + def expectMsgClass[C](c: Class[C]): C = expectMsgClass_internal(remaining, c) /** * Receive one message from the test actor and assert that it conforms to @@ -261,7 +314,9 @@ trait TestKit { * * @return the received object */ - def expectMsgClass[C](max: Duration, c: Class[C]): C = { + def expectMsgClass[C](max: Duration, c: Class[C]): C = expectMsgClass_internal(max.dilated, c) + + private def expectMsgClass_internal[C](max: Duration, c: Class[C]): C = { val o = receiveOne(max) assert(o ne null, "timeout during expectMsgClass") assert(c isInstance o, "expected " + c + ", found " + o.getClass) @@ -269,10 +324,9 @@ trait TestKit { } /** - * Same as `expectMsgAnyOf`, but takes the maximum wait time from the innermost - * enclosing `within` block. + * Same as `expectMsgAnyOf(remaining, obj...)`, but correctly treating the timeFactor. */ - def expectMsgAnyOf(obj: Any*): AnyRef = expectMsgAnyOf(remaining, obj: _*) + def expectMsgAnyOf[T](obj: T*): T = expectMsgAnyOf_internal(remaining, obj: _*) /** * Receive one message from the test actor and assert that it equals one of @@ -281,18 +335,19 @@ trait TestKit { * * @return the received object */ - def expectMsgAnyOf(max: Duration, obj: Any*): AnyRef = { + def expectMsgAnyOf[T](max: Duration, obj: T*): T = expectMsgAnyOf_internal(max.dilated, obj: _*) + + private def expectMsgAnyOf_internal[T](max: Duration, obj: T*): T = { val o = receiveOne(max) assert(o ne null, "timeout during expectMsgAnyOf") assert(obj exists (_ == o), "found unexpected " + o) - o + o.asInstanceOf[T] } /** - * Same as `expectMsgAnyClassOf`, but takes the maximum wait time from the innermost - * enclosing `within` block. + * Same as `expectMsgAnyClassOf(remaining, obj...)`, but correctly treating the timeFactor. */ - def expectMsgAnyClassOf(obj: Class[_]*): AnyRef = expectMsgAnyClassOf(remaining, obj: _*) + def expectMsgAnyClassOf[C](obj: Class[_ <: C]*): C = expectMsgAnyClassOf_internal(remaining, obj: _*) /** * Receive one message from the test actor and assert that it conforms to @@ -301,25 +356,26 @@ trait TestKit { * * @return the received object */ - def expectMsgAnyClassOf(max: Duration, obj: Class[_]*): AnyRef = { + def expectMsgAnyClassOf[C](max: Duration, obj: Class[_ <: C]*): C = expectMsgAnyClassOf_internal(max.dilated, obj: _*) + + private def expectMsgAnyClassOf_internal[C](max: Duration, obj: Class[_ <: C]*): C = { val o = receiveOne(max) assert(o ne null, "timeout during expectMsgAnyClassOf") assert(obj exists (_ isInstance o), "found unexpected " + o) - o + o.asInstanceOf[C] } /** - * Same as `expectMsgAllOf`, but takes the maximum wait time from the innermost - * enclosing `within` block. + * Same as `expectMsgAllOf(remaining, obj...)`, but correctly treating the timeFactor. */ - def expectMsgAllOf(obj: Any*) { expectMsgAllOf(remaining, obj: _*) } + def expectMsgAllOf[T](obj: T*): Seq[T] = expectMsgAllOf_internal(remaining, obj: _*) /** * Receive a number of messages from the test actor matching the given * number of objects and assert that for each given object one is received - * which equals it. This construct is useful when the order in which the - * objects are received is not fixed. Wait time is bounded by the given - * duration, with an AssertionFailure being thrown in case of timeout. + * which equals it and vice versa. This construct is useful when the order in + * which the objects are received is not fixed. Wait time is bounded by the + * given duration, with an AssertionFailure being thrown in case of timeout. * *
    * within(1 second) {
@@ -329,16 +385,19 @@ trait TestKit {
    * }
    * 
*/ - def expectMsgAllOf(max: Duration, obj: Any*) { - val recv = receiveN(obj.size, now + max) - assert(obj forall (x ⇒ recv exists (x == _)), "not found all") + def expectMsgAllOf[T](max: Duration, obj: T*): Seq[T] = expectMsgAllOf_internal(max.dilated, obj: _*) + + private def expectMsgAllOf_internal[T](max: Duration, obj: T*): Seq[T] = { + val recv = receiveN_internal(obj.size, max) + obj foreach (x ⇒ assert(recv exists (x == _), "not found " + x)) + recv foreach (x ⇒ assert(obj exists (x == _), "found unexpected " + x)) + recv.asInstanceOf[Seq[T]] } /** - * Same as `expectMsgAllClassOf`, but takes the maximum wait time from the innermost - * enclosing `within` block. + * Same as `expectMsgAllClassOf(remaining, obj...)`, but correctly treating the timeFactor. */ - def expectMsgAllClassOf(obj: Class[_]*) { expectMsgAllClassOf(remaining, obj: _*) } + def expectMsgAllClassOf[T](obj: Class[_ <: T]*): Seq[T] = expectMsgAllClassOf_internal(remaining, obj: _*) /** * Receive a number of messages from the test actor matching the given @@ -348,62 +407,68 @@ trait TestKit { * Wait time is bounded by the given duration, with an AssertionFailure * being thrown in case of timeout. */ - def expectMsgAllClassOf(max: Duration, obj: Class[_]*) { - val recv = receiveN(obj.size, now + max) - assert(obj forall (x ⇒ recv exists (_.getClass eq x)), "not found all") + def expectMsgAllClassOf[T](max: Duration, obj: Class[_ <: T]*): Seq[T] = expectMsgAllClassOf_internal(max.dilated, obj: _*) + + private def expectMsgAllClassOf_internal[T](max: Duration, obj: Class[_ <: T]*): Seq[T] = { + val recv = receiveN_internal(obj.size, max) + obj foreach (x ⇒ assert(recv exists (_.getClass eq x), "not found " + x)) + recv foreach (x ⇒ assert(obj exists (_ eq x.getClass), "found non-matching object " + x)) + recv.asInstanceOf[Seq[T]] } /** - * Same as `expectMsgAllConformingOf`, but takes the maximum wait time from the innermost - * enclosing `within` block. + * Same as `expectMsgAllConformingOf(remaining, obj...)`, but correctly treating the timeFactor. */ - def expectMsgAllConformingOf(obj: Class[_]*) { expectMsgAllClassOf(remaining, obj: _*) } + def expectMsgAllConformingOf[T](obj: Class[_ <: T]*): Seq[T] = expectMsgAllClassOf_internal(remaining, obj: _*) /** * Receive a number of messages from the test actor matching the given * number of classes and assert that for each given class one is received - * which conforms to that class. This construct is useful when the order in - * which the objects are received is not fixed. Wait time is bounded by - * the given duration, with an AssertionFailure being thrown in case of - * timeout. + * which conforms to that class (and vice versa). This construct is useful + * when the order in which the objects are received is not fixed. Wait time + * is bounded by the given duration, with an AssertionFailure being thrown in + * case of timeout. * * Beware that one object may satisfy all given class constraints, which * may be counter-intuitive. */ - def expectMsgAllConformingOf(max: Duration, obj: Class[_]*) { - val recv = receiveN(obj.size, now + max) - assert(obj forall (x ⇒ recv exists (x isInstance _)), "not found all") + def expectMsgAllConformingOf[T](max: Duration, obj: Class[_ <: T]*): Seq[T] = expectMsgAllConformingOf(max.dilated, obj: _*) + + private def expectMsgAllConformingOf_internal[T](max: Duration, obj: Class[_ <: T]*): Seq[T] = { + val recv = receiveN_internal(obj.size, max) + obj foreach (x ⇒ assert(recv exists (x isInstance _), "not found " + x)) + recv foreach (x ⇒ assert(obj exists (_ isInstance x), "found non-matching object " + x)) + recv.asInstanceOf[Seq[T]] } /** - * Same as `expectNoMsg`, but takes the maximum wait time from the innermost - * enclosing `within` block. + * Same as `expectNoMsg(remaining)`, but correctly treating the timeFactor. */ - def expectNoMsg { expectNoMsg(remaining) } + def expectNoMsg { expectNoMsg_internal(remaining) } /** * Assert that no message is received for the specified time. */ - def expectNoMsg(max: Duration) { + def expectNoMsg(max: Duration) { expectNoMsg_internal(max.dilated) } + + private def expectNoMsg_internal(max: Duration) { val o = receiveOne(max) assert(o eq null, "received unexpected message " + o) lastSoftTimeout = now } /** - * Same as `receiveWhile`, but takes the maximum wait time from the innermost - * enclosing `within` block. + * Same as `receiveWhile(remaining)(f)`, but correctly treating the timeFactor. */ - def receiveWhile[T](f: PartialFunction[AnyRef, T]): Seq[T] = receiveWhile(remaining)(f) + @deprecated("insert empty first parameter list", "1.2") + def receiveWhile[T](f: PartialFunction[AnyRef, T]): Seq[T] = receiveWhile(remaining / Duration.timeFactor)(f) /** - * Receive a series of messages as long as the given partial function - * accepts them or the idle timeout is met or the overall maximum duration - * is elapsed. Returns the sequence of messages. + * Receive a series of messages until one does not match the given partial + * function or the idle timeout is met (disabled by default) or the overall + * maximum duration is elapsed. Returns the sequence of messages. * - * Beware that the maximum duration is not implicitly bounded by or taken - * from the innermost enclosing `within` block, as it is not an error to - * hit the `max` duration in this case. + * Note that it is not an error to hit the `max` duration in this case. * * One possible use of this method is for testing whether messages of * certain characteristics are generated at a certain rate: @@ -416,13 +481,13 @@ trait TestKit { * assert(series == (1 to 7).toList) *
*/ - def receiveWhile[T](max: Duration)(f: PartialFunction[AnyRef, T]): Seq[T] = { - val stop = now + max + def receiveWhile[T](max: Duration = Duration.MinusInf, idle: Duration = Duration.Inf)(f: PartialFunction[AnyRef, T]): Seq[T] = { + val stop = now + (if (max == Duration.MinusInf) remaining else max.dilated) var msg: Message = NullMessage @tailrec def doit(acc: List[T]): List[T] = { - receiveOne(stop - now) + receiveOne((stop - now) min idle) lastMessage match { case NullMessage ⇒ lastMessage = msg @@ -442,10 +507,19 @@ trait TestKit { ret } + /** + * Same as `receiveN(n, remaining)` but correctly taking into account + * Duration.timeFactor. + */ + def receiveN(n: Int): Seq[AnyRef] = receiveN_internal(n, remaining) + /** * Receive N messages in a row before the given deadline. */ - def receiveN(n: Int, stop: Duration): Seq[AnyRef] = { + def receiveN(n: Int, max: Duration): Seq[AnyRef] = receiveN_internal(n, max.dilated) + + private def receiveN_internal(n: Int, max: Duration): Seq[AnyRef] = { + val stop = max + now for { x ← 1 to n } yield { val timeout = stop - now val o = receiveOne(timeout) @@ -457,6 +531,8 @@ trait TestKit { /** * Receive one message from the internal queue of the TestActor. If the given * duration is zero, the queue is polled (non-blocking). + * + * This method does NOT automatically scale its Duration parameter! */ def receiveOne(max: Duration): AnyRef = { val message = @@ -480,6 +556,14 @@ trait TestKit { private def format(u: TimeUnit, d: Duration) = "%.3f %s".format(d.toUnit(u), u.toString.toLowerCase) } +object TestKit { + private[testkit] val testActorId = new AtomicInteger(0) +} + +trait TestKit extends TestKitLight { + implicit val self = testActor +} + /** * TestKit-based probe which allows sending, reception and reply. */ diff --git a/akka-actor-tests/src/main/scala/akka/testing/TestLatch.scala b/akka-testkit/src/main/scala/akka/testkit/TestLatch.scala similarity index 94% rename from akka-actor-tests/src/main/scala/akka/testing/TestLatch.scala rename to akka-testkit/src/main/scala/akka/testkit/TestLatch.scala index 239aa6987f..7240149c45 100644 --- a/akka-actor-tests/src/main/scala/akka/testing/TestLatch.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestLatch.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009-2011 Scalable Solutions AB */ -package akka.testing +package akka.testkit import akka.util.Duration import java.util.concurrent.{ CountDownLatch, TimeUnit } @@ -35,7 +35,7 @@ class TestLatch(count: Int = 1) { def await(timeout: Duration): Boolean = { val opened = latch.await(Testing.testTime(timeout.toNanos), TimeUnit.NANOSECONDS) if (!opened) throw new TestLatchTimeoutException( - "Timeout of %s with time factor of %s" format (timeout.toString, Testing.timeFactor)) + "Timeout of %s with time factor of %s" format (timeout.toString, Duration.timeFactor)) opened } @@ -45,7 +45,7 @@ class TestLatch(count: Int = 1) { def awaitTimeout(timeout: Duration = TestLatch.DefaultTimeout) = { val opened = latch.await(Testing.testTime(timeout.toNanos), TimeUnit.NANOSECONDS) if (opened) throw new TestLatchNoTimeoutException( - "Latch opened before timeout of %s with time factor of %s" format (timeout.toString, Testing.timeFactor)) + "Latch opened before timeout of %s with time factor of %s" format (timeout.toString, Duration.timeFactor)) opened } diff --git a/akka-actor-tests/src/main/scala/akka/testing/Testing.scala b/akka-testkit/src/main/scala/akka/testkit/Testing.scala similarity index 57% rename from akka-actor-tests/src/main/scala/akka/testing/Testing.scala rename to akka-testkit/src/main/scala/akka/testkit/Testing.scala index 730f35f840..889b87920b 100644 --- a/akka-actor-tests/src/main/scala/akka/testing/Testing.scala +++ b/akka-testkit/src/main/scala/akka/testkit/Testing.scala @@ -2,32 +2,20 @@ * Copyright (C) 2009-2011 Scalable Solutions AB */ -package akka.testing +package akka.testkit import akka.util.Duration +import Duration.timeFactor /** * Multiplying numbers used in test timeouts by a factor, set by system property. * Useful for Jenkins builds (where the machine may need more time). */ object Testing { - val timeFactor: Double = { - val factor = System.getProperty("akka.test.timefactor", "1.0") - try { - factor.toDouble - } catch { - case e: java.lang.NumberFormatException ⇒ 1.0 - } - } - def testTime(t: Int): Int = (timeFactor * t).toInt def testTime(t: Long): Long = (timeFactor * t).toLong def testTime(t: Float): Float = (timeFactor * t).toFloat def testTime(t: Double): Double = timeFactor * t - def testSeconds(duration: Duration) = testTime(duration.toSeconds) - def testMillis(duration: Duration) = testTime(duration.toMillis) - def testNanos(duration: Duration) = testTime(duration.toNanos) - def sleepFor(duration: Duration) = Thread.sleep(testTime(duration.toMillis)) } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestFSMRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestFSMRefSpec.scala new file mode 100644 index 0000000000..67fdadc529 --- /dev/null +++ b/akka-testkit/src/test/scala/akka/testkit/TestFSMRefSpec.scala @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.testkit + +import org.scalatest.matchers.MustMatchers +import org.scalatest.{ BeforeAndAfterEach, WordSpec } +import akka.actor._ +import akka.util.duration._ + +class TestFSMRefSpec extends WordSpec with MustMatchers with TestKit { + + import FSM._ + + "A TestFSMRef" must { + + "allow access to state data" in { + val fsm = TestFSMRef(new Actor with FSM[Int, String] { + startWith(1, "") + when(1) { + case Ev("go") ⇒ goto(2) using "go" + case Ev(StateTimeout) ⇒ goto(2) using "timeout" + } + when(2) { + case Ev("back") ⇒ goto(1) using "back" + } + }).start() + fsm.stateName must be(1) + fsm.stateData must be("") + fsm ! "go" + fsm.stateName must be(2) + fsm.stateData must be("go") + fsm.setState(stateName = 1) + fsm.stateName must be(1) + fsm.stateData must be("go") + fsm.setState(stateData = "buh") + fsm.stateName must be(1) + fsm.stateData must be("buh") + fsm.setState(timeout = 100 millis) + within(80 millis, 500 millis) { + awaitCond(fsm.stateName == 2 && fsm.stateData == "timeout") + } + } + + "allow access to timers" in { + val fsm = TestFSMRef(new Actor with FSM[Int, Null] { + startWith(1, null) + when(1) { + case x ⇒ stay + } + }) + fsm.timerActive_?("test") must be(false) + fsm.setTimer("test", 12, 10 millis, true) + fsm.timerActive_?("test") must be(true) + fsm.cancelTimer("test") + fsm.timerActive_?("test") must be(false) + } + + } + +} diff --git a/akka-testkit/src/test/scala/akka/testkit/TestTimeSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestTimeSpec.scala new file mode 100644 index 0000000000..55d19288c9 --- /dev/null +++ b/akka-testkit/src/test/scala/akka/testkit/TestTimeSpec.scala @@ -0,0 +1,36 @@ +package akka.testkit + +import org.scalatest.matchers.MustMatchers +import org.scalatest.{ BeforeAndAfterEach, WordSpec } +import akka.util.Duration + +class TestTimeSpec extends WordSpec with MustMatchers with BeforeAndAfterEach { + + val tf = Duration.timeFactor + + override def beforeEach { + val f = Duration.getClass.getDeclaredField("timeFactor") + f.setAccessible(true) + f.setDouble(Duration, 2.0) + } + + override def afterEach { + val f = Duration.getClass.getDeclaredField("timeFactor") + f.setAccessible(true) + f.setDouble(Duration, tf) + } + + "A TestKit" must { + + "correctly dilate times" in { + val probe = TestProbe() + val now = System.nanoTime + intercept[AssertionError] { probe.awaitCond(false, Duration("1 second")) } + val diff = System.nanoTime - now + diff must be > 1700000000l + diff must be < 3000000000l + } + + } + +} diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 85a14d243c..0ca0a661ff 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -99,6 +99,14 @@ akka { # (in unit defined by the time-unit property) } + debug { + receive = "false" # enable function of Actor.loggable(), which is + # to log any received message at DEBUG level + autoreceive = "false" # enable DEBUG logging of all AutoReceiveMessages + # (Kill, PoisonPill and the like) + lifecycle = "false" # enable DEBUG logging of actor lifecycle changes + } + mailbox { file-based { @@ -244,4 +252,8 @@ akka { expired-header-name = "Async-Timeout" # the name of the response header to use when an async request expires expired-header-value = "expired" # the value of the response header to use when an async request expires } + + test { + timefactor = "1.0" # factor by which to scale timeouts during tests, e.g. to account for shared build system load + } }