From e7f3945776369cf6fbff1e0dee76981c0bede343 Mon Sep 17 00:00:00 2001 From: Roland Date: Sun, 26 Jun 2011 17:40:30 +0200 Subject: [PATCH 1/6] add TestKit.expectMsgType - plus docs - and replace softTimeout hack with better solution --- akka-docs/scala/testing.rst | 21 +++++++--- .../src/main/scala/akka/testkit/TestKit.scala | 39 ++++++++++++------- 2 files changed, 41 insertions(+), 19 deletions(-) diff --git a/akka-docs/scala/testing.rst b/akka-docs/scala/testing.rst index f3bb07c5bb..e8b20aa830 100644 --- a/akka-docs/scala/testing.rst +++ b/akka-docs/scala/testing.rst @@ -288,6 +288,13 @@ assertions concerning received messages. Here is the full list: does a conformance check; if you need the class to be equal, have a look at :meth:`expectMsgAllClassOf` with a single given class argument. + * :meth:`expectMsgType[T: Manifest](d: Duration)` + + An object which is an instance of the given type (after erasure) must be + received within the allotted time frame; the object will be returned. This + method is approximately equivalent to + ``expectMsgClass(manifest[T].erasure)``. + * :meth:`expectMsgAnyOf[T](d: Duration, obj: T*): T` An object must be received within the given time, and it must be equal ( @@ -398,21 +405,25 @@ is between :obj:`min` and :obj:`max`, where the former defaults to zero. The deadline calculated by adding the :obj:`max` parameter to the block's start time is implicitly available within the block to all examination methods, if you do not specify it, is is inherited from the innermost enclosing -:meth:`within` block. It should be noted that using :meth:`expectNoMsg` will -terminate upon reception of a message or at the deadline, whichever occurs -first; it follows that this examination usually is the last statement in a :meth:`within` block. +It should be noted that if the last message-receiving assertion of the block is +:meth:`expectNoMsg` or :meth:`receiveWhile`, the final check of the +:meth:`within` is skipped in order to avoid false positives due to wake-up +latencies. This means that while individual contained assertions still use the +maximum time bound, the overall block may take arbitrarily longer in this case. + .. code-block:: scala class SomeSpec extends WordSpec with MustMatchers with TestKit { "A Worker" must { "send timely replies" in { val worker = actorOf(...) - within (50 millis) { + within (500 millis) { worker ! "some work" expectMsg("some result") - expectNoMsg + expectNoMsg // will block for the rest of the 500ms + Thread.sleep(1000) // will NOT make this block fail } } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index bb9f69168b..10727ffc7c 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -110,13 +110,12 @@ trait TestKitLight { val senderOption = Some(testActor) private var end: Duration = Duration.Inf - /* - * THIS IS A HACK: expectNoMsg and receiveWhile are bounded by `end`, but - * running them should not trigger an AssertionError, so mark their end - * time here and do not fail at the end of `within` if that time is not - * long gone. + + /** + * if last assertion was expectNoMsg, disable timing failure upon within() + * block end. */ - private var lastSoftTimeout: Duration = now - 5.millis + private var lastWasNoMsg = false /** * Stop test actor. Should be done at the end of the test unless relying on @@ -211,6 +210,8 @@ trait TestKitLight { val rem = end - start assert(rem >= min, "required min time " + min + " not possible, only " + format(min.unit, rem) + " left") + lastWasNoMsg = false + val max_diff = _max min rem val prev_end = end end = start + max_diff @@ -219,13 +220,8 @@ trait TestKitLight { val diff = now - start assert(min <= diff, "block took " + format(min.unit, diff) + ", should at least have been " + min) - /* - * caution: HACK AHEAD - */ - if (now - lastSoftTimeout > 5.millis) { + if (!lastWasNoMsg) { assert(diff <= max_diff, "block took " + format(_max.unit, diff) + ", exceeding " + format(_max.unit, max_diff)) - } else { - lastSoftTimeout -= 5.millis } ret @@ -302,6 +298,20 @@ trait TestKitLight { f(o) } + /** + * Same as `expectMsgType[T](remaining)`, but correctly treating the timeFactor. + */ + def expectMsgType[T](implicit m: Manifest[T]): T = expectMsgClass_internal(remaining, m.erasure.asInstanceOf[Class[T]]) + + /** + * Receive one message from the test actor and assert that it conforms to the + * given type (after erasure). Wait time is bounded by the given duration, + * with an AssertionFailure being thrown in case of timeout. + * + * @return the received object + */ + def expectMsgType[T](max: Duration)(implicit m: Manifest[T]): T = expectMsgClass_internal(max.dilated, m.erasure.asInstanceOf[Class[T]]) + /** * Same as `expectMsgClass(remaining, c)`, but correctly treating the timeFactor. */ @@ -454,7 +464,7 @@ trait TestKitLight { private def expectNoMsg_internal(max: Duration) { val o = receiveOne(max) assert(o eq null, "received unexpected message " + o) - lastSoftTimeout = now + lastWasNoMsg = true } /** @@ -503,7 +513,7 @@ trait TestKitLight { } val ret = doit(Nil) - lastSoftTimeout = now + lastWasNoMsg = true ret } @@ -543,6 +553,7 @@ trait TestKitLight { } else { queue.takeFirst } + lastWasNoMsg = false message match { case null ⇒ lastMessage = NullMessage From 48feec0bbb1d81d74e7706f553638a217933da53 Mon Sep 17 00:00:00 2001 From: Roland Date: Sun, 26 Jun 2011 21:33:27 +0200 Subject: [PATCH 2/6] add debug output to investigate cause of RoutingSpec sporadic failures "smallest mailbox" test fails randomly on my notebook --- .../src/test/scala/akka/routing/RoutingSpec.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 3544767453..52ceb098bd 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -10,6 +10,7 @@ import akka.util.duration._ import akka.actor._ import akka.actor.Actor._ import akka.routing._ +import akka.event.EventHandler import java.util.concurrent.atomic.AtomicInteger import akka.dispatch.{ KeptPromise, Future } @@ -122,9 +123,11 @@ class RoutingSpec extends WordSpec with MustMatchers { latch.await(10 seconds) } finally { // because t1 is much slower and thus has a bigger mailbox all the time - t1Count.get must be < (t2Count.get) + EventHandler.info(this, "t1=" + t1Count + " t2=" + t2Count) } + t1Count.get must be < t2Count.get + for (a ← List(t1, t2, d)) a.stop() } From 10256991cbd1e52fbebab5d426dee7331e786652 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 27 Jun 2011 19:09:09 +0200 Subject: [PATCH 3/6] add Actor.freshInstance hook, test #955 --- .../akka/actor/actor/ActorRestartSpec.scala | 148 ++++++++++++++++++ .../src/main/scala/akka/actor/Actor.scala | 12 ++ .../src/main/scala/akka/actor/ActorRef.scala | 15 +- akka-docs/scala/actors.rst | 116 +++++++++----- 4 files changed, 253 insertions(+), 38 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala new file mode 100644 index 0000000000..3f1398e596 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala @@ -0,0 +1,148 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.actor + +import org.scalatest.{ WordSpec, BeforeAndAfterAll, BeforeAndAfterEach } +import org.scalatest.matchers.MustMatchers + +import Actor.actorOf +import akka.testkit._ +import akka.util.duration._ +import akka.config.Supervision.OneForOneStrategy + +import java.util.concurrent.atomic._ + +object ActorRestartSpec { + + private var _gen = new AtomicInteger(0) + def generation = _gen.incrementAndGet + def generation_=(x: Int) { _gen.set(x) } + + sealed trait RestartType + case object Normal extends RestartType + case object Nested extends RestartType + case object Handover extends RestartType + case object Fail extends RestartType + + class Restarter(val testActor: ActorRef) extends Actor { + val gen = generation + var xx = 0 + var restart: RestartType = Normal + def receive = { + case x: Int ⇒ xx = x + case t: RestartType ⇒ restart = t + case "get" ⇒ self reply xx + } + override def preStart { testActor ! (("preStart", gen)) } + override def postStop { testActor ! (("postStop", gen)) } + override def preRestart(cause: Throwable) { testActor ! (("preRestart", gen)) } + override def postRestart(cause: Throwable) { testActor ! (("postRestart", gen)) } + override def freshInstance() = { + restart match { + case Normal ⇒ None + case Nested ⇒ + val ref = TestActorRef(new Actor { + def receive = { case _ ⇒ } + override def preStart { testActor ! ((this, self)) } + }).start() + testActor ! ((ref.underlyingActor, ref)) + None + case Handover ⇒ + val fresh = new Restarter(testActor) + fresh.xx = xx + Some(fresh) + case Fail ⇒ + throw new IllegalActorStateException("expected") + } + } + } + + class Supervisor extends Actor { + self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), 5, 5000) + def receive = { + case _ ⇒ + } + } +} + +class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with BeforeAndAfterEach { + import ActorRestartSpec._ + + override def beforeEach { generation = 0 } + + "An Actor restart" must { + + "invoke preRestart, preStart, postRestart" in { + val actor = actorOf(new Restarter(testActor)).start() + expectMsg(1 second, ("preStart", 1)) + val supervisor = actorOf[Supervisor].start() + supervisor link actor + actor ! Kill + within(1 second) { + expectMsg(("preRestart", 1)) + expectMsg(("preStart", 2)) + expectMsg(("postRestart", 2)) + expectNoMsg + } + } + + "support creation of nested actors in freshInstance()" in { + val actor = actorOf(new Restarter(testActor)).start() + expectMsg(1 second, ("preStart", 1)) + val supervisor = actorOf[Supervisor].start() + supervisor link actor + actor ! Nested + actor ! Kill + within(1 second) { + expectMsg(("preRestart", 1)) + val (tActor, tRef) = expectMsgType[(Actor, TestActorRef[Actor])] + tRef.underlyingActor must be(tActor) + expectMsg((tActor, tRef)) + tRef.stop() + expectMsg(("preStart", 2)) + expectMsg(("postRestart", 2)) + expectNoMsg + } + } + + "use freshInstance() if available" in { + val actor = actorOf(new Restarter(testActor)).start() + expectMsg(1 second, ("preStart", 1)) + val supervisor = actorOf[Supervisor].start() + supervisor link actor + actor ! 42 + actor ! Handover + actor ! Kill + within(1 second) { + expectMsg(("preRestart", 1)) + expectMsg(("preStart", 2)) + expectMsg(("postRestart", 2)) + expectNoMsg + } + actor ! "get" + expectMsg(1 second, 42) + } + + "fall back to default factory if freshInstance() fails" in { + val actor = actorOf(new Restarter(testActor)).start() + expectMsg(1 second, ("preStart", 1)) + val supervisor = actorOf[Supervisor].start() + supervisor link actor + actor ! 42 + actor ! Fail + actor ! Kill + within(1 second) { + expectMsg(("preRestart", 1)) + expectMsg(("preStart", 2)) + expectMsg(("postRestart", 2)) + expectNoMsg + } + actor ! "get" + expectMsg(1 second, 0) + } + + } + +} diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 0ec0616acb..9925c060a5 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -662,6 +662,18 @@ trait Actor { */ def preRestart(reason: Throwable) {} + /** + * User overridable callback. + *

+ * Is called on the crashed Actor to give it the option of producing the + * Actor's reincarnation. If it returns None, which is the default, the + * initially provided actor factory is used. + *

+ * Warning: Propagating state from a crashed actor carries the risk + * of proliferating the cause of the error. Consider let-it-crash first. + */ + def freshInstance(): Option[Actor] = None + /** * User overridable callback. *

diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 17fa3139dd..5e3c5fd3cf 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -845,7 +845,20 @@ class LocalActorRef private[akka] ( val stackBefore = refStack.get refStack.set(stackBefore.push(this)) try { - actorFactory() + if (_status == ActorRefInternals.BEING_RESTARTED) { + val a = actor + val fresh = try a.freshInstance catch { + case e ⇒ + EventHandler.error(e, a, "freshInstance() failed, falling back to initial actor factory") + None + } + fresh match { + case Some(ref) ⇒ ref + case None ⇒ actorFactory() + } + } else { + actorFactory() + } } finally { val stackAfter = refStack.get if (stackAfter.nonEmpty) diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index eb25cb2d1a..ad9fbf2756 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -92,6 +92,83 @@ Here we create a light-weight actor-based thread, that can be used to spawn off ... // do stuff } +Actor Internal API +------------------ + +The :class:`Actor` trait defines only one abstract method, the abovementioned +:meth:`receive`. In addition, it offers two convenience methods +:meth:`become`/:meth:`unbecome` for modifying the hotswap behavior stack as +described in :ref:`Actor.HotSwap` and the :obj:`self` reference to this actor’s +:class:`ActorRef` object. The remaining visible methods are user-overridable +life-cycle hooks which are described in the following:: + + def preStart() {} + def preRestart(cause: Throwable) {} + def freshInstance(): Option[Actor] = None + def postRestart(cause: Throwable) {} + def postStop() {} + +The implementations shown above are the defaults provided by the :class:`Actor` +trait. + +Start Hook +^^^^^^^^^^ + +Right after starting the actor, its :meth:`preStart` method is invoked. This is +guaranteed to happen before the first message from external sources is queued +to the actor’s mailbox. + +:: + + override def preStart { + // e.g. send initial message to self + self ! GetMeStarted + // or do any other stuff, e.g. registering with other actors + someService ! Register(self) + } + +Restart Hooks +^^^^^^^^^^^^^ + +A supervised actor, i.e. one which is linked to another actor with a fault +handling strategy, will be restarted in case an exception is thrown while +processing a message. This restart involves four of the hooks mentioned above: + +1. The old actor is informed by calling :meth:`preRestart` with the exception + which caused the restart; this method is the best place for cleaning up, + preparing hand-over to the fresh actor instance, etc. +2. The old actor’s :meth:`freshInstance` factory method is invoked, which may + optionally produce the new actor instance which will replace this actor. If + this method returns :obj:`None` or throws an exception, the initial factory + from the ``Actor.actorOf`` call is used to produce the fresh instance. +3. The new actor’s :meth:`preStart` method is invoked, just as in the normal + start-up case. +4. The new actor’s :meth:`postRestart` method is called with the exception + which caused the restart. + +.. warning:: + + The :meth:`freshInstance` hook may be used to propagate (part of) the failed + actor’s state to the fresh instance. This carries the risk of proliferating + the cause for the crash which triggered the restart. If you are tempted to + take this route, it is strongly advised to step back and consider other + possible approaches, e.g. distributing the state in question using other + means or spawning short-lived worker actors for carrying out “risky” tasks. + +An actor restart replaces only the actual actor object; the contents of the +mailbox and the hotswap stack are unaffected by the restart, so processing of +messages will resume after the :meth:`postRestart` hook returns. Any message +sent to an actor which is being restarted will be queued to its mailbox as +usual. + +Stop Hook +^^^^^^^^^ + +After stopping an actor, its :meth:`postStop` hook is called, which may be used +e.g. for deregistering this actor from other services. This hook is guaranteed +to run after message queuing has been disabled for this actor, i.e. sending +messages would fail with an :class:`IllegalActorStateException`. + Identifying Actors ------------------ @@ -252,43 +329,6 @@ This method should return a ``PartialFunction``, e.g. a ‘match/case’ clause } } -Actor internal API ------------------- - -The Actor trait contains almost no member fields or methods to invoke, you just use the Actor trait to implement the: - -#. ``receive`` message handler -#. life-cycle callbacks: - - #. preStart - #. postStop - #. preRestart - #. postRestart - -The ``Actor`` trait has one single member field: - -.. code-block:: scala - - val self: ActorRef - -This ``self`` field holds a reference to its ``ActorRef`` and it is this reference you want to access the Actor's API. Here, for example, you find methods to reply to messages, send yourself messages, define timeouts, fault tolerance etc., start and stop etc. - -However, for convenience you can import these functions and fields like below, which will allow you do drop the ``self`` prefix: - -.. code-block:: scala - - class MyActor extends Actor { - import self._ - id = ... - dispatcher = ... - start - ... - } - -But in this documentation we will always prefix the calls with ``self`` for clarity. - -Let's start by looking how we can reply to messages in a convenient way using this ``ActorRef`` API. - Reply to messages ----------------- @@ -441,6 +481,8 @@ You can also send an actor the ``akka.actor.PoisonPill`` message, which will sto If the sender is a ``Future`` (e.g. the message is sent with ``?``), the ``Future`` will be completed with an ``akka.actor.ActorKilledException("PoisonPill")``. +.. _Actor.HotSwap: + HotSwap ------- From 956d055d872ec19f8944b217a89a60e220636bef Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 27 Jun 2011 22:20:09 +0200 Subject: [PATCH 4/6] make currentMessage available in preRestart, test #957 - use Option[Any] since currentMessage may be unavailable (supervisor being restarted) - plus docs --- .../akka/actor/actor/ActorRestartSpec.scala | 41 +++++++++++++++---- .../src/main/scala/akka/actor/Actor.scala | 13 +++++- .../src/main/scala/akka/actor/ActorRef.scala | 3 +- akka-docs/scala/actors.rst | 17 +++++--- 4 files changed, 58 insertions(+), 16 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala index 3f1398e596..2b52c5cd4f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala @@ -36,7 +36,6 @@ object ActorRestartSpec { case "get" ⇒ self reply xx } override def preStart { testActor ! (("preStart", gen)) } - override def postStop { testActor ! (("postStop", gen)) } override def preRestart(cause: Throwable) { testActor ! (("preRestart", gen)) } override def postRestart(cause: Throwable) { testActor ! (("postRestart", gen)) } override def freshInstance() = { @@ -71,13 +70,21 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo import ActorRestartSpec._ override def beforeEach { generation = 0 } + override def afterEach { toStop foreach (_.stop()) } + + private var toStop = List.empty[ActorRef] + private def newActor(f: ⇒ Actor): ActorRef = { + val ref = actorOf(f) + toStop ::= ref + ref.start() + } "An Actor restart" must { "invoke preRestart, preStart, postRestart" in { - val actor = actorOf(new Restarter(testActor)).start() + val actor = newActor(new Restarter(testActor)) expectMsg(1 second, ("preStart", 1)) - val supervisor = actorOf[Supervisor].start() + val supervisor = newActor(new Supervisor) supervisor link actor actor ! Kill within(1 second) { @@ -89,9 +96,9 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo } "support creation of nested actors in freshInstance()" in { - val actor = actorOf(new Restarter(testActor)).start() + val actor = newActor(new Restarter(testActor)) expectMsg(1 second, ("preStart", 1)) - val supervisor = actorOf[Supervisor].start() + val supervisor = newActor(new Supervisor) supervisor link actor actor ! Nested actor ! Kill @@ -108,9 +115,9 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo } "use freshInstance() if available" in { - val actor = actorOf(new Restarter(testActor)).start() + val actor = newActor(new Restarter(testActor)) expectMsg(1 second, ("preStart", 1)) - val supervisor = actorOf[Supervisor].start() + val supervisor = newActor(new Supervisor) supervisor link actor actor ! 42 actor ! Handover @@ -126,9 +133,9 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo } "fall back to default factory if freshInstance() fails" in { - val actor = actorOf(new Restarter(testActor)).start() + val actor = newActor(new Restarter(testActor)) expectMsg(1 second, ("preStart", 1)) - val supervisor = actorOf[Supervisor].start() + val supervisor = newActor(new Supervisor) supervisor link actor actor ! 42 actor ! Fail @@ -143,6 +150,22 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo expectMsg(1 second, 0) } + "call preRestart(cause, currentMessage) if defined" in { + val actor = newActor(new Actor { + def receive = { case _ ⇒ } + override def preRestart(cause: Throwable, currentMessage: Option[Any]) { + testActor ! (("preRestart", currentMessage)) + } + }) + val supervisor = newActor(new Supervisor) + supervisor link actor + actor ! Kill + within(1 second) { + expectMsg(("preRestart", Some(Kill))) + expectNoMsg + } + } + } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 9925c060a5..bb4bb47327 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -658,10 +658,21 @@ trait Actor { /** * User overridable callback. *

- * Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated. + * Is called on a crashed Actor right BEFORE it is restarted to allow clean + * up of resources before Actor is terminated. Override either the variant + * with or without the currentMessage argument. */ def preRestart(reason: Throwable) {} + /** + * User overridable callback. + *

+ * Is called on a crashed Actor right BEFORE it is restarted to allow clean + * up of resources before Actor is terminated. Override either the variant + * with or without the currentMessage argument. + */ + def preRestart(reason: Throwable, message: Option[Any]) { preRestart(reason) } + /** * User overridable callback. *

diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 5e3c5fd3cf..1287324288 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -766,7 +766,8 @@ class LocalActorRef private[akka] ( def performRestart() { val failedActor = actorInstance.get if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting") - failedActor.preRestart(reason) + val message = if (currentMessage ne null) Some(currentMessage.message) else None + failedActor.preRestart(reason, message) val freshActor = newActor setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index ad9fbf2756..b23d53385e 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -99,11 +99,15 @@ The :class:`Actor` trait defines only one abstract method, the abovementioned :meth:`receive`. In addition, it offers two convenience methods :meth:`become`/:meth:`unbecome` for modifying the hotswap behavior stack as described in :ref:`Actor.HotSwap` and the :obj:`self` reference to this actor’s -:class:`ActorRef` object. The remaining visible methods are user-overridable -life-cycle hooks which are described in the following:: +:class:`ActorRef` object. If the current actor behavior does not match a +received message, :meth:`unhandled` is called, which by default throws an +:class:`UnhandledMessageException`. + +The remaining visible methods are user-overridable life-cycle hooks which are +described in the following:: def preStart() {} - def preRestart(cause: Throwable) {} + def preRestart(cause: Throwable, message: Option[Any]) {} def freshInstance(): Option[Actor] = None def postRestart(cause: Throwable) {} def postStop() {} @@ -135,7 +139,10 @@ handling strategy, will be restarted in case an exception is thrown while processing a message. This restart involves four of the hooks mentioned above: 1. The old actor is informed by calling :meth:`preRestart` with the exception - which caused the restart; this method is the best place for cleaning up, + which caused the restart and the message which triggered that exception; the + latter may be ``None`` if the restart was not caused by processing a + message, e.g. when a supervisor does not trap the exception and is restarted + in turn by its supervisor. This method is the best place for cleaning up, preparing hand-over to the fresh actor instance, etc. 2. The old actor’s :meth:`freshInstance` factory method is invoked, which may optionally produce the new actor instance which will replace this actor. If @@ -158,7 +165,7 @@ processing a message. This restart involves four of the hooks mentioned above: An actor restart replaces only the actual actor object; the contents of the mailbox and the hotswap stack are unaffected by the restart, so processing of messages will resume after the :meth:`postRestart` hook returns. Any message -sent to an actor which is being restarted will be queued to its mailbox as +sent to an actor while it is being restarted will be queued to its mailbox as usual. Stop Hook From 00b9166b577ef6a270cfe4eb6bdfdb4049bc8f31 Mon Sep 17 00:00:00 2001 From: Roland Date: Sun, 3 Jul 2011 20:38:06 +0200 Subject: [PATCH 5/6] make ActorRestartSpec thread safe --- .../scala/akka/actor/actor/ActorRestartSpec.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala index 2b52c5cd4f..cdff70c461 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala @@ -70,12 +70,18 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo import ActorRestartSpec._ override def beforeEach { generation = 0 } - override def afterEach { toStop foreach (_.stop()) } + override def afterEach { + val it = toStop.iterator + while (it.hasNext) { + it.next.stop() + it.remove + } + } - private var toStop = List.empty[ActorRef] + private var toStop = new java.util.concurrent.ConcurrentSkipListSet[ActorRef] private def newActor(f: ⇒ Actor): ActorRef = { val ref = actorOf(f) - toStop ::= ref + toStop add ref ref.start() } From 5de2ca7aa55df94f7591c78f1a5b7cdd95f5068e Mon Sep 17 00:00:00 2001 From: Roland Date: Sun, 3 Jul 2011 22:19:34 +0200 Subject: [PATCH 6/6] remove Actor.preRestart(cause: Throwable) - adapt all internal uses (all tests green) - start migration guide for 2.0 with this change --- .../scala/akka/actor/actor/ActorRefSpec.scala | 2 +- .../akka/actor/actor/ActorRestartSpec.scala | 26 ++++--------------- .../akka/actor/actor/FSMTransitionSpec.scala | 2 +- .../actor/supervisor/SupervisorTreeSpec.scala | 2 +- .../akka/actor/supervisor/Ticket669Spec.scala | 2 +- .../src/main/scala/akka/actor/Actor.scala | 14 ++-------- .../main/scala/akka/actor/UntypedActor.scala | 2 +- .../src/main/scala/akka/camel/Producer.scala | 2 +- .../scala/akka/camel/ConsumerScalaTest.scala | 2 +- .../src/main/scala/akka/cluster/Cluster.scala | 2 +- .../project/migration-guide-1.1.x-1.2.x.rst | 6 +++++ .../project/migration-guide-1.2.x-2.0.x.rst | 20 ++++++++++++++ akka-docs/project/migration-guides.rst | 2 ++ .../akka/spring/RemoteTypedActorOneImpl.java | 4 ++- .../akka/spring/RemoteTypedActorTwoImpl.java | 4 ++- .../scala/akka/testkit/TestActorRefSpec.scala | 2 +- 16 files changed, 50 insertions(+), 44 deletions(-) create mode 100644 akka-docs/project/migration-guide-1.1.x-1.2.x.rst create mode 100644 akka-docs/project/migration-guide-1.2.x-2.0.x.rst diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala index 4bef71a74d..f11b762493 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala @@ -334,7 +334,7 @@ class ActorRefSpec extends WordSpec with MustMatchers { val ref = Actor.actorOf( new Actor { def receive = { case _ ⇒ } - override def preRestart(reason: Throwable) = latch.countDown() + override def preRestart(reason: Throwable, msg: Option[Any]) = latch.countDown() override def postRestart(reason: Throwable) = latch.countDown() }).start() diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala index cdff70c461..d2b9a42ee5 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala @@ -36,7 +36,7 @@ object ActorRestartSpec { case "get" ⇒ self reply xx } override def preStart { testActor ! (("preStart", gen)) } - override def preRestart(cause: Throwable) { testActor ! (("preRestart", gen)) } + override def preRestart(cause: Throwable, msg: Option[Any]) { testActor ! (("preRestart", msg, gen)) } override def postRestart(cause: Throwable) { testActor ! (("postRestart", gen)) } override def freshInstance() = { restart match { @@ -94,7 +94,7 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo supervisor link actor actor ! Kill within(1 second) { - expectMsg(("preRestart", 1)) + expectMsg(("preRestart", Some(Kill), 1)) expectMsg(("preStart", 2)) expectMsg(("postRestart", 2)) expectNoMsg @@ -109,7 +109,7 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo actor ! Nested actor ! Kill within(1 second) { - expectMsg(("preRestart", 1)) + expectMsg(("preRestart", Some(Kill), 1)) val (tActor, tRef) = expectMsgType[(Actor, TestActorRef[Actor])] tRef.underlyingActor must be(tActor) expectMsg((tActor, tRef)) @@ -129,7 +129,7 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo actor ! Handover actor ! Kill within(1 second) { - expectMsg(("preRestart", 1)) + expectMsg(("preRestart", Some(Kill), 1)) expectMsg(("preStart", 2)) expectMsg(("postRestart", 2)) expectNoMsg @@ -147,7 +147,7 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo actor ! Fail actor ! Kill within(1 second) { - expectMsg(("preRestart", 1)) + expectMsg(("preRestart", Some(Kill), 1)) expectMsg(("preStart", 2)) expectMsg(("postRestart", 2)) expectNoMsg @@ -156,22 +156,6 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo expectMsg(1 second, 0) } - "call preRestart(cause, currentMessage) if defined" in { - val actor = newActor(new Actor { - def receive = { case _ ⇒ } - override def preRestart(cause: Throwable, currentMessage: Option[Any]) { - testActor ! (("preRestart", currentMessage)) - } - }) - val supervisor = newActor(new Supervisor) - supervisor link actor - actor ! Kill - within(1 second) { - expectMsg(("preRestart", Some(Kill))) - expectNoMsg - } - } - } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala index 0cbaddd8e6..1eba7a71c1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala @@ -32,7 +32,7 @@ object FSMTransitionSpec { case Ev("reply") ⇒ stay replying "reply" } initialize - override def preRestart(reason: Throwable) { target ! "restarted" } + override def preRestart(reason: Throwable, msg: Option[Any]) { target ! "restarted" } } class Forwarder(target: ActorRef) extends Actor { diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala index d8aaa9d0e4..85b41c777c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala @@ -25,7 +25,7 @@ class SupervisorTreeSpec extends WordSpec with MustMatchers { case Die ⇒ throw new Exception(self.address + " is dying...") } - override def preRestart(reason: Throwable) { + override def preRestart(reason: Throwable, msg: Option[Any]) { log += self.address } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala index 0a70058c7b..41019fc0a8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala @@ -59,7 +59,7 @@ object Ticket669Spec { case msg ⇒ throw new Exception("test") } - override def preRestart(reason: scala.Throwable) { + override def preRestart(reason: scala.Throwable, msg: Option[Any]) { self.reply_?("failure1") } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index bb4bb47327..fecd776ca1 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -659,19 +659,9 @@ trait Actor { * User overridable callback. *

* Is called on a crashed Actor right BEFORE it is restarted to allow clean - * up of resources before Actor is terminated. Override either the variant - * with or without the currentMessage argument. + * up of resources before Actor is terminated. */ - def preRestart(reason: Throwable) {} - - /** - * User overridable callback. - *

- * Is called on a crashed Actor right BEFORE it is restarted to allow clean - * up of resources before Actor is terminated. Override either the variant - * with or without the currentMessage argument. - */ - def preRestart(reason: Throwable, message: Option[Any]) { preRestart(reason) } + def preRestart(reason: Throwable, message: Option[Any]) {} /** * User overridable callback. diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala index d561ce6221..fa546b2437 100644 --- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala @@ -102,7 +102,7 @@ abstract class UntypedActor extends Actor { *

* Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated. */ - override def preRestart(reason: Throwable) {} + override def preRestart(reason: Throwable, lastMessage: Option[Any]) {} /** * User overridable callback. diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala index 041f3397ff..9ad6ce22d2 100644 --- a/akka-camel/src/main/scala/akka/camel/Producer.scala +++ b/akka-camel/src/main/scala/akka/camel/Producer.scala @@ -58,7 +58,7 @@ trait ProducerSupport { this: Actor ⇒ * Default implementation of Actor.preRestart for freeing resources needed * to actually send messages to endpointUri. */ - override def preRestart(reason: Throwable) { + override def preRestart(reason: Throwable, msg: Option[Any]) { try { preRestartProducer(reason) } finally { processor.stop } } diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala index d80e847efa..27bbc264d7 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala @@ -254,7 +254,7 @@ object ConsumerScalaTest { case "succeed" ⇒ self.reply("ok") } - override def preRestart(reason: scala.Throwable) { + override def preRestart(reason: scala.Throwable, msg: Option[Any]) { self.reply_?("pr") } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 48c77ce7db..afab58547a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -1606,7 +1606,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { self.dispatcher = Dispatchers.newPinnedDispatcher(self) - override def preRestart(reason: Throwable) { + override def preRestart(reason: Throwable, msg: Option[Any]) { EventHandler.debug(this, "RemoteClusterDaemon failed due to [%s] restarting...".format(reason)) } diff --git a/akka-docs/project/migration-guide-1.1.x-1.2.x.rst b/akka-docs/project/migration-guide-1.1.x-1.2.x.rst new file mode 100644 index 0000000000..e4988b9460 --- /dev/null +++ b/akka-docs/project/migration-guide-1.1.x-1.2.x.rst @@ -0,0 +1,6 @@ +.. _migration-1.2: + +################################ + Migration Guide 1.1.x to 1.2.x +################################ + diff --git a/akka-docs/project/migration-guide-1.2.x-2.0.x.rst b/akka-docs/project/migration-guide-1.2.x-2.0.x.rst new file mode 100644 index 0000000000..7eabcf2f10 --- /dev/null +++ b/akka-docs/project/migration-guide-1.2.x-2.0.x.rst @@ -0,0 +1,20 @@ +.. _migration-2.0: + +################################ + Migration Guide 1.2.x to 2.0.x +################################ + +Actors +====== + +The 2.0 release contains several new features which require source-level +changes in client code. This API cleanup is planned to be the last one for a +significant amount of time. + +Lifecycle Callbacks +------------------- + +The :meth:`preRestart(cause: Throwable)` method has been replaced by +:meth:`preRestart(cause: Throwable, lastMessage: Any)`, hence you must insert +the second argument in all overriding methods. The good news is that any missed +actor will not compile without error. diff --git a/akka-docs/project/migration-guides.rst b/akka-docs/project/migration-guides.rst index 7eb063811c..7af815f241 100644 --- a/akka-docs/project/migration-guides.rst +++ b/akka-docs/project/migration-guides.rst @@ -6,6 +6,8 @@ Migration Guides .. toctree:: :maxdepth: 1 + migration-guide-1.2.x-2.0.x + migration-guide-1.1.x-1.2.x migration-guide-1.0.x-1.1.x migration-guide-0.10.x-1.0.x migration-guide-0.9.x-0.10.x diff --git a/akka-spring/src/test/java/akka/spring/RemoteTypedActorOneImpl.java b/akka-spring/src/test/java/akka/spring/RemoteTypedActorOneImpl.java index c7ad0f3de6..f6ab0e56e7 100644 --- a/akka-spring/src/test/java/akka/spring/RemoteTypedActorOneImpl.java +++ b/akka-spring/src/test/java/akka/spring/RemoteTypedActorOneImpl.java @@ -2,6 +2,8 @@ package akka.spring; import akka.actor.*; +import scala.Option; + import java.util.concurrent.CountDownLatch; public class RemoteTypedActorOneImpl extends TypedActor implements RemoteTypedActorOne { @@ -22,7 +24,7 @@ public class RemoteTypedActorOneImpl extends TypedActor implements RemoteTypedAc } @Override - public void preRestart(Throwable e) { + public void preRestart(Throwable e, Option msg) { try { RemoteTypedActorLog.messageLog().put(e.getMessage()); } catch(Exception ex) {} latch.countDown(); } diff --git a/akka-spring/src/test/java/akka/spring/RemoteTypedActorTwoImpl.java b/akka-spring/src/test/java/akka/spring/RemoteTypedActorTwoImpl.java index 1479db0d8c..2ee07195e2 100644 --- a/akka-spring/src/test/java/akka/spring/RemoteTypedActorTwoImpl.java +++ b/akka-spring/src/test/java/akka/spring/RemoteTypedActorTwoImpl.java @@ -2,6 +2,8 @@ package akka.spring; import akka.actor.*; +import scala.Option; + import java.util.concurrent.CountDownLatch; public class RemoteTypedActorTwoImpl extends TypedActor implements RemoteTypedActorTwo { @@ -22,7 +24,7 @@ public class RemoteTypedActorTwoImpl extends TypedActor implements RemoteTypedAc } @Override - public void preRestart(Throwable e) { + public void preRestart(Throwable e, Option msg) { try { RemoteTypedActorLog.messageLog().put(e.getMessage()); } catch(Exception ex) {} latch.countDown(); } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 3ef904fe88..6b4f2415b9 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -174,7 +174,7 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), Some(2), Some(1000)) val ref = TestActorRef(new TActor { def receiveT = { case _ ⇒ } - override def preRestart(reason: Throwable) { counter -= 1 } + override def preRestart(reason: Throwable, msg: Option[Any]) { counter -= 1 } override def postRestart(reason: Throwable) { counter -= 1 } }).start() self.dispatcher = CallingThreadDispatcher.global