From 550d910b02dff4555502c34360cd34600ab8a57f Mon Sep 17 00:00:00 2001 From: Jonas Boner Date: Sun, 22 Mar 2009 17:26:42 +0100 Subject: [PATCH] moved supervisor code into kernel --- kernel/src/main/scala/ActiveObject.scala | 9 +- kernel/src/main/scala/Configuration.scala | 26 +- kernel/src/main/scala/GenericServer.scala | 282 +++++++++++++++++ kernel/src/main/scala/Helpers.scala | 92 ++++++ kernel/src/main/scala/Supervisor.scala | 358 ++++++++++++++++++++++ 5 files changed, 749 insertions(+), 18 deletions(-) create mode 100644 kernel/src/main/scala/GenericServer.scala create mode 100644 kernel/src/main/scala/Helpers.scala create mode 100644 kernel/src/main/scala/Supervisor.scala diff --git a/kernel/src/main/scala/ActiveObject.scala b/kernel/src/main/scala/ActiveObject.scala index c580087f05..03a5efc234 100755 --- a/kernel/src/main/scala/ActiveObject.scala +++ b/kernel/src/main/scala/ActiveObject.scala @@ -4,15 +4,13 @@ package com.scalablesolutions.akka.kernel -import com.scalablesolutions.akka.supervisor._ - import java.util.{List => JList, ArrayList} import java.lang.reflect.{Method, Field, InvocationHandler, Proxy, InvocationTargetException} import java.lang.annotation.Annotation -import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory} -import voldemort.versioning.Versioned +//import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory} +//import voldemort.versioning.Versioned sealed class ActiveObjectException(msg: String) extends RuntimeException(msg) class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg) @@ -50,7 +48,7 @@ object ActiveObject { override def getSupervisorConfig = SupervisorConfig(restartStrategy, components) } val supervisor = factory.newSupervisor - supervisor ! com.scalablesolutions.akka.supervisor.Start + supervisor ! com.scalablesolutions.akka.kernel.Start supervisor } @@ -100,6 +98,7 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I val result: AnyRef = if (invocation.method.isAnnotationPresent(oneway)) server ! invocation else { + val transaction = _ val result: ErrRef[AnyRef] = server !!! (invocation, ErrRef({ throw new ActiveObjectInvocationTimeoutException( "proxy invocation timed out after " + timeout + " milliseconds") diff --git a/kernel/src/main/scala/Configuration.scala b/kernel/src/main/scala/Configuration.scala index a4cb87ee44..11b1dffe8e 100755 --- a/kernel/src/main/scala/Configuration.scala +++ b/kernel/src/main/scala/Configuration.scala @@ -5,7 +5,7 @@ package com.scalablesolutions.akka.kernel.configuration import com.scalablesolutions.akka.kernel.{ActiveObject, ActiveObjectProxy} -import google.inject.{AbstractModule} +import com.google.inject.{AbstractModule} import java.util.{List => JList, ArrayList} import scala.reflect.BeanProperty @@ -17,42 +17,42 @@ sealed class ConfigurationException(msg: String) extends RuntimeException(msg) sealed abstract class Configuration class RestartStrategy(@BeanProperty val scheme: FailOverScheme, @BeanProperty val maxNrOfRetries: Int, @BeanProperty val withinTimeRange: Int) extends Configuration { - def transform = com.scalablesolutions.akka.supervisor.RestartStrategy(scheme.transform, maxNrOfRetries, withinTimeRange) + def transform = com.scalablesolutions.akka.kernel.RestartStrategy(scheme.transform, maxNrOfRetries, withinTimeRange) } class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int) extends Configuration { - def transform = com.scalablesolutions.akka.supervisor.LifeCycle(scope.transform, shutdownTime) + def transform = com.scalablesolutions.akka.kernel.LifeCycle(scope.transform, shutdownTime) } abstract class Scope extends Configuration { - def transform: com.scalablesolutions.akka.supervisor.Scope + def transform: com.scalablesolutions.akka.kernel.Scope } class Permanent extends Scope { - override def transform = com.scalablesolutions.akka.supervisor.Permanent + override def transform = com.scalablesolutions.akka.kernel.Permanent } class Transient extends Scope { - override def transform = com.scalablesolutions.akka.supervisor.Transient + override def transform = com.scalablesolutions.akka.kernel.Transient } class Temporary extends Scope { - override def transform = com.scalablesolutions.akka.supervisor.Temporary + override def transform = com.scalablesolutions.akka.kernel.Temporary } abstract class FailOverScheme extends Configuration { - def transform: com.scalablesolutions.akka.supervisor.FailOverScheme + def transform: com.scalablesolutions.akka.kernel.FailOverScheme } class AllForOne extends FailOverScheme { - override def transform = com.scalablesolutions.akka.supervisor.AllForOne + override def transform = com.scalablesolutions.akka.kernel.AllForOne } class OneForOne extends FailOverScheme { - override def transform = com.scalablesolutions.akka.supervisor.OneForOne + override def transform = com.scalablesolutions.akka.kernel.OneForOne } abstract class Server extends Configuration -//class SupervisorConfig(@BeanProperty val restartStrategy: RestartStrategy, @BeanProperty val servers: JList[Server]) extends Server { -// def transform = com.scalablesolutions.akka.supervisor.SupervisorConfig(restartStrategy.transform, servers.toArray.toList.asInstanceOf[List[Server]].map(_.transform)) +//class kernelConfig(@BeanProperty val restartStrategy: RestartStrategy, @BeanProperty val servers: JList[Server]) extends Server { +// def transform = com.scalablesolutions.akka.kernel.kernelConfig(restartStrategy.transform, servers.toArray.toList.asInstanceOf[List[Server]].map(_.transform)) //} class Component(@BeanProperty val intf: Class[_], @BeanProperty val target: Class[_], @BeanProperty val lifeCycle: LifeCycle, @BeanProperty val timeout: Int) extends Server { - def newWorker(proxy: ActiveObjectProxy) = com.scalablesolutions.akka.supervisor.Worker(proxy.server, lifeCycle.transform) + def newWorker(proxy: ActiveObjectProxy) = com.scalablesolutions.akka.kernel.Worker(proxy.server, lifeCycle.transform) } diff --git a/kernel/src/main/scala/GenericServer.scala b/kernel/src/main/scala/GenericServer.scala new file mode 100644 index 0000000000..f8eafec25a --- /dev/null +++ b/kernel/src/main/scala/GenericServer.scala @@ -0,0 +1,282 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package com.scalablesolutions.akka.kernel + +import scala.actors._ +import scala.actors.Actor._ + +import com.scalablesolutions.akka.kernel.Helpers._ + +sealed abstract class GenericServerMessage +case class Init(config: AnyRef) extends GenericServerMessage +case class ReInit(config: AnyRef) extends GenericServerMessage +case class Shutdown(reason: AnyRef) extends GenericServerMessage +case class Terminate(reason: AnyRef) extends GenericServerMessage +case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends GenericServerMessage + +/** + * Base trait for all user-defined servers/actors. + * + * @author Jonas Bonér + */ +trait GenericServer extends Actor { + + /** + * Template method implementing the server logic. + * To be implemented by subclassing server. + *

+ * Example code: + *

+   *   override def body: PartialFunction[Any, Unit] = {
+   *     case Ping =>
+   *       println("got a ping")
+   *       reply("pong")
+   *
+   *     case OneWay =>
+   *       println("got a oneway")
+   *   }
+   * 
+ */ + def body: PartialFunction[Any, Unit] + + /** + * Callback method that is called during initialization. + * To be implemented by subclassing server. + */ + def init(config: AnyRef) {} + + /** + * Callback method that is called during reinitialization after a server crash. + * To be implemented by subclassing server. + */ + def reinit(config: AnyRef) {} + + /** + * Callback method that is called during termination. + * To be implemented by subclassing server. + */ + def shutdown(reason: AnyRef) {} + + def act = loop { react { genericBase orElse actorBase } } + + private def actorBase: PartialFunction[Any, Unit] = hotswap getOrElse body + + private var hotswap: Option[PartialFunction[Any, Unit]] = None + + private val genericBase: PartialFunction[Any, Unit] = { + case Init(config) => init(config) + case ReInit(config) => reinit(config) + case HotSwap(code) => hotswap = code + case Shutdown(reason) => shutdown(reason); reply('success) + case Terminate(reason) => exit(reason) + } +} + +/** + * The container (proxy) for GenericServer, responsible for managing the life-cycle of the server; + * such as shutdown, restart, re-initialization etc. + * Each GenericServerContainer manages one GenericServer. + * + * @author Jonas Bonér + */ +class GenericServerContainer(val id: String, var serverFactory: () => GenericServer) extends Logging { + require(id != null && id != "") + + // TODO: see if we can parameterize class and add type safe getActor method + //class GenericServerContainer[T <: GenericServer](var factory: () => T) { + //def getActor: T = server + + var lifeCycle: Option[LifeCycle] = None + val lock = new ReadWriteLock + + private var server: GenericServer = null + private var currentConfig: Option[AnyRef] = None + private var timeout = 5000 + + /** + * Sends a one way message to the server - alias for cast(message). + *

+ * Example: + *

+   *   server ! Message
+   * 
+ */ + def !(message: Any) = { + require(server != null) + lock.withReadLock { server ! message } + } + + /** + * Sends a message to the server returns a FutureWithTimeout holding the future reply . + *

+ * Example: + *

+   *  val future = server !! Message
+   *  future.receiveWithin(100) match {
+   *    case None => ... // timed out
+   *    case Some(reply) => ... // handle reply
+   *  }
+   * 
+ */ + def !![T](message: Any): FutureWithTimeout[T] = { + require(server != null) + lock.withReadLock { server !!! message } + } + + /** + * Sends a message to the server and blocks indefinitely (no time out), waiting for the reply. + *

+ * Example: + *

+   *   val result: String = server !? Message
+   * 
+ */ + def !?[T](message: Any): T = { + require(server != null) + val future: Future[T] = lock.withReadLock { server.!![T](message, {case t => t.asInstanceOf[T]}) } + Actor.receive { + case (future.ch ! arg) => arg.asInstanceOf[T] + } + } + + /** + * Sends a message to the server and gets a future back with the reply. Returns + * an Option with either Some(result) if succesful or None if timeout. + *

+ * Timeout specified by the setTimeout(time: Int) method. + *

+ * Example: + *

+   *   (server !!! Message).getOrElse(throw new RuntimeException("time out")
+   * 
+ */ + def !!![T](message: Any): Option[T] = { + require(server != null) + val future: FutureWithTimeout[T] = lock.withReadLock { server !!! message } + future.receiveWithin(timeout) + } + + /** + * Sends a message to the server and gets a future back with the reply. + *

+ * Tries to get the reply within the timeout specified in the GenericServerContainer + * and else execute the error handler (which can return a default value, throw an exception + * or whatever is appropriate). + *

+ * Example: + *

+   *   server !!! (Message, throw new RuntimeException("time out"))
+   *   // OR
+   *   server !!! (Message, DefaultReturnValue)
+   * 
+ */ + def !!![T](message: Any, errorHandler: => T): T = !!!(message, errorHandler, timeout) + + /** + * Sends a message to the server and gets a future back with the reply. + *

+ * Tries to get the reply within the timeout specified as parameter to the method + * and else execute the error handler (which can return a default value, throw an exception + * or whatever is appropriate). + *

+ * Example: + *

+   *   server !!! (Message, throw new RuntimeException("time out"), 1000)
+   *   // OR
+   *   server !!! (Message, DefaultReturnValue, 1000)
+   * 
+ */ + def !!![T](message: Any, errorHandler: => T, time: Int): T = { + require(server != null) + val future: FutureWithTimeout[T] = lock.withReadLock { server !!! message } + future.receiveWithin(time) match { + case None => errorHandler + case Some(reply) => reply + } + } + + /** + * Initializes the server by sending a Init(config) message. + */ + def init(config: AnyRef) = lock.withWriteLock { + currentConfig = Some(config) + server ! Init(config) + } + + /** + * Re-initializes the server by sending a ReInit(config) message with the most recent configuration. + */ + def reinit = lock.withWriteLock { + currentConfig match { + case Some(config) => server ! ReInit(config) + case None => {} + } + } + + /** + * Hotswaps the server body by sending it a HotSwap(code) with the new code + * block (PartialFunction) to be executed. + */ + def hotswap(code: Option[PartialFunction[Any, Unit]]) = lock.withReadLock { server ! HotSwap(code) } + + /** + * Swaps the server factory, enabling creating of a completely new server implementation + * (upon failure and restart). + */ + def swapFactory(newFactory: () => GenericServer) = serverFactory = newFactory + + /** + * Sets the timeout for the call(..) method, e.g. the maximum time to wait for a reply + * before bailing out. Sets the timeout on the future return from the call to the server. + */ + def setTimeout(time: Int) = timeout = time + + /** + * Returns the next message in the servers mailbox. + */ + def nextMessage = lock.withReadLock { server ? } + + /** + * Creates a new actor for the GenericServerContainer, and return the newly created actor. + */ + private[kernel] def newServer(): GenericServer = lock.withWriteLock { + server = serverFactory() + server + } + + /** + * Starts the server. + */ + private[kernel] def start = lock.withReadLock { server.start } + + /** + * Terminates the server with a reason by sending a Terminate(Some(reason)) message. + */ + private[kernel] def terminate(reason: AnyRef) = lock.withReadLock { server ! Terminate(reason) } + + /** + * Terminates the server with a reason by sending a Terminate(Some(reason)) message, + * the shutdownTime defines the maximal time to wait for the server to shutdown before + * killing it. + */ + private[kernel] def terminate(reason: AnyRef, shutdownTime: Int) = lock.withReadLock { + if (shutdownTime > 0) { + log.debug("Waiting {} milliseconds for the server to shut down before killing it.", shutdownTime) + server !? (shutdownTime, Shutdown(reason)) match { + case Some('success) => log.debug("Server [{}] has been shut down cleanly.", id) + case None => log.warning("Server [{}] was **not able** to complete shutdown cleanly within its configured shutdown time [{}]", id, shutdownTime) + } + } + server ! Terminate(reason) + } + + private[kernel] def reconfigure(reason: AnyRef, restartedServer: GenericServer, supervisor: Supervisor) = lock.withWriteLock { + server = restartedServer + reinit + } + + private[kernel] def getServer: GenericServer = server +} + diff --git a/kernel/src/main/scala/Helpers.scala b/kernel/src/main/scala/Helpers.scala new file mode 100644 index 0000000000..a97faca625 --- /dev/null +++ b/kernel/src/main/scala/Helpers.scala @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package com.scalablesolutions.akka.kernel + +import java.util.concurrent.locks.ReentrantReadWriteLock + +import scala.actors._ +import scala.actors.Actor._ + +import net.lag.logging.Logger + +class SystemFailure(cause: Throwable) extends RuntimeException(cause) + +/** + * @author Jonas Bonér + */ +object Helpers extends Logging { + + // ================================================ + class ReadWriteLock { + private val rwl = new ReentrantReadWriteLock + private val readLock = rwl.readLock + private val writeLock = rwl.writeLock + + def withWriteLock[T](body: => T): T = { + writeLock.lock + try { + body + } finally { + writeLock.unlock + } + } + + def withReadLock[T](body: => T): T = { + readLock.lock + try { + body + } finally { + readLock.unlock + } + } + } + + // ================================================ + // implicit conversion between regular actor and actor with a type future + implicit def actorWithFuture(a: Actor) = new ActorWithTypedFuture(a) + + abstract class FutureWithTimeout[T](ch: InputChannel[Any]) extends Future[T](ch) { + def receiveWithin(timeout: Int) : Option[T] + override def respond(f: T => Unit): Unit = throw new UnsupportedOperationException("Does not support the Responder API") + } + + def receiveOrFail[T](future: => FutureWithTimeout[T], timeout: Int, errorHandler: => T): T = { + future.receiveWithin(timeout) match { + case None => errorHandler + case Some(reply) => reply + } + } + + class ActorWithTypedFuture(a: Actor) { + require(a != null) + + def !!![A](msg: Any): FutureWithTimeout[A] = { + val ftch = new Channel[Any](Actor.self) + a.send(msg, ftch) + new FutureWithTimeout[A](ftch) { + def apply() = + if (isSet) value.get.asInstanceOf[A] + else ch.receive { + case a => + value = Some(a) + value.get.asInstanceOf[A] + } + def isSet = receiveWithin(0).isDefined + def receiveWithin(timeout: Int): Option[A] = value match { + case None => ch.receiveWithin(timeout) { + case TIMEOUT => + log.debug("Future timed out while waiting for actor: {}", a) + None + case a => + value = Some(a) + value.asInstanceOf[Option[A]] + } + case a => a.asInstanceOf[Option[A]] + } + } + } + } +} + diff --git a/kernel/src/main/scala/Supervisor.scala b/kernel/src/main/scala/Supervisor.scala new file mode 100644 index 0000000000..9cd05b5b12 --- /dev/null +++ b/kernel/src/main/scala/Supervisor.scala @@ -0,0 +1,358 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package com.scalablesolutions.akka.kernel + +import scala.actors._ +import scala.actors.Actor._ +import scala.collection.mutable.HashMap + +import com.scalablesolutions.akka.kernel.Helpers._ + +//==================================================== + +/** + * Configuration classes - not to be used as messages. + * + * @author Jonas Bonér + */ +sealed abstract class ConfigElement + +abstract class Server extends ConfigElement +abstract class FailOverScheme extends ConfigElement +abstract class Scope extends ConfigElement + +case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server +case class Worker(serverContainer: GenericServerContainer, lifeCycle: LifeCycle) extends Server + +case class RestartStrategy(scheme: FailOverScheme, maxNrOfRetries: Int, withinTimeRange: Int) extends ConfigElement + +case object AllForOne extends FailOverScheme +case object OneForOne extends FailOverScheme + +case class LifeCycle(scope: Scope, shutdownTime: Int) extends ConfigElement +case object Permanent extends Scope +case object Transient extends Scope +case object Temporary extends Scope + +//==================================================== + +/** + * Messages that the supervisor responds to and returns. + * + * @author Jonas Bonér + */ +sealed abstract class SupervisorMessage +case object Start extends SupervisorMessage +case object Stop extends SupervisorMessage +case class Configure(config: SupervisorConfig, factory: SupervisorFactory) extends SupervisorMessage + +/** + * Abstract base class for all supervisor factories. + *

+ * Example usage: + *

+ *  class MySupervisorFactory extends SupervisorFactory {
+ *
+ *    override protected def getSupervisorConfig: SupervisorConfig = {
+ *      SupervisorConfig(
+ *        RestartStrategy(OneForOne, 3, 10),
+ *        Worker(
+ *          myFirstActorInstance,
+ *          LifeCycle(Permanent, 1000))
+ *        ::
+ *        Worker(
+ *          mySecondActorInstance,
+ *          LifeCycle(Permanent, 1000))
+ *        :: Nil)
+ *    }
+ * }
+ * 
+ * + * Then create a concrete factory in which we mix in support for the specific implementation of the Service we want to use. + * + *
+ * object factory extends MySupervisorFactory
+ * 
+ * + * Then create a new Supervisor tree with the concrete Services we have defined. + * + *
+ * val supervisor = factory.newSupervisor
+ * supervisor ! Start // start up all managed servers
+ * 
+ * + * @author Jonas Bonér + */ +abstract class SupervisorFactory extends Logging { + def newSupervisor: Supervisor = newSupervisorFor(getSupervisorConfig) + + def newSupervisorFor(config: SupervisorConfig): Supervisor = config match { + case SupervisorConfig(restartStrategy, _) => + val supervisor = create(restartStrategy) + supervisor.start + supervisor !? Configure(config, this) match { + case 'success => log.debug("Supervisor successfully configured") + case _ => log.error("Supervisor could not be configured") + } + supervisor + } + + /** + * To be overridden by concrete factory. + * Should return the SupervisorConfig for the supervisor. + */ + protected def getSupervisorConfig: SupervisorConfig + + protected def create(strategy: RestartStrategy): Supervisor = strategy match { + case RestartStrategy(scheme, maxNrOfRetries, timeRange) => + scheme match { + case AllForOne => new Supervisor(new AllForOneStrategy(maxNrOfRetries, timeRange)) + case OneForOne => new Supervisor(new OneForOneStrategy(maxNrOfRetries, timeRange)) + } + } +} + +//==================================================== +/** + * TODO: document + * + * @author Jonas Bonér + */ +class Supervisor(faultHandler: FaultHandlingStrategy) extends Actor with Logging { + + private val state = new SupervisorState(this, faultHandler) + + /** + * Returns an Option with the GenericServerContainer for the server with the name specified. + * If the server is found then Some(server) is returned else None. + */ + def getServer(id: String): Option[GenericServerContainer] = state.getServerContainer(id) + + /** + * Returns an the GenericServerContainer for the server with the name specified. + * If the server is not found then the error handler is invoked. + */ + def getServerOrElse(id: String, errorHandler: => GenericServerContainer): GenericServerContainer = { + getServer(id) match { + case Some(serverContainer) => serverContainer + case None => errorHandler + } + } + + def act = { + self.trapExit = true + loop { + react { + case Configure(config, factory) => + log.debug("Configuring supervisor:{} ", this) + configure(config, factory) + reply('success) + + case Start => + state.serverContainers.foreach { serverContainer => + serverContainer.start + log.info("Starting server: {}", serverContainer.getServer) + } + + case Stop => + state.serverContainers.foreach { serverContainer => + serverContainer.terminate('normal) + log.info("Stopping server: {}", serverContainer) + } + log.info("Stopping supervisor: {}", this) + exit('normal) + + case Exit(failedServer, reason) => + reason match { + case 'forced => {} // do nothing + case _ => state.faultHandler.handleFailure(state, failedServer, reason) + } + + case unexpected => log.warning("Unexpected message [{}], ignoring...", unexpected) + } + } + } + + private def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match { + case SupervisorConfig(_, servers) => + servers.map(server => + server match { + case Worker(serverContainer, lifecycle) => + serverContainer.lifeCycle = Some(lifecycle) + spawnLink(serverContainer) + + case SupervisorConfig(_, _) => // recursive configuration + val supervisor = factory.newSupervisorFor(server.asInstanceOf[SupervisorConfig]) + supervisor ! Start + state.addSupervisor(supervisor) + }) + } + + private[kernel] def spawnLink(serverContainer: GenericServerContainer): GenericServer = { + val newServer = serverContainer.newServer() + newServer.start + self.link(newServer) + log.debug("Linking actor [{}] to supervisor [{}]", newServer, this) + state.addServerContainer(serverContainer) + newServer + } +} + +//==================================================== +/** + * TODO: document + * + * @author Jonas Bonér + */ +abstract class FaultHandlingStrategy(val maxNrOfRetries: Int, val withinTimeRange: Int) extends Logging { + private[kernel] var supervisor: Supervisor = _ + private var nrOfRetries = 0 + private var retryStartTime = currentTime + + private[kernel] def handleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = { + nrOfRetries += 1 + if (timeRangeHasExpired) { + if (hasReachedMaximumNrOfRetries) { + log.info("Maximum of restarts [{}] for server [{}] has been reached - the supervisor including all its servers will now be shut down.", maxNrOfRetries, failedServer) + supervisor ! Stop // execution stops here + } else { + nrOfRetries = 0 + retryStartTime = currentTime + } + } + doHandleFailure(state, failedServer, reason) + } + + + private[kernel] def restart(serverContainer: GenericServerContainer, reason: AnyRef, state: SupervisorState) = { + preRestart(serverContainer) + serverContainer.lock.withWriteLock { + + // TODO: this is the place to fail-over all pending messages in the failing actor's mailbox, if possible to get a hold of them + // e.g. something like 'serverContainer.getServer.getPendingMessages.map(newServer ! _)' + + self.unlink(serverContainer.getServer) + serverContainer.lifeCycle match { + case None => throw new IllegalStateException("Server [" + serverContainer.id + "] does not have a life-cycle defined.") + case Some(LifeCycle(scope, shutdownTime)) => + serverContainer.terminate(reason, shutdownTime) + + scope match { + case Permanent => + log.debug("Restarting server [{}] configured as PERMANENT.", serverContainer.id) + serverContainer.reconfigure(reason, supervisor.spawnLink(serverContainer), state.supervisor) + + case Temporary => + if (reason == 'normal) { + log.debug("Restarting server [{}] configured as TEMPORARY (since exited naturally).", serverContainer.id) + serverContainer.reconfigure(reason, supervisor.spawnLink(serverContainer), state.supervisor) + } else log.info("Server [{}] configured as TEMPORARY will not be restarted (received unnatural exit message).", serverContainer.id) + + case Transient => + log.info("Server [{}] configured as TRANSIENT will not be restarted.", serverContainer.id) + } + } + } + postRestart(serverContainer) + } + + /** + * To be overriden by concrete strategies. + */ + protected def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) + + /** + * To be overriden by concrete strategies. + */ + protected def preRestart(serverContainer: GenericServerContainer) = {} + + /** + * To be overriden by concrete strategies. + */ + protected def postRestart(serverContainer: GenericServerContainer) = {} + + private def hasReachedMaximumNrOfRetries: Boolean = nrOfRetries > maxNrOfRetries + private def timeRangeHasExpired: Boolean = (currentTime - retryStartTime) > withinTimeRange + private def currentTime: Long = System.currentTimeMillis +} + +//==================================================== +/** + * TODO: document + * + * @author Jonas Bonér + */ +class AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) +extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) { + override def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = { + log.error("Server [{}] has failed due to [{}] - scheduling restart - scheme: ALL_FOR_ONE.", failedServer, reason) + for (serverContainer <- state.serverContainers) restart(serverContainer, reason, state) + state.supervisors.foreach(_ ! Exit(failedServer, reason)) + } +} + +//==================================================== +/** + * TODO: document + * + * @author Jonas Bonér + */ +class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) +extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) { + override def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = { + log.error("Server [{}] has failed due to [{}] - scheduling restart - scheme: ONE_FOR_ONE.", failedServer, reason) + var serverContainer: Option[GenericServerContainer] = None + state.serverContainers.foreach { + container => if (container.getServer == failedServer) serverContainer = Some(container) + } + serverContainer match { + case None => throw new RuntimeException("Could not find a generic server for actor: " + failedServer) + case Some(container) => restart(container, reason, state) + } + } +} + +//==================================================== +/** + * TODO: document + * + * @author Jonas Bonér + */ +private[kernel] class SupervisorState(val supervisor: Supervisor, val faultHandler: FaultHandlingStrategy) extends Logging { + faultHandler.supervisor = supervisor + + private val _lock = new ReadWriteLock + private val _serverContainerRegistry = new HashMap[String, GenericServerContainer] + private var _supervisors: List[Supervisor] = Nil + + def supervisors: List[Supervisor] = _lock.withReadLock { + _supervisors + } + + def addSupervisor(supervisor: Supervisor) = _lock.withWriteLock { + _supervisors = supervisor :: _supervisors + } + + def serverContainers: List[GenericServerContainer] = _lock.withReadLock { + _serverContainerRegistry.values.toList + } + + def getServerContainer(id: String): Option[GenericServerContainer] = _lock.withReadLock { + if (_serverContainerRegistry.contains(id)) Some(_serverContainerRegistry(id)) + else None + } + + def addServerContainer(serverContainer: GenericServerContainer) = _lock.withWriteLock { + _serverContainerRegistry += serverContainer.id -> serverContainer + } + + def removeServerContainer(id: String) = _lock.withWriteLock { + getServerContainer(id) match { + case Some(serverContainer) => _serverContainerRegistry - id + case None => {} + } + } +} +