added the supervisor module to akka

This commit is contained in:
Jonas Boner 2009-03-10 00:59:32 +01:00
parent 606f34e908
commit 240261e2ee
12 changed files with 2025 additions and 0 deletions

1
supervisor/.#pom.xml Symbolic link
View file

@ -0,0 +1 @@
jboner@agu.8886:1236500639

214
supervisor/README Executable file
View file

@ -0,0 +1,214 @@
--- OVERVIEW ---
Scala Actors Behavior Module
Implements Erlang-style Behaviors for Scala; Supervisor, GenericServer, GenericEvent and GenericFiniteStateMachine allowing creating fault-tolerant actor-based enterprise systems.
The implementation consists of four main abstractions;
* Supervisor -- The Supervisor manages hierarchies of Scala actors and provides fault-tolerance in terms of different restart
semantics. The configuration and semantics is almost a 1-1 port of the Erlang Supervisor implementation, explained
here: http://www.erlang.org/doc/design_principles/sup_princ.html, read this document in order to understand how to
configure the Supervisor properly.
* GenericServer -- The GenericServer (which subclasses Actor) is a trait that forms the base for a server to be managed by a Supervisor.
The GenericServer is wrapped by a GenericServerContainer instance providing a necessary indirection needed to be able to
fully manage the life-cycle of the GenericServer.
* GenericEvent -- TBD
* GenericFiniteStateMachine -- TBD
--- CHECK OUT ---
The SCM system used is Git.
1. Download and install Git (google git).
2. Invoke 'git clone git://github.com/jboner/scala-otp.git'.
--- BUILD ---
The build system used is Maven.
1. Download and install Maven 2.
2. Step into the root dir 'scala-otp'.
3. Invoke 'mvn install'
This will build the project, run all tests, create a jar and upload it to your local Maven repository ready for use.
--- RUNTIME DEPENDENCIES ---
1. Scala 2.7.1-final
2. SLF4J 1.5.2
3. LogBack Classic 0.9.9
--- USAGE ---
Here is a small step-by-step runnable tutorial on how to create a server, configure it, use it, hotswap its
implementation etc. For more details on the API, look at the code or the tests.
You can find this code in the sample.scala file in the root directory. Run it by invoking 'scala -cp
target/scala-behavior-0.1-SNAPSHOT.jar:<path to slf4j and logback jars> sample.scala'
// =============================================
// 1. Import statements and Server messages
import scala.actors._
import scala.actors.Actor._
import scala.actors.behavior._
import scala.actors.behavior.Helpers._
sealed abstract class SampleMessage
case object Ping extends SampleMessage
case object Pong extends SampleMessage
case object OneWay extends SampleMessage
case object Die extends SampleMessage
// =============================================
// 2. Create the GenericServer by extending the GenericServer trait and override the 'body' method
class SampleServer extends GenericServer {
// This method implements the core server logic and naturally has to be overridden
override def body: PartialFunction[Any, Unit] = {
case Ping =>
println("Received Ping"); reply(Pong)
case OneWay =>
println("Received OneWay")
case Die =>
println("Received Die..dying...")
throw new RuntimeException("Received Die message")
}
// GenericServer also has some callback life-cycle methods, such as init(..) and shutdown(..)
}
// =============================================
// 3. Wrap our SampleServer in a GenericServerContainer and give it a name to be able to refer to it later.
object sampleServer1 extends GenericServerContainer("sample1", () => new SampleServer)
object sampleServer2 extends GenericServerContainer("sample2", () => new SampleServer)
// =============================================
// 4. Create a Supervisor configuration (and a SupervisorFactory) that is configuring our SampleServer (takes a list of
'Worker' configurations, one or many)
object factory extends SupervisorFactory {
override protected def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(AllForOne, 3, 10000),
Worker(
sampleServer1,
LifeCycle(Permanent, 1000)) ::
Worker(
sampleServer2,
LifeCycle(Permanent, 1000)) ::
Nil)
}
}
// =============================================
// 5. Create a new Supervisor with the custom factory
val supervisor = factory.newSupervisor
// =============================================
// 6. Start the Supervisor (which starts the server(s))
supervisor ! Start
// =============================================
// 7. Try to send a one way asyncronous message to our servers
sampleServer1 ! OneWay
// Try to get sampleServer2 from the Supervisor before sending a message
supervisor.getServer("sample2") match {
case Some(server2) => server2 ! OneWay
case None => println("server [sample2] could not be found")
}
// =============================================
// 8. Try to send an asyncronous message - receive a future - wait 100 ms (time-out) for the reply
val future = sampleServer1 !! Ping
val reply1 = future.receiveWithin(100) match {
case Some(reply) =>
println("Received reply: " + reply)
case None =>
println("Did not get a reply witin 100 ms")
}
// =============================================
// 9. Try to send a message (Die) telling the server to kill itself (throw an exception)
sampleServer1 ! Die
// =============================================
// 10. Send an asyncronous message and wait on a future. If it times out -> use error handler (in this case throw an
exception). It is likely that this call will time out since the server is in the middle of recovering from failure.
val reply2 = try {
sampleServer1 !!! (Ping, throw new RuntimeException("Time-out"), 10) // time out is set to 10 ms (very low on purpose)
} catch { case e => println("Expected exception: " + e.toString); Pong }
// =============================================
// 11. Server should be up again. Try the same call again
val reply3 = try {
sampleServer1 !!! (Ping, throw new RuntimeException("Time-out"), 1000)
} catch { case e => println("Expected exception: " + e.toString); Pong }
// Also check server number 2
sampleServer2 ! Ping
// =============================================
// 11. Try to hotswap the server implementation
sampleServer1.hotswap(Some({
case Ping =>
println("Hotswapped Ping")
}))
// =============================================
// 12. Try the hotswapped server out
sampleServer1 ! Ping
// =============================================
// 13. Hotswap again
sampleServer1.hotswap(Some({
case Pong =>
println("Hotswapped again, now doing Pong")
reply(Ping)
}))
// =============================================
// 14. Send an asyncronous message that will wait on a future. Method returns an Option[T] => if Some(result) -> return
result, if None -> print out an info message (or throw an exception or do whatever you like...)
val reply4 = (sampleServer1 !!! Pong).getOrElse({println("Time out when sending Pong"); Ping})
// Same invocation with pattern matching syntax.
val reply5 = sampleServer1 !!! Pong match {
case Some(result) => result
case None => println("Time out when sending Pong"); Ping
}
// =============================================
// 15. Hotswap back to original implementation by passing in None
sampleServer1.hotswap(None)
// =============================================
// 16. Test the final hotswap by sending an async message
sampleServer1 ! Ping
// =============================================
// 17. Shut down the supervisor and its server(s)
supervisor ! Stop

140
supervisor/pom.xml Executable file
View file

@ -0,0 +1,140 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>akka-supervisor</artifactId>
<name>Akka Supervisor Module</name>
<packaging>jar</packaging>
<parent>
<artifactId>akka</artifactId>
<groupId>${akka.groupId}</groupId>
<version>${akka.version}</version>
</parent>
<dependencies>
<dependency>
<groupId>${akka.groupId}</groupId>
<artifactId>akka-util-java</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>net.lag</groupId>
<artifactId>configgy</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.specs</groupId>
<artifactId>specs</artifactId>
<version>1.4.3</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<!-- <testSourceDirectory>src/test/scala</testSourceDirectory> -->
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<!-- <goal>testCompile</goal> -->
</goals>
</execution>
</executions>
<configuration>
<args>
<arg>-target:jvm-1.5</arg>
<arg>-unchecked</arg>
</args>
<scalaVersion>${scala.version}</scalaVersion>
<vscaladocVersion>1.0</vscaladocVersion>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<configuration>
<downloadSources>true</downloadSources>
<buildcommands>
<buildcommand>
ch.epfl.lamp.sdt.core.scalabuilder
</buildcommand>
</buildcommands>
<additionalProjectnatures>
<projectnature>
ch.epfl.lamp.sdt.core.scalanature
</projectnature>
</additionalProjectnatures>
<classpathContainers>
<classpathContainer>
org.eclipse.jdt.launching.JRE_CONTAINER
</classpathContainer>
<classpathContainer>
ch.epfl.lamp.sdt.launching.SCALA_CONTAINER
</classpathContainer>
</classpathContainers>
</configuration>
</plugin>
<!-- <plugin> -->
<!-- <groupId>org.apache.maven.plugins</groupId> -->
<!-- <artifactId>maven-surefire-plugin</artifactId> -->
<!-- <configuration> -->
<!-- <excludes> -->
<!-- <exclude>**/Abstract*</exclude> -->
<!-- </excludes> -->
<!-- </configuration> -->
<!-- </plugin> -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>cobertura-maven-plugin</artifactId>
<version>2.2</version>
<configuration>
<formats>
<format>xml</format>
<format>html</format>
</formats>
</configuration>
</plugin>
</plugins>
<resources>
<resource>
<filtering>false</filtering>
<directory>src/main/resources</directory>
</resource>
<resource>
<filtering>false</filtering>
<directory>src/main/scala</directory>
<includes>
<include>**</include>
</includes>
<excludes>
<exclude>**/*.scala</exclude>
</excludes>
</resource>
</resources>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<vscaladocVersion>1.1</vscaladocVersion>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>
</project>

164
supervisor/sample.scala Executable file
View file

@ -0,0 +1,164 @@
// =============================================
// 1. Import statements and Server messages
import scala.actors._
import scala.actors.Actor._
import scala.actors.behavior._
import scala.actors.behavior.Helpers._
sealed abstract class SampleMessage
case object Ping extends SampleMessage
case object Pong extends SampleMessage
case object OneWay extends SampleMessage
case object Die extends SampleMessage
// =============================================
// 2. Create the GenericServer by extending the GenericServer trait and override the 'body' method
class SampleServer extends GenericServer {
// This method implements the core server logic and naturally has to be overridden
override def body: PartialFunction[Any, Unit] = {
case Ping =>
println("Received Ping"); reply(Pong)
case OneWay =>
println("Received OneWay")
case Die =>
println("Received Die..dying...")
throw new RuntimeException("Received Die message")
}
// GenericServer also has some callback life-cycle methods, such as init(..) and shutdown(..)
}
// =============================================
// 3. Wrap our SampleServer in a GenericServerContainer and give it a name to be able to refer to it later.
object sampleServer1 extends GenericServerContainer("sample1", () => new SampleServer)
object sampleServer2 extends GenericServerContainer("sample2", () => new SampleServer)
// =============================================
// 4. Create a Supervisor configuration (and a SupervisorFactory) that is configuring our SampleServer (takes a list of 'Worker' configurations, one or many)
object factory extends SupervisorFactory {
override protected def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(AllForOne, 3, 10000),
Worker(
sampleServer1,
LifeCycle(Permanent, 1000)) ::
Worker(
sampleServer2,
LifeCycle(Permanent, 1000)) ::
Nil)
}
}
// =============================================
// 5. Create a new Supervisor with the custom factory
val supervisor = factory.newSupervisor
// =============================================
// 6. Start the Supervisor (which starts the server(s))
supervisor ! Start
// =============================================
// 7. Try to send a one way asyncronous message to our servers
sampleServer1 ! OneWay
// Try to get sampleServer2 from the Supervisor before sending a message
supervisor.getServer("sample2") match {
case Some(server2) => server2 ! OneWay
case None => println("server [sample2] could not be found")
}
// =============================================
// 8. Try to send an asyncronous message - receive a future - wait 100 ms (time-out) for the reply
val future = sampleServer1 !! Ping
val reply1 = future.receiveWithin(100) match {
case Some(reply) =>
println("Received reply: " + reply)
case None =>
println("Did not get a reply witin 100 ms")
}
// =============================================
// 9. Try to send a message (Die) telling the server to kill itself (throw an exception)
sampleServer1 ! Die
// =============================================
// 10. Send an asyncronous message and wait on a future. If it times out -> use error handler (in this case throw an exception). It is likely that this call will time out since the server is in the middle of recovering from failure.
val reply2 = try {
sampleServer1 !!! (Ping, throw new RuntimeException("Time-out"), 10) // time out is set to 10 ms (very low on purpose)
} catch { case e => println("Expected exception: " + e.toString); Pong }
// =============================================
// 11. Server should be up again. Try the same call again
val reply3 = try {
sampleServer1 !!! (Ping, throw new RuntimeException("Time-out"), 1000)
} catch { case e => println("Expected exception: " + e.toString); Pong }
// Also check server number 2
sampleServer2 ! Ping
// =============================================
// 11. Try to hotswap the server implementation
sampleServer1.hotswap(Some({
case Ping =>
println("Hotswapped Ping")
}))
// =============================================
// 12. Try the hotswapped server out
sampleServer1 ! Ping
// =============================================
// 13. Hotswap again
sampleServer1.hotswap(Some({
case Pong =>
println("Hotswapped again, now doing Pong")
reply(Ping)
}))
// =============================================
// 14. Send an asyncronous message that will wait on a future. Method returns an Option[T] => if Some(result) -> return result, if None -> print out an info message (or throw an exception or do whatever you like...)
val reply4 = (sampleServer1 !!! Pong).getOrElse({println("Time out when sending Pong"); Ping})
// Same invocation with pattern matching syntax.
val reply5 = sampleServer1 !!! Pong match {
case Some(result) => result
case None => println("Time out when sending Pong"); Ping
}
// =============================================
// 15. Hotswap back to original implementation by passing in None
sampleServer1.hotswap(None)
// =============================================
// 16. Test the final hotswap by sending an async message
sampleServer1 ! Ping
// =============================================
// 17. Shut down the supervisor and its server(s)
supervisor ! Stop

View file

@ -0,0 +1,282 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package com.scalablesolutions.akka.supervisor
import scala.actors._
import scala.actors.Actor._
import com.scalablesolutions.akka.supervisor.Helpers._
sealed abstract class GenericServerMessage
case class Init(config: AnyRef) extends GenericServerMessage
case class ReInit(config: AnyRef) extends GenericServerMessage
case class Shutdown(reason: AnyRef) extends GenericServerMessage
case class Terminate(reason: AnyRef) extends GenericServerMessage
case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends GenericServerMessage
/**
* Base trait for all user-defined servers/actors.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait GenericServer extends Actor {
/**
* Template method implementing the server logic.
* To be implemented by subclassing server.
* <p/>
* Example code:
* <pre>
* override def body: PartialFunction[Any, Unit] = {
* case Ping =>
* println("got a ping")
* reply("pong")
*
* case OneWay =>
* println("got a oneway")
* }
* </pre>
*/
def body: PartialFunction[Any, Unit]
/**
* Callback method that is called during initialization.
* To be implemented by subclassing server.
*/
def init(config: AnyRef) {}
/**
* Callback method that is called during reinitialization after a server crash.
* To be implemented by subclassing server.
*/
def reinit(config: AnyRef) {}
/**
* Callback method that is called during termination.
* To be implemented by subclassing server.
*/
def shutdown(reason: AnyRef) {}
def act = loop { react { genericBase orElse actorBase } }
private def actorBase: PartialFunction[Any, Unit] = hotswap getOrElse body
private var hotswap: Option[PartialFunction[Any, Unit]] = None
private val genericBase: PartialFunction[Any, Unit] = {
case Init(config) => init(config)
case ReInit(config) => reinit(config)
case HotSwap(code) => hotswap = code
case Shutdown(reason) => shutdown(reason); reply('success)
case Terminate(reason) => exit(reason)
}
}
/**
* The container (proxy) for GenericServer, responsible for managing the life-cycle of the server;
* such as shutdown, restart, re-initialization etc.
* Each GenericServerContainer manages one GenericServer.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class GenericServerContainer(val id: String, var serverFactory: () => GenericServer) extends Logging {
require(id != null && id != "")
// TODO: see if we can parameterize class and add type safe getActor method
//class GenericServerContainer[T <: GenericServer](var factory: () => T) {
//def getActor: T = server
var lifeCycle: Option[LifeCycle] = None
val lock = new ReadWriteLock
private var server: GenericServer = null
private var currentConfig: Option[AnyRef] = None
private var timeout = 5000
/**
* Sends a one way message to the server - alias for <code>cast(message)</code>.
* <p>
* Example:
* <pre>
* server ! Message
* </pre>
*/
def !(message: Any) = {
require(server != null)
lock.withReadLock { server ! message }
}
/**
* Sends a message to the server returns a FutureWithTimeout holding the future reply .
* <p>
* Example:
* <pre>
* val future = server !! Message
* future.receiveWithin(100) match {
* case None => ... // timed out
* case Some(reply) => ... // handle reply
* }
* </pre>
*/
def !![T](message: Any): FutureWithTimeout[T] = {
require(server != null)
lock.withReadLock { server !!! message }
}
/**
* Sends a message to the server and blocks indefinitely (no time out), waiting for the reply.
* <p>
* Example:
* <pre>
* val result: String = server !? Message
* </pre>
*/
def !?[T](message: Any): T = {
require(server != null)
val future: Future[T] = lock.withReadLock { server.!![T](message, {case t => t.asInstanceOf[T]}) }
Actor.receive {
case (future.ch ! arg) => arg.asInstanceOf[T]
}
}
/**
* Sends a message to the server and gets a future back with the reply. Returns
* an Option with either Some(result) if succesful or None if timeout.
* <p>
* Timeout specified by the <code>setTimeout(time: Int)</code> method.
* <p>
* Example:
* <pre>
* (server !!! Message).getOrElse(throw new RuntimeException("time out")
* </pre>
*/
def !!![T](message: Any): Option[T] = {
require(server != null)
val future: FutureWithTimeout[T] = lock.withReadLock { server !!! message }
future.receiveWithin(timeout)
}
/**
* Sends a message to the server and gets a future back with the reply.
* <p>
* Tries to get the reply within the timeout specified in the GenericServerContainer
* and else execute the error handler (which can return a default value, throw an exception
* or whatever is appropriate).
* <p>
* Example:
* <pre>
* server !!! (Message, throw new RuntimeException("time out"))
* // OR
* server !!! (Message, DefaultReturnValue)
* </pre>
*/
def !!![T](message: Any, errorHandler: => T): T = !!!(message, errorHandler, timeout)
/**
* Sends a message to the server and gets a future back with the reply.
* <p>
* Tries to get the reply within the timeout specified as parameter to the method
* and else execute the error handler (which can return a default value, throw an exception
* or whatever is appropriate).
* <p>
* Example:
* <pre>
* server !!! (Message, throw new RuntimeException("time out"), 1000)
* // OR
* server !!! (Message, DefaultReturnValue, 1000)
* </pre>
*/
def !!![T](message: Any, errorHandler: => T, time: Int): T = {
require(server != null)
val future: FutureWithTimeout[T] = lock.withReadLock { server !!! message }
future.receiveWithin(time) match {
case None => errorHandler
case Some(reply) => reply
}
}
/**
* Initializes the server by sending a Init(config) message.
*/
def init(config: AnyRef) = lock.withWriteLock {
currentConfig = Some(config)
server ! Init(config)
}
/**
* Re-initializes the server by sending a ReInit(config) message with the most recent configuration.
*/
def reinit = lock.withWriteLock {
currentConfig match {
case Some(config) => server ! ReInit(config)
case None => {}
}
}
/**
* Hotswaps the server body by sending it a HotSwap(code) with the new code
* block (PartialFunction) to be executed.
*/
def hotswap(code: Option[PartialFunction[Any, Unit]]) = lock.withReadLock { server ! HotSwap(code) }
/**
* Swaps the server factory, enabling creating of a completely new server implementation
* (upon failure and restart).
*/
def swapFactory(newFactory: () => GenericServer) = serverFactory = newFactory
/**
* Sets the timeout for the call(..) method, e.g. the maximum time to wait for a reply
* before bailing out. Sets the timeout on the future return from the call to the server.
*/
def setTimeout(time: Int) = timeout = time
/**
* Returns the next message in the servers mailbox.
*/
def nextMessage = lock.withReadLock { server ? }
/**
* Creates a new actor for the GenericServerContainer, and return the newly created actor.
*/
private[supervisor] def newServer(): GenericServer = lock.withWriteLock {
server = serverFactory()
server
}
/**
* Starts the server.
*/
private[supervisor] def start = lock.withReadLock { server.start }
/**
* Terminates the server with a reason by sending a Terminate(Some(reason)) message.
*/
private[supervisor] def terminate(reason: AnyRef) = lock.withReadLock { server ! Terminate(reason) }
/**
* Terminates the server with a reason by sending a Terminate(Some(reason)) message,
* the shutdownTime defines the maximal time to wait for the server to shutdown before
* killing it.
*/
private[supervisor] def terminate(reason: AnyRef, shutdownTime: Int) = lock.withReadLock {
if (shutdownTime > 0) {
log.debug("Waiting {} milliseconds for the server to shut down before killing it.", shutdownTime)
server !? (shutdownTime, Shutdown(reason)) match {
case Some('success) => log.debug("Server [{}] has been shut down cleanly.", id)
case None => log.warning("Server [{}] was **not able** to complete shutdown cleanly within its configured shutdown time [{}]", id, shutdownTime)
}
}
server ! Terminate(reason)
}
private[supervisor] def reconfigure(reason: AnyRef, restartedServer: GenericServer, supervisor: Supervisor) = lock.withWriteLock {
server = restartedServer
reinit
}
private[supervisor] def getServer: GenericServer = server
}

View file

@ -0,0 +1,101 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package com.scalablesolutions.akka.supervisor
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.actors._
import scala.actors.Actor._
import net.lag.logging.Logger
class SystemFailure(cause: Throwable) extends RuntimeException(cause)
/**
* Base trait for all classes that wants to be able use the logging infrastructure.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Logging {
@transient val log = Logger.get(this.getClass.getName)
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Helpers extends Logging {
// ================================================
class ReadWriteLock {
private val rwl = new ReentrantReadWriteLock
private val readLock = rwl.readLock
private val writeLock = rwl.writeLock
def withWriteLock[T](body: => T): T = {
writeLock.lock
try {
body
} finally {
writeLock.unlock
}
}
def withReadLock[T](body: => T): T = {
readLock.lock
try {
body
} finally {
readLock.unlock
}
}
}
// ================================================
// implicit conversion between regular actor and actor with a type future
implicit def actorWithFuture(a: Actor) = new ActorWithTypedFuture(a)
abstract class FutureWithTimeout[T](ch: InputChannel[Any]) extends Future[T](ch) {
def receiveWithin(timeout: Int) : Option[T]
override def respond(f: T => Unit): Unit = throw new UnsupportedOperationException("Does not support the Responder API")
}
def receiveOrFail[T](future: => FutureWithTimeout[T], timeout: Int, errorHandler: => T): T = {
future.receiveWithin(timeout) match {
case None => errorHandler
case Some(reply) => reply
}
}
class ActorWithTypedFuture(a: Actor) {
require(a != null)
def !!![A](msg: Any): FutureWithTimeout[A] = {
val ftch = new Channel[Any](Actor.self)
a.send(msg, ftch)
new FutureWithTimeout[A](ftch) {
def apply() =
if (isSet) value.get.asInstanceOf[A]
else ch.receive {
case a =>
value = Some(a)
value.get.asInstanceOf[A]
}
def isSet = receiveWithin(0).isDefined
def receiveWithin(timeout: Int): Option[A] = value match {
case None => ch.receiveWithin(timeout) {
case TIMEOUT =>
log.debug("Future timed out while waiting for actor: {}", a)
None
case a =>
value = Some(a)
value.asInstanceOf[Option[A]]
}
case a => a.asInstanceOf[Option[A]]
}
}
}
}
}

View file

@ -0,0 +1,358 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package com.scalablesolutions.akka.supervisor
import scala.actors._
import scala.actors.Actor._
import scala.collection.mutable.HashMap
import com.scalablesolutions.akka.supervisor.Helpers._
//====================================================
/**
* Configuration classes - not to be used as messages.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed abstract class ConfigElement
abstract class Server extends ConfigElement
abstract class FailOverScheme extends ConfigElement
abstract class Scope extends ConfigElement
case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server
case class Worker(serverContainer: GenericServerContainer, lifeCycle: LifeCycle) extends Server
case class RestartStrategy(scheme: FailOverScheme, maxNrOfRetries: Int, withinTimeRange: Int) extends ConfigElement
case object AllForOne extends FailOverScheme
case object OneForOne extends FailOverScheme
case class LifeCycle(scope: Scope, shutdownTime: Int) extends ConfigElement
case object Permanent extends Scope
case object Transient extends Scope
case object Temporary extends Scope
//====================================================
/**
* Messages that the supervisor responds to and returns.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed abstract class SupervisorMessage
case object Start extends SupervisorMessage
case object Stop extends SupervisorMessage
case class Configure(config: SupervisorConfig, factory: SupervisorFactory) extends SupervisorMessage
/**
* Abstract base class for all supervisor factories.
* <p>
* Example usage:
* <pre>
* class MySupervisorFactory extends SupervisorFactory {
*
* override protected def getSupervisorConfig: SupervisorConfig = {
* SupervisorConfig(
* RestartStrategy(OneForOne, 3, 10),
* Worker(
* myFirstActorInstance,
* LifeCycle(Permanent, 1000))
* ::
* Worker(
* mySecondActorInstance,
* LifeCycle(Permanent, 1000))
* :: Nil)
* }
* }
* </pre>
*
* Then create a concrete factory in which we mix in support for the specific implementation of the Service we want to use.
*
* <pre>
* object factory extends MySupervisorFactory
* </pre>
*
* Then create a new Supervisor tree with the concrete Services we have defined.
*
* <pre>
* val supervisor = factory.newSupervisor
* supervisor ! Start // start up all managed servers
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
abstract class SupervisorFactory extends Logging {
def newSupervisor: Supervisor = newSupervisorFor(getSupervisorConfig)
def newSupervisorFor(config: SupervisorConfig): Supervisor = config match {
case SupervisorConfig(restartStrategy, _) =>
val supervisor = create(restartStrategy)
supervisor.start
supervisor !? Configure(config, this) match {
case 'success => log.debug("Supervisor successfully configured")
case _ => log.error("Supervisor could not be configured")
}
supervisor
}
/**
* To be overridden by concrete factory.
* Should return the SupervisorConfig for the supervisor.
*/
protected def getSupervisorConfig: SupervisorConfig
protected def create(strategy: RestartStrategy): Supervisor = strategy match {
case RestartStrategy(scheme, maxNrOfRetries, timeRange) =>
scheme match {
case AllForOne => new Supervisor(new AllForOneStrategy(maxNrOfRetries, timeRange))
case OneForOne => new Supervisor(new OneForOneStrategy(maxNrOfRetries, timeRange))
}
}
}
//====================================================
/**
* TODO: document
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class Supervisor(faultHandler: FaultHandlingStrategy) extends Actor with Logging {
private val state = new SupervisorState(this, faultHandler)
/**
* Returns an Option with the GenericServerContainer for the server with the name specified.
* If the server is found then Some(server) is returned else None.
*/
def getServer(id: String): Option[GenericServerContainer] = state.getServerContainer(id)
/**
* Returns an the GenericServerContainer for the server with the name specified.
* If the server is not found then the error handler is invoked.
*/
def getServerOrElse(id: String, errorHandler: => GenericServerContainer): GenericServerContainer = {
getServer(id) match {
case Some(serverContainer) => serverContainer
case None => errorHandler
}
}
def act = {
self.trapExit = true
loop {
react {
case Configure(config, factory) =>
log.debug("Configuring supervisor:{} ", this)
configure(config, factory)
reply('success)
case Start =>
state.serverContainers.foreach { serverContainer =>
serverContainer.start
log.info("Starting server: {}", serverContainer.getServer)
}
case Stop =>
state.serverContainers.foreach { serverContainer =>
serverContainer.terminate('normal)
log.info("Stopping server: {}", serverContainer)
}
log.info("Stopping supervisor: {}", this)
exit('normal)
case Exit(failedServer, reason) =>
reason match {
case 'forced => {} // do nothing
case _ => state.faultHandler.handleFailure(state, failedServer, reason)
}
case unexpected => log.warning("Unexpected message [{}], ignoring...", unexpected)
}
}
}
private def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match {
case SupervisorConfig(_, servers) =>
servers.map(server =>
server match {
case Worker(serverContainer, lifecycle) =>
serverContainer.lifeCycle = Some(lifecycle)
spawnLink(serverContainer)
case SupervisorConfig(_, _) => // recursive configuration
val supervisor = factory.newSupervisorFor(server.asInstanceOf[SupervisorConfig])
supervisor ! Start
state.addSupervisor(supervisor)
})
}
private[supervisor] def spawnLink(serverContainer: GenericServerContainer): GenericServer = {
val newServer = serverContainer.newServer()
newServer.start
self.link(newServer)
log.debug("Linking actor [{}] to supervisor [{}]", newServer, this)
state.addServerContainer(serverContainer)
newServer
}
}
//====================================================
/**
* TODO: document
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
abstract class FaultHandlingStrategy(val maxNrOfRetries: Int, val withinTimeRange: Int) extends Logging {
private[supervisor] var supervisor: Supervisor = _
private var nrOfRetries = 0
private var retryStartTime = currentTime
private[supervisor] def handleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = {
nrOfRetries += 1
if (timeRangeHasExpired) {
if (hasReachedMaximumNrOfRetries) {
log.info("Maximum of restarts [{}] for server [{}] has been reached - the supervisor including all its servers will now be shut down.", maxNrOfRetries, failedServer)
supervisor ! Stop // execution stops here
} else {
nrOfRetries = 0
retryStartTime = currentTime
}
}
doHandleFailure(state, failedServer, reason)
}
private[supervisor] def restart(serverContainer: GenericServerContainer, reason: AnyRef, state: SupervisorState) = {
preRestart(serverContainer)
serverContainer.lock.withWriteLock {
// TODO: this is the place to fail-over all pending messages in the failing actor's mailbox, if possible to get a hold of them
// e.g. something like 'serverContainer.getServer.getPendingMessages.map(newServer ! _)'
self.unlink(serverContainer.getServer)
serverContainer.lifeCycle match {
case None => throw new IllegalStateException("Server [" + serverContainer.id + "] does not have a life-cycle defined.")
case Some(LifeCycle(scope, shutdownTime)) =>
serverContainer.terminate(reason, shutdownTime)
scope match {
case Permanent =>
log.debug("Restarting server [{}] configured as PERMANENT.", serverContainer.id)
serverContainer.reconfigure(reason, supervisor.spawnLink(serverContainer), state.supervisor)
case Temporary =>
if (reason == 'normal) {
log.debug("Restarting server [{}] configured as TEMPORARY (since exited naturally).", serverContainer.id)
serverContainer.reconfigure(reason, supervisor.spawnLink(serverContainer), state.supervisor)
} else log.info("Server [{}] configured as TEMPORARY will not be restarted (received unnatural exit message).", serverContainer.id)
case Transient =>
log.info("Server [{}] configured as TRANSIENT will not be restarted.", serverContainer.id)
}
}
}
postRestart(serverContainer)
}
/**
* To be overriden by concrete strategies.
*/
protected def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef)
/**
* To be overriden by concrete strategies.
*/
protected def preRestart(serverContainer: GenericServerContainer) = {}
/**
* To be overriden by concrete strategies.
*/
protected def postRestart(serverContainer: GenericServerContainer) = {}
private def hasReachedMaximumNrOfRetries: Boolean = nrOfRetries > maxNrOfRetries
private def timeRangeHasExpired: Boolean = (currentTime - retryStartTime) > withinTimeRange
private def currentTime: Long = System.currentTimeMillis
}
//====================================================
/**
* TODO: document
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) {
override def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = {
log.error("Server [{}] has failed due to [{}] - scheduling restart - scheme: ALL_FOR_ONE.", failedServer, reason)
for (serverContainer <- state.serverContainers) restart(serverContainer, reason, state)
state.supervisors.foreach(_ ! Exit(failedServer, reason))
}
}
//====================================================
/**
* TODO: document
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) {
override def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = {
log.error("Server [{}] has failed due to [{}] - scheduling restart - scheme: ONE_FOR_ONE.", failedServer, reason)
var serverContainer: Option[GenericServerContainer] = None
state.serverContainers.foreach {
container => if (container.getServer == failedServer) serverContainer = Some(container)
}
serverContainer match {
case None => throw new RuntimeException("Could not find a generic server for actor: " + failedServer)
case Some(container) => restart(container, reason, state)
}
}
}
//====================================================
/**
* TODO: document
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[supervisor] class SupervisorState(val supervisor: Supervisor, val faultHandler: FaultHandlingStrategy) extends Logging {
faultHandler.supervisor = supervisor
private val _lock = new ReadWriteLock
private val _serverContainerRegistry = new HashMap[String, GenericServerContainer]
private var _supervisors: List[Supervisor] = Nil
def supervisors: List[Supervisor] = _lock.withReadLock {
_supervisors
}
def addSupervisor(supervisor: Supervisor) = _lock.withWriteLock {
_supervisors = supervisor :: _supervisors
}
def serverContainers: List[GenericServerContainer] = _lock.withReadLock {
_serverContainerRegistry.values.toList
}
def getServerContainer(id: String): Option[GenericServerContainer] = _lock.withReadLock {
if (_serverContainerRegistry.contains(id)) Some(_serverContainerRegistry(id))
else None
}
def addServerContainer(serverContainer: GenericServerContainer) = _lock.withWriteLock {
_serverContainerRegistry += serverContainer.id -> serverContainer
}
def removeServerContainer(id: String) = _lock.withWriteLock {
getServerContainer(id) match {
case Some(serverContainer) => _serverContainerRegistry - id
case None => {}
}
}
}

View file

@ -0,0 +1,202 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package com.scalablesolutions.akka.supervisor
import org.specs.runner.JUnit4
import org.specs.Specification
import scala.actors._
import scala.actors.Actor._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class GenericServerContainerTest extends JUnit4(genericServerContainerSpec) // for JUnit4 and Maven
object genericServerContainerSpec extends Specification {
var inner: GenericServerContainerActor = null
var server: GenericServerContainer = null
def createProxy(f: () => GenericServer) = {
val server = new GenericServerContainer("server", f)
server.setTimeout(100)
server
}
inner = new GenericServerContainerActor
server = createProxy(() => inner)
server.newServer
server.start
"server should be initialized" in {
server.init("testInit")
Thread.sleep(100)
expect("initializing: testInit") {
inner.log
}
}
"server should terminate with a reason " in {
server.terminate("testTerminateWithReason", 100)
Thread.sleep(100)
expect("terminating: testTerminateWithReason") {
inner.log
}
}
"server respond to async oneway message" in {
server ! OneWay
Thread.sleep(100)
expect("got a oneway") {
inner.log
}
}
"server respond to async ping message" in {
server ! Ping
Thread.sleep(100)
expect("got a ping") {
inner.log
}
}
"server respond to !!!" in {
expect("pong") {
(server !!! Ping).getOrElse("nil")
}
expect("got a ping") {
inner.log
}
}
"server respond to !?" in {
expect("pong") {
val res: String = server !? Ping
res
}
expect("got a ping") {
inner.log
}
}
"server respond to !!! with timeout" in {
expect("pong") {
(server !!! Ping).getOrElse("nil")
}
expect("got a ping") {
inner.log
}
}
"server respond to !!! with timeout" in {
expect("error handler") {
server !!! (OneWay, "error handler")
}
expect("got a oneway") {
inner.log
}
}
"server respond to !!! and return future with timeout" in {
val future = server !! Ping
future.receiveWithin(100) match {
case None => fail("timed out") // timed out
case Some(reply) =>
expect("got a ping") {
inner.log
}
assert("pong" === reply)
}
}
"server respond to !!! and return future with timeout" in {
val future = server !! OneWay
future.receiveWithin(100) match {
case None =>
expect("got a oneway") {
inner.log
}
case Some(reply) =>
fail("expected a timeout, got Some(reply)")
}
}
"server respond do hotswap" in {
// using base
expect("pong") {
(server !!! Ping).getOrElse("nil")
}
// hotswapping
server.hotswap(Some({
case Ping => reply("hotswapped pong")
}))
expect("hotswapped pong") {
(server !!! Ping).getOrElse("nil")
}
}
"server respond do double hotswap" in {
// using base
expect("pong") {
(server !!! Ping).getOrElse("nil")
}
// hotswapping
server.hotswap(Some({
case Ping => reply("hotswapped pong")
}))
expect("hotswapped pong") {
(server !!! Ping).getOrElse("nil")
}
// hotswapping again
server.hotswap(Some({
case Ping => reply("hotswapped pong again")
}))
expect("hotswapped pong again") {
(server !!! Ping).getOrElse("nil")
}
}
"server respond do hotswap and then revert" in {
// using base
expect("pong") {
(server !!! Ping).getOrElse("nil")
}
// hotswapping
server.hotswap(Some({
case Ping => reply("hotswapped pong")
}))
expect("hotswapped pong") {
(server !!! Ping).getOrElse("nil")
}
// restoring original base
server.hotswap(None)
expect("pong") {
(server !!! Ping).getOrElse("nil")
}
}
}
class GenericServerContainerActor extends GenericServer {
var log = ""
override def body: PartialFunction[Any, Unit] = {
case Ping =>
log = "got a ping"
reply("pong")
case OneWay =>
log = "got a oneway"
}
override def init(config: AnyRef) = log = "initializing: " + config
override def shutdown(reason: AnyRef) = log = "terminating: " + reason
}

View file

@ -0,0 +1,40 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package com.scalablesolutions.akka.supervisor
import org.specs.runner.JUnit4
import org.specs.Specification
import scala.actors._
import scala.actors.Actor._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class GenericServerTest extends JUnit4(genericServerSpec) // for JUnit4 and Maven
object genericServerSpec extends Specification {
"server should respond to a regular message" in {
val server = new TestGenericServerActor
server.start
server !? Ping match {
case reply: String =>
assert("got a ping" === server.log)
assert("pong" === reply)
case _ => fail()
}
}
}
class TestGenericServerActor extends GenericServer {
var log: String = ""
override def body: PartialFunction[Any, Unit] = {
case Ping =>
log = "got a ping"
reply("pong")
}
}

View file

@ -0,0 +1,12 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package com.scalablesolutions.akka.supervisor
sealed abstract class TestMessage
case object Ping extends TestMessage
case object Pong extends TestMessage
case object OneWay extends TestMessage
case object Die extends TestMessage
case object NotifySupervisorExit extends TestMessage

View file

@ -0,0 +1,77 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package com.scalablesolutions.akka.supervisor
import org.specs.runner.JUnit4
import org.specs.Specification
import scala.actors._
import scala.actors.Actor._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class SupervisorStateTest extends JUnit4(supervisorStateSpec) // for JUnit4 and Maven
object supervisorStateSpec extends Specification {
val dummyActor = new GenericServer { override def body: PartialFunction[Any, Unit] = { case _ => }}
val newDummyActor = () => dummyActor
var state: SupervisorState = _
var proxy: GenericServerContainer = _
var supervisor: Supervisor = _
proxy = new GenericServerContainer("server1", newDummyActor)
object factory extends SupervisorFactory {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
Worker(
proxy,
LifeCycle(Permanent, 100))
:: Nil)
}
}
supervisor = factory.newSupervisor
state = new SupervisorState(supervisor, new AllForOneStrategy(3, 100))
"supervisor state should return added server" in {
state.addServerContainer(proxy)
state.getServerContainer("server1") match {
case None => fail("should have returned server")
case Some(server) =>
assert(server != null)
assert(server.isInstanceOf[GenericServerContainer])
assert(proxy === server)
}
}
"supervisor state should remove added server" in {
state.addServerContainer(proxy)
state.removeServerContainer("server1")
state.getServerContainer("server1") match {
case Some(_) => fail("should have returned None")
case None =>
}
state.getServerContainer("dummyActor") match {
case Some(_) => fail("should have returned None")
case None =>
}
}
"supervisor state should fail getting non-existent server by symbol" in {
state.getServerContainer("server2") match {
case Some(_) => fail("should have returned None")
case None =>
}
}
"supervisor state should fail getting non-existent server by actor" in {
state.getServerContainer("dummyActor") match {
case Some(_) => fail("should have returned None")
case None =>
}
}
}

View file

@ -0,0 +1,434 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package com.scalablesolutions.akka.supervisor
import org.specs.runner.JUnit4
import org.specs.Specification
import scala.actors._
import scala.actors.Actor._
import scala.collection.Map
import scala.collection.mutable.HashMap
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class SupervisorTest extends JUnit4(supervisorSpec) // for JUnit4 and Maven
object supervisorSpec extends Specification {
var messageLog: String = ""
val pingpong1 = new GenericServerContainer("pingpong1", () => new PingPong1Actor)
val pingpong2 = new GenericServerContainer("pingpong2", () => new PingPong2Actor)
val pingpong3 = new GenericServerContainer("pingpong3", () => new PingPong3Actor)
pingpong1.setTimeout(100)
pingpong2.setTimeout(100)
pingpong3.setTimeout(100)
@BeforeMethod
def setup = messageLog = ""
// ===========================================
"starting supervisor should start the servers" in {
val sup = getSingleActorAllForOneSupervisor
sup ! Start
expect("pong") {
(pingpong1 !!! Ping).getOrElse("nil")
}
}
// ===========================================
"started supervisor should be able to return started servers" in {
val sup = getSingleActorAllForOneSupervisor
sup ! Start
val server = sup.getServerOrElse("pingpong1", throw new RuntimeException("server not found"))
assert(server.isInstanceOf[GenericServerContainer])
assert(server === pingpong1)
}
// ===========================================
"started supervisor should fail returning non-existing server" in {
val sup = getSingleActorAllForOneSupervisor
sup ! Start
intercept(classOf[RuntimeException]) {
sup.getServerOrElse("wrong_name", throw new RuntimeException("server not found"))
}
}
// ===========================================
"supervisor should restart killed server with restart strategy one_for_one" in {
val sup = getSingleActorOneForOneSupervisor
sup ! Start
intercept(classOf[RuntimeException]) {
pingpong1 !!! (Die, throw new RuntimeException("TIME OUT"))
}
Thread.sleep(100)
expect("oneforone") {
messageLog
}
}
// ===========================================
"supervisor should restart used killed server with restart strategy one_for_one" in {
val sup = getSingleActorOneForOneSupervisor
sup ! Start
expect("pong") {
(pingpong1 !!! Ping).getOrElse("nil")
}
Thread.sleep(100)
expect("ping") {
messageLog
}
intercept(classOf[RuntimeException]) {
pingpong1 !!! (Die, throw new RuntimeException("TIME OUT"))
}
Thread.sleep(100)
expect("pingoneforone") {
messageLog
}
expect("pong") {
(pingpong1 !!! Ping).getOrElse("nil")
}
Thread.sleep(100)
expect("pingoneforoneping") {
messageLog
}
}
// ===========================================
"supervisor should restart killed server with restart strategy all_for_one" in {
val sup = getSingleActorAllForOneSupervisor
sup ! Start
intercept(classOf[RuntimeException]) {
pingpong1 !!! (Die, throw new RuntimeException("TIME OUT"))
}
Thread.sleep(100)
expect("allforone") {
messageLog
}
}
// ===========================================
"supervisor should restart used killed server with restart strategy all_for_one" in {
val sup = getSingleActorAllForOneSupervisor
sup ! Start
expect("pong") {
(pingpong1 !!! Ping).getOrElse("nil")
}
Thread.sleep(100)
expect("ping") {
messageLog
}
intercept(classOf[RuntimeException]) {
pingpong1 !!! (Die, throw new RuntimeException("TIME OUT"))
}
Thread.sleep(100)
expect("pingallforone") {
messageLog
}
expect("pong") {
(pingpong1 !!! Ping).getOrElse("nil")
}
Thread.sleep(100)
expect("pingallforoneping") {
messageLog
}
}
// ===========================================
"supervisor should restart killed multiple servers with restart strategy one_for_one" in {
val sup = getMultipleActorsOneForOneConf
sup ! Start
intercept(classOf[RuntimeException]) {
pingpong3 !!! (Die, throw new RuntimeException("TIME OUT"))
}
Thread.sleep(100)
expect("oneforone") {
messageLog
}
}
// ===========================================
"supervisor should restart killed multiple servers with restart strategy one_for_one" in {
val sup = getMultipleActorsOneForOneConf
sup ! Start
expect("pong") {
(pingpong1 !!! Ping).getOrElse("nil")
}
Thread.sleep(100)
expect("pong") {
(pingpong2 !!! Ping).getOrElse("nil")
}
Thread.sleep(100)
expect("pong") {
(pingpong3 !!! Ping).getOrElse("nil")
}
Thread.sleep(100)
expect("pingpingping") {
messageLog
}
intercept(classOf[RuntimeException]) {
pingpong2 !!! (Die, throw new RuntimeException("TIME OUT"))
}
Thread.sleep(100)
expect("pingpingpingoneforone") {
messageLog
}
expect("pong") {
(pingpong1 !!! Ping).getOrElse("nil")
}
Thread.sleep(100)
expect("pong") {
(pingpong2 !!! Ping).getOrElse("nil")
}
Thread.sleep(100)
expect("pong") {
(pingpong3 !!! Ping).getOrElse("nil")
}
Thread.sleep(100)
expect("pingpingpingoneforonepingpingping") {
messageLog
}
}
// ===========================================
"supervisor should restart killed muliple servers with restart strategy all_for_one" in {
val sup = getMultipleActorsAllForOneConf
sup ! Start
intercept(classOf[RuntimeException]) {
pingpong2 !!! (Die, throw new RuntimeException("TIME OUT"))
}
Thread.sleep(100)
expect("allforoneallforoneallforone") {
messageLog
}
}
// ===========================================
"supervisor should restart killed muliple servers with restart strategy all_for_one" in {
val sup = getMultipleActorsAllForOneConf
sup ! Start
expect("pong") {
(pingpong1 !!! Ping).getOrElse("nil")
}
Thread.sleep(100)
expect("pong") {
(pingpong2 !!! Ping).getOrElse("nil")
}
Thread.sleep(100)
expect("pong") {
(pingpong3 !!! Ping).getOrElse("nil")
}
Thread.sleep(100)
expect("pingpingping") {
messageLog
}
intercept(classOf[RuntimeException]) {
pingpong2 !!! (Die, throw new RuntimeException("TIME OUT"))
}
Thread.sleep(100)
expect("pingpingpingallforoneallforoneallforone") {
messageLog
}
expect("pong") {
(pingpong1 !!! Ping).getOrElse("nil")
}
Thread.sleep(100)
expect("pong") {
(pingpong2 !!! Ping).getOrElse("nil")
}
Thread.sleep(100)
expect("pong") {
(pingpong3 !!! Ping).getOrElse("nil")
}
Thread.sleep(100)
expect("pingpingpingallforoneallforoneallforonepingpingping") {
messageLog
}
}
"supervisor should restart killed first-level server with restart strategy all_for_one" in {
val sup = getNestedSupervisorsAllForOneConf
sup ! Start
intercept(classOf[RuntimeException]) {
pingpong1 !!! (Die, throw new RuntimeException("TIME OUT"))
}
Thread.sleep(100)
expect("allforoneallforoneallforone") {
messageLog
}
}
// =============================================
// Creat some supervisors with different configurations
def getSingleActorAllForOneSupervisor: Supervisor = {
// Create an abstract SupervisorContainer that works for all implementations
// of the different Actors (Services).
//
// Then create a concrete container in which we mix in support for the specific
// implementation of the Actors we want to use.
object factory extends TestSupervisorFactory {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
Worker(
pingpong1,
LifeCycle(Permanent, 100))
:: Nil)
}
}
factory.newSupervisor
}
def getSingleActorOneForOneSupervisor: Supervisor = {
object factory extends TestSupervisorFactory {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100),
Worker(
pingpong1,
LifeCycle(Permanent, 100))
:: Nil)
}
}
factory.newSupervisor
}
def getMultipleActorsAllForOneConf: Supervisor = {
object factory extends TestSupervisorFactory {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
Worker(
pingpong1,
LifeCycle(Permanent, 100))
::
Worker(
pingpong2,
LifeCycle(Permanent, 100))
::
Worker(
pingpong3,
LifeCycle(Permanent, 100))
:: Nil)
}
}
factory.newSupervisor
}
def getMultipleActorsOneForOneConf: Supervisor = {
object factory extends TestSupervisorFactory {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100),
Worker(
pingpong1,
LifeCycle(Permanent, 100))
::
Worker(
pingpong2,
LifeCycle(Permanent, 100))
::
Worker(
pingpong3,
LifeCycle(Permanent, 100))
:: Nil)
}
}
factory.newSupervisor
}
def getNestedSupervisorsAllForOneConf: Supervisor = {
object factory extends TestSupervisorFactory {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
Worker(
pingpong1,
LifeCycle(Permanent, 100))
::
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
Worker(
pingpong2,
LifeCycle(Permanent, 100))
::
Worker(
pingpong3,
LifeCycle(Permanent, 100))
:: Nil)
:: Nil)
}
}
factory.newSupervisor
}
class PingPong1Actor extends GenericServer {
override def body: PartialFunction[Any, Unit] = {
case Ping =>
messageLog += "ping"
reply("pong")
case Die =>
throw new RuntimeException("Recieved Die message")
}
}
class PingPong2Actor extends GenericServer {
override def body: PartialFunction[Any, Unit] = {
case Ping =>
messageLog += "ping"
reply("pong")
case Die =>
throw new RuntimeException("Recieved Die message")
}
}
class PingPong3Actor extends GenericServer {
override def body: PartialFunction[Any, Unit] = {
case Ping =>
messageLog += "ping"
reply("pong")
case Die =>
throw new RuntimeException("Recieved Die message")
}
}
// =============================================
class TestAllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends AllForOneStrategy(maxNrOfRetries, withinTimeRange) {
override def postRestart(serverContainer: GenericServerContainer) = {
messageLog += "allforone"
}
}
class TestOneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends OneForOneStrategy(maxNrOfRetries, withinTimeRange) {
override def postRestart(serverContainer: GenericServerContainer) = {
messageLog += "oneforone"
}
}
abstract class TestSupervisorFactory extends SupervisorFactory {
override def create(strategy: RestartStrategy): Supervisor = strategy match {
case RestartStrategy(scheme, maxNrOfRetries, timeRange) =>
scheme match {
case AllForOne => new Supervisor(new TestAllForOneStrategy(maxNrOfRetries, timeRange))
case OneForOne => new Supervisor(new TestOneForOneStrategy(maxNrOfRetries, timeRange))
}
}
}
}