Merge branch 'master' of github.com:akka/akka

This commit is contained in:
Viktor Klang 2012-05-07 19:50:09 +02:00
commit 255e3ad61f
4 changed files with 211 additions and 109 deletions

View file

@ -262,6 +262,25 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
expectMsg(1 second, IndexedSeq(LogEntry(1, 1, "log"), LogEntry(1, 1, "count"), LogEntry(1, 2, "log")))
}
"allow transforming of state results" in {
import akka.actor.FSM._
val fsmref = system.actorOf(Props(new Actor with FSM[Int, Int] {
startWith(0, 0)
when(0)(transform {
case Event("go", _) stay
} using {
case x goto(1)
})
when(1) {
case _ stay
}
}))
fsmref ! SubscribeTransitionCallBack(testActor)
fsmref ! "go"
expectMsg(CurrentState(fsmref, 0))
expectMsg(Transition(fsmref, 0, 1))
}
}
}

View file

@ -172,7 +172,7 @@ object FSM {
* timerActive_? ("tock")
* </pre>
*/
trait FSM[S, D] extends Listeners {
trait FSM[S, D] extends Listeners with ActorLogging {
this: Actor
import FSM._
@ -186,8 +186,6 @@ trait FSM[S, D] extends Listeners {
val -> = FSM.->
val StateTimeout = FSM.StateTimeout
val log = Logging(context.system, this)
/**
* ****************************************
* DSL
@ -219,8 +217,8 @@ trait FSM[S, D] extends Listeners {
* @param timeout state timeout for the initial state, overriding the default timeout for that state
*/
protected final def startWith(stateName: S,
stateData: D,
timeout: Timeout = None): Unit =
stateData: D,
timeout: Timeout = None): Unit =
currentState = FSM.State(stateName, stateData, timeout)
/**
@ -255,6 +253,13 @@ trait FSM[S, D] extends Listeners {
*/
protected final def stop(reason: Reason, stateData: D): State = stay using stateData withStopReason (reason)
protected final class TransformHelper(func: StateFunction) {
def using(andThen: PartialFunction[State, State]): StateFunction =
func andThen (andThen orElse { case x x })
}
protected final def transform(func: StateFunction): TransformHelper = new TransformHelper(func)
/**
* Schedule named timer to deliver message after given delay, possibly repeating.
* @param name identifier to be used with cancelTimer()
@ -327,7 +332,7 @@ trait FSM[S, D] extends Listeners {
* Convenience wrapper for using a total function instead of a partial
* function literal. To be used with onTransition.
*/
implicit protected final def total2pf(transitionHandler: (S, S) Unit) =
implicit protected final def total2pf(transitionHandler: (S, S) Unit): TransitionHandler =
new TransitionHandler {
def isDefinedAt(in: (S, S)) = true
def apply(in: (S, S)) { transitionHandler(in._1, in._2) }
@ -336,7 +341,7 @@ trait FSM[S, D] extends Listeners {
/**
* Set handler which is called upon termination of this FSM actor.
*/
protected final def onTermination(terminationHandler: PartialFunction[StopEvent[S, D], Unit]): Unit =
protected final def onTermination(terminationHandler: PartialFunction[StopEvent, Unit]): Unit =
terminateEvent = terminationHandler
/**
@ -415,7 +420,7 @@ trait FSM[S, D] extends Listeners {
/*
* termination handling
*/
private var terminateEvent: PartialFunction[StopEvent[S, D], Unit] = NullFunction
private var terminateEvent: PartialFunction[StopEvent, Unit] = NullFunction
/*
* transition handling
@ -538,7 +543,7 @@ trait FSM[S, D] extends Listeners {
case class Event(event: Any, stateData: D)
case class StopEvent[S, D](reason: Reason, currentState: S, stateData: D)
case class StopEvent(reason: Reason, currentState: S, stateData: D)
}
/**

View file

@ -37,11 +37,15 @@ class FSMDocSpec extends AkkaSpec {
//#simple-fsm
class Buncher extends Actor with FSM[State, Data] {
//#fsm-body
startWith(Idle, Uninitialized)
//#when-syntax
when(Idle) {
case Event(SetTarget(ref), Uninitialized) stay using Todo(ref, Vector.empty)
case Event(SetTarget(ref), Uninitialized)
stay using Todo(ref, Vector.empty)
}
//#when-syntax
//#transition-elided
onTransition {
@ -51,10 +55,13 @@ class FSMDocSpec extends AkkaSpec {
}
}
//#transition-elided
//#when-syntax
when(Active, stateTimeout = 1 second) {
case Event(Flush | FSM.StateTimeout, t: Todo) goto(Idle) using t.copy(queue = Vector.empty)
case Event(Flush | StateTimeout, t: Todo)
goto(Idle) using t.copy(queue = Vector.empty)
}
//#when-syntax
//#unhandled-elided
whenUnhandled {
@ -67,10 +74,116 @@ class FSMDocSpec extends AkkaSpec {
stay
}
//#unhandled-elided
//#fsm-body
initialize
}
//#simple-fsm
object DemoCode {
trait StateType
case object SomeState extends StateType
case object Processing extends StateType
case object Error extends StateType
case object Idle extends StateType
case object Active extends StateType
class Dummy extends Actor with FSM[StateType, Int] {
class X
val newData = 42
object WillDo
object Tick
//#modifier-syntax
when(SomeState) {
case Event(msg, _)
goto(Processing) using (newData) forMax (5 seconds) replying (WillDo)
}
//#modifier-syntax
//#transition-syntax
onTransition {
case Idle -> Active setTimer("timeout", Tick, 1 second, true)
case Active -> _ cancelTimer("timeout")
case x -> Idle log.info("entering Idle from " + x)
}
//#transition-syntax
//#alt-transition-syntax
onTransition(handler _)
def handler(from: StateType, to: StateType) {
// handle it here ...
}
//#alt-transition-syntax
//#stop-syntax
when(Error) {
case Event("stop", _)
// do cleanup ...
stop()
}
//#stop-syntax
//#transform-syntax
when(SomeState)(transform {
case Event(bytes: Array[Byte], read) stay using (read + bytes.length)
case Event(bytes: List[Byte], read) stay using (read + bytes.size)
} using {
case s @ FSM.State(state, read, timeout, stopReason, replies) if read > 1000
goto(Processing)
})
//#transform-syntax
//#alt-transform-syntax
val processingTrigger: PartialFunction[State, State] = {
case s @ FSM.State(state, read, timeout, stopReason, replies) if read > 1000
goto(Processing)
}
when(SomeState)(transform {
case Event(bytes: Array[Byte], read) stay using (read + bytes.length)
case Event(bytes: List[Byte], read) stay using (read + bytes.size)
} using processingTrigger)
//#alt-transform-syntax
//#termination-syntax
onTermination {
case StopEvent(FSM.Normal, state, data) // ...
case StopEvent(FSM.Shutdown, state, data) // ...
case StopEvent(FSM.Failure(cause), state, data) // ...
}
//#termination-syntax
//#unhandled-syntax
whenUnhandled {
case Event(x: X, data)
log.info("Received unhandled event: " + x)
stay
case Event(msg, _)
log.warning("Received unknown event: " + msg)
goto(Error)
}
//#unhandled-syntax
}
//#logging-fsm
import akka.actor.LoggingFSM
class MyFSM extends Actor with LoggingFSM[StateType, Data] {
//#body-elided
override def logDepth = 12
onTermination {
case StopEvent(FSM.Failure(_), state, data)
val lastEvents = getLog.mkString("\n\t")
log.warning("Failure in state " + state + " with data " + data + "\n" +
"Events leading up to this point:\n\t" + lastEvents)
}
// ...
//#body-elided
}
//#logging-fsm
}
//#fsm-code-elided
"batch correctly" in {

View file

@ -118,19 +118,11 @@ The FSM Trait and Object
The :class:`FSM` trait may only be mixed into an :class:`Actor`. Instead of
extending :class:`Actor`, the self type approach was chosen in order to make it
obvious that an actor is actually created. Importing all members of the
:obj:`FSM` object is recommended if you want to directly access the symbols
like :obj:`StateTimeout`. This import is usually placed inside the state
machine definition:
obvious that an actor is actually created:
.. code-block:: scala
class MyFSM extends Actor with FSM[State, Data] {
import FSM._
...
}
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: simple-fsm
:exclude: fsm-body
The :class:`FSM` trait takes two type parameters:
@ -153,7 +145,7 @@ Defining States
A state is defined by one or more invocations of the method
:func:`when(<name>[, stateTimeout = <timeout>])(stateFunction)`.
:func:`when(<name>[, stateTimeout = <timeout>])(stateFunction)`.
The given name must be an object which is type-compatible with the first type
parameter given to the :class:`FSM` trait. This object is used as a hash key,
@ -165,27 +157,18 @@ If the :meth:`stateTimeout` parameter is given, then all transitions into this
state, including staying, receive this timeout by default. Initiating the
transition with an explicit timeout may be used to override this default, see
`Initiating Transitions`_ for more information. The state timeout of any state
may be changed during action processing with :func:`setStateTimeout(state,
duration)`. This enables runtime configuration e.g. via external message.
may be changed during action processing with
:func:`setStateTimeout(state, duration)`. This enables runtime configuration
e.g. via external message.
The :meth:`stateFunction` argument is a :class:`PartialFunction[Event, State]`,
The :meth:`stateFunction` argument is a :class:`PartialFunction[Event, State]`,
which is conveniently given using the partial function literal syntax as
demonstrated below:
.. code-block:: scala
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: when-syntax
when(Idle) {
case Event(Start(msg), _) =>
goto(Timer) using (msg, sender)
}
when(Timer, stateTimeout = 12 seconds) {
case Event(StateTimeout, (msg, sender)) =>
sender ! msg
goto(Idle)
}
The :class:`Event(msg: Any, data: D)` case class is parameterized with the data
The :class:`Event(msg: Any, data: D)` case class is parameterized with the data
type held by the FSM for convenient pattern matching.
Defining the Initial State
@ -193,7 +176,7 @@ Defining the Initial State
Each FSM needs a starting point, which is declared using
:func:`startWith(state, data[, timeout])`
:func:`startWith(state, data[, timeout])`
The optionally given timeout argument overrides any specification given for the
desired initial state. If you want to cancel a default timeout, use
@ -206,16 +189,8 @@ If a state doesn't handle a received event a warning is logged. If you want to
do something else in this case you can specify that with
:func:`whenUnhandled(stateFunction)`:
.. code-block:: scala
whenUnhandled {
case Event(x : X, data) =>
log.info(this, "Received unhandled event: " + x)
stay
case Event(msg, _) =>
log.warn(this, "Received unknown event: " + x)
goto(Error)
}
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: unhandled-syntax
**IMPORTANT**: This handler is not stacked, meaning that each invocation of
:func:`whenUnhandled` replaces the previously installed handler.
@ -230,7 +205,8 @@ The state definition can either be the current state, as described by the
:func:`goto(state)`. The resulting object allows further qualification by way
of the modifiers described in the following:
:meth:`forMax(duration)`
* :meth:`forMax(duration)`
This modifier sets a state timeout on the next state. This means that a timer
is started which upon expiry sends a :obj:`StateTimeout` message to the FSM.
This timer is canceled upon reception of any other message in the meantime;
@ -241,23 +217,21 @@ of the modifiers described in the following:
specified for the target state. If you want to cancel the default timeout,
use :obj:`Duration.Inf`.
:meth:`using(data)`
* :meth:`using(data)`
This modifier replaces the old state data with the new data given. If you
follow the advice :ref:`above <fsm-philosophy>`, this is the only place where
internal state data are ever modified.
:meth:`replying(msg)`
* :meth:`replying(msg)`
This modifier sends a reply to the currently processed message and otherwise
does not modify the state transition.
All modifier can be chained to achieve a nice and concise description:
.. code-block:: scala
when(State) {
case Event(msg, _) =>
goto(Processing) using (msg) forMax (5 seconds) replying (WillDo)
}
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: modifier-syntax
The parentheses are not actually needed in all cases, but they visually
distinguish between modifiers and their arguments and therefore make the code
@ -267,7 +241,7 @@ even more pleasant to read for foreigners.
Please note that the ``return`` statement may not be used in :meth:`when`
blocks or similar; this is a Scala restriction. Either refactor your code
using ``if () ... else ...`` or move it into a method definition.
using ``if () ... else ...`` or move it into a method definition.
Monitoring Transitions
----------------------
@ -293,13 +267,8 @@ The handler is a partial function which takes a pair of states as input; no
resulting state is needed as it is not possible to modify the transition in
progress.
.. code-block:: scala
onTransition {
case Idle -> Active => setTimer("timeout")
case Active -> _ => cancelTimer("timeout")
case x -> Idle => log.info("entering Idle from "+x)
}
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: transition-syntax
The convenience extractor :obj:`->` enables decomposition of the pair of states
with a clear visual reminder of the transition's direction. As usual in pattern
@ -311,13 +280,8 @@ It is also possible to pass a function object accepting two states to
:func:`onTransition`, in case your transition handling logic is implemented as
a method:
.. code-block:: scala
onTransition(handler _)
private def handler(from: State, to: State) {
...
}
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: alt-transition-syntax
The handlers registered with this method are stacked, so you can intersperse
:func:`onTransition` blocks with :func:`when` blocks as suits your design. It
@ -338,8 +302,8 @@ External Monitoring
External actors may be registered to be notified of state transitions by
sending a message :class:`SubscribeTransitionCallBack(actorRef)`. The named
actor will be sent a :class:`CurrentState(self, stateName)` message immediately
and will receive :class:`Transition(actorRef, oldState, newState)` messages
actor will be sent a :class:`CurrentState(self, stateName)` message immediately
and will receive :class:`Transition(actorRef, oldState, newState)` messages
whenever a new state is reached. External monitors may be unregistered by
sending :class:`UnsubscribeTransitionCallBack(actorRef)` to the FSM actor.
@ -347,13 +311,31 @@ Registering a not-running listener generates a warning and fails gracefully.
Stopping a listener without unregistering will remove the listener from the
subscription list upon the next transition.
Transforming State
------------------
The partial functions supplied as argument to the ``when()`` blocks can be
transformed using Scalas full supplement of functional programming tools. In
order to retain type inference, there is a helper function which may be used in
case some common handling logic shall be applied to different clauses:
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: transform-syntax
It goes without saying that the arguments to this method may also be stored, to
be used several times, e.g. when applying the same transformation to several
``when()`` blocks:
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: alt-transform-syntax
Timers
------
Besides state timeouts, FSM manages timers identified by :class:`String` names.
You may set a timer using
:func:`setTimer(name, msg, interval, repeat)`
:func:`setTimer(name, msg, interval, repeat)`
where :obj:`msg` is the message object which will be sent after the duration
:obj:`interval` has elapsed. If :obj:`repeat` is :obj:`true`, then the timer is
@ -376,7 +358,7 @@ Termination from Inside
The FSM is stopped by specifying the result state as
:func:`stop([reason[, data]])`
:func:`stop([reason[, data]])`
The reason must be one of :obj:`Normal` (which is the default), :obj:`Shutdown`
or :obj:`Failure(reason)`, and the second argument may be given to change the
@ -389,25 +371,15 @@ state data which is available during termination handling.
the same way as a state transition (but note that the ``return`` statement
may not be used within a :meth:`when` block).
.. code-block:: scala
when(A) {
case Event(Stop, _) =>
doCleanup()
stop()
}
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: stop-syntax
You can use :func:`onTermination(handler)` to specify custom code that is
executed when the FSM is stopped. The handler is a partial function which takes
a :class:`StopEvent(reason, stateName, stateData)` as argument:
a :class:`StopEvent(reason, stateName, stateData)` as argument:
.. code-block:: scala
onTermination {
case StopEvent(Normal, s, d) => ...
case StopEvent(Shutdown, _, _) => ...
case StopEvent(Failure(cause), s, d) => ...
}
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: termination-syntax
As for the :func:`whenUnhandled` case, this handler is not stacked, so each
invocation of :func:`onTermination` replaces the previously installed handler.
@ -419,7 +391,7 @@ When an :class:`ActorRef` associated to a FSM is stopped using the
:meth:`stop()` method, its :meth:`postStop` hook will be executed. The default
implementation by the :class:`FSM` trait is to execute the
:meth:`onTermination` handler if that is prepared to handle a
:obj:`StopEvent(Shutdown, ...)`.
:obj:`StopEvent(Shutdown, ...)`.
.. warning::
@ -438,11 +410,11 @@ Event Tracing
-------------
The setting ``akka.actor.debug.fsm`` in :ref:`configuration` enables logging of an
event trace by :class:`LoggingFSM` instances::
event trace by :class:`LoggingFSM` instances:
class MyFSM extends Actor with LoggingFSM[X, Z] {
...
}
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: logging-fsm
:exclude: body-elided
This FSM will log at DEBUG level:
@ -459,17 +431,10 @@ Rolling Event Log
The :class:`LoggingFSM` trait adds one more feature to the FSM: a rolling event
log which may be used during debugging (for tracing how the FSM entered a
certain failure state) or for other creative uses::
certain failure state) or for other creative uses:
class MyFSM extends Actor with LoggingFSM[X, Z] {
override def logDepth = 12
onTermination {
case StopEvent(Failure(_), state, data) =>
log.warning(this, "Failure in state "+state+" with data "+data+"\n"+
"Events leading up to this point:\n\t"+getLog.mkString("\n\t"))
}
...
}
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: logging-fsm
The :meth:`logDepth` defaults to zero, which turns off the event log.