diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index c5834ac633..d80d1285c7 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -18,6 +18,7 @@ import akka.experimental import akka.{ AkkaApplication, AkkaException } import scala.reflect.BeanProperty +import scala.util.control.NoStackTrace import com.eaio.uuid.UUID @@ -62,34 +63,41 @@ case object PoisonPill extends AutoReceivedMessage with PossiblyHarmful case object Kill extends AutoReceivedMessage with PossiblyHarmful +case class Terminated(@BeanProperty actor: ActorRef, @BeanProperty cause: Throwable) extends PossiblyHarmful + case object ReceiveTimeout extends PossiblyHarmful -case class Terminated(@BeanProperty actor: ActorRef, @BeanProperty cause: Throwable) - // Exceptions for Actors -class ActorStartException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) { +class IllegalActorStateException private[akka] (message: String, cause: Throwable = null) + extends AkkaException(message, cause) { def this(msg: String) = this(msg, null); } -class IllegalActorStateException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) { +class ActorKilledException private[akka] (message: String, cause: Throwable) + extends AkkaException(message, cause) + with NoStackTrace { def this(msg: String) = this(msg, null); } -class ActorKilledException private[akka] (message: String, cause: Throwable) extends AkkaException(message, cause) { +class ActorInitializationException private[akka] (message: String, cause: Throwable = null) + extends AkkaException(message, cause) { def this(msg: String) = this(msg, null); } -class ActorInitializationException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) { +class ActorTimeoutException private[akka] (message: String, cause: Throwable = null) + extends AkkaException(message, cause) { def this(msg: String) = this(msg, null); } -class ActorTimeoutException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) { +class InvalidMessageException private[akka] (message: String, cause: Throwable = null) + extends AkkaException(message, cause) + with NoStackTrace { def this(msg: String) = this(msg, null); } -class InvalidMessageException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) { - def this(msg: String) = this(msg, null); -} +case class DeathPactException private[akka] (dead: ActorRef, cause: Throwable) + extends AkkaException("monitored actor " + dead + " terminated", cause) + with NoStackTrace /** * This message is thrown by default when an Actors behavior doesn't match a message @@ -391,8 +399,10 @@ trait Actor { * by default it does: EventHandler.warning(self, message) */ def unhandled(message: Any) { - //EventHandler.warning(self, message) - throw new UnhandledMessageException(message, self) + message match { + case Terminated(dead, cause) ⇒ throw new DeathPactException(dead, cause) + case _ ⇒ throw new UnhandledMessageException(message, self) + } } /** @@ -444,7 +454,7 @@ trait Actor { msg match { case msg if behaviorStack.nonEmpty && behaviorStack.head.isDefinedAt(msg) ⇒ behaviorStack.head.apply(msg) case msg if behaviorStack.isEmpty && processingBehavior.isDefinedAt(msg) ⇒ processingBehavior.apply(msg) - case unknown ⇒ unhandled(unknown) //This is the only line that differs from processingbehavior + case unknown ⇒ unhandled(unknown) } } } diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index a30fb5efe6..d654539e5d 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -4,50 +4,115 @@ package akka.actor import java.util.concurrent.TimeUnit +import scala.annotation.tailrec +import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConversions._ +import java.lang.{ Iterable ⇒ JIterable } case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) { - def requestRestartPermission(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = { - val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) - false // Never deny an immortal - else if (maxNrOfRetries.nonEmpty && maxNrOfRetries.get < 1) - true //Always deny if no chance of restarting - else if (withinTimeRange.isEmpty) { - // restrict number of restarts - val retries = maxNrOfRetriesCount + 1 - maxNrOfRetriesCount = retries //Increment number of retries - retries > maxNrOfRetries.get - } else { - // cannot restart more than N within M timerange - val retries = maxNrOfRetriesCount + 1 - val windowStart = restartTimeWindowStartNanos - val now = System.nanoTime - // we are within the time window if it isn't the first restart, or if the window hasn't closed - val insideWindow = if (windowStart == 0) true else (now - windowStart) <= TimeUnit.MILLISECONDS.toNanos(withinTimeRange.get) - - if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window - restartTimeWindowStartNanos = now - - // reset number of restarts if window has expired, otherwise, increment it - maxNrOfRetriesCount = if (windowStart != 0 && !insideWindow) 1 else retries // increment number of retries - - val restartCountLimit = if (maxNrOfRetries.isDefined) maxNrOfRetries.get else 1 - - // the actor is dead if it dies X times within the window of restart - insideWindow && retries > restartCountLimit + def requestRestartPermission(retriesWindow: (Option[Int], Option[Int])): Boolean = + retriesWindow match { + case (Some(retries), _) if retries < 1 ⇒ false + case (Some(retries), None) ⇒ maxNrOfRetriesCount += 1; maxNrOfRetriesCount <= retries + case (x @ (Some(_) | None), Some(window)) ⇒ retriesInWindowOkay(if (x.isDefined) x.get else 1, window) + case (None, _) ⇒ true } - denied == false // if we weren't denied, we have a go + private def retriesInWindowOkay(retries: Int, window: Int): Boolean = { + /* + * Simple window algorithm: window is kept open for a certain time + * after a restart and if enough restarts happen during this time, it + * denies. Otherwise window closes and the scheme starts over. + */ + val retriesDone = maxNrOfRetriesCount + 1 + val now = System.nanoTime + val windowStart = + if (restartTimeWindowStartNanos == 0) { + restartTimeWindowStartNanos = now + now + } else restartTimeWindowStartNanos + val insideWindow = (now - windowStart) <= TimeUnit.MILLISECONDS.toNanos(window) + if (insideWindow) { + maxNrOfRetriesCount = retriesDone + retriesDone <= retries + } else { + maxNrOfRetriesCount = 1 + restartTimeWindowStartNanos = now + true + } } } -sealed abstract class FaultHandlingStrategy { +object FaultHandlingStrategy { + sealed trait Action + case object Resume extends Action + case object Restart extends Action + case object Stop extends Action + case object Escalate extends Action - def trapExit: List[Class[_ <: Throwable]] + type Decider = PartialFunction[Class[_ <: Throwable], Action] + type JDecider = akka.japi.Function[Class[_ <: Throwable], Action] + type CauseAction = (Class[_ <: Throwable], Action) + + /** + * Backwards compatible Decider builder which just checks whether one of + * the given Throwables matches the cause and restarts, otherwise escalates. + */ + def makeDecider(trapExit: Array[Class[_ <: Throwable]]): Decider = + { case x ⇒ if (trapExit exists (_ isAssignableFrom x)) Restart else Escalate } + + /** + * Backwards compatible Decider builder which just checks whether one of + * the given Throwables matches the cause and restarts, otherwise escalates. + */ + def makeDecider(trapExit: List[Class[_ <: Throwable]]): Decider = + { case x ⇒ if (trapExit exists (_ isAssignableFrom x)) Restart else Escalate } + + /** + * Backwards compatible Decider builder which just checks whether one of + * the given Throwables matches the cause and restarts, otherwise escalates. + */ + def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = makeDecider(trapExit.toList) + + /** + * Decider builder for Iterables of cause-action pairs, e.g. a map obtained + * from configuration; will sort the pairs so that the most specific type is + * checked before all its subtypes, allowing carving out subtrees of the + * Throwable hierarchy. + */ + def makeDecider(flat: Iterable[CauseAction]): Decider = { + val actions = sort(flat) + return { case x ⇒ actions find (_._1 isAssignableFrom x) map (_._2) getOrElse Escalate } + } + + def makeDecider(func: JDecider): Decider = { + case x ⇒ func(x) + } + + /** + * Sort so that subtypes always precede their supertypes, but without + * obeying any order between unrelated subtypes (insert sort). + */ + def sort(in: Iterable[CauseAction]): Seq[CauseAction] = + (new ArrayBuffer[CauseAction](in.size) /: in) { (buf, ca) ⇒ + buf.indexWhere(_._1 isAssignableFrom ca._1) match { + case -1 ⇒ buf append ca + case x ⇒ buf insert (x, ca) + } + buf + } +} + +abstract class FaultHandlingStrategy { + + import FaultHandlingStrategy._ + + def decider: Decider def handleChildTerminated(child: ActorRef, children: Vector[ChildRestartStats]): Vector[ChildRestartStats] - def processFailure(fail: Failed, children: Vector[ChildRestartStats]): Unit + def processFailure(restart: Boolean, fail: Failed, children: Vector[ChildRestartStats]): Unit def handleSupervisorFailing(supervisor: ActorRef, children: Vector[ChildRestartStats]): Unit = { if (children.nonEmpty) @@ -63,16 +128,25 @@ sealed abstract class FaultHandlingStrategy { * Returns whether it processed the failure or not */ final def handleFailure(fail: Failed, children: Vector[ChildRestartStats]): Boolean = { - if (trapExit.exists(_.isAssignableFrom(fail.cause.getClass))) { - processFailure(fail, children) - true - } else false + val cause = fail.cause.getClass + val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate + action match { + case Resume ⇒ fail.actor.resume(); true + case Restart ⇒ processFailure(true, fail, children); true + case Stop ⇒ processFailure(false, fail, children); true + case Escalate ⇒ false + } } } object AllForOneStrategy { def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): AllForOneStrategy = - new AllForOneStrategy(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + new AllForOneStrategy(FaultHandlingStrategy.makeDecider(trapExit), + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): AllForOneStrategy = + new AllForOneStrategy(FaultHandlingStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange) + def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int]): AllForOneStrategy = + new AllForOneStrategy(FaultHandlingStrategy.makeDecider(trapExit), maxNrOfRetries, None) } /** @@ -81,20 +155,31 @@ object AllForOneStrategy { * maxNrOfRetries = the number of times an actor is allowed to be restarted * withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window */ -case class AllForOneStrategy(trapExit: List[Class[_ <: Throwable]], +case class AllForOneStrategy(decider: FaultHandlingStrategy.Decider, maxNrOfRetries: Option[Int] = None, withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy { - def this(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - this(trapExit, - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + + def this(decider: FaultHandlingStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Int) = + this(FaultHandlingStrategy.makeDecider(decider), + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), + if (withinTimeRange < 0) None else Some(withinTimeRange)) + + def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = + this(FaultHandlingStrategy.makeDecider(trapExit), + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), + if (withinTimeRange < 0) None else Some(withinTimeRange)) def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - this(trapExit.toList, - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + this(FaultHandlingStrategy.makeDecider(trapExit), + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), + if (withinTimeRange < 0) None else Some(withinTimeRange)) - def this(trapExit: java.util.List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - this(trapExit.toArray.toList.asInstanceOf[List[Class[_ <: Throwable]]], - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + /* + * this is a performance optimization to avoid re-allocating the pairs upon + * every call to requestRestartPermission, assuming that strategies are shared + * across actors and thus this field does not take up much space + */ + val retriesWindow = (maxNrOfRetries, withinTimeRange) def handleChildTerminated(child: ActorRef, children: Vector[ChildRestartStats]): Vector[ChildRestartStats] = { children collect { @@ -102,9 +187,9 @@ case class AllForOneStrategy(trapExit: List[Class[_ <: Throwable]], } //TODO optimization to drop all children here already? } - def processFailure(fail: Failed, children: Vector[ChildRestartStats]): Unit = { + def processFailure(restart: Boolean, fail: Failed, children: Vector[ChildRestartStats]): Unit = { if (children.nonEmpty) { - if (children.forall(_.requestRestartPermission(maxNrOfRetries, withinTimeRange))) + if (restart && children.forall(_.requestRestartPermission(retriesWindow))) children.foreach(_.child.restart(fail.cause)) else children.foreach(_.child.stop()) @@ -114,7 +199,12 @@ case class AllForOneStrategy(trapExit: List[Class[_ <: Throwable]], object OneForOneStrategy { def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): OneForOneStrategy = - new OneForOneStrategy(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + new OneForOneStrategy(FaultHandlingStrategy.makeDecider(trapExit), + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): OneForOneStrategy = + new OneForOneStrategy(FaultHandlingStrategy.makeDecider(trapExit), maxNrOfRetries, withinTimeRange) + def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Option[Int]): OneForOneStrategy = + new OneForOneStrategy(FaultHandlingStrategy.makeDecider(trapExit), maxNrOfRetries, None) } /** @@ -123,28 +213,39 @@ object OneForOneStrategy { * maxNrOfRetries = the number of times an actor is allowed to be restarted * withinTimeRange = millisecond time window for maxNrOfRetries, negative means no window */ -case class OneForOneStrategy(trapExit: List[Class[_ <: Throwable]], +case class OneForOneStrategy(decider: FaultHandlingStrategy.Decider, maxNrOfRetries: Option[Int] = None, withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy { - def this(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - this(trapExit, - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + + def this(decider: FaultHandlingStrategy.JDecider, maxNrOfRetries: Int, withinTimeRange: Int) = + this(FaultHandlingStrategy.makeDecider(decider), + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), + if (withinTimeRange < 0) None else Some(withinTimeRange)) + + def this(trapExit: JIterable[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = + this(FaultHandlingStrategy.makeDecider(trapExit), + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), + if (withinTimeRange < 0) None else Some(withinTimeRange)) def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - this(trapExit.toList, - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + this(FaultHandlingStrategy.makeDecider(trapExit), + if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), + if (withinTimeRange < 0) None else Some(withinTimeRange)) - def this(trapExit: java.util.List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) = - this(trapExit.toArray.toList.asInstanceOf[List[Class[_ <: Throwable]]], - if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange)) + /* + * this is a performance optimization to avoid re-allocating the pairs upon + * every call to requestRestartPermission, assuming that strategies are shared + * across actors and thus this field does not take up much space + */ + val retriesWindow = (maxNrOfRetries, withinTimeRange) def handleChildTerminated(child: ActorRef, children: Vector[ChildRestartStats]): Vector[ChildRestartStats] = - children.filterNot(_.child == child) + children.filterNot(_.child == child) // TODO: check: I think this copies the whole vector in addition to allocating a closure ... - def processFailure(fail: Failed, children: Vector[ChildRestartStats]): Unit = { + def processFailure(restart: Boolean, fail: Failed, children: Vector[ChildRestartStats]): Unit = { children.find(_.child == fail.actor) match { case Some(stats) ⇒ - if (stats.requestRestartPermission(maxNrOfRetries, withinTimeRange)) + if (restart && stats.requestRestartPermission(retriesWindow)) fail.actor.restart(fail.cause) else fail.actor.stop() //TODO optimization to drop child here already? diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index 45528091f7..ba668ae280 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -16,10 +16,17 @@ import collection.immutable.Stack * FIXME document me */ object Props { + import FaultHandlingStrategy._ + final val defaultCreator: () ⇒ Actor = () ⇒ throw new UnsupportedOperationException("No actor creator specified!") final val defaultDispatcher: MessageDispatcher = null final val defaultTimeout: Timeout = Timeout(Duration.MinusInf) - final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(classOf[Exception] :: Nil, None, None) + final val defaultDecider: Decider = { + case _: ActorInitializationException ⇒ Stop + case _: Exception ⇒ Restart + case _ ⇒ Escalate + } + final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(defaultDecider, None, None) final val defaultSupervisor: Option[ActorRef] = None final val noHotSwap: Stack[Actor.Receive] = Stack.empty final val randomAddress: String = "" @@ -34,6 +41,8 @@ object Props { */ def apply(): Props = default + def empty = Props(context ⇒ { case null ⇒ }) + /** * Returns a Props that has default values except for "creator" which will be a function that creates an instance * of the supplied type using the default constructor diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index dc302c5af9..ac478b7a81 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -7,7 +7,7 @@ package akka.dispatch import akka.AkkaException import akka.event.EventHandler -import akka.actor.{ Actor, UntypedChannel, Timeout, ExceptionChannel } +import akka.actor.{ UntypedChannel, Timeout, ExceptionChannel } import scala.Option import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption }