- * def receive: PartialFunction[Any, Unit] = {
- * case Ping =>
- * println("got a ping")
- * reply("pong")
- *
- * case OneWay =>
- * println("got a oneway")
- *
- * case _ =>
- * println("unknown message, ignoring")
- * }
- *
- */
- protected def receive: PartialFunction[Any, Unit]
-
- /**
- * Mandatory callback method that is called during restart and reinitialization after a server crash.
- * To be implemented by subclassing actor.
- */
- protected def restart(config: Option[AnyRef])
-
- /**
- * Optional callback method that is called during initialization.
- * To be implemented by subclassing actor.
- */
- protected def init(config: AnyRef) {}
-
- /**
- * Optional callback method that is called during termination.
- * To be implemented by subclassing actor.
- */
- protected def shutdown(reason: AnyRef) {}
-
- // =============
- // ==== API ====
- // =============
-
- def !(message: AnyRef) =
- if (isRunning) mailbox.append(new MessageHandle(this, message, new NullFutureResult))
- else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
-
- def !(implicit timeout: Long): Option[T] = if (isRunning) {
- val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
- future.await_?
- getResultOrThrowException(future)
- } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
-
- def !?[T](message: AnyRef): Option[T] = if (isRunning) {
- val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0)
- future.await_!
- getResultOrThrowException(future)
- } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
-
- def link(actor: Actor) =
- if (isRunning) linkedActors.add(actor)
- else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
-
- def unlink(actor: Actor) =
- if (isRunning) linkedActors.remove(actor)
- else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
-
- def start = synchronized {
- if (!isRunning) {
- dispatcherType match {
- case EventBased =>
- dispatcher = new EventBasedSingleThreadDispatcher
- case ThreadBased =>
- dispatcher = new EventBasedThreadPoolDispatcher
- }
- mailbox = dispatcher.messageQueue
- dispatcher.registerHandler(this, new ActorMessageHandler(this))
- dispatcher.start
- isRunning = true
- }
- }
-
- def stop =
- if (isRunning) {
- this ! Stop("Actor gracefully stopped")
- dispatcher.unregisterHandler(this)
- isRunning = false
- } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
-
- protected def reply(message: AnyRef) = senderFuture match {
- case None => throw new IllegalStateException("No sender future in scope, can't reply")
- case Some(future) => future.completeWithResult(message)
- }
-
- // ================================
- // ==== IMPLEMENTATION DETAILS ====
- // ================================
-
- private def postMessageToMailboxAndCreateFutureResultWithTimeout(
- message: AnyRef, timeout: Long): CompletableFutureResult = {
- val future = new DefaultCompletableFutureResult(timeout)
- mailbox.append(new MessageHandle(this, message, future))
- future
- }
-
- private def getResultOrThrowException[T](future: FutureResult): Option[T] =
- if (future.exception.isDefined) throw future.exception.get
- else future.result.asInstanceOf[Option[T]]
-
- private[kernel] def handle(message: AnyRef, future: CompletableFutureResult) = {
- try {
- senderFuture = Some(future)
- if (base.isDefinedAt(message)) base(message)
- else throw new IllegalArgumentException("No handler matching message [" + message + "] in actor [" + this.getClass.getName + "]")
- } catch {
- case e =>
- future.completeWithException(e)
- handleFailure(this, e)
- }
-/*
- try {
- val result = message.asInstanceOf[Invocation].joinpoint.proceed
- future.completeWithResult(result)
- } catch {
- case e: Exception => future.completeWithException(e)
- }
-*/
- }
-
- private def base: PartialFunction[Any, Unit] = lifeCycle orElse (hotswap getOrElse receive)
-
- private val lifeCycle: PartialFunction[Any, Unit] = {
- case Init(config) => init(config)
- case HotSwap(code) => hotswap = code
- case Restart => restart(config)
- case Stop(reason) => shutdown(reason); exit
- case Exit(dead, reason) => handleFailure(dead, reason)
- }
-
- private[this] def handleFailure(dead: Actor, e: Throwable) = {
- if (trapExit) {
- restartLinkedActors
- scheduleRestart
- } else linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_ ! Exit(this, e))
- }
-
- private[this] def restartLinkedActors = linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_.scheduleRestart)
-
- private[Actor] def scheduleRestart = mailbox.prepend(new MessageHandle(this, Restart, new NullFutureResult))
-}
\ No newline at end of file
diff --git a/kernel/src/main/scala/Boot.scala b/kernel/src/main/scala/Boot.scala
index 9eaef003af..5b917f1435 100644
--- a/kernel/src/main/scala/Boot.scala
+++ b/kernel/src/main/scala/Boot.scala
@@ -4,11 +4,10 @@
package se.scalablesolutions.akka
-import kernel.Logging
-
import java.io.File
import java.lang.reflect.Method
import java.net.{URL, URLClassLoader}
+import kernel.util.Logging
/**
* Bootstraps the Akka server by isolating the server classes and all its dependency JARs into its own classloader.
diff --git a/kernel/src/main/scala/GenericServer.scala b/kernel/src/main/scala/GenericServer.scala
deleted file mode 100644
index b66b27b932..0000000000
--- a/kernel/src/main/scala/GenericServer.scala
+++ /dev/null
@@ -1,292 +0,0 @@
-/**
- * Copyright (C) 2009 Scalable Solutions.
- */
-
-package se.scalablesolutions.akka.kernel
-
-import scala.actors._
-import scala.actors.Actor._
-
-import se.scalablesolutions.akka.kernel.config.ScalaConfig._
-import se.scalablesolutions.akka.kernel.Helpers._
-
-sealed abstract class GenericServerMessage
-case class Init(config: AnyRef) extends GenericServerMessage
-case class ReInit(config: AnyRef) extends GenericServerMessage
-case class Shutdown(reason: AnyRef) extends GenericServerMessage
-case class Terminate(reason: AnyRef) extends GenericServerMessage
-case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends GenericServerMessage
-
-/**
- * Base trait for all user-defined servers.
- *
- * @author Jonas Bonér
- */
-trait GenericServer extends Actor {
- /**
- * Template method implementing the server logic.
- * To be implemented by subclassing server.
- *
- * Example code:
- *
- * override def body: PartialFunction[Any, Unit] = {
- * case Ping =>
- * println("got a ping")
- * reply("pong")
- *
- * case OneWay =>
- * println("got a oneway")
- * }
- *
- */
- def body: PartialFunction[Any, Unit]
-
- /**
- * Callback method that is called during initialization.
- * To be implemented by subclassing server.
- */
- def init(config: AnyRef) {}
-
- /**
- * Callback method that is called during reinitialization after a server crash.
- * To be implemented by subclassing server.
- */
- def reinit(config: AnyRef) {}
-
- /**
- * Callback method that is called during termination.
- * To be implemented by subclassing server.
- */
- def shutdown(reason: AnyRef) {}
-
- def act = loop { react { genericBase orElse base } }
-
- private def base: PartialFunction[Any, Unit] = hotswap getOrElse body
-
- private var hotswap: Option[PartialFunction[Any, Unit]] = None
-
- private val genericBase: PartialFunction[Any, Unit] = {
- case Init(config) => init(config)
- case ReInit(config) => reinit(config)
- case HotSwap(code) => hotswap = code
- case Shutdown(reason) => shutdown(reason); reply('success)
- case Terminate(reason) => exit(reason)
- }
-}
-
-/**
- * The container (proxy) for GenericServer, responsible for managing the life-cycle of the server;
- * such as shutdown, restart, re-initialization etc.
- * Each GenericServerContainer manages one GenericServer.
- *
- * @author Jonas Bonér
- */
-class GenericServerContainer(
- val id: String,
- private[kernel] var serverFactory: () => GenericServer) extends Logging {
- require(id != null && id != "")
-
- private[kernel] var lifeCycle: Option[LifeCycle] = None
- private[kernel] val lock = new ReadWriteLock
- private[kernel] val txItemsLock = new ReadWriteLock
- private[kernel] val serializer = new JavaSerializationSerializer
-
- private var server: GenericServer = _
- private var currentConfig: Option[AnyRef] = None
- private[kernel] var timeout = 5000
-
- private[kernel] def transactionalItems: List[Transactional] = txItemsLock.withReadLock {
- _transactionalMaps ::: _transactionalVectors ::: _transactionalRefs
- }
-
- // TX Maps
- private[kernel] var _transactionalMaps: List[TransactionalMap[_, _]] = Nil
- private[kernel] def transactionalMaps_=(maps: List[TransactionalMap[_, _]]) = txItemsLock.withWriteLock {
- _transactionalMaps = maps
- }
- private[kernel] def transactionalMaps: List[TransactionalMap[_, _]] = txItemsLock.withReadLock {
- _transactionalMaps
- }
-
- // TX Vectors
- private[kernel] var _transactionalVectors: List[TransactionalVector[_]] = Nil
- private[kernel] def transactionalVectors_=(vectors: List[TransactionalVector[_]]) = txItemsLock.withWriteLock {
- _transactionalVectors = vectors
- }
- private[kernel] def transactionalVectors: List[TransactionalVector[_]] = txItemsLock.withReadLock {
- _transactionalVectors
- }
-
- // TX Refs
- private[kernel] var _transactionalRefs: List[TransactionalRef[_]] = Nil
- private[kernel] def transactionalRefs_=(refs: List[TransactionalRef[_]]) = txItemsLock.withWriteLock {
- _transactionalRefs = refs
- }
- private[kernel] def transactionalRefs: List[TransactionalRef[_]] = txItemsLock.withReadLock {
- _transactionalRefs
- }
-
- /**
- * Sends a one way message to the server - alias for cast(message).
- * - * Example: - *
- * server ! Message - *- */ - def !(message: Any) = { - require(server != null) - lock.withReadLock { server ! message } - } - - /** - * Sends a message to the server and gets a future back with the reply. Returns - * an Option with either Some(result) if succesful or None if timeout. - *
- * Timeout specified by the setTimeout(time: Int) method.
- *
- * Example: - *
- * (server !!! Message).getOrElse(throw new RuntimeException("time out")
- *
- */
- def !!: Option[T] = {
- require(server != null)
- val future: FutureWithTimeout[T] = lock.withReadLock { server !!! message }
- future.receiveWithin(timeout)
- }
-
- /**
- * Sends a message to the server and gets a future back with the reply.
- * - * Tries to get the reply within the timeout specified in the GenericServerContainer - * and else execute the error handler (which can return a default value, throw an exception - * or whatever is appropriate). - *
- * Example: - *
- * server !!! (Message, throw new RuntimeException("time out"))
- * // OR
- * server !!! (Message, DefaultReturnValue)
- *
- */
- def !!: T = !!!(message, errorHandler, timeout)
-
- /**
- * Sends a message to the server and gets a future back with the reply.
- * - * Tries to get the reply within the timeout specified as parameter to the method - * and else execute the error handler (which can return a default value, throw an exception - * or whatever is appropriate). - *
- * Example: - *
- * server !!! (Message, throw new RuntimeException("time out"), 1000)
- * // OR
- * server !!! (Message, DefaultReturnValue, 1000)
- *
- */
- def !!: T = {
- require(server != null)
- val future: FutureWithTimeout[T] = lock.withReadLock { server !!! message }
- future.receiveWithin(time) match {
- case None => errorHandler
- case Some(reply) => reply
- }
- }
-
- /**
- * Initializes the server by sending a Init(config) message.
- */
- def init(config: AnyRef) = lock.withWriteLock {
- currentConfig = Some(config)
- server ! Init(config)
- }
-
- /**
- * Re-initializes the server by sending a ReInit(config) message with the most recent configuration.
- */
- def reinit = lock.withWriteLock {
- currentConfig match {
- case Some(config) => server ! ReInit(config)
- case None => {}
- }
- }
-
- /**
- * Hotswaps the server body by sending it a HotSwap(code) with the new code
- * block (PartialFunction) to be executed.
- */
- def hotswap(code: Option[PartialFunction[Any, Unit]]) = lock.withReadLock { server ! HotSwap(code) }
-
- /**
- * Swaps the server factory, enabling creating of a completely new server implementation
- * (upon failure and restart).
- */
- def swapFactory(newFactory: () => GenericServer) = serverFactory = newFactory
-
- /**
- * Sets the timeout for the call(..) method, e.g. the maximum time to wait for a reply
- * before bailing out. Sets the timeout on the future return from the call to the server.
- */
- def setTimeout(time: Int) = timeout = time
-
- /**
- * Returns the next message in the servers mailbox.
- */
- def nextMessage = lock.withReadLock { server ? }
-
- /**
- * Creates a new actor for the GenericServerContainer, and return the newly created actor.
- */
- private[kernel] def newServer(): GenericServer = lock.withWriteLock {
- server = serverFactory()
- server
- }
-
- /**
- * Starts the server.
- */
- private[kernel] def start = lock.withReadLock { server.start }
-
- /**
- * Terminates the server with a reason by sending a Terminate(Some(reason)) message.
- */
- private[kernel] def terminate(reason: AnyRef) = lock.withReadLock { server ! Terminate(reason) }
-
- /**
- * Terminates the server with a reason by sending a Terminate(Some(reason)) message,
- * the shutdownTime defines the maximal time to wait for the server to shutdown before
- * killing it.
- */
- private[kernel] def terminate(reason: AnyRef, shutdownTime: Int) = lock.withReadLock {
- if (shutdownTime > 0) {
- log.debug("Waiting [%s milliseconds for the server to shut down before killing it.", shutdownTime)
- server !? (shutdownTime, Shutdown(reason)) match {
- case Some('success) => log.debug("Server [%s] has been shut down cleanly.", id)
- case None => log.warning("Server [%s] was **not able** to complete shutdown cleanly within its configured shutdown time [%s]", id, shutdownTime)
- }
- }
- server ! Terminate(reason)
- }
-
- private[kernel] def reconfigure(reason: AnyRef, restartedServer: GenericServer, supervisor: Supervisor) = lock.withWriteLock {
- server = restartedServer
- reinit
- }
-
- private[kernel] def getServer: GenericServer = server
-
- private[kernel] def cloneServerAndReturnOldVersion: GenericServer = lock.withWriteLock {
- val oldServer = server
- server = serializer.deepClone(server)
- oldServer
- }
-
- private[kernel] def swapServer(newServer: GenericServer) = lock.withWriteLock {
- server = newServer
- }
-
- override def toString(): String = "GenericServerContainer[" + server + "]"
-}
-
diff --git a/kernel/src/main/scala/Kernel.scala b/kernel/src/main/scala/Kernel.scala
old mode 100755
new mode 100644
index 004cd8dc3d..a79d197a6e
--- a/kernel/src/main/scala/Kernel.scala
+++ b/kernel/src/main/scala/Kernel.scala
@@ -23,6 +23,8 @@ import java.io.{File, IOException}
import javax.ws.rs.core.UriBuilder
import javax.management.JMException
+import kernel.state.CassandraNode
+import kernel.util.Logging
/**
* @author Jonas Bonér
diff --git a/kernel/src/main/scala/Supervisor.scala b/kernel/src/main/scala/Supervisor.scala
deleted file mode 100644
index 9c38d43739..0000000000
--- a/kernel/src/main/scala/Supervisor.scala
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- * Copyright (C) 2009 Scalable Solutions.
- */
-
-package se.scalablesolutions.akka.kernel
-
-import scala.actors._
-import scala.actors.Actor._
-import scala.collection.mutable.HashMap
-
-import se.scalablesolutions.akka.kernel.Helpers._
-
-import se.scalablesolutions.akka.kernel.config.ScalaConfig._
-
-/**
- * Messages that the supervisor responds to and returns.
- *
- * @author Jonas Bonér
- */
-sealed abstract class SupervisorMessage
-case object Start extends SupervisorMessage
-case object Stop extends SupervisorMessage
-case class Configure(config: SupervisorConfig, factory: SupervisorFactory) extends SupervisorMessage
-
-/**
- * Abstract base class for all supervisor factories.
- * - * Example usage: - *
- * class MySupervisorFactory extends SupervisorFactory {
- *
- * override protected def getSupervisorConfig: SupervisorConfig = {
- * SupervisorConfig(
- * RestartStrategy(OneForOne, 3, 10),
- * Worker(
- * myFirstActorInstance,
- * LifeCycle(Permanent, 1000))
- * ::
- * Worker(
- * mySecondActorInstance,
- * LifeCycle(Permanent, 1000))
- * :: Nil)
- * }
- * }
- *
- *
- * Then create a concrete factory in which we mix in support for the specific implementation of the Service we want to use.
- *
- * - * object factory extends MySupervisorFactory - *- * - * Then create a new Supervisor tree with the concrete Services we have defined. - * - *
- * val supervisor = factory.newSupervisor - * supervisor ! Start // start up all managed servers - *- * - * @author Jonas Bonér - */ -abstract class SupervisorFactory extends Logging { - def newSupervisor: Supervisor = newSupervisorFor(getSupervisorConfig) - - def newSupervisorFor(config: SupervisorConfig): Supervisor = config match { - case SupervisorConfig(restartStrategy, _) => - val supervisor = create(restartStrategy) - supervisor.start - supervisor !? Configure(config, this) match { - case 'configSuccess => log.debug("Supervisor successfully configured") - case _ => log.error("Supervisor could not be configured") - } - supervisor - } - - /** - * To be overridden by concrete factory. - * Should return the SupervisorConfig for the supervisor. - */ - protected def getSupervisorConfig: SupervisorConfig - - protected def create(strategy: RestartStrategy): Supervisor = strategy match { - case RestartStrategy(scheme, maxNrOfRetries, timeRange) => - scheme match { - case AllForOne => new Supervisor(new AllForOneStrategy(maxNrOfRetries, timeRange)) - case OneForOne => new Supervisor(new OneForOneStrategy(maxNrOfRetries, timeRange)) - } - } -} - -//==================================================== -/** - * TODO: document - * - * @author Jonas Bonér - */ -class Supervisor(faultHandler: FaultHandlingStrategy) extends Actor with Logging { - - private val state = new SupervisorState(this, faultHandler) - - /** - * Returns an Option with the GenericServerContainer for the server with the name specified. - * If the server is found then Some(server) is returned else None. - */ - def getServer(id: String): Option[GenericServerContainer] = state.getServerContainer(id) - - /** - * Returns an the GenericServerContainer for the server with the name specified. - * If the server is not found then the error handler is invoked. - */ - def getServerOrElse(id: String, errorHandler: => GenericServerContainer): GenericServerContainer = { - getServer(id) match { - case Some(serverContainer) => serverContainer - case None => errorHandler - } - } - - def stop = Actor.self ! Stop - - def act = { - self.trapExit = true - loop { - react { - case Configure(config, factory) => - log.debug("Configuring supervisor:%s ", this) - configure(config, factory) - reply('configSuccess) - - case Start => - state.serverContainers.foreach { serverContainer => - serverContainer.start - log.info("Starting server: %s", serverContainer.getServer) - } - - case Stop => - state.serverContainers.foreach { serverContainer => - serverContainer.terminate('normal) - log.info("Stopping ser-ver: %s", serverContainer) - } - log.info("Stopping supervisor: %s", this) - exit('normal) - - case Exit(failedServer, reason) => - reason match { - case 'forced => {} // do nothing - case _ => state.faultHandler.handleFailure(state, failedServer, reason) - } - - case unexpected => log.warning("Unexpected message [%s] from [%s] ignoring...", unexpected, sender) - } - } - } - - private def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match { - case SupervisorConfig(_, servers) => - servers.map(server => - server match { - case Worker(serverContainer, lifecycle) => - serverContainer.lifeCycle = Some(lifecycle) - spawnLink(serverContainer) - - case SupervisorConfig(_, _) => // recursive configuration - val supervisor = factory.newSupervisorFor(server.asInstanceOf[SupervisorConfig]) - supervisor ! Start - state.addSupervisor(supervisor) - }) - } - - private[kernel] def spawnLink(serverContainer: GenericServerContainer): GenericServer = { - val newServer = serverContainer.newServer() - newServer.start - self.link(newServer) - log.debug("Linking actor [%s] to supervisor [%s]", newServer, this) - state.addServerContainer(serverContainer) - newServer - } -} - -/** - * TODO: document - * - * @author Jonas Bonér - */ -abstract class FaultHandlingStrategy(val maxNrOfRetries: Int, val withinTimeRange: Int) extends Logging { - private[kernel] var supervisor: Supervisor = _ - private var nrOfRetries = 0 - private var retryStartTime = currentTime - - private[kernel] def handleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = { - nrOfRetries += 1 - if (timeRangeHasExpired) { - if (hasReachedMaximumNrOfRetries) { - log.info("Maximum of restarts [%s] for server [%s] has been reached - the supervisor including all its servers will now be shut down.", maxNrOfRetries, failedServer) - supervisor ! Stop // execution stops here - } else { - nrOfRetries = 0 - retryStartTime = currentTime - } - } - doHandleFailure(state, failedServer, reason) - } - - private[kernel] def restart(serverContainer: GenericServerContainer, reason: AnyRef, state: SupervisorState) = { - preRestart(serverContainer) - serverContainer.lock.withWriteLock { - // TODO: this is the place to fail-over all pending messages in the failing actor's mailbox, if possible to get a hold of them - // e.g. something like 'serverContainer.getServer.getPendingMessages.map(newServer ! _)' - - self.unlink(serverContainer.getServer) - serverContainer.lifeCycle match { - case None => - throw new IllegalStateException("Server [" + serverContainer.id + "] does not have a life-cycle defined.") - case Some(LifeCycle(scope, shutdownTime)) => { - serverContainer.terminate(reason, shutdownTime) - - scope match { - case Permanent => { - log.debug("Restarting server [%s] configured as PERMANENT.", serverContainer.id) - serverContainer.reconfigure(reason, supervisor.spawnLink(serverContainer), state.supervisor) - } - - case Temporary => - if (reason == 'normal) { - log.debug("Restarting server [%s] configured as TEMPORARY (since exited naturally).", serverContainer.id) - serverContainer.reconfigure(reason, supervisor.spawnLink(serverContainer), state.supervisor) - } else log.info("Server [%s] configured as TEMPORARY will not be restarted (received unnatural exit message).", serverContainer.id) - - case Transient => - log.info("Server [%s] configured as TRANSIENT will not be restarted.", serverContainer.id) - } - } - } - } - postRestart(serverContainer) - } - - /** - * To be overriden by concrete strategies. - */ - protected def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) - - /** - * To be overriden by concrete strategies. - */ - protected def preRestart(serverContainer: GenericServerContainer) = {} - - /** - * To be overriden by concrete strategies. - */ - protected def postRestart(serverContainer: GenericServerContainer) = {} - - private def hasReachedMaximumNrOfRetries: Boolean = nrOfRetries > maxNrOfRetries - private def timeRangeHasExpired: Boolean = (currentTime - retryStartTime) > withinTimeRange - private def currentTime: Long = System.currentTimeMillis -} - -//==================================================== -/** - * TODO: document - * - * @author Jonas Bonér - */ -class AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) -extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) { - override def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = { - log.error("Server [%s] has failed due to [%s] - scheduling restart - scheme: ALL_FOR_ONE.", failedServer, reason) - for (serverContainer <- state.serverContainers) restart(serverContainer, reason, state) - state.supervisors.foreach(_ ! Exit(failedServer, reason)) - } -} - -//==================================================== -/** - * TODO: document - * - * @author Jonas Bonér - */ -class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) -extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) { - override def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = { - log.error("Server [%s] has failed due to [%s] - scheduling restart - scheme: ONE_FOR_ONE.", failedServer, reason) - var serverContainer: Option[GenericServerContainer] = None - state.serverContainers.foreach { - container => if (container.getServer == failedServer) serverContainer = Some(container) - } - serverContainer match { - case None => throw new RuntimeException("Could not find a generic server for actor: " + failedServer) - case Some(container) => restart(container, reason, state) - } - } -} - -//==================================================== -/** - * TODO: document - * - * @author Jonas Bonér - */ -private[kernel] class SupervisorState(val supervisor: Supervisor, val faultHandler: FaultHandlingStrategy) extends Logging { - faultHandler.supervisor = supervisor - - private val _lock = new ReadWriteLock - private val _serverContainerRegistry = new HashMap[String, GenericServerContainer] - private var _supervisors: List[Supervisor] = Nil - - def supervisors: List[Supervisor] = _lock.withReadLock { - _supervisors - } - - def addSupervisor(supervisor: Supervisor) = _lock.withWriteLock { - _supervisors = supervisor :: _supervisors - } - - def serverContainers: List[GenericServerContainer] = _lock.withReadLock { - _serverContainerRegistry.values.toList - } - - def getServerContainer(id: String): Option[GenericServerContainer] = _lock.withReadLock { - if (_serverContainerRegistry.contains(id)) Some(_serverContainerRegistry(id)) - else None - } - - def addServerContainer(serverContainer: GenericServerContainer) = _lock.withWriteLock { - _serverContainerRegistry += serverContainer.id -> serverContainer - } - - def removeServerContainer(id: String) = _lock.withWriteLock { - getServerContainer(id) match { - case Some(serverContainer) => _serverContainerRegistry - id - case None => {} - } - } -} - diff --git a/kernel/src/main/scala/ActiveObject.scala b/kernel/src/main/scala/actor/ActiveObject.scala old mode 100755 new mode 100644 similarity index 75% rename from kernel/src/main/scala/ActiveObject.scala rename to kernel/src/main/scala/actor/ActiveObject.scala index 7f83da11bd..2b85e46e2d --- a/kernel/src/main/scala/ActiveObject.scala +++ b/kernel/src/main/scala/actor/ActiveObject.scala @@ -2,16 +2,20 @@ * Copyright (C) 2009 Scalable Solutions. */ -package se.scalablesolutions.akka.kernel - -import kernel.camel.{MessageDriven, ActiveObjectProducer} -import config.ActiveObjectGuiceConfigurator -import config.ScalaConfig._ +package se.scalablesolutions.akka.kernel.actor import java.util.{List => JList, ArrayList} import java.lang.reflect.{Method, Field} import java.lang.annotation.Annotation +import kernel.config.ActiveObjectGuiceConfigurator +import kernel.config.ScalaConfig._ +import kernel.camel.{MessageDriven, ActiveObjectProducer} +import kernel.util.Helpers.ReadWriteLock +import kernel.util.{HashCode, ResultOrFailure} +import kernel.state.{Transactional, TransactionalMap, TransactionalRef, TransactionalVector} +import kernel.stm.Transaction + import org.codehaus.aspectwerkz.intercept.{Advisable, AroundAdvice} import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint} import org.codehaus.aspectwerkz.proxy.Proxy @@ -32,15 +36,17 @@ object Annotations { } /** + * Factory for Java API. + * * @author Jonas Bonér */ class ActiveObjectFactory { - def newInstance[T](target: Class[T], server: GenericServerContainer): T = { - ActiveObject.newInstance(target, server) + def newInstance[T](target: Class[T]): T = { + ActiveObject.newInstance(target) } - def newInstance[T](intf: Class[T], target: AnyRef, server: GenericServerContainer): T = { - ActiveObject.newInstance(intf, target, server) + def newInstance[T](intf: Class[T], target: AnyRef): T = { + ActiveObject.newInstance(intf, target) } def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = @@ -48,6 +54,8 @@ class ActiveObjectFactory { } /** + * Factory for Scala API. + * * @author Jonas Bonér */ object ActiveObject { @@ -59,16 +67,16 @@ object ActiveObject { tl } - def newInstance[T](target: Class[T], server: GenericServerContainer): T = { + def newInstance[T](target: Class[T]): T = { val proxy = Proxy.newInstance(target, false, true) // FIXME switch to weaving in the aspect at compile time - proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new TransactionalAroundAdvice(target, proxy, server)) + proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new SequentialTransactionalAroundAdvice(target, proxy)) proxy.asInstanceOf[T] } - def newInstance[T](intf: Class[T], target: AnyRef, server: GenericServerContainer): T = { + def newInstance[T](intf: Class[T], target: AnyRef): T = { val proxy = Proxy.newInstance(Array(intf), Array(target), false, true) - proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new TransactionalAroundAdvice(intf, target, server)) + proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new SequentialTransactionalAroundAdvice(intf, target)) proxy.asInstanceOf[T] } @@ -77,7 +85,7 @@ object ActiveObject { override def getSupervisorConfig = SupervisorConfig(restartStrategy, components) } val supervisor = factory.newSupervisor - supervisor ! se.scalablesolutions.akka.kernel.Start + supervisor ! StartSupervisor supervisor } } @@ -87,19 +95,19 @@ object ActiveObject { */ // FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts -sealed class TransactionalAroundAdvice(target: Class[_], - targetInstance: AnyRef, - server: GenericServerContainer) extends AroundAdvice { +sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstance: AnyRef) extends AroundAdvice { + private val changeSet = new ChangeSet(target.getName) + private val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance) - server.transactionalRefs = refs - server.transactionalMaps = maps - server.transactionalVectors = vectors + changeSet.refs = refs + changeSet.maps = maps + changeSet.vectors = vectors import kernel.reactor._ - private[this] var dispatcher = new ProxyDispatcher + private[this] var dispatcher = new ProxyMessageDispatcher private[this] var mailbox = dispatcher.messageQueue dispatcher.start - + import ActiveObject.threadBoundTx private[this] var activeTx: Option[Transaction] = None @@ -107,14 +115,14 @@ sealed class TransactionalAroundAdvice(target: Class[_], def invoke(joinpoint: JoinPoint): AnyRef = { val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti] val method = rtti.getMethod - if (method.isAnnotationPresent(Annotations.transactional)) { + if (reenteringExistingTransaction) { tryToCommitTransaction - startNewTransaction + if (method.isAnnotationPresent(Annotations.transactional)) startNewTransaction + joinExistingTransaction } - joinExistingTransaction - incrementTransaction val result: AnyRef = try { - if (rtti.getMethod.isAnnotationPresent(Annotations.oneway)) sendOneWay(joinpoint) + incrementTransaction + if (rtti.getMethod.isAnnotationPresent(Annotations.oneway)) sendOneWay(joinpoint) // FIXME put in 2 different aspects else handleResult(sendAndReceiveEventually(joinpoint)) } finally { decrementTransaction @@ -135,7 +143,7 @@ sealed class TransactionalAroundAdvice(target: Class[_], private def startNewTransaction = { val newTx = new Transaction - newTx.begin(server) + newTx.begin(changeSet) threadBoundTx.set(Some(newTx)) } @@ -143,20 +151,25 @@ sealed class TransactionalAroundAdvice(target: Class[_], val cflowTx = threadBoundTx.get if (!activeTx.isDefined && cflowTx.isDefined) { val currentTx = cflowTx.get - currentTx.join(server) + currentTx.join(changeSet) activeTx = Some(currentTx) } - activeTx = threadBoundTx.get } - private def tryToPrecommitTransaction = if (activeTx.isDefined) activeTx.get.precommit(server) + private def tryToPrecommitTransaction = if (activeTx.isDefined) activeTx.get.precommit(changeSet) + + private def reenteringExistingTransaction= if (activeTx.isDefined) { + val cflowTx = threadBoundTx.get + if (cflowTx.isDefined && cflowTx.get.id == activeTx.get.id) false + else true + } else true private def tryToCommitTransaction = if (activeTx.isDefined) { val tx = activeTx.get - tx.commit(server) + tx.commit(changeSet) removeTransactionIfTopLevel } - + private def handleResult(result: ResultOrFailure[AnyRef]): AnyRef = { try { result() @@ -170,14 +183,14 @@ sealed class TransactionalAroundAdvice(target: Class[_], private def rollback(tx: Option[Transaction]) = tx match { case None => {} // no tx; nothing to do case Some(tx) => - tx.rollback(server) + tx.rollback(changeSet) } - private def sendOneWay(joinpoint: JoinPoint) = + private def sendOneWay(joinpoint: JoinPoint) = mailbox.append(new MessageHandle(this, Invocation(joinpoint, activeTx), new NullFutureResult)) private def sendAndReceiveEventually(joinpoint: JoinPoint): ResultOrFailure[AnyRef] = { - val future = postMessageToMailboxAndCreateFutureResultWithTimeout(Invocation(joinpoint, activeTx), 1000) + val future = postMessageToMailboxAndCreateFutureResultWithTimeout(Invocation(joinpoint, activeTx), 1000) // FIXME configure future.await_? getResultOrThrowException(future) } @@ -192,7 +205,8 @@ sealed class TransactionalAroundAdvice(target: Class[_], private def getResultOrThrowException[T](future: FutureResult): ResultOrFailure[AnyRef] = if (future.exception.isDefined) { var resultOrFailure = ResultOrFailure(activeTx) - resultOrFailure() = throw future.exception.get + val (toBlame, cause) = future.exception.get + resultOrFailure() = throw cause resultOrFailure } else ResultOrFailure(future.result.get, activeTx) @@ -230,43 +244,44 @@ sealed class TransactionalAroundAdvice(target: Class[_], else getTransactionalItemsFor(parent) } - // start the search for transactional items, crawl the class hierarchy up until we reach 'null' + // start the search for transactional items, crawl the class hierarchy up until we reach Object getTransactionalItemsFor(targetInstance.getClass) } } -/** - * Generic GenericServer managing Invocation dispatch, transaction and error management. - * - * @author Jonas Bonér - */ -private[kernel] class Dispatcher(val targetName: String) extends GenericServer { - override def body: PartialFunction[Any, Unit] = { - - case Invocation(joinpoint: JoinPoint, tx: Option[Transaction]) => - ActiveObject.threadBoundTx.set(tx) - try { - reply(ResultOrFailure(joinpoint.proceed, tx)) - } catch { - case e => - val resultOrFailure = ResultOrFailure(tx) - resultOrFailure() = throw e - reply(resultOrFailure) - } - - case 'exit => - exit - -/* case exchange: Exchange => - println("=============> Exchange From Actor: " + exchange) - val invocation = exchange.getIn.getBody.asInstanceOf[Invocation] - invocation.invoke -*/ - case unexpected => - throw new ActiveObjectException("Unexpected message [" + unexpected + "] to [" + this + "] from [" + sender + "]") +class ChangeSet(val id: String) { + private val lock = new ReadWriteLock + + private[kernel] def full: List[Transactional] = lock.withReadLock { + _maps ::: _vectors ::: _refs } - override def toString(): String = "GenericServer[" + targetName + "]" + // TX Maps + private[kernel] var _maps: List[TransactionalMap[_, _]] = Nil + private[kernel] def maps_=(maps: List[TransactionalMap[_, _]]) = lock.withWriteLock { + _maps = maps + } + private[kernel] def maps: List[TransactionalMap[_, _]] = lock.withReadLock { + _maps + } + + // TX Vectors + private[kernel] var _vectors: List[TransactionalVector[_]] = Nil + private[kernel] def vectors_=(vectors: List[TransactionalVector[_]]) = lock.withWriteLock { + _vectors = vectors + } + private[kernel] def vectors: List[TransactionalVector[_]] = lock.withReadLock { + _vectors + } + + // TX Refs + private[kernel] var _refs: List[TransactionalRef[_]] = Nil + private[kernel] def refs_=(refs: List[TransactionalRef[_]]) = lock.withWriteLock { + _refs = refs + } + private[kernel] def refs: List[TransactionalRef[_]] = lock.withReadLock { + _refs + } } /** @@ -274,7 +289,7 @@ private[kernel] class Dispatcher(val targetName: String) extends GenericServer { * * @author Jonas Bonér */ -private[kernel] case class Invocation(val joinpoint: JoinPoint, val transaction: Option[Transaction]) { +@serializable private[kernel] case class Invocation(val joinpoint: JoinPoint, val transaction: Option[Transaction]) { override def toString: String = synchronized { "Invocation [joinpoint: " + joinpoint.toString+ " | transaction: " + transaction.toString + "]" @@ -349,5 +364,33 @@ ublic class CamelInvocationHandler implements InvocationHandler { exchange.getOut.getBody } else +*/ -*/ \ No newline at end of file +/** + * Generic GenericServer managing Invocation dispatch, transaction and error management. + * + * @author Jonas Bonér + */ +private[kernel] class Dispatcher(val targetName: String) extends Actor { + override def receive: PartialFunction[Any, Unit] = { + + case Invocation(joinpoint: JoinPoint, tx: Option[Transaction]) => + ActiveObject.threadBoundTx.set(tx) + try { + reply(ResultOrFailure(joinpoint.proceed, tx)) + } catch { + case e => + val resultOrFailure = ResultOrFailure(tx) + resultOrFailure() = throw e + reply(resultOrFailure) + } + + case 'exit => + exit + + case unexpected => + throw new ActiveObjectException("Unexpected message [" + unexpected + "] sent to [" + this + "]") + } + + override def toString(): String = "Actor[" + targetName + "]" +} \ No newline at end of file diff --git a/kernel/src/main/scala/actor/Actor.scala b/kernel/src/main/scala/actor/Actor.scala new file mode 100644 index 0000000000..ddf9b70ac8 --- /dev/null +++ b/kernel/src/main/scala/actor/Actor.scala @@ -0,0 +1,267 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.actor + +import java.util.concurrent.{CopyOnWriteArraySet, TimeUnit} + +import kernel.reactor._ +import kernel.config.ScalaConfig._ +import kernel.util.Logging +import kernel.util.Helpers._ + +sealed abstract class LifecycleMessage +case class Init(config: AnyRef) extends LifecycleMessage +case class Stop(reason: AnyRef) extends LifecycleMessage +case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifecycleMessage +case class Restart(reason: AnyRef) extends LifecycleMessage +case class Exit(dead: Actor, killer: Throwable) extends LifecycleMessage + +sealed abstract class DispatcherType +object DispatcherType { + case object EventBasedThreadPooledProxyInvokingDispatcher extends DispatcherType + case object EventBasedSingleThreadingDispatcher extends DispatcherType + case object EventBasedThreadPoolingDispatcher extends DispatcherType + case object ThreadBasedDispatcher extends DispatcherType +} + +class ActorMessageHandler(val actor: Actor) extends MessageHandler { + def handle(handle: MessageHandle) = actor.handle(handle.message, handle.future) +} + +trait Actor extends Logging { + var timeout: Long = 1000L + + @volatile private[this] var isRunning: Boolean = false + protected[this] var id: String = super.toString + protected[this] var dispatcher: MessageDispatcher = _ + protected[this] var senderFuture: Option[CompletableFutureResult] = None + protected[this] val linkedActors = new CopyOnWriteArraySet[Actor] + + protected[actor] var mailbox: MessageQueue = _ + protected[actor] var supervisor: Option[Actor] = None + protected[actor] var lifeCycleConfig: Option[LifeCycle] = None + + private var hotswap: Option[PartialFunction[Any, Unit]] = None + private var config: Option[AnyRef] = None + + // ==================================== + // ==== USER CALLBACKS TO OVERRIDE ==== + // ==================================== + + protected var faultHandler: Option[FaultHandlingStrategy] = None + + /** + * Set dispatcher type to either ThreadBasedDispatcher, EventBasedSingleThreadingDispatcher or EventBasedThreadPoolingDispatcher. + * Default is EventBasedThreadPoolingDispatcher. + */ + protected[this] var dispatcherType: DispatcherType = DispatcherType.EventBasedThreadPoolingDispatcher + + /** + * Set trapExit to true if actor should be able to trap linked actors exit messages. + */ + @volatile protected[this] var trapExit: Boolean = false + + /** + * Partial function implementing the server logic. + * To be implemented by subclassing server. + * + * Example code: + *
+ * def receive: PartialFunction[Any, Unit] = {
+ * case Ping =>
+ * println("got a ping")
+ * reply("pong")
+ *
+ * case OneWay =>
+ * println("got a oneway")
+ *
+ * case _ =>
+ * println("unknown message, ignoring")
+ * }
+ *
+ */
+ protected def receive: PartialFunction[Any, Unit]
+
+ /**
+ * Optional callback method that is called during initialization.
+ * To be implemented by subclassing actor.
+ */
+ protected def init(config: AnyRef) {}
+
+ /**
+ * Mandatory callback method that is called during restart and reinitialization after a server crash.
+ * To be implemented by subclassing actor.
+ */
+ protected def preRestart(reason: AnyRef, config: Option[AnyRef]) {}
+
+ /**
+ * Mandatory callback method that is called during restart and reinitialization after a server crash.
+ * To be implemented by subclassing actor.
+ */
+ protected def postRestart(reason: AnyRef, config: Option[AnyRef]) {}
+
+ /**
+ * Optional callback method that is called during termination.
+ * To be implemented by subclassing actor.
+ */
+ protected def shutdown(reason: AnyRef) {}
+
+ // =============
+ // ==== API ====
+ // =============
+
+ def !(message: AnyRef) =
+ if (isRunning) mailbox.append(new MessageHandle(this, message, new NullFutureResult))
+ else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
+
+ def !: Option[T] = if (isRunning) {
+ val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
+ future.await_?
+ getResultOrThrowException(future)
+ } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
+
+ def !: Option[T] = !
+
+ def !?[T](message: AnyRef): T = if (isRunning) {
+ val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0)
+ future.await_!
+ getResultOrThrowException(future).get
+ } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
+
+ // FIXME can be deadlock prone - HOWTO?
+ def link(actor: Actor) = synchronized { actor.synchronized {
+ if (isRunning) {
+ linkedActors.add(actor)
+ if (actor.supervisor.isDefined) throw new IllegalStateException("Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails")
+ actor.supervisor = Some(this)
+ log.debug("Linking actor [%s] to actor [%s]", actor, this)
+ } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
+ }}
+
+ // FIXME can be deadlock prone - HOWTO?
+ def unlink(actor: Actor) = synchronized { actor.synchronized {
+ if (isRunning) {
+ if (!linkedActors.contains(actor)) throw new IllegalStateException("Actor [" + actor + "] is not a linked actor, can't unlink")
+ linkedActors.remove(actor)
+ actor.supervisor = None
+ log.debug("Unlinking actor [%s] from actor [%s]", actor, this)
+ } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
+ }}
+
+ /**
+ * Atomically start and link actor.
+ */
+ def spawnLink(actor: Actor) = actor.synchronized {
+ actor.start
+ link(actor)
+ }
+
+ def start = synchronized {
+ if (!isRunning) {
+ dispatcherType match {
+ case DispatcherType.EventBasedSingleThreadingDispatcher =>
+ dispatcher = new EventBasedSingleThreadDispatcher
+ case DispatcherType.EventBasedThreadPoolingDispatcher =>
+ dispatcher = new EventBasedThreadPoolDispatcher
+ case DispatcherType.ThreadBasedDispatcher =>
+ dispatcher = new ThreadBasedDispatcher
+ }
+ mailbox = dispatcher.messageQueue
+ dispatcher.registerHandler(this, new ActorMessageHandler(this))
+ dispatcher.start
+ isRunning = true
+ }
+ }
+
+ def stop = synchronized {
+ if (isRunning) {
+ dispatcher.unregisterHandler(this)
+ isRunning = false
+ } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
+ }
+
+ protected def reply(message: AnyRef) = senderFuture match {
+ case None => throw new IllegalStateException("No sender future in scope, can't reply. Have you used '!' (async, fire-and-forget)? If so switch to '!!' to wait using a future." )
+ case Some(future) => future.completeWithResult(message)
+ }
+
+ // ================================
+ // ==== IMPLEMENTATION DETAILS ====
+ // ================================
+
+ private[kernel] def handle(message: AnyRef, future: CompletableFutureResult) = synchronized {
+ try {
+ senderFuture = Some(future)
+ if (base.isDefinedAt(message)) base(message)
+ else throw new IllegalArgumentException("No handler matching message [" + message + "] in actor [" + this.getClass.getName + "]")
+ } catch {
+ case e =>
+ future.completeWithException(this, e)
+ }
+ }
+
+ private def postMessageToMailboxAndCreateFutureResultWithTimeout(
+ message: AnyRef, timeout: Long): CompletableFutureResult = {
+ val future = new DefaultCompletableFutureResult(timeout)
+ mailbox.append(new MessageHandle(this, message, future))
+ future
+ }
+
+ private def getResultOrThrowException[T](future: FutureResult): Option[T] =
+ if (future.exception.isDefined) {
+ val (toBlame, cause) = future.exception.get
+ if (supervisor.isDefined) supervisor.get ! Exit(toBlame.asInstanceOf[Actor], cause)
+ throw cause
+ } else future.result.asInstanceOf[Option[T]]
+
+ private def base: PartialFunction[Any, Unit] = lifeCycle orElse (hotswap getOrElse receive)
+
+ private val lifeCycle: PartialFunction[Any, Unit] = {
+ case Init(config) => init(config)
+ case HotSwap(code) => hotswap = code
+ case Restart(reason) => restart(reason)
+ case Stop(reason) => shutdown(reason); stop
+ case Exit(dead, reason) => handleTrapExit(dead, reason)
+ }
+
+ private[kernel] def handleTrapExit(dead: Actor, reason: Throwable): Unit = if (trapExit) {
+ if (faultHandler.isDefined) {
+ faultHandler.get match {
+ case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => restartLinkedActors(reason)
+ case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.restart(reason)
+ }
+ }
+ }
+
+ private[this] def restartLinkedActors(reason: AnyRef) =
+ linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_.restart(reason))
+
+ private[Actor] def restart(reason: AnyRef) = synchronized {
+ lifeCycleConfig match {
+ case None => throw new IllegalStateException("Server [" + id + "] does not have a life-cycle defined.")
+ case Some(LifeCycle(scope, shutdownTime)) => {
+ scope match {
+ case Permanent => {
+ preRestart(reason, config)
+ log.debug("Restarting actor [%s] configured as PERMANENT.", id)
+ // FIXME SWAP actor
+ postRestart(reason, config)
+ }
+
+ case Temporary =>
+// if (reason == 'normal) {
+// log.debug("Restarting actor [%s] configured as TEMPORARY (since exited naturally).", id)
+// scheduleRestart
+// } else log.info("Server [%s] configured as TEMPORARY will not be restarted (received unnatural exit message).", id)
+
+ case Transient =>
+ log.info("Server [%s] configured as TRANSIENT will not be restarted.", id)
+ }
+ }
+ }
+ }
+
+ override def toString(): String = "Actor[" + id + "]"
+}
\ No newline at end of file
diff --git a/kernel/src/main/scala/actor/Supervisor.scala b/kernel/src/main/scala/actor/Supervisor.scala
new file mode 100644
index 0000000000..93fa910d22
--- /dev/null
+++ b/kernel/src/main/scala/actor/Supervisor.scala
@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.actor
+
+import kernel.util.Logging
+import kernel.config.ScalaConfig._
+import kernel.util.Helpers._
+import scala.collection.mutable.HashMap
+
+/**
+ * Messages that the supervisor responds to and returns.
+ *
+ * @author Jonas Bonér
+ */
+sealed abstract class SupervisorMessage
+case object StartSupervisor extends SupervisorMessage
+case object StopSupervisor extends SupervisorMessage
+case class ConfigureSupervisor(config: SupervisorConfig, factory: SupervisorFactory) extends SupervisorMessage
+case object ConfigSupervisorSuccess extends SupervisorMessage
+
+sealed abstract class FaultHandlingStrategy
+case class AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends FaultHandlingStrategy
+case class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends FaultHandlingStrategy
+
+/**
+ * Abstract base class for all supervisor factories.
+ * + * Example usage: + *
+ * class MySupervisorFactory extends SupervisorFactory {
+ *
+ * override protected def getSupervisorConfig: SupervisorConfig = {
+ * SupervisorConfig(
+ * RestartStrategy(OneForOne, 3, 10),
+ * Worker(
+ * myFirstActorInstance,
+ * LifeCycle(Permanent, 1000))
+ * ::
+ * Worker(
+ * mySecondActorInstance,
+ * LifeCycle(Permanent, 1000))
+ * :: Nil)
+ * }
+ * }
+ *
+ *
+ * Then create a concrete factory in which we mix in support for the specific implementation of the Service we want to use.
+ *
+ * + * object factory extends MySupervisorFactory + *+ * + * Then create a new Supervisor tree with the concrete Services we have defined. + * + *
+ * val supervisor = factory.newSupervisor + * supervisor ! Start // start up all managed servers + *+ * + * @author Jonas Bonér + */ +abstract class SupervisorFactory extends Logging { + def newSupervisor: Supervisor = newSupervisorFor(getSupervisorConfig) + + def newSupervisorFor(config: SupervisorConfig): Supervisor = config match { + case SupervisorConfig(restartStrategy, _) => + val supervisor = create(restartStrategy) + supervisor.start + supervisor.configure(config, this) + supervisor + } + + /** + * To be overridden by concrete factory. + * Should return the SupervisorConfig for the supervisor. + */ + protected def getSupervisorConfig: SupervisorConfig + + protected def create(strategy: RestartStrategy): Supervisor = strategy match { + case RestartStrategy(scheme, maxNrOfRetries, timeRange) => + scheme match { + case AllForOne => new Supervisor(AllForOneStrategy(maxNrOfRetries, timeRange)) + case OneForOne => new Supervisor(OneForOneStrategy(maxNrOfRetries, timeRange)) + } + } +} + +/** + * @author Jonas Bonér + */ +class Supervisor(handler: FaultHandlingStrategy) extends Actor with Logging { + trapExit = true + faultHandler = Some(handler) + + def startSupervisor = { + start + this ! StartSupervisor + } + + def stopSupervisor = this ! StopSupervisor + + protected def receive: PartialFunction[Any, Unit] = { + case StartSupervisor => + linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.start; log.info("Starting actor: %s", actor) } + + case StopSupervisor => + linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.stop; log.info("Stopping actor: %s", actor) } + log.info("Stopping supervisor: %s", this) + stop + } + + def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match { + case SupervisorConfig(_, servers) => + servers.map(server => + server match { + case Worker(actor, lifecycle) => + actor.lifeCycleConfig = Some(lifecycle) + spawnLink(actor) + + case SupervisorConfig(_, _) => // recursive configuration + val supervisor = factory.newSupervisorFor(server.asInstanceOf[SupervisorConfig]) + supervisor ! StartSupervisor + // FIXME what to do with recursively supervisors? + }) + } +} diff --git a/kernel/src/main/scala/camel/ActiveObjectConsumer.scala b/kernel/src/main/scala/camel/ActiveObjectConsumer.scala index a3ac5425a9..e7be8b0dd9 100644 --- a/kernel/src/main/scala/camel/ActiveObjectConsumer.scala +++ b/kernel/src/main/scala/camel/ActiveObjectConsumer.scala @@ -6,7 +6,7 @@ package se.scalablesolutions.akka.kernel.camel import java.util.concurrent.{BlockingQueue, ExecutorService, Executors, ThreadFactory, TimeUnit} -import se.scalablesolutions.akka.kernel.{Logging, GenericServerContainer} +import kernel.util.Logging import org.apache.camel.{AsyncCallback, AsyncProcessor, Consumer, Exchange, Processor} import org.apache.camel.impl.ServiceSupport diff --git a/kernel/src/main/scala/camel/ActiveObjectEndpoint.scala b/kernel/src/main/scala/camel/ActiveObjectEndpoint.scala index dfe70efc2e..e70a5bd937 100644 --- a/kernel/src/main/scala/camel/ActiveObjectEndpoint.scala +++ b/kernel/src/main/scala/camel/ActiveObjectEndpoint.scala @@ -4,8 +4,8 @@ package se.scalablesolutions.akka.kernel.camel -import config.ActiveObjectConfigurator -import se.scalablesolutions.akka.kernel.Logging +import kernel.config.ActiveObjectConfigurator +import kernel.util.Logging import java.util.{ArrayList, HashSet, List, Set} import java.util.concurrent.{BlockingQueue, CopyOnWriteArraySet, LinkedBlockingQueue} diff --git a/kernel/src/main/scala/camel/ActiveObjectProducer.scala b/kernel/src/main/scala/camel/ActiveObjectProducer.scala index 0c7a6a4d71..f2af00517c 100644 --- a/kernel/src/main/scala/camel/ActiveObjectProducer.scala +++ b/kernel/src/main/scala/camel/ActiveObjectProducer.scala @@ -4,11 +4,10 @@ package se.scalablesolutions.akka.kernel.camel -import java.util.Collection; +import java.util.Collection +import kernel.util.Logging; import java.util.concurrent.BlockingQueue; -import se.scalablesolutions.akka.kernel.{Logging, GenericServerContainer} - import org.apache.camel.{Exchange, AsyncProcessor, AsyncCallback} import org.apache.camel.impl.DefaultProducer diff --git a/kernel/src/main/scala/camel/SupervisorAwareCamelContext.scala b/kernel/src/main/scala/camel/SupervisorAwareCamelContext.scala index 36cc8a21fc..b56d3f8d82 100644 --- a/kernel/src/main/scala/camel/SupervisorAwareCamelContext.scala +++ b/kernel/src/main/scala/camel/SupervisorAwareCamelContext.scala @@ -4,8 +4,9 @@ package se.scalablesolutions.akka.kernel.camel +import kernel.actor.Supervisor +import kernel.util.Logging import org.apache.camel.impl.{DefaultCamelContext, DefaultEndpoint, DefaultComponent} -import se.scalablesolutions.akka.kernel.{Supervisor, Logging} /** * @author Jonas Bonér diff --git a/kernel/src/main/scala/config/ActiveObjectConfigurator.scala b/kernel/src/main/scala/config/ActiveObjectConfigurator.scala index d48690a33f..30e0a4ad44 100644 --- a/kernel/src/main/scala/config/ActiveObjectConfigurator.scala +++ b/kernel/src/main/scala/config/ActiveObjectConfigurator.scala @@ -4,8 +4,9 @@ package se.scalablesolutions.akka.kernel.config -import config.ScalaConfig.{RestartStrategy, Component} +import ScalaConfig.{RestartStrategy, Component} import javax.servlet.ServletContext +import kernel.util.Logging object ActiveObjectConfigurator extends Logging { diff --git a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala index 2a5a52e45f..d3220e02c7 100644 --- a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala +++ b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala @@ -7,17 +7,19 @@ package se.scalablesolutions.akka.kernel.config import com.google.inject._ import com.google.inject.jsr250.ResourceProviderFactory -import java.lang.reflect.Method -import javax.servlet.ServletContext +import ScalaConfig._ +import kernel.actor.{Supervisor, ActiveObjectFactory} +import kernel.camel.ActiveObjectComponent +import kernel.util.Logging import org.apache.camel.impl.{JndiRegistry, DefaultCamelContext} import org.apache.camel.{CamelContext, Endpoint, Routes} import scala.collection.mutable.HashMap -import kernel.camel.ActiveObjectComponent -import kernel.{ActiveObjectFactory, Supervisor} -import kernel.config.ScalaConfig._ +import java.lang.reflect.Method +import javax.servlet.ServletContext + /** * @author Jonas Bonér @@ -32,7 +34,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC private var workers: List[Worker] = Nil private var bindings: List[DependencyBinding] = Nil private var configRegistry = new HashMap[Class[_], Component] // TODO is configRegistry needed? - private var activeObjectRegistry = new HashMap[Class[_], Tuple4[AnyRef, AnyRef, Component, GenericServerContainer]] + private var activeObjectRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, Component]] private var activeObjectFactory = new ActiveObjectFactory private var camelContext = new DefaultCamelContext private var modules = new java.util.ArrayList[Module] @@ -47,7 +49,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC override def getActiveObject[T](clazz: Class[T]): T = synchronized { log.debug("Retrieving active object [%s]", clazz.getName) if (injector == null) throw new IllegalStateException("inject() and/or supervise() must be called before invoking getActiveObject(clazz)") - val (proxy, targetInstance, component, server) = + val (proxy, targetInstance, component) = activeObjectRegistry.getOrElse(clazz, throw new IllegalStateException("Class [" + clazz.getName + "] has not been put under supervision (by passing in the config to the 'configureActiveObjects' and then invoking 'supervise') method")) injector.injectMembers(targetInstance) proxy.asInstanceOf[T] @@ -104,23 +106,21 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC private def newSubclassingProxy(component: Component): DependencyBinding = { val targetClass = component.target - val server = new GenericServerContainer(targetClass.getName, () => new Dispatcher(component.target.getName)) - server.setTimeout(component.timeout) - workers ::= Worker(server, component.lifeCycle) - val proxy = activeObjectFactory.newInstance(targetClass, server).asInstanceOf[AnyRef] - activeObjectRegistry.put(targetClass, (proxy, proxy, component, server)) + + // FIXME add wrapping Actor and pass into Worker + workers ::= Worker(null, component.lifeCycle) + val proxy = activeObjectFactory.newInstance(targetClass).asInstanceOf[AnyRef] + activeObjectRegistry.put(targetClass, (proxy, proxy, component)) new DependencyBinding(targetClass, proxy) } private def newDelegatingProxy(component: Component): DependencyBinding = { val targetClass = component.intf.get - val server = new GenericServerContainer(targetClass.getName, () => new Dispatcher(component.target.getName)) - server.setTimeout(component.timeout) - workers ::= Worker(server, component.lifeCycle) - component.target.getConstructor(Array[Class[_]]()).setAccessible(true) val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry - val proxy = activeObjectFactory.newInstance(targetClass, targetInstance, server).asInstanceOf[AnyRef] - activeObjectRegistry.put(targetClass, (proxy, targetInstance, component, server)) + workers ::= Worker(null, component.lifeCycle) // TODO null is not an Actor + component.target.getConstructor(Array[Class[_]]()).setAccessible(true) + val proxy = activeObjectFactory.newInstance(targetClass, targetInstance).asInstanceOf[AnyRef] + activeObjectRegistry.put(targetClass, (proxy, targetInstance, component)) new DependencyBinding(targetClass, proxy) } @@ -168,7 +168,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC def reset = synchronized { modules = new java.util.ArrayList[Module] configRegistry = new HashMap[Class[_], Component] - activeObjectRegistry = new HashMap[Class[_], Tuple4[AnyRef, AnyRef, Component, GenericServerContainer]] + activeObjectRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, Component]] methodToUriRegistry = new HashMap[Method, String] injector = null restartStrategy = null diff --git a/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala b/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala index b74f2de230..382c54da7a 100644 --- a/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala +++ b/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala @@ -5,7 +5,6 @@ package se.scalablesolutions.akka.kernel.config import akka.kernel.config.JavaConfig._ -import akka.kernel.{Supervisor, ActiveObjectFactory} import com.google.inject._ import com.google.inject.jsr250.ResourceProviderFactory diff --git a/kernel/src/main/scala/config/Config.scala b/kernel/src/main/scala/config/Config.scala index e1f6c5fdf3..4d3bdbd0e6 100644 --- a/kernel/src/main/scala/config/Config.scala +++ b/kernel/src/main/scala/config/Config.scala @@ -4,10 +4,9 @@ package se.scalablesolutions.akka.kernel.config +import kernel.actor.Actor import reflect.BeanProperty -import se.scalablesolutions.akka.kernel.GenericServerContainer - /** * Configuration classes - not to be used as messages. * @@ -21,7 +20,7 @@ object ScalaConfig { abstract class Scope extends ConfigElement case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server - case class Worker(serverContainer: GenericServerContainer, lifeCycle: LifeCycle) extends Server + case class Worker(actor: Actor, lifeCycle: LifeCycle) extends Server case class RestartStrategy(scheme: FailOverScheme, maxNrOfRetries: Int, withinTimeRange: Int) extends ConfigElement @@ -96,8 +95,8 @@ object JavaConfig { this(null, target, lifeCycle, timeout) def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.Component( intf, target, lifeCycle.transform, timeout) - def newWorker(server: GenericServerContainer) = - se.scalablesolutions.akka.kernel.config.ScalaConfig.Worker(server, lifeCycle.transform) + def newWorker(actor: Actor) = + se.scalablesolutions.akka.kernel.config.ScalaConfig.Worker(actor, lifeCycle.transform) } } \ No newline at end of file diff --git a/kernel/src/main/scala/Configuration.scala b/kernel/src/main/scala/config/Configuration.scala similarity index 97% rename from kernel/src/main/scala/Configuration.scala rename to kernel/src/main/scala/config/Configuration.scala index d908552bc2..906f33cf01 100755 --- a/kernel/src/main/scala/Configuration.scala +++ b/kernel/src/main/scala/config/Configuration.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009 Scalable Solutions. */ -package se.scalablesolutions.akka.kernel.configuration +package se.scalablesolutions.akka.kernel.config /* import se.scalablesolutions.akka.kernel.{ActiveObject, ActiveObjectProxy} diff --git a/kernel/src/main/scala/jersey/ActiveObjectComponentProvider.scala b/kernel/src/main/scala/jersey/ActiveObjectComponentProvider.scala index bae4bd5f2c..617f0b10af 100644 --- a/kernel/src/main/scala/jersey/ActiveObjectComponentProvider.scala +++ b/kernel/src/main/scala/jersey/ActiveObjectComponentProvider.scala @@ -6,9 +6,8 @@ package se.scalablesolutions.akka.kernel.jersey import com.sun.jersey.core.spi.component.ioc.IoCFullyManagedComponentProvider -import kernel.Logging -import config.ActiveObjectConfigurator - +import kernel.config.ActiveObjectConfigurator +import kernel.util.Logging import java.lang.reflect.{Constructor, InvocationTargetException} class ActiveObjectComponentProvider(val clazz: Class[_], val configurator: ActiveObjectConfigurator) diff --git a/kernel/src/main/scala/nio/ProxyServer.scala b/kernel/src/main/scala/nio/ProxyServer.scala new file mode 100644 index 0000000000..e69de29bb2 diff --git a/kernel/src/main/scala/reactor/Future.scala b/kernel/src/main/scala/reactor/Future.scala index 6fb1fb258d..5921ae099e 100644 --- a/kernel/src/main/scala/reactor/Future.scala +++ b/kernel/src/main/scala/reactor/Future.scala @@ -19,12 +19,12 @@ sealed trait FutureResult { def isExpired: Boolean def timeoutInNanos: Long def result: Option[AnyRef] - def exception: Option[Throwable] + def exception: Option[Tuple2[AnyRef, Throwable]] } trait CompletableFutureResult extends FutureResult { def completeWithResult(result: AnyRef) - def completeWithException(exception: Throwable) + def completeWithException(toBlame: AnyRef, exception: Throwable) } class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureResult { @@ -37,8 +37,8 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes private val _signal = _lock.newCondition private var _completed: Boolean = _ private var _result: Option[AnyRef] = None - private var _exception: Option[Throwable] = None - + private var _exception: Option[Tuple2[AnyRef, Throwable]] = None + override def await_? = try { _lock.lock var wait = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) @@ -86,7 +86,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes _lock.unlock } - override def exception: Option[Throwable] = try { + override def exception: Option[Tuple2[AnyRef, Throwable]] = try { _lock.lock _exception } finally { @@ -104,11 +104,11 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes _lock.unlock } - override def completeWithException(exception: Throwable) = try { + override def completeWithException(toBlame: AnyRef, exception: Throwable) = try { _lock.lock if (!_completed) { _completed = true - _exception = Some(exception) + _exception = Some((toBlame, exception)) } } finally { @@ -121,12 +121,12 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes class NullFutureResult extends CompletableFutureResult { override def completeWithResult(result: AnyRef) = {} - override def completeWithException(exception: Throwable) = {} + override def completeWithException(toBlame: AnyRef, exception: Throwable) = {} override def await_? = throw new UnsupportedOperationException("Not implemented for NullFutureResult") override def await_! = throw new UnsupportedOperationException("Not implemented for NullFutureResult") override def isCompleted: Boolean = throw new UnsupportedOperationException("Not implemented for NullFutureResult") override def isExpired: Boolean = throw new UnsupportedOperationException("Not implemented for NullFutureResult") override def timeoutInNanos: Long = throw new UnsupportedOperationException("Not implemented for NullFutureResult") override def result: Option[AnyRef] = None - override def exception: Option[Throwable] = None + override def exception: Option[Tuple2[AnyRef, Throwable]] = None } diff --git a/kernel/src/main/scala/ManagedActorScheduler.scala b/kernel/src/main/scala/reactor/ManagedActorScheduler.scala similarity index 98% rename from kernel/src/main/scala/ManagedActorScheduler.scala rename to kernel/src/main/scala/reactor/ManagedActorScheduler.scala index 64960fbd34..a251557fc5 100644 --- a/kernel/src/main/scala/ManagedActorScheduler.scala +++ b/kernel/src/main/scala/reactor/ManagedActorScheduler.scala @@ -4,6 +4,7 @@ package se.scalablesolutions.akka.kernel +import kernel.util.Logging import net.lag.logging.Logger import java.util.concurrent.Executors diff --git a/kernel/src/main/scala/reactor/ProxyDispatcher.scala b/kernel/src/main/scala/reactor/ProxyMessageDispatcher.scala similarity index 90% rename from kernel/src/main/scala/reactor/ProxyDispatcher.scala rename to kernel/src/main/scala/reactor/ProxyMessageDispatcher.scala index 16a2cd8292..1a320ffed9 100644 --- a/kernel/src/main/scala/reactor/ProxyDispatcher.scala +++ b/kernel/src/main/scala/reactor/ProxyMessageDispatcher.scala @@ -10,7 +10,10 @@ */ package se.scalablesolutions.akka.kernel.reactor -class ProxyDispatcher extends MessageDispatcherBase { + +import kernel.actor.Invocation + +class ProxyMessageDispatcher extends MessageDispatcherBase { import java.util.concurrent.Executors import java.util.HashSet import org.codehaus.aspectwerkz.joinpoint.JoinPoint @@ -35,12 +38,13 @@ class ProxyDispatcher extends MessageDispatcherBase { for (index <- 0 until queue.size) { val handle = queue.remove handlerExecutor.execute(new Runnable { + val invocation = handle.message.asInstanceOf[Invocation] override def run = { try { - val result = handle.message.asInstanceOf[Invocation].joinpoint.proceed + val result = invocation.joinpoint.proceed handle.future.completeWithResult(result) } catch { - case e: Exception => handle.future.completeWithException(e) + case e: Exception => handle.future.completeWithException(invocation.joinpoint.getTarget, e) } messageDemultiplexer.wakeUp } diff --git a/kernel/src/main/scala/reactor/Reactor.scala b/kernel/src/main/scala/reactor/Reactor.scala index 4f59b5b593..461b7a8deb 100644 --- a/kernel/src/main/scala/reactor/Reactor.scala +++ b/kernel/src/main/scala/reactor/Reactor.scala @@ -11,6 +11,7 @@ package se.scalablesolutions.akka.kernel.reactor import java.util.{LinkedList, Queue} +import kernel.util.HashCode trait MessageHandler { def handle(message: MessageHandle) @@ -50,7 +51,7 @@ class MessageHandle(val sender: AnyRef, val message: AnyRef, val future: Complet } class MessageQueue { - private val queue: Queue[MessageHandle] = new LinkedList[MessageHandle] + private[kernel] val queue: Queue[MessageHandle] = new LinkedList[MessageHandle] @volatile private var interrupted = false def append(handle: MessageHandle) = queue.synchronized { diff --git a/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala b/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala new file mode 100644 index 0000000000..6584e1e867 --- /dev/null +++ b/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +/** + * Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf]. + * See also this article: [http://today.java.net/cs/user/print/a/350]. + * + * Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. + */ +package se.scalablesolutions.akka.kernel.reactor + +class ThreadBasedDispatcher extends MessageDispatcherBase { + def start = if (!active) { + active = true + val messageDemultiplexer = new EventBasedSingleThreadDemultiplexer(messageQueue) + selectorThread = new Thread { + override def run = { + while (active) { + guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf] + try { + messageDemultiplexer.select + } catch { case e: InterruptedException => active = false } + val queue = messageDemultiplexer.acquireSelectedQueue + for (index <- 0 until queue.size) { + val handle = queue.remove + val handler = messageHandlers.get(handle.sender) + if (handler != null) handler.handle(handle) + } + } + } + } + selectorThread.start + } +} + +class ThreadBasedDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer { + import java.util.{LinkedList, Queue} + + private val selectedQueue: Queue[MessageHandle] = new LinkedList[MessageHandle] + + def select = messageQueue.read(selectedQueue) + + def acquireSelectedQueue: Queue[MessageHandle] = selectedQueue + + def releaseSelectedQueue = throw new UnsupportedOperationException("ThreadBasedDemultiplexer can't release its queue") + + def wakeUp = throw new UnsupportedOperationException("ThreadBasedDemultiplexer can't be woken up") +} diff --git a/kernel/src/main/scala/CassandraNode.scala b/kernel/src/main/scala/state/CassandraNode.scala old mode 100755 new mode 100644 similarity index 98% rename from kernel/src/main/scala/CassandraNode.scala rename to kernel/src/main/scala/state/CassandraNode.scala index 4af446352e..2f3695c3f5 --- a/kernel/src/main/scala/CassandraNode.scala +++ b/kernel/src/main/scala/state/CassandraNode.scala @@ -2,10 +2,11 @@ * Copyright (C) 2009 Scalable Solutions. */ -package se.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel.state import java.io.File import java.lang.reflect.Constructor +import kernel.util.{Serializer, JavaSerializationSerializer, Logging} import org.apache.cassandra.config.DatabaseDescriptor import org.apache.cassandra.service._ diff --git a/kernel/src/main/scala/DataFlowVariable.scala b/kernel/src/main/scala/state/DataFlowVariable.scala similarity index 99% rename from kernel/src/main/scala/DataFlowVariable.scala rename to kernel/src/main/scala/state/DataFlowVariable.scala index 7d6fd4ae4c..93d5865e8f 100644 --- a/kernel/src/main/scala/DataFlowVariable.scala +++ b/kernel/src/main/scala/state/DataFlowVariable.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009 Scalable Solutions. */ -package se.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel.state import scala.actors.Actor import scala.actors.OutputChannel diff --git a/kernel/src/main/scala/State.scala b/kernel/src/main/scala/state/State.scala similarity index 99% rename from kernel/src/main/scala/State.scala rename to kernel/src/main/scala/state/State.scala index 9dbda3af45..60aaa9e0d5 100755 --- a/kernel/src/main/scala/State.scala +++ b/kernel/src/main/scala/state/State.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009 Scalable Solutions. */ -package se.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel.state import org.codehaus.aspectwerkz.proxy.Uuid import se.scalablesolutions.akka.collection._ diff --git a/kernel/src/main/scala/Transaction.scala b/kernel/src/main/scala/stm/Transaction.scala similarity index 73% rename from kernel/src/main/scala/Transaction.scala rename to kernel/src/main/scala/stm/Transaction.scala index 3b2dff044e..28c205aa1c 100644 --- a/kernel/src/main/scala/Transaction.scala +++ b/kernel/src/main/scala/stm/Transaction.scala @@ -2,12 +2,14 @@ * Copyright (C) 2009 Scalable Solutions. */ -package se.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel.stm import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} +import kernel.actor.ChangeSet +import kernel.util.Logging import scala.collection.mutable.{HashSet, HashMap} -sealed abstract class TransactionStatus +@serializable sealed abstract class TransactionStatus object TransactionStatus { case object New extends TransactionStatus case object Active extends TransactionStatus @@ -31,15 +33,15 @@ object TransactionIdFactory { * * @author Jonas Bonér */ -class Transaction extends Logging { +@serializable class Transaction extends Logging { val id = TransactionIdFactory.newId log.debug("Creating a new transaction with id [%s]", id) // FIXME: add support for nested transactions private[this] var parent: Option[Transaction] = None - private[this] val participants = new HashSet[GenericServerContainer] - private[this] val precommitted = new HashSet[GenericServerContainer] + private[this] val participants = new HashSet[ChangeSet] + private[this] val precommitted = new HashSet[ChangeSet] private[this] val depth = new AtomicInteger(0) @volatile private[this] var status: TransactionStatus = TransactionStatus.New @@ -47,54 +49,53 @@ class Transaction extends Logging { def decrement = depth.decrementAndGet def topLevel_? = depth.get == 0 - def begin(server: GenericServerContainer) = synchronized { + def begin(changeSet: ChangeSet) = synchronized { ensureIsActiveOrNew - if (status == TransactionStatus.New) log.debug("Server [%s] is starting NEW transaction [%s]", server.id, this) - else log.debug("Server [%s] is participating in transaction", server) - server.transactionalItems.foreach(_.begin) - participants + server + if (status == TransactionStatus.New) log.debug("Server [%s] is starting NEW transaction [%s]", changeSet.id, this) + else log.debug("Server [%s] is participating in transaction", changeSet.id) + changeSet.full.foreach(_.begin) + participants + changeSet status = TransactionStatus.Active } - def precommit(server: GenericServerContainer) = synchronized { + def precommit(changeSet: ChangeSet) = synchronized { if (status == TransactionStatus.Active) { - log.debug("Pre-committing transaction [%s] for server [%s]", this, server.id) - precommitted + server + log.debug("Pre-committing transaction [%s] for server [%s]", this, changeSet.id) + precommitted + changeSet } } - def commit(server: GenericServerContainer) = synchronized { + def commit(changeSet: ChangeSet) = synchronized { if (status == TransactionStatus.Active) { - log.debug("Committing transaction [%s] for server [%s]", this, server.id) + log.debug("Committing transaction [%s] for server [%s]", this, changeSet.id) val haveAllPreCommitted = if (participants.size == precommitted.size) {{ for (server <- participants) yield { - if (precommitted.exists(_.id == server.id)) true + if (precommitted.exists(_.id == changeSet.id)) true else false }}.exists(_ == true) } else false if (haveAllPreCommitted) { - participants.foreach(_.transactionalItems.foreach(_.commit)) + participants.foreach(_.full.foreach(_.commit)) status = TransactionStatus.Completed - } - else rollback(server) + } else rollback(changeSet) } participants.clear precommitted.clear } - def rollback(server: GenericServerContainer) = synchronized { + def rollback(changeSet: ChangeSet) = synchronized { ensureIsActiveOrAborted - log.debug("Server [%s] has initiated transaction rollback for [%s], rolling back [%s]", server.id, this, participants) - participants.foreach(_.transactionalItems.foreach(_.rollback)) + log.debug("Server [%s] has initiated transaction rollback for [%s], rolling back [%s]", changeSet.id, this, participants) + participants.foreach(_.full.foreach(_.rollback)) status = TransactionStatus.Aborted } - def join(server: GenericServerContainer) = synchronized { + def join(changeSet: ChangeSet) = synchronized { ensureIsActive - log.debug("Server [%s] is joining transaction [%s]" , server.id, this) - server.transactionalItems.foreach(_.begin) - participants + server + log.debug("Server [%s] is joining transaction [%s]" , changeSet.id, this) + changeSet.full.foreach(_.begin) + participants + changeSet } def isNew = status == TransactionStatus.New diff --git a/kernel/src/main/scala/HashCode.scala b/kernel/src/main/scala/util/HashCode.scala similarity index 97% rename from kernel/src/main/scala/HashCode.scala rename to kernel/src/main/scala/util/HashCode.scala index 012d807a75..7d71376172 100755 --- a/kernel/src/main/scala/HashCode.scala +++ b/kernel/src/main/scala/util/HashCode.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009 Scalable Solutions. */ -package se.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel.util import java.lang.reflect.{Array => JArray} import java.lang.{Float => JFloat, Double => JDouble} diff --git a/kernel/src/main/scala/Helpers.scala b/kernel/src/main/scala/util/Helpers.scala similarity index 98% rename from kernel/src/main/scala/Helpers.scala rename to kernel/src/main/scala/util/Helpers.scala index 265cef0a93..85a81525d7 100644 --- a/kernel/src/main/scala/Helpers.scala +++ b/kernel/src/main/scala/util/Helpers.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009 Scalable Solutions. */ -package se.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel.util import java.util.concurrent.locks.ReentrantReadWriteLock diff --git a/kernel/src/main/scala/Logging.scala b/kernel/src/main/scala/util/Logging.scala similarity index 97% rename from kernel/src/main/scala/Logging.scala rename to kernel/src/main/scala/util/Logging.scala index 1a818005c8..3440b60cdd 100755 --- a/kernel/src/main/scala/Logging.scala +++ b/kernel/src/main/scala/util/Logging.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009 Scalable Solutions. */ -package se.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel.util import java.util.logging.Level import net.lag.configgy.Config diff --git a/kernel/src/main/scala/ResultOrFailure.scala b/kernel/src/main/scala/util/ResultOrFailure.scala similarity index 91% rename from kernel/src/main/scala/ResultOrFailure.scala rename to kernel/src/main/scala/util/ResultOrFailure.scala index 5dea63840d..69d3f90b38 100644 --- a/kernel/src/main/scala/ResultOrFailure.scala +++ b/kernel/src/main/scala/util/ResultOrFailure.scala @@ -2,7 +2,9 @@ * Copyright (C) 2009 Scalable Solutions. */ -package se.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel.util + +import kernel.stm.Transaction /** * Reference that can hold either a typed value or an exception. diff --git a/kernel/src/main/scala/Serializer.scala b/kernel/src/main/scala/util/Serializer.scala similarity index 95% rename from kernel/src/main/scala/Serializer.scala rename to kernel/src/main/scala/util/Serializer.scala index 8edb1efcf2..c9a750c469 100755 --- a/kernel/src/main/scala/Serializer.scala +++ b/kernel/src/main/scala/util/Serializer.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009 Scalable Solutions. */ -package se.scalablesolutions.akka.kernel +package se.scalablesolutions.akka.kernel.util import java.io.{ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream} diff --git a/kernel/src/test/scala/ActorTest.scala b/kernel/src/test/scala/ActorTest.scala index e9310be715..ce7eb73212 100644 --- a/kernel/src/test/scala/ActorTest.scala +++ b/kernel/src/test/scala/ActorTest.scala @@ -45,7 +45,7 @@ class ActorTest { val actor = new TestActor actor.start val result = actor !? "Hello" - assertEquals("World", result.get.asInstanceOf[String]) + assertEquals("World", result.asInstanceOf[String]) actor.stop } diff --git a/kernel/src/test/scala/AllSuite.scala b/kernel/src/test/scala/AllSuite.scala index ddf582d654..9609ec8b06 100755 --- a/kernel/src/test/scala/AllSuite.scala +++ b/kernel/src/test/scala/AllSuite.scala @@ -12,11 +12,8 @@ import org.scalatest._ class AllSuite extends SuperSuite( List( - new SupervisorSpec, - new SupervisorStateSpec, - new GenericServerSpec, - new GenericServerContainerSpec -// new ActiveObjectSpec, + new SupervisorSpec + // new ActiveObjectSpec, // new RestManagerSpec ) ) diff --git a/kernel/src/test/scala/GenericServerContainerSuite.scala b/kernel/src/test/scala/GenericServerContainerSuite.scala deleted file mode 100755 index 38a49988eb..0000000000 --- a/kernel/src/test/scala/GenericServerContainerSuite.scala +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package se.scalablesolutions.akka.kernel - -import scala.actors._ -import scala.actors.Actor._ - -import com.jteigen.scalatest.JUnit4Runner -import org.junit.runner.RunWith -import org.scalatest._ - -/** - * @author Jonas Bonér - */ -@RunWith(classOf[JUnit4Runner]) -class GenericServerContainerSpec extends Suite { - - var inner: GenericServerContainerActor = null - var server: GenericServerContainer = null - def createProxy(f: () => GenericServer) = { val server = new GenericServerContainer("server", f); server.setTimeout(100); server } - - def setup = { - inner = new GenericServerContainerActor - server = createProxy(() => inner) - server.newServer - server.start - } - - def testInit = { - setup - server.init("testInit") - Thread.sleep(100) - expect("initializing: testInit") { - inner.log - } - } - - def testTerminateWithReason = { - setup - server.terminate("testTerminateWithReason", 100) - Thread.sleep(100) - expect("terminating: testTerminateWithReason") { - inner.log - } - } - - def test_bang_1 = { - setup - server ! OneWay - Thread.sleep(100) - expect("got a oneway") { - inner.log - } - } - - def test_bang_2 = { - setup - server ! Ping - Thread.sleep(100) - expect("got a ping") { - inner.log - } - } - - def test_bangbangbang = { - setup - expect("pong") { - (server !!! Ping).getOrElse("nil") - } - expect("got a ping") { - inner.log - } - } - - def test_bangbangbang_Timeout1 = { - setup - expect("pong") { - (server !!! Ping).getOrElse("nil") - } - expect("got a ping") { - inner.log - } - } - - def test_bangbangbang_Timeout2 = { - setup - expect("error handler") { - server !!! (OneWay, "error handler") - } - expect("got a oneway") { - inner.log - } - } - - def testHotSwap = { - setup - // using base - expect("pong") { - (server !!! Ping).getOrElse("nil") - } - - // hotswapping - server.hotswap(Some({ - case Ping => reply("hotswapped pong") - })) - expect("hotswapped pong") { - (server !!! Ping).getOrElse("nil") - } - } - - def testDoubleHotSwap = { - setup - // using base - expect("pong") { - (server !!! Ping).getOrElse("nil") - } - - // hotswapping - server.hotswap(Some({ - case Ping => reply("hotswapped pong") - })) - expect("hotswapped pong") { - (server !!! Ping).getOrElse("nil") - } - - // hotswapping again - server.hotswap(Some({ - case Ping => reply("hotswapped pong again") - })) - expect("hotswapped pong again") { - (server !!! Ping).getOrElse("nil") - } - } - - - def testHotSwapReturnToBase = { - setup - // using base - expect("pong") { - (server !!! Ping).getOrElse("nil") - } - - // hotswapping - server.hotswap(Some({ - case Ping => reply("hotswapped pong") - })) - expect("hotswapped pong") { - (server !!! Ping).getOrElse("nil") - } - - // restoring original base - server.hotswap(None) - expect("pong") { - (server !!! Ping).getOrElse("nil") - } - } -} - - -class GenericServerContainerActor extends GenericServer { - var log = "" - - override def body: PartialFunction[Any, Unit] = { - case Ping => - log = "got a ping" - reply("pong") - - case OneWay => - log = "got a oneway" - } - - override def init(config: AnyRef) = log = "initializing: " + config - override def shutdown(reason: AnyRef) = log = "terminating: " + reason -} - - diff --git a/kernel/src/test/scala/GenericServerSpec.scala b/kernel/src/test/scala/GenericServerSpec.scala deleted file mode 100755 index eb0affae46..0000000000 --- a/kernel/src/test/scala/GenericServerSpec.scala +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package se.scalablesolutions.akka.kernel - -import com.jteigen.scalatest.JUnit4Runner -import org.junit.runner.RunWith -import org.scalatest._ - -import scala.actors.Actor._ - -/** - * @author Jonas Bonér - */ -@RunWith(classOf[JUnit4Runner]) -class GenericServerSpec extends Suite { - - def testSendRegularMessage = { - val server = new MyGenericServerActor - server.start - server !? Ping match { - case reply: String => - assert("got a ping" === server.log) - assert("pong" === reply) - case _ => fail() - } - } -} - -class MyGenericServerActor extends GenericServer { - var log: String = "" - - override def body: PartialFunction[Any, Unit] = { - case Ping => - log = "got a ping" - reply("pong") - } -} - diff --git a/kernel/src/test/scala/SupervisorSpec.scala b/kernel/src/test/scala/SupervisorSpec.scala old mode 100755 new mode 100644 index 3c7fadc128..30bf162e0e --- a/kernel/src/test/scala/SupervisorSpec.scala +++ b/kernel/src/test/scala/SupervisorSpec.scala @@ -4,9 +4,9 @@ package se.scalablesolutions.akka.kernel -import se.scalablesolutions.akka.kernel.config.ScalaConfig.{SupervisorConfig, Worker, LifeCycle, RestartStrategy, OneForOne, AllForOne, Permanent} -import scala.actors._ -import scala.actors.Actor._ +import kernel.actor.{Supervisor, SupervisorFactory, Actor, StartSupervisor} +import kernel.config.ScalaConfig._ + import scala.collection.Map import scala.collection.mutable.HashMap @@ -21,52 +21,30 @@ import org.scalatest._ class SupervisorSpec extends Suite { var messageLog: String = "" - val pingpong1 = new GenericServerContainer("pingpong1", () => new PingPong1Actor) - val pingpong2 = new GenericServerContainer("pingpong2", () => new PingPong2Actor) - val pingpong3 = new GenericServerContainer("pingpong3", () => new PingPong3Actor) - - pingpong1.setTimeout(100) - pingpong2.setTimeout(100) - pingpong3.setTimeout(100) + var pingpong1: PingPong1Actor = _ + var pingpong2: PingPong2Actor = _ + var pingpong3: PingPong3Actor = _ def testStartServer = { messageLog = "" val sup = getSingleActorAllForOneSupervisor - sup ! Start + sup ! StartSupervisor expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") - } - } - - def testGetServer = { - messageLog = "" - val sup = getSingleActorAllForOneSupervisor - sup ! Start - val server = sup.getServerOrElse("pingpong1", throw new RuntimeException("server not found")) - assert(server.isInstanceOf[GenericServerContainer]) - assert(server === pingpong1) - } - - def testGetServerOrFail = { - messageLog = "" - val sup = getSingleActorAllForOneSupervisor - sup ! Start - intercept(classOf[RuntimeException]) { - sup.getServerOrElse("wrong_name", throw new RuntimeException("server not found")) + (pingpong1 !! Ping).getOrElse("nil") } } def testKillSingleActorOneForOne = { messageLog = "" val sup = getSingleActorOneForOneSupervisor - sup ! Start + sup ! StartSupervisor intercept(classOf[RuntimeException]) { - pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) + pingpong1 !! Die } Thread.sleep(100) - expect("oneforone") { + expect("DIE") { messageLog } } @@ -74,27 +52,27 @@ class SupervisorSpec extends Suite { def testCallKillCallSingleActorOneForOne = { messageLog = "" val sup = getSingleActorOneForOneSupervisor - sup ! Start + sup ! StartSupervisor expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") + (pingpong1 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("ping") { messageLog } intercept(classOf[RuntimeException]) { - pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) + pingpong1 !! Die } Thread.sleep(100) - expect("pingoneforone") { + expect("pingDIE") { messageLog } expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") + (pingpong1 !! Ping).getOrElse("nil") } Thread.sleep(100) - expect("pingoneforoneping") { + expect("pingDIEping") { messageLog } } @@ -102,12 +80,12 @@ class SupervisorSpec extends Suite { def testKillSingleActorAllForOne = { messageLog = "" val sup = getSingleActorAllForOneSupervisor - sup ! Start + sup ! StartSupervisor intercept(classOf[RuntimeException]) { - pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) + pingpong1 !! Die } Thread.sleep(100) - expect("allforone") { + expect("DIE") { messageLog } } @@ -115,26 +93,28 @@ class SupervisorSpec extends Suite { def testCallKillCallSingleActorAllForOne = { messageLog = "" val sup = getSingleActorAllForOneSupervisor - sup ! Start + pingpong1.timeout = 10000000 + sup.timeout = 10000000 + sup ! StartSupervisor expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") + (pingpong1 !! Ping).getOrElse("nil") } - Thread.sleep(100) + Thread.sleep(500) expect("ping") { messageLog } intercept(classOf[RuntimeException]) { - pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) + pingpong1 !! Die } - Thread.sleep(100) - expect("pingallforone") { + Thread.sleep(1100) + expect("pingDIE") { messageLog } expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") + (pingpong1 !! Ping).getOrElse("nil") } - Thread.sleep(100) - expect("pingallforoneping") { + Thread.sleep(500) + expect("pingDIEping") { messageLog } } @@ -142,12 +122,12 @@ class SupervisorSpec extends Suite { def testKillMultipleActorsOneForOne = { messageLog = "" val sup = getMultipleActorsOneForOneConf - sup ! Start + sup ! StartSupervisor intercept(classOf[RuntimeException]) { - pingpong3 !!! (Die, throw new RuntimeException("TIME OUT")) + pingpong3 !! Die } Thread.sleep(100) - expect("oneforone") { + expect("DIE") { messageLog } } @@ -155,42 +135,42 @@ class SupervisorSpec extends Suite { def tesCallKillCallMultipleActorsOneForOne = { messageLog = "" val sup = getMultipleActorsOneForOneConf - sup ! Start + sup ! StartSupervisor expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") + (pingpong1 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pong") { - (pingpong2 !!! Ping).getOrElse("nil") + (pingpong2 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pong") { - (pingpong3 !!! Ping).getOrElse("nil") + (pingpong3 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pingpingping") { messageLog } intercept(classOf[RuntimeException]) { - pingpong2 !!! (Die, throw new RuntimeException("TIME OUT")) + pingpong2 !! Die } Thread.sleep(100) - expect("pingpingpingoneforone") { + expect("pingpingpingDIE") { messageLog } expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") + (pingpong1 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pong") { - (pingpong2 !!! Ping).getOrElse("nil") + (pingpong2 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pong") { - (pingpong3 !!! Ping).getOrElse("nil") + (pingpong3 !! Ping).getOrElse("nil") } Thread.sleep(100) - expect("pingpingpingoneforonepingpingping") { + expect("pingpingpingDIEpingpingping") { messageLog } } @@ -198,12 +178,12 @@ class SupervisorSpec extends Suite { def testKillMultipleActorsAllForOne = { messageLog = "" val sup = getMultipleActorsAllForOneConf - sup ! Start + sup ! StartSupervisor intercept(classOf[RuntimeException]) { - pingpong2 !!! (Die, throw new RuntimeException("TIME OUT")) + pingpong2 !! Die } Thread.sleep(100) - expect("allforoneallforoneallforone") { + expect("DIEDIEDIE") { messageLog } } @@ -211,60 +191,61 @@ class SupervisorSpec extends Suite { def tesCallKillCallMultipleActorsAllForOne = { messageLog = "" val sup = getMultipleActorsAllForOneConf - sup ! Start + sup ! StartSupervisor expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") + (pingpong1 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pong") { - (pingpong2 !!! Ping).getOrElse("nil") + (pingpong2 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pong") { - (pingpong3 !!! Ping).getOrElse("nil") + (pingpong3 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pingpingping") { messageLog } intercept(classOf[RuntimeException]) { - pingpong2 !!! (Die, throw new RuntimeException("TIME OUT")) + pingpong2 !! Die } Thread.sleep(100) - expect("pingpingpingallforoneallforoneallforone") { + expect("pingpingpingDIEDIEDIE") { messageLog } expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") + (pingpong1 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pong") { - (pingpong2 !!! Ping).getOrElse("nil") + (pingpong2 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pong") { - (pingpong3 !!! Ping).getOrElse("nil") + (pingpong3 !! Ping).getOrElse("nil") } Thread.sleep(100) - expect("pingpingpingallforoneallforoneallforonepingpingping") { + expect("pingpingpingDIEDIEDIEpingpingping") { messageLog } } - def testTerminateFirstLevelActorAllForOne = { + /* + def testNestedSupervisorsTerminateFirstLevelActorAllForOne = { messageLog = "" val sup = getNestedSupervisorsAllForOneConf - sup ! Start + sup ! StartSupervisor intercept(classOf[RuntimeException]) { - pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) + pingpong1 !! Die } Thread.sleep(100) - expect("allforoneallforoneallforone") { + expect("DIEDIEDIE") { messageLog } } - - +*/ + // ============================================= // Creat some supervisors with different configurations @@ -276,7 +257,9 @@ class SupervisorSpec extends Suite { // Then create a concrete container in which we mix in support for the specific // implementation of the Actors we want to use. - object factory extends TestSupervisorFactory { + pingpong1 = new PingPong1Actor + + object factory extends SupervisorFactory { override def getSupervisorConfig: SupervisorConfig = { SupervisorConfig( RestartStrategy(AllForOne, 3, 100), @@ -290,7 +273,9 @@ class SupervisorSpec extends Suite { } def getSingleActorOneForOneSupervisor: Supervisor = { - object factory extends TestSupervisorFactory { + pingpong1 = new PingPong1Actor + + object factory extends SupervisorFactory { override def getSupervisorConfig: SupervisorConfig = { SupervisorConfig( RestartStrategy(OneForOne, 3, 100), @@ -304,7 +289,11 @@ class SupervisorSpec extends Suite { } def getMultipleActorsAllForOneConf: Supervisor = { - object factory extends TestSupervisorFactory { + pingpong1 = new PingPong1Actor + pingpong2 = new PingPong2Actor + pingpong3 = new PingPong3Actor + + object factory extends SupervisorFactory { override def getSupervisorConfig: SupervisorConfig = { SupervisorConfig( RestartStrategy(AllForOne, 3, 100), @@ -326,7 +315,11 @@ class SupervisorSpec extends Suite { } def getMultipleActorsOneForOneConf: Supervisor = { - object factory extends TestSupervisorFactory { + pingpong1 = new PingPong1Actor + pingpong2 = new PingPong2Actor + pingpong3 = new PingPong3Actor + + object factory extends SupervisorFactory { override def getSupervisorConfig: SupervisorConfig = { SupervisorConfig( RestartStrategy(OneForOne, 3, 100), @@ -348,7 +341,11 @@ class SupervisorSpec extends Suite { } def getNestedSupervisorsAllForOneConf: Supervisor = { - object factory extends TestSupervisorFactory { + pingpong1 = new PingPong1Actor + pingpong2 = new PingPong2Actor + pingpong3 = new PingPong3Actor + + object factory extends SupervisorFactory { override def getSupervisorConfig: SupervisorConfig = { SupervisorConfig( RestartStrategy(AllForOne, 3, 100), @@ -372,46 +369,57 @@ class SupervisorSpec extends Suite { factory.newSupervisor } - class PingPong1Actor extends GenericServer { - override def body: PartialFunction[Any, Unit] = { + class PingPong1Actor extends Actor { + override def receive: PartialFunction[Any, Unit] = { case Ping => messageLog += "ping" reply("pong") + case Die => - throw new RuntimeException("Recieved Die message") + throw new RuntimeException("DIE") + } + override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) { + messageLog += reason.asInstanceOf[Exception].getMessage } } - class PingPong2Actor extends GenericServer { - override def body: PartialFunction[Any, Unit] = { + class PingPong2Actor extends Actor { + override def receive: PartialFunction[Any, Unit] = { case Ping => messageLog += "ping" reply("pong") case Die => - throw new RuntimeException("Recieved Die message") + throw new RuntimeException("DIE") + } + override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) { + messageLog += reason.asInstanceOf[Exception].getMessage } } - class PingPong3Actor extends GenericServer { - override def body: PartialFunction[Any, Unit] = { + class PingPong3Actor extends Actor { + override def receive: PartialFunction[Any, Unit] = { case Ping => messageLog += "ping" reply("pong") case Die => - throw new RuntimeException("Recieved Die message") + throw new RuntimeException("DIE") + } + + override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) { + messageLog += reason.asInstanceOf[Exception].getMessage } } // ============================================= - +/* class TestAllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends AllForOneStrategy(maxNrOfRetries, withinTimeRange) { - override def postRestart(serverContainer: GenericServerContainer) = { + override def postRestart(serverContainer: ActorContainer) = { messageLog += "allforone" } } class TestOneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends OneForOneStrategy(maxNrOfRetries, withinTimeRange) { - override def postRestart(serverContainer: GenericServerContainer) = { + override def postRestart(serverContainer: ActorContainer) = { messageLog += "oneforone" } } @@ -425,4 +433,5 @@ class SupervisorSpec extends Suite { } } } + */ } diff --git a/kernel/src/test/scala/SupervisorStateSpec.scala b/kernel/src/test/scala/SupervisorStateSpec.scala deleted file mode 100755 index b68b6c5daa..0000000000 --- a/kernel/src/test/scala/SupervisorStateSpec.scala +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package se.scalablesolutions.akka.kernel - -import com.jteigen.scalatest.JUnit4Runner -import config.ScalaConfig.{SupervisorConfig, RestartStrategy, Worker, LifeCycle, AllForOne, Permanent} - - -import org.junit.runner.RunWith -import org.scalatest._ - -import scala.actors.Actor._ - -/** - * @author Jonas Bonér - */ -@RunWith(classOf[JUnit4Runner]) -class SupervisorStateSpec extends Suite { - - val dummyActor = new GenericServer { override def body: PartialFunction[Any, Unit] = { case _ => }} - val newDummyActor = () => dummyActor - var state: SupervisorState = _ - var proxy: GenericServerContainer = _ - var supervisor: Supervisor = _ - - def setup = { - proxy = new GenericServerContainer("server1", newDummyActor) - object factory extends SupervisorFactory { - override def getSupervisorConfig: SupervisorConfig = { - SupervisorConfig( - RestartStrategy(AllForOne, 3, 100), - Worker( - proxy, - LifeCycle(Permanent, 100)) - :: Nil) - } - } - supervisor = factory.newSupervisor - state = new SupervisorState(supervisor, new AllForOneStrategy(3, 100)) - } - - def testAddServer = { - setup - state.addServerContainer(proxy) - state.getServerContainer("server1") match { - case None => fail("should have returned server") - case Some(server) => - assert(server != null) - assert(server.isInstanceOf[GenericServerContainer]) - assert(proxy === server) - } - } - - def testGetServer = { - setup - state.addServerContainer(proxy) - state.getServerContainer("server1") match { - case None => fail("should have returned server") - case Some(server) => - assert(server != null) - assert(server.isInstanceOf[GenericServerContainer]) - assert(proxy === server) - } - } - - def testRemoveServer = { - setup - state.addServerContainer(proxy) - - state.removeServerContainer("server1") - state.getServerContainer("server1") match { - case Some(_) => fail("should have returned None") - case None => - } - state.getServerContainer("dummyActor") match { - case Some(_) => fail("should have returned None") - case None => - } - } - - def testGetNonExistingServerBySymbol = { - setup - state.getServerContainer("server2") match { - case Some(_) => fail("should have returned None") - case None => - } - } - - def testGetNonExistingServerByActor = { - setup - state.getServerContainer("dummyActor") match { - case Some(_) => fail("should have returned None") - case None => - } - } -} diff --git a/kernel/web.xml b/kernel/web.xml index 68a6bf82c5..21bf9fe1f7 100644 --- a/kernel/web.xml +++ b/kernel/web.xml @@ -2,7 +2,7 @@ +