add Actor.freshInstance hook, test #955
This commit is contained in:
parent
48feec0bbb
commit
10256991cb
4 changed files with 253 additions and 38 deletions
|
|
@ -0,0 +1,148 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
||||
import org.scalatest.{ WordSpec, BeforeAndAfterAll, BeforeAndAfterEach }
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
import Actor.actorOf
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
import akka.config.Supervision.OneForOneStrategy
|
||||
|
||||
import java.util.concurrent.atomic._
|
||||
|
||||
object ActorRestartSpec {
|
||||
|
||||
private var _gen = new AtomicInteger(0)
|
||||
def generation = _gen.incrementAndGet
|
||||
def generation_=(x: Int) { _gen.set(x) }
|
||||
|
||||
sealed trait RestartType
|
||||
case object Normal extends RestartType
|
||||
case object Nested extends RestartType
|
||||
case object Handover extends RestartType
|
||||
case object Fail extends RestartType
|
||||
|
||||
class Restarter(val testActor: ActorRef) extends Actor {
|
||||
val gen = generation
|
||||
var xx = 0
|
||||
var restart: RestartType = Normal
|
||||
def receive = {
|
||||
case x: Int ⇒ xx = x
|
||||
case t: RestartType ⇒ restart = t
|
||||
case "get" ⇒ self reply xx
|
||||
}
|
||||
override def preStart { testActor ! (("preStart", gen)) }
|
||||
override def postStop { testActor ! (("postStop", gen)) }
|
||||
override def preRestart(cause: Throwable) { testActor ! (("preRestart", gen)) }
|
||||
override def postRestart(cause: Throwable) { testActor ! (("postRestart", gen)) }
|
||||
override def freshInstance() = {
|
||||
restart match {
|
||||
case Normal ⇒ None
|
||||
case Nested ⇒
|
||||
val ref = TestActorRef(new Actor {
|
||||
def receive = { case _ ⇒ }
|
||||
override def preStart { testActor ! ((this, self)) }
|
||||
}).start()
|
||||
testActor ! ((ref.underlyingActor, ref))
|
||||
None
|
||||
case Handover ⇒
|
||||
val fresh = new Restarter(testActor)
|
||||
fresh.xx = xx
|
||||
Some(fresh)
|
||||
case Fail ⇒
|
||||
throw new IllegalActorStateException("expected")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class Supervisor extends Actor {
|
||||
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), 5, 5000)
|
||||
def receive = {
|
||||
case _ ⇒
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with BeforeAndAfterEach {
|
||||
import ActorRestartSpec._
|
||||
|
||||
override def beforeEach { generation = 0 }
|
||||
|
||||
"An Actor restart" must {
|
||||
|
||||
"invoke preRestart, preStart, postRestart" in {
|
||||
val actor = actorOf(new Restarter(testActor)).start()
|
||||
expectMsg(1 second, ("preStart", 1))
|
||||
val supervisor = actorOf[Supervisor].start()
|
||||
supervisor link actor
|
||||
actor ! Kill
|
||||
within(1 second) {
|
||||
expectMsg(("preRestart", 1))
|
||||
expectMsg(("preStart", 2))
|
||||
expectMsg(("postRestart", 2))
|
||||
expectNoMsg
|
||||
}
|
||||
}
|
||||
|
||||
"support creation of nested actors in freshInstance()" in {
|
||||
val actor = actorOf(new Restarter(testActor)).start()
|
||||
expectMsg(1 second, ("preStart", 1))
|
||||
val supervisor = actorOf[Supervisor].start()
|
||||
supervisor link actor
|
||||
actor ! Nested
|
||||
actor ! Kill
|
||||
within(1 second) {
|
||||
expectMsg(("preRestart", 1))
|
||||
val (tActor, tRef) = expectMsgType[(Actor, TestActorRef[Actor])]
|
||||
tRef.underlyingActor must be(tActor)
|
||||
expectMsg((tActor, tRef))
|
||||
tRef.stop()
|
||||
expectMsg(("preStart", 2))
|
||||
expectMsg(("postRestart", 2))
|
||||
expectNoMsg
|
||||
}
|
||||
}
|
||||
|
||||
"use freshInstance() if available" in {
|
||||
val actor = actorOf(new Restarter(testActor)).start()
|
||||
expectMsg(1 second, ("preStart", 1))
|
||||
val supervisor = actorOf[Supervisor].start()
|
||||
supervisor link actor
|
||||
actor ! 42
|
||||
actor ! Handover
|
||||
actor ! Kill
|
||||
within(1 second) {
|
||||
expectMsg(("preRestart", 1))
|
||||
expectMsg(("preStart", 2))
|
||||
expectMsg(("postRestart", 2))
|
||||
expectNoMsg
|
||||
}
|
||||
actor ! "get"
|
||||
expectMsg(1 second, 42)
|
||||
}
|
||||
|
||||
"fall back to default factory if freshInstance() fails" in {
|
||||
val actor = actorOf(new Restarter(testActor)).start()
|
||||
expectMsg(1 second, ("preStart", 1))
|
||||
val supervisor = actorOf[Supervisor].start()
|
||||
supervisor link actor
|
||||
actor ! 42
|
||||
actor ! Fail
|
||||
actor ! Kill
|
||||
within(1 second) {
|
||||
expectMsg(("preRestart", 1))
|
||||
expectMsg(("preStart", 2))
|
||||
expectMsg(("postRestart", 2))
|
||||
expectNoMsg
|
||||
}
|
||||
actor ! "get"
|
||||
expectMsg(1 second, 0)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -662,6 +662,18 @@ trait Actor {
|
|||
*/
|
||||
def preRestart(reason: Throwable) {}
|
||||
|
||||
/**
|
||||
* User overridable callback.
|
||||
* <p/>
|
||||
* Is called on the crashed Actor to give it the option of producing the
|
||||
* Actor's reincarnation. If it returns None, which is the default, the
|
||||
* initially provided actor factory is used.
|
||||
* <p/>
|
||||
* <b>Warning:</b> <i>Propagating state from a crashed actor carries the risk
|
||||
* of proliferating the cause of the error. Consider let-it-crash first.</i>
|
||||
*/
|
||||
def freshInstance(): Option[Actor] = None
|
||||
|
||||
/**
|
||||
* User overridable callback.
|
||||
* <p/>
|
||||
|
|
|
|||
|
|
@ -845,7 +845,20 @@ class LocalActorRef private[akka] (
|
|||
val stackBefore = refStack.get
|
||||
refStack.set(stackBefore.push(this))
|
||||
try {
|
||||
actorFactory()
|
||||
if (_status == ActorRefInternals.BEING_RESTARTED) {
|
||||
val a = actor
|
||||
val fresh = try a.freshInstance catch {
|
||||
case e ⇒
|
||||
EventHandler.error(e, a, "freshInstance() failed, falling back to initial actor factory")
|
||||
None
|
||||
}
|
||||
fresh match {
|
||||
case Some(ref) ⇒ ref
|
||||
case None ⇒ actorFactory()
|
||||
}
|
||||
} else {
|
||||
actorFactory()
|
||||
}
|
||||
} finally {
|
||||
val stackAfter = refStack.get
|
||||
if (stackAfter.nonEmpty)
|
||||
|
|
|
|||
|
|
@ -92,6 +92,83 @@ Here we create a light-weight actor-based thread, that can be used to spawn off
|
|||
... // do stuff
|
||||
}
|
||||
|
||||
Actor Internal API
|
||||
------------------
|
||||
|
||||
The :class:`Actor` trait defines only one abstract method, the abovementioned
|
||||
:meth:`receive`. In addition, it offers two convenience methods
|
||||
:meth:`become`/:meth:`unbecome` for modifying the hotswap behavior stack as
|
||||
described in :ref:`Actor.HotSwap` and the :obj:`self` reference to this actor’s
|
||||
:class:`ActorRef` object. The remaining visible methods are user-overridable
|
||||
life-cycle hooks which are described in the following::
|
||||
|
||||
def preStart() {}
|
||||
def preRestart(cause: Throwable) {}
|
||||
def freshInstance(): Option[Actor] = None
|
||||
def postRestart(cause: Throwable) {}
|
||||
def postStop() {}
|
||||
|
||||
The implementations shown above are the defaults provided by the :class:`Actor`
|
||||
trait.
|
||||
|
||||
Start Hook
|
||||
^^^^^^^^^^
|
||||
|
||||
Right after starting the actor, its :meth:`preStart` method is invoked. This is
|
||||
guaranteed to happen before the first message from external sources is queued
|
||||
to the actor’s mailbox.
|
||||
|
||||
::
|
||||
|
||||
override def preStart {
|
||||
// e.g. send initial message to self
|
||||
self ! GetMeStarted
|
||||
// or do any other stuff, e.g. registering with other actors
|
||||
someService ! Register(self)
|
||||
}
|
||||
|
||||
Restart Hooks
|
||||
^^^^^^^^^^^^^
|
||||
|
||||
A supervised actor, i.e. one which is linked to another actor with a fault
|
||||
handling strategy, will be restarted in case an exception is thrown while
|
||||
processing a message. This restart involves four of the hooks mentioned above:
|
||||
|
||||
1. The old actor is informed by calling :meth:`preRestart` with the exception
|
||||
which caused the restart; this method is the best place for cleaning up,
|
||||
preparing hand-over to the fresh actor instance, etc.
|
||||
2. The old actor’s :meth:`freshInstance` factory method is invoked, which may
|
||||
optionally produce the new actor instance which will replace this actor. If
|
||||
this method returns :obj:`None` or throws an exception, the initial factory
|
||||
from the ``Actor.actorOf`` call is used to produce the fresh instance.
|
||||
3. The new actor’s :meth:`preStart` method is invoked, just as in the normal
|
||||
start-up case.
|
||||
4. The new actor’s :meth:`postRestart` method is called with the exception
|
||||
which caused the restart.
|
||||
|
||||
.. warning::
|
||||
|
||||
The :meth:`freshInstance` hook may be used to propagate (part of) the failed
|
||||
actor’s state to the fresh instance. This carries the risk of proliferating
|
||||
the cause for the crash which triggered the restart. If you are tempted to
|
||||
take this route, it is strongly advised to step back and consider other
|
||||
possible approaches, e.g. distributing the state in question using other
|
||||
means or spawning short-lived worker actors for carrying out “risky” tasks.
|
||||
|
||||
An actor restart replaces only the actual actor object; the contents of the
|
||||
mailbox and the hotswap stack are unaffected by the restart, so processing of
|
||||
messages will resume after the :meth:`postRestart` hook returns. Any message
|
||||
sent to an actor which is being restarted will be queued to its mailbox as
|
||||
usual.
|
||||
|
||||
Stop Hook
|
||||
^^^^^^^^^
|
||||
|
||||
After stopping an actor, its :meth:`postStop` hook is called, which may be used
|
||||
e.g. for deregistering this actor from other services. This hook is guaranteed
|
||||
to run after message queuing has been disabled for this actor, i.e. sending
|
||||
messages would fail with an :class:`IllegalActorStateException`.
|
||||
|
||||
Identifying Actors
|
||||
------------------
|
||||
|
||||
|
|
@ -252,43 +329,6 @@ This method should return a ``PartialFunction``, e.g. a ‘match/case’ clause
|
|||
}
|
||||
}
|
||||
|
||||
Actor internal API
|
||||
------------------
|
||||
|
||||
The Actor trait contains almost no member fields or methods to invoke, you just use the Actor trait to implement the:
|
||||
|
||||
#. ``receive`` message handler
|
||||
#. life-cycle callbacks:
|
||||
|
||||
#. preStart
|
||||
#. postStop
|
||||
#. preRestart
|
||||
#. postRestart
|
||||
|
||||
The ``Actor`` trait has one single member field:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
val self: ActorRef
|
||||
|
||||
This ``self`` field holds a reference to its ``ActorRef`` and it is this reference you want to access the Actor's API. Here, for example, you find methods to reply to messages, send yourself messages, define timeouts, fault tolerance etc., start and stop etc.
|
||||
|
||||
However, for convenience you can import these functions and fields like below, which will allow you do drop the ``self`` prefix:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
class MyActor extends Actor {
|
||||
import self._
|
||||
id = ...
|
||||
dispatcher = ...
|
||||
start
|
||||
...
|
||||
}
|
||||
|
||||
But in this documentation we will always prefix the calls with ``self`` for clarity.
|
||||
|
||||
Let's start by looking how we can reply to messages in a convenient way using this ``ActorRef`` API.
|
||||
|
||||
Reply to messages
|
||||
-----------------
|
||||
|
||||
|
|
@ -441,6 +481,8 @@ You can also send an actor the ``akka.actor.PoisonPill`` message, which will sto
|
|||
|
||||
If the sender is a ``Future`` (e.g. the message is sent with ``?``), the ``Future`` will be completed with an ``akka.actor.ActorKilledException("PoisonPill")``.
|
||||
|
||||
.. _Actor.HotSwap:
|
||||
|
||||
HotSwap
|
||||
-------
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue