Merge branch 'wip-2006-binary-compat-√' of github.com:akka/akka into wip-2006-binary-compat-√
This commit is contained in:
commit
2161897424
2 changed files with 154 additions and 49 deletions
|
|
@ -8,29 +8,84 @@ import akka.util._
|
|||
import scala.collection.mutable
|
||||
import akka.routing.{ Deafen, Listen, Listeners }
|
||||
|
||||
//FIXME: Roland, could you go through this file?
|
||||
|
||||
object FSM {
|
||||
|
||||
/**
|
||||
* A partial function value which does not match anything and can be used to
|
||||
* “reset” `whenUnhandled` and `onTermination` handlers.
|
||||
*
|
||||
* {{{
|
||||
* onTermination(FSM.NullFunction)
|
||||
* }}}
|
||||
*/
|
||||
object NullFunction extends PartialFunction[Any, Nothing] {
|
||||
def isDefinedAt(o: Any) = false
|
||||
def apply(o: Any) = sys.error("undefined")
|
||||
}
|
||||
|
||||
/**
|
||||
* Message type which is sent directly to the subscribed actor in
|
||||
* [[akka.actor.FSM.SubscribeTransitionCallback]] before sending any
|
||||
* [[akka.actor.FSM.Transition]] messages.
|
||||
*/
|
||||
case class CurrentState[S](fsmRef: ActorRef, state: S)
|
||||
|
||||
/**
|
||||
* Message type which is used to communicate transitions between states to
|
||||
* all subscribed listeners (use [[akka.actor.FSM.SubscribeTransitionCallback]]).
|
||||
*/
|
||||
case class Transition[S](fsmRef: ActorRef, from: S, to: S)
|
||||
|
||||
/**
|
||||
* Send this to an [[akka.actor.FSM]] to request first the [[akka.actor.CurrentState]]
|
||||
* and then a series of [[akka.actor.Transition]] updates. Cancel the subscription
|
||||
* using [[akka.actor.FSM.UnsubscribeTransitionCallback]].
|
||||
*/
|
||||
case class SubscribeTransitionCallBack(actorRef: ActorRef)
|
||||
|
||||
/**
|
||||
* Unsubscribe from [[akka.actor.FSM.Transition]] notifications which was
|
||||
* effected by sending the corresponding [[akka.actor.FSM.SubscribeTransitionCallback]].
|
||||
*/
|
||||
case class UnsubscribeTransitionCallBack(actorRef: ActorRef)
|
||||
|
||||
/**
|
||||
* Reason why this [[akka.actor.FSM]] is shutting down.
|
||||
*/
|
||||
sealed trait Reason
|
||||
|
||||
/**
|
||||
* Default reason if calling `stop()`.
|
||||
*/
|
||||
case object Normal extends Reason
|
||||
|
||||
/**
|
||||
* Reason given when someone was calling `system.stop(fsm)` from outside;
|
||||
* also applies to `Stop` supervision directive.
|
||||
*/
|
||||
case object Shutdown extends Reason
|
||||
|
||||
/**
|
||||
* Signifies that the [[akka.actor.FSM]] is shutting itself down because of
|
||||
* an error, e.g. if the state to transition into does not exist. You can use
|
||||
* this to communicate a more precise cause to the [[akka.actor.FSM$onTermination]] block.
|
||||
*/
|
||||
case class Failure(cause: Any) extends Reason
|
||||
|
||||
/**
|
||||
* This case object is received in case of a state timeout.
|
||||
*/
|
||||
case object StateTimeout
|
||||
case class TimeoutMarker(generation: Long)
|
||||
|
||||
case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(implicit system: ActorSystem) {
|
||||
/**
|
||||
* Internal API
|
||||
*/
|
||||
private case class TimeoutMarker(generation: Long)
|
||||
|
||||
/**
|
||||
* Internal API
|
||||
*/
|
||||
private[akka] case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(implicit system: ActorSystem) {
|
||||
private var ref: Option[Cancellable] = _
|
||||
|
||||
def schedule(actor: ActorRef, timeout: Duration) {
|
||||
|
|
@ -57,8 +112,16 @@ object FSM {
|
|||
def unapply[S](in: (S, S)) = Some(in)
|
||||
}
|
||||
|
||||
/**
|
||||
* Log Entry of the [[akka.actor.LoggingFSM]], can be obtained by calling `getLog`.
|
||||
*/
|
||||
case class LogEntry[S, D](stateName: S, stateData: D, event: Any)
|
||||
|
||||
/**
|
||||
* This captures all of the managed state of the [[akka.actor.FSM]]: the state
|
||||
* name, the state data, possibly custom timeout, stop reason and replies
|
||||
* accumulated while processing the last message.
|
||||
*/
|
||||
case class State[S, D](stateName: S, stateData: D, timeout: Option[Duration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil) {
|
||||
|
||||
/**
|
||||
|
|
@ -87,6 +150,9 @@ object FSM {
|
|||
copy(stateData = nextStateDate)
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal API.
|
||||
*/
|
||||
private[akka] def withStopReason(reason: Reason): State[S, D] = {
|
||||
copy(stopReason = Some(reason))
|
||||
}
|
||||
|
|
@ -183,8 +249,19 @@ trait FSM[S, D] extends Listeners with ActorLogging {
|
|||
type Timeout = Option[Duration]
|
||||
type TransitionHandler = PartialFunction[(S, S), Unit]
|
||||
|
||||
// “import” so that it is visible without an import
|
||||
/*
|
||||
* “import” so that these are visible without an import
|
||||
*/
|
||||
|
||||
/**
|
||||
* This extractor is just convenience for matching a (S, S) pair, including a
|
||||
* reminder what the new state is.
|
||||
*/
|
||||
val -> = FSM.->
|
||||
|
||||
/**
|
||||
* This case object is received in case of a state timeout.
|
||||
*/
|
||||
val StateTimeout = FSM.StateTimeout
|
||||
|
||||
/**
|
||||
|
|
@ -203,13 +280,9 @@ trait FSM[S, D] extends Listeners with ActorLogging {
|
|||
* @param stateTimeout default state timeout for this state
|
||||
* @param stateFunction partial function describing response to input
|
||||
*/
|
||||
protected final def when(stateName: S, stateTimeout: Duration = null)(stateFunction: StateFunction): Unit =
|
||||
final def when(stateName: S, stateTimeout: Duration = null)(stateFunction: StateFunction): Unit =
|
||||
register(stateName, stateFunction, Option(stateTimeout))
|
||||
|
||||
@deprecated("use the more import-friendly variant taking a Duration", "2.0")
|
||||
protected final def when(stateName: S, stateTimeout: Timeout)(stateFunction: StateFunction): Unit =
|
||||
register(stateName, stateFunction, stateTimeout)
|
||||
|
||||
/**
|
||||
* Set initial state. Call this method from the constructor before the #initialize method.
|
||||
*
|
||||
|
|
@ -217,9 +290,7 @@ trait FSM[S, D] extends Listeners with ActorLogging {
|
|||
* @param stateData initial state data
|
||||
* @param timeout state timeout for the initial state, overriding the default timeout for that state
|
||||
*/
|
||||
protected final def startWith(stateName: S,
|
||||
stateData: D,
|
||||
timeout: Timeout = None): Unit =
|
||||
final def startWith(stateName: S, stateData: D, timeout: Timeout = None): Unit =
|
||||
currentState = FSM.State(stateName, stateData, timeout)
|
||||
|
||||
/**
|
||||
|
|
@ -229,7 +300,7 @@ trait FSM[S, D] extends Listeners with ActorLogging {
|
|||
* @param nextStateName state designator for the next state
|
||||
* @return state transition descriptor
|
||||
*/
|
||||
protected final def goto(nextStateName: S): State = FSM.State(nextStateName, currentState.stateData)
|
||||
final def goto(nextStateName: S): State = FSM.State(nextStateName, currentState.stateData)
|
||||
|
||||
/**
|
||||
* Produce "empty" transition descriptor. Return this from a state function
|
||||
|
|
@ -237,29 +308,29 @@ trait FSM[S, D] extends Listeners with ActorLogging {
|
|||
*
|
||||
* @return descriptor for staying in current state
|
||||
*/
|
||||
protected final def stay(): State = goto(currentState.stateName) // cannot directly use currentState because of the timeout field
|
||||
final def stay(): State = goto(currentState.stateName) // cannot directly use currentState because of the timeout field
|
||||
|
||||
/**
|
||||
* Produce change descriptor to stop this FSM actor with reason "Normal".
|
||||
*/
|
||||
protected final def stop(): State = stop(Normal)
|
||||
final def stop(): State = stop(Normal)
|
||||
|
||||
/**
|
||||
* Produce change descriptor to stop this FSM actor including specified reason.
|
||||
*/
|
||||
protected final def stop(reason: Reason): State = stop(reason, currentState.stateData)
|
||||
final def stop(reason: Reason): State = stop(reason, currentState.stateData)
|
||||
|
||||
/**
|
||||
* Produce change descriptor to stop this FSM actor including specified reason.
|
||||
*/
|
||||
protected final def stop(reason: Reason, stateData: D): State = stay using stateData withStopReason (reason)
|
||||
final def stop(reason: Reason, stateData: D): State = stay using stateData withStopReason (reason)
|
||||
|
||||
protected final class TransformHelper(func: StateFunction) {
|
||||
final class TransformHelper(func: StateFunction) {
|
||||
def using(andThen: PartialFunction[State, State]): StateFunction =
|
||||
func andThen (andThen orElse { case x ⇒ x })
|
||||
}
|
||||
|
||||
protected final def transform(func: StateFunction): TransformHelper = new TransformHelper(func)
|
||||
final def transform(func: StateFunction): TransformHelper = new TransformHelper(func)
|
||||
|
||||
/**
|
||||
* Schedule named timer to deliver message after given delay, possibly repeating.
|
||||
|
|
@ -269,7 +340,9 @@ trait FSM[S, D] extends Listeners with ActorLogging {
|
|||
* @param repeat send once if false, scheduleAtFixedRate if true
|
||||
* @return current state descriptor
|
||||
*/
|
||||
protected[akka] def setTimer(name: String, msg: Any, timeout: Duration, repeat: Boolean): State = {
|
||||
final def setTimer(name: String, msg: Any, timeout: Duration, repeat: Boolean): State = {
|
||||
if (debugEvent)
|
||||
log.debug("setting " + (if (repeat) "repeating " else "") + "timer '" + name + "'/" + timeout + ": " + msg)
|
||||
if (timers contains name) {
|
||||
timers(name).cancel
|
||||
}
|
||||
|
|
@ -283,24 +356,27 @@ trait FSM[S, D] extends Listeners with ActorLogging {
|
|||
* Cancel named timer, ensuring that the message is not subsequently delivered (no race).
|
||||
* @param name of the timer to cancel
|
||||
*/
|
||||
protected[akka] def cancelTimer(name: String): Unit =
|
||||
final def cancelTimer(name: String): Unit = {
|
||||
if (debugEvent)
|
||||
log.debug("canceling timer '" + name + "'")
|
||||
if (timers contains name) {
|
||||
timers(name).cancel
|
||||
timers -= name
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Inquire whether the named timer is still active. Returns true unless the
|
||||
* timer does not exist, has previously been canceled or if it was a
|
||||
* single-shot timer whose message was already received.
|
||||
*/
|
||||
protected[akka] final def timerActive_?(name: String) = timers contains name
|
||||
final def timerActive_?(name: String) = timers contains name
|
||||
|
||||
/**
|
||||
* Set state timeout explicitly. This method can safely be used from within a
|
||||
* state handler.
|
||||
*/
|
||||
protected final def setStateTimeout(state: S, timeout: Timeout): Unit = stateTimeouts(state) = timeout
|
||||
final def setStateTimeout(state: S, timeout: Timeout): Unit = stateTimeouts(state) = timeout
|
||||
|
||||
/**
|
||||
* Set handler which is called upon each state transition, i.e. not when
|
||||
|
|
@ -327,50 +403,52 @@ trait FSM[S, D] extends Listeners with ActorLogging {
|
|||
* <b>Multiple handlers may be installed, and every one of them will be
|
||||
* called, not only the first one matching.</b>
|
||||
*/
|
||||
protected final def onTransition(transitionHandler: TransitionHandler): Unit = transitionEvent :+= transitionHandler
|
||||
final def onTransition(transitionHandler: TransitionHandler): Unit = transitionEvent :+= transitionHandler
|
||||
|
||||
/**
|
||||
* Convenience wrapper for using a total function instead of a partial
|
||||
* function literal. To be used with onTransition.
|
||||
*/
|
||||
implicit protected final def total2pf(transitionHandler: (S, S) ⇒ Unit): TransitionHandler =
|
||||
implicit final def total2pf(transitionHandler: (S, S) ⇒ Unit): TransitionHandler =
|
||||
new TransitionHandler {
|
||||
def isDefinedAt(in: (S, S)) = true
|
||||
def apply(in: (S, S)) { transitionHandler(in._1, in._2) }
|
||||
}
|
||||
|
||||
/**
|
||||
* Set handler which is called upon termination of this FSM actor.
|
||||
* Set handler which is called upon termination of this FSM actor. Calling
|
||||
* this method again will overwrite the previous contents.
|
||||
*/
|
||||
protected final def onTermination(terminationHandler: PartialFunction[StopEvent, Unit]): Unit =
|
||||
final def onTermination(terminationHandler: PartialFunction[StopEvent, Unit]): Unit =
|
||||
terminateEvent = terminationHandler
|
||||
|
||||
/**
|
||||
* Set handler which is called upon reception of unhandled messages.
|
||||
* Set handler which is called upon reception of unhandled messages. Calling
|
||||
* this method again will overwrite the previous contents.
|
||||
*/
|
||||
protected final def whenUnhandled(stateFunction: StateFunction): Unit =
|
||||
final def whenUnhandled(stateFunction: StateFunction): Unit =
|
||||
handleEvent = stateFunction orElse handleEventDefault
|
||||
|
||||
/**
|
||||
* Verify existence of initial state and setup timers. This should be the
|
||||
* last call within the constructor.
|
||||
*/
|
||||
protected final def initialize: Unit = makeTransition(currentState)
|
||||
final def initialize: Unit = makeTransition(currentState)
|
||||
|
||||
/**
|
||||
* Return current state name (i.e. object of type S)
|
||||
*/
|
||||
protected[akka] def stateName: S = currentState.stateName
|
||||
final def stateName: S = currentState.stateName
|
||||
|
||||
/**
|
||||
* Return current state data (i.e. object of type D)
|
||||
*/
|
||||
protected[akka] def stateData: D = currentState.stateData
|
||||
final def stateData: D = currentState.stateData
|
||||
|
||||
/**
|
||||
* Return next state data (available in onTransition handlers)
|
||||
*/
|
||||
protected[akka] def nextStateData = nextState.stateData
|
||||
final def nextStateData = nextState.stateData
|
||||
|
||||
/*
|
||||
* ****************************************************************
|
||||
|
|
@ -378,6 +456,8 @@ trait FSM[S, D] extends Listeners with ActorLogging {
|
|||
* ****************************************************************
|
||||
*/
|
||||
|
||||
private[akka] def debugEvent: Boolean = false
|
||||
|
||||
/*
|
||||
* FSM State data and current timeout handling
|
||||
*/
|
||||
|
|
@ -525,7 +605,21 @@ trait FSM[S, D] extends Listeners with ActorLogging {
|
|||
}
|
||||
}
|
||||
|
||||
override def postStop(): Unit = { terminate(stay withStopReason Shutdown) }
|
||||
/**
|
||||
* Call `onTermination` hook; if you want to retain this behavior when
|
||||
* overriding make sure to call `super.postStop()`.
|
||||
*
|
||||
* Please note that this method is called by default from `preRestart()`,
|
||||
* so override that one if `onTermination` shall not be called during
|
||||
* restart.
|
||||
*/
|
||||
override def postStop(): Unit = {
|
||||
/*
|
||||
* setting this instance’s state to terminated does no harm during restart
|
||||
* since the new instance will initialize fresh using startWith()
|
||||
*/
|
||||
terminate(stay withStopReason Shutdown)
|
||||
}
|
||||
|
||||
private def terminate(nextState: State): Unit = {
|
||||
if (!currentState.stopReason.isDefined) {
|
||||
|
|
@ -542,13 +636,22 @@ trait FSM[S, D] extends Listeners with ActorLogging {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* All messages sent to the [[akka.actor.FSM]] will be wrapped inside an
|
||||
* `Event`, which allows pattern matching to extract both state and data.
|
||||
*/
|
||||
case class Event(event: Any, stateData: D)
|
||||
|
||||
/**
|
||||
* Case class representing the state of the [[akka.actor.FSM]] whithin the
|
||||
* `onTermination` block.
|
||||
*/
|
||||
case class StopEvent(reason: Reason, currentState: S, stateData: D)
|
||||
}
|
||||
|
||||
/**
|
||||
* Stackable trait for FSM which adds a rolling event log.
|
||||
* Stackable trait for [[akka.actor.FSM]] which adds a rolling event log and
|
||||
* debug logging capabilities (analogous to [[akka.event.LoggingReceive]]).
|
||||
*
|
||||
* @since 1.2
|
||||
*/
|
||||
|
|
@ -558,7 +661,7 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒
|
|||
|
||||
def logDepth: Int = 0
|
||||
|
||||
private val debugEvent = context.system.settings.FsmDebugEvent
|
||||
private[akka] override val debugEvent = context.system.settings.FsmDebugEvent
|
||||
|
||||
private val events = new Array[Event](logDepth)
|
||||
private val states = new Array[AnyRef](logDepth)
|
||||
|
|
@ -575,18 +678,6 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒
|
|||
}
|
||||
}
|
||||
|
||||
protected[akka] abstract override def setTimer(name: String, msg: Any, timeout: Duration, repeat: Boolean): State = {
|
||||
if (debugEvent)
|
||||
log.debug("setting " + (if (repeat) "repeating " else "") + "timer '" + name + "'/" + timeout + ": " + msg)
|
||||
super.setTimer(name, msg, timeout, repeat)
|
||||
}
|
||||
|
||||
protected[akka] abstract override def cancelTimer(name: String): Unit = {
|
||||
if (debugEvent)
|
||||
log.debug("canceling timer '" + name + "'")
|
||||
super.cancelTimer(name)
|
||||
}
|
||||
|
||||
private[akka] abstract override def processEvent(event: Event, source: AnyRef): Unit = {
|
||||
if (debugEvent) {
|
||||
val srcstr = source match {
|
||||
|
|
@ -615,6 +706,7 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒
|
|||
/**
|
||||
* Retrieve current rolling log in oldest-first order. The log is filled with
|
||||
* each incoming event before processing by the user supplied state handler.
|
||||
* The log entries are lost when this actor is restarted.
|
||||
*/
|
||||
protected def getLog: IndexedSeq[LogEntry[S, D]] = {
|
||||
val log = events zip states filter (_._1 ne null) map (x ⇒ LogEntry(x._2.asInstanceOf[S], x._1.stateData, x._1.event))
|
||||
|
|
|
|||
|
|
@ -17,6 +17,19 @@ trait GracefulStopSupport {
|
|||
* Useful when you need to wait for termination or compose ordered termination of several actors,
|
||||
* which should only be done outside of the ActorSystem as blocking inside Actors is discouraged.
|
||||
*
|
||||
* <b>IMPORTANT NOTICE:</b> the actor being terminated and its supervisor
|
||||
* being informed of the availability of the deceased actor’s name are two
|
||||
* distinct operations, which do not obey any reliable ordering. Especially
|
||||
* the following will NOT work:
|
||||
*
|
||||
* {{{
|
||||
* def receive = {
|
||||
* case msg =>
|
||||
* Await.result(gracefulStop(someChild, timeout), timeout)
|
||||
* context.actorOf(Props(...), "someChild") // assuming that that was someChild’s name, this will NOT work
|
||||
* }
|
||||
* }}}
|
||||
*
|
||||
* If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]]
|
||||
* is completed with failure [[akka.pattern.AskTimeoutException]].
|
||||
*/
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue