diff --git a/supervisor/.#pom.xml b/supervisor/.#pom.xml new file mode 120000 index 0000000000..9df34ad1b7 --- /dev/null +++ b/supervisor/.#pom.xml @@ -0,0 +1 @@ +jboner@agu.8886:1236500639 \ No newline at end of file diff --git a/supervisor/README b/supervisor/README new file mode 100755 index 0000000000..1b23a275a1 --- /dev/null +++ b/supervisor/README @@ -0,0 +1,214 @@ +--- OVERVIEW --- +Scala Actors Behavior Module + +Implements Erlang-style Behaviors for Scala; Supervisor, GenericServer, GenericEvent and GenericFiniteStateMachine allowing creating fault-tolerant actor-based enterprise systems. + +The implementation consists of four main abstractions; + +* Supervisor -- The Supervisor manages hierarchies of Scala actors and provides fault-tolerance in terms of different restart +semantics. The configuration and semantics is almost a 1-1 port of the Erlang Supervisor implementation, explained +here: http://www.erlang.org/doc/design_principles/sup_princ.html, read this document in order to understand how to +configure the Supervisor properly. + +* GenericServer -- The GenericServer (which subclasses Actor) is a trait that forms the base for a server to be managed by a Supervisor. +The GenericServer is wrapped by a GenericServerContainer instance providing a necessary indirection needed to be able to +fully manage the life-cycle of the GenericServer. + +* GenericEvent -- TBD + +* GenericFiniteStateMachine -- TBD + +--- CHECK OUT --- +The SCM system used is Git. + +1. Download and install Git (google git). +2. Invoke 'git clone git://github.com/jboner/scala-otp.git'. + +--- BUILD --- +The build system used is Maven. + +1. Download and install Maven 2. +2. Step into the root dir 'scala-otp'. +3. Invoke 'mvn install' + +This will build the project, run all tests, create a jar and upload it to your local Maven repository ready for use. + +--- RUNTIME DEPENDENCIES --- +1. Scala 2.7.1-final +2. SLF4J 1.5.2 +3. LogBack Classic 0.9.9 + +--- USAGE --- +Here is a small step-by-step runnable tutorial on how to create a server, configure it, use it, hotswap its +implementation etc. For more details on the API, look at the code or the tests. + +You can find this code in the sample.scala file in the root directory. Run it by invoking 'scala -cp +target/scala-behavior-0.1-SNAPSHOT.jar: sample.scala' + +// ============================================= +// 1. Import statements and Server messages + +import scala.actors._ +import scala.actors.Actor._ + +import scala.actors.behavior._ +import scala.actors.behavior.Helpers._ + +sealed abstract class SampleMessage +case object Ping extends SampleMessage +case object Pong extends SampleMessage +case object OneWay extends SampleMessage +case object Die extends SampleMessage + +// ============================================= +// 2. Create the GenericServer by extending the GenericServer trait and override the 'body' method + +class SampleServer extends GenericServer { + + // This method implements the core server logic and naturally has to be overridden + override def body: PartialFunction[Any, Unit] = { + case Ping => + println("Received Ping"); reply(Pong) + + case OneWay => + println("Received OneWay") + + case Die => + println("Received Die..dying...") + throw new RuntimeException("Received Die message") + } + + // GenericServer also has some callback life-cycle methods, such as init(..) and shutdown(..) +} + +// ============================================= +// 3. Wrap our SampleServer in a GenericServerContainer and give it a name to be able to refer to it later. + +object sampleServer1 extends GenericServerContainer("sample1", () => new SampleServer) +object sampleServer2 extends GenericServerContainer("sample2", () => new SampleServer) + +// ============================================= +// 4. Create a Supervisor configuration (and a SupervisorFactory) that is configuring our SampleServer (takes a list of +'Worker' configurations, one or many) + +object factory extends SupervisorFactory { + override protected def getSupervisorConfig: SupervisorConfig = { + SupervisorConfig( + RestartStrategy(AllForOne, 3, 10000), + Worker( + sampleServer1, + LifeCycle(Permanent, 1000)) :: + Worker( + sampleServer2, + LifeCycle(Permanent, 1000)) :: + Nil) + } +} + +// ============================================= +// 5. Create a new Supervisor with the custom factory + +val supervisor = factory.newSupervisor + +// ============================================= +// 6. Start the Supervisor (which starts the server(s)) + +supervisor ! Start + +// ============================================= +// 7. Try to send a one way asyncronous message to our servers + +sampleServer1 ! OneWay + +// Try to get sampleServer2 from the Supervisor before sending a message +supervisor.getServer("sample2") match { + case Some(server2) => server2 ! OneWay + case None => println("server [sample2] could not be found") +} + +// ============================================= +// 8. Try to send an asyncronous message - receive a future - wait 100 ms (time-out) for the reply + +val future = sampleServer1 !! Ping +val reply1 = future.receiveWithin(100) match { + case Some(reply) => + println("Received reply: " + reply) + case None => + println("Did not get a reply witin 100 ms") +} + +// ============================================= +// 9. Try to send a message (Die) telling the server to kill itself (throw an exception) + +sampleServer1 ! Die + +// ============================================= +// 10. Send an asyncronous message and wait on a future. If it times out -> use error handler (in this case throw an +exception). It is likely that this call will time out since the server is in the middle of recovering from failure. + +val reply2 = try { + sampleServer1 !!! (Ping, throw new RuntimeException("Time-out"), 10) // time out is set to 10 ms (very low on purpose) + +} catch { case e => println("Expected exception: " + e.toString); Pong } + +// ============================================= +// 11. Server should be up again. Try the same call again + +val reply3 = try { + sampleServer1 !!! (Ping, throw new RuntimeException("Time-out"), 1000) +} catch { case e => println("Expected exception: " + e.toString); Pong } + +// Also check server number 2 +sampleServer2 ! Ping + +// ============================================= +// 11. Try to hotswap the server implementation + +sampleServer1.hotswap(Some({ + case Ping => + println("Hotswapped Ping") +})) + +// ============================================= +// 12. Try the hotswapped server out + +sampleServer1 ! Ping + +// ============================================= +// 13. Hotswap again + +sampleServer1.hotswap(Some({ + case Pong => + println("Hotswapped again, now doing Pong") + reply(Ping) +})) + +// ============================================= +// 14. Send an asyncronous message that will wait on a future. Method returns an Option[T] => if Some(result) -> return +result, if None -> print out an info message (or throw an exception or do whatever you like...) + +val reply4 = (sampleServer1 !!! Pong).getOrElse({println("Time out when sending Pong"); Ping}) + +// Same invocation with pattern matching syntax. + +val reply5 = sampleServer1 !!! Pong match { + case Some(result) => result + case None => println("Time out when sending Pong"); Ping +} + +// ============================================= +// 15. Hotswap back to original implementation by passing in None + +sampleServer1.hotswap(None) + +// ============================================= +// 16. Test the final hotswap by sending an async message + +sampleServer1 ! Ping + +// ============================================= +// 17. Shut down the supervisor and its server(s) + +supervisor ! Stop + + diff --git a/supervisor/pom.xml b/supervisor/pom.xml new file mode 100755 index 0000000000..bcee8fdf37 --- /dev/null +++ b/supervisor/pom.xml @@ -0,0 +1,140 @@ + + 4.0.0 + + akka-supervisor + Akka Supervisor Module + + jar + + + akka + ${akka.groupId} + ${akka.version} + + + + + ${akka.groupId} + akka-util-java + ${akka.version} + + + org.scala-lang + scala-library + ${scala.version} + + + net.lag + configgy + 1.2 + + + org.specs + specs + 1.4.3 + test + + + + + src/main/scala + + + + org.scala-tools + maven-scala-plugin + + + + compile + + + + + + + -target:jvm-1.5 + -unchecked + + ${scala.version} + 1.0 + + + + org.apache.maven.plugins + maven-eclipse-plugin + + true + + + ch.epfl.lamp.sdt.core.scalabuilder + + + + + ch.epfl.lamp.sdt.core.scalanature + + + + + org.eclipse.jdt.launching.JRE_CONTAINER + + + ch.epfl.lamp.sdt.launching.SCALA_CONTAINER + + + + + + + + + + + + + + + org.codehaus.mojo + cobertura-maven-plugin + 2.2 + + + xml + html + + + + + + + false + src/main/resources + + + false + src/main/scala + + ** + + + **/*.scala + + + + + + + + org.scala-tools + maven-scala-plugin + + 1.1 + ${scala.version} + + + + + diff --git a/supervisor/sample.scala b/supervisor/sample.scala new file mode 100755 index 0000000000..23baa6c686 --- /dev/null +++ b/supervisor/sample.scala @@ -0,0 +1,164 @@ +// ============================================= +// 1. Import statements and Server messages + +import scala.actors._ +import scala.actors.Actor._ + +import scala.actors.behavior._ +import scala.actors.behavior.Helpers._ + +sealed abstract class SampleMessage +case object Ping extends SampleMessage +case object Pong extends SampleMessage +case object OneWay extends SampleMessage +case object Die extends SampleMessage + +// ============================================= +// 2. Create the GenericServer by extending the GenericServer trait and override the 'body' method + +class SampleServer extends GenericServer { + + // This method implements the core server logic and naturally has to be overridden + override def body: PartialFunction[Any, Unit] = { + case Ping => + println("Received Ping"); reply(Pong) + + case OneWay => + println("Received OneWay") + + case Die => + println("Received Die..dying...") + throw new RuntimeException("Received Die message") + } + + // GenericServer also has some callback life-cycle methods, such as init(..) and shutdown(..) +} + +// ============================================= +// 3. Wrap our SampleServer in a GenericServerContainer and give it a name to be able to refer to it later. + +object sampleServer1 extends GenericServerContainer("sample1", () => new SampleServer) +object sampleServer2 extends GenericServerContainer("sample2", () => new SampleServer) + +// ============================================= +// 4. Create a Supervisor configuration (and a SupervisorFactory) that is configuring our SampleServer (takes a list of 'Worker' configurations, one or many) + +object factory extends SupervisorFactory { + override protected def getSupervisorConfig: SupervisorConfig = { + SupervisorConfig( + RestartStrategy(AllForOne, 3, 10000), + Worker( + sampleServer1, + LifeCycle(Permanent, 1000)) :: + Worker( + sampleServer2, + LifeCycle(Permanent, 1000)) :: + Nil) + } +} + +// ============================================= +// 5. Create a new Supervisor with the custom factory + +val supervisor = factory.newSupervisor + +// ============================================= +// 6. Start the Supervisor (which starts the server(s)) + +supervisor ! Start + +// ============================================= +// 7. Try to send a one way asyncronous message to our servers + +sampleServer1 ! OneWay + +// Try to get sampleServer2 from the Supervisor before sending a message +supervisor.getServer("sample2") match { + case Some(server2) => server2 ! OneWay + case None => println("server [sample2] could not be found") +} + +// ============================================= +// 8. Try to send an asyncronous message - receive a future - wait 100 ms (time-out) for the reply + +val future = sampleServer1 !! Ping +val reply1 = future.receiveWithin(100) match { + case Some(reply) => + println("Received reply: " + reply) + case None => + println("Did not get a reply witin 100 ms") +} + +// ============================================= +// 9. Try to send a message (Die) telling the server to kill itself (throw an exception) + +sampleServer1 ! Die + +// ============================================= +// 10. Send an asyncronous message and wait on a future. If it times out -> use error handler (in this case throw an exception). It is likely that this call will time out since the server is in the middle of recovering from failure. + +val reply2 = try { + sampleServer1 !!! (Ping, throw new RuntimeException("Time-out"), 10) // time out is set to 10 ms (very low on purpose) + +} catch { case e => println("Expected exception: " + e.toString); Pong } + +// ============================================= +// 11. Server should be up again. Try the same call again + +val reply3 = try { + sampleServer1 !!! (Ping, throw new RuntimeException("Time-out"), 1000) +} catch { case e => println("Expected exception: " + e.toString); Pong } + +// Also check server number 2 +sampleServer2 ! Ping + +// ============================================= +// 11. Try to hotswap the server implementation + +sampleServer1.hotswap(Some({ + case Ping => + println("Hotswapped Ping") +})) + +// ============================================= +// 12. Try the hotswapped server out + +sampleServer1 ! Ping + +// ============================================= +// 13. Hotswap again + +sampleServer1.hotswap(Some({ + case Pong => + println("Hotswapped again, now doing Pong") + reply(Ping) +})) + +// ============================================= +// 14. Send an asyncronous message that will wait on a future. Method returns an Option[T] => if Some(result) -> return result, if None -> print out an info message (or throw an exception or do whatever you like...) + +val reply4 = (sampleServer1 !!! Pong).getOrElse({println("Time out when sending Pong"); Ping}) + +// Same invocation with pattern matching syntax. + +val reply5 = sampleServer1 !!! Pong match { + case Some(result) => result + case None => println("Time out when sending Pong"); Ping +} + +// ============================================= +// 15. Hotswap back to original implementation by passing in None + +sampleServer1.hotswap(None) + +// ============================================= +// 16. Test the final hotswap by sending an async message + +sampleServer1 ! Ping + +// ============================================= +// 17. Shut down the supervisor and its server(s) + +supervisor ! Stop + + diff --git a/supervisor/src/main/scala/GenericServer.scala b/supervisor/src/main/scala/GenericServer.scala new file mode 100755 index 0000000000..b5c6a1bb70 --- /dev/null +++ b/supervisor/src/main/scala/GenericServer.scala @@ -0,0 +1,282 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package com.scalablesolutions.akka.supervisor + +import scala.actors._ +import scala.actors.Actor._ + +import com.scalablesolutions.akka.supervisor.Helpers._ + +sealed abstract class GenericServerMessage +case class Init(config: AnyRef) extends GenericServerMessage +case class ReInit(config: AnyRef) extends GenericServerMessage +case class Shutdown(reason: AnyRef) extends GenericServerMessage +case class Terminate(reason: AnyRef) extends GenericServerMessage +case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends GenericServerMessage + +/** + * Base trait for all user-defined servers/actors. + * + * @author Jonas Bonér + */ +trait GenericServer extends Actor { + + /** + * Template method implementing the server logic. + * To be implemented by subclassing server. + *

+ * Example code: + *

+   *   override def body: PartialFunction[Any, Unit] = {
+   *     case Ping =>
+   *       println("got a ping")
+   *       reply("pong")
+   *
+   *     case OneWay =>
+   *       println("got a oneway")
+   *   }
+   * 
+ */ + def body: PartialFunction[Any, Unit] + + /** + * Callback method that is called during initialization. + * To be implemented by subclassing server. + */ + def init(config: AnyRef) {} + + /** + * Callback method that is called during reinitialization after a server crash. + * To be implemented by subclassing server. + */ + def reinit(config: AnyRef) {} + + /** + * Callback method that is called during termination. + * To be implemented by subclassing server. + */ + def shutdown(reason: AnyRef) {} + + def act = loop { react { genericBase orElse actorBase } } + + private def actorBase: PartialFunction[Any, Unit] = hotswap getOrElse body + + private var hotswap: Option[PartialFunction[Any, Unit]] = None + + private val genericBase: PartialFunction[Any, Unit] = { + case Init(config) => init(config) + case ReInit(config) => reinit(config) + case HotSwap(code) => hotswap = code + case Shutdown(reason) => shutdown(reason); reply('success) + case Terminate(reason) => exit(reason) + } +} + +/** + * The container (proxy) for GenericServer, responsible for managing the life-cycle of the server; + * such as shutdown, restart, re-initialization etc. + * Each GenericServerContainer manages one GenericServer. + * + * @author Jonas Bonér + */ +class GenericServerContainer(val id: String, var serverFactory: () => GenericServer) extends Logging { + require(id != null && id != "") + + // TODO: see if we can parameterize class and add type safe getActor method + //class GenericServerContainer[T <: GenericServer](var factory: () => T) { + //def getActor: T = server + + var lifeCycle: Option[LifeCycle] = None + val lock = new ReadWriteLock + + private var server: GenericServer = null + private var currentConfig: Option[AnyRef] = None + private var timeout = 5000 + + /** + * Sends a one way message to the server - alias for cast(message). + *

+ * Example: + *

+   *   server ! Message
+   * 
+ */ + def !(message: Any) = { + require(server != null) + lock.withReadLock { server ! message } + } + + /** + * Sends a message to the server returns a FutureWithTimeout holding the future reply . + *

+ * Example: + *

+   *  val future = server !! Message
+   *  future.receiveWithin(100) match {
+   *    case None => ... // timed out
+   *    case Some(reply) => ... // handle reply
+   *  }
+   * 
+ */ + def !![T](message: Any): FutureWithTimeout[T] = { + require(server != null) + lock.withReadLock { server !!! message } + } + + /** + * Sends a message to the server and blocks indefinitely (no time out), waiting for the reply. + *

+ * Example: + *

+   *   val result: String = server !? Message
+   * 
+ */ + def !?[T](message: Any): T = { + require(server != null) + val future: Future[T] = lock.withReadLock { server.!![T](message, {case t => t.asInstanceOf[T]}) } + Actor.receive { + case (future.ch ! arg) => arg.asInstanceOf[T] + } + } + + /** + * Sends a message to the server and gets a future back with the reply. Returns + * an Option with either Some(result) if succesful or None if timeout. + *

+ * Timeout specified by the setTimeout(time: Int) method. + *

+ * Example: + *

+   *   (server !!! Message).getOrElse(throw new RuntimeException("time out")
+   * 
+ */ + def !!![T](message: Any): Option[T] = { + require(server != null) + val future: FutureWithTimeout[T] = lock.withReadLock { server !!! message } + future.receiveWithin(timeout) + } + + /** + * Sends a message to the server and gets a future back with the reply. + *

+ * Tries to get the reply within the timeout specified in the GenericServerContainer + * and else execute the error handler (which can return a default value, throw an exception + * or whatever is appropriate). + *

+ * Example: + *

+   *   server !!! (Message, throw new RuntimeException("time out"))
+   *   // OR
+   *   server !!! (Message, DefaultReturnValue)
+   * 
+ */ + def !!![T](message: Any, errorHandler: => T): T = !!!(message, errorHandler, timeout) + + /** + * Sends a message to the server and gets a future back with the reply. + *

+ * Tries to get the reply within the timeout specified as parameter to the method + * and else execute the error handler (which can return a default value, throw an exception + * or whatever is appropriate). + *

+ * Example: + *

+   *   server !!! (Message, throw new RuntimeException("time out"), 1000)
+   *   // OR
+   *   server !!! (Message, DefaultReturnValue, 1000)
+   * 
+ */ + def !!![T](message: Any, errorHandler: => T, time: Int): T = { + require(server != null) + val future: FutureWithTimeout[T] = lock.withReadLock { server !!! message } + future.receiveWithin(time) match { + case None => errorHandler + case Some(reply) => reply + } + } + + /** + * Initializes the server by sending a Init(config) message. + */ + def init(config: AnyRef) = lock.withWriteLock { + currentConfig = Some(config) + server ! Init(config) + } + + /** + * Re-initializes the server by sending a ReInit(config) message with the most recent configuration. + */ + def reinit = lock.withWriteLock { + currentConfig match { + case Some(config) => server ! ReInit(config) + case None => {} + } + } + + /** + * Hotswaps the server body by sending it a HotSwap(code) with the new code + * block (PartialFunction) to be executed. + */ + def hotswap(code: Option[PartialFunction[Any, Unit]]) = lock.withReadLock { server ! HotSwap(code) } + + /** + * Swaps the server factory, enabling creating of a completely new server implementation + * (upon failure and restart). + */ + def swapFactory(newFactory: () => GenericServer) = serverFactory = newFactory + + /** + * Sets the timeout for the call(..) method, e.g. the maximum time to wait for a reply + * before bailing out. Sets the timeout on the future return from the call to the server. + */ + def setTimeout(time: Int) = timeout = time + + /** + * Returns the next message in the servers mailbox. + */ + def nextMessage = lock.withReadLock { server ? } + + /** + * Creates a new actor for the GenericServerContainer, and return the newly created actor. + */ + private[supervisor] def newServer(): GenericServer = lock.withWriteLock { + server = serverFactory() + server + } + + /** + * Starts the server. + */ + private[supervisor] def start = lock.withReadLock { server.start } + + /** + * Terminates the server with a reason by sending a Terminate(Some(reason)) message. + */ + private[supervisor] def terminate(reason: AnyRef) = lock.withReadLock { server ! Terminate(reason) } + + /** + * Terminates the server with a reason by sending a Terminate(Some(reason)) message, + * the shutdownTime defines the maximal time to wait for the server to shutdown before + * killing it. + */ + private[supervisor] def terminate(reason: AnyRef, shutdownTime: Int) = lock.withReadLock { + if (shutdownTime > 0) { + log.debug("Waiting {} milliseconds for the server to shut down before killing it.", shutdownTime) + server !? (shutdownTime, Shutdown(reason)) match { + case Some('success) => log.debug("Server [{}] has been shut down cleanly.", id) + case None => log.warning("Server [{}] was **not able** to complete shutdown cleanly within its configured shutdown time [{}]", id, shutdownTime) + } + } + server ! Terminate(reason) + } + + private[supervisor] def reconfigure(reason: AnyRef, restartedServer: GenericServer, supervisor: Supervisor) = lock.withWriteLock { + server = restartedServer + reinit + } + + private[supervisor] def getServer: GenericServer = server +} + diff --git a/supervisor/src/main/scala/Helpers.scala b/supervisor/src/main/scala/Helpers.scala new file mode 100755 index 0000000000..50dbeb0b98 --- /dev/null +++ b/supervisor/src/main/scala/Helpers.scala @@ -0,0 +1,101 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package com.scalablesolutions.akka.supervisor + +import java.util.concurrent.locks.ReentrantReadWriteLock + +import scala.actors._ +import scala.actors.Actor._ + +import net.lag.logging.Logger + +class SystemFailure(cause: Throwable) extends RuntimeException(cause) + +/** + * Base trait for all classes that wants to be able use the logging infrastructure. + * + * @author Jonas Bonér + */ +trait Logging { + @transient val log = Logger.get(this.getClass.getName) +} + +/** + * @author Jonas Bonér + */ +object Helpers extends Logging { + + // ================================================ + class ReadWriteLock { + private val rwl = new ReentrantReadWriteLock + private val readLock = rwl.readLock + private val writeLock = rwl.writeLock + + def withWriteLock[T](body: => T): T = { + writeLock.lock + try { + body + } finally { + writeLock.unlock + } + } + + def withReadLock[T](body: => T): T = { + readLock.lock + try { + body + } finally { + readLock.unlock + } + } + } + + // ================================================ + // implicit conversion between regular actor and actor with a type future + implicit def actorWithFuture(a: Actor) = new ActorWithTypedFuture(a) + + abstract class FutureWithTimeout[T](ch: InputChannel[Any]) extends Future[T](ch) { + def receiveWithin(timeout: Int) : Option[T] + override def respond(f: T => Unit): Unit = throw new UnsupportedOperationException("Does not support the Responder API") + } + + def receiveOrFail[T](future: => FutureWithTimeout[T], timeout: Int, errorHandler: => T): T = { + future.receiveWithin(timeout) match { + case None => errorHandler + case Some(reply) => reply + } + } + + class ActorWithTypedFuture(a: Actor) { + require(a != null) + + def !!![A](msg: Any): FutureWithTimeout[A] = { + val ftch = new Channel[Any](Actor.self) + a.send(msg, ftch) + new FutureWithTimeout[A](ftch) { + def apply() = + if (isSet) value.get.asInstanceOf[A] + else ch.receive { + case a => + value = Some(a) + value.get.asInstanceOf[A] + } + def isSet = receiveWithin(0).isDefined + def receiveWithin(timeout: Int): Option[A] = value match { + case None => ch.receiveWithin(timeout) { + case TIMEOUT => + log.debug("Future timed out while waiting for actor: {}", a) + None + case a => + value = Some(a) + value.asInstanceOf[Option[A]] + } + case a => a.asInstanceOf[Option[A]] + } + } + } + } +} + diff --git a/supervisor/src/main/scala/Supervisor.scala b/supervisor/src/main/scala/Supervisor.scala new file mode 100755 index 0000000000..62f9d94ec1 --- /dev/null +++ b/supervisor/src/main/scala/Supervisor.scala @@ -0,0 +1,358 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package com.scalablesolutions.akka.supervisor + +import scala.actors._ +import scala.actors.Actor._ +import scala.collection.mutable.HashMap + +import com.scalablesolutions.akka.supervisor.Helpers._ + +//==================================================== + +/** + * Configuration classes - not to be used as messages. + * + * @author Jonas Bonér + */ +sealed abstract class ConfigElement + +abstract class Server extends ConfigElement +abstract class FailOverScheme extends ConfigElement +abstract class Scope extends ConfigElement + +case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server +case class Worker(serverContainer: GenericServerContainer, lifeCycle: LifeCycle) extends Server + +case class RestartStrategy(scheme: FailOverScheme, maxNrOfRetries: Int, withinTimeRange: Int) extends ConfigElement + +case object AllForOne extends FailOverScheme +case object OneForOne extends FailOverScheme + +case class LifeCycle(scope: Scope, shutdownTime: Int) extends ConfigElement +case object Permanent extends Scope +case object Transient extends Scope +case object Temporary extends Scope + +//==================================================== + +/** + * Messages that the supervisor responds to and returns. + * + * @author Jonas Bonér + */ +sealed abstract class SupervisorMessage +case object Start extends SupervisorMessage +case object Stop extends SupervisorMessage +case class Configure(config: SupervisorConfig, factory: SupervisorFactory) extends SupervisorMessage + +/** + * Abstract base class for all supervisor factories. + *

+ * Example usage: + *

+ *  class MySupervisorFactory extends SupervisorFactory {
+ *
+ *    override protected def getSupervisorConfig: SupervisorConfig = {
+ *      SupervisorConfig(
+ *        RestartStrategy(OneForOne, 3, 10),
+ *        Worker(
+ *          myFirstActorInstance,
+ *          LifeCycle(Permanent, 1000))
+ *        ::
+ *        Worker(
+ *          mySecondActorInstance,
+ *          LifeCycle(Permanent, 1000))
+ *        :: Nil)
+ *    }
+ * }
+ * 
+ * + * Then create a concrete factory in which we mix in support for the specific implementation of the Service we want to use. + * + *
+ * object factory extends MySupervisorFactory
+ * 
+ * + * Then create a new Supervisor tree with the concrete Services we have defined. + * + *
+ * val supervisor = factory.newSupervisor
+ * supervisor ! Start // start up all managed servers
+ * 
+ * + * @author Jonas Bonér + */ +abstract class SupervisorFactory extends Logging { + def newSupervisor: Supervisor = newSupervisorFor(getSupervisorConfig) + + def newSupervisorFor(config: SupervisorConfig): Supervisor = config match { + case SupervisorConfig(restartStrategy, _) => + val supervisor = create(restartStrategy) + supervisor.start + supervisor !? Configure(config, this) match { + case 'success => log.debug("Supervisor successfully configured") + case _ => log.error("Supervisor could not be configured") + } + supervisor + } + + /** + * To be overridden by concrete factory. + * Should return the SupervisorConfig for the supervisor. + */ + protected def getSupervisorConfig: SupervisorConfig + + protected def create(strategy: RestartStrategy): Supervisor = strategy match { + case RestartStrategy(scheme, maxNrOfRetries, timeRange) => + scheme match { + case AllForOne => new Supervisor(new AllForOneStrategy(maxNrOfRetries, timeRange)) + case OneForOne => new Supervisor(new OneForOneStrategy(maxNrOfRetries, timeRange)) + } + } +} + +//==================================================== +/** + * TODO: document + * + * @author Jonas Bonér + */ +class Supervisor(faultHandler: FaultHandlingStrategy) extends Actor with Logging { + + private val state = new SupervisorState(this, faultHandler) + + /** + * Returns an Option with the GenericServerContainer for the server with the name specified. + * If the server is found then Some(server) is returned else None. + */ + def getServer(id: String): Option[GenericServerContainer] = state.getServerContainer(id) + + /** + * Returns an the GenericServerContainer for the server with the name specified. + * If the server is not found then the error handler is invoked. + */ + def getServerOrElse(id: String, errorHandler: => GenericServerContainer): GenericServerContainer = { + getServer(id) match { + case Some(serverContainer) => serverContainer + case None => errorHandler + } + } + + def act = { + self.trapExit = true + loop { + react { + case Configure(config, factory) => + log.debug("Configuring supervisor:{} ", this) + configure(config, factory) + reply('success) + + case Start => + state.serverContainers.foreach { serverContainer => + serverContainer.start + log.info("Starting server: {}", serverContainer.getServer) + } + + case Stop => + state.serverContainers.foreach { serverContainer => + serverContainer.terminate('normal) + log.info("Stopping server: {}", serverContainer) + } + log.info("Stopping supervisor: {}", this) + exit('normal) + + case Exit(failedServer, reason) => + reason match { + case 'forced => {} // do nothing + case _ => state.faultHandler.handleFailure(state, failedServer, reason) + } + + case unexpected => log.warning("Unexpected message [{}], ignoring...", unexpected) + } + } + } + + private def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match { + case SupervisorConfig(_, servers) => + servers.map(server => + server match { + case Worker(serverContainer, lifecycle) => + serverContainer.lifeCycle = Some(lifecycle) + spawnLink(serverContainer) + + case SupervisorConfig(_, _) => // recursive configuration + val supervisor = factory.newSupervisorFor(server.asInstanceOf[SupervisorConfig]) + supervisor ! Start + state.addSupervisor(supervisor) + }) + } + + private[supervisor] def spawnLink(serverContainer: GenericServerContainer): GenericServer = { + val newServer = serverContainer.newServer() + newServer.start + self.link(newServer) + log.debug("Linking actor [{}] to supervisor [{}]", newServer, this) + state.addServerContainer(serverContainer) + newServer + } +} + +//==================================================== +/** + * TODO: document + * + * @author Jonas Bonér + */ +abstract class FaultHandlingStrategy(val maxNrOfRetries: Int, val withinTimeRange: Int) extends Logging { + private[supervisor] var supervisor: Supervisor = _ + private var nrOfRetries = 0 + private var retryStartTime = currentTime + + private[supervisor] def handleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = { + nrOfRetries += 1 + if (timeRangeHasExpired) { + if (hasReachedMaximumNrOfRetries) { + log.info("Maximum of restarts [{}] for server [{}] has been reached - the supervisor including all its servers will now be shut down.", maxNrOfRetries, failedServer) + supervisor ! Stop // execution stops here + } else { + nrOfRetries = 0 + retryStartTime = currentTime + } + } + doHandleFailure(state, failedServer, reason) + } + + + private[supervisor] def restart(serverContainer: GenericServerContainer, reason: AnyRef, state: SupervisorState) = { + preRestart(serverContainer) + serverContainer.lock.withWriteLock { + + // TODO: this is the place to fail-over all pending messages in the failing actor's mailbox, if possible to get a hold of them + // e.g. something like 'serverContainer.getServer.getPendingMessages.map(newServer ! _)' + + self.unlink(serverContainer.getServer) + serverContainer.lifeCycle match { + case None => throw new IllegalStateException("Server [" + serverContainer.id + "] does not have a life-cycle defined.") + case Some(LifeCycle(scope, shutdownTime)) => + serverContainer.terminate(reason, shutdownTime) + + scope match { + case Permanent => + log.debug("Restarting server [{}] configured as PERMANENT.", serverContainer.id) + serverContainer.reconfigure(reason, supervisor.spawnLink(serverContainer), state.supervisor) + + case Temporary => + if (reason == 'normal) { + log.debug("Restarting server [{}] configured as TEMPORARY (since exited naturally).", serverContainer.id) + serverContainer.reconfigure(reason, supervisor.spawnLink(serverContainer), state.supervisor) + } else log.info("Server [{}] configured as TEMPORARY will not be restarted (received unnatural exit message).", serverContainer.id) + + case Transient => + log.info("Server [{}] configured as TRANSIENT will not be restarted.", serverContainer.id) + } + } + } + postRestart(serverContainer) + } + + /** + * To be overriden by concrete strategies. + */ + protected def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) + + /** + * To be overriden by concrete strategies. + */ + protected def preRestart(serverContainer: GenericServerContainer) = {} + + /** + * To be overriden by concrete strategies. + */ + protected def postRestart(serverContainer: GenericServerContainer) = {} + + private def hasReachedMaximumNrOfRetries: Boolean = nrOfRetries > maxNrOfRetries + private def timeRangeHasExpired: Boolean = (currentTime - retryStartTime) > withinTimeRange + private def currentTime: Long = System.currentTimeMillis +} + +//==================================================== +/** + * TODO: document + * + * @author Jonas Bonér + */ +class AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) +extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) { + override def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = { + log.error("Server [{}] has failed due to [{}] - scheduling restart - scheme: ALL_FOR_ONE.", failedServer, reason) + for (serverContainer <- state.serverContainers) restart(serverContainer, reason, state) + state.supervisors.foreach(_ ! Exit(failedServer, reason)) + } +} + +//==================================================== +/** + * TODO: document + * + * @author Jonas Bonér + */ +class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) +extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) { + override def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = { + log.error("Server [{}] has failed due to [{}] - scheduling restart - scheme: ONE_FOR_ONE.", failedServer, reason) + var serverContainer: Option[GenericServerContainer] = None + state.serverContainers.foreach { + container => if (container.getServer == failedServer) serverContainer = Some(container) + } + serverContainer match { + case None => throw new RuntimeException("Could not find a generic server for actor: " + failedServer) + case Some(container) => restart(container, reason, state) + } + } +} + +//==================================================== +/** + * TODO: document + * + * @author Jonas Bonér + */ +private[supervisor] class SupervisorState(val supervisor: Supervisor, val faultHandler: FaultHandlingStrategy) extends Logging { + faultHandler.supervisor = supervisor + + private val _lock = new ReadWriteLock + private val _serverContainerRegistry = new HashMap[String, GenericServerContainer] + private var _supervisors: List[Supervisor] = Nil + + def supervisors: List[Supervisor] = _lock.withReadLock { + _supervisors + } + + def addSupervisor(supervisor: Supervisor) = _lock.withWriteLock { + _supervisors = supervisor :: _supervisors + } + + def serverContainers: List[GenericServerContainer] = _lock.withReadLock { + _serverContainerRegistry.values.toList + } + + def getServerContainer(id: String): Option[GenericServerContainer] = _lock.withReadLock { + if (_serverContainerRegistry.contains(id)) Some(_serverContainerRegistry(id)) + else None + } + + def addServerContainer(serverContainer: GenericServerContainer) = _lock.withWriteLock { + _serverContainerRegistry += serverContainer.id -> serverContainer + } + + def removeServerContainer(id: String) = _lock.withWriteLock { + getServerContainer(id) match { + case Some(serverContainer) => _serverContainerRegistry - id + case None => {} + } + } +} + diff --git a/supervisor/src/test/scala/GenericServerContainerSuite.scala b/supervisor/src/test/scala/GenericServerContainerSuite.scala new file mode 100755 index 0000000000..1b85425d74 --- /dev/null +++ b/supervisor/src/test/scala/GenericServerContainerSuite.scala @@ -0,0 +1,202 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package com.scalablesolutions.akka.supervisor + +import org.specs.runner.JUnit4 +import org.specs.Specification + +import scala.actors._ +import scala.actors.Actor._ + +/** + * @author Jonas Bonér + */ +class GenericServerContainerTest extends JUnit4(genericServerContainerSpec) // for JUnit4 and Maven +object genericServerContainerSpec extends Specification { + + var inner: GenericServerContainerActor = null + var server: GenericServerContainer = null + def createProxy(f: () => GenericServer) = { + val server = new GenericServerContainer("server", f) + server.setTimeout(100) + server + } + + inner = new GenericServerContainerActor + server = createProxy(() => inner) + server.newServer + server.start + + "server should be initialized" in { + server.init("testInit") + Thread.sleep(100) + expect("initializing: testInit") { + inner.log + } + } + + "server should terminate with a reason " in { + server.terminate("testTerminateWithReason", 100) + Thread.sleep(100) + expect("terminating: testTerminateWithReason") { + inner.log + } + } + + "server respond to async oneway message" in { + server ! OneWay + Thread.sleep(100) + expect("got a oneway") { + inner.log + } + } + + "server respond to async ping message" in { + server ! Ping + Thread.sleep(100) + expect("got a ping") { + inner.log + } + } + + "server respond to !!!" in { + expect("pong") { + (server !!! Ping).getOrElse("nil") + } + expect("got a ping") { + inner.log + } + } + + "server respond to !?" in { + expect("pong") { + val res: String = server !? Ping + res + } + expect("got a ping") { + inner.log + } + } + + "server respond to !!! with timeout" in { + expect("pong") { + (server !!! Ping).getOrElse("nil") + } + expect("got a ping") { + inner.log + } + } + + "server respond to !!! with timeout" in { + expect("error handler") { + server !!! (OneWay, "error handler") + } + expect("got a oneway") { + inner.log + } + } + + "server respond to !!! and return future with timeout" in { + val future = server !! Ping + future.receiveWithin(100) match { + case None => fail("timed out") // timed out + case Some(reply) => + expect("got a ping") { + inner.log + } + assert("pong" === reply) + } + } + + "server respond to !!! and return future with timeout" in { + val future = server !! OneWay + future.receiveWithin(100) match { + case None => + expect("got a oneway") { + inner.log + } + case Some(reply) => + fail("expected a timeout, got Some(reply)") + } + } + + "server respond do hotswap" in { + // using base + expect("pong") { + (server !!! Ping).getOrElse("nil") + } + + // hotswapping + server.hotswap(Some({ + case Ping => reply("hotswapped pong") + })) + expect("hotswapped pong") { + (server !!! Ping).getOrElse("nil") + } + } + + "server respond do double hotswap" in { + // using base + expect("pong") { + (server !!! Ping).getOrElse("nil") + } + + // hotswapping + server.hotswap(Some({ + case Ping => reply("hotswapped pong") + })) + expect("hotswapped pong") { + (server !!! Ping).getOrElse("nil") + } + + // hotswapping again + server.hotswap(Some({ + case Ping => reply("hotswapped pong again") + })) + expect("hotswapped pong again") { + (server !!! Ping).getOrElse("nil") + } + } + + "server respond do hotswap and then revert" in { + // using base + expect("pong") { + (server !!! Ping).getOrElse("nil") + } + + // hotswapping + server.hotswap(Some({ + case Ping => reply("hotswapped pong") + })) + expect("hotswapped pong") { + (server !!! Ping).getOrElse("nil") + } + + // restoring original base + server.hotswap(None) + expect("pong") { + (server !!! Ping).getOrElse("nil") + } + } +} + + +class GenericServerContainerActor extends GenericServer { + var log = "" + + override def body: PartialFunction[Any, Unit] = { + case Ping => + log = "got a ping" + reply("pong") + + case OneWay => + log = "got a oneway" + } + + override def init(config: AnyRef) = log = "initializing: " + config + override def shutdown(reason: AnyRef) = log = "terminating: " + reason +} + + diff --git a/supervisor/src/test/scala/GenericServerSuite.scala b/supervisor/src/test/scala/GenericServerSuite.scala new file mode 100755 index 0000000000..44aab326eb --- /dev/null +++ b/supervisor/src/test/scala/GenericServerSuite.scala @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package com.scalablesolutions.akka.supervisor + +import org.specs.runner.JUnit4 +import org.specs.Specification + +import scala.actors._ +import scala.actors.Actor._ + +/** + * @author Jonas Bonér + */ +class GenericServerTest extends JUnit4(genericServerSpec) // for JUnit4 and Maven +object genericServerSpec extends Specification { + + "server should respond to a regular message" in { + val server = new TestGenericServerActor + server.start + server !? Ping match { + case reply: String => + assert("got a ping" === server.log) + assert("pong" === reply) + case _ => fail() + } + } +} + +class TestGenericServerActor extends GenericServer { + var log: String = "" + + override def body: PartialFunction[Any, Unit] = { + case Ping => + log = "got a ping" + reply("pong") + } +} + diff --git a/supervisor/src/test/scala/Messages.scala b/supervisor/src/test/scala/Messages.scala new file mode 100755 index 0000000000..0f014ca692 --- /dev/null +++ b/supervisor/src/test/scala/Messages.scala @@ -0,0 +1,12 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package com.scalablesolutions.akka.supervisor + +sealed abstract class TestMessage +case object Ping extends TestMessage +case object Pong extends TestMessage +case object OneWay extends TestMessage +case object Die extends TestMessage +case object NotifySupervisorExit extends TestMessage diff --git a/supervisor/src/test/scala/SupervisorStateSuite.scala b/supervisor/src/test/scala/SupervisorStateSuite.scala new file mode 100755 index 0000000000..6df3b7e059 --- /dev/null +++ b/supervisor/src/test/scala/SupervisorStateSuite.scala @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package com.scalablesolutions.akka.supervisor + +import org.specs.runner.JUnit4 +import org.specs.Specification + +import scala.actors._ +import scala.actors.Actor._ + +/** + * @author Jonas Bonér + */ +class SupervisorStateTest extends JUnit4(supervisorStateSpec) // for JUnit4 and Maven +object supervisorStateSpec extends Specification { + val dummyActor = new GenericServer { override def body: PartialFunction[Any, Unit] = { case _ => }} + val newDummyActor = () => dummyActor + var state: SupervisorState = _ + var proxy: GenericServerContainer = _ + var supervisor: Supervisor = _ + + proxy = new GenericServerContainer("server1", newDummyActor) + object factory extends SupervisorFactory { + override def getSupervisorConfig: SupervisorConfig = { + SupervisorConfig( + RestartStrategy(AllForOne, 3, 100), + Worker( + proxy, + LifeCycle(Permanent, 100)) + :: Nil) + } + } + + supervisor = factory.newSupervisor + state = new SupervisorState(supervisor, new AllForOneStrategy(3, 100)) + + "supervisor state should return added server" in { + state.addServerContainer(proxy) + state.getServerContainer("server1") match { + case None => fail("should have returned server") + case Some(server) => + assert(server != null) + assert(server.isInstanceOf[GenericServerContainer]) + assert(proxy === server) + } + } + + "supervisor state should remove added server" in { + state.addServerContainer(proxy) + + state.removeServerContainer("server1") + state.getServerContainer("server1") match { + case Some(_) => fail("should have returned None") + case None => + } + state.getServerContainer("dummyActor") match { + case Some(_) => fail("should have returned None") + case None => + } + } + + "supervisor state should fail getting non-existent server by symbol" in { + state.getServerContainer("server2") match { + case Some(_) => fail("should have returned None") + case None => + } + } + + "supervisor state should fail getting non-existent server by actor" in { + state.getServerContainer("dummyActor") match { + case Some(_) => fail("should have returned None") + case None => + } + } +} diff --git a/supervisor/src/test/scala/SupervisorSuite.scala b/supervisor/src/test/scala/SupervisorSuite.scala new file mode 100755 index 0000000000..4e8bd048e1 --- /dev/null +++ b/supervisor/src/test/scala/SupervisorSuite.scala @@ -0,0 +1,434 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package com.scalablesolutions.akka.supervisor + +import org.specs.runner.JUnit4 +import org.specs.Specification + +import scala.actors._ +import scala.actors.Actor._ +import scala.collection.Map +import scala.collection.mutable.HashMap + +/** + * @author Jonas Bonér + */ +class SupervisorTest extends JUnit4(supervisorSpec) // for JUnit4 and Maven +object supervisorSpec extends Specification { + + var messageLog: String = "" + val pingpong1 = new GenericServerContainer("pingpong1", () => new PingPong1Actor) + val pingpong2 = new GenericServerContainer("pingpong2", () => new PingPong2Actor) + val pingpong3 = new GenericServerContainer("pingpong3", () => new PingPong3Actor) + + pingpong1.setTimeout(100) + pingpong2.setTimeout(100) + pingpong3.setTimeout(100) + + @BeforeMethod + def setup = messageLog = "" + + // =========================================== + "starting supervisor should start the servers" in { + val sup = getSingleActorAllForOneSupervisor + sup ! Start + + expect("pong") { + (pingpong1 !!! Ping).getOrElse("nil") + } + } + + // =========================================== + "started supervisor should be able to return started servers" in { + val sup = getSingleActorAllForOneSupervisor + sup ! Start + val server = sup.getServerOrElse("pingpong1", throw new RuntimeException("server not found")) + assert(server.isInstanceOf[GenericServerContainer]) + assert(server === pingpong1) + } + + // =========================================== + "started supervisor should fail returning non-existing server" in { + val sup = getSingleActorAllForOneSupervisor + sup ! Start + intercept(classOf[RuntimeException]) { + sup.getServerOrElse("wrong_name", throw new RuntimeException("server not found")) + } + } + + // =========================================== + "supervisor should restart killed server with restart strategy one_for_one" in { + val sup = getSingleActorOneForOneSupervisor + sup ! Start + + intercept(classOf[RuntimeException]) { + pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) + } + Thread.sleep(100) + expect("oneforone") { + messageLog + } + } + + // =========================================== + "supervisor should restart used killed server with restart strategy one_for_one" in { + val sup = getSingleActorOneForOneSupervisor + sup ! Start + + expect("pong") { + (pingpong1 !!! Ping).getOrElse("nil") + } + Thread.sleep(100) + expect("ping") { + messageLog + } + intercept(classOf[RuntimeException]) { + pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) + } + Thread.sleep(100) + expect("pingoneforone") { + messageLog + } + expect("pong") { + (pingpong1 !!! Ping).getOrElse("nil") + } + Thread.sleep(100) + expect("pingoneforoneping") { + messageLog + } + } + + // =========================================== + "supervisor should restart killed server with restart strategy all_for_one" in { + val sup = getSingleActorAllForOneSupervisor + sup ! Start + intercept(classOf[RuntimeException]) { + pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) + } + Thread.sleep(100) + expect("allforone") { + messageLog + } + } + + // =========================================== + "supervisor should restart used killed server with restart strategy all_for_one" in { + val sup = getSingleActorAllForOneSupervisor + sup ! Start + expect("pong") { + (pingpong1 !!! Ping).getOrElse("nil") + } + Thread.sleep(100) + expect("ping") { + messageLog + } + intercept(classOf[RuntimeException]) { + pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) + } + Thread.sleep(100) + expect("pingallforone") { + messageLog + } + expect("pong") { + (pingpong1 !!! Ping).getOrElse("nil") + } + Thread.sleep(100) + expect("pingallforoneping") { + messageLog + } + } + + // =========================================== + "supervisor should restart killed multiple servers with restart strategy one_for_one" in { + val sup = getMultipleActorsOneForOneConf + sup ! Start + intercept(classOf[RuntimeException]) { + pingpong3 !!! (Die, throw new RuntimeException("TIME OUT")) + } + Thread.sleep(100) + expect("oneforone") { + messageLog + } + } + + // =========================================== + "supervisor should restart killed multiple servers with restart strategy one_for_one" in { + val sup = getMultipleActorsOneForOneConf + sup ! Start + expect("pong") { + (pingpong1 !!! Ping).getOrElse("nil") + } + Thread.sleep(100) + expect("pong") { + (pingpong2 !!! Ping).getOrElse("nil") + } + Thread.sleep(100) + expect("pong") { + (pingpong3 !!! Ping).getOrElse("nil") + } + Thread.sleep(100) + expect("pingpingping") { + messageLog + } + intercept(classOf[RuntimeException]) { + pingpong2 !!! (Die, throw new RuntimeException("TIME OUT")) + } + Thread.sleep(100) + expect("pingpingpingoneforone") { + messageLog + } + expect("pong") { + (pingpong1 !!! Ping).getOrElse("nil") + } + Thread.sleep(100) + expect("pong") { + (pingpong2 !!! Ping).getOrElse("nil") + } + Thread.sleep(100) + expect("pong") { + (pingpong3 !!! Ping).getOrElse("nil") + } + Thread.sleep(100) + expect("pingpingpingoneforonepingpingping") { + messageLog + } + } + + // =========================================== + "supervisor should restart killed muliple servers with restart strategy all_for_one" in { + val sup = getMultipleActorsAllForOneConf + sup ! Start + intercept(classOf[RuntimeException]) { + pingpong2 !!! (Die, throw new RuntimeException("TIME OUT")) + } + Thread.sleep(100) + expect("allforoneallforoneallforone") { + messageLog + } + } + + // =========================================== + "supervisor should restart killed muliple servers with restart strategy all_for_one" in { + val sup = getMultipleActorsAllForOneConf + sup ! Start + expect("pong") { + (pingpong1 !!! Ping).getOrElse("nil") + } + Thread.sleep(100) + expect("pong") { + (pingpong2 !!! Ping).getOrElse("nil") + } + Thread.sleep(100) + expect("pong") { + (pingpong3 !!! Ping).getOrElse("nil") + } + Thread.sleep(100) + expect("pingpingping") { + messageLog + } + intercept(classOf[RuntimeException]) { + pingpong2 !!! (Die, throw new RuntimeException("TIME OUT")) + } + Thread.sleep(100) + expect("pingpingpingallforoneallforoneallforone") { + messageLog + } + expect("pong") { + (pingpong1 !!! Ping).getOrElse("nil") + } + Thread.sleep(100) + expect("pong") { + (pingpong2 !!! Ping).getOrElse("nil") + } + Thread.sleep(100) + expect("pong") { + (pingpong3 !!! Ping).getOrElse("nil") + } + Thread.sleep(100) + expect("pingpingpingallforoneallforoneallforonepingpingping") { + messageLog + } + } + + "supervisor should restart killed first-level server with restart strategy all_for_one" in { + val sup = getNestedSupervisorsAllForOneConf + sup ! Start + intercept(classOf[RuntimeException]) { + pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) + } + Thread.sleep(100) + expect("allforoneallforoneallforone") { + messageLog + } + } + + + // ============================================= + // Creat some supervisors with different configurations + + def getSingleActorAllForOneSupervisor: Supervisor = { + + // Create an abstract SupervisorContainer that works for all implementations + // of the different Actors (Services). + // + // Then create a concrete container in which we mix in support for the specific + // implementation of the Actors we want to use. + + object factory extends TestSupervisorFactory { + override def getSupervisorConfig: SupervisorConfig = { + SupervisorConfig( + RestartStrategy(AllForOne, 3, 100), + Worker( + pingpong1, + LifeCycle(Permanent, 100)) + :: Nil) + } + } + factory.newSupervisor + } + + def getSingleActorOneForOneSupervisor: Supervisor = { + object factory extends TestSupervisorFactory { + override def getSupervisorConfig: SupervisorConfig = { + SupervisorConfig( + RestartStrategy(OneForOne, 3, 100), + Worker( + pingpong1, + LifeCycle(Permanent, 100)) + :: Nil) + } + } + factory.newSupervisor + } + + def getMultipleActorsAllForOneConf: Supervisor = { + object factory extends TestSupervisorFactory { + override def getSupervisorConfig: SupervisorConfig = { + SupervisorConfig( + RestartStrategy(AllForOne, 3, 100), + Worker( + pingpong1, + LifeCycle(Permanent, 100)) + :: + Worker( + pingpong2, + LifeCycle(Permanent, 100)) + :: + Worker( + pingpong3, + LifeCycle(Permanent, 100)) + :: Nil) + } + } + factory.newSupervisor + } + + def getMultipleActorsOneForOneConf: Supervisor = { + object factory extends TestSupervisorFactory { + override def getSupervisorConfig: SupervisorConfig = { + SupervisorConfig( + RestartStrategy(OneForOne, 3, 100), + Worker( + pingpong1, + LifeCycle(Permanent, 100)) + :: + Worker( + pingpong2, + LifeCycle(Permanent, 100)) + :: + Worker( + pingpong3, + LifeCycle(Permanent, 100)) + :: Nil) + } + } + factory.newSupervisor + } + + def getNestedSupervisorsAllForOneConf: Supervisor = { + object factory extends TestSupervisorFactory { + override def getSupervisorConfig: SupervisorConfig = { + SupervisorConfig( + RestartStrategy(AllForOne, 3, 100), + Worker( + pingpong1, + LifeCycle(Permanent, 100)) + :: + SupervisorConfig( + RestartStrategy(AllForOne, 3, 100), + Worker( + pingpong2, + LifeCycle(Permanent, 100)) + :: + Worker( + pingpong3, + LifeCycle(Permanent, 100)) + :: Nil) + :: Nil) + } + } + factory.newSupervisor + } + + class PingPong1Actor extends GenericServer { + override def body: PartialFunction[Any, Unit] = { + case Ping => + messageLog += "ping" + reply("pong") + case Die => + throw new RuntimeException("Recieved Die message") + } + } + + class PingPong2Actor extends GenericServer { + override def body: PartialFunction[Any, Unit] = { + case Ping => + messageLog += "ping" + reply("pong") + case Die => + throw new RuntimeException("Recieved Die message") + } + } + + class PingPong3Actor extends GenericServer { + override def body: PartialFunction[Any, Unit] = { + case Ping => + messageLog += "ping" + reply("pong") + case Die => + throw new RuntimeException("Recieved Die message") + } + } + + // ============================================= + + class TestAllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends AllForOneStrategy(maxNrOfRetries, withinTimeRange) { + override def postRestart(serverContainer: GenericServerContainer) = { + messageLog += "allforone" + } + } + + class TestOneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends OneForOneStrategy(maxNrOfRetries, withinTimeRange) { + override def postRestart(serverContainer: GenericServerContainer) = { + messageLog += "oneforone" + } + } + + abstract class TestSupervisorFactory extends SupervisorFactory { + override def create(strategy: RestartStrategy): Supervisor = strategy match { + case RestartStrategy(scheme, maxNrOfRetries, timeRange) => + scheme match { + case AllForOne => new Supervisor(new TestAllForOneStrategy(maxNrOfRetries, timeRange)) + case OneForOne => new Supervisor(new TestOneForOneStrategy(maxNrOfRetries, timeRange)) + } + } + } +} + + + + + +