cast(message).
diff --git a/kernel/src/main/scala/Helpers.scala b/kernel/src/main/scala/Helpers.scala
index 0a02be63bb..265cef0a93 100644
--- a/kernel/src/main/scala/Helpers.scala
+++ b/kernel/src/main/scala/Helpers.scala
@@ -47,7 +47,7 @@ object Helpers extends Logging {
// implicit conversion between regular actor and actor with a type future
implicit def actorWithFuture(a: Actor) = new ActorWithTypedFuture(a)
- abstract class FutureWithTimeout[T](ch: InputChannel[Any]) extends Future[T](ch) {
+ abstract class FutureWithTimeout[T](ch: InputChannel[T]) extends Future[T](ch) {
def receiveWithin(timeout: Int) : Option[T]
override def respond(f: T => Unit): Unit = throw new UnsupportedOperationException("Does not support the Responder API")
}
@@ -63,8 +63,8 @@ object Helpers extends Logging {
require(a != null)
def !!: FutureWithTimeout[A] = {
- val ftch = new Channel[Any](Actor.self)
- a.send(msg, ftch)
+ val ftch = new Channel[A](Actor.self)
+ a.send(msg, ftch.asInstanceOf[OutputChannel[Any]])
new FutureWithTimeout[A](ftch) {
def apply() =
if (isSet) value.get.asInstanceOf[A]
diff --git a/kernel/src/main/scala/Kernel.scala b/kernel/src/main/scala/Kernel.scala
index 0596dcf2f1..e3e0541e89 100755
--- a/kernel/src/main/scala/Kernel.scala
+++ b/kernel/src/main/scala/Kernel.scala
@@ -9,9 +9,9 @@ package se.scalablesolutions.akka.kernel
//import org.apache.zookeeper.server.ServerConfig
//import org.apache.zookeeper.server.NIOServerCnxn
-import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory}
-import voldemort.server.{VoldemortConfig, VoldemortServer}
-import voldemort.versioning.Versioned
+//import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory}
+//import voldemort.server.{VoldemortConfig, VoldemortServer}
+//import voldemort.versioning.Versioned
import com.sun.grizzly.http.SelectorThread
import com.sun.jersey.api.container.grizzly.GrizzlyWebContainerFactory
@@ -36,29 +36,29 @@ object Kernel extends Logging {
val JERSEY_REST_CLASSES_ROOT_PACKAGE = "se.scalablesolutions.akka.kernel"
val JERSEY_BASE_URI = UriBuilder.fromUri(JERSEY_SERVER_URL).port(getPort(JERSEY_SERVER_PORT)).build()
+/*
val VOLDEMORT_SERVER_URL = "tcp://" + SERVER_URL
val VOLDEMORT_SERVER_PORT = 6666
val VOLDEMORT_BOOTSTRAP_URL = VOLDEMORT_SERVER_URL + ":" + VOLDEMORT_SERVER_PORT
-
val ZOO_KEEPER_SERVER_URL = SERVER_URL
val ZOO_KEEPER_SERVER_PORT = 9898
-
- private[this] var cassandraNode: CassandraNode = _
-
- //private[this] var storageFactory: StoreClientFactory = _
- //private[this] var storageServer: VoldemortServer = _
+ private[this] var storageFactory: StoreClientFactory = _
+ private[this] var storageServer: VoldemortServer = _
+*/
def main(args: Array[String]): Unit = {
log.info("Starting Akka kernel...")
- //startZooKeeper
- //startVoldemort
startCassandra
+ cassandraBenchmark
+
//val threadSelector = startJersey
-
// TODO: handle shutdown of Jersey in separate thread
// TODO: spawn main in new thread an communicate using socket
//System.in.read
//threadSelector.stopEndpoint
+
+ //startZooKeeper
+ //startVoldemort
}
private[akka] def startJersey: SelectorThread = {
@@ -70,10 +70,51 @@ object Kernel extends Logging {
}
private[akka] def startCassandra = {
- cassandraNode = new CassandraNode
- cassandraNode.start
+ CassandraNode.start
}
+ private def cassandraBenchmark = {
+ val NR_ENTRIES = 1000000
+
+ println("=================================================")
+ var start = System.currentTimeMillis
+ for (i <- 1 to NR_ENTRIES) CassandraNode.insertActorStorageEntry("test", i.toString, "data")
+ var end = System.currentTimeMillis
+ println("Writes per second: " + NR_ENTRIES / ((end - start).toDouble / 1000))
+
+ /*
+FIXME: batch_insert fails with the following exception:
+
+ERROR - Exception was generated at : 04/27/2009 15:26:35 on thread main
+[B cannot be cast to org.apache.cassandra.db.WriteResponse
+java.lang.ClassCastException: [B cannot be cast to org.apache.cassandra.db.WriteResponse
+ at org.apache.cassandra.service.WriteResponseResolver.resolve(WriteResponseResolver.java:50)
+ at org.apache.cassandra.service.WriteResponseResolver.resolve(WriteResponseResolver.java:31)
+ at org.apache.cassandra.service.QuorumResponseHandler.get(QuorumResponseHandler.java:101)
+ at org.apache.cassandra.service.StorageProxy.insertBlocking(StorageProxy.java:135)
+ at org.apache.cassandra.service.CassandraServer.batch_insert_blocking(CassandraServer.java:489)
+ at se.scalablesolutions.akka.kernel.CassandraNode$.insertHashEntries(CassandraNode.scala:59)
+ at se.scalablesolutions.akka.kernel.Kernel$.cassandraBenchmark(Kernel.scala:91)
+ at se.scalablesolutions.akka.kernel.Kernel$.main(Kernel.scala:52)
+ at se.scalablesolutions.akka.kernel.Kernel.main(Kernel.scala)
+
+ println("=================================================")
+ var start = System.currentTimeMillis
+ println(start)
+ val entries = new scala.collection.mutable.ArrayBuffer[Tuple2[String, String]]
+ for (i <- 1 to NR_ENTRIES) entries += (i.toString, "data")
+ CassandraNode.insertHashEntries("test", entries.toList)
+ var end = System.currentTimeMillis
+ println("Writes per second - batch: " + NR_ENTRIES / ((end - start).toDouble / 1000))
+ */
+ println("=================================================")
+ start = System.currentTimeMillis
+ for (i <- 1 to NR_ENTRIES) CassandraNode.getActorStorageEntryFor("test", i.toString)
+ end = System.currentTimeMillis
+ println("Reads per second: " + NR_ENTRIES / ((end - start).toDouble / 1000))
+
+ System.exit(0)
+ }
// private[akka] def startVoldemort = {
// // Start Voldemort server
// val config = VoldemortConfig.loadFromVoldemortHome(Boot.HOME)
diff --git a/kernel/src/main/scala/NetCat.scala b/kernel/src/main/scala/NetCat.scala
deleted file mode 100644
index ed6ec2b085..0000000000
--- a/kernel/src/main/scala/NetCat.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-package org.apache.mina.example.scala.netcat
-
-import _root_.scala.actors.Actor
-import _root_.scala.actors.Actor._
-import _root_.scala.actors.Exit
-import _root_.scala.collection.immutable
-
-import java.net.InetSocketAddress
-import java.nio.charset.Charset
-
-import org.apache.mina.common._
-import org.apache.mina.filter.codec.ProtocolCodecFilter
-import org.apache.mina.filter.codec.textline.TextLineCodecFactory
-import org.apache.mina.integration.scala.common._
-import org.apache.mina.integration.scala.common.IoHandlerEvent._
-import org.apache.mina.integration.scala.common.IoServiceEvent._
-import org.apache.mina.integration.scala.common.IoSessionCall._
-import org.apache.mina.integration.scala.common.IoSessionConfigOption._
-import org.apache.mina.integration.scala.util._
-import org.apache.mina.integration.scala.util.CallableActor._
-import org.apache.mina.transport.socket.nio.NioSocketConnector
-
-/**
- * (Entry point) NetCat client. NetCat client connects to the specified
- * endpoint and prints out received data. NetCat client disconnects
- * automatically when no data is read for 10 seconds.
- *
- * @author The Apache MINA Project (dev@mina.apache.org)
- * @version $Rev:$
- */
-object NetCat {
-
- def handleSession(session: Actor) = {
- loop {
- react {
- case Opened => {
- // Set reader idle time to 10 seconds.
- // sessionIdle(...) method will be invoked when no data is read
- // for 10 seconds.
- val config = immutable.Map.empty[Any, Any] + Tuple2(IdleTime(IdleStatus.READER_IDLE), 10)
- session.callReact(SetConfig(config)) {
- case OK(_) => ()
- case Error(cause) => exit(('setConfigFailed, cause))
- }
- }
- case Closed => {
- // Print out total number of bytes read from the remote peer.
- session.callReact(GetReadBytes) {
- case OK(readBytes) => {
- System.err.println
- System.err.println("Total " + readBytes + " byte(s)");
- exit()
- }
- case Error(cause) => exit(('getReadBytesFailed, cause))
- }
- }
- case Idle(status) => {
- // Close the connection if reader is idle.
- if (status == IdleStatus.READER_IDLE) {
- session.callReact(CloseOnFlush) {
- case OK(_) => exit()
- case Error(cause) => exit(('idleCloseFailed, cause))
- }
- }
- }
- case MessageReceived(buf: IoBuffer) => {
- // Print out read buffer content.
- while (buf.hasRemaining) {
- System.out.print(buf.get.asInstanceOf[char])
- }
- System.out.flush()
- }
- // Consume other IoHandlerEvents, or exit if something goes wrong.
- case ExceptionCaught(cause) => exit(('exceptionCaught, cause))
- case _: IoHandlerEvent => () // Consume
- case unexpected => exit(('unexpectedMessage, unexpected))
- }
- }
- }
-
- def main(args: Array[String]) {
- var host = System.getProperty("netcat.host")
- var portString = System.getProperty("netcat.port")
- if ((host eq null) || (portString eq null)) {
- // Read from command line
- if (args.length != 2) {
- System.out.println(this.getClass().getName() + "
- * override def body: PartialFunction[Any, Unit] = {
- * case Ping =>
- * println("got a ping")
- * reply("pong")
- *
- * case OneWay =>
- * println("got a oneway")
- * }
- *
- */
- def body: PartialFunction[Any, Unit]
-
- /**
- * Callback method that is called during initialization.
- * To be implemented by subclassing server.
- */
- def init(config: AnyRef) {}
-
- /**
- * Callback method that is called during reinitialization after a server crash.
- * To be implemented by subclassing server.
- */
- def reinit(config: AnyRef) {}
-
- /**
- * Callback method that is called during termination.
- * To be implemented by subclassing server.
- */
- def shutdown(reason: AnyRef) {}
-
- def act = loop { react { genericBase orElse actorBase } }
-
- private def actorBase: PartialFunction[Any, Unit] = hotswap getOrElse body
-
- private var hotswap: Option[PartialFunction[Any, Unit]] = None
-
- private val genericBase: PartialFunction[Any, Unit] = {
- case Init(config) => init(config)
- case ReInit(config) => reinit(config)
- case HotSwap(code) => hotswap = code
- case Shutdown(reason) => shutdown(reason); reply('success)
- case Terminate(reason) => exit(reason)
- }
-}
-
-/**
- * The container (proxy) for GenericServer, responsible for managing the life-cycle of the server;
- * such as shutdown, restart, re-initialization etc.
- * Each GenericServerContainer manages one GenericServer.
- *
- * @author Jonas Bonér
- */
-class GenericServerContainer(val id: String, var serverFactory: () => GenericServer) extends Logging {
- require(id != null && id != "")
-
- // TODO: see if we can parameterize class and add type safe getActor method
- //class GenericServerContainer[T <: GenericServer](var factory: () => T) {
- //def getActor: T = server
-
- var lifeCycle: Option[LifeCycle] = None
- val lock = new ReadWriteLock
-
- private var server: GenericServer = null
- private var currentConfig: Option[AnyRef] = None
- private var timeout = 5000
-
- /**
- * Sends a one way message to the server - alias for cast(message).
- * - * Example: - *
- * server ! Message - *- */ - def !(message: Any) = { - require(server != null) - lock.withReadLock { server ! message } - } - - /** - * Sends a message to the server returns a FutureWithTimeout holding the future reply . - *
- * Example: - *
- * val future = server !! Message
- * future.receiveWithin(100) match {
- * case None => ... // timed out
- * case Some(reply) => ... // handle reply
- * }
- *
- */
- def !: FutureWithTimeout[T] = {
- require(server != null)
- lock.withReadLock { server !!! message }
- }
-
- /**
- * Sends a message to the server and blocks indefinitely (no time out), waiting for the reply.
- * - * Example: - *
- * val result: String = server !? Message - *- */ - def !?[T](message: Any): T = { - require(server != null) - val future: Future[T] = lock.withReadLock { server.! } - Actor.receive { - case (future.ch ! arg) => arg.asInstanceOf[T] - } - } - - /** - * Sends a message to the server and gets a future back with the reply. Returns - * an Option with either Some(result) if succesful or None if timeout. - *
- * Timeout specified by the setTimeout(time: Int) method.
- *
- * Example: - *
- * (server !!! Message).getOrElse(throw new RuntimeException("time out")
- *
- */
- def !!: Option[T] = {
- require(server != null)
- val future: FutureWithTimeout[T] = lock.withReadLock { server !!! message }
- future.receiveWithin(timeout)
- }
-
- /**
- * Sends a message to the server and gets a future back with the reply.
- * - * Tries to get the reply within the timeout specified in the GenericServerContainer - * and else execute the error handler (which can return a default value, throw an exception - * or whatever is appropriate). - *
- * Example: - *
- * server !!! (Message, throw new RuntimeException("time out"))
- * // OR
- * server !!! (Message, DefaultReturnValue)
- *
- */
- def !!: T = !!!(message, errorHandler, timeout)
-
- /**
- * Sends a message to the server and gets a future back with the reply.
- * - * Tries to get the reply within the timeout specified as parameter to the method - * and else execute the error handler (which can return a default value, throw an exception - * or whatever is appropriate). - *
- * Example: - *
- * server !!! (Message, throw new RuntimeException("time out"), 1000)
- * // OR
- * server !!! (Message, DefaultReturnValue, 1000)
- *
- */
- def !!: T = {
- require(server != null)
- val future: FutureWithTimeout[T] = lock.withReadLock { server !!! message }
- future.receiveWithin(time) match {
- case None => errorHandler
- case Some(reply) => reply
- }
- }
-
- /**
- * Initializes the server by sending a Init(config) message.
- */
- def init(config: AnyRef) = lock.withWriteLock {
- currentConfig = Some(config)
- server ! Init(config)
- }
-
- /**
- * Re-initializes the server by sending a ReInit(config) message with the most recent configuration.
- */
- def reinit = lock.withWriteLock {
- currentConfig match {
- case Some(config) => server ! ReInit(config)
- case None => {}
- }
- }
-
- /**
- * Hotswaps the server body by sending it a HotSwap(code) with the new code
- * block (PartialFunction) to be executed.
- */
- def hotswap(code: Option[PartialFunction[Any, Unit]]) = lock.withReadLock { server ! HotSwap(code) }
-
- /**
- * Swaps the server factory, enabling creating of a completely new server implementation
- * (upon failure and restart).
- */
- def swapFactory(newFactory: () => GenericServer) = serverFactory = newFactory
-
- /**
- * Sets the timeout for the call(..) method, e.g. the maximum time to wait for a reply
- * before bailing out. Sets the timeout on the future return from the call to the server.
- */
- def setTimeout(time: Int) = timeout = time
-
- /**
- * Returns the next message in the servers mailbox.
- */
- def nextMessage = lock.withReadLock { server ? }
-
- /**
- * Creates a new actor for the GenericServerContainer, and return the newly created actor.
- */
- private[supervisor] def newServer(): GenericServer = lock.withWriteLock {
- server = serverFactory()
- server
- }
-
- /**
- * Starts the server.
- */
- private[supervisor] def start = lock.withReadLock { server.start }
-
- /**
- * Terminates the server with a reason by sending a Terminate(Some(reason)) message.
- */
- private[supervisor] def terminate(reason: AnyRef) = lock.withReadLock { server ! Terminate(reason) }
-
- /**
- * Terminates the server with a reason by sending a Terminate(Some(reason)) message,
- * the shutdownTime defines the maximal time to wait for the server to shutdown before
- * killing it.
- */
- private[supervisor] def terminate(reason: AnyRef, shutdownTime: Int) = lock.withReadLock {
- if (shutdownTime > 0) {
- log.debug("Waiting {} milliseconds for the server to shut down before killing it.", shutdownTime)
- server !? (shutdownTime, Shutdown(reason)) match {
- case Some('success) => log.debug("Server [{}] has been shut down cleanly.", id)
- case None => log.warning("Server [{}] was **not able** to complete shutdown cleanly within its configured shutdown time [{}]", id, shutdownTime)
- }
- }
- server ! Terminate(reason)
- }
-
- private[supervisor] def reconfigure(reason: AnyRef, restartedServer: GenericServer, supervisor: Supervisor) = lock.withWriteLock {
- server = restartedServer
- reinit
- }
-
- private[supervisor] def getServer: GenericServer = server
-}
-
diff --git a/supervisor/src/main/scala/Helpers.scala b/supervisor/src/main/scala/Helpers.scala
deleted file mode 100755
index 50dbeb0b98..0000000000
--- a/supervisor/src/main/scala/Helpers.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Copyright (C) 2009 Scalable Solutions.
- */
-
-package com.scalablesolutions.akka.supervisor
-
-import java.util.concurrent.locks.ReentrantReadWriteLock
-
-import scala.actors._
-import scala.actors.Actor._
-
-import net.lag.logging.Logger
-
-class SystemFailure(cause: Throwable) extends RuntimeException(cause)
-
-/**
- * Base trait for all classes that wants to be able use the logging infrastructure.
- *
- * @author Jonas Bonér
- */
-trait Logging {
- @transient val log = Logger.get(this.getClass.getName)
-}
-
-/**
- * @author Jonas Bonér
- */
-object Helpers extends Logging {
-
- // ================================================
- class ReadWriteLock {
- private val rwl = new ReentrantReadWriteLock
- private val readLock = rwl.readLock
- private val writeLock = rwl.writeLock
-
- def withWriteLock[T](body: => T): T = {
- writeLock.lock
- try {
- body
- } finally {
- writeLock.unlock
- }
- }
-
- def withReadLock[T](body: => T): T = {
- readLock.lock
- try {
- body
- } finally {
- readLock.unlock
- }
- }
- }
-
- // ================================================
- // implicit conversion between regular actor and actor with a type future
- implicit def actorWithFuture(a: Actor) = new ActorWithTypedFuture(a)
-
- abstract class FutureWithTimeout[T](ch: InputChannel[Any]) extends Future[T](ch) {
- def receiveWithin(timeout: Int) : Option[T]
- override def respond(f: T => Unit): Unit = throw new UnsupportedOperationException("Does not support the Responder API")
- }
-
- def receiveOrFail[T](future: => FutureWithTimeout[T], timeout: Int, errorHandler: => T): T = {
- future.receiveWithin(timeout) match {
- case None => errorHandler
- case Some(reply) => reply
- }
- }
-
- class ActorWithTypedFuture(a: Actor) {
- require(a != null)
-
- def !!: FutureWithTimeout[A] = {
- val ftch = new Channel[Any](Actor.self)
- a.send(msg, ftch)
- new FutureWithTimeout[A](ftch) {
- def apply() =
- if (isSet) value.get.asInstanceOf[A]
- else ch.receive {
- case a =>
- value = Some(a)
- value.get.asInstanceOf[A]
- }
- def isSet = receiveWithin(0).isDefined
- def receiveWithin(timeout: Int): Option[A] = value match {
- case None => ch.receiveWithin(timeout) {
- case TIMEOUT =>
- log.debug("Future timed out while waiting for actor: {}", a)
- None
- case a =>
- value = Some(a)
- value.asInstanceOf[Option[A]]
- }
- case a => a.asInstanceOf[Option[A]]
- }
- }
- }
- }
-}
-
diff --git a/supervisor/src/main/scala/Supervisor.scala b/supervisor/src/main/scala/Supervisor.scala
deleted file mode 100755
index 62f9d94ec1..0000000000
--- a/supervisor/src/main/scala/Supervisor.scala
+++ /dev/null
@@ -1,358 +0,0 @@
-/**
- * Copyright (C) 2009 Scalable Solutions.
- */
-
-package com.scalablesolutions.akka.supervisor
-
-import scala.actors._
-import scala.actors.Actor._
-import scala.collection.mutable.HashMap
-
-import com.scalablesolutions.akka.supervisor.Helpers._
-
-//====================================================
-
-/**
- * Configuration classes - not to be used as messages.
- *
- * @author Jonas Bonér
- */
-sealed abstract class ConfigElement
-
-abstract class Server extends ConfigElement
-abstract class FailOverScheme extends ConfigElement
-abstract class Scope extends ConfigElement
-
-case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server
-case class Worker(serverContainer: GenericServerContainer, lifeCycle: LifeCycle) extends Server
-
-case class RestartStrategy(scheme: FailOverScheme, maxNrOfRetries: Int, withinTimeRange: Int) extends ConfigElement
-
-case object AllForOne extends FailOverScheme
-case object OneForOne extends FailOverScheme
-
-case class LifeCycle(scope: Scope, shutdownTime: Int) extends ConfigElement
-case object Permanent extends Scope
-case object Transient extends Scope
-case object Temporary extends Scope
-
-//====================================================
-
-/**
- * Messages that the supervisor responds to and returns.
- *
- * @author Jonas Bonér
- */
-sealed abstract class SupervisorMessage
-case object Start extends SupervisorMessage
-case object Stop extends SupervisorMessage
-case class Configure(config: SupervisorConfig, factory: SupervisorFactory) extends SupervisorMessage
-
-/**
- * Abstract base class for all supervisor factories.
- * - * Example usage: - *
- * class MySupervisorFactory extends SupervisorFactory {
- *
- * override protected def getSupervisorConfig: SupervisorConfig = {
- * SupervisorConfig(
- * RestartStrategy(OneForOne, 3, 10),
- * Worker(
- * myFirstActorInstance,
- * LifeCycle(Permanent, 1000))
- * ::
- * Worker(
- * mySecondActorInstance,
- * LifeCycle(Permanent, 1000))
- * :: Nil)
- * }
- * }
- *
- *
- * Then create a concrete factory in which we mix in support for the specific implementation of the Service we want to use.
- *
- * - * object factory extends MySupervisorFactory - *- * - * Then create a new Supervisor tree with the concrete Services we have defined. - * - *
- * val supervisor = factory.newSupervisor - * supervisor ! Start // start up all managed servers - *- * - * @author Jonas Bonér - */ -abstract class SupervisorFactory extends Logging { - def newSupervisor: Supervisor = newSupervisorFor(getSupervisorConfig) - - def newSupervisorFor(config: SupervisorConfig): Supervisor = config match { - case SupervisorConfig(restartStrategy, _) => - val supervisor = create(restartStrategy) - supervisor.start - supervisor !? Configure(config, this) match { - case 'success => log.debug("Supervisor successfully configured") - case _ => log.error("Supervisor could not be configured") - } - supervisor - } - - /** - * To be overridden by concrete factory. - * Should return the SupervisorConfig for the supervisor. - */ - protected def getSupervisorConfig: SupervisorConfig - - protected def create(strategy: RestartStrategy): Supervisor = strategy match { - case RestartStrategy(scheme, maxNrOfRetries, timeRange) => - scheme match { - case AllForOne => new Supervisor(new AllForOneStrategy(maxNrOfRetries, timeRange)) - case OneForOne => new Supervisor(new OneForOneStrategy(maxNrOfRetries, timeRange)) - } - } -} - -//==================================================== -/** - * TODO: document - * - * @author Jonas Bonér - */ -class Supervisor(faultHandler: FaultHandlingStrategy) extends Actor with Logging { - - private val state = new SupervisorState(this, faultHandler) - - /** - * Returns an Option with the GenericServerContainer for the server with the name specified. - * If the server is found then Some(server) is returned else None. - */ - def getServer(id: String): Option[GenericServerContainer] = state.getServerContainer(id) - - /** - * Returns an the GenericServerContainer for the server with the name specified. - * If the server is not found then the error handler is invoked. - */ - def getServerOrElse(id: String, errorHandler: => GenericServerContainer): GenericServerContainer = { - getServer(id) match { - case Some(serverContainer) => serverContainer - case None => errorHandler - } - } - - def act = { - self.trapExit = true - loop { - react { - case Configure(config, factory) => - log.debug("Configuring supervisor:{} ", this) - configure(config, factory) - reply('success) - - case Start => - state.serverContainers.foreach { serverContainer => - serverContainer.start - log.info("Starting server: {}", serverContainer.getServer) - } - - case Stop => - state.serverContainers.foreach { serverContainer => - serverContainer.terminate('normal) - log.info("Stopping server: {}", serverContainer) - } - log.info("Stopping supervisor: {}", this) - exit('normal) - - case Exit(failedServer, reason) => - reason match { - case 'forced => {} // do nothing - case _ => state.faultHandler.handleFailure(state, failedServer, reason) - } - - case unexpected => log.warning("Unexpected message [{}], ignoring...", unexpected) - } - } - } - - private def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match { - case SupervisorConfig(_, servers) => - servers.map(server => - server match { - case Worker(serverContainer, lifecycle) => - serverContainer.lifeCycle = Some(lifecycle) - spawnLink(serverContainer) - - case SupervisorConfig(_, _) => // recursive configuration - val supervisor = factory.newSupervisorFor(server.asInstanceOf[SupervisorConfig]) - supervisor ! Start - state.addSupervisor(supervisor) - }) - } - - private[supervisor] def spawnLink(serverContainer: GenericServerContainer): GenericServer = { - val newServer = serverContainer.newServer() - newServer.start - self.link(newServer) - log.debug("Linking actor [{}] to supervisor [{}]", newServer, this) - state.addServerContainer(serverContainer) - newServer - } -} - -//==================================================== -/** - * TODO: document - * - * @author Jonas Bonér - */ -abstract class FaultHandlingStrategy(val maxNrOfRetries: Int, val withinTimeRange: Int) extends Logging { - private[supervisor] var supervisor: Supervisor = _ - private var nrOfRetries = 0 - private var retryStartTime = currentTime - - private[supervisor] def handleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = { - nrOfRetries += 1 - if (timeRangeHasExpired) { - if (hasReachedMaximumNrOfRetries) { - log.info("Maximum of restarts [{}] for server [{}] has been reached - the supervisor including all its servers will now be shut down.", maxNrOfRetries, failedServer) - supervisor ! Stop // execution stops here - } else { - nrOfRetries = 0 - retryStartTime = currentTime - } - } - doHandleFailure(state, failedServer, reason) - } - - - private[supervisor] def restart(serverContainer: GenericServerContainer, reason: AnyRef, state: SupervisorState) = { - preRestart(serverContainer) - serverContainer.lock.withWriteLock { - - // TODO: this is the place to fail-over all pending messages in the failing actor's mailbox, if possible to get a hold of them - // e.g. something like 'serverContainer.getServer.getPendingMessages.map(newServer ! _)' - - self.unlink(serverContainer.getServer) - serverContainer.lifeCycle match { - case None => throw new IllegalStateException("Server [" + serverContainer.id + "] does not have a life-cycle defined.") - case Some(LifeCycle(scope, shutdownTime)) => - serverContainer.terminate(reason, shutdownTime) - - scope match { - case Permanent => - log.debug("Restarting server [{}] configured as PERMANENT.", serverContainer.id) - serverContainer.reconfigure(reason, supervisor.spawnLink(serverContainer), state.supervisor) - - case Temporary => - if (reason == 'normal) { - log.debug("Restarting server [{}] configured as TEMPORARY (since exited naturally).", serverContainer.id) - serverContainer.reconfigure(reason, supervisor.spawnLink(serverContainer), state.supervisor) - } else log.info("Server [{}] configured as TEMPORARY will not be restarted (received unnatural exit message).", serverContainer.id) - - case Transient => - log.info("Server [{}] configured as TRANSIENT will not be restarted.", serverContainer.id) - } - } - } - postRestart(serverContainer) - } - - /** - * To be overriden by concrete strategies. - */ - protected def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) - - /** - * To be overriden by concrete strategies. - */ - protected def preRestart(serverContainer: GenericServerContainer) = {} - - /** - * To be overriden by concrete strategies. - */ - protected def postRestart(serverContainer: GenericServerContainer) = {} - - private def hasReachedMaximumNrOfRetries: Boolean = nrOfRetries > maxNrOfRetries - private def timeRangeHasExpired: Boolean = (currentTime - retryStartTime) > withinTimeRange - private def currentTime: Long = System.currentTimeMillis -} - -//==================================================== -/** - * TODO: document - * - * @author Jonas Bonér - */ -class AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) -extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) { - override def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = { - log.error("Server [{}] has failed due to [{}] - scheduling restart - scheme: ALL_FOR_ONE.", failedServer, reason) - for (serverContainer <- state.serverContainers) restart(serverContainer, reason, state) - state.supervisors.foreach(_ ! Exit(failedServer, reason)) - } -} - -//==================================================== -/** - * TODO: document - * - * @author Jonas Bonér - */ -class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) -extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) { - override def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = { - log.error("Server [{}] has failed due to [{}] - scheduling restart - scheme: ONE_FOR_ONE.", failedServer, reason) - var serverContainer: Option[GenericServerContainer] = None - state.serverContainers.foreach { - container => if (container.getServer == failedServer) serverContainer = Some(container) - } - serverContainer match { - case None => throw new RuntimeException("Could not find a generic server for actor: " + failedServer) - case Some(container) => restart(container, reason, state) - } - } -} - -//==================================================== -/** - * TODO: document - * - * @author Jonas Bonér - */ -private[supervisor] class SupervisorState(val supervisor: Supervisor, val faultHandler: FaultHandlingStrategy) extends Logging { - faultHandler.supervisor = supervisor - - private val _lock = new ReadWriteLock - private val _serverContainerRegistry = new HashMap[String, GenericServerContainer] - private var _supervisors: List[Supervisor] = Nil - - def supervisors: List[Supervisor] = _lock.withReadLock { - _supervisors - } - - def addSupervisor(supervisor: Supervisor) = _lock.withWriteLock { - _supervisors = supervisor :: _supervisors - } - - def serverContainers: List[GenericServerContainer] = _lock.withReadLock { - _serverContainerRegistry.values.toList - } - - def getServerContainer(id: String): Option[GenericServerContainer] = _lock.withReadLock { - if (_serverContainerRegistry.contains(id)) Some(_serverContainerRegistry(id)) - else None - } - - def addServerContainer(serverContainer: GenericServerContainer) = _lock.withWriteLock { - _serverContainerRegistry += serverContainer.id -> serverContainer - } - - def removeServerContainer(id: String) = _lock.withWriteLock { - getServerContainer(id) match { - case Some(serverContainer) => _serverContainerRegistry - id - case None => {} - } - } -} - diff --git a/supervisor/test-code/test/scala/GenericServerContainerSuite.scala b/supervisor/test-code/test/scala/GenericServerContainerSuite.scala deleted file mode 100755 index 1b85425d74..0000000000 --- a/supervisor/test-code/test/scala/GenericServerContainerSuite.scala +++ /dev/null @@ -1,202 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package com.scalablesolutions.akka.supervisor - -import org.specs.runner.JUnit4 -import org.specs.Specification - -import scala.actors._ -import scala.actors.Actor._ - -/** - * @author Jonas Bonér - */ -class GenericServerContainerTest extends JUnit4(genericServerContainerSpec) // for JUnit4 and Maven -object genericServerContainerSpec extends Specification { - - var inner: GenericServerContainerActor = null - var server: GenericServerContainer = null - def createProxy(f: () => GenericServer) = { - val server = new GenericServerContainer("server", f) - server.setTimeout(100) - server - } - - inner = new GenericServerContainerActor - server = createProxy(() => inner) - server.newServer - server.start - - "server should be initialized" in { - server.init("testInit") - Thread.sleep(100) - expect("initializing: testInit") { - inner.log - } - } - - "server should terminate with a reason " in { - server.terminate("testTerminateWithReason", 100) - Thread.sleep(100) - expect("terminating: testTerminateWithReason") { - inner.log - } - } - - "server respond to async oneway message" in { - server ! OneWay - Thread.sleep(100) - expect("got a oneway") { - inner.log - } - } - - "server respond to async ping message" in { - server ! Ping - Thread.sleep(100) - expect("got a ping") { - inner.log - } - } - - "server respond to !!!" in { - expect("pong") { - (server !!! Ping).getOrElse("nil") - } - expect("got a ping") { - inner.log - } - } - - "server respond to !?" in { - expect("pong") { - val res: String = server !? Ping - res - } - expect("got a ping") { - inner.log - } - } - - "server respond to !!! with timeout" in { - expect("pong") { - (server !!! Ping).getOrElse("nil") - } - expect("got a ping") { - inner.log - } - } - - "server respond to !!! with timeout" in { - expect("error handler") { - server !!! (OneWay, "error handler") - } - expect("got a oneway") { - inner.log - } - } - - "server respond to !!! and return future with timeout" in { - val future = server !! Ping - future.receiveWithin(100) match { - case None => fail("timed out") // timed out - case Some(reply) => - expect("got a ping") { - inner.log - } - assert("pong" === reply) - } - } - - "server respond to !!! and return future with timeout" in { - val future = server !! OneWay - future.receiveWithin(100) match { - case None => - expect("got a oneway") { - inner.log - } - case Some(reply) => - fail("expected a timeout, got Some(reply)") - } - } - - "server respond do hotswap" in { - // using base - expect("pong") { - (server !!! Ping).getOrElse("nil") - } - - // hotswapping - server.hotswap(Some({ - case Ping => reply("hotswapped pong") - })) - expect("hotswapped pong") { - (server !!! Ping).getOrElse("nil") - } - } - - "server respond do double hotswap" in { - // using base - expect("pong") { - (server !!! Ping).getOrElse("nil") - } - - // hotswapping - server.hotswap(Some({ - case Ping => reply("hotswapped pong") - })) - expect("hotswapped pong") { - (server !!! Ping).getOrElse("nil") - } - - // hotswapping again - server.hotswap(Some({ - case Ping => reply("hotswapped pong again") - })) - expect("hotswapped pong again") { - (server !!! Ping).getOrElse("nil") - } - } - - "server respond do hotswap and then revert" in { - // using base - expect("pong") { - (server !!! Ping).getOrElse("nil") - } - - // hotswapping - server.hotswap(Some({ - case Ping => reply("hotswapped pong") - })) - expect("hotswapped pong") { - (server !!! Ping).getOrElse("nil") - } - - // restoring original base - server.hotswap(None) - expect("pong") { - (server !!! Ping).getOrElse("nil") - } - } -} - - -class GenericServerContainerActor extends GenericServer { - var log = "" - - override def body: PartialFunction[Any, Unit] = { - case Ping => - log = "got a ping" - reply("pong") - - case OneWay => - log = "got a oneway" - } - - override def init(config: AnyRef) = log = "initializing: " + config - override def shutdown(reason: AnyRef) = log = "terminating: " + reason -} - - diff --git a/supervisor/test-code/test/scala/GenericServerSuite.scala b/supervisor/test-code/test/scala/GenericServerSuite.scala deleted file mode 100755 index 44aab326eb..0000000000 --- a/supervisor/test-code/test/scala/GenericServerSuite.scala +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package com.scalablesolutions.akka.supervisor - -import org.specs.runner.JUnit4 -import org.specs.Specification - -import scala.actors._ -import scala.actors.Actor._ - -/** - * @author Jonas Bonér - */ -class GenericServerTest extends JUnit4(genericServerSpec) // for JUnit4 and Maven -object genericServerSpec extends Specification { - - "server should respond to a regular message" in { - val server = new TestGenericServerActor - server.start - server !? Ping match { - case reply: String => - assert("got a ping" === server.log) - assert("pong" === reply) - case _ => fail() - } - } -} - -class TestGenericServerActor extends GenericServer { - var log: String = "" - - override def body: PartialFunction[Any, Unit] = { - case Ping => - log = "got a ping" - reply("pong") - } -} - diff --git a/supervisor/test-code/test/scala/Messages.scala b/supervisor/test-code/test/scala/Messages.scala deleted file mode 100755 index 0f014ca692..0000000000 --- a/supervisor/test-code/test/scala/Messages.scala +++ /dev/null @@ -1,12 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package com.scalablesolutions.akka.supervisor - -sealed abstract class TestMessage -case object Ping extends TestMessage -case object Pong extends TestMessage -case object OneWay extends TestMessage -case object Die extends TestMessage -case object NotifySupervisorExit extends TestMessage diff --git a/supervisor/test-code/test/scala/SupervisorStateSuite.scala b/supervisor/test-code/test/scala/SupervisorStateSuite.scala deleted file mode 100755 index 6df3b7e059..0000000000 --- a/supervisor/test-code/test/scala/SupervisorStateSuite.scala +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package com.scalablesolutions.akka.supervisor - -import org.specs.runner.JUnit4 -import org.specs.Specification - -import scala.actors._ -import scala.actors.Actor._ - -/** - * @author Jonas Bonér - */ -class SupervisorStateTest extends JUnit4(supervisorStateSpec) // for JUnit4 and Maven -object supervisorStateSpec extends Specification { - val dummyActor = new GenericServer { override def body: PartialFunction[Any, Unit] = { case _ => }} - val newDummyActor = () => dummyActor - var state: SupervisorState = _ - var proxy: GenericServerContainer = _ - var supervisor: Supervisor = _ - - proxy = new GenericServerContainer("server1", newDummyActor) - object factory extends SupervisorFactory { - override def getSupervisorConfig: SupervisorConfig = { - SupervisorConfig( - RestartStrategy(AllForOne, 3, 100), - Worker( - proxy, - LifeCycle(Permanent, 100)) - :: Nil) - } - } - - supervisor = factory.newSupervisor - state = new SupervisorState(supervisor, new AllForOneStrategy(3, 100)) - - "supervisor state should return added server" in { - state.addServerContainer(proxy) - state.getServerContainer("server1") match { - case None => fail("should have returned server") - case Some(server) => - assert(server != null) - assert(server.isInstanceOf[GenericServerContainer]) - assert(proxy === server) - } - } - - "supervisor state should remove added server" in { - state.addServerContainer(proxy) - - state.removeServerContainer("server1") - state.getServerContainer("server1") match { - case Some(_) => fail("should have returned None") - case None => - } - state.getServerContainer("dummyActor") match { - case Some(_) => fail("should have returned None") - case None => - } - } - - "supervisor state should fail getting non-existent server by symbol" in { - state.getServerContainer("server2") match { - case Some(_) => fail("should have returned None") - case None => - } - } - - "supervisor state should fail getting non-existent server by actor" in { - state.getServerContainer("dummyActor") match { - case Some(_) => fail("should have returned None") - case None => - } - } -} diff --git a/supervisor/test-code/test/scala/SupervisorSuite.scala b/supervisor/test-code/test/scala/SupervisorSuite.scala deleted file mode 100755 index 4e8bd048e1..0000000000 --- a/supervisor/test-code/test/scala/SupervisorSuite.scala +++ /dev/null @@ -1,434 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package com.scalablesolutions.akka.supervisor - -import org.specs.runner.JUnit4 -import org.specs.Specification - -import scala.actors._ -import scala.actors.Actor._ -import scala.collection.Map -import scala.collection.mutable.HashMap - -/** - * @author Jonas Bonér - */ -class SupervisorTest extends JUnit4(supervisorSpec) // for JUnit4 and Maven -object supervisorSpec extends Specification { - - var messageLog: String = "" - val pingpong1 = new GenericServerContainer("pingpong1", () => new PingPong1Actor) - val pingpong2 = new GenericServerContainer("pingpong2", () => new PingPong2Actor) - val pingpong3 = new GenericServerContainer("pingpong3", () => new PingPong3Actor) - - pingpong1.setTimeout(100) - pingpong2.setTimeout(100) - pingpong3.setTimeout(100) - - @BeforeMethod - def setup = messageLog = "" - - // =========================================== - "starting supervisor should start the servers" in { - val sup = getSingleActorAllForOneSupervisor - sup ! Start - - expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") - } - } - - // =========================================== - "started supervisor should be able to return started servers" in { - val sup = getSingleActorAllForOneSupervisor - sup ! Start - val server = sup.getServerOrElse("pingpong1", throw new RuntimeException("server not found")) - assert(server.isInstanceOf[GenericServerContainer]) - assert(server === pingpong1) - } - - // =========================================== - "started supervisor should fail returning non-existing server" in { - val sup = getSingleActorAllForOneSupervisor - sup ! Start - intercept(classOf[RuntimeException]) { - sup.getServerOrElse("wrong_name", throw new RuntimeException("server not found")) - } - } - - // =========================================== - "supervisor should restart killed server with restart strategy one_for_one" in { - val sup = getSingleActorOneForOneSupervisor - sup ! Start - - intercept(classOf[RuntimeException]) { - pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) - } - Thread.sleep(100) - expect("oneforone") { - messageLog - } - } - - // =========================================== - "supervisor should restart used killed server with restart strategy one_for_one" in { - val sup = getSingleActorOneForOneSupervisor - sup ! Start - - expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") - } - Thread.sleep(100) - expect("ping") { - messageLog - } - intercept(classOf[RuntimeException]) { - pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) - } - Thread.sleep(100) - expect("pingoneforone") { - messageLog - } - expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") - } - Thread.sleep(100) - expect("pingoneforoneping") { - messageLog - } - } - - // =========================================== - "supervisor should restart killed server with restart strategy all_for_one" in { - val sup = getSingleActorAllForOneSupervisor - sup ! Start - intercept(classOf[RuntimeException]) { - pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) - } - Thread.sleep(100) - expect("allforone") { - messageLog - } - } - - // =========================================== - "supervisor should restart used killed server with restart strategy all_for_one" in { - val sup = getSingleActorAllForOneSupervisor - sup ! Start - expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") - } - Thread.sleep(100) - expect("ping") { - messageLog - } - intercept(classOf[RuntimeException]) { - pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) - } - Thread.sleep(100) - expect("pingallforone") { - messageLog - } - expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") - } - Thread.sleep(100) - expect("pingallforoneping") { - messageLog - } - } - - // =========================================== - "supervisor should restart killed multiple servers with restart strategy one_for_one" in { - val sup = getMultipleActorsOneForOneConf - sup ! Start - intercept(classOf[RuntimeException]) { - pingpong3 !!! (Die, throw new RuntimeException("TIME OUT")) - } - Thread.sleep(100) - expect("oneforone") { - messageLog - } - } - - // =========================================== - "supervisor should restart killed multiple servers with restart strategy one_for_one" in { - val sup = getMultipleActorsOneForOneConf - sup ! Start - expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") - } - Thread.sleep(100) - expect("pong") { - (pingpong2 !!! Ping).getOrElse("nil") - } - Thread.sleep(100) - expect("pong") { - (pingpong3 !!! Ping).getOrElse("nil") - } - Thread.sleep(100) - expect("pingpingping") { - messageLog - } - intercept(classOf[RuntimeException]) { - pingpong2 !!! (Die, throw new RuntimeException("TIME OUT")) - } - Thread.sleep(100) - expect("pingpingpingoneforone") { - messageLog - } - expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") - } - Thread.sleep(100) - expect("pong") { - (pingpong2 !!! Ping).getOrElse("nil") - } - Thread.sleep(100) - expect("pong") { - (pingpong3 !!! Ping).getOrElse("nil") - } - Thread.sleep(100) - expect("pingpingpingoneforonepingpingping") { - messageLog - } - } - - // =========================================== - "supervisor should restart killed muliple servers with restart strategy all_for_one" in { - val sup = getMultipleActorsAllForOneConf - sup ! Start - intercept(classOf[RuntimeException]) { - pingpong2 !!! (Die, throw new RuntimeException("TIME OUT")) - } - Thread.sleep(100) - expect("allforoneallforoneallforone") { - messageLog - } - } - - // =========================================== - "supervisor should restart killed muliple servers with restart strategy all_for_one" in { - val sup = getMultipleActorsAllForOneConf - sup ! Start - expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") - } - Thread.sleep(100) - expect("pong") { - (pingpong2 !!! Ping).getOrElse("nil") - } - Thread.sleep(100) - expect("pong") { - (pingpong3 !!! Ping).getOrElse("nil") - } - Thread.sleep(100) - expect("pingpingping") { - messageLog - } - intercept(classOf[RuntimeException]) { - pingpong2 !!! (Die, throw new RuntimeException("TIME OUT")) - } - Thread.sleep(100) - expect("pingpingpingallforoneallforoneallforone") { - messageLog - } - expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") - } - Thread.sleep(100) - expect("pong") { - (pingpong2 !!! Ping).getOrElse("nil") - } - Thread.sleep(100) - expect("pong") { - (pingpong3 !!! Ping).getOrElse("nil") - } - Thread.sleep(100) - expect("pingpingpingallforoneallforoneallforonepingpingping") { - messageLog - } - } - - "supervisor should restart killed first-level server with restart strategy all_for_one" in { - val sup = getNestedSupervisorsAllForOneConf - sup ! Start - intercept(classOf[RuntimeException]) { - pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) - } - Thread.sleep(100) - expect("allforoneallforoneallforone") { - messageLog - } - } - - - // ============================================= - // Creat some supervisors with different configurations - - def getSingleActorAllForOneSupervisor: Supervisor = { - - // Create an abstract SupervisorContainer that works for all implementations - // of the different Actors (Services). - // - // Then create a concrete container in which we mix in support for the specific - // implementation of the Actors we want to use. - - object factory extends TestSupervisorFactory { - override def getSupervisorConfig: SupervisorConfig = { - SupervisorConfig( - RestartStrategy(AllForOne, 3, 100), - Worker( - pingpong1, - LifeCycle(Permanent, 100)) - :: Nil) - } - } - factory.newSupervisor - } - - def getSingleActorOneForOneSupervisor: Supervisor = { - object factory extends TestSupervisorFactory { - override def getSupervisorConfig: SupervisorConfig = { - SupervisorConfig( - RestartStrategy(OneForOne, 3, 100), - Worker( - pingpong1, - LifeCycle(Permanent, 100)) - :: Nil) - } - } - factory.newSupervisor - } - - def getMultipleActorsAllForOneConf: Supervisor = { - object factory extends TestSupervisorFactory { - override def getSupervisorConfig: SupervisorConfig = { - SupervisorConfig( - RestartStrategy(AllForOne, 3, 100), - Worker( - pingpong1, - LifeCycle(Permanent, 100)) - :: - Worker( - pingpong2, - LifeCycle(Permanent, 100)) - :: - Worker( - pingpong3, - LifeCycle(Permanent, 100)) - :: Nil) - } - } - factory.newSupervisor - } - - def getMultipleActorsOneForOneConf: Supervisor = { - object factory extends TestSupervisorFactory { - override def getSupervisorConfig: SupervisorConfig = { - SupervisorConfig( - RestartStrategy(OneForOne, 3, 100), - Worker( - pingpong1, - LifeCycle(Permanent, 100)) - :: - Worker( - pingpong2, - LifeCycle(Permanent, 100)) - :: - Worker( - pingpong3, - LifeCycle(Permanent, 100)) - :: Nil) - } - } - factory.newSupervisor - } - - def getNestedSupervisorsAllForOneConf: Supervisor = { - object factory extends TestSupervisorFactory { - override def getSupervisorConfig: SupervisorConfig = { - SupervisorConfig( - RestartStrategy(AllForOne, 3, 100), - Worker( - pingpong1, - LifeCycle(Permanent, 100)) - :: - SupervisorConfig( - RestartStrategy(AllForOne, 3, 100), - Worker( - pingpong2, - LifeCycle(Permanent, 100)) - :: - Worker( - pingpong3, - LifeCycle(Permanent, 100)) - :: Nil) - :: Nil) - } - } - factory.newSupervisor - } - - class PingPong1Actor extends GenericServer { - override def body: PartialFunction[Any, Unit] = { - case Ping => - messageLog += "ping" - reply("pong") - case Die => - throw new RuntimeException("Recieved Die message") - } - } - - class PingPong2Actor extends GenericServer { - override def body: PartialFunction[Any, Unit] = { - case Ping => - messageLog += "ping" - reply("pong") - case Die => - throw new RuntimeException("Recieved Die message") - } - } - - class PingPong3Actor extends GenericServer { - override def body: PartialFunction[Any, Unit] = { - case Ping => - messageLog += "ping" - reply("pong") - case Die => - throw new RuntimeException("Recieved Die message") - } - } - - // ============================================= - - class TestAllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends AllForOneStrategy(maxNrOfRetries, withinTimeRange) { - override def postRestart(serverContainer: GenericServerContainer) = { - messageLog += "allforone" - } - } - - class TestOneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends OneForOneStrategy(maxNrOfRetries, withinTimeRange) { - override def postRestart(serverContainer: GenericServerContainer) = { - messageLog += "oneforone" - } - } - - abstract class TestSupervisorFactory extends SupervisorFactory { - override def create(strategy: RestartStrategy): Supervisor = strategy match { - case RestartStrategy(scheme, maxNrOfRetries, timeRange) => - scheme match { - case AllForOne => new Supervisor(new TestAllForOneStrategy(maxNrOfRetries, timeRange)) - case OneForOne => new Supervisor(new TestOneForOneStrategy(maxNrOfRetries, timeRange)) - } - } - } -} - - - - - - diff --git a/util-java/util-java.iml b/util-java/util-java.iml index 723223dded..138b161455 100755 --- a/util-java/util-java.iml +++ b/util-java/util-java.iml @@ -1,12 +1,12 @@ - -