-1 volatile field in ActorRef, trapExit is migrated into faultHandler

This commit is contained in:
Viktor Klang 2010-10-08 13:40:13 +02:00
parent 2080f5b693
commit a43fcb0934
11 changed files with 87 additions and 141 deletions

View file

@ -6,7 +6,6 @@ package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.config.{ AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy }
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.stm.TransactionManagement._
@ -27,6 +26,7 @@ import java.util.{ Map => JMap }
import java.lang.reflect.Field
import scala.reflect.BeanProperty
import se.scalablesolutions.akka.config.{NoFaultHandlingStrategy, AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
object ActorRefStatus {
/** LifeCycles for ActorRefs
@ -126,39 +126,19 @@ trait ActorRef extends ActorRefShared with TransactionManagement with Logging wi
/**
* Akka Java API
* Set 'trapExit' to the list of exception classes that the actor should be able to trap
* from the actor it is supervising. When the supervising actor throws these exceptions
* then they will trigger a restart.
* <p/>
*
* Trap all exceptions:
* <pre>
* getContext().setTrapExit(new Class[]{Throwable.class});
* </pre>
*
* Trap specific exceptions only:
* <pre>
* getContext().setTrapExit(new Class[]{MyApplicationException.class, MyApplicationError.class});
* </pre>
*/
def setTrapExit(exceptions: Array[Class[_ <: Throwable]]) = trapExit = exceptions.toList
def getTrapExit(): Array[Class[_ <: Throwable]] = trapExit.toArray
/**
* Akka Java API
* If 'trapExit' is set for the actor to act as supervisor, then a 'faultHandler' must be defined.
* A faultHandler defines what should be done when a linked actor signals an error.
* <p/>
* Can be one of:
* <pre>
* getContext().setFaultHandler(new AllForOneStrategy(maxNrOfRetries, withinTimeRange));
* getContext().setFaultHandler(new AllForOneStrategy(new Class[]{Throwable.class},maxNrOfRetries, withinTimeRange));
* </pre>
* Or:
* <pre>
* getContext().setFaultHandler(new OneForOneStrategy(maxNrOfRetries, withinTimeRange));
* getContext().setFaultHandler(new OneForOneStrategy(new Class[]{Throwable.class},maxNrOfRetries, withinTimeRange));
* </pre>
*/
def setFaultHandler(handler: FaultHandlingStrategy) = this.faultHandler = Some(handler)
def getFaultHandler(): Option[FaultHandlingStrategy] = faultHandler
def setFaultHandler(handler: FaultHandlingStrategy)
def getFaultHandler(): FaultHandlingStrategy
@volatile
private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
@ -520,7 +500,7 @@ trait ActorRef extends ActorRefShared with TransactionManagement with Logging wi
* Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
* receive a notification if the linked actor has crashed.
* <p/>
* If the 'trapExit' member field has been set to at contain at least one exception class then it will
* If the 'trapExit' member field of the 'faultHandler' has been set to at contain at least one exception class then it will
* 'trap' these exceptions and automatically restart the linked actors according to the restart strategy
* defined by the 'faultHandler'.
*/
@ -845,7 +825,7 @@ class LocalActorRef private[akka] (
* Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
* receive a notification if the linked actor has crashed.
* <p/>
* If the 'trapExit' member field has been set to at contain at least one exception class then it will
* If the 'trapExit' member field of the 'faultHandler' has been set to at contain at least one exception class then it will
* 'trap' these exceptions and automatically restart the linked actors according to the restart strategy
* defined by the 'faultHandler'.
* <p/>
@ -1034,20 +1014,19 @@ class LocalActorRef private[akka] (
}
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = {
if (trapExit.exists(_.isAssignableFrom(reason.getClass))) {
if (faultHandler.trapExit.exists(_.isAssignableFrom(reason.getClass))) {
faultHandler match {
case Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange)) =>
case AllForOneStrategy(_,maxNrOfRetries, withinTimeRange) =>
restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
case Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange)) =>
case OneForOneStrategy(_,maxNrOfRetries, withinTimeRange) =>
dead.restart(reason, maxNrOfRetries, withinTimeRange)
case None => throw new IllegalActorStateException(
"No 'faultHandler' defined for an actor with the 'trapExit' member field defined " +
"\n\tto non-empty list of exception classes - can't proceed " + toString)
case NoFaultHandlingStrategy =>
notifySupervisorWithMessage(Exit(this, reason)) //This shouldn't happen
}
} else {
notifySupervisorWithMessage(Exit(this, reason)) // if 'trapExit' is not defined then pass the Exit on
notifySupervisorWithMessage(Exit(this, reason)) // if 'trapExit' isn't triggered then pass the Exit on
}
}
@ -1360,7 +1339,7 @@ object RemoteActorSystemMessage {
*/
private[akka] case class RemoteActorRef private[akka] (
classOrServiceName: String,
val className: String,
val actorClassName: String,
val hostname: String,
val port: Int,
_timeout: Long,
@ -1374,7 +1353,6 @@ private[akka] case class RemoteActorRef private[akka] (
timeout = _timeout
start
lazy val remoteClient = RemoteClientModule.clientFor(hostname, port, loader)
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
RemoteClientModule.send[Any](
@ -1391,21 +1369,18 @@ private[akka] case class RemoteActorRef private[akka] (
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
}
def start: ActorRef = {
def start: ActorRef = synchronized {
_status = ActorRefStatus.RUNNING
this
}
def stop: Unit = {
_status = ActorRefStatus.SHUTDOWN
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
def stop: Unit = synchronized {
if (_status != ActorRefStatus.SHUTDOWN) {
_status = ActorRefStatus.SHUTDOWN
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
}
}
/**
* Returns the class name for the Actor instance that is managed by the ActorRef.
*/
def actorClassName: String = className
protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = None
val remoteAddress: Option[InetSocketAddress] = Some(new InetSocketAddress(hostname, port))
@ -1496,47 +1471,21 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
/**
* User overridable callback/setting.
*
* <p/>
* Set trapExit to the list of exception classes that the actor should be able to trap
* from the actor it is supervising. When the supervising actor throws these exceptions
* then they will trigger a restart.
* <p/>
*
* Trap no exceptions:
* <pre>
* trapExit = Nil
* </pre>
*
* Trap all exceptions:
* <pre>
* trapExit = List(classOf[Throwable])
* </pre>
*
* Trap specific exceptions only:
* <pre>
* trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError])
* </pre>
*/
@volatile
var trapExit: List[Class[_ <: Throwable]] = Nil
/**
* User overridable callback/setting.
* <p/>
* If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined.
* Don't forget to supply a List of exception types to intercept (trapExit)
* <p/>
* Can be one of:
* <pre>
* faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange))
* faultHandler = AllForOneStrategy(trapExit = List(classOf[Exception]),maxNrOfRetries, withinTimeRange)
* </pre>
* Or:
* <pre>
* faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
* faultHandler = OneForOneStrategy(trapExit = List(classOf[Exception]),maxNrOfRetries, withinTimeRange)
* </pre>
*/
@volatile
var faultHandler: Option[FaultHandlingStrategy] = None
@BeanProperty
var faultHandler: FaultHandlingStrategy = NoFaultHandlingStrategy
/**
* The reference sender Actor of the last received message.

View file

@ -79,14 +79,14 @@ object Supervisor {
object SupervisorFactory {
def apply(config: SupervisorConfig) = new SupervisorFactory(config)
private[akka] def retrieveFaultHandlerAndTrapExitsFrom(config: SupervisorConfig):
Tuple2[FaultHandlingStrategy, List[Class[_ <: Throwable]]] = config match {
case SupervisorConfig(RestartStrategy(scheme, maxNrOfRetries, timeRange, trapExceptions), _) =>
scheme match {
case AllForOne => (AllForOneStrategy(maxNrOfRetries, timeRange), trapExceptions)
case OneForOne => (OneForOneStrategy(maxNrOfRetries, timeRange), trapExceptions)
}
}
private[akka] def retrieveFaultHandlerAndTrapExitsFrom(config: SupervisorConfig): FaultHandlingStrategy =
config match {
case SupervisorConfig(RestartStrategy(scheme, maxNrOfRetries, timeRange, trapExceptions), _) =>
scheme match {
case AllForOne => AllForOneStrategy(trapExceptions,maxNrOfRetries, timeRange)
case OneForOne => OneForOneStrategy(trapExceptions,maxNrOfRetries, timeRange)
}
}
}
/**
@ -99,9 +99,8 @@ class SupervisorFactory private[akka] (val config: SupervisorConfig) extends Log
def newInstance: Supervisor = newInstanceFor(config)
def newInstanceFor(config: SupervisorConfig): Supervisor = {
val (handler, trapExits) = SupervisorFactory.retrieveFaultHandlerAndTrapExitsFrom(config)
val supervisor = new Supervisor(handler, trapExits)
def newInstanceFor(config: SupervisorConfig): Supervisor = {
val supervisor = new Supervisor(SupervisorFactory.retrieveFaultHandlerAndTrapExitsFrom(config))
supervisor.configure(config)
supervisor.start
supervisor
@ -121,13 +120,13 @@ class SupervisorFactory private[akka] (val config: SupervisorConfig) extends Log
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed class Supervisor private[akka] (
handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]]) {
handler: FaultHandlingStrategy) {
import Supervisor._
private val _childActors = new ConcurrentHashMap[String, List[ActorRef]]
private val _childSupervisors = new CopyOnWriteArrayList[Supervisor]
private[akka] val supervisor = actorOf(new SupervisorActor(handler, trapExceptions)).start
private[akka] val supervisor = actorOf(new SupervisorActor(handler)).start
def uuid = supervisor.uuid
@ -179,13 +178,9 @@ sealed class Supervisor private[akka] (
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
final class SupervisorActor private[akka] (
handler: FaultHandlingStrategy,
trapExceptions: List[Class[_ <: Throwable]]) extends Actor {
final class SupervisorActor private[akka] (handler: FaultHandlingStrategy) extends Actor {
import self._
trapExit = trapExceptions
faultHandler = Some(handler)
faultHandler = handler
override def postStop(): Unit = shutdownLinkedActors

View file

@ -7,20 +7,45 @@ package se.scalablesolutions.akka.config
import se.scalablesolutions.akka.actor.{ActorRef}
import se.scalablesolutions.akka.dispatch.MessageDispatcher
sealed abstract class FaultHandlingStrategy
object AllForOneStrategy {
def apply(maxNrOfRetries: Int, withinTimeRange: Int): AllForOneStrategy =
AllForOneStrategy(if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
if (withinTimeRange < 0) None else Some(withinTimeRange))
sealed abstract class FaultHandlingStrategy {
def trapExit: List[Class[_ <: Throwable]]
}
object AllForOneStrategy {
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
new AllForOneStrategy(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
def apply(trapExit: Array[Class[Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
new AllForOneStrategy(trapExit.toList,maxNrOfRetries,withinTimeRange)
}
case class AllForOneStrategy(trapExit: List[Class[_ <: Throwable]],
maxNrOfRetries: Option[Int] = None,
withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy {
def this(trapExit: List[Class[_ <: Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) =
this(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
def this(trapExit: Array[Class[Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) =
this(trapExit.toList,maxNrOfRetries,withinTimeRange)
}
case class AllForOneStrategy(maxNrOfRetries: Option[Int] = None, withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy
object OneForOneStrategy {
def apply(maxNrOfRetries: Int, withinTimeRange: Int): OneForOneStrategy =
this(if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
if (withinTimeRange < 0) None else Some(withinTimeRange))
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
new OneForOneStrategy(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
def apply(trapExit: Array[Class[Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
new OneForOneStrategy(trapExit.toList,maxNrOfRetries,withinTimeRange)
}
case class OneForOneStrategy(trapExit: List[Class[_ <: Throwable]],
maxNrOfRetries: Option[Int] = None,
withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy {
def this(trapExit: List[Class[_ <: Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) =
this(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
def this(trapExit: Array[Class[Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) =
this(trapExit.toList,maxNrOfRetries,withinTimeRange)
}
case object NoFaultHandlingStrategy extends FaultHandlingStrategy {
def trapExit: List[Class[_ <: Throwable]] = Nil
}
case class OneForOneStrategy(maxNrOfRetries: Option[Int] = None, withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy
/**
* Configuration classes - not to be used as messages.

View file

@ -22,8 +22,7 @@ class RestartStrategySpec extends JUnitSuite {
def slaveShouldStayDeadAfterMaxRestarts = {
val boss = actorOf(new Actor{
self.trapExit = List(classOf[Throwable])
self.faultHandler = Some(OneForOneStrategy(1, 1000))
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), 1, 1000)
protected def receive = { case _ => () }
}).start
@ -75,8 +74,7 @@ class RestartStrategySpec extends JUnitSuite {
def slaveShouldBeImmortalWithoutMaxRestarts = {
val boss = actorOf(new Actor{
self.trapExit = List(classOf[Throwable])
self.faultHandler = Some(OneForOneStrategy(None, None))
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), None, None)
protected def receive = { case _ => () }
}).start

View file

@ -37,8 +37,7 @@ class SupervisorHierarchySpec extends JUnitSuite {
val workerThree = actorOf(new CountDownActor(countDown))
val boss = actorOf(new Actor{
self.trapExit = List(classOf[Throwable])
self.faultHandler = Some(OneForOneStrategy(5, 1000))
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), 5, 1000)
protected def receive = { case _ => () }
}).start
@ -63,8 +62,7 @@ class SupervisorHierarchySpec extends JUnitSuite {
val countDown = new CountDownLatch(2)
val crasher = actorOf(new CountDownActor(countDown))
val boss = actorOf(new Actor{
self.trapExit = List(classOf[Throwable])
self.faultHandler = Some(OneForOneStrategy(1, 5000))
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), 1, 5000)
protected def receive = {
case MaximumNumberOfRestartsWithinTimeRangeReached(_, _, _, _) =>
countDown.countDown

View file

@ -95,8 +95,7 @@ object SupervisorSpec {
}
class Master extends Actor {
self.trapExit = classOf[Exception] :: Nil
self.faultHandler = Some(OneForOneStrategy(5, 1000))
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, 1000)
val temp = self.spawnLink[TemporaryActor]
override def receive = {
case Die => temp !! (Die, 5000)

View file

@ -184,8 +184,7 @@ object AMQP {
class AMQPSupervisorActor extends Actor {
import self._
faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = List(classOf[Throwable])
faultHandler = OneForOneStrategy(List(classOf[Throwable]),5, 5000)
def receive = {
case _ => {} // ignore all messages

View file

@ -17,9 +17,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio
self.id = "amqp-connection-%s".format(host)
self.lifeCycle = Permanent
self.trapExit = List(classOf[Throwable])
self.faultHandler = Some(OneForOneStrategy(5, 5000))
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]),5, 5000)
val reconnectionTimer = new Timer("%s-timer".format(self.id))

View file

@ -170,9 +170,7 @@ trait RedisChatStorageFactory { this: Actor =>
* Chat server. Manages sessions and redirects all other messages to the Session for the client.
*/
trait ChatServer extends Actor {
self.faultHandler = Some(OneForOneStrategy(5, 5000))
self.trapExit = List(classOf[Exception])
self.faultHandler = OneForOneStrategy(List(classOf[Exception]),5, 5000)
val storage: ActorRef
log.info("Chat server is starting up...")

View file

@ -665,13 +665,12 @@ object TypedActor extends Logging {
* @param trapExceptions array of exceptions that should be handled by the supervisor
*/
def link(supervisor: AnyRef, supervised: AnyRef,
handler: FaultHandlingStrategy, trapExceptions: Array[Class[_ <: Throwable]]) = {
handler: FaultHandlingStrategy) = {
val supervisorActor = actorFor(supervisor).getOrElse(
throw new IllegalActorStateException("Can't link when the supervisor is not an Typed Actor"))
val supervisedActor = actorFor(supervised).getOrElse(
throw new IllegalActorStateException("Can't link when the supervised is not an Typed Actor"))
supervisorActor.trapExit = trapExceptions.toList
supervisorActor.faultHandler = Some(handler)
supervisorActor.faultHandler = handler
supervisorActor.link(supervisedActor)
}
@ -688,18 +687,6 @@ object TypedActor extends Logging {
supervisorActor.unlink(supervisedActor)
}
/**
* Sets the trap exit for the given supervisor Typed Actor.
* @param supervisor the supervisor Typed Actor
* @param trapExceptions array of exceptions that should be handled by the supervisor
*/
def trapExit(supervisor: AnyRef, trapExceptions: Array[Class[_ <: Throwable]]) = {
val supervisorActor = actorFor(supervisor).getOrElse(
throw new IllegalActorStateException("Can't set trap exceptions when the supervisor is not an Typed Actor"))
supervisorActor.trapExit = trapExceptions.toList
this
}
/**
* Sets the fault handling strategy for the given supervisor Typed Actor.
* @param supervisor the supervisor Typed Actor
@ -708,7 +695,7 @@ object TypedActor extends Logging {
def faultHandler(supervisor: AnyRef, handler: FaultHandlingStrategy) = {
val supervisorActor = actorFor(supervisor).getOrElse(
throw new IllegalActorStateException("Can't set fault handler when the supervisor is not an Typed Actor"))
supervisorActor.faultHandler = Some(handler)
supervisorActor.faultHandler = handler
this
}

View file

@ -87,7 +87,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
SamplePojoImpl.reset
val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
val supervisor = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
link(supervisor, pojo, OneForOneStrategy(3, 2000), Array(classOf[Throwable]))
link(supervisor, pojo, OneForOneStrategy(Array(classOf[Throwable]), 3, 2000))
pojo.throwException
Thread.sleep(500)
SimpleJavaPojoImpl._pre should be(true)