moved supervisor code into kernel

This commit is contained in:
Jonas Boner 2009-03-22 17:26:42 +01:00
parent be820235dc
commit 550d910b02
5 changed files with 749 additions and 18 deletions

View file

@ -4,15 +4,13 @@
package com.scalablesolutions.akka.kernel
import com.scalablesolutions.akka.supervisor._
import java.util.{List => JList, ArrayList}
import java.lang.reflect.{Method, Field, InvocationHandler, Proxy, InvocationTargetException}
import java.lang.annotation.Annotation
import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory}
import voldemort.versioning.Versioned
//import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory}
//import voldemort.versioning.Versioned
sealed class ActiveObjectException(msg: String) extends RuntimeException(msg)
class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg)
@ -50,7 +48,7 @@ object ActiveObject {
override def getSupervisorConfig = SupervisorConfig(restartStrategy, components)
}
val supervisor = factory.newSupervisor
supervisor ! com.scalablesolutions.akka.supervisor.Start
supervisor ! com.scalablesolutions.akka.kernel.Start
supervisor
}
@ -100,6 +98,7 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
val result: AnyRef =
if (invocation.method.isAnnotationPresent(oneway)) server ! invocation
else {
val transaction = _
val result: ErrRef[AnyRef] = server !!! (invocation, ErrRef({
throw new ActiveObjectInvocationTimeoutException(
"proxy invocation timed out after " + timeout + " milliseconds")

View file

@ -5,7 +5,7 @@
package com.scalablesolutions.akka.kernel.configuration
import com.scalablesolutions.akka.kernel.{ActiveObject, ActiveObjectProxy}
import google.inject.{AbstractModule}
import com.google.inject.{AbstractModule}
import java.util.{List => JList, ArrayList}
import scala.reflect.BeanProperty
@ -17,42 +17,42 @@ sealed class ConfigurationException(msg: String) extends RuntimeException(msg)
sealed abstract class Configuration
class RestartStrategy(@BeanProperty val scheme: FailOverScheme, @BeanProperty val maxNrOfRetries: Int, @BeanProperty val withinTimeRange: Int) extends Configuration {
def transform = com.scalablesolutions.akka.supervisor.RestartStrategy(scheme.transform, maxNrOfRetries, withinTimeRange)
def transform = com.scalablesolutions.akka.kernel.RestartStrategy(scheme.transform, maxNrOfRetries, withinTimeRange)
}
class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int) extends Configuration {
def transform = com.scalablesolutions.akka.supervisor.LifeCycle(scope.transform, shutdownTime)
def transform = com.scalablesolutions.akka.kernel.LifeCycle(scope.transform, shutdownTime)
}
abstract class Scope extends Configuration {
def transform: com.scalablesolutions.akka.supervisor.Scope
def transform: com.scalablesolutions.akka.kernel.Scope
}
class Permanent extends Scope {
override def transform = com.scalablesolutions.akka.supervisor.Permanent
override def transform = com.scalablesolutions.akka.kernel.Permanent
}
class Transient extends Scope {
override def transform = com.scalablesolutions.akka.supervisor.Transient
override def transform = com.scalablesolutions.akka.kernel.Transient
}
class Temporary extends Scope {
override def transform = com.scalablesolutions.akka.supervisor.Temporary
override def transform = com.scalablesolutions.akka.kernel.Temporary
}
abstract class FailOverScheme extends Configuration {
def transform: com.scalablesolutions.akka.supervisor.FailOverScheme
def transform: com.scalablesolutions.akka.kernel.FailOverScheme
}
class AllForOne extends FailOverScheme {
override def transform = com.scalablesolutions.akka.supervisor.AllForOne
override def transform = com.scalablesolutions.akka.kernel.AllForOne
}
class OneForOne extends FailOverScheme {
override def transform = com.scalablesolutions.akka.supervisor.OneForOne
override def transform = com.scalablesolutions.akka.kernel.OneForOne
}
abstract class Server extends Configuration
//class SupervisorConfig(@BeanProperty val restartStrategy: RestartStrategy, @BeanProperty val servers: JList[Server]) extends Server {
// def transform = com.scalablesolutions.akka.supervisor.SupervisorConfig(restartStrategy.transform, servers.toArray.toList.asInstanceOf[List[Server]].map(_.transform))
//class kernelConfig(@BeanProperty val restartStrategy: RestartStrategy, @BeanProperty val servers: JList[Server]) extends Server {
// def transform = com.scalablesolutions.akka.kernel.kernelConfig(restartStrategy.transform, servers.toArray.toList.asInstanceOf[List[Server]].map(_.transform))
//}
class Component(@BeanProperty val intf: Class[_],
@BeanProperty val target: Class[_],
@BeanProperty val lifeCycle: LifeCycle,
@BeanProperty val timeout: Int) extends Server {
def newWorker(proxy: ActiveObjectProxy) = com.scalablesolutions.akka.supervisor.Worker(proxy.server, lifeCycle.transform)
def newWorker(proxy: ActiveObjectProxy) = com.scalablesolutions.akka.kernel.Worker(proxy.server, lifeCycle.transform)
}

View file

@ -0,0 +1,282 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package com.scalablesolutions.akka.kernel
import scala.actors._
import scala.actors.Actor._
import com.scalablesolutions.akka.kernel.Helpers._
sealed abstract class GenericServerMessage
case class Init(config: AnyRef) extends GenericServerMessage
case class ReInit(config: AnyRef) extends GenericServerMessage
case class Shutdown(reason: AnyRef) extends GenericServerMessage
case class Terminate(reason: AnyRef) extends GenericServerMessage
case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends GenericServerMessage
/**
* Base trait for all user-defined servers/actors.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait GenericServer extends Actor {
/**
* Template method implementing the server logic.
* To be implemented by subclassing server.
* <p/>
* Example code:
* <pre>
* override def body: PartialFunction[Any, Unit] = {
* case Ping =>
* println("got a ping")
* reply("pong")
*
* case OneWay =>
* println("got a oneway")
* }
* </pre>
*/
def body: PartialFunction[Any, Unit]
/**
* Callback method that is called during initialization.
* To be implemented by subclassing server.
*/
def init(config: AnyRef) {}
/**
* Callback method that is called during reinitialization after a server crash.
* To be implemented by subclassing server.
*/
def reinit(config: AnyRef) {}
/**
* Callback method that is called during termination.
* To be implemented by subclassing server.
*/
def shutdown(reason: AnyRef) {}
def act = loop { react { genericBase orElse actorBase } }
private def actorBase: PartialFunction[Any, Unit] = hotswap getOrElse body
private var hotswap: Option[PartialFunction[Any, Unit]] = None
private val genericBase: PartialFunction[Any, Unit] = {
case Init(config) => init(config)
case ReInit(config) => reinit(config)
case HotSwap(code) => hotswap = code
case Shutdown(reason) => shutdown(reason); reply('success)
case Terminate(reason) => exit(reason)
}
}
/**
* The container (proxy) for GenericServer, responsible for managing the life-cycle of the server;
* such as shutdown, restart, re-initialization etc.
* Each GenericServerContainer manages one GenericServer.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class GenericServerContainer(val id: String, var serverFactory: () => GenericServer) extends Logging {
require(id != null && id != "")
// TODO: see if we can parameterize class and add type safe getActor method
//class GenericServerContainer[T <: GenericServer](var factory: () => T) {
//def getActor: T = server
var lifeCycle: Option[LifeCycle] = None
val lock = new ReadWriteLock
private var server: GenericServer = null
private var currentConfig: Option[AnyRef] = None
private var timeout = 5000
/**
* Sends a one way message to the server - alias for <code>cast(message)</code>.
* <p>
* Example:
* <pre>
* server ! Message
* </pre>
*/
def !(message: Any) = {
require(server != null)
lock.withReadLock { server ! message }
}
/**
* Sends a message to the server returns a FutureWithTimeout holding the future reply .
* <p>
* Example:
* <pre>
* val future = server !! Message
* future.receiveWithin(100) match {
* case None => ... // timed out
* case Some(reply) => ... // handle reply
* }
* </pre>
*/
def !![T](message: Any): FutureWithTimeout[T] = {
require(server != null)
lock.withReadLock { server !!! message }
}
/**
* Sends a message to the server and blocks indefinitely (no time out), waiting for the reply.
* <p>
* Example:
* <pre>
* val result: String = server !? Message
* </pre>
*/
def !?[T](message: Any): T = {
require(server != null)
val future: Future[T] = lock.withReadLock { server.!![T](message, {case t => t.asInstanceOf[T]}) }
Actor.receive {
case (future.ch ! arg) => arg.asInstanceOf[T]
}
}
/**
* Sends a message to the server and gets a future back with the reply. Returns
* an Option with either Some(result) if succesful or None if timeout.
* <p>
* Timeout specified by the <code>setTimeout(time: Int)</code> method.
* <p>
* Example:
* <pre>
* (server !!! Message).getOrElse(throw new RuntimeException("time out")
* </pre>
*/
def !!![T](message: Any): Option[T] = {
require(server != null)
val future: FutureWithTimeout[T] = lock.withReadLock { server !!! message }
future.receiveWithin(timeout)
}
/**
* Sends a message to the server and gets a future back with the reply.
* <p>
* Tries to get the reply within the timeout specified in the GenericServerContainer
* and else execute the error handler (which can return a default value, throw an exception
* or whatever is appropriate).
* <p>
* Example:
* <pre>
* server !!! (Message, throw new RuntimeException("time out"))
* // OR
* server !!! (Message, DefaultReturnValue)
* </pre>
*/
def !!![T](message: Any, errorHandler: => T): T = !!!(message, errorHandler, timeout)
/**
* Sends a message to the server and gets a future back with the reply.
* <p>
* Tries to get the reply within the timeout specified as parameter to the method
* and else execute the error handler (which can return a default value, throw an exception
* or whatever is appropriate).
* <p>
* Example:
* <pre>
* server !!! (Message, throw new RuntimeException("time out"), 1000)
* // OR
* server !!! (Message, DefaultReturnValue, 1000)
* </pre>
*/
def !!![T](message: Any, errorHandler: => T, time: Int): T = {
require(server != null)
val future: FutureWithTimeout[T] = lock.withReadLock { server !!! message }
future.receiveWithin(time) match {
case None => errorHandler
case Some(reply) => reply
}
}
/**
* Initializes the server by sending a Init(config) message.
*/
def init(config: AnyRef) = lock.withWriteLock {
currentConfig = Some(config)
server ! Init(config)
}
/**
* Re-initializes the server by sending a ReInit(config) message with the most recent configuration.
*/
def reinit = lock.withWriteLock {
currentConfig match {
case Some(config) => server ! ReInit(config)
case None => {}
}
}
/**
* Hotswaps the server body by sending it a HotSwap(code) with the new code
* block (PartialFunction) to be executed.
*/
def hotswap(code: Option[PartialFunction[Any, Unit]]) = lock.withReadLock { server ! HotSwap(code) }
/**
* Swaps the server factory, enabling creating of a completely new server implementation
* (upon failure and restart).
*/
def swapFactory(newFactory: () => GenericServer) = serverFactory = newFactory
/**
* Sets the timeout for the call(..) method, e.g. the maximum time to wait for a reply
* before bailing out. Sets the timeout on the future return from the call to the server.
*/
def setTimeout(time: Int) = timeout = time
/**
* Returns the next message in the servers mailbox.
*/
def nextMessage = lock.withReadLock { server ? }
/**
* Creates a new actor for the GenericServerContainer, and return the newly created actor.
*/
private[kernel] def newServer(): GenericServer = lock.withWriteLock {
server = serverFactory()
server
}
/**
* Starts the server.
*/
private[kernel] def start = lock.withReadLock { server.start }
/**
* Terminates the server with a reason by sending a Terminate(Some(reason)) message.
*/
private[kernel] def terminate(reason: AnyRef) = lock.withReadLock { server ! Terminate(reason) }
/**
* Terminates the server with a reason by sending a Terminate(Some(reason)) message,
* the shutdownTime defines the maximal time to wait for the server to shutdown before
* killing it.
*/
private[kernel] def terminate(reason: AnyRef, shutdownTime: Int) = lock.withReadLock {
if (shutdownTime > 0) {
log.debug("Waiting {} milliseconds for the server to shut down before killing it.", shutdownTime)
server !? (shutdownTime, Shutdown(reason)) match {
case Some('success) => log.debug("Server [{}] has been shut down cleanly.", id)
case None => log.warning("Server [{}] was **not able** to complete shutdown cleanly within its configured shutdown time [{}]", id, shutdownTime)
}
}
server ! Terminate(reason)
}
private[kernel] def reconfigure(reason: AnyRef, restartedServer: GenericServer, supervisor: Supervisor) = lock.withWriteLock {
server = restartedServer
reinit
}
private[kernel] def getServer: GenericServer = server
}

View file

@ -0,0 +1,92 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package com.scalablesolutions.akka.kernel
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.actors._
import scala.actors.Actor._
import net.lag.logging.Logger
class SystemFailure(cause: Throwable) extends RuntimeException(cause)
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Helpers extends Logging {
// ================================================
class ReadWriteLock {
private val rwl = new ReentrantReadWriteLock
private val readLock = rwl.readLock
private val writeLock = rwl.writeLock
def withWriteLock[T](body: => T): T = {
writeLock.lock
try {
body
} finally {
writeLock.unlock
}
}
def withReadLock[T](body: => T): T = {
readLock.lock
try {
body
} finally {
readLock.unlock
}
}
}
// ================================================
// implicit conversion between regular actor and actor with a type future
implicit def actorWithFuture(a: Actor) = new ActorWithTypedFuture(a)
abstract class FutureWithTimeout[T](ch: InputChannel[Any]) extends Future[T](ch) {
def receiveWithin(timeout: Int) : Option[T]
override def respond(f: T => Unit): Unit = throw new UnsupportedOperationException("Does not support the Responder API")
}
def receiveOrFail[T](future: => FutureWithTimeout[T], timeout: Int, errorHandler: => T): T = {
future.receiveWithin(timeout) match {
case None => errorHandler
case Some(reply) => reply
}
}
class ActorWithTypedFuture(a: Actor) {
require(a != null)
def !!![A](msg: Any): FutureWithTimeout[A] = {
val ftch = new Channel[Any](Actor.self)
a.send(msg, ftch)
new FutureWithTimeout[A](ftch) {
def apply() =
if (isSet) value.get.asInstanceOf[A]
else ch.receive {
case a =>
value = Some(a)
value.get.asInstanceOf[A]
}
def isSet = receiveWithin(0).isDefined
def receiveWithin(timeout: Int): Option[A] = value match {
case None => ch.receiveWithin(timeout) {
case TIMEOUT =>
log.debug("Future timed out while waiting for actor: {}", a)
None
case a =>
value = Some(a)
value.asInstanceOf[Option[A]]
}
case a => a.asInstanceOf[Option[A]]
}
}
}
}
}

View file

@ -0,0 +1,358 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package com.scalablesolutions.akka.kernel
import scala.actors._
import scala.actors.Actor._
import scala.collection.mutable.HashMap
import com.scalablesolutions.akka.kernel.Helpers._
//====================================================
/**
* Configuration classes - not to be used as messages.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed abstract class ConfigElement
abstract class Server extends ConfigElement
abstract class FailOverScheme extends ConfigElement
abstract class Scope extends ConfigElement
case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server
case class Worker(serverContainer: GenericServerContainer, lifeCycle: LifeCycle) extends Server
case class RestartStrategy(scheme: FailOverScheme, maxNrOfRetries: Int, withinTimeRange: Int) extends ConfigElement
case object AllForOne extends FailOverScheme
case object OneForOne extends FailOverScheme
case class LifeCycle(scope: Scope, shutdownTime: Int) extends ConfigElement
case object Permanent extends Scope
case object Transient extends Scope
case object Temporary extends Scope
//====================================================
/**
* Messages that the supervisor responds to and returns.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed abstract class SupervisorMessage
case object Start extends SupervisorMessage
case object Stop extends SupervisorMessage
case class Configure(config: SupervisorConfig, factory: SupervisorFactory) extends SupervisorMessage
/**
* Abstract base class for all supervisor factories.
* <p>
* Example usage:
* <pre>
* class MySupervisorFactory extends SupervisorFactory {
*
* override protected def getSupervisorConfig: SupervisorConfig = {
* SupervisorConfig(
* RestartStrategy(OneForOne, 3, 10),
* Worker(
* myFirstActorInstance,
* LifeCycle(Permanent, 1000))
* ::
* Worker(
* mySecondActorInstance,
* LifeCycle(Permanent, 1000))
* :: Nil)
* }
* }
* </pre>
*
* Then create a concrete factory in which we mix in support for the specific implementation of the Service we want to use.
*
* <pre>
* object factory extends MySupervisorFactory
* </pre>
*
* Then create a new Supervisor tree with the concrete Services we have defined.
*
* <pre>
* val supervisor = factory.newSupervisor
* supervisor ! Start // start up all managed servers
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
abstract class SupervisorFactory extends Logging {
def newSupervisor: Supervisor = newSupervisorFor(getSupervisorConfig)
def newSupervisorFor(config: SupervisorConfig): Supervisor = config match {
case SupervisorConfig(restartStrategy, _) =>
val supervisor = create(restartStrategy)
supervisor.start
supervisor !? Configure(config, this) match {
case 'success => log.debug("Supervisor successfully configured")
case _ => log.error("Supervisor could not be configured")
}
supervisor
}
/**
* To be overridden by concrete factory.
* Should return the SupervisorConfig for the supervisor.
*/
protected def getSupervisorConfig: SupervisorConfig
protected def create(strategy: RestartStrategy): Supervisor = strategy match {
case RestartStrategy(scheme, maxNrOfRetries, timeRange) =>
scheme match {
case AllForOne => new Supervisor(new AllForOneStrategy(maxNrOfRetries, timeRange))
case OneForOne => new Supervisor(new OneForOneStrategy(maxNrOfRetries, timeRange))
}
}
}
//====================================================
/**
* TODO: document
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class Supervisor(faultHandler: FaultHandlingStrategy) extends Actor with Logging {
private val state = new SupervisorState(this, faultHandler)
/**
* Returns an Option with the GenericServerContainer for the server with the name specified.
* If the server is found then Some(server) is returned else None.
*/
def getServer(id: String): Option[GenericServerContainer] = state.getServerContainer(id)
/**
* Returns an the GenericServerContainer for the server with the name specified.
* If the server is not found then the error handler is invoked.
*/
def getServerOrElse(id: String, errorHandler: => GenericServerContainer): GenericServerContainer = {
getServer(id) match {
case Some(serverContainer) => serverContainer
case None => errorHandler
}
}
def act = {
self.trapExit = true
loop {
react {
case Configure(config, factory) =>
log.debug("Configuring supervisor:{} ", this)
configure(config, factory)
reply('success)
case Start =>
state.serverContainers.foreach { serverContainer =>
serverContainer.start
log.info("Starting server: {}", serverContainer.getServer)
}
case Stop =>
state.serverContainers.foreach { serverContainer =>
serverContainer.terminate('normal)
log.info("Stopping server: {}", serverContainer)
}
log.info("Stopping supervisor: {}", this)
exit('normal)
case Exit(failedServer, reason) =>
reason match {
case 'forced => {} // do nothing
case _ => state.faultHandler.handleFailure(state, failedServer, reason)
}
case unexpected => log.warning("Unexpected message [{}], ignoring...", unexpected)
}
}
}
private def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match {
case SupervisorConfig(_, servers) =>
servers.map(server =>
server match {
case Worker(serverContainer, lifecycle) =>
serverContainer.lifeCycle = Some(lifecycle)
spawnLink(serverContainer)
case SupervisorConfig(_, _) => // recursive configuration
val supervisor = factory.newSupervisorFor(server.asInstanceOf[SupervisorConfig])
supervisor ! Start
state.addSupervisor(supervisor)
})
}
private[kernel] def spawnLink(serverContainer: GenericServerContainer): GenericServer = {
val newServer = serverContainer.newServer()
newServer.start
self.link(newServer)
log.debug("Linking actor [{}] to supervisor [{}]", newServer, this)
state.addServerContainer(serverContainer)
newServer
}
}
//====================================================
/**
* TODO: document
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
abstract class FaultHandlingStrategy(val maxNrOfRetries: Int, val withinTimeRange: Int) extends Logging {
private[kernel] var supervisor: Supervisor = _
private var nrOfRetries = 0
private var retryStartTime = currentTime
private[kernel] def handleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = {
nrOfRetries += 1
if (timeRangeHasExpired) {
if (hasReachedMaximumNrOfRetries) {
log.info("Maximum of restarts [{}] for server [{}] has been reached - the supervisor including all its servers will now be shut down.", maxNrOfRetries, failedServer)
supervisor ! Stop // execution stops here
} else {
nrOfRetries = 0
retryStartTime = currentTime
}
}
doHandleFailure(state, failedServer, reason)
}
private[kernel] def restart(serverContainer: GenericServerContainer, reason: AnyRef, state: SupervisorState) = {
preRestart(serverContainer)
serverContainer.lock.withWriteLock {
// TODO: this is the place to fail-over all pending messages in the failing actor's mailbox, if possible to get a hold of them
// e.g. something like 'serverContainer.getServer.getPendingMessages.map(newServer ! _)'
self.unlink(serverContainer.getServer)
serverContainer.lifeCycle match {
case None => throw new IllegalStateException("Server [" + serverContainer.id + "] does not have a life-cycle defined.")
case Some(LifeCycle(scope, shutdownTime)) =>
serverContainer.terminate(reason, shutdownTime)
scope match {
case Permanent =>
log.debug("Restarting server [{}] configured as PERMANENT.", serverContainer.id)
serverContainer.reconfigure(reason, supervisor.spawnLink(serverContainer), state.supervisor)
case Temporary =>
if (reason == 'normal) {
log.debug("Restarting server [{}] configured as TEMPORARY (since exited naturally).", serverContainer.id)
serverContainer.reconfigure(reason, supervisor.spawnLink(serverContainer), state.supervisor)
} else log.info("Server [{}] configured as TEMPORARY will not be restarted (received unnatural exit message).", serverContainer.id)
case Transient =>
log.info("Server [{}] configured as TRANSIENT will not be restarted.", serverContainer.id)
}
}
}
postRestart(serverContainer)
}
/**
* To be overriden by concrete strategies.
*/
protected def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef)
/**
* To be overriden by concrete strategies.
*/
protected def preRestart(serverContainer: GenericServerContainer) = {}
/**
* To be overriden by concrete strategies.
*/
protected def postRestart(serverContainer: GenericServerContainer) = {}
private def hasReachedMaximumNrOfRetries: Boolean = nrOfRetries > maxNrOfRetries
private def timeRangeHasExpired: Boolean = (currentTime - retryStartTime) > withinTimeRange
private def currentTime: Long = System.currentTimeMillis
}
//====================================================
/**
* TODO: document
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) {
override def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = {
log.error("Server [{}] has failed due to [{}] - scheduling restart - scheme: ALL_FOR_ONE.", failedServer, reason)
for (serverContainer <- state.serverContainers) restart(serverContainer, reason, state)
state.supervisors.foreach(_ ! Exit(failedServer, reason))
}
}
//====================================================
/**
* TODO: document
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) {
override def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = {
log.error("Server [{}] has failed due to [{}] - scheduling restart - scheme: ONE_FOR_ONE.", failedServer, reason)
var serverContainer: Option[GenericServerContainer] = None
state.serverContainers.foreach {
container => if (container.getServer == failedServer) serverContainer = Some(container)
}
serverContainer match {
case None => throw new RuntimeException("Could not find a generic server for actor: " + failedServer)
case Some(container) => restart(container, reason, state)
}
}
}
//====================================================
/**
* TODO: document
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[kernel] class SupervisorState(val supervisor: Supervisor, val faultHandler: FaultHandlingStrategy) extends Logging {
faultHandler.supervisor = supervisor
private val _lock = new ReadWriteLock
private val _serverContainerRegistry = new HashMap[String, GenericServerContainer]
private var _supervisors: List[Supervisor] = Nil
def supervisors: List[Supervisor] = _lock.withReadLock {
_supervisors
}
def addSupervisor(supervisor: Supervisor) = _lock.withWriteLock {
_supervisors = supervisor :: _supervisors
}
def serverContainers: List[GenericServerContainer] = _lock.withReadLock {
_serverContainerRegistry.values.toList
}
def getServerContainer(id: String): Option[GenericServerContainer] = _lock.withReadLock {
if (_serverContainerRegistry.contains(id)) Some(_serverContainerRegistry(id))
else None
}
def addServerContainer(serverContainer: GenericServerContainer) = _lock.withWriteLock {
_serverContainerRegistry += serverContainer.id -> serverContainer
}
def removeServerContainer(id: String) = _lock.withWriteLock {
getServerContainer(id) match {
case Some(serverContainer) => _serverContainerRegistry - id
case None => {}
}
}
}