From c2d3680a27dab6ea69fd320f1d026dc8a044d2dc Mon Sep 17 00:00:00 2001 From: Jan Van Besien Date: Wed, 3 Mar 2010 22:55:46 +0100 Subject: [PATCH] replaced synchronization in actor with explicit lock. Use tryLock in the dispatcher to give up immediately when the lock is already held. --- akka-core/pom.xml | 11 + akka-core/src/main/scala/actor/Actor.scala | 197 +++++++++--------- ...sedEventDrivenWorkStealingDispatcher.scala | 54 +++-- ...entDrivenWorkStealingDispatcherTest.scala} | 8 +- 4 files changed, 149 insertions(+), 121 deletions(-) rename akka-core/src/test/scala/{WorkStealingTest.scala => ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala} (91%) diff --git a/akka-core/pom.xml b/akka-core/pom.xml index d6ca57ebfe..6a4ef1cc64 100644 --- a/akka-core/pom.xml +++ b/akka-core/pom.xml @@ -108,4 +108,15 @@ test + + + + org.apache.maven.plugins + maven-surefire-plugin + + -agentlib:yjpagent=monitors + + + + diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 932c124c95..4244d4b7c2 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -21,6 +21,7 @@ import org.multiverse.api.ThreadLocalTransaction._ import java.util.{Deque, HashSet} import java.net.InetSocketAddress import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingDeque} +import java.util.concurrent.locks.ReentrantLock /** * Implements the Transactor abstraction. E.g. a transactional actor. @@ -72,7 +73,7 @@ object Actor extends Logging { val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") val PORT = config.getInt("akka.remote.server.port", 9999) - object Sender{ + object Sender { implicit val Self: Option[Actor] = None } @@ -83,7 +84,7 @@ object Actor extends Logging { *
    * import Actor._
    *
-   * val a = actor  {
+   * val a = actor   {
    *   case msg => ... // handle message
    * }
    * 
@@ -100,9 +101,9 @@ object Actor extends Logging { *
    * import Actor._
    *
-   * val a = actor  {
+   * val a = actor   {
    *   ... // init stuff
-   * } receive  {
+   * } receive   {
    *   case msg => ... // handle message
    * }
    * 
@@ -130,7 +131,7 @@ object Actor extends Logging { *
    * import Actor._
    *
-   * spawn  {
+   * spawn   {
    *   ... // do stuff
    * }
    * 
@@ -153,7 +154,7 @@ object Actor extends Logging { *
    * import Actor._
    *
-   * val a = actor(LifeCycle(Temporary))  {
+   * val a = actor(LifeCycle(Temporary))   {
    *   case msg => ... // handle message
    * }
    * 
@@ -171,7 +172,7 @@ object Actor extends Logging { *
    * import Actor._
    *
-   * val a = actor("localhost", 9999)  {
+   * val a = actor("localhost", 9999)   {
    *   case msg => ... // handle message
    * }
    * 
@@ -213,15 +214,7 @@ trait Actor extends TransactionManagement { @volatile private[this] var _isEventBased: Boolean = false @volatile private[akka] var _isKilled = false - /** - * True if a dispatcher is currently dispatching a message on this actor, false otherwise. - *

- * This flag is guaranteed to be seen as true by other threads only if a dispatcher is really dispatching messages on it. - * A thread might however sometimes see this flag as false, even though a dispatcher is still dispatching messages on it. - *

- * In other words, the flag can be used safely to decide that no extra dispatching is required (if the flag is true). - */ - @volatile private[akka] var _isDispatching = false + private[akka] val lock = new ReentrantLock private var _hotswap: Option[PartialFunction[Any, Unit]] = None private[akka] var _remoteAddress: Option[InetSocketAddress] = None @@ -350,7 +343,7 @@ trait Actor extends TransactionManagement { *

* Example code: *

-   *   def receive =  {
+   *   def receive =   {
    *     case Ping =>
    *       println("got a ping")
    *       reply("pong")
@@ -409,20 +402,29 @@ trait Actor extends TransactionManagement {
   // ==== API ====
   // =============
 
+  def withLock[B](f: => B): B = {
+    lock.lock
+    try {
+      return f
+    } finally {
+      lock.unlock
+    }
+  }
+
   /**
    * Starts up the actor and its message queue.
    */
-  def start: Actor = synchronized {
+  def start: Actor = withLock {
     if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'exit'")
     if (!_isRunning) {
-      if (messageDispatcher.isShutdown && 
-          messageDispatcher.isInstanceOf[Dispatchers.globalExecutorBasedEventDrivenDispatcher.type]) {
+      if (messageDispatcher.isShutdown &&
+              messageDispatcher.isInstanceOf[Dispatchers.globalExecutorBasedEventDrivenDispatcher.type]) {
         messageDispatcher.asInstanceOf[ExecutorBasedEventDrivenDispatcher].init
       }
       messageDispatcher.register(this)
       messageDispatcher.start
       _isRunning = true
-      init 
+      init
     }
     Actor.log.debug("[%s] has started", toString)
     ActorRegistry.register(this)
@@ -438,7 +440,7 @@ trait Actor extends TransactionManagement {
   /**
    * Shuts down the actor its dispatcher and message queue.
    */
-  def stop = synchronized {
+  def stop = withLock {
     if (_isRunning) {
       messageDispatcher.unregister(this)
       if (messageDispatcher.canBeShutDown) messageDispatcher.shutdown // shut down in the dispatcher's references is zero
@@ -449,7 +451,6 @@ trait Actor extends TransactionManagement {
       _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid))
     }
   }
-
   def isRunning = _isRunning
 
   /**
@@ -554,7 +555,7 @@ trait Actor extends TransactionManagement {
     } else throw new IllegalStateException(
       "Actor has not been started, you need to invoke 'actor.start' before using it")
   }
-  
+
   /**
    * Forwards the message and passes the original sender actor as the sender.
    * 

@@ -583,12 +584,12 @@ trait Actor extends TransactionManagement { case None => throw new IllegalStateException( "\n\tNo sender in scope, can't reply. " + - "\n\tYou have probably used the '!' method to either; " + - "\n\t\t1. Send a message to a remote actor which does not have a contact address." + - "\n\t\t2. Send a message from an instance that is *not* an actor" + - "\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " + - "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" + - "\n\tthat will be bound by the argument passed to 'reply'. Alternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.") + "\n\tYou have probably used the '!' method to either; " + + "\n\t\t1. Send a message to a remote actor which does not have a contact address." + + "\n\t\t2. Send a message from an instance that is *not* an actor" + + "\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " + + "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" + + "\n\tthat will be bound by the argument passed to 'reply'. Alternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.") case Some(future) => future.completeWithResult(message) } @@ -603,7 +604,7 @@ trait Actor extends TransactionManagement { /** * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. */ - def dispatcher_=(md: MessageDispatcher): Unit = synchronized { + def dispatcher_=(md: MessageDispatcher): Unit = withLock { if (!_isRunning) { messageDispatcher.unregister(this) messageDispatcher = md @@ -647,7 +648,7 @@ trait Actor extends TransactionManagement { * TransactionManagement.disableTransactions *

*/ - def makeTransactionRequired = synchronized { + def makeTransactionRequired = withLock { if (_isRunning) throw new IllegalArgumentException( "Can not make actor transaction required after it has been started") else isTransactionRequiresNew = true @@ -798,27 +799,27 @@ trait Actor extends TransactionManagement { protected[akka] def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = { if (_remoteAddress.isDefined) { val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) - .setTarget(this.getClass.getName) - .setTimeout(this.timeout) - .setUuid(this.id) - .setIsActor(true) - .setIsOneWay(true) - .setIsEscaped(false) - + .setId(RemoteRequestIdFactory.nextId) + .setTarget(this.getClass.getName) + .setTimeout(this.timeout) + .setUuid(this.id) + .setIsActor(true) + .setIsOneWay(true) + .setIsEscaped(false) + val id = registerSupervisorAsRemoteActor - if(id.isDefined) + if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) // set the source fields used to reply back to the original sender // (i.e. not the remote proxy actor) - if(sender.isDefined) { + if (sender.isDefined) { val s = sender.get requestBuilder.setSourceTarget(s.getClass.getName) requestBuilder.setSourceUuid(s.uuid) - val (host,port) = s._replyToAddress.map(a => (a.getHostName,a.getPort)).getOrElse((Actor.HOSTNAME,Actor.PORT)) - + val (host, port) = s._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) + log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port) requestBuilder.setSourceHostname(host) @@ -831,25 +832,25 @@ trait Actor extends TransactionManagement { if (_isEventBased) { _mailbox.add(invocation) if (_isSuspended) invocation.send - } + } else invocation.send } } protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( - message: Any, - timeout: Long, - senderFuture: Option[CompletableFuture]): CompletableFuture = { + message: Any, + timeout: Long, + senderFuture: Option[CompletableFuture]): CompletableFuture = { if (_remoteAddress.isDefined) { val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) - .setTarget(this.getClass.getName) - .setTimeout(this.timeout) - .setUuid(this.id) - .setIsActor(true) - .setIsOneWay(false) - .setIsEscaped(false) + .setId(RemoteRequestIdFactory.nextId) + .setTarget(this.getClass.getName) + .setTimeout(this.timeout) + .setUuid(this.id) + .setIsActor(true) + .setIsOneWay(false) + .setIsEscaped(false) RemoteProtocolBuilder.setMessage(message, requestBuilder) val id = registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) @@ -858,7 +859,7 @@ trait Actor extends TransactionManagement { else throw new IllegalStateException("Expected a future from remote call to actor " + toString) } else { val future = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFuture(timeout) + else new DefaultCompletableFuture(timeout) val invocation = new MessageInvocation(this, message, Some(future), None, currentTransaction.get) if (_isEventBased) { _mailbox.add(invocation) @@ -871,18 +872,14 @@ trait Actor extends TransactionManagement { /** * Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods. */ - private[akka] def invoke(messageHandle: MessageInvocation) = { - _isDispatching = true - synchronized { - try { - if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle) - else dispatch(messageHandle) - } catch { - case e => - Actor.log.error(e, "Could not invoke actor [%s]", this) - throw e - } - _isDispatching = false + private[akka] def invoke(messageHandle: MessageInvocation) = withLock { + try { + if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle) + else dispatch(messageHandle) + } catch { + case e => + Actor.log.error(e, "Could not invoke actor [%s]", this) + throw e } } @@ -921,7 +918,7 @@ trait Actor extends TransactionManagement { if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function else throw new IllegalArgumentException( "Actor " + toString + " could not process message [" + message + "]" + - "\n\tsince no matching 'case' clause in its 'receive' method could be found") + "\n\tsince no matching 'case' clause in its 'receive' method could be found") } finally { decrementTransaction } @@ -931,8 +928,8 @@ trait Actor extends TransactionManagement { if (isTransactionRequiresNew && !isTransactionInScope) { if (senderFuture.isEmpty) throw new StmException( "Can't continue transaction in a one-way fire-forget message send" + - "\n\tE.g. using Actor '!' method or Active Object 'void' method" + - "\n\tPlease use the Actor '!!' method or Active Object method with non-void return type") + "\n\tE.g. using Actor '!' method or Active Object 'void' method" + + "\n\tPlease use the Actor '!!' method or Active Object method with non-void return type") atomic { proceed } @@ -956,23 +953,23 @@ trait Actor extends TransactionManagement { private def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive) private val lifeCycles: PartialFunction[Any, Unit] = { - case HotSwap(code) => _hotswap = code - case Restart(reason) => restart(reason) + case HotSwap(code) => _hotswap = code + case Restart(reason) => restart(reason) case Exit(dead, reason) => handleTrapExit(dead, reason) - case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message") + case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message") } private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = { if (trapExit.exists(_.isAssignableFrom(reason.getClass))) { if (faultHandler.isDefined) { faultHandler.get match { - // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy + // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => restartLinkedActors(reason) case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.restart(reason) } } else throw new IllegalStateException( "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " + - "\n\tto non-empty list of exception classes - can't proceed " + toString) + "\n\tto non-empty list of exception classes - can't proceed " + toString) } else { if (_supervisor.isDefined) _supervisor.get ! Exit(dead, reason) // if 'trapExit' is not defined then pass the Exit on } @@ -997,14 +994,14 @@ trait Actor extends TransactionManagement { } } - private[Actor] def restart(reason: Throwable) = synchronized { + private[Actor] def restart(reason: Throwable) = withLock { preRestart(reason) Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) postRestart(reason) _isKilled = false } - private[akka] def registerSupervisorAsRemoteActor: Option[String] = synchronized { + private[akka] def registerSupervisorAsRemoteActor: Option[String] = withLock { if (_supervisor.isDefined) { RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(this) Some(_supervisor.get.uuid) @@ -1021,26 +1018,26 @@ trait Actor extends TransactionManagement { private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) { if (!message.isInstanceOf[String] && - !message.isInstanceOf[Byte] && - !message.isInstanceOf[Int] && - !message.isInstanceOf[Long] && - !message.isInstanceOf[Float] && - !message.isInstanceOf[Double] && - !message.isInstanceOf[Boolean] && - !message.isInstanceOf[Char] && - !message.isInstanceOf[Tuple2[_, _]] && - !message.isInstanceOf[Tuple3[_, _, _]] && - !message.isInstanceOf[Tuple4[_, _, _, _]] && - !message.isInstanceOf[Tuple5[_, _, _, _, _]] && - !message.isInstanceOf[Tuple6[_, _, _, _, _, _]] && - !message.isInstanceOf[Tuple7[_, _, _, _, _, _, _]] && - !message.isInstanceOf[Tuple8[_, _, _, _, _, _, _, _]] && - !message.getClass.isArray && - !message.isInstanceOf[List[_]] && - !message.isInstanceOf[scala.collection.immutable.Map[_, _]] && - !message.isInstanceOf[scala.collection.immutable.Set[_]] && - !message.isInstanceOf[scala.collection.immutable.Tree[_, _]] && - !message.getClass.isAnnotationPresent(Annotations.immutable)) { + !message.isInstanceOf[Byte] && + !message.isInstanceOf[Int] && + !message.isInstanceOf[Long] && + !message.isInstanceOf[Float] && + !message.isInstanceOf[Double] && + !message.isInstanceOf[Boolean] && + !message.isInstanceOf[Char] && + !message.isInstanceOf[Tuple2[_, _]] && + !message.isInstanceOf[Tuple3[_, _, _]] && + !message.isInstanceOf[Tuple4[_, _, _, _]] && + !message.isInstanceOf[Tuple5[_, _, _, _, _]] && + !message.isInstanceOf[Tuple6[_, _, _, _, _, _]] && + !message.isInstanceOf[Tuple7[_, _, _, _, _, _, _]] && + !message.isInstanceOf[Tuple8[_, _, _, _, _, _, _, _]] && + !message.getClass.isArray && + !message.isInstanceOf[List[_]] && + !message.isInstanceOf[scala.collection.immutable.Map[_, _]] && + !message.isInstanceOf[scala.collection.immutable.Set[_]] && + !message.isInstanceOf[scala.collection.immutable.Tree[_, _]] && + !message.getClass.isAnnotationPresent(Annotations.immutable)) { Serializer.Java.deepClone(message) } else message } else message @@ -1053,8 +1050,8 @@ trait Actor extends TransactionManagement { override def equals(that: Any): Boolean = { that != null && - that.isInstanceOf[Actor] && - that.asInstanceOf[Actor]._uuid == _uuid + that.isInstanceOf[Actor] && + that.asInstanceOf[Actor]._uuid == _uuid } override def toString(): String = "Actor[" + id + ":" + uuid + "]" diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 3aba413673..3342c3a7f0 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -22,25 +22,35 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess init def dispatch(invocation: MessageInvocation) = if (active) { - if (!invocation.receiver._isDispatching) { - executor.execute(new Runnable() { - def run = { - processMailbox(invocation) - stealAndScheduleWork(invocation.receiver) - } - }) - } + // TODO: detect blocking with trylock ?! -> good idea... lets try that + executor.execute(new Runnable() { + def run = { + processMailbox(invocation) + stealAndScheduleWork(invocation.receiver) + } + }) } else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started") /** * Process the messages in the mailbox of the receiver of the invocation. */ private def processMailbox(invocation: MessageInvocation) = { - var messageInvocation = invocation.receiver._mailbox.poll - while (messageInvocation != null) { - log.debug("[%s] is processing [%s] in [%s]", invocation.receiver, messageInvocation.message, Thread.currentThread.getName) - messageInvocation.invoke - messageInvocation = invocation.receiver._mailbox.poll + val lockAcquired = invocation.receiver.lock.tryLock + if (lockAcquired) { + log.debug("[%s] has acquired lock for [%s] in [%s]", invocation.receiver, invocation.message, Thread.currentThread.getName) + try { + var messageInvocation = invocation.receiver._mailbox.poll + while (messageInvocation != null) { + log.debug("[%s] is processing [%s] in [%s]", invocation.receiver, messageInvocation.message, Thread.currentThread.getName) + messageInvocation.invoke + messageInvocation = invocation.receiver._mailbox.poll + } + } finally { + invocation.receiver.lock.unlock + } + } else { + // lock not acquired -> other dispatcher was busy -> no need to do anything + log.debug("[%s] has NOT acquired lock for [%s] in [%s]", invocation.receiver, invocation.message, Thread.currentThread.getName) } } @@ -71,15 +81,19 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess return None } + + override def register(actor: Actor) = { + super.register(actor) + executor.execute(new Runnable() { + def run = { + stealAndScheduleWork(actor) + } + }) + actor // TODO: why is this necessary? + } + def start = if (!active) { active = true - // TODO: prestart - // executor.execute(new Runnable() { - // def run = { - // // TODO: how to know which actor started me? - // // stealWork() - // } - // }) } def shutdown = if (active) { diff --git a/akka-core/src/test/scala/WorkStealingTest.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala similarity index 91% rename from akka-core/src/test/scala/WorkStealingTest.scala rename to akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala index 9eddd5d392..bc4d62ff08 100644 --- a/akka-core/src/test/scala/WorkStealingTest.scala +++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala @@ -8,7 +8,7 @@ import se.scalablesolutions.akka.dispatch.Dispatchers import java.util.Random -class WorkStealingTest extends JUnitSuite with MustMatchers with ActorTestUtil { +class ExecutorBasedEventDrivenWorkStealingDispatcherTest extends JUnitSuite with MustMatchers with ActorTestUtil { val finishedCounter = new CountDownLatch(101) @@ -18,7 +18,10 @@ class WorkStealingTest extends JUnitSuite with MustMatchers with ActorTestUtil { val poolDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher") class SlowActor extends Actor { + messageDispatcher = poolDispatcher +// id = "SlowActor" + val rnd = new Random def receive = { case x: Int => { @@ -31,7 +34,10 @@ class WorkStealingTest extends JUnitSuite with MustMatchers with ActorTestUtil { } class FastActor extends Actor { + messageDispatcher = poolDispatcher +// id = "FastActor" + def receive = { case x: Int => { // println("f processing " + x)