From 7f60a4a93eae6c7e5e7923e70c0a4a1df30955e7 Mon Sep 17 00:00:00 2001 From: Jonas Boner Date: Wed, 3 Jun 2009 02:41:10 +0200 Subject: [PATCH] finished actor library together with tests --- akka.iws | 572 +++++++++++------- kernel/src/main/scala/ActiveObject.scala | 4 +- kernel/src/main/scala/GenericServer.scala | 174 ++++-- kernel/src/main/scala/Supervisor.scala | 91 +-- ...tcher.scala => EventBasedDispatcher.scala} | 27 +- kernel/src/main/scala/reactor/Future.scala | 2 +- kernel/src/main/scala/reactor/Reactor.scala | 22 +- .../scala/reactor/ThreadBasedDispatcher.scala | 13 +- .../scala/GenericServerContainerSuite.scala | 22 +- kernel/src/test/scala/SupervisorSpec.scala | 52 +- 10 files changed, 583 insertions(+), 396 deletions(-) rename kernel/src/main/scala/reactor/{EventDrivenDispatcher.scala => EventBasedDispatcher.scala} (63%) diff --git a/akka.iws b/akka.iws index d20a6273d0..0654de0c36 100644 --- a/akka.iws +++ b/akka.iws @@ -2,12 +2,15 @@ - - - - + + + + + + + - + @@ -27,6 +30,112 @@ + + + + + + + + + + + + + + + + + + @@ -344,7 +387,7 @@ - + @@ -623,7 +666,67 @@ - - + + @@ -686,18 +791,18 @@ - - + + - + - + - + @@ -748,107 +853,114 @@ - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/kernel/src/main/scala/ActiveObject.scala b/kernel/src/main/scala/ActiveObject.scala index 73da2bcd0b..311d5fdc12 100755 --- a/kernel/src/main/scala/ActiveObject.scala +++ b/kernel/src/main/scala/ActiveObject.scala @@ -163,7 +163,7 @@ sealed class TransactionalAroundAdvice(target: Class[_], private def sendOneWay(joinpoint: JoinPoint) = server ! Invocation(joinpoint, activeTx) private def sendAndReceiveEventually(joinpoint: JoinPoint): ResultOrFailure[AnyRef] = { - server !!! (Invocation(joinpoint, activeTx), { + server !! (Invocation(joinpoint, activeTx), { var resultOrFailure = ResultOrFailure(activeTx) resultOrFailure() = throw new ActiveObjectInvocationTimeoutException("Invocation to active object [" + targetInstance.getClass.getName + "] timed out after " + server.timeout + " milliseconds") resultOrFailure @@ -228,7 +228,7 @@ private[kernel] class Dispatcher(val targetName: String) extends GenericServer { } case 'exit => - exit; reply() + exit /* case exchange: Exchange => println("=============> Exchange From Actor: " + exchange) diff --git a/kernel/src/main/scala/GenericServer.scala b/kernel/src/main/scala/GenericServer.scala index 16c17fa6da..3d579d689f 100644 --- a/kernel/src/main/scala/GenericServer.scala +++ b/kernel/src/main/scala/GenericServer.scala @@ -4,11 +4,12 @@ package se.scalablesolutions.akka.kernel -import scala.actors._ -import scala.actors.Actor._ +import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} -import se.scalablesolutions.akka.kernel.config.ScalaConfig._ -import se.scalablesolutions.akka.kernel.Helpers._ +import kernel.config.ScalaConfig._ +import kernel.Helpers._ +import kernel.reactor._ +import scala.collection.mutable.HashSet sealed abstract class GenericServerMessage case class Init(config: AnyRef) extends GenericServerMessage @@ -16,13 +17,21 @@ case class ReInit(config: AnyRef) extends GenericServerMessage case class Shutdown(reason: AnyRef) extends GenericServerMessage case class Terminate(reason: AnyRef) extends GenericServerMessage case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends GenericServerMessage +case class Killed(victim: GenericServer, reason: AnyRef) extends GenericServerMessage /** * Base trait for all user-defined servers. * * @author Jonas Bonér */ -trait GenericServer extends Actor { +trait GenericServer { + private val unit = TimeUnit.MILLISECONDS + private[kernel] var sender: GenericServerContainer = _ + private[kernel] var container: GenericServerContainer = _ + private[kernel] var mailbox: MessageQueue = _ + @volatile protected[this] var isRunning = true + protected var trapExit: Boolean = false + /** * Template method implementing the server logic. * To be implemented by subclassing server. @@ -39,39 +48,93 @@ trait GenericServer extends Actor { * } * */ - def body: PartialFunction[Any, Unit] + protected def body: PartialFunction[Any, Unit] /** * Callback method that is called during initialization. * To be implemented by subclassing server. */ - def init(config: AnyRef) {} + protected def init(config: AnyRef) {} /** * Callback method that is called during reinitialization after a server crash. * To be implemented by subclassing server. */ - def reinit(config: AnyRef) {} + protected def reinit(config: AnyRef) {} /** * Callback method that is called during termination. * To be implemented by subclassing server. */ - def shutdown(reason: AnyRef) {} + protected def shutdown(reason: AnyRef) {} - def act = loop { react { genericBase orElse base } } + protected def reply(message: AnyRef) = sender ! message + + private[kernel] def link(server: GenericServer) = container.link(server) - private def base: PartialFunction[Any, Unit] = hotswap getOrElse body + private[kernel] def unlink(server: GenericServer) = container.unlink(server) + + private[kernel] def !(message: AnyRef) = mailbox.put(new MessageHandle(this, message, new NullFutureResult)) + + private[kernel] def !![T](message: AnyRef)(implicit timeout: Long): Option[T] = { + val future = new GenericFutureResult(unit.toNanos(timeout)) + mailbox.put(new MessageHandle(this, message, future)) + future.await + val result = future.result + if (result == null) None + else Some(result.asInstanceOf[T]) + } + + private[kernel] def !?(message: AnyRef): AnyRef = { + val future = new GenericFutureResult(unit.toNanos(100000)) + mailbox.put(new MessageHandle(this, message, future)) + future.await + future.result.asInstanceOf[AnyRef] + } + + private[kernel] def start = {} //try { act } catch { case e: SuspendServerException => act } + + private[kernel] def shutdown(shutdownTime: Int, reason: AnyRef) { + //FIXME: how to implement? + } + + private[kernel] def handle(message: AnyRef, future: CompletableFutureResult) = { + react(message) { lifecycle orElse base } +/* + try { + val result = message.asInstanceOf[Invocation].joinpoint.proceed + future.completeWithResult(result) + } catch { + case e: Exception => future.completeWithException(e) + } +*/ + } + + //private[kernel] def act = while(isRunning) { react { lifecycle orElse base } } + + private[this] def base: PartialFunction[Any, Unit] = hotswap getOrElse body private var hotswap: Option[PartialFunction[Any, Unit]] = None - private val genericBase: PartialFunction[Any, Unit] = { + private val lifecycle: PartialFunction[Any, Unit] = { case Init(config) => init(config) case ReInit(config) => reinit(config) case HotSwap(code) => hotswap = code - case Shutdown(reason) => shutdown(reason); reply('success) - case Terminate(reason) => exit(reason) + case Terminate(reason) => shutdown(reason); exit } + + protected[this] def react(message: AnyRef)(f: PartialFunction[AnyRef, Unit]): Unit = { + // FIXME: loop over all elements, grab the first that matches + if (f.isDefinedAt(message)) { + // sender = message.sender.asInstanceOf[GenericServerContainer] + f(message) + } else { + throw new IllegalArgumentException("Message not defined: " + message) + } + //throw new SuspendServerException + } + + private[this] def exit = isRunning = false } /** @@ -83,17 +146,18 @@ trait GenericServer extends Actor { */ class GenericServerContainer( val id: String, - private[kernel] var serverFactory: () => GenericServer) extends Logging { + private[this] var serverFactory: () => GenericServer) extends Logging { require(id != null && id != "") - private[kernel] var lifeCycle: Option[LifeCycle] = None - private[kernel] val lock = new ReadWriteLock - private[kernel] val txItemsLock = new ReadWriteLock - private[kernel] val serializer = new JavaSerializationSerializer + private[this] val txItemsLock = new ReadWriteLock + private[this] val serializer = new JavaSerializationSerializer + private[this] val linkedServers = new HashSet[GenericServer] + private[kernel] var server: GenericServer = serverFactory() + private[this] var currentConfig: Option[AnyRef] = None - private var server: GenericServer = _ - private var currentConfig: Option[AnyRef] = None - private[kernel] var timeout = 5000 + private[kernel] val lock = new ReadWriteLock + private[kernel] var lifeCycle: Option[LifeCycle] = None + private[kernel] implicit var timeout = 5000L private[kernel] def transactionalItems: List[Transactional] = txItemsLock.withReadLock { _transactionalMaps ::: _transactionalVectors ::: _transactionalRefs @@ -126,15 +190,19 @@ class GenericServerContainer( _transactionalRefs } + def link(server: GenericServer) = lock.withWriteLock { linkedServers + server } + + def unlink(server: GenericServer) = lock.withWriteLock { linkedServers - server } + /** - * Sends a one way message to the server - alias for cast(message). + * Sends a one way message to the server. *

* Example: *

    *   server ! Message
    * 
*/ - def !(message: Any) = { + def !(message: AnyRef) = { require(server != null) lock.withReadLock { server ! message } } @@ -147,13 +215,12 @@ class GenericServerContainer( *

* Example: *

-   *   (server !!! Message).getOrElse(throw new RuntimeException("time out")
+   *   (server !! Message).getOrElse(throw new RuntimeException("time out")
    * 
*/ - def !!![T](message: Any): Option[T] = { + def !![T](message: AnyRef): Option[T] = { require(server != null) - val future: FutureWithTimeout[T] = lock.withReadLock { server !!! message } - future.receiveWithin(timeout) + lock.withReadLock { server !! message } } /** @@ -165,13 +232,16 @@ class GenericServerContainer( *

* Example: *

-   *   server !!! (Message, throw new RuntimeException("time out"))
+   *   server !! (Message, throw new RuntimeException("time out"))
    *   // OR
-   *   server !!! (Message, DefaultReturnValue)
+   *   server !! (Message, DefaultReturnValue)
    * 
*/ - def !!![T](message: Any, errorHandler: => T): T = !!!(message, errorHandler, timeout) - + def !![T](message: AnyRef, errorHandler: => T): T = { + require(server != null) + lock.withReadLock { (server !! message).getOrElse(errorHandler).asInstanceOf[T] } + } + /** * Sends a message to the server and gets a future back with the reply. *

@@ -181,18 +251,27 @@ class GenericServerContainer( *

* Example: *

-   *   server !!! (Message, throw new RuntimeException("time out"), 1000)
+   *   server !! (Message, throw new RuntimeException("time out"), 1000)
    *   // OR
-   *   server !!! (Message, DefaultReturnValue, 1000)
+   *   server !! (Message, DefaultReturnValue, 1000)
    * 
*/ - def !!![T](message: Any, errorHandler: => T, time: Int): T = { + def !![T](message: AnyRef, errorHandler: => T, time: Long): T = { require(server != null) - val future: FutureWithTimeout[T] = lock.withReadLock { server !!! message } - future.receiveWithin(time) match { - case None => errorHandler - case Some(reply) => reply - } + lock.withReadLock { (server.!!(message)(time)).getOrElse(errorHandler).asInstanceOf[T] } + } + + /** + * Sends a synchronous message to the server. + *

+ * Example: + *

+   *   val result = server !? Message
+   * 
+ */ + def !?(message: AnyRef) = { + require(server != null) + lock.withReadLock { server !? message } } /** @@ -231,16 +310,12 @@ class GenericServerContainer( */ def setTimeout(time: Int) = timeout = time - /** - * Returns the next message in the servers mailbox. - */ - def nextMessage = lock.withReadLock { server ? } - /** * Creates a new actor for the GenericServerContainer, and return the newly created actor. */ private[kernel] def newServer(): GenericServer = lock.withWriteLock { server = serverFactory() + server.container = this server } @@ -262,10 +337,7 @@ class GenericServerContainer( private[kernel] def terminate(reason: AnyRef, shutdownTime: Int) = lock.withReadLock { if (shutdownTime > 0) { log.debug("Waiting [%s milliseconds for the server to shut down before killing it.", shutdownTime) - server !? (shutdownTime, Shutdown(reason)) match { - case Some('success) => log.debug("Server [%s] has been shut down cleanly.", id) - case None => log.warning("Server [%s] was **not able** to complete shutdown cleanly within its configured shutdown time [%s]", id, shutdownTime) - } + server.shutdown(shutdownTime, Shutdown(reason)) } server ! Terminate(reason) } @@ -285,8 +357,12 @@ class GenericServerContainer( private[kernel] def swapServer(newServer: GenericServer) = lock.withWriteLock { server = newServer + server.container = this } override def toString(): String = "GenericServerContainer[" + server + "]" } +private[kernel] class SuspendServerException extends Throwable { + override def fillInStackTrace(): Throwable = this +} \ No newline at end of file diff --git a/kernel/src/main/scala/Supervisor.scala b/kernel/src/main/scala/Supervisor.scala index b7256a12c5..e965609dad 100644 --- a/kernel/src/main/scala/Supervisor.scala +++ b/kernel/src/main/scala/Supervisor.scala @@ -4,8 +4,7 @@ package se.scalablesolutions.akka.kernel -import scala.actors._ -import scala.actors.Actor._ +import kernel.reactor.{MessageQueue, GenericServerMessageHandler, EventBasedDispatcher} import scala.collection.mutable.HashMap import se.scalablesolutions.akka.kernel.Helpers._ @@ -68,7 +67,7 @@ abstract class SupervisorFactory extends Logging { supervisor.start supervisor !? Configure(config, this) match { case 'configSuccess => log.debug("Supervisor successfully configured") - case _ => log.error("Supervisor could not be configured") + case error => log.error("Supervisor could not be configured due to [" + error + "]") } supervisor } @@ -94,7 +93,15 @@ abstract class SupervisorFactory extends Logging { * * @author Jonas Bonér */ -class Supervisor(faultHandler: FaultHandlingStrategy) extends Actor with Logging { +class Supervisor(faultHandler: FaultHandlingStrategy) extends GenericServer with Logging { + trapExit = true + + private[kernel] val messageQueue = new MessageQueue + mailbox = messageQueue + + // FIXME: make dispatcher configurable + private[kernel] val messageDispatcher = new EventBasedDispatcher + messageDispatcher.dispatch(messageQueue) private val state = new SupervisorState(this, faultHandler) @@ -115,40 +122,37 @@ class Supervisor(faultHandler: FaultHandlingStrategy) extends Actor with Logging } } - def stop = Actor.self ! Stop - - def act = { - self.trapExit = true - loop { - react { - case Configure(config, factory) => - log.debug("Configuring supervisor:%s ", this) - configure(config, factory) - reply('configSuccess) + def stop = this ! Stop - case Start => - state.serverContainers.foreach { serverContainer => + override def body: PartialFunction[Any, Unit] = { + case Configure(config, factory) => + log.debug("Configuring supervisor: %s ", this) + configure(config, factory) + reply('configSuccess) + + case Start => + state.serverContainers.foreach { + serverContainer => serverContainer.start log.info("Starting server: %s", serverContainer.getServer) - } - - case Stop => - state.serverContainers.foreach { serverContainer => - serverContainer.terminate('normal) - log.info("Stopping ser-ver: %s", serverContainer) - } - log.info("Stopping supervisor: %s", this) - exit('normal) - - case Exit(failedServer, reason) => - reason match { - case 'forced => {} // do nothing - case _ => state.faultHandler.handleFailure(state, failedServer, reason) - } - - case unexpected => log.warning("Unexpected message [%s] from [%s] ignoring...", unexpected, sender) } - } + + case Stop => + state.serverContainers.foreach { + serverContainer => + serverContainer.terminate('normal) + log.info("Stopping server: %s", serverContainer) + } + log.info("Stopping supervisor: %s", this) + exit + + case Killed(failedServer, reason) => + reason match { + case 'forced => {} // do nothing + case _ => state.faultHandler.handleFailure(state, failedServer, reason) + } + + case unexpected => log.warning("Unexpected message [%s] from [%s] ignoring...", unexpected, sender) } private def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match { @@ -167,10 +171,13 @@ class Supervisor(faultHandler: FaultHandlingStrategy) extends Actor with Logging } private[kernel] def spawnLink(serverContainer: GenericServerContainer): GenericServer = { + println("----------------- SPAWN LINK") val newServer = serverContainer.newServer() + messageDispatcher.registerHandler(serverContainer.id, new GenericServerMessageHandler(newServer)); + newServer.mailbox = messageQueue newServer.start - self.link(newServer) - log.debug("Linking actor [%s] to supervisor [%s]", newServer, this) + link(newServer) + log.debug("Linking server [%s] to supervisor [%s]", newServer, this) state.addServerContainer(serverContainer) newServer } @@ -186,7 +193,7 @@ abstract class FaultHandlingStrategy(val maxNrOfRetries: Int, val withinTimeRang private var nrOfRetries = 0 private var retryStartTime = currentTime - private[kernel] def handleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = { + private[kernel] def handleFailure(state: SupervisorState, failedServer: GenericServer, reason: AnyRef) = { nrOfRetries += 1 if (timeRangeHasExpired) { if (hasReachedMaximumNrOfRetries) { @@ -206,7 +213,7 @@ abstract class FaultHandlingStrategy(val maxNrOfRetries: Int, val withinTimeRang // TODO: this is the place to fail-over all pending messages in the failing actor's mailbox, if possible to get a hold of them // e.g. something like 'serverContainer.getServer.getPendingMessages.map(newServer ! _)' - self.unlink(serverContainer.getServer) + supervisor.unlink(serverContainer.getServer) serverContainer.lifeCycle match { case None => throw new IllegalStateException("Server [" + serverContainer.id + "] does not have a life-cycle defined.") @@ -237,7 +244,7 @@ abstract class FaultHandlingStrategy(val maxNrOfRetries: Int, val withinTimeRang /** * To be overriden by concrete strategies. */ - protected def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) + protected def doHandleFailure(state: SupervisorState, failedServer: GenericServer, reason: AnyRef) /** * To be overriden by concrete strategies. @@ -262,10 +269,10 @@ abstract class FaultHandlingStrategy(val maxNrOfRetries: Int, val withinTimeRang */ class AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) { - override def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = { + override def doHandleFailure(state: SupervisorState, failedServer: GenericServer, reason: AnyRef) = { log.error("Server [%s] has failed due to [%s] - scheduling restart - scheme: ALL_FOR_ONE.", failedServer, reason) for (serverContainer <- state.serverContainers) restart(serverContainer, reason, state) - state.supervisors.foreach(_ ! Exit(failedServer, reason)) + state.supervisors.foreach(_ ! Killed(failedServer, reason)) } } @@ -277,7 +284,7 @@ extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) { */ class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) { - override def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = { + override def doHandleFailure(state: SupervisorState, failedServer: GenericServer, reason: AnyRef) = { log.error("Server [%s] has failed due to [%s] - scheduling restart - scheme: ONE_FOR_ONE.", failedServer, reason) var serverContainer: Option[GenericServerContainer] = None state.serverContainers.foreach { diff --git a/kernel/src/main/scala/reactor/EventDrivenDispatcher.scala b/kernel/src/main/scala/reactor/EventBasedDispatcher.scala similarity index 63% rename from kernel/src/main/scala/reactor/EventDrivenDispatcher.scala rename to kernel/src/main/scala/reactor/EventBasedDispatcher.scala index 000d795a74..011697f959 100644 --- a/kernel/src/main/scala/reactor/EventDrivenDispatcher.scala +++ b/kernel/src/main/scala/reactor/EventBasedDispatcher.scala @@ -11,13 +11,12 @@ package se.scalablesolutions.akka.kernel.reactor import java.util.concurrent.{ConcurrentMap, ConcurrentHashMap} -import java.util.{LinkedList, Queue} +import java.util.{LinkedList, Queue} -class EventDrivenDispatcher extends MessageDispatcher { +class EventBasedDispatcher extends MessageDispatcher { private val handlers = new ConcurrentHashMap[AnyRef, MessageHandler] private var selectorThread: Thread = _ @volatile private var active: Boolean = false - private val guard = new Object def registerHandler(key: AnyRef, handler: MessageHandler) = handlers.put(key, handler) @@ -25,17 +24,15 @@ class EventDrivenDispatcher extends MessageDispatcher { def dispatch(messageQueue: MessageQueue) = if (!active) { active = true - val messageDemultiplexer = new EventDrivenDemultiplexer(messageQueue) + val messageDemultiplexer = new EventBasedDemultiplexer(messageQueue) selectorThread = new Thread { override def run = { while (active) { - guard.synchronized { /* empty */ } messageDemultiplexer.select - val handles = messageDemultiplexer.acquireSelectedQueue - val handlesList = handles.toArray.toList.asInstanceOf[List[MessageHandle]] - for (index <- 0 to handles.size) { - val handle = handles.remove - val handler = handlers.get(handle.key) + val queue = messageDemultiplexer.acquireSelectedQueue + for (index <- 0 to queue.size) { + val handle = queue.remove + val handler = handlers.get(handle.sender) if (handler != null) handler.handle(handle) } } @@ -50,18 +47,14 @@ class EventDrivenDispatcher extends MessageDispatcher { } } -class EventDrivenDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer { +class EventBasedDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer { private val selectedQueue: Queue[MessageHandle] = new LinkedList[MessageHandle] def select = messageQueue.read(selectedQueue) def acquireSelectedQueue: Queue[MessageHandle] = selectedQueue - def releaseSelectedQueue = { - throw new UnsupportedOperationException - } + def releaseSelectedQueue = throw new UnsupportedOperationException("EventBasedDemultiplexer can't release its queue since it is always alive and kicking") - def wakeUp = { - throw new UnsupportedOperationException - } + def wakeUp = throw new UnsupportedOperationException("EventBasedDemultiplexer can't be woken up isince always alive and kicking") } diff --git a/kernel/src/main/scala/reactor/Future.scala b/kernel/src/main/scala/reactor/Future.scala index f664bae7c7..4cc7027f0c 100644 --- a/kernel/src/main/scala/reactor/Future.scala +++ b/kernel/src/main/scala/reactor/Future.scala @@ -25,6 +25,7 @@ trait CompletableFutureResult extends FutureResult { } class GenericFutureResult(val timeoutInNanos: Long) extends CompletableFutureResult { + private val _startTimeInNanos = currentTimeInNanos private val _lock = new ReentrantLock private val _signal = _lock.newCondition @@ -82,7 +83,6 @@ class GenericFutureResult(val timeoutInNanos: Long) extends CompletableFutureRes _completed = true _result = result } - } finally { _signal.signalAll _lock.unlock diff --git a/kernel/src/main/scala/reactor/Reactor.scala b/kernel/src/main/scala/reactor/Reactor.scala index 399404e85d..9b66e080ce 100644 --- a/kernel/src/main/scala/reactor/Reactor.scala +++ b/kernel/src/main/scala/reactor/Reactor.scala @@ -26,11 +26,11 @@ trait MessageDemultiplexer { def wakeUp } -class MessageHandle(val key: AnyRef, val message: AnyRef, val future: CompletableFutureResult) { +class MessageHandle(val sender: AnyRef, val message: AnyRef, val future: CompletableFutureResult) { override def hashCode(): Int = { var result = HashCode.SEED - result = HashCode.hash(result, key) + result = HashCode.hash(result, sender) result = HashCode.hash(result, message) result = HashCode.hash(result, future) result @@ -39,7 +39,7 @@ class MessageHandle(val key: AnyRef, val message: AnyRef, val future: Completabl override def equals(that: Any): Boolean = that != null && that.isInstanceOf[MessageHandle] && - that.asInstanceOf[MessageHandle].key == key && + that.asInstanceOf[MessageHandle].sender == sender && that.asInstanceOf[MessageHandle].message == message && that.asInstanceOf[MessageHandle].future == future } @@ -48,6 +48,9 @@ trait MessageHandler { def handle(message: MessageHandle) } +class GenericServerMessageHandler(val server: GenericServer) extends MessageHandler { + def handle(handle: MessageHandle) = server.handle(handle.message, handle.future) +} class MessageQueue { private val handles: Queue[MessageHandle] = new LinkedList[MessageHandle] @@ -59,16 +62,9 @@ class MessageQueue { } def read(destination: Queue[MessageHandle]) = handles.synchronized { - while (handles.isEmpty && !interrupted) { - handles.wait - } - if (!interrupted) { - while (!handles.isEmpty) { - destination.offer(handles.remove) - } - } else { - interrupted = false - } + while (handles.isEmpty && !interrupted) handles.wait + if (!interrupted) while (!handles.isEmpty) destination.offer(handles.remove) + else interrupted = false } def interrupt = handles.synchronized { diff --git a/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala b/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala index 64eb7806cb..7f0aa8d853 100644 --- a/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala +++ b/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala @@ -14,16 +14,18 @@ import java.util.concurrent.{ConcurrentHashMap, Executors} import java.util.concurrent.locks.ReentrantLock import java.util.{HashSet, LinkedList, Queue} +// FIXME: lift up common stuff in base class ThreadBasedDispatcher(val threadPoolSize: Int) extends MessageDispatcher { private val handlers = new ConcurrentHashMap[AnyRef, MessageHandler] private val busyHandlers = new HashSet[AnyRef] private val handlerExecutor = Executors.newFixedThreadPool(threadPoolSize) - @volatile private var selectorThread: Thread = null + private var selectorThread: Thread = null @volatile private var active: Boolean = false + private val guard = new Object - def registerHandler(key: AnyRef, handler: MessageHandler) = handlers.put(key, handler) + def registerHandler(key: AnyRef, handler: MessageHandler) = guard.synchronized { handlers.put(key, handler) } - def unregisterHandler(key: AnyRef) = handlers.remove(key) + def unregisterHandler(key: AnyRef) = guard.synchronized { handlers.remove(key) } def dispatch(messageQueue: MessageQueue) = { if (!active) { @@ -33,16 +35,17 @@ class ThreadBasedDispatcher(val threadPoolSize: Int) extends MessageDispatcher { override def run = { while (active) { try { + guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf] messageDemultiplexer.select val handles = messageDemultiplexer.acquireSelectedQueue for (index <- 0 to handles.size) { val handle = handles.peek - val handler = checkIfNotBusyThenGet(handle.key) + val handler = checkIfNotBusyThenGet(handle.sender) if (handler.isDefined) { handlerExecutor.execute(new Runnable { override def run = { handler.get.handle(handle) - free(handle.key) + free(handle.sender) messageDemultiplexer.wakeUp } }) diff --git a/kernel/src/test/scala/GenericServerContainerSuite.scala b/kernel/src/test/scala/GenericServerContainerSuite.scala index 38a49988eb..00da62bcfc 100755 --- a/kernel/src/test/scala/GenericServerContainerSuite.scala +++ b/kernel/src/test/scala/GenericServerContainerSuite.scala @@ -67,7 +67,7 @@ class GenericServerContainerSpec extends Suite { def test_bangbangbang = { setup expect("pong") { - (server !!! Ping).getOrElse("nil") + (server !! Ping).getOrElse("nil") } expect("got a ping") { inner.log @@ -77,7 +77,7 @@ class GenericServerContainerSpec extends Suite { def test_bangbangbang_Timeout1 = { setup expect("pong") { - (server !!! Ping).getOrElse("nil") + (server !! Ping).getOrElse("nil") } expect("got a ping") { inner.log @@ -87,7 +87,7 @@ class GenericServerContainerSpec extends Suite { def test_bangbangbang_Timeout2 = { setup expect("error handler") { - server !!! (OneWay, "error handler") + server !! (OneWay, "error handler") } expect("got a oneway") { inner.log @@ -98,7 +98,7 @@ class GenericServerContainerSpec extends Suite { setup // using base expect("pong") { - (server !!! Ping).getOrElse("nil") + (server !! Ping).getOrElse("nil") } // hotswapping @@ -106,7 +106,7 @@ class GenericServerContainerSpec extends Suite { case Ping => reply("hotswapped pong") })) expect("hotswapped pong") { - (server !!! Ping).getOrElse("nil") + (server !! Ping).getOrElse("nil") } } @@ -114,7 +114,7 @@ class GenericServerContainerSpec extends Suite { setup // using base expect("pong") { - (server !!! Ping).getOrElse("nil") + (server !! Ping).getOrElse("nil") } // hotswapping @@ -122,7 +122,7 @@ class GenericServerContainerSpec extends Suite { case Ping => reply("hotswapped pong") })) expect("hotswapped pong") { - (server !!! Ping).getOrElse("nil") + (server !! Ping).getOrElse("nil") } // hotswapping again @@ -130,7 +130,7 @@ class GenericServerContainerSpec extends Suite { case Ping => reply("hotswapped pong again") })) expect("hotswapped pong again") { - (server !!! Ping).getOrElse("nil") + (server !! Ping).getOrElse("nil") } } @@ -139,7 +139,7 @@ class GenericServerContainerSpec extends Suite { setup // using base expect("pong") { - (server !!! Ping).getOrElse("nil") + (server !! Ping).getOrElse("nil") } // hotswapping @@ -147,13 +147,13 @@ class GenericServerContainerSpec extends Suite { case Ping => reply("hotswapped pong") })) expect("hotswapped pong") { - (server !!! Ping).getOrElse("nil") + (server !! Ping).getOrElse("nil") } // restoring original base server.hotswap(None) expect("pong") { - (server !!! Ping).getOrElse("nil") + (server !! Ping).getOrElse("nil") } } } diff --git a/kernel/src/test/scala/SupervisorSpec.scala b/kernel/src/test/scala/SupervisorSpec.scala index 529e24a300..f7845993ec 100755 --- a/kernel/src/test/scala/SupervisorSpec.scala +++ b/kernel/src/test/scala/SupervisorSpec.scala @@ -35,7 +35,7 @@ class SupervisorSpec extends Suite { sup ! Start expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") + (pingpong1 !! Ping).getOrElse("nil") } } @@ -63,7 +63,7 @@ class SupervisorSpec extends Suite { sup ! Start intercept(classOf[RuntimeException]) { - pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) + pingpong1 !! (Die, throw new RuntimeException("TIME OUT")) } Thread.sleep(100) expect("oneforone") { @@ -77,21 +77,21 @@ class SupervisorSpec extends Suite { sup ! Start expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") + (pingpong1 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("ping") { messageLog } intercept(classOf[RuntimeException]) { - pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) + pingpong1 !! (Die, throw new RuntimeException("TIME OUT")) } Thread.sleep(100) expect("pingoneforone") { messageLog } expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") + (pingpong1 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pingoneforoneping") { @@ -104,7 +104,7 @@ class SupervisorSpec extends Suite { val sup = getSingleActorAllForOneSupervisor sup ! Start intercept(classOf[RuntimeException]) { - pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) + pingpong1 !! (Die, throw new RuntimeException("TIME OUT")) } Thread.sleep(100) expect("allforone") { @@ -117,21 +117,21 @@ class SupervisorSpec extends Suite { val sup = getSingleActorAllForOneSupervisor sup ! Start expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") + (pingpong1 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("ping") { messageLog } intercept(classOf[RuntimeException]) { - pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) + pingpong1 !! (Die, throw new RuntimeException("TIME OUT")) } Thread.sleep(100) expect("pingallforone") { messageLog } expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") + (pingpong1 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pingallforoneping") { @@ -144,7 +144,7 @@ class SupervisorSpec extends Suite { val sup = getMultipleActorsOneForOneConf sup ! Start intercept(classOf[RuntimeException]) { - pingpong3 !!! (Die, throw new RuntimeException("TIME OUT")) + pingpong3 !! (Die, throw new RuntimeException("TIME OUT")) } Thread.sleep(100) expect("oneforone") { @@ -157,37 +157,37 @@ class SupervisorSpec extends Suite { val sup = getMultipleActorsOneForOneConf sup ! Start expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") + (pingpong1 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pong") { - (pingpong2 !!! Ping).getOrElse("nil") + (pingpong2 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pong") { - (pingpong3 !!! Ping).getOrElse("nil") + (pingpong3 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pingpingping") { messageLog } intercept(classOf[RuntimeException]) { - pingpong2 !!! (Die, throw new RuntimeException("TIME OUT")) + pingpong2 !! (Die, throw new RuntimeException("TIME OUT")) } Thread.sleep(100) expect("pingpingpingoneforone") { messageLog } expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") + (pingpong1 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pong") { - (pingpong2 !!! Ping).getOrElse("nil") + (pingpong2 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pong") { - (pingpong3 !!! Ping).getOrElse("nil") + (pingpong3 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pingpingpingoneforonepingpingping") { @@ -200,7 +200,7 @@ class SupervisorSpec extends Suite { val sup = getMultipleActorsAllForOneConf sup ! Start intercept(classOf[RuntimeException]) { - pingpong2 !!! (Die, throw new RuntimeException("TIME OUT")) + pingpong2 !! (Die, throw new RuntimeException("TIME OUT")) } Thread.sleep(100) expect("allforoneallforoneallforone") { @@ -213,37 +213,37 @@ class SupervisorSpec extends Suite { val sup = getMultipleActorsAllForOneConf sup ! Start expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") + (pingpong1 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pong") { - (pingpong2 !!! Ping).getOrElse("nil") + (pingpong2 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pong") { - (pingpong3 !!! Ping).getOrElse("nil") + (pingpong3 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pingpingping") { messageLog } intercept(classOf[RuntimeException]) { - pingpong2 !!! (Die, throw new RuntimeException("TIME OUT")) + pingpong2 !! (Die, throw new RuntimeException("TIME OUT")) } Thread.sleep(100) expect("pingpingpingallforoneallforoneallforone") { messageLog } expect("pong") { - (pingpong1 !!! Ping).getOrElse("nil") + (pingpong1 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pong") { - (pingpong2 !!! Ping).getOrElse("nil") + (pingpong2 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pong") { - (pingpong3 !!! Ping).getOrElse("nil") + (pingpong3 !! Ping).getOrElse("nil") } Thread.sleep(100) expect("pingpingpingallforoneallforoneallforonepingpingping") { @@ -256,7 +256,7 @@ class SupervisorSpec extends Suite { val sup = getNestedSupervisorsAllForOneConf sup ! Start intercept(classOf[RuntimeException]) { - pingpong1 !!! (Die, throw new RuntimeException("TIME OUT")) + pingpong1 !! (Die, throw new RuntimeException("TIME OUT")) } Thread.sleep(100) expect("allforoneallforoneallforone") {