diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index 762f23b16b..50c769e156 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -8,29 +8,84 @@ import akka.util._ import scala.collection.mutable import akka.routing.{ Deafen, Listen, Listeners } -//FIXME: Roland, could you go through this file? - object FSM { + /** + * A partial function value which does not match anything and can be used to + * “reset” `whenUnhandled` and `onTermination` handlers. + * + * {{{ + * onTermination(FSM.NullFunction) + * }}} + */ object NullFunction extends PartialFunction[Any, Nothing] { def isDefinedAt(o: Any) = false def apply(o: Any) = sys.error("undefined") } + /** + * Message type which is sent directly to the subscribed actor in + * [[akka.actor.FSM.SubscribeTransitionCallback]] before sending any + * [[akka.actor.FSM.Transition]] messages. + */ case class CurrentState[S](fsmRef: ActorRef, state: S) + + /** + * Message type which is used to communicate transitions between states to + * all subscribed listeners (use [[akka.actor.FSM.SubscribeTransitionCallback]]). + */ case class Transition[S](fsmRef: ActorRef, from: S, to: S) + + /** + * Send this to an [[akka.actor.FSM]] to request first the [[akka.actor.CurrentState]] + * and then a series of [[akka.actor.Transition]] updates. Cancel the subscription + * using [[akka.actor.FSM.UnsubscribeTransitionCallback]]. + */ case class SubscribeTransitionCallBack(actorRef: ActorRef) + + /** + * Unsubscribe from [[akka.actor.FSM.Transition]] notifications which was + * effected by sending the corresponding [[akka.actor.FSM.SubscribeTransitionCallback]]. + */ case class UnsubscribeTransitionCallBack(actorRef: ActorRef) + /** + * Reason why this [[akka.actor.FSM]] is shutting down. + */ sealed trait Reason + + /** + * Default reason if calling `stop()`. + */ case object Normal extends Reason + + /** + * Reason given when someone was calling `system.stop(fsm)` from outside; + * also applies to `Stop` supervision directive. + */ case object Shutdown extends Reason + + /** + * Signifies that the [[akka.actor.FSM]] is shutting itself down because of + * an error, e.g. if the state to transition into does not exist. You can use + * this to communicate a more precise cause to the [[akka.actor.FSM$onTermination]] block. + */ case class Failure(cause: Any) extends Reason + /** + * This case object is received in case of a state timeout. + */ case object StateTimeout - case class TimeoutMarker(generation: Long) - case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(implicit system: ActorSystem) { + /** + * Internal API + */ + private case class TimeoutMarker(generation: Long) + + /** + * Internal API + */ + private[akka] case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(implicit system: ActorSystem) { private var ref: Option[Cancellable] = _ def schedule(actor: ActorRef, timeout: Duration) { @@ -57,8 +112,16 @@ object FSM { def unapply[S](in: (S, S)) = Some(in) } + /** + * Log Entry of the [[akka.actor.LoggingFSM]], can be obtained by calling `getLog`. + */ case class LogEntry[S, D](stateName: S, stateData: D, event: Any) + /** + * This captures all of the managed state of the [[akka.actor.FSM]]: the state + * name, the state data, possibly custom timeout, stop reason and replies + * accumulated while processing the last message. + */ case class State[S, D](stateName: S, stateData: D, timeout: Option[Duration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil) { /** @@ -87,6 +150,9 @@ object FSM { copy(stateData = nextStateDate) } + /** + * Internal API. + */ private[akka] def withStopReason(reason: Reason): State[S, D] = { copy(stopReason = Some(reason)) } @@ -183,8 +249,19 @@ trait FSM[S, D] extends Listeners with ActorLogging { type Timeout = Option[Duration] type TransitionHandler = PartialFunction[(S, S), Unit] - // “import” so that it is visible without an import + /* + * “import” so that these are visible without an import + */ + + /** + * This extractor is just convenience for matching a (S, S) pair, including a + * reminder what the new state is. + */ val -> = FSM.-> + + /** + * This case object is received in case of a state timeout. + */ val StateTimeout = FSM.StateTimeout /** @@ -203,13 +280,9 @@ trait FSM[S, D] extends Listeners with ActorLogging { * @param stateTimeout default state timeout for this state * @param stateFunction partial function describing response to input */ - protected final def when(stateName: S, stateTimeout: Duration = null)(stateFunction: StateFunction): Unit = + final def when(stateName: S, stateTimeout: Duration = null)(stateFunction: StateFunction): Unit = register(stateName, stateFunction, Option(stateTimeout)) - @deprecated("use the more import-friendly variant taking a Duration", "2.0") - protected final def when(stateName: S, stateTimeout: Timeout)(stateFunction: StateFunction): Unit = - register(stateName, stateFunction, stateTimeout) - /** * Set initial state. Call this method from the constructor before the #initialize method. * @@ -217,9 +290,7 @@ trait FSM[S, D] extends Listeners with ActorLogging { * @param stateData initial state data * @param timeout state timeout for the initial state, overriding the default timeout for that state */ - protected final def startWith(stateName: S, - stateData: D, - timeout: Timeout = None): Unit = + final def startWith(stateName: S, stateData: D, timeout: Timeout = None): Unit = currentState = FSM.State(stateName, stateData, timeout) /** @@ -229,7 +300,7 @@ trait FSM[S, D] extends Listeners with ActorLogging { * @param nextStateName state designator for the next state * @return state transition descriptor */ - protected final def goto(nextStateName: S): State = FSM.State(nextStateName, currentState.stateData) + final def goto(nextStateName: S): State = FSM.State(nextStateName, currentState.stateData) /** * Produce "empty" transition descriptor. Return this from a state function @@ -237,29 +308,29 @@ trait FSM[S, D] extends Listeners with ActorLogging { * * @return descriptor for staying in current state */ - protected final def stay(): State = goto(currentState.stateName) // cannot directly use currentState because of the timeout field + final def stay(): State = goto(currentState.stateName) // cannot directly use currentState because of the timeout field /** * Produce change descriptor to stop this FSM actor with reason "Normal". */ - protected final def stop(): State = stop(Normal) + final def stop(): State = stop(Normal) /** * Produce change descriptor to stop this FSM actor including specified reason. */ - protected final def stop(reason: Reason): State = stop(reason, currentState.stateData) + final def stop(reason: Reason): State = stop(reason, currentState.stateData) /** * Produce change descriptor to stop this FSM actor including specified reason. */ - protected final def stop(reason: Reason, stateData: D): State = stay using stateData withStopReason (reason) + final def stop(reason: Reason, stateData: D): State = stay using stateData withStopReason (reason) - protected final class TransformHelper(func: StateFunction) { + final class TransformHelper(func: StateFunction) { def using(andThen: PartialFunction[State, State]): StateFunction = func andThen (andThen orElse { case x ⇒ x }) } - protected final def transform(func: StateFunction): TransformHelper = new TransformHelper(func) + final def transform(func: StateFunction): TransformHelper = new TransformHelper(func) /** * Schedule named timer to deliver message after given delay, possibly repeating. @@ -269,7 +340,9 @@ trait FSM[S, D] extends Listeners with ActorLogging { * @param repeat send once if false, scheduleAtFixedRate if true * @return current state descriptor */ - protected[akka] def setTimer(name: String, msg: Any, timeout: Duration, repeat: Boolean): State = { + final def setTimer(name: String, msg: Any, timeout: Duration, repeat: Boolean): State = { + if (debugEvent) + log.debug("setting " + (if (repeat) "repeating " else "") + "timer '" + name + "'/" + timeout + ": " + msg) if (timers contains name) { timers(name).cancel } @@ -283,24 +356,27 @@ trait FSM[S, D] extends Listeners with ActorLogging { * Cancel named timer, ensuring that the message is not subsequently delivered (no race). * @param name of the timer to cancel */ - protected[akka] def cancelTimer(name: String): Unit = + final def cancelTimer(name: String): Unit = { + if (debugEvent) + log.debug("canceling timer '" + name + "'") if (timers contains name) { timers(name).cancel timers -= name } + } /** * Inquire whether the named timer is still active. Returns true unless the * timer does not exist, has previously been canceled or if it was a * single-shot timer whose message was already received. */ - protected[akka] final def timerActive_?(name: String) = timers contains name + final def timerActive_?(name: String) = timers contains name /** * Set state timeout explicitly. This method can safely be used from within a * state handler. */ - protected final def setStateTimeout(state: S, timeout: Timeout): Unit = stateTimeouts(state) = timeout + final def setStateTimeout(state: S, timeout: Timeout): Unit = stateTimeouts(state) = timeout /** * Set handler which is called upon each state transition, i.e. not when @@ -327,50 +403,52 @@ trait FSM[S, D] extends Listeners with ActorLogging { * Multiple handlers may be installed, and every one of them will be * called, not only the first one matching. */ - protected final def onTransition(transitionHandler: TransitionHandler): Unit = transitionEvent :+= transitionHandler + final def onTransition(transitionHandler: TransitionHandler): Unit = transitionEvent :+= transitionHandler /** * Convenience wrapper for using a total function instead of a partial * function literal. To be used with onTransition. */ - implicit protected final def total2pf(transitionHandler: (S, S) ⇒ Unit): TransitionHandler = + implicit final def total2pf(transitionHandler: (S, S) ⇒ Unit): TransitionHandler = new TransitionHandler { def isDefinedAt(in: (S, S)) = true def apply(in: (S, S)) { transitionHandler(in._1, in._2) } } /** - * Set handler which is called upon termination of this FSM actor. + * Set handler which is called upon termination of this FSM actor. Calling + * this method again will overwrite the previous contents. */ - protected final def onTermination(terminationHandler: PartialFunction[StopEvent, Unit]): Unit = + final def onTermination(terminationHandler: PartialFunction[StopEvent, Unit]): Unit = terminateEvent = terminationHandler /** - * Set handler which is called upon reception of unhandled messages. + * Set handler which is called upon reception of unhandled messages. Calling + * this method again will overwrite the previous contents. */ - protected final def whenUnhandled(stateFunction: StateFunction): Unit = + final def whenUnhandled(stateFunction: StateFunction): Unit = handleEvent = stateFunction orElse handleEventDefault /** * Verify existence of initial state and setup timers. This should be the * last call within the constructor. */ - protected final def initialize: Unit = makeTransition(currentState) + final def initialize: Unit = makeTransition(currentState) /** * Return current state name (i.e. object of type S) */ - protected[akka] def stateName: S = currentState.stateName + final def stateName: S = currentState.stateName /** * Return current state data (i.e. object of type D) */ - protected[akka] def stateData: D = currentState.stateData + final def stateData: D = currentState.stateData /** * Return next state data (available in onTransition handlers) */ - protected[akka] def nextStateData = nextState.stateData + final def nextStateData = nextState.stateData /* * **************************************************************** @@ -378,6 +456,8 @@ trait FSM[S, D] extends Listeners with ActorLogging { * **************************************************************** */ + private[akka] def debugEvent: Boolean = false + /* * FSM State data and current timeout handling */ @@ -525,7 +605,21 @@ trait FSM[S, D] extends Listeners with ActorLogging { } } - override def postStop(): Unit = { terminate(stay withStopReason Shutdown) } + /** + * Call `onTermination` hook; if you want to retain this behavior when + * overriding make sure to call `super.postStop()`. + * + * Please note that this method is called by default from `preRestart()`, + * so override that one if `onTermination` shall not be called during + * restart. + */ + override def postStop(): Unit = { + /* + * setting this instance’s state to terminated does no harm during restart + * since the new instance will initialize fresh using startWith() + */ + terminate(stay withStopReason Shutdown) + } private def terminate(nextState: State): Unit = { if (!currentState.stopReason.isDefined) { @@ -542,13 +636,22 @@ trait FSM[S, D] extends Listeners with ActorLogging { } } + /** + * All messages sent to the [[akka.actor.FSM]] will be wrapped inside an + * `Event`, which allows pattern matching to extract both state and data. + */ case class Event(event: Any, stateData: D) + /** + * Case class representing the state of the [[akka.actor.FSM]] whithin the + * `onTermination` block. + */ case class StopEvent(reason: Reason, currentState: S, stateData: D) } /** - * Stackable trait for FSM which adds a rolling event log. + * Stackable trait for [[akka.actor.FSM]] which adds a rolling event log and + * debug logging capabilities (analogous to [[akka.event.LoggingReceive]]). * * @since 1.2 */ @@ -558,7 +661,7 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒ def logDepth: Int = 0 - private val debugEvent = context.system.settings.FsmDebugEvent + private[akka] override val debugEvent = context.system.settings.FsmDebugEvent private val events = new Array[Event](logDepth) private val states = new Array[AnyRef](logDepth) @@ -575,18 +678,6 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒ } } - protected[akka] abstract override def setTimer(name: String, msg: Any, timeout: Duration, repeat: Boolean): State = { - if (debugEvent) - log.debug("setting " + (if (repeat) "repeating " else "") + "timer '" + name + "'/" + timeout + ": " + msg) - super.setTimer(name, msg, timeout, repeat) - } - - protected[akka] abstract override def cancelTimer(name: String): Unit = { - if (debugEvent) - log.debug("canceling timer '" + name + "'") - super.cancelTimer(name) - } - private[akka] abstract override def processEvent(event: Event, source: AnyRef): Unit = { if (debugEvent) { val srcstr = source match { @@ -615,6 +706,7 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒ /** * Retrieve current rolling log in oldest-first order. The log is filled with * each incoming event before processing by the user supplied state handler. + * The log entries are lost when this actor is restarted. */ protected def getLog: IndexedSeq[LogEntry[S, D]] = { val log = events zip states filter (_._1 ne null) map (x ⇒ LogEntry(x._2.asInstanceOf[S], x._1.stateData, x._1.event)) diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index 5f78e8ba27..adcbe53f0b 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -17,6 +17,19 @@ trait GracefulStopSupport { * Useful when you need to wait for termination or compose ordered termination of several actors, * which should only be done outside of the ActorSystem as blocking inside Actors is discouraged. * + * IMPORTANT NOTICE: the actor being terminated and its supervisor + * being informed of the availability of the deceased actor’s name are two + * distinct operations, which do not obey any reliable ordering. Especially + * the following will NOT work: + * + * {{{ + * def receive = { + * case msg => + * Await.result(gracefulStop(someChild, timeout), timeout) + * context.actorOf(Props(...), "someChild") // assuming that that was someChild’s name, this will NOT work + * } + * }}} + * * If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]] * is completed with failure [[akka.pattern.AskTimeoutException]]. */