use ListenerManagement in FSM

remove quite some duplication, and make ListenerManagement more robust
(catch ActorInitializationException; there was a race condition in
notifyListeners)
This commit is contained in:
Roland Kuhn 2011-04-17 20:18:32 +02:00
parent 779c5436df
commit bd18e7e88f
2 changed files with 47 additions and 26 deletions

View file

@ -138,7 +138,7 @@ object FSM {
* timerActive_? ("tock")
* </pre>
*/
trait FSM[S, D] {
trait FSM[S, D] extends ListenerManagement {
this: Actor =>
import FSM._
@ -147,7 +147,9 @@ trait FSM[S, D] {
type Timeout = Option[Duration]
type TransitionHandler = PartialFunction[(S, S), Unit]
/* DSL */
/******************************************
* DSL
******************************************/
/**
* Insert a new StateFunction at the end of the processing chain for the
@ -324,15 +326,25 @@ trait FSM[S, D] {
makeTransition(currentState)
}
/**FSM State data and default handlers */
/******************************************************************
* PRIVATE IMPLEMENTATION DETAILS
******************************************************************/
/*
* FSM State data and current timeout handling
*/
private var currentState: State = _
private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None
private var generation: Long = 0L
private var transitionCallBackList: List[ActorRef] = Nil
/*
* Timer handling
*/
private val timers = mutable.Map[String, Timer]()
/*
* State definitions
*/
private val stateFunctions = mutable.Map[S, StateFunction]()
private val stateTimeouts = mutable.Map[S, Timeout]()
@ -346,23 +358,38 @@ trait FSM[S, D] {
}
}
/*
* unhandled event handler
*/
private val handleEventDefault: StateFunction = {
case Event(value, stateData) =>
stay
}
private var handleEvent: StateFunction = handleEventDefault
/*
* termination handling
*/
private var terminateEvent: PartialFunction[StopEvent[S,D], Unit] = {
case StopEvent(Failure(cause), _, _) =>
case StopEvent(reason, _, _) =>
}
/*
* transition handling
*/
private var transitionEvent: List[TransitionHandler] = Nil
private def handleTransition(prev : S, next : S) {
val tuple = (prev, next)
for (te <- transitionEvent) { if (te.isDefinedAt(tuple)) te(tuple) }
}
// ListenerManagement shall not start() or stop() listener actors
override protected val manageLifeCycleOfListeners = false
/*********************************************
* Main actor receive() method
*********************************************/
override final protected def receive: Receive = {
case TimeoutMarker(gen) =>
if (generation == gen) {
@ -376,16 +403,16 @@ trait FSM[S, D] {
}
}
case SubscribeTransitionCallBack(actorRef) =>
// send current state back as reference point
addListener(actorRef)
// send current state back as reference point
try {
actorRef ! CurrentState(self, currentState.stateName)
transitionCallBackList ::= actorRef
} catch {
case e : ActorInitializationException =>
EventHandler.warning(this, "trying to register not running listener")
}
case UnsubscribeTransitionCallBack(actorRef) =>
transitionCallBackList = transitionCallBackList.filterNot(_ == actorRef)
removeListener(actorRef)
case value => {
if (timeoutFuture.isDefined) {
timeoutFuture.get.cancel(true)
@ -417,19 +444,7 @@ trait FSM[S, D] {
} else {
if (currentState.stateName != nextState.stateName) {
handleTransition(currentState.stateName, nextState.stateName)
if (!transitionCallBackList.isEmpty) {
val transition = Transition(self, currentState.stateName, nextState.stateName)
transitionCallBackList = transitionCallBackList flatMap { cb =>
try {
cb ! transition
Some(cb)
} catch {
case e : ActorInitializationException =>
EventHandler.warning(this, "registered transition listener went away")
None
}
}
}
notifyListeners(Transition(self, currentState.stateName, nextState.stateName))
}
applyState(nextState)
}

View file

@ -5,8 +5,7 @@
package akka.util
import java.util.concurrent.ConcurrentSkipListSet
import akka.actor.ActorRef
import akka.actor.{ActorInitializationException, ActorRef}
/**
* A manager for listener actors. Intended for mixin by observables.
@ -46,7 +45,8 @@ trait ListenerManagement {
def hasListeners: Boolean = !listeners.isEmpty
/**
* Checks if a specfic listener is registered.
* Checks if a specfic listener is registered. ActorInitializationException leads to removal of listener if that
* one isShutdown.
*/
def hasListener(listener: ActorRef): Boolean = listeners.contains(listener)
@ -56,13 +56,19 @@ trait ListenerManagement {
val iterator = listeners.iterator
while (iterator.hasNext) {
val listener = iterator.next
if (listener.isRunning) listener ! msg
if (listener.isShutdown) iterator.remove()
else try {
listener ! msg
} catch {
case e : ActorInitializationException =>
if (listener.isShutdown) iterator.remove()
}
}
}
}
/**
* Execute <code>f</code> with each listener as argument.
* Execute <code>f</code> with each listener as argument. ActorInitializationException is not handled.
*/
protected[akka] def foreachListener(f: (ActorRef) => Unit) {
val iterator = listeners.iterator