Merge branch 'wip-1485-fsm-docs-∂π'

This commit is contained in:
Roland 2012-01-24 09:35:35 +01:00
commit ee5ae1068b
8 changed files with 215 additions and 191 deletions

View file

@ -49,21 +49,6 @@ object FSM {
}
}
/*
* This extractor is just convenience for matching a (S, S) pair, including a
* reminder what the new state is.
*/
object -> {
def unapply[S](in: (S, S)) = Some(in)
}
/*
* With these implicits in scope, you can write "5 seconds" anywhere a
* Duration or Option[Duration] is expected. This is conveniently true
* for derived classes.
*/
implicit def d2od(d: Duration): Option[Duration] = Some(d)
case class LogEntry[S, D](stateName: S, stateData: D, event: Any)
case class State[S, D](stateName: S, stateData: D, timeout: Option[Duration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil) {
@ -208,9 +193,12 @@ trait FSM[S, D] extends Listeners {
* @param stateTimeout default state timeout for this state
* @param stateFunction partial function describing response to input
*/
protected final def when(stateName: S, stateTimeout: Timeout = None)(stateFunction: StateFunction) = {
protected final def when(stateName: S, stateTimeout: Duration = null)(stateFunction: StateFunction): Unit =
register(stateName, stateFunction, Option(stateTimeout))
@deprecated("use the more import-friendly variant taking a Duration", "2.0")
protected final def when(stateName: S, stateTimeout: Timeout)(stateFunction: StateFunction): Unit =
register(stateName, stateFunction, stateTimeout)
}
/**
* Set initial state. Call this method from the constructor before the #initialize method.
@ -221,9 +209,8 @@ trait FSM[S, D] extends Listeners {
*/
protected final def startWith(stateName: S,
stateData: D,
timeout: Timeout = None) = {
timeout: Timeout = None): Unit =
currentState = FSM.State(stateName, stateData, timeout)
}
/**
* Produce transition to other state. Return this from a state function in
@ -232,9 +219,7 @@ trait FSM[S, D] extends Listeners {
* @param nextStateName state designator for the next state
* @return state transition descriptor
*/
protected final def goto(nextStateName: S): State = {
FSM.State(nextStateName, currentState.stateData)
}
protected final def goto(nextStateName: S): State = FSM.State(nextStateName, currentState.stateData)
/**
* Produce "empty" transition descriptor. Return this from a state function
@ -242,31 +227,22 @@ trait FSM[S, D] extends Listeners {
*
* @return descriptor for staying in current state
*/
protected final def stay(): State = {
// cannot directly use currentState because of the timeout field
goto(currentState.stateName)
}
protected final def stay(): State = goto(currentState.stateName) // cannot directly use currentState because of the timeout field
/**
* Produce change descriptor to stop this FSM actor with reason "Normal".
*/
protected final def stop(): State = {
stop(Normal)
}
protected final def stop(): State = stop(Normal)
/**
* Produce change descriptor to stop this FSM actor including specified reason.
*/
protected final def stop(reason: Reason): State = {
stop(reason, currentState.stateData)
}
protected final def stop(reason: Reason): State = stop(reason, currentState.stateData)
/**
* Produce change descriptor to stop this FSM actor including specified reason.
*/
protected final def stop(reason: Reason, stateData: D): State = {
stay using stateData withStopReason (reason)
}
protected final def stop(reason: Reason, stateData: D): State = stay using stateData withStopReason (reason)
/**
* Schedule named timer to deliver message after given delay, possibly repeating.
@ -290,12 +266,11 @@ trait FSM[S, D] extends Listeners {
* Cancel named timer, ensuring that the message is not subsequently delivered (no race).
* @param name of the timer to cancel
*/
protected[akka] def cancelTimer(name: String) = {
protected[akka] def cancelTimer(name: String): Unit =
if (timers contains name) {
timers(name).cancel
timers -= name
}
}
/**
* Inquire whether the named timer is still active. Returns true unless the
@ -308,8 +283,14 @@ trait FSM[S, D] extends Listeners {
* Set state timeout explicitly. This method can safely be used from within a
* state handler.
*/
protected final def setStateTimeout(state: S, timeout: Timeout) {
stateTimeouts(state) = timeout
protected final def setStateTimeout(state: S, timeout: Timeout): Unit = stateTimeouts(state) = timeout
/**
* This extractor is just convenience for matching a (S, S) pair, including a
* reminder what the new state is.
*/
object -> {
def unapply[S](in: (S, S)) = Some(in)
}
/**
@ -337,9 +318,7 @@ trait FSM[S, D] extends Listeners {
* <b>Multiple handlers may be installed, and every one of them will be
* called, not only the first one matching.</b>
*/
protected final def onTransition(transitionHandler: TransitionHandler) {
transitionEvent :+= transitionHandler
}
protected final def onTransition(transitionHandler: TransitionHandler): Unit = transitionEvent :+= transitionHandler
/**
* Convenience wrapper for using a total function instead of a partial
@ -354,24 +333,20 @@ trait FSM[S, D] extends Listeners {
/**
* Set handler which is called upon termination of this FSM actor.
*/
protected final def onTermination(terminationHandler: PartialFunction[StopEvent[S, D], Unit]) = {
protected final def onTermination(terminationHandler: PartialFunction[StopEvent[S, D], Unit]): Unit =
terminateEvent = terminationHandler
}
/**
* Set handler which is called upon reception of unhandled messages.
*/
protected final def whenUnhandled(stateFunction: StateFunction) = {
protected final def whenUnhandled(stateFunction: StateFunction): Unit =
handleEvent = stateFunction orElse handleEventDefault
}
/**
* Verify existence of initial state and setup timers. This should be the
* last call within the constructor.
*/
protected final def initialize {
makeTransition(currentState)
}
protected final def initialize: Unit = makeTransition(currentState)
/**
* Return current state name (i.e. object of type S)
@ -414,7 +389,7 @@ trait FSM[S, D] extends Listeners {
private val stateFunctions = mutable.Map[S, StateFunction]()
private val stateTimeouts = mutable.Map[S, Timeout]()
private def register(name: S, function: StateFunction, timeout: Timeout) {
private def register(name: S, function: StateFunction, timeout: Timeout): Unit = {
if (stateFunctions contains name) {
stateFunctions(name) = stateFunctions(name) orElse function
stateTimeouts(name) = timeout orElse stateTimeouts(name)
@ -494,12 +469,12 @@ trait FSM[S, D] extends Listeners {
}
}
private def processMsg(value: Any, source: AnyRef) {
private def processMsg(value: Any, source: AnyRef): Unit = {
val event = Event(value, currentState.stateData)
processEvent(event, source)
}
private[akka] def processEvent(event: Event, source: AnyRef) {
private[akka] def processEvent(event: Event, source: AnyRef): Unit = {
val stateFunc = stateFunctions(currentState.stateName)
val nextState = if (stateFunc isDefinedAt event) {
stateFunc(event)
@ -510,7 +485,7 @@ trait FSM[S, D] extends Listeners {
applyState(nextState)
}
private[akka] def applyState(nextState: State) {
private[akka] def applyState(nextState: State): Unit = {
nextState.stopReason match {
case None makeTransition(nextState)
case _
@ -520,7 +495,7 @@ trait FSM[S, D] extends Listeners {
}
}
private[akka] def makeTransition(nextState: State) {
private[akka] def makeTransition(nextState: State): Unit = {
if (!stateFunctions.contains(nextState.stateName)) {
terminate(stay withStopReason Failure("Next state %s does not exist".format(nextState.stateName)))
} else {
@ -541,9 +516,9 @@ trait FSM[S, D] extends Listeners {
}
}
override def postStop() { terminate(stay withStopReason Shutdown) }
override def postStop(): Unit = { terminate(stay withStopReason Shutdown) }
private def terminate(nextState: State) {
private def terminate(nextState: State): Unit = {
if (!currentState.stopReason.isDefined) {
val reason = nextState.stopReason.get
reason match {
@ -600,13 +575,13 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒
super.setTimer(name, msg, timeout, repeat)
}
protected[akka] abstract override def cancelTimer(name: String) = {
protected[akka] abstract override def cancelTimer(name: String): Unit = {
if (debugEvent)
log.debug("canceling timer '" + name + "'")
super.cancelTimer(name)
}
private[akka] abstract override def processEvent(event: Event, source: AnyRef) {
private[akka] abstract override def processEvent(event: Event, source: AnyRef): Unit = {
if (debugEvent) {
val srcstr = source match {
case s: String s

View file

@ -50,12 +50,17 @@ depending on the configuration of the actor system:
- There are several special types of actor references which behave like local
actor references for all practical purposes:
- :class:`AskActorRef` is the special representation of a :meth:`Promise` for
- :class:`PromiseActorRef` is the special representation of a :meth:`Promise` for
the purpose of being completed by the response from an actor; it is created
by the :meth:`ActorRef.ask` invocation.
- :class:`DeadLetterActorRef` is the default implementation of the dead
letters service, where all messages are re-routed whose routees are shut
down or non-existent.
- :class:`EmptyLocalActorRef` is what is returned when looking up a
non-existing local actor path: it is equivalent to a
:class:`DeadLetterActorRef`, but it retains its path so that it can be sent
over the network and compared to other existing actor refs for that path,
some of which might have been obtained before the actor stopped existing.
- And then there are some one-off internal implementations which you should
never really see:
@ -309,12 +314,3 @@ other actors are found. The next level consists of the following:
- ``"/remote"`` is an artificial path below which all actors reside whose
supervisors are remote actor references
Future extensions:
- ``"/service"`` is an artificial path below which actors can be presented by
means of configuration, i.e. deployed at system start-up or just-in-time
(triggered by look-up)
- ``"/alias"`` is an artificial path below which other actors may be “mounted”
(as in the Unix file-system sense) by path—local or remote—to give them
logical names.

View file

@ -255,6 +255,10 @@ currently traversed actor, otherwise it will step “down” to the named child.
It should be noted that the ``..`` in actor paths here always means the logical
structure, i.e. the supervisor.
If the path being looked up does not exist, a special actor reference is
returned which behaves like the actor systems dead letter queue but retains
its identity (i.e. the path which was looked up).
Remote actor addresses may also be looked up, if remoting is enabled::
getContext().actorFor("akka://app@otherhost:1234/user/serviceB")

View file

@ -287,6 +287,10 @@ currently traversed actor, otherwise it will step “down” to the named child.
It should be noted that the ``..`` in actor paths here always means the logical
structure, i.e. the supervisor.
If the path being looked up does not exist, a special actor reference is
returned which behaves like the actor systems dead letter queue but retains
its identity (i.e. the path which was looked up).
Remote actor addresses may also be looked up, if remoting is enabled::
context.actorFor("akka://app@otherhost:1234/user/serviceB")

View file

@ -0,0 +1,96 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.actor
//#test-code
import akka.testkit.AkkaSpec
import akka.actor.Props
class FSMDocSpec extends AkkaSpec {
"simple finite state machine" must {
//#fsm-code-elided
//#simple-imports
import akka.actor.{ Actor, ActorRef, FSM }
import akka.util.duration._
//#simple-imports
//#simple-events
// received events
case class SetTarget(ref: ActorRef)
case class Queue(obj: Any)
case object Flush
// sent events
case class Batch(obj: Seq[Any])
//#simple-events
//#simple-state
// states
sealed trait State
case object Idle extends State
case object Active extends State
sealed trait Data
case object Uninitialized extends Data
case class Todo(target: ActorRef, queue: Seq[Any]) extends Data
//#simple-state
//#simple-fsm
class Buncher extends Actor with FSM[State, Data] {
startWith(Idle, Uninitialized)
when(Idle) {
case Event(SetTarget(ref), Uninitialized) stay using Todo(ref, Vector.empty)
}
//#transition-elided
onTransition {
case Active -> Idle
stateData match {
case Todo(ref, queue) ref ! Batch(queue)
}
}
//#transition-elided
when(Active, stateTimeout = 1 second) {
case Event(Flush | FSM.StateTimeout, t: Todo) goto(Idle) using t.copy(queue = Vector.empty)
}
//#unhandled-elided
whenUnhandled {
// common code for both states
case Event(Queue(obj), t @ Todo(_, v))
goto(Active) using t.copy(queue = v :+ obj)
case Event(e, s)
log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
stay
}
//#unhandled-elided
initialize
}
//#simple-fsm
//#fsm-code-elided
"batch correctly" in {
val buncher = system.actorOf(Props(new Buncher))
buncher ! SetTarget(testActor)
buncher ! Queue(42)
buncher ! Queue(43)
expectMsg(Batch(Seq(42, 43)))
buncher ! Queue(44)
buncher ! Flush
buncher ! Queue(45)
expectMsg(Batch(Seq(44)))
expectMsg(Batch(Seq(45)))
}
"batch not if uninitialized" in {
val buncher = system.actorOf(Props(new Buncher))
buncher ! Queue(42)
expectNoMsg
}
}
}
//#test-code

View file

@ -26,146 +26,104 @@ These relations are interpreted as meaning:
A Simple Example
================
To demonstrate the usage of states we start with a simple FSM without state
data. The state can be of any type so for this example we create the states A,
B and C.
To demonstrate most of the features of the :class:`FSM` trait, consider an
actor which shall receive and queue messages while they arrive in a burst and
send them on after the burst ended or a flush request is received.
.. code-block:: scala
First, consider all of the below to use these import statements:
sealed trait ExampleState
case object A extends ExampleState
case object B extends ExampleState
case object C extends ExampleState
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala#simple-imports
Now lets create an object representing the FSM and defining the behavior.
The contract of our “Buncher” actor is that is accepts or produces the following messages:
.. code-block:: scala
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala#simple-events
import akka.actor.{Actor, FSM}
import akka.util.duration._
``SetTarget`` is needed for starting it up, setting the destination for the
``Batches`` to be passed on; ``Queue`` will add to the internal queue while
``Flush`` will mark the end of a burst.
case object Move
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala#simple-state
class ABC extends Actor with FSM[ExampleState, Unit] {
The actor can be in two states: no message queued (aka ``Idle``) or some
message queued (aka ``Active``). It will stay in the active state as long as
messages keep arriving and no flush is requested. The internal state data of
the actor is made up of the target actor reference to send the batches to and
the actual queue of messages.
import FSM._
Now lets take a look at the skeleton for our FSM actor:
startWith(A, Unit)
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: simple-fsm
:exclude: transition-elided,unhandled-elided
when(A) {
case Ev(Move) =>
log.info(this, "Go to B and move on after 5 seconds")
goto(B) forMax (5 seconds)
}
The basic strategy is to declare the actor, mixing in the :class:`FSM` trait
and specifying the possible states and data values as type paramters. Within
the body of the actor a DSL is used for declaring the state machine:
when(B) {
case Ev(StateTimeout) =>
log.info(this, "Moving to C")
goto(C)
}
* :meth:`startsWith` defines the initial state and initial data
* then there is one :meth:`when(<state>) { ... }` declaration per state to be
handled (could potentially be multiple ones, the passed
:class:`PartialFunction` will be concatenated using :meth:`orElse`)
* finally starting it up using :meth:`initialize`, which performs the
transition into the initial state and sets up timers (if required).
when(C) {
case Ev(Move) =>
log.info(this, "Stopping")
stop
}
In this case, we start out in the ``Idle`` and ``Uninitialized`` state, where
only the ``SetTarget()`` message is handled; ``stay`` prepares to end this
events processing for not leaving the current state, while the ``using``
modifier makes the FSM replace the internal state (which is ``Uninitialized``
at this point) with a fresh ``Todo()`` object containing the target actor
reference. The ``Active`` state has a state timeout declared, which means that
if no message is received for 1 second, a ``FSM.StateTimeout`` message will be
generated. This has the same effect as receiving the ``Flush`` command in this
case, namely to transition back into the ``Idle`` state and resetting the
internal queue to the empty vector. But how do messages get queued? Since this
shall work identically in both states, we make use of the fact that any event
which is not handled by the ``when()`` block is passed to the
``whenUnhandled()`` block:
initialize // this checks validity of the initial state and sets up timeout if needed
}
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala#unhandled-elided
Each state is described by one or more :func:`when(state)` blocks; if more than
one is given for the same state, they are tried in the order given until the
first is found which matches the incoming event. Events are matched using
either :func:`Ev(msg)` (if no state data are to be extracted) or
:func:`Event(msg, data)`, see below. The statements for each case are the
actions to be taken, where the final expression must describe the transition
into the next state. This can either be :func:`stay` when no transition is
needed or :func:`goto(target)` for changing into the target state. The
transition may be annotated with additional properties, where this example
includes a state timeout of 5 seconds after the transition into state B:
:func:`forMax(duration)` arranges for a :obj:`StateTimeout` message to be
scheduled, unless some other message is received first. The construction of the
FSM is finished by calling the :func:`initialize` method as last part of the
ABC constructor.
The first case handled here is adding ``Queue()`` requests to the internal
queue and going to the ``Active`` state (this does the obvious thing of staying
in the ``Active`` state if already there), but only if the FSM data are not
``Uninitialized`` when the ``Queue()`` event is received. Otherwise—and in all
other non-handled cases—the second case just logs a warning and does not change
the internal state.
State Data
==========
The only missing piece is where the ``Batches`` are actually sent to the
target, for which we use the ``onTransition`` mechanism: you can declare
multiple such blocks and all of them will be tried for matching behavior in
case a state transition occurs (i.e. only when the state actually changes).
The FSM can also hold state data associated with the internal state of the
state machine. The state data can be of any type but to demonstrate let's look
at a lock with a :class:`String` as state data holding the entered unlock code.
First we need two states for the lock:
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala#transition-elided
.. code-block:: scala
The transition callback is a partial function which takes as input a pair of
states—the current and the next state. The FSM trait includes a convenience
extractor for these in form of an arrow operator, which conveniently reminds
you of the direction of the state change which is being matched. During the
state change, the old state data is available via ``stateData`` as shown, and
the new state data would be available as ``nextStateData``.
sealed trait LockState
case object Locked extends LockState
case object Open extends LockState
To verify that this buncher actually works, it is quite easy to write a test
using the :ref:`akka-testkit`, which is conveniently bundled with ScalaTest traits
into ``AkkaSpec``:
Now we can create a lock FSM that takes :class:`LockState` as a state and a
:class:`String` as state data:
.. code-block:: scala
import akka.actor.{Actor, FSM}
class Lock(code: String) extends Actor with FSM[LockState, String] {
import FSM._
val emptyCode = ""
startWith(Locked, emptyCode)
when(Locked) {
// receive a digit and the code that we have so far
case Event(digit: Char, soFar) => {
// add the digit to what we have
soFar + digit match {
case incomplete if incomplete.length < code.length =>
// not enough digits yet so stay using the
// incomplete code as the new state data
stay using incomplete
case `code` =>
// code matched the one from the lock
// so go to Open state and reset the state data
goto(Open) using emptyCode forMax (1 seconds)
case wrong =>
// wrong code, stay Locked and reset the state data
stay using emptyCode
}
}
}
when(Open) {
case Ev(StateTimeout, _) => {
// after the timeout, go back to Locked state
goto(Locked)
}
}
initialize
}
This very simple example shows how the complete state of the FSM is encoded in
the :obj:`(State, Data)` pair and only explicitly updated during transitions.
This encapsulation is what makes state machines a powerful abstraction, e.g.
for handling socket states in a network server application.
.. includecode:: code/akka/docs/actor/FSMDocSpec.scala
:include: test-code
:exclude: fsm-code-elided
Reference
=========
This section describes the DSL in a more formal way, refer to `Examples`_ for more sample material.
The FSM Trait and Object
------------------------
The :class:`FSM` trait may only be mixed into an :class:`Actor`. Instead of
extending :class:`Actor`, the self type approach was chosen in order to make it
obvious that an actor is actually created. Importing all members of the
:obj:`FSM` object is recommended to receive useful implicits and directly
access the symbols like :obj:`StateTimeout`. This import is usually placed
inside the state machine definition:
:obj:`FSM` object is recommended if you want to directly access the symbols
like :obj:`StateTimeout`. This import is usually placed inside the state
machine definition:
.. code-block:: scala
@ -192,15 +150,6 @@ The :class:`FSM` trait takes two type parameters:
to the FSM class you have the advantage of making all changes of the
internal state explicit in a few well-known places.
Defining Timeouts
-----------------
The :class:`FSM` module uses :ref:`Duration` for all timing configuration.
Several methods, like :func:`when()` and :func:`startWith()` take a
:class:`FSM.Timeout`, which is an alias for :class:`Option[Duration]`. There is
an implicit conversion available in the :obj:`FSM` object which makes this
transparent, just import it into your FSM body.
Defining States
---------------

View file

@ -64,7 +64,7 @@ abstract class GenericBuncher[A: Manifest, B](val singleTimeout: Duration, val m
case Event(Stop, _) stop
}
when(Active, stateTimeout = Some(singleTimeout)) {
when(Active, stateTimeout = singleTimeout) {
case Event(Msg(m), acc)
stay using merge(acc, m)
case Event(StateTimeout, acc)

View file

@ -62,8 +62,8 @@ class TestFSMRef[S, D, T <: Actor](
* corresponding transition initiated from within the FSM, including timeout
* and stop handling.
*/
def setState(stateName: S = fsm.stateName, stateData: D = fsm.stateData, timeout: Option[Duration] = None, stopReason: Option[FSM.Reason] = None) {
fsm.applyState(FSM.State(stateName, stateData, timeout, stopReason))
def setState(stateName: S = fsm.stateName, stateData: D = fsm.stateData, timeout: Duration = null, stopReason: Option[FSM.Reason] = None) {
fsm.applyState(FSM.State(stateName, stateData, Option(timeout), stopReason))
}
/**