diff --git a/akka.iml b/akka.iml index c9548a55ff..f9d299bab7 100644 --- a/akka.iml +++ b/akka.iml @@ -9,11 +9,11 @@ - + + + - - diff --git a/akka.ipr b/akka.ipr index baba1e69ea..8cb30124e6 100644 --- a/akka.ipr +++ b/akka.ipr @@ -122,6 +122,242 @@ @@ -499,17 +735,6 @@ - - - - - - - - - - - @@ -1170,6 +1395,17 @@ + + + + + + + + + + + diff --git a/akka.iws b/akka.iws index a38e49c491..ed8c6c5a84 100644 --- a/akka.iws +++ b/akka.iws @@ -6,32 +6,38 @@ - - - - - + + + + + - - + + + + + + + + + + + + + - - - - - - - - - + + + + - + @@ -47,6 +53,21 @@ + + + + + + + + + + + + + - + - + @@ -76,15 +86,6 @@ - - - - - - - - - diff --git a/kernel/src/main/scala/Kernel.scala b/kernel/src/main/scala/Kernel.scala index a79d197a6e..4999bcec13 100644 --- a/kernel/src/main/scala/Kernel.scala +++ b/kernel/src/main/scala/Kernel.scala @@ -23,6 +23,7 @@ import java.io.{File, IOException} import javax.ws.rs.core.UriBuilder import javax.management.JMException +import kernel.nio.{RemoteClient, RemoteServer} import kernel.state.CassandraNode import kernel.util.Logging @@ -48,8 +49,11 @@ object Kernel extends Logging { private[this] var storageServer: VoldemortServer = _ */ + private[this] var remoteServer: RemoteServer = _ + def main(args: Array[String]): Unit = { log.info("Starting Akka kernel...") + startRemoteService startCassandra cassandraBenchmark @@ -63,6 +67,21 @@ object Kernel extends Logging { //startVoldemort } + + private[akka] def startRemoteService = { + // FIXME manage remote serve thread for graceful shutdown + val remoteServerThread = new Thread(new Runnable() { + def run = { + val server = new RemoteServer + server.connect + } + }) + remoteServerThread.start + + Thread.sleep(1000) // wait for server to start up + RemoteClient.connect + } + private[akka] def startJersey: SelectorThread = { val initParams = new java.util.HashMap[String, String] initParams.put( diff --git a/kernel/src/main/scala/actor/ActiveObject.scala b/kernel/src/main/scala/actor/ActiveObject.scala index b437652c8a..a8911ba306 100644 --- a/kernel/src/main/scala/actor/ActiveObject.scala +++ b/kernel/src/main/scala/actor/ActiveObject.scala @@ -11,7 +11,7 @@ import java.lang.annotation.Annotation import kernel.config.ActiveObjectGuiceConfigurator import kernel.config.ScalaConfig._ import kernel.camel.{MessageDriven, ActiveObjectProducer} -import kernel.nio.{RemoteRequest, NettyClient} +import kernel.nio.{RemoteRequest, RemoteClient} import kernel.stm.{TransactionManagement, TransactionAwareWrapperException, ChangeSet, Transaction} import kernel.util.Helpers.ReadWriteLock @@ -43,13 +43,13 @@ object Annotations { * @author Jonas Bonér */ class ActiveObjectFactory { - def newInstance[T](target: Class[T]): T = ActiveObject.newInstance(target, new Dispatcher(target.getName), false) + def newInstance[T](target: Class[T], timeout: Long): T = ActiveObject.newInstance(target, new Dispatcher(target.getName), false, timeout) - def newInstance[T](intf: Class[T], target: AnyRef): T = ActiveObject.newInstance(intf, target, new Dispatcher(intf.getName), false) + def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T = ActiveObject.newInstance(intf, target, new Dispatcher(intf.getName), false, timeout) - def newRemoteInstance[T](target: Class[T]): T = ActiveObject.newInstance(target, new Dispatcher(target.getName), true) + def newRemoteInstance[T](target: Class[T], timeout: Long): T = ActiveObject.newInstance(target, new Dispatcher(target.getName), true, timeout) - def newRemoteInstance[T](intf: Class[T], target: AnyRef): T = ActiveObject.newInstance(intf, target, new Dispatcher(intf.getName), true) + def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T = ActiveObject.newInstance(intf, target, new Dispatcher(intf.getName), true, timeout) /* def newInstanceAndLink[T](target: Class[T], supervisor: AnyRef): T = { @@ -64,12 +64,12 @@ class ActiveObjectFactory { */ // ================================================ - private[kernel] def newInstance[T](target: Class[T], actor: Actor, remote: Boolean): T = { - ActiveObject.newInstance(target, actor, remote) + private[kernel] def newInstance[T](target: Class[T], actor: Actor, remote: Boolean, timeout: Long): T = { + ActiveObject.newInstance(target, actor, remote, timeout) } - private[kernel] def newInstance[T](intf: Class[T], target: AnyRef, actor: Actor, remote: Boolean): T = { - ActiveObject.newInstance(intf, target, actor, remote) + private[kernel] def newInstance[T](intf: Class[T], target: AnyRef, actor: Actor, remote: Boolean, timeout: Long): T = { + ActiveObject.newInstance(intf, target, actor, remote, timeout) } private[kernel] def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = @@ -85,30 +85,30 @@ object ActiveObject { val MATCH_ALL = "execution(* *.*(..))" val AKKA_CAMEL_ROUTING_SCHEME = "akka" - def newInstance[T](target: Class[T]): T = newInstance(target, new Dispatcher(target.getName), false) + def newInstance[T](target: Class[T], timeout: Long): T = newInstance(target, new Dispatcher(target.getName), false, timeout) - def newInstance[T](intf: Class[T], target: AnyRef): T = newInstance(intf, target, new Dispatcher(intf.getName), false) + def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T = newInstance(intf, target, new Dispatcher(intf.getName), false, timeout) - def newRemoteInstance[T](target: Class[T]): T = newInstance(target, new Dispatcher(target.getName), true) + def newRemoteInstance[T](target: Class[T], timeout: Long): T = newInstance(target, new Dispatcher(target.getName), true, timeout) - def newRemoteInstance[T](intf: Class[T], target: AnyRef): T = newInstance(intf, target, new Dispatcher(intf.getName), true) + def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T = newInstance(intf, target, new Dispatcher(intf.getName), true, timeout) // ================================================ - private[kernel] def newInstance[T](target: Class[T], actor: Actor, remote: Boolean): T = { - if (remote) NettyClient.connect + private[kernel] def newInstance[T](target: Class[T], actor: Actor, remote: Boolean, timeout: Long): T = { + if (remote) RemoteClient.connect val proxy = Proxy.newInstance(target, false, true) // FIXME switch to weaving in the aspect at compile time proxy.asInstanceOf[Advisable].aw_addAdvice( - MATCH_ALL, new TransactionalAroundAdvice(target, proxy, actor, remote)) + MATCH_ALL, new ActorAroundAdvice(target, proxy, actor, remote, timeout)) proxy.asInstanceOf[T] } - private[kernel] def newInstance[T](intf: Class[T], target: AnyRef, actor: Actor, remote: Boolean): T = { - if (remote) NettyClient.connect + private[kernel] def newInstance[T](intf: Class[T], target: AnyRef, actor: Actor, remote: Boolean, timeout: Long): T = { + if (remote) RemoteClient.connect val proxy = Proxy.newInstance(Array(intf), Array(target), false, true) proxy.asInstanceOf[Advisable].aw_addAdvice( - MATCH_ALL, new TransactionalAroundAdvice(intf, target, actor, remote)) + MATCH_ALL, new ActorAroundAdvice(intf, target, actor, remote, timeout)) proxy.asInstanceOf[T] } @@ -125,13 +125,18 @@ object ActiveObject { /** * @author Jonas Bonér */ -@serializable sealed class TransactionalAroundAdvice( - val target: Class[_], val targetInstance: AnyRef, actor: Actor, val isRemote: Boolean) - extends AroundAdvice { +@serializable +sealed class ActorAroundAdvice(val target: Class[_], + val targetInstance: AnyRef, + val actor: Actor, + val isRemote: Boolean, + val timeout: Long) extends AroundAdvice { val id = target.getName + actor.timeout = timeout actor.start import kernel.reactor._ + // FIXME make configurable!!!!!! MUST private[this] var dispatcher = new ProxyMessageDispatcher private[this] var mailbox = dispatcher.messageQueue dispatcher.start @@ -156,8 +161,9 @@ object ActiveObject { private def remoteDispatch(joinpoint: JoinPoint): AnyRef = { val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti] val oneWay = isOneWay(rtti) - val future = NettyClient.send( - new RemoteRequest(false, rtti.getParameterValues, rtti.getMethod.getName, target.getName, None, oneWay, false)) + val future = RemoteClient.send( + new RemoteRequest(false, rtti.getParameterValues, rtti.getMethod.getName, target.getName, + timeout, None, oneWay, false, actor.registerSupervisorAsRemoteActor)) if (oneWay) null // for void methods else { if (future.isDefined) { diff --git a/kernel/src/main/scala/actor/Actor.scala b/kernel/src/main/scala/actor/Actor.scala index d232be5c02..dc266a30e4 100644 --- a/kernel/src/main/scala/actor/Actor.scala +++ b/kernel/src/main/scala/actor/Actor.scala @@ -8,9 +8,10 @@ import java.util.concurrent.CopyOnWriteArraySet import kernel.reactor._ import kernel.config.ScalaConfig._ -import kernel.nio.{NettyClient, RemoteRequest} +import kernel.nio.{RemoteClient, RemoteRequest} import kernel.stm.{TransactionAwareWrapperException, TransactionManagement, Transaction} import kernel.util.Logging +import org.codehaus.aspectwerkz.proxy.Uuid sealed abstract class LifecycleMessage case class Init(config: AnyRef) extends LifecycleMessage @@ -22,8 +23,8 @@ case class Exit(dead: Actor, killer: Throwable) extends LifecycleMessage sealed abstract class DispatcherType object DispatcherType { case object EventBasedThreadPooledProxyInvokingDispatcher extends DispatcherType - case object EventBasedSingleThreadingDispatcher extends DispatcherType - case object EventBasedThreadPoolingDispatcher extends DispatcherType + case object EventBasedSingleThreadDispatcher extends DispatcherType + case object EventBasedThreadPoolDispatcher extends DispatcherType case object ThreadBasedDispatcher extends DispatcherType } @@ -33,16 +34,14 @@ class ActorMessageHandler(val actor: Actor) extends MessageHandler { trait Actor extends Logging with TransactionManagement { val id: String = this.getClass.toString - - @volatile private[this] var isRemote = false - @volatile private[this] var isTransactional = false + val uuid = Uuid.newUuid.toString @volatile private[this] var isRunning: Boolean = false - protected[this] var dispatcher: MessageDispatcher = _ + + protected[Actor] var mailbox: MessageQueue = _ protected[this] var senderFuture: Option[CompletableFutureResult] = None protected[this] val linkedActors = new CopyOnWriteArraySet[Actor] - protected[actor] var mailbox: MessageQueue = _ - protected[actor] var supervisor: Option[Actor] = None + protected[kernel] var supervisor: Option[Actor] = None protected[actor] var lifeCycleConfig: Option[LifeCycle] = None private var hotswap: Option[PartialFunction[Any, Unit]] = None @@ -53,22 +52,83 @@ trait Actor extends Logging with TransactionManagement { // ==================================== /** - * TODO: document + * User overridable callback/setting. + * + * Defines the default timeout for '!!' invocations, e.g. the timeout for the future returned by the call to '!!'. */ - protected var faultHandler: Option[FaultHandlingStrategy] = None + @volatile var timeout: Long = 5000L /** - * Set dispatcher type to either ThreadBasedDispatcher, EventBasedSingleThreadingDispatcher or EventBasedThreadPoolingDispatcher. - * Default is EventBasedThreadPoolingDispatcher. + * User overridable callback/setting. + * + * Setting 'isRemote' to true means that an actor will be moved to and invoked on a remote host. + * See also 'makeRemote'. */ - protected[this] var dispatcherType: DispatcherType = DispatcherType.EventBasedThreadPoolingDispatcher - + @volatile private[this] var isRemote = false + /** + * User overridable callback/setting. + * + * Setting 'isTransactional' to true means that the actor will **start** a new transaction if non exists. + * However, it will always participate in an existing transaction. + * If transactionality want to be completely turned off then do it by invoking: + *
+   *  TransactionManagement.disableTransactions
+   * 
+ * See also 'makeTransactional'. + */ + @volatile private[this] var isTransactional = false + + /** + * User overridable callback/setting. + * + * User can (and is encouraged to) override the default configuration so it fits the specific use-case that the actor is used for. + *

+ * It is beneficial to have actors share the same dispatcher, easily +100 actors can share the same. + *
+ * But if you are running many many actors then it can be a good idea to have split them up in terms of dispatcher sharing. + *
+ * Default is that all actors that are created and spawned from within this actor is sharing the same dispatcher as its creator. + *

+ * There are currently two different dispatchers available (but the interface can easily be implemented for custom implementation): + *

+   *  // default - executorService can be build up using the ThreadPoolBuilder
+   *  new EventBasedThreadPoolDispatcher(executor: ExecutorService)
+   * 
+   *  new EventBasedSingleThreadDispatcher
+   * 
+ */ + protected[Actor] var dispatcher: MessageDispatcher = { + val threadPool = ThreadPoolBuilder.newBuilder.newThreadPoolWithLinkedBlockingQueueWithCapacity(0).build + val dispatcher = new EventBasedThreadPoolDispatcher(threadPool) + mailbox = dispatcher.messageQueue + dispatcher.registerHandler(this, new ActorMessageHandler(this)) + dispatcher + } + + /** + * User overridable callback/setting. + * * Set trapExit to true if actor should be able to trap linked actors exit messages. */ protected[this] var trapExit: Boolean = false /** + * User overridable callback/setting. + * + * If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined. + * Can be one of: + *
+   *  AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
+   * 
+   *  OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
+   * 
+ */ + protected var faultHandler: Option[FaultHandlingStrategy] = None + + /** + * User overridable callback/setting. + * * Partial function implementing the server logic. * To be implemented by subclassing server. *

@@ -90,24 +150,32 @@ trait Actor extends Logging with TransactionManagement { protected def receive: PartialFunction[Any, Unit] /** + * User overridable callback/setting. + * * Optional callback method that is called during initialization. * To be implemented by subclassing actor. */ protected def init(config: AnyRef) {} /** + * User overridable callback/setting. + * * Mandatory callback method that is called during restart and reinitialization after a server crash. * To be implemented by subclassing actor. */ protected def preRestart(reason: AnyRef, config: Option[AnyRef]) {} /** + * User overridable callback/setting. + * * Mandatory callback method that is called during restart and reinitialization after a server crash. * To be implemented by subclassing actor. */ protected def postRestart(reason: AnyRef, config: Option[AnyRef]) {} /** + * User overridable callback/setting. + * * Optional callback method that is called during termination. * To be implemented by subclassing actor. */ @@ -118,10 +186,25 @@ trait Actor extends Logging with TransactionManagement { // ============= /** - * TODO: document + * Starts up the actor and its message queue. */ - @volatile var timeout: Long = 5000L + def start = synchronized { + if (!isRunning) { + dispatcher.start + isRunning = true + } + } + /** + * Stops the actor and its message queue. + */ + def stop = synchronized { + if (isRunning) { + dispatcher.unregisterHandler(this) + isRunning = false + } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") + } + /** * TODO: document */ @@ -171,112 +254,118 @@ trait Actor extends Logging with TransactionManagement { // FIXME can be deadlock prone if cyclic linking? - HOWTO? /** - * TODO: document + * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will receive a notification nif the linked actor has crashed. + * If the 'trapExit' flag has been set then it will 'trap' the failure and automatically restart the linked actors according to the restart strategy defined by the 'faultHandler'. + *

+ * To be invoked from within the actor itself. */ - protected[this] def link(actor: Actor) = synchronized { actor.synchronized { + protected[this] def link(actor: Actor) = synchronized { if (isRunning) { linkedActors.add(actor) if (actor.supervisor.isDefined) throw new IllegalStateException("Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails") actor.supervisor = Some(this) log.debug("Linking actor [%s] to actor [%s]", actor, this) } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") - }} + } // FIXME can be deadlock prone if cyclic linking? - HOWTO? /** - * TODO: document + * Unlink the actor. + *

+ * To be invoked from within the actor itself. */ - protected[this] def unlink(actor: Actor) = synchronized { actor.synchronized { + protected[this] def unlink(actor: Actor) = synchronized { if (isRunning) { if (!linkedActors.contains(actor)) throw new IllegalStateException("Actor [" + actor + "] is not a linked actor, can't unlink") linkedActors.remove(actor) actor.supervisor = None log.debug("Unlinking actor [%s] from actor [%s]", actor, this) } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") - }} - - /** - * TODO: document - */ - def start = synchronized { - if (!isRunning) { - dispatcherType match { - case DispatcherType.EventBasedSingleThreadingDispatcher => - dispatcher = new EventBasedSingleThreadDispatcher - case DispatcherType.EventBasedThreadPoolingDispatcher => - dispatcher = new EventBasedThreadPoolDispatcher - case DispatcherType.EventBasedThreadPooledProxyInvokingDispatcher => - dispatcher = new ProxyMessageDispatcher - case DispatcherType.ThreadBasedDispatcher => - throw new UnsupportedOperationException("ThreadBasedDispatcher is not yet implemented. Please help out and send in a patch.") - } - mailbox = dispatcher.messageQueue - dispatcher.registerHandler(this, new ActorMessageHandler(this)) - dispatcher.start - isRunning = true - } - } - - /** - * TODO: document - */ - def stop = synchronized { - if (isRunning) { - dispatcher.unregisterHandler(this) - isRunning = false - } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } /** - * Atomically start and link actor. + * Atomically start and link an actor. + *

+ * To be invoked from within the actor itself. */ - def spawnLink(actor: Actor) = actor.synchronized { + protected[this] def startLink(actor: Actor) = synchronized { actor.start link(actor) } /** - * Atomically start and link a remote actor. + * Atomically start, link and make an actor remote. + *

+ * To be invoked from within the actor itself. */ - def spawnLinkRemote(actor: Actor) = actor.synchronized { + protected[this] def startLinkRemote(actor: Actor) = synchronized { actor.makeRemote actor.start link(actor) } /** - * TODO: document + * Atomically create (from actor class) and start an actor. + *

+ * To be invoked from within the actor itself. */ - def spawn(actorClass: Class[_]) = { - - // FIXME: should pass in dispatcher etc. - inherit + protected[this] def spawn(actorClass: Class[_]): Actor = synchronized { + val actor = actorClass.newInstance.asInstanceOf[Actor] + actor.dispatcher = dispatcher + actor.mailbox = mailbox + actor.start + actor } /** - * TODO: document + * Atomically create (from actor class), start and make an actor remote. + *

+ * To be invoked from within the actor itself. */ - def spawnRemote(actorClass: Class[_]) = { + protected[this] def spawnRemote(actorClass: Class[_]): Actor = synchronized { + val actor = actorClass.newInstance.asInstanceOf[Actor] + actor.makeRemote + actor.dispatcher = dispatcher + actor.mailbox = mailbox + actor.start + actor } /** - * TODO: document + * Atomically create (from actor class), start and link an actor. + *

+ * To be invoked from within the actor itself. */ - def spawnLink(actorClass: Class[_]) = { + protected[this] def spawnLink(actorClass: Class[_]): Actor = synchronized { + val actor = spawn(actorClass) + link(actor) + actor } /** - * TODO: document + * Atomically create (from actor class), start, link and make an actor remote. + *

+ * To be invoked from within the actor itself. */ - def spawnLinkRemote(actorClass: Class[_]) = { + protected[this] def spawnLinkRemote(actorClass: Class[_]): Actor = synchronized { + val actor = spawn(actorClass) + actor.makeRemote + link(actor) + actor } /** - * TODO: document + * Invoking 'makeRemote' means that an actor will be moved to andinvoked on a remote host */ def makeRemote = isRemote = true /** - * TODO: document + * Invoking 'makeTransactional' means that the actor will **start** a new transaction if non exists. + * However, it will always participate in an existing transaction. + * If transactionality want to be completely turned off then do it by invoking: + *

+   *  TransactionManagement.disableTransactions
+   * 
*/ def makeTransactional = synchronized { if (isRunning) throw new IllegalArgumentException("Can not make actor transactional after it has been started") @@ -287,32 +376,16 @@ trait Actor extends Logging with TransactionManagement { // ==== IMPLEMENTATION DETAILS ==== // ================================ - private[kernel] def handle(messageHandle: MessageHandle) = synchronized { - val message = messageHandle.message - val future = messageHandle.future - try { - if (messageHandle.tx.isDefined) - TransactionManagement.threadBoundTx.set(messageHandle.tx) - senderFuture = future - if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function - else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString) - } catch { - case e => - if (supervisor.isDefined) supervisor.get ! Exit(this, e) - if (future.isDefined) future.get.completeWithException(this, e) - else e.printStackTrace - } finally { - TransactionManagement.threadBoundTx.set(None) - } - } - private def postMessageToMailbox(message: AnyRef): Unit = - if (isRemote) NettyClient.send(new RemoteRequest(true, message, null, this.getClass.getName, null, true, false)) - else mailbox.append(new MessageHandle(this, message, None, activeTx)) + if (isRemote) { + val supervisorUuid = registerSupervisorAsRemoteActor + RemoteClient.send(new RemoteRequest(true, message, null, this.getClass.getName, timeout, null, true, false, supervisorUuid)) + } else mailbox.append(new MessageHandle(this, message, None, activeTx)) private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = - if (isRemote) { - val future = NettyClient.send(new RemoteRequest(true, message, null, this.getClass.getName, null, false, false)) + if (isRemote) { + val supervisorUuid = registerSupervisorAsRemoteActor + val future = RemoteClient.send(new RemoteRequest(true, message, null, this.getClass.getName, timeout, null, false, false, supervisorUuid)) if (future.isDefined) future.get else throw new IllegalStateException("Expected a future from remote call to actor " + toString) } @@ -351,31 +424,33 @@ trait Actor extends Logging with TransactionManagement { } private def getResultOrThrowException[T](future: FutureResult): Option[T] = - if (isRemote) getRemoteResultOrThrowException(future) - else { - if (future.exception.isDefined) { - val (_, cause) = future.exception.get - if (TransactionManagement.isTransactionalityEnabled) throw new TransactionAwareWrapperException(cause, activeTx) - else throw cause - } else { - future.result.asInstanceOf[Option[T]] - } - } - - // FIXME: UGLY - Either: make the remote tx work - // FIXME: OR: remove this method along with the tx tuple making in NettyClient.messageReceived and ActiveObject.getResultOrThrowException - private def getRemoteResultOrThrowException[T](future: FutureResult): Option[T] = if (future.exception.isDefined) { val (_, cause) = future.exception.get if (TransactionManagement.isTransactionalityEnabled) throw new TransactionAwareWrapperException(cause, activeTx) else throw cause } else { - if (future.result.isDefined) { - val (res, tx) = future.result.get.asInstanceOf[Tuple2[Option[T], Option[Transaction]]] - res - } else None + future.result.asInstanceOf[Option[T]] } + private[kernel] def handle(messageHandle: MessageHandle) = synchronized { + val message = messageHandle.message + val future = messageHandle.future + try { + if (messageHandle.tx.isDefined) TransactionManagement.threadBoundTx.set(messageHandle.tx) + senderFuture = future + if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function + else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString) + } catch { + case e => + // FIXME to fix supervisor restart of actor for oneway calls, inject a supervisor proxy that can send notification back to client + if (supervisor.isDefined) supervisor.get ! Exit(this, e) + if (future.isDefined) future.get.completeWithException(this, e) + else e.printStackTrace + } finally { + TransactionManagement.threadBoundTx.set(None) + } + } + private def base: PartialFunction[Any, Unit] = lifeCycle orElse (hotswap getOrElse receive) private val lifeCycle: PartialFunction[Any, Unit] = { @@ -386,12 +461,16 @@ trait Actor extends Logging with TransactionManagement { case Exit(dead, reason) => handleTrapExit(dead, reason) } - private[kernel] def handleTrapExit(dead: Actor, reason: Throwable): Unit = if (trapExit) { - if (faultHandler.isDefined) { - faultHandler.get match { - case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => restartLinkedActors(reason) - case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.restart(reason) - } + private[kernel] def handleTrapExit(dead: Actor, reason: Throwable): Unit = { + if (trapExit) { + if (faultHandler.isDefined) { + faultHandler.get match { + case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => restartLinkedActors(reason) + case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.restart(reason) + } + } else throw new IllegalStateException("No 'faultHandler' defined for actor with the 'trapExit' flag set to true " + toString) + } else { + if (supervisor.isDefined) supervisor.get ! Exit(dead, reason) // if 'trapExit' is not defined then pass the Exit on } } @@ -405,12 +484,12 @@ trait Actor extends Logging with TransactionManagement { scope match { case Permanent => { preRestart(reason, config) - log.debug("Restarting actor [%s] configured as PERMANENT.", id) - // FIXME SWAP actor + log.info("Restarting actor [%s] configured as PERMANENT.", id) postRestart(reason, config) } case Temporary => + // FIXME handle temporary actors // if (reason == 'normal) { // log.debug("Restarting actor [%s] configured as TEMPORARY (since exited naturally).", id) // scheduleRestart @@ -423,5 +502,12 @@ trait Actor extends Logging with TransactionManagement { } } + private[kernel] def registerSupervisorAsRemoteActor: Option[String] = { + if (supervisor.isDefined) { + RemoteClient.registerSupervisorForActor(this) + Some(supervisor.get.uuid) + } else None + } + override def toString(): String = "Actor[" + id + "]" } diff --git a/kernel/src/main/scala/actor/Supervisor.scala b/kernel/src/main/scala/actor/Supervisor.scala index dd3b521150..a20e4be093 100644 --- a/kernel/src/main/scala/actor/Supervisor.scala +++ b/kernel/src/main/scala/actor/Supervisor.scala @@ -8,7 +8,7 @@ import kernel.util.Logging import kernel.config.ScalaConfig._ import kernel.util.Helpers._ import scala.collection.mutable.HashMap - + /** * Messages that the supervisor responds to and returns. * @@ -35,11 +35,11 @@ case class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends * SupervisorConfig( * RestartStrategy(OneForOne, 3, 10), * Worker( - * myFirstActorInstance, + * myFirstActor, * LifeCycle(Permanent, 1000)) * :: * Worker( - * mySecondActorInstance, + * mySecondActor, * LifeCycle(Permanent, 1000)) * :: Nil) * } @@ -87,15 +87,21 @@ abstract class SupervisorFactory extends Logging { } } -// FIXME remove Supervisor - all Actors can be supervisors - move SupervisorFactory config into actor /** + * NOTE: + *

+ * The supervisor class is only used for the configuration system when configuring supervisor hierarchies declaratively. + * Should not be used in development. Instead wire the actors together using 'link', 'spawnLink' etc. and set the 'trapExit' + * flag in the actors that should trap error signals and trigger restart. + *

+ * See the ScalaDoc for the SupervisorFactory for an example on how to declaratively wire up actors. + * * @author Jonas Bonér */ -class Supervisor(handler: FaultHandlingStrategy) extends Actor with Logging { - makeTransactional +class Supervisor(handler: FaultHandlingStrategy) extends Actor with Logging { trapExit = true faultHandler = Some(handler) - + def startSupervisor = { start this ! StartSupervisor @@ -119,7 +125,7 @@ class Supervisor(handler: FaultHandlingStrategy) extends Actor with Logging { server match { case Worker(actor, lifecycle) => actor.lifeCycleConfig = Some(lifecycle) - spawnLink(actor) + startLink(actor) case SupervisorConfig(_, _) => // recursive configuration val supervisor = factory.newSupervisorFor(server.asInstanceOf[SupervisorConfig]) diff --git a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala index f2353e2702..60058f85ff 100644 --- a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala +++ b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala @@ -22,7 +22,6 @@ import scala.collection.mutable.HashMap import java.lang.reflect.Method import javax.servlet.ServletContext - /** * @author Jonas Bonér */ @@ -57,15 +56,6 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC proxy.asInstanceOf[T] } - /* - override def getActiveObjectProxy(clazz: Class[_]): ActiveObjectProxy = synchronized { - log.debug("Looking up active object proxy [%s]", clazz.getName) - if (injector == null) throw new IllegalStateException("inject() and/or supervise() must be called before invoking getActiveObjectProxy(clazz)") - val activeObjectOption: Option[Tuple3[Class[_], Class[_], ActiveObjectProxy]] = activeObjectRegistry.get(clazz) - if (activeObjectOption.isDefined) activeObjectOption.get._3 - else throw new IllegalStateException("Class [" + clazz.getName + "] has not been put under supervision (by passing in the config to the 'supervise') method") - } - */ override def getExternalDependency[T](clazz: Class[T]): T = synchronized { injector.getInstance(clazz).asInstanceOf[T] } @@ -109,7 +99,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC private def newSubclassingProxy(component: Component): DependencyBinding = { val targetClass = component.target val actor = new Dispatcher(targetClass.getName) - val proxy = activeObjectFactory.newInstance(targetClass, actor, false).asInstanceOf[AnyRef] + val proxy = activeObjectFactory.newInstance(targetClass, actor, false, component.timeout).asInstanceOf[AnyRef] workers ::= Worker(actor, component.lifeCycle) activeObjectRegistry.put(targetClass, (proxy, proxy, component)) new DependencyBinding(targetClass, proxy) @@ -120,7 +110,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry component.target.getConstructor(Array[Class[_]]()).setAccessible(true) val actor = new Dispatcher(targetClass.getName) - val proxy = activeObjectFactory.newInstance(targetClass, targetInstance, actor, false).asInstanceOf[AnyRef] + val proxy = activeObjectFactory.newInstance(targetClass, targetInstance, actor, false, component.timeout).asInstanceOf[AnyRef] workers ::= Worker(actor, component.lifeCycle) activeObjectRegistry.put(targetClass, (proxy, targetInstance, component)) new DependencyBinding(targetClass, proxy) diff --git a/kernel/src/main/scala/nio/NettyClient.scala b/kernel/src/main/scala/nio/RemoteClient.scala similarity index 60% rename from kernel/src/main/scala/nio/NettyClient.scala rename to kernel/src/main/scala/nio/RemoteClient.scala index e33a363bdf..53c968b72f 100644 --- a/kernel/src/main/scala/nio/NettyClient.scala +++ b/kernel/src/main/scala/nio/RemoteClient.scala @@ -7,27 +7,32 @@ package se.scalablesolutions.akka.kernel.nio import java.net.InetSocketAddress import java.util.concurrent.{Executors, ConcurrentMap, ConcurrentHashMap} +import kernel.actor.{Exit, Actor} import kernel.reactor.{DefaultCompletableFutureResult, CompletableFutureResult} import kernel.util.Logging +import org.jboss.netty.channel._ +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory import org.jboss.netty.handler.codec.serialization.{ObjectEncoder, ObjectDecoder} import org.jboss.netty.bootstrap.ClientBootstrap -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory -import org.jboss.netty.channel._ -object NettyClient extends Logging { +// FIXME need to be able support multiple remote clients +object RemoteClient extends Logging { + // FIXME make host options configurable private val HOSTNAME = "localhost" private val PORT = 9999 @volatile private var isRunning = false private val futures = new ConcurrentHashMap[Long, CompletableFutureResult] + private val supervisors = new ConcurrentHashMap[String, Actor] + // TODO is this Netty channelFactory and other options always the best or should it be configurable? private val channelFactory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool, Executors.newCachedThreadPool) private val bootstrap = new ClientBootstrap(channelFactory) - private val handler = new ObjectClientHandler(futures) + private val handler = new ObjectClientHandler(futures, supervisors) bootstrap.getPipeline.addLast("handler", handler) bootstrap.setOption("tcpNoDelay", true) @@ -38,12 +43,12 @@ object NettyClient extends Logging { def connect = synchronized { if (!isRunning) { connection = bootstrap.connect(new InetSocketAddress(HOSTNAME, PORT)) - log.info("Starting NIO client at [%s:%s]", HOSTNAME, PORT) + log.info("Starting remote client connection to [%s:%s]", HOSTNAME, PORT) // Wait until the connection attempt succeeds or fails. connection.awaitUninterruptibly if (!connection.isSuccess) { - log.error("Connection has failed due to [%s]", connection.getCause) + log.error("Remote connection to [%s:%s] has failed due to [%s]", HOSTNAME, PORT, connection.getCause) connection.getCause.printStackTrace } isRunning = true @@ -64,13 +69,23 @@ object NettyClient extends Logging { None } else { futures.synchronized { - val futureResult = new DefaultCompletableFutureResult(100000) + val futureResult = new DefaultCompletableFutureResult(request.timeout) futures.put(request.id, futureResult) connection.getChannel.write(escapedRequest) Some(futureResult) } } - } else throw new IllegalStateException("Netty client is not running, make sure you have invoked 'connect' before using the client") + } else throw new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.") + + def registerSupervisorForActor(actor: Actor) = + if (!actor.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actor + " since it is not under supervision") + else supervisors.putIfAbsent(actor.supervisor.get.uuid, actor) + + def deregisterSupervisorForActor(actor: Actor) = + if (!actor.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actor + " since it is not under supervision") + else supervisors.remove(actor.supervisor.get.uuid) + + def deregisterSupervisorWithUuid(uuid: String) = supervisors.remove(uuid) private def escapeRequest(request: RemoteRequest) = { if (request.message.isInstanceOf[Array[Object]]) { @@ -89,7 +104,9 @@ object NettyClient extends Logging { } @ChannelPipelineCoverage { val value = "all" } -class ObjectClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResult]) extends SimpleChannelUpstreamHandler with Logging { +class ObjectClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResult], + val supervisors: ConcurrentMap[String, Actor]) + extends SimpleChannelUpstreamHandler with Logging { override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { @@ -115,20 +132,31 @@ class ObjectClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu val result = event.getMessage if (result.isInstanceOf[RemoteReply]) { val reply = result.asInstanceOf[RemoteReply] -// val tx = reply.tx val future = futures.get(reply.id) - //if (reply.successful) future.completeWithResult((reply.message, tx)) + //val tx = reply.tx + //if (reply.successful) future.completeWithResult((reply.message, tx)) if (reply.successful) future.completeWithResult(reply.message) - else future.completeWithException(null, reply.exception) - futures.remove(reply.id) - } else throw new IllegalArgumentException("Unknown message received in NIO client handler: " + result) + else { + if (reply.supervisorUuid.isDefined) { + val supervisorUuid = reply.supervisorUuid.get + if (!supervisors.containsKey(supervisorUuid)) throw new IllegalStateException("Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") + val supervisedActor = supervisors.get(supervisorUuid) + if (!supervisedActor.supervisor.isDefined) throw new IllegalStateException("Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") + else supervisedActor.supervisor.get ! Exit(supervisedActor, reply.exception) + } + future.completeWithException(null, reply.exception) + } + futures.remove(reply.id) + } else throw new IllegalArgumentException("Unknown message received in remote client handler: " + result) } catch { - case e: Exception => log.error("Unexpected exception in NIO client handler: %s", e); throw e + case e: Exception => + log.error("Unexpected exception in remote client handler: %s", e) + throw e } } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) { - log.error("Unexpected exception from downstream: %s", event.getCause) + log.error("Unexpected exception from downstream in remote client: %s", event.getCause) event.getChannel.close } } diff --git a/kernel/src/main/scala/nio/NettyServer.scala b/kernel/src/main/scala/nio/RemoteServer.scala similarity index 79% rename from kernel/src/main/scala/nio/NettyServer.scala rename to kernel/src/main/scala/nio/RemoteServer.scala index 40273b2297..0097db6734 100644 --- a/kernel/src/main/scala/nio/NettyServer.scala +++ b/kernel/src/main/scala/nio/RemoteServer.scala @@ -24,11 +24,12 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory import org.jboss.netty.handler.codec.serialization.ObjectDecoder import org.jboss.netty.handler.codec.serialization.ObjectEncoder -class NettyServer extends Logging { - def connect = NettyServer.connect +class RemoteServer extends Logging { + def connect = RemoteServer.connect } -object NettyServer extends Logging { +object RemoteServer extends Logging { + // FIXME make all remote server option configurable private val HOSTNAME = "localhost" private val PORT = 9999 private val CONNECTION_TIMEOUT_MILLIS = 100 @@ -53,7 +54,7 @@ object NettyServer extends Logging { def connect = synchronized { if (!isRunning) { - log.info("Starting NIO server at [%s:%s]", HOSTNAME, PORT) + log.info("Starting remote server at [%s:%s]", HOSTNAME, PORT) bootstrap.bind(new InetSocketAddress(HOSTNAME, PORT)) isRunning = true } @@ -66,9 +67,6 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging { private val activeObjects = new ConcurrentHashMap[String, AnyRef] private val actors = new ConcurrentHashMap[String, Actor] - private val MESSAGE_HANDLE = classOf[Actor].getDeclaredMethod( - "handle", Array[Class[_]](classOf[MessageHandle])) - override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { log.debug(event.toString) @@ -87,56 +85,50 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging { override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) ={ val message = event.getMessage - if (message == null) throw new IllegalStateException("Message in MessageEvent is null: " + event) + if (message == null) throw new IllegalStateException("Message in remote MessageEvent is null: " + event) if (message.isInstanceOf[RemoteRequest]) handleRemoteRequest(message.asInstanceOf[RemoteRequest], event.getChannel) } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { event.getCause.printStackTrace - log.error("Unexpected exception from downstream: %s", event.getCause) + log.error("Unexpected exception from remote downstream: %s", event.getCause) event.getChannel.close } private def handleRemoteRequest(request: RemoteRequest, channel: Channel) = { - try { - log.debug(request.toString) - if (request.isActor) dispatchToActor(request, channel) - else dispatchToActiveObject(request, channel) - } catch { - case e: Exception => - log.error("Could not invoke remote active object or actor [%s :: %s] due to: %s", request.method, request.target, e) - e.printStackTrace - } + log.debug(request.toString) + if (request.isActor) dispatchToActor(request, channel) + else dispatchToActiveObject(request, channel) } private def dispatchToActor(request: RemoteRequest, channel: Channel) = { - log.debug("Dispatching to actor [%s]", request.target) - val actor = createActor(request.target) + log.debug("Dispatching to remote actor [%s]", request.target) + val actor = createActor(request.target, request.timeout) actor.start if (request.isOneWay) actor ! request.message else { try { val resultOrNone = actor !! request.message - val result = if (resultOrNone.isDefined) resultOrNone else null + val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null log.debug("Returning result from actor invocation [%s]", result) //channel.write(request.newReplyWithMessage(result, TransactionManagement.threadBoundTx.get)) channel.write(request.newReplyWithMessage(result, null)) } catch { - case e: InvocationTargetException => - log.error("Could not invoke remote actor [%s] due to: %s", request.target, e.getCause) - e.getCause.printStackTrace - channel.write(request.newReplyWithException(e.getCause)) + case e: Throwable => + log.error("Could not invoke remote actor [%s] due to: %s", request.target, e) + e.printStackTrace + channel.write(request.newReplyWithException(e)) } } } private def dispatchToActiveObject(request: RemoteRequest, channel: Channel) = { - log.debug("Dispatching to [%s :: %s]", request.method, request.target) - val activeObject = createActiveObject(request.target) + log.debug("Dispatching to remote active object [%s :: %s]", request.method, request.target) + val activeObject = createActiveObject(request.target, request.timeout) val args = request.message.asInstanceOf[scala.List[AnyRef]] - val argClazzes = args.map(_.getClass) - val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClazzes) + val argClasses = args.map(_.getClass) + val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClasses, request.timeout) continueTransaction(request) try { @@ -144,7 +136,7 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging { if (request.isOneWay) messageReceiver.invoke(activeObject, unescapedArgs) else { val result = messageReceiver.invoke(activeObject, unescapedArgs) - log.debug("Returning result from active object invocation [%s]", result) + log.debug("Returning result from remote active object invocation [%s]", result) //channel.write(request.newReplyWithMessage(result, TransactionManagement.threadBoundTx.get)) channel.write(request.newReplyWithMessage(result, null)) } @@ -153,6 +145,10 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging { log.error("Could not invoke remote active object [%s :: %s] due to: %s", request.method, request.target, e.getCause) e.getCause.printStackTrace channel.write(request.newReplyWithException(e.getCause)) + case e: Throwable => + log.error("Could not invoke remote active object [%s :: %s] due to: %s", request.method, request.target, e) + e.printStackTrace + channel.write(request.newReplyWithException(e)) } } @@ -164,14 +160,14 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging { } else TransactionManagement.threadBoundTx.set(None) } - private def unescapeArgs(args: scala.List[AnyRef], argClasses: scala.List[Class[_]]) = { + private def unescapeArgs(args: scala.List[AnyRef], argClasses: scala.List[Class[_]], timeout: Long) = { val unescapedArgs = new Array[AnyRef](args.size) val unescapedArgClasses = new Array[Class[_]](args.size) val escapedArgs = for (i <- 0 until args.size) { if (args(i).isInstanceOf[ProxyWrapper]) { val proxyName = args(i).asInstanceOf[ProxyWrapper].proxyName - val activeObject = createActiveObject(proxyName) + val activeObject = createActiveObject(proxyName, timeout) unescapedArgs(i) = activeObject unescapedArgClasses(i) = Class.forName(proxyName) } else { @@ -182,13 +178,13 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging { (unescapedArgs, unescapedArgClasses) } - private def createActiveObject(name: String): AnyRef = { + private def createActiveObject(name: String, timeout: Long): AnyRef = { val activeObjectOrNull = activeObjects.get(name) if (activeObjectOrNull == null) { val clazz = Class.forName(name) try { val actor = new Dispatcher(clazz.getName) - val newInstance = activeObjectFactory.newInstance(clazz, actor, false).asInstanceOf[AnyRef] + val newInstance = activeObjectFactory.newInstance(clazz, actor, false, timeout).asInstanceOf[AnyRef] activeObjects.put(name, newInstance) newInstance } catch { @@ -200,12 +196,13 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging { } else activeObjectOrNull } - private def createActor(name: String): Actor = { + private def createActor(name: String, timeout: Long): Actor = { val actorOrNull = actors.get(name) if (actorOrNull == null) { val clazz = Class.forName(name) try { val newInstance = clazz.newInstance.asInstanceOf[Actor] + newInstance.timeout = timeout actors.put(name, newInstance) newInstance } catch { diff --git a/kernel/src/main/scala/nio/RequestReply.scala b/kernel/src/main/scala/nio/RequestReply.scala index c7de631087..111b513ad5 100644 --- a/kernel/src/main/scala/nio/RequestReply.scala +++ b/kernel/src/main/scala/nio/RequestReply.scala @@ -19,15 +19,17 @@ object IdFactory { val message: AnyRef, val method: String, val target: String, + val timeout: Long, val tx: Option[Transaction], val isOneWay: Boolean, - val isEscaped: Boolean) { + val isEscaped: Boolean, + val supervisorUuid: Option[String]) { private[RemoteRequest] var _id = IdFactory.nextId def id = _id override def toString: String = synchronized { - "RemoteRequest[isActor: " + isActor + " | message: " + message + " | method: " + method + - " | target: " + target + " | tx: " + tx + " | isOneWay: " + isOneWay + "]" + "RemoteRequest[isActor: " + isActor + " | message: " + message + " | timeout: " + timeout + " | method: " + method + + " | target: " + target + " | tx: " + tx + " | isOneWay: " + isOneWay + " | supervisorUuid: " + supervisorUuid + "]" } override def hashCode(): Int = synchronized { @@ -36,6 +38,11 @@ object IdFactory { result = HashCode.hash(result, message) result = HashCode.hash(result, method) result = HashCode.hash(result, target) + result = HashCode.hash(result, timeout) + result = HashCode.hash(result, isOneWay) + result = HashCode.hash(result, isEscaped) + result = if (tx.isDefined) HashCode.hash(result, tx.get) else result + result = if (supervisorUuid.isDefined) HashCode.hash(result, supervisorUuid.get) else result result } @@ -45,15 +52,22 @@ object IdFactory { that.asInstanceOf[RemoteRequest].isActor == isActor && that.asInstanceOf[RemoteRequest].message == message && that.asInstanceOf[RemoteRequest].method == method && - that.asInstanceOf[RemoteRequest].target == target + that.asInstanceOf[RemoteRequest].target == target && + that.asInstanceOf[RemoteRequest].timeout == timeout && + that.asInstanceOf[RemoteRequest].isOneWay == isOneWay && + that.asInstanceOf[RemoteRequest].isEscaped == isEscaped && + that.asInstanceOf[RemoteRequest].tx.isDefined == tx.isDefined && + that.asInstanceOf[RemoteRequest].tx.get == tx.get && + that.asInstanceOf[RemoteRequest].supervisorUuid.isDefined == supervisorUuid.isDefined && + that.asInstanceOf[RemoteRequest].supervisorUuid.get == supervisorUuid.get } - def newReplyWithMessage(message: AnyRef, tx: Option[Transaction]) = synchronized { new RemoteReply(true, id, message, null, tx) } + def newReplyWithMessage(message: AnyRef, tx: Option[Transaction]) = synchronized { new RemoteReply(true, id, message, null, tx, supervisorUuid) } - def newReplyWithException(error: Throwable) = synchronized { new RemoteReply(false, id, null, error, None) } + def newReplyWithException(error: Throwable) = synchronized { new RemoteReply(false, id, null, error, None, supervisorUuid) } def cloneWithNewMessage(message: AnyRef, isEscaped: Boolean) = synchronized { - val request = new RemoteRequest(isActor, message, method, target, tx, isOneWay, isEscaped) + val request = new RemoteRequest(isActor, message, method, target, timeout, tx, isOneWay, isEscaped, supervisorUuid) request._id = id request } @@ -63,10 +77,11 @@ object IdFactory { val id: Long, val message: AnyRef, val exception: Throwable, - val tx: Option[Transaction]) { + val tx: Option[Transaction], + val supervisorUuid: Option[String]) { override def toString: String = synchronized { - "RemoteReply[successful: " + successful + " | id: " + id + " | message: " + - message + " | exception: " + exception + " | tx: " + tx + "]" + "RemoteReply[successful: " + successful + " | id: " + id + " | message: " + message + + " | exception: " + exception + " | tx: " + tx + " | supervisorUuid: " + supervisorUuid + "]" } override def hashCode(): Int = synchronized { @@ -75,6 +90,8 @@ object IdFactory { result = HashCode.hash(result, id) result = HashCode.hash(result, message) result = HashCode.hash(result, exception) + result = if (tx.isDefined) HashCode.hash(result, tx.get) else result + result = if (supervisorUuid.isDefined) HashCode.hash(result, supervisorUuid.get) else result result } @@ -84,6 +101,10 @@ object IdFactory { that.asInstanceOf[RemoteReply].successful == successful && that.asInstanceOf[RemoteReply].id == id && that.asInstanceOf[RemoteReply].message == message && - that.asInstanceOf[RemoteReply].exception == exception + that.asInstanceOf[RemoteReply].exception == exception && + that.asInstanceOf[RemoteRequest].tx.isDefined == tx.isDefined && + that.asInstanceOf[RemoteRequest].tx.get == tx.get && + that.asInstanceOf[RemoteRequest].supervisorUuid.isDefined == supervisorUuid.isDefined && + that.asInstanceOf[RemoteRequest].supervisorUuid.get == supervisorUuid.get } } \ No newline at end of file diff --git a/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala b/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala index 3b23525f6d..2a3c937b3e 100644 --- a/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala +++ b/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala @@ -10,28 +10,13 @@ */ package se.scalablesolutions.akka.kernel.reactor +import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.ExecutorService +import java.util.{HashSet, LinkedList, Queue} -import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, RejectedExecutionHandler, ThreadPoolExecutor} - -class EventBasedThreadPoolDispatcher extends MessageDispatcherBase { - import java.util.concurrent.Executors - import java.util.HashSet - - // FIXME: make configurable using configgy + JMX +class EventBasedThreadPoolDispatcher(private val threadPool: ExecutorService) extends MessageDispatcherBase { private val busyHandlers = new HashSet[AnyRef] - private val minNrThreads, maxNrThreads = 10 - private val timeOut = 1000L // ???? - private val timeUnit = TimeUnit.MILLISECONDS - private val threadFactory = new MonitorableThreadFactory("akka:kernel") - private val rejectedExecutionHandler = new RejectedExecutionHandler() { - def rejectedExecution(runnable: Runnable, threadPoolExecutor: ThreadPoolExecutor) { - - } - } - private val queue = new LinkedBlockingQueue[Runnable] - private val handlerExecutor = new ThreadPoolExecutor(minNrThreads, maxNrThreads, timeOut, timeUnit, queue, threadFactory, rejectedExecutionHandler) - def start = if (!active) { active = true val messageDemultiplexer = new EventBasedThreadPoolDemultiplexer(messageQueue) @@ -48,7 +33,7 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase { val message = queue.peek val messageHandler = getIfNotBusy(message.sender) if (messageHandler.isDefined) { - handlerExecutor.execute(new Runnable { + threadPool.execute(new Runnable { override def run = { messageHandler.get.handle(message) free(message.sender) @@ -67,22 +52,21 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase { selectorThread.start } - override protected def doShutdown = handlerExecutor.shutdownNow + override protected def doShutdown = threadPool.shutdownNow - private def getIfNotBusy(key: AnyRef): Option[MessageHandler] = synchronized { + private def getIfNotBusy(key: AnyRef): Option[MessageHandler] = guard.synchronized { if (!busyHandlers.contains(key) && messageHandlers.containsKey(key)) { busyHandlers.add(key) Some(messageHandlers.get(key)) } else None } - private def free(key: AnyRef) = synchronized { busyHandlers.remove(key) } + private def free(key: AnyRef) = guard.synchronized { + busyHandlers.remove(key) + } } class EventBasedThreadPoolDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer { - import java.util.concurrent.locks.ReentrantLock - import java.util.{LinkedList, Queue} - private val selectedQueue: Queue[MessageHandle] = new LinkedList[MessageHandle] private val selectedQueueLock = new ReentrantLock @@ -99,7 +83,7 @@ class EventBasedThreadPoolDemultiplexer(private val messageQueue: MessageQueue) } def releaseSelectedQueue = { - selectedQueue.clear + //selectedQueue.clear selectedQueueLock.unlock } diff --git a/kernel/src/main/scala/reactor/MessageDispatcherBase.scala b/kernel/src/main/scala/reactor/MessageDispatcherBase.scala index be68c1d838..b456d4da94 100644 --- a/kernel/src/main/scala/reactor/MessageDispatcherBase.scala +++ b/kernel/src/main/scala/reactor/MessageDispatcherBase.scala @@ -4,13 +4,13 @@ package se.scalablesolutions.akka.kernel.reactor -import java.util.concurrent.{ConcurrentMap, ConcurrentHashMap} +import java.util.HashMap trait MessageDispatcherBase extends MessageDispatcher { val messageQueue = new MessageQueue @volatile protected var active: Boolean = false - protected val messageHandlers = new ConcurrentHashMap[AnyRef, MessageHandler] + protected val messageHandlers = new HashMap[AnyRef, MessageHandler] protected var selectorThread: Thread = _ protected val guard = new Object diff --git a/kernel/src/main/scala/reactor/Reactor.scala b/kernel/src/main/scala/reactor/Reactor.scala index 07890d4ae2..fdb3e3632c 100644 --- a/kernel/src/main/scala/reactor/Reactor.scala +++ b/kernel/src/main/scala/reactor/Reactor.scala @@ -84,34 +84,3 @@ class MessageQueue { queue.notifyAll } } - -class MonitorableThreadFactory(val name: String) extends ThreadFactory { - def newThread(runnable: Runnable) = - //new MonitorableThread(runnable, name) - new Thread(runnable) -} - -object MonitorableThread { - val DEFAULT_NAME = "MonitorableThread" - val created = new AtomicInteger - val alive = new AtomicInteger - @volatile val debugLifecycle = false -} -class MonitorableThread(runnable: Runnable, name: String) - extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) {//with Logging { - setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - def uncaughtException(thread: Thread, cause: Throwable) = {} //log.error("UNCAUGHT in thread [%s] cause [%s]", thread.getName, cause) - }) - - override def run = { - val debug = MonitorableThread.debugLifecycle - //if (debug) log.debug("Created %s", getName) - try { - MonitorableThread.alive.incrementAndGet - super.run - } finally { - MonitorableThread.alive.decrementAndGet - //if (debug) log.debug("Exiting %s", getName) - } - } -} diff --git a/kernel/src/main/scala/reactor/ThreadPoolBuilder.scala b/kernel/src/main/scala/reactor/ThreadPoolBuilder.scala new file mode 100644 index 0000000000..5ef48060e0 --- /dev/null +++ b/kernel/src/main/scala/reactor/ThreadPoolBuilder.scala @@ -0,0 +1,236 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.reactor + +import java.util.concurrent._ +import atomic.{AtomicLong, AtomicInteger} +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy +import java.util.Collection + +/** + * Scala API. + *

+ * Example usage: + *

+ *   val threadPool = ThreadPoolBuilder.newBuilder
+ *     .newThreadPoolWithBoundedBlockingQueue(100)
+ *     .setCorePoolSize(16)
+ *     .setMaxPoolSize(128)
+ *     .setKeepAliveTimeInMillis(60000)
+ *     .setRejectionPolicy(new CallerRunsPolicy)
+ *     .build
+ * 
+ * + * @author Jonas Bonér + */ +object ThreadPoolBuilder { + def newBuilder = new ThreadPoolBuilder +} + +/** + * Java API. + *

+ * Example usage: + *

+ *   ThreadPoolBuilder builder = new ThreadPoolBuilder();
+ *   Executor threadPool =
+ *      builder
+ *     .newThreadPoolWithBoundedBlockingQueue(100)
+ *     .setCorePoolSize(16)
+ *     .setMaxPoolSize(128)
+ *     .setKeepAliveTimeInMillis(60000)
+ *     .setRejectionPolicy(new CallerRunsPolicy())
+ *     .build();
+ * 
+ * + * @author Jonas Bonér + */ +class ThreadPoolBuilder { + val NR_START_THREADS = 16 + val NR_MAX_THREADS = 128 + val KEEP_ALIVE_TIME = 60000L // default is one minute + val MILLISECONDS = TimeUnit.MILLISECONDS + + private var inProcessOfBuilding = false + private var threadPool: ThreadPoolExecutor = _ + private val threadFactory = new MonitorableThreadFactory("akka") + private var boundedExecutorBound = -1 + + def build: ExecutorService = synchronized { + inProcessOfBuilding = false + if (boundedExecutorBound > 0) { + val executor = new BoundedExecutorDecorator(threadPool, boundedExecutorBound) + boundedExecutorBound = -1 + executor + } else threadPool + } + + def newThreadPool(queue: BlockingQueue[Runnable]) = synchronized { + verifyNotInConstructionPhase + threadPool = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, queue) + this + } + + /** + * Creates an new thread pool in which the number of tasks in the pending queue is bounded. Will block when exceeeded. + *

+ * The 'bound' variable should specify the number equal to the size of the thread pool PLUS the number of queued tasks that should be followed. + */ + def newThreadPoolWithBoundedBlockingQueue(bound: Int) = synchronized { + verifyNotInConstructionPhase + threadPool = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new LinkedBlockingQueue[Runnable], threadFactory) + boundedExecutorBound = bound + this + } + + /** + * Negative or zero capacity creates an unbounded task queue. + */ + def newThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int) = synchronized { + verifyNotInConstructionPhase + val queue = if (capacity < 1) new LinkedBlockingQueue[Runnable] else new LinkedBlockingQueue[Runnable](capacity) + threadPool = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, queue, threadFactory, new CallerRunsPolicy) + this + } + + def newThreadPoolWithSynchronousQueueWithFairness(fair: Boolean) = synchronized { + verifyNotInConstructionPhase + threadPool = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new SynchronousQueue[Runnable](fair), threadFactory, new CallerRunsPolicy) + this + } + + def newThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean) = synchronized { + verifyNotInConstructionPhase + threadPool = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new ArrayBlockingQueue[Runnable](capacity, fair), threadFactory, new CallerRunsPolicy) + this + } + + /** + * Default is 16. + */ + def setCorePoolSize(size: Int) = synchronized { + verifyInConstructionPhase + threadPool.setCorePoolSize(size) + this + } + + /** + * Default is 128. + */ + def setMaxPoolSize(size: Int) = synchronized { + verifyInConstructionPhase + threadPool.setMaximumPoolSize(size) + this + } + + /** + * Default is 60000 (one minute). + */ + def setKeepAliveTimeInMillis(time: Long) = synchronized { + verifyInConstructionPhase + threadPool.setKeepAliveTime(time, MILLISECONDS) + this + } + + /** + * Default ThreadPoolExecutor.CallerRunsPolicy. To allow graceful backing off when pool is overloaded. + */ + def setRejectionPolicy(policy: RejectedExecutionHandler) = synchronized { + verifyInConstructionPhase + threadPool.setRejectedExecutionHandler(policy) + this + } + + private def verifyNotInConstructionPhase = { + if (inProcessOfBuilding) throw new IllegalStateException("Is already in the process of building a thread pool") + inProcessOfBuilding = true + } + + private def verifyInConstructionPhase = { + if (!inProcessOfBuilding) throw new IllegalStateException("Is not in the process of building a thread pool, start building one by invoking one of the 'newThreadPool*' methods") + } +} + +/** + * @author Jonas Bonér + */ +class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorService { + private val semaphore = new Semaphore(bound) + + def execute(command: Runnable) = { + semaphore.acquire + try { + executor.execute(new Runnable() { + def run = { + try { + command.run + } finally { + semaphore.release + } + } + }) + } catch { + case e: RejectedExecutionException => + semaphore.release + } + } + + // Delegating methods for the ExecutorService interface + def shutdown = executor.shutdown + def shutdownNow = executor.shutdownNow + def isShutdown = executor.isShutdown + def isTerminated = executor.isTerminated + def awaitTermination(l: Long, timeUnit: TimeUnit) = executor.awaitTermination(l, timeUnit) + def submit[T](callable: Callable[T]) = executor.submit(callable) + def submit[T](runnable: Runnable, t: T) = executor.submit(runnable, t) + def submit(runnable: Runnable) = executor.submit(runnable) + def invokeAll[T](callables: Collection[Callable[T]]) = executor.invokeAll(callables) + def invokeAll[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit) + def invokeAny[T](callables: Collection[Callable[T]]) = executor.invokeAny(callables) + def invokeAny[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit) +} + +/** + * @author Jonas Bonér + */ +class MonitorableThreadFactory(val name: String) extends ThreadFactory { + private val counter = new AtomicLong + def newThread(runnable: Runnable) = + //new MonitorableThread(runnable, name) + new Thread(runnable, name + "-" + counter.getAndIncrement) +} + +/** + * @author Jonas Bonér + */ +object MonitorableThread { + val DEFAULT_NAME = "MonitorableThread" + val created = new AtomicInteger + val alive = new AtomicInteger + @volatile val debugLifecycle = false +} + +import kernel.util.Logging +/** + * @author Jonas Bonér + */ +class MonitorableThread(runnable: Runnable, name: String) + extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) {//with Logging { + setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + def uncaughtException(thread: Thread, cause: Throwable) = {} //log.error("UNCAUGHT in thread [%s] cause [%s]", thread.getName, cause) + }) + + override def run = { + val debug = MonitorableThread.debugLifecycle + //if (debug) log.debug("Created %s", getName) + try { + MonitorableThread.alive.incrementAndGet + super.run + } finally { + MonitorableThread.alive.decrementAndGet + //if (debug) log.debug("Exiting %s", getName) + } + } +} diff --git a/kernel/src/main/scala/stm/ChangeSet.scala b/kernel/src/main/scala/stm/ChangeSet.scala index d1a1038e15..5910714da8 100644 --- a/kernel/src/main/scala/stm/ChangeSet.scala +++ b/kernel/src/main/scala/stm/ChangeSet.scala @@ -24,34 +24,6 @@ class ChangeSet { transactionalItems = new HashSet } - - /* - // TX Maps - private[kernel] var _maps: List[TransactionalMap[_, _]] = Nil - private[kernel] def maps_=(maps: List[TransactionalMap[_, _]]) = lock.withWriteLock { - _maps = maps - } - private[kernel] def maps: List[TransactionalMap[_, _]] = lock.withReadLock { - _maps - } - - // TX Vectors - private[kernel] var _vectors: List[TransactionalVector[_]] = Nil - private[kernel] def vectors_=(vectors: List[TransactionalVector[_]]) = lock.withWriteLock { - _vectors = vectors - } - private[kernel] def vectors: List[TransactionalVector[_]] = lock.withReadLock { - _vectors - } - - // TX Refs - private[kernel] var _refs: List[TransactionalRef[_]] = Nil - private[kernel] def refs_=(refs: List[TransactionalRef[_]]) = lock.withWriteLock { - _refs = refs - } - private[kernel] def refs: List[TransactionalRef[_]] = lock.withReadLock { - _refs - } - */ + // FIXME: add hashCode and equals - VERY IMPORTANT } diff --git a/kernel/src/main/scala/stm/TransactionManagement.scala b/kernel/src/main/scala/stm/TransactionManagement.scala index 84042f47d0..4c7c4c89f1 100644 --- a/kernel/src/main/scala/stm/TransactionManagement.scala +++ b/kernel/src/main/scala/stm/TransactionManagement.scala @@ -16,12 +16,10 @@ object TransactionManagement { private val txEnabled = new AtomicBoolean(true) def isTransactionalityEnabled = txEnabled.get - def enableTransactions = txEnabled.set(true) + def disableTransactions = txEnabled.set(false) - private[kernel] val threadBoundTx: ThreadLocal[Option[Transaction]] = { - val tl = new ThreadLocal[Option[Transaction]] - tl.set(None) - tl + private[kernel] val threadBoundTx: ThreadLocal[Option[Transaction]] = new ThreadLocal[Option[Transaction]]() { + override protected def initialValue: Option[Transaction] = None } } @@ -64,7 +62,14 @@ trait TransactionManagement extends Logging { tx.rollback(id) } - protected def isInExistingTransaction = TransactionManagement.threadBoundTx.get.isDefined + protected def isInExistingTransaction = + // FIXME should not need to have this runtime "fix" - investigate what is causing this to happen +// if (TransactionManagement.threadBoundTx.get == null) { +// TransactionManagement.threadBoundTx.set(None) +// false +// } else { + TransactionManagement.threadBoundTx.get.isDefined +// } protected def isTransactionAborted = activeTx.isDefined && activeTx.get.isAborted diff --git a/kernel/src/test/scala/EventBasedDispatcherTest.scala b/kernel/src/test/scala/EventBasedDispatcherTest.scala index e8047a414b..8f346f69d9 100644 --- a/kernel/src/test/scala/EventBasedDispatcherTest.scala +++ b/kernel/src/test/scala/EventBasedDispatcherTest.scala @@ -7,8 +7,9 @@ import java.util.concurrent.locks.Lock import java.util.concurrent.locks.ReentrantLock import org.junit.{Test, Before} import org.junit.Assert._ +import junit.framework.TestCase -class EventBasedDispatcherTest { +class EventBasedDispatcherTest extends TestCase { private var threadingIssueDetected: AtomicBoolean = null class TestMessageHandle(handleLatch: CountDownLatch) extends MessageHandler { @@ -31,7 +32,7 @@ class EventBasedDispatcherTest { } @Before - def setUp = { + override def setUp = { threadingIssueDetected = new AtomicBoolean(false) } diff --git a/kernel/src/test/scala/RemoteActorSpec.scala b/kernel/src/test/scala/RemoteActorSpec.scala index 96aa9616a0..54b723aaa1 100644 --- a/kernel/src/test/scala/RemoteActorSpec.scala +++ b/kernel/src/test/scala/RemoteActorSpec.scala @@ -1,12 +1,8 @@ package se.scalablesolutions.akka.kernel.actor -import concurrent.Lock -import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.TimeUnit import junit.framework.TestCase -import kernel.nio.{NettyClient, NettyServer} -import reactor._ - +import kernel.nio.{RemoteServer, RemoteClient} import org.junit.{Test, Before} import org.junit.Assert._ @@ -33,12 +29,12 @@ class RemoteActorSpec extends TestCase { new Thread(new Runnable() { def run = { - val server = new NettyServer + val server = new RemoteServer server.connect } }).start Thread.sleep(1000) - NettyClient.connect + RemoteClient.connect private val unit = TimeUnit.MILLISECONDS diff --git a/kernel/src/test/scala/RemoteSupervisorSpec.scala b/kernel/src/test/scala/RemoteSupervisorSpec.scala new file mode 100644 index 0000000000..1268536adc --- /dev/null +++ b/kernel/src/test/scala/RemoteSupervisorSpec.scala @@ -0,0 +1,634 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel + +import kernel.nio.{RemoteClient, RemoteServer} +import kernel.actor.{Supervisor, SupervisorFactory, Actor, StartSupervisor} +import kernel.config.ScalaConfig._ + +import com.jteigen.scalatest.JUnit4Runner +import org.junit.runner.RunWith +import org.scalatest._ + +object Log { + var messageLog: String = "" + var oneWayLog: String = "" +} +/** + * @author Jonas Bonér + */ +@RunWith(classOf[JUnit4Runner]) +class RemoteSupervisorSpec extends Suite { + + new Thread(new Runnable() { + def run = { + val server = new RemoteServer + server.connect + } + }).start + Thread.sleep(1000) + RemoteClient.connect + + var pingpong1: RemotePingPong1Actor = _ + var pingpong2: RemotePingPong2Actor = _ + var pingpong3: RemotePingPong3Actor = _ + + def testStartServer = { + Log.messageLog = "" + val sup = getSingleActorAllForOneSupervisor + sup ! StartSupervisor + + expect("pong") { + (pingpong1 !! Ping).getOrElse("nil") + } + } + + def testKillSingleActorOneForOne = { + Log.messageLog = "" + val sup = getSingleActorOneForOneSupervisor + sup ! StartSupervisor + Thread.sleep(500) + intercept(classOf[RuntimeException]) { + pingpong1 !! Die + } + Thread.sleep(500) + expect("DIE") { + Log.messageLog + } + } + + def testCallKillCallSingleActorOneForOne = { + Log.messageLog = "" + val sup = getSingleActorOneForOneSupervisor + sup ! StartSupervisor + Thread.sleep(500) + expect("pong") { + (pingpong1 !! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("ping") { + Log.messageLog + } + intercept(classOf[RuntimeException]) { + pingpong1 !! Die + } + Thread.sleep(500) + expect("pingDIE") { + Log.messageLog + } + expect("pong") { + (pingpong1 !! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pingDIEping") { + Log.messageLog + } + } + + def testKillSingleActorAllForOne = { + Log.messageLog = "" + val sup = getSingleActorAllForOneSupervisor + sup ! StartSupervisor + Thread.sleep(500) + intercept(classOf[RuntimeException]) { + pingpong1 !! Die + } + Thread.sleep(500) + expect("DIE") { + Log.messageLog + } + } + + def testCallKillCallSingleActorAllForOne = { + Log.messageLog = "" + val sup = getSingleActorAllForOneSupervisor + sup ! StartSupervisor + Thread.sleep(500) + expect("pong") { + (pingpong1 !! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("ping") { + Log.messageLog + } + intercept(classOf[RuntimeException]) { + pingpong1 !! Die + } + Thread.sleep(500) + expect("pingDIE") { + Log.messageLog + } + expect("pong") { + (pingpong1 !! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pingDIEping") { + Log.messageLog + } + } + + def testKillMultipleActorsOneForOne = { + Log.messageLog = "" + val sup = getMultipleActorsOneForOneConf + sup ! StartSupervisor + Thread.sleep(500) + intercept(classOf[RuntimeException]) { + pingpong3 !! Die + } + Thread.sleep(500) + expect("DIE") { + Log.messageLog + } + } + + def tesCallKillCallMultipleActorsOneForOne = { + Log.messageLog = "" + val sup = getMultipleActorsOneForOneConf + sup ! StartSupervisor + Thread.sleep(500) + expect("pong") { + (pingpong1 !! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong2 !! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong3 !! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pingpingping") { + Log.messageLog + } + intercept(classOf[RuntimeException]) { + pingpong2 !! Die + } + Thread.sleep(500) + expect("pingpingpingDIE") { + Log.messageLog + } + expect("pong") { + (pingpong1 !! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong2 !! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong3 !! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pingpingpingDIEpingpingping") { + Log.messageLog + } + } + + def testKillMultipleActorsAllForOne = { + Log.messageLog = "" + val sup = getMultipleActorsAllForOneConf + sup ! StartSupervisor + Thread.sleep(500) + intercept(classOf[RuntimeException]) { + pingpong2 !! Die + } + Thread.sleep(500) + expect("DIEDIEDIE") { + Log.messageLog + } + } + + def tesCallKillCallMultipleActorsAllForOne = { + Log.messageLog = "" + val sup = getMultipleActorsAllForOneConf + sup ! StartSupervisor + Thread.sleep(500) + expect("pong") { + (pingpong1 !! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong2 !! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong3 !! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pingpingping") { + Log.messageLog + } + intercept(classOf[RuntimeException]) { + pingpong2 !! Die + } + Thread.sleep(500) + expect("pingpingpingDIEDIEDIE") { + Log.messageLog + } + expect("pong") { + (pingpong1 !! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong2 !! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong3 !! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pingpingpingDIEDIEDIEpingpingping") { + Log.messageLog + } + } + + def testOneWayKillSingleActorOneForOne = { + Log.messageLog = "" + val sup = getSingleActorOneForOneSupervisor + sup ! StartSupervisor + Thread.sleep(500) + pingpong1 ! Die + Thread.sleep(500) + expect("DIE") { + Log.messageLog + } + } + + def testOneWayCallKillCallSingleActorOneForOne = { + Log.messageLog = "" + val sup = getSingleActorOneForOneSupervisor + sup ! StartSupervisor + Thread.sleep(500) + pingpong1 ! OneWay + Thread.sleep(500) + expect("oneway") { + Log.oneWayLog + } + pingpong1 ! Die + Thread.sleep(500) + expect("DIE") { + Log.messageLog + } + pingpong1 ! OneWay + Thread.sleep(500) + expect("onewayoneway") { + Log.oneWayLog + } + } + + /* + def testOneWayKillSingleActorAllForOne = { + Log.messageLog = "" + val sup = getSingleActorAllForOneSupervisor + sup ! StartSupervisor + Thread.sleep(500) + intercept(classOf[RuntimeException]) { + pingpong1 ! Die + } + Thread.sleep(500) + expect("DIE") { + Log.messageLog + } + } + + def testOneWayCallKillCallSingleActorAllForOne = { + Log.messageLog = "" + val sup = getSingleActorAllForOneSupervisor + sup ! StartSupervisor + Thread.sleep(500) + expect("pong") { + (pingpong1 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("ping") { + Log.messageLog + } + intercept(classOf[RuntimeException]) { + pingpong1 ! Die + } + Thread.sleep(500) + expect("pingDIE") { + Log.messageLog + } + expect("pong") { + (pingpong1 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pingDIEping") { + Log.messageLog + } + } + + def testOneWayKillMultipleActorsOneForOne = { + Log.messageLog = "" + val sup = getMultipleActorsOneForOneConf + sup ! StartSupervisor + Thread.sleep(500) + intercept(classOf[RuntimeException]) { + pingpong3 ! Die + } + Thread.sleep(500) + expect("DIE") { + Log.messageLog + } + } + + def tesOneWayCallKillCallMultipleActorsOneForOne = { + Log.messageLog = "" + val sup = getMultipleActorsOneForOneConf + sup ! StartSupervisor + Thread.sleep(500) + expect("pong") { + (pingpong1 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong2 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong3 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pingpingping") { + Log.messageLog + } + intercept(classOf[RuntimeException]) { + pingpong2 ! Die + } + Thread.sleep(500) + expect("pingpingpingDIE") { + Log.messageLog + } + expect("pong") { + (pingpong1 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong2 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong3 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pingpingpingDIEpingpingping") { + Log.messageLog + } + } + + def testOneWayKillMultipleActorsAllForOne = { + Log.messageLog = "" + val sup = getMultipleActorsAllForOneConf + sup ! StartSupervisor + Thread.sleep(500) + intercept(classOf[RuntimeException]) { + pingpong2 ! Die + } + Thread.sleep(500) + expect("DIEDIEDIE") { + Log.messageLog + } + } + + def tesOneWayCallKillCallMultipleActorsAllForOne = { + Log.messageLog = "" + val sup = getMultipleActorsAllForOneConf + sup ! StartSupervisor + Thread.sleep(500) + expect("pong") { + pingpong1 ! Ping + } + Thread.sleep(500) + expect("pong") { + (pingpong2 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong3 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pingpingping") { + Log.messageLog + } + intercept(classOf[RuntimeException]) { + pingpong2 ! Die + } + Thread.sleep(500) + expect("pingpingpingDIEDIEDIE") { + Log.messageLog + } + expect("pong") { + (pingpong1 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong2 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pong") { + (pingpong3 ! Ping).getOrElse("nil") + } + Thread.sleep(500) + expect("pingpingpingDIEDIEDIEpingpingping") { + Log.messageLog + } + } + */ + + /* + def testNestedSupervisorsTerminateFirstLevelActorAllForOne = { + Log.messageLog = "" + val sup = getNestedSupervisorsAllForOneConf + sup ! StartSupervisor + intercept(classOf[RuntimeException]) { + pingpong1 !! Die + } + Thread.sleep(500) + expect("DIEDIEDIE") { + Log.messageLog + } + } +*/ + + // ============================================= + // Creat some supervisors with different configurations + + def getSingleActorAllForOneSupervisor: Supervisor = { + + // Create an abstract SupervisorContainer that works for all implementations + // of the different Actors (Services). + // + // Then create a concrete container in which we mix in support for the specific + // implementation of the Actors we want to use. + + pingpong1 = new RemotePingPong1Actor + pingpong1.makeRemote + + object factory extends SupervisorFactory { + override def getSupervisorConfig: SupervisorConfig = { + SupervisorConfig( + RestartStrategy(AllForOne, 3, 100), + Worker( + pingpong1, + LifeCycle(Permanent, 100)) + :: Nil) + } + } + factory.newSupervisor + } + + def getSingleActorOneForOneSupervisor: Supervisor = { + pingpong1 = new RemotePingPong1Actor + pingpong1.makeRemote + + object factory extends SupervisorFactory { + override def getSupervisorConfig: SupervisorConfig = { + SupervisorConfig( + RestartStrategy(OneForOne, 3, 100), + Worker( + pingpong1, + LifeCycle(Permanent, 100)) + :: Nil) + } + } + factory.newSupervisor + } + + def getMultipleActorsAllForOneConf: Supervisor = { + pingpong1 = new RemotePingPong1Actor + pingpong1.makeRemote + pingpong2 = new RemotePingPong2Actor + pingpong2.makeRemote + pingpong3 = new RemotePingPong3Actor + pingpong3.makeRemote + + object factory extends SupervisorFactory { + override def getSupervisorConfig: SupervisorConfig = { + SupervisorConfig( + RestartStrategy(AllForOne, 3, 100), + Worker( + pingpong1, + LifeCycle(Permanent, 100)) + :: + Worker( + pingpong2, + LifeCycle(Permanent, 100)) + :: + Worker( + pingpong3, + LifeCycle(Permanent, 100)) + :: Nil) + } + } + factory.newSupervisor + } + + def getMultipleActorsOneForOneConf: Supervisor = { + pingpong1 = new RemotePingPong1Actor + pingpong1.makeRemote + pingpong2 = new RemotePingPong2Actor + pingpong2.makeRemote + pingpong3 = new RemotePingPong3Actor + pingpong3.makeRemote + + object factory extends SupervisorFactory { + override def getSupervisorConfig: SupervisorConfig = { + SupervisorConfig( + RestartStrategy(OneForOne, 3, 100), + Worker( + pingpong1, + LifeCycle(Permanent, 100)) + :: + Worker( + pingpong2, + LifeCycle(Permanent, 100)) + :: + Worker( + pingpong3, + LifeCycle(Permanent, 100)) + :: Nil) + } + } + factory.newSupervisor + } + + def getNestedSupervisorsAllForOneConf: Supervisor = { + pingpong1 = new RemotePingPong1Actor + pingpong1.makeRemote + pingpong2 = new RemotePingPong2Actor + pingpong2.makeRemote + pingpong3 = new RemotePingPong3Actor + pingpong3.makeRemote + + object factory extends SupervisorFactory { + override def getSupervisorConfig: SupervisorConfig = { + SupervisorConfig( + RestartStrategy(AllForOne, 3, 100), + Worker( + pingpong1, + LifeCycle(Permanent, 100)) + :: + SupervisorConfig( + RestartStrategy(AllForOne, 3, 100), + Worker( + pingpong2, + LifeCycle(Permanent, 100)) + :: + Worker( + pingpong3, + LifeCycle(Permanent, 100)) + :: Nil) + :: Nil) + } + } + factory.newSupervisor + } + +} +class RemotePingPong1Actor extends Actor { + override def receive: PartialFunction[Any, Unit] = { + case Ping => + Log.messageLog += "ping" + reply("pong") + + case OneWay => + Log.oneWayLog += "oneway" + + case Die => + throw new RuntimeException("DIE") + } + override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) { + Log.messageLog += reason.asInstanceOf[Exception].getMessage + } +} + +class RemotePingPong2Actor extends Actor { + override def receive: PartialFunction[Any, Unit] = { + case Ping => + Log.messageLog += "ping" + reply("pong") + case Die => + throw new RuntimeException("DIE") + } + override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) { + Log.messageLog += reason.asInstanceOf[Exception].getMessage + } +} + +class RemotePingPong3Actor extends Actor { + override def receive: PartialFunction[Any, Unit] = { + case Ping => + Log.messageLog += "ping" + reply("pong") + case Die => + throw new RuntimeException("DIE") + } + + override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) { + Log.messageLog += reason.asInstanceOf[Exception].getMessage + } +} diff --git a/kernel/src/test/scala/ThreadBasedDispatcherTest.scala b/kernel/src/test/scala/ThreadBasedDispatcherTest.scala index effe74ea1c..4f0f38c93a 100644 --- a/kernel/src/test/scala/ThreadBasedDispatcherTest.scala +++ b/kernel/src/test/scala/ThreadBasedDispatcherTest.scala @@ -1,20 +1,20 @@ package se.scalablesolutions.akka.kernel.reactor -import java.util.concurrent.CountDownLatch -import java.util.concurrent.CyclicBarrier -import java.util.concurrent.TimeUnit +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.Lock import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.{Executors, CountDownLatch, CyclicBarrier, TimeUnit} import org.junit.Before import org.junit.Test import org.junit.Assert._ +import junit.framework.TestCase -class ThreadBasedDispatcherTest { +class ThreadBasedDispatcherTest extends TestCase { private var threadingIssueDetected: AtomicBoolean = null @Before - def setUp = { + override def setUp = { threadingIssueDetected = new AtomicBoolean(false) } @@ -33,33 +33,37 @@ class ThreadBasedDispatcherTest { internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder } - class TestMessageHandle(handleLatch: CountDownLatch) extends MessageHandler { - val guardLock: Lock = new ReentrantLock - - def handle(message: MessageHandle) { - try { - if (threadingIssueDetected.get) return - if (guardLock.tryLock) { - handleLatch.countDown - } else { - threadingIssueDetected.set(true) - } - } catch { - case e: Exception => threadingIssueDetected.set(true) - } finally { - guardLock.unlock - } - } - } - private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = { val guardLock = new ReentrantLock - val handleLatch = new CountDownLatch(100) + val handleLatch = new CountDownLatch(10) val key = "key" - val dispatcher = new EventBasedSingleThreadDispatcher - dispatcher.registerHandler(key, new TestMessageHandle(handleLatch)) + val pool = ThreadPoolBuilder.newBuilder + .newThreadPoolWithBoundedBlockingQueue(100) + .setCorePoolSize(2) + .setMaxPoolSize(4) + .setKeepAliveTimeInMillis(60000) + .setRejectionPolicy(new CallerRunsPolicy) + .build + val dispatcher = new EventBasedThreadPoolDispatcher(pool) + dispatcher.registerHandler(key, new MessageHandler { + def handle(message: MessageHandle) { + try { + if (threadingIssueDetected.get) return + if (guardLock.tryLock) { + Thread.sleep(100) + handleLatch.countDown + } else { + threadingIssueDetected.set(true) + } + } catch { + case e: Exception => threadingIssueDetected.set(true); e.printStackTrace + } finally { + guardLock.unlock + } + } + }) dispatcher.start - for (i <- 0 until 100) { + for (i <- 0 until 10) { dispatcher.messageQueue.append(new MessageHandle(key, new Object, None, None)) } assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) @@ -70,7 +74,14 @@ class ThreadBasedDispatcherTest { val handlersBarrier = new CyclicBarrier(3) val key1 = "key1" val key2 = "key2" - val dispatcher = new EventBasedThreadPoolDispatcher + val pool = ThreadPoolBuilder.newBuilder + .newThreadPoolWithBoundedBlockingQueue(100) + .setCorePoolSize(2) + .setMaxPoolSize(4) + .setKeepAliveTimeInMillis(60000) + .setRejectionPolicy(new CallerRunsPolicy) + .build + val dispatcher = new EventBasedThreadPoolDispatcher(pool) dispatcher.registerHandler(key1, new MessageHandler { def handle(message: MessageHandle) = synchronized { try {handlersBarrier.await(1, TimeUnit.SECONDS)} @@ -88,14 +99,20 @@ class ThreadBasedDispatcherTest { dispatcher.messageQueue.append(new MessageHandle(key2, "Sending Message 2", None, None)) handlersBarrier.await(5, TimeUnit.SECONDS) assertFalse(threadingIssueDetected.get) - //dispatcher.shutdown } private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = { val handleLatch = new CountDownLatch(200) val key1 = "key1" val key2 = "key2" - val dispatcher = new EventBasedSingleThreadDispatcher + val pool = ThreadPoolBuilder.newBuilder + .newThreadPoolWithBoundedBlockingQueue(100) + .setCorePoolSize(2) + .setMaxPoolSize(4) + .setKeepAliveTimeInMillis(60000) + .setRejectionPolicy(new CallerRunsPolicy) + .build + val dispatcher = new EventBasedThreadPoolDispatcher(pool) dispatcher.registerHandler(key1, new MessageHandler { var currentValue = -1; def handle(message: MessageHandle) { @@ -125,6 +142,5 @@ class ThreadBasedDispatcherTest { } assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertFalse(threadingIssueDetected.get) - dispatcher.shutdown } } diff --git a/pom.xml b/pom.xml index ac7ae197e1..84ea905ad5 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ 0.1 se.scalablesolutions.akka - 2.7.4 + 2.7.5