diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala
index f363253fe8..815ab1076c 100644
--- a/akka-actor/src/main/scala/akka/actor/FSM.scala
+++ b/akka-actor/src/main/scala/akka/actor/FSM.scala
@@ -138,7 +138,7 @@ object FSM {
* timerActive_? ("tock")
*
*/
-trait FSM[S, D] {
+trait FSM[S, D] extends ListenerManagement {
this: Actor =>
import FSM._
@@ -147,7 +147,9 @@ trait FSM[S, D] {
type Timeout = Option[Duration]
type TransitionHandler = PartialFunction[(S, S), Unit]
- /* DSL */
+ /******************************************
+ * DSL
+ ******************************************/
/**
* Insert a new StateFunction at the end of the processing chain for the
@@ -324,15 +326,25 @@ trait FSM[S, D] {
makeTransition(currentState)
}
- /**FSM State data and default handlers */
+ /******************************************************************
+ * PRIVATE IMPLEMENTATION DETAILS
+ ******************************************************************/
+
+ /*
+ * FSM State data and current timeout handling
+ */
private var currentState: State = _
private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None
private var generation: Long = 0L
- private var transitionCallBackList: List[ActorRef] = Nil
-
+ /*
+ * Timer handling
+ */
private val timers = mutable.Map[String, Timer]()
+ /*
+ * State definitions
+ */
private val stateFunctions = mutable.Map[S, StateFunction]()
private val stateTimeouts = mutable.Map[S, Timeout]()
@@ -346,23 +358,38 @@ trait FSM[S, D] {
}
}
+ /*
+ * unhandled event handler
+ */
private val handleEventDefault: StateFunction = {
case Event(value, stateData) =>
stay
}
private var handleEvent: StateFunction = handleEventDefault
+ /*
+ * termination handling
+ */
private var terminateEvent: PartialFunction[StopEvent[S,D], Unit] = {
case StopEvent(Failure(cause), _, _) =>
case StopEvent(reason, _, _) =>
}
+ /*
+ * transition handling
+ */
private var transitionEvent: List[TransitionHandler] = Nil
private def handleTransition(prev : S, next : S) {
val tuple = (prev, next)
for (te <- transitionEvent) { if (te.isDefinedAt(tuple)) te(tuple) }
}
+ // ListenerManagement shall not start() or stop() listener actors
+ override protected val manageLifeCycleOfListeners = false
+
+ /*********************************************
+ * Main actor receive() method
+ *********************************************/
override final protected def receive: Receive = {
case TimeoutMarker(gen) =>
if (generation == gen) {
@@ -376,16 +403,16 @@ trait FSM[S, D] {
}
}
case SubscribeTransitionCallBack(actorRef) =>
- // send current state back as reference point
+ addListener(actorRef)
+ // send current state back as reference point
try {
actorRef ! CurrentState(self, currentState.stateName)
- transitionCallBackList ::= actorRef
} catch {
case e : ActorInitializationException =>
EventHandler.warning(this, "trying to register not running listener")
}
case UnsubscribeTransitionCallBack(actorRef) =>
- transitionCallBackList = transitionCallBackList.filterNot(_ == actorRef)
+ removeListener(actorRef)
case value => {
if (timeoutFuture.isDefined) {
timeoutFuture.get.cancel(true)
@@ -417,19 +444,7 @@ trait FSM[S, D] {
} else {
if (currentState.stateName != nextState.stateName) {
handleTransition(currentState.stateName, nextState.stateName)
- if (!transitionCallBackList.isEmpty) {
- val transition = Transition(self, currentState.stateName, nextState.stateName)
- transitionCallBackList = transitionCallBackList flatMap { cb =>
- try {
- cb ! transition
- Some(cb)
- } catch {
- case e : ActorInitializationException =>
- EventHandler.warning(this, "registered transition listener went away")
- None
- }
- }
- }
+ notifyListeners(Transition(self, currentState.stateName, nextState.stateName))
}
applyState(nextState)
}
diff --git a/akka-actor/src/main/scala/akka/util/ListenerManagement.scala b/akka-actor/src/main/scala/akka/util/ListenerManagement.scala
index 777c048d70..349d51255d 100644
--- a/akka-actor/src/main/scala/akka/util/ListenerManagement.scala
+++ b/akka-actor/src/main/scala/akka/util/ListenerManagement.scala
@@ -5,8 +5,7 @@
package akka.util
import java.util.concurrent.ConcurrentSkipListSet
-
-import akka.actor.ActorRef
+import akka.actor.{ActorInitializationException, ActorRef}
/**
* A manager for listener actors. Intended for mixin by observables.
@@ -46,7 +45,8 @@ trait ListenerManagement {
def hasListeners: Boolean = !listeners.isEmpty
/**
- * Checks if a specfic listener is registered.
+ * Checks if a specfic listener is registered. ActorInitializationException leads to removal of listener if that
+ * one isShutdown.
*/
def hasListener(listener: ActorRef): Boolean = listeners.contains(listener)
@@ -56,13 +56,19 @@ trait ListenerManagement {
val iterator = listeners.iterator
while (iterator.hasNext) {
val listener = iterator.next
- if (listener.isRunning) listener ! msg
+ if (listener.isShutdown) iterator.remove()
+ else try {
+ listener ! msg
+ } catch {
+ case e : ActorInitializationException =>
+ if (listener.isShutdown) iterator.remove()
+ }
}
}
}
/**
- * Execute f with each listener as argument.
+ * Execute f with each listener as argument. ActorInitializationException is not handled.
*/
protected[akka] def foreachListener(f: (ActorRef) => Unit) {
val iterator = listeners.iterator