diff --git a/akka-core/pom.xml b/akka-core/pom.xml
index d6ca57ebfe..6a4ef1cc64 100644
--- a/akka-core/pom.xml
+++ b/akka-core/pom.xml
@@ -108,4 +108,15 @@
* import Actor._
*
- * val a = actor {
+ * val a = actor {
* case msg => ... // handle message
* }
*
@@ -100,9 +101,9 @@ object Actor extends Logging {
*
* import Actor._
*
- * val a = actor {
+ * val a = actor {
* ... // init stuff
- * } receive {
+ * } receive {
* case msg => ... // handle message
* }
*
@@ -130,7 +131,7 @@ object Actor extends Logging {
*
* import Actor._
*
- * spawn {
+ * spawn {
* ... // do stuff
* }
*
@@ -153,7 +154,7 @@ object Actor extends Logging {
*
* import Actor._
*
- * val a = actor(LifeCycle(Temporary)) {
+ * val a = actor(LifeCycle(Temporary)) {
* case msg => ... // handle message
* }
*
@@ -171,7 +172,7 @@ object Actor extends Logging {
*
* import Actor._
*
- * val a = actor("localhost", 9999) {
+ * val a = actor("localhost", 9999) {
* case msg => ... // handle message
* }
*
@@ -213,15 +214,7 @@ trait Actor extends TransactionManagement {
@volatile private[this] var _isEventBased: Boolean = false
@volatile private[akka] var _isKilled = false
- /**
- * True if a dispatcher is currently dispatching a message on this actor, false otherwise.
- *
- * This flag is guaranteed to be seen as true by other threads only if a dispatcher is really dispatching messages on it.
- * A thread might however sometimes see this flag as false, even though a dispatcher is still dispatching messages on it.
- *
- * In other words, the flag can be used safely to decide that no extra dispatching is required (if the flag is true).
- */
- @volatile private[akka] var _isDispatching = false
+ private[akka] val lock = new ReentrantLock
private var _hotswap: Option[PartialFunction[Any, Unit]] = None
private[akka] var _remoteAddress: Option[InetSocketAddress] = None
@@ -350,7 +343,7 @@ trait Actor extends TransactionManagement {
*
* Example code:
*
- * def receive = {
+ * def receive = {
* case Ping =>
* println("got a ping")
* reply("pong")
@@ -409,20 +402,29 @@ trait Actor extends TransactionManagement {
// ==== API ====
// =============
+ def withLock[B](f: => B): B = {
+ lock.lock
+ try {
+ return f
+ } finally {
+ lock.unlock
+ }
+ }
+
/**
* Starts up the actor and its message queue.
*/
- def start: Actor = synchronized {
+ def start: Actor = withLock {
if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'exit'")
if (!_isRunning) {
- if (messageDispatcher.isShutdown &&
- messageDispatcher.isInstanceOf[Dispatchers.globalExecutorBasedEventDrivenDispatcher.type]) {
+ if (messageDispatcher.isShutdown &&
+ messageDispatcher.isInstanceOf[Dispatchers.globalExecutorBasedEventDrivenDispatcher.type]) {
messageDispatcher.asInstanceOf[ExecutorBasedEventDrivenDispatcher].init
}
messageDispatcher.register(this)
messageDispatcher.start
_isRunning = true
- init
+ init
}
Actor.log.debug("[%s] has started", toString)
ActorRegistry.register(this)
@@ -438,7 +440,7 @@ trait Actor extends TransactionManagement {
/**
* Shuts down the actor its dispatcher and message queue.
*/
- def stop = synchronized {
+ def stop = withLock {
if (_isRunning) {
messageDispatcher.unregister(this)
if (messageDispatcher.canBeShutDown) messageDispatcher.shutdown // shut down in the dispatcher's references is zero
@@ -449,7 +451,6 @@ trait Actor extends TransactionManagement {
_remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid))
}
}
-
def isRunning = _isRunning
/**
@@ -554,7 +555,7 @@ trait Actor extends TransactionManagement {
} else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
-
+
/**
* Forwards the message and passes the original sender actor as the sender.
*
@@ -583,12 +584,12 @@ trait Actor extends TransactionManagement {
case None =>
throw new IllegalStateException(
"\n\tNo sender in scope, can't reply. " +
- "\n\tYou have probably used the '!' method to either; " +
- "\n\t\t1. Send a message to a remote actor which does not have a contact address." +
- "\n\t\t2. Send a message from an instance that is *not* an actor" +
- "\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
- "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
- "\n\tthat will be bound by the argument passed to 'reply'. Alternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.")
+ "\n\tYou have probably used the '!' method to either; " +
+ "\n\t\t1. Send a message to a remote actor which does not have a contact address." +
+ "\n\t\t2. Send a message from an instance that is *not* an actor" +
+ "\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
+ "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
+ "\n\tthat will be bound by the argument passed to 'reply'. Alternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.")
case Some(future) =>
future.completeWithResult(message)
}
@@ -603,7 +604,7 @@ trait Actor extends TransactionManagement {
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
- def dispatcher_=(md: MessageDispatcher): Unit = synchronized {
+ def dispatcher_=(md: MessageDispatcher): Unit = withLock {
if (!_isRunning) {
messageDispatcher.unregister(this)
messageDispatcher = md
@@ -647,7 +648,7 @@ trait Actor extends TransactionManagement {
* TransactionManagement.disableTransactions
*
*/
- def makeTransactionRequired = synchronized {
+ def makeTransactionRequired = withLock {
if (_isRunning) throw new IllegalArgumentException(
"Can not make actor transaction required after it has been started")
else isTransactionRequiresNew = true
@@ -798,27 +799,27 @@ trait Actor extends TransactionManagement {
protected[akka] def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = {
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
- .setId(RemoteRequestIdFactory.nextId)
- .setTarget(this.getClass.getName)
- .setTimeout(this.timeout)
- .setUuid(this.id)
- .setIsActor(true)
- .setIsOneWay(true)
- .setIsEscaped(false)
-
+ .setId(RemoteRequestIdFactory.nextId)
+ .setTarget(this.getClass.getName)
+ .setTimeout(this.timeout)
+ .setUuid(this.id)
+ .setIsActor(true)
+ .setIsOneWay(true)
+ .setIsEscaped(false)
+
val id = registerSupervisorAsRemoteActor
- if(id.isDefined)
+ if (id.isDefined)
requestBuilder.setSupervisorUuid(id.get)
// set the source fields used to reply back to the original sender
// (i.e. not the remote proxy actor)
- if(sender.isDefined) {
+ if (sender.isDefined) {
val s = sender.get
requestBuilder.setSourceTarget(s.getClass.getName)
requestBuilder.setSourceUuid(s.uuid)
- val (host,port) = s._replyToAddress.map(a => (a.getHostName,a.getPort)).getOrElse((Actor.HOSTNAME,Actor.PORT))
-
+ val (host, port) = s._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT))
+
log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port)
requestBuilder.setSourceHostname(host)
@@ -831,25 +832,25 @@ trait Actor extends TransactionManagement {
if (_isEventBased) {
_mailbox.add(invocation)
if (_isSuspended) invocation.send
- }
+ }
else
invocation.send
}
}
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
- message: Any,
- timeout: Long,
- senderFuture: Option[CompletableFuture]): CompletableFuture = {
+ message: Any,
+ timeout: Long,
+ senderFuture: Option[CompletableFuture]): CompletableFuture = {
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
- .setId(RemoteRequestIdFactory.nextId)
- .setTarget(this.getClass.getName)
- .setTimeout(this.timeout)
- .setUuid(this.id)
- .setIsActor(true)
- .setIsOneWay(false)
- .setIsEscaped(false)
+ .setId(RemoteRequestIdFactory.nextId)
+ .setTarget(this.getClass.getName)
+ .setTimeout(this.timeout)
+ .setUuid(this.id)
+ .setIsActor(true)
+ .setIsOneWay(false)
+ .setIsEscaped(false)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
@@ -858,7 +859,7 @@ trait Actor extends TransactionManagement {
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
} else {
val future = if (senderFuture.isDefined) senderFuture.get
- else new DefaultCompletableFuture(timeout)
+ else new DefaultCompletableFuture(timeout)
val invocation = new MessageInvocation(this, message, Some(future), None, currentTransaction.get)
if (_isEventBased) {
_mailbox.add(invocation)
@@ -871,18 +872,14 @@ trait Actor extends TransactionManagement {
/**
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
*/
- private[akka] def invoke(messageHandle: MessageInvocation) = {
- _isDispatching = true
- synchronized {
- try {
- if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
- else dispatch(messageHandle)
- } catch {
- case e =>
- Actor.log.error(e, "Could not invoke actor [%s]", this)
- throw e
- }
- _isDispatching = false
+ private[akka] def invoke(messageHandle: MessageInvocation) = withLock {
+ try {
+ if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
+ else dispatch(messageHandle)
+ } catch {
+ case e =>
+ Actor.log.error(e, "Could not invoke actor [%s]", this)
+ throw e
}
}
@@ -921,7 +918,7 @@ trait Actor extends TransactionManagement {
if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
else throw new IllegalArgumentException(
"Actor " + toString + " could not process message [" + message + "]" +
- "\n\tsince no matching 'case' clause in its 'receive' method could be found")
+ "\n\tsince no matching 'case' clause in its 'receive' method could be found")
} finally {
decrementTransaction
}
@@ -931,8 +928,8 @@ trait Actor extends TransactionManagement {
if (isTransactionRequiresNew && !isTransactionInScope) {
if (senderFuture.isEmpty) throw new StmException(
"Can't continue transaction in a one-way fire-forget message send" +
- "\n\tE.g. using Actor '!' method or Active Object 'void' method" +
- "\n\tPlease use the Actor '!!' method or Active Object method with non-void return type")
+ "\n\tE.g. using Actor '!' method or Active Object 'void' method" +
+ "\n\tPlease use the Actor '!!' method or Active Object method with non-void return type")
atomic {
proceed
}
@@ -956,23 +953,23 @@ trait Actor extends TransactionManagement {
private def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive)
private val lifeCycles: PartialFunction[Any, Unit] = {
- case HotSwap(code) => _hotswap = code
- case Restart(reason) => restart(reason)
+ case HotSwap(code) => _hotswap = code
+ case Restart(reason) => restart(reason)
case Exit(dead, reason) => handleTrapExit(dead, reason)
- case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
+ case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
}
private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = {
if (trapExit.exists(_.isAssignableFrom(reason.getClass))) {
if (faultHandler.isDefined) {
faultHandler.get match {
- // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy
+ // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy
case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => restartLinkedActors(reason)
case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.restart(reason)
}
} else throw new IllegalStateException(
"No 'faultHandler' defined for an actor with the 'trapExit' member field defined " +
- "\n\tto non-empty list of exception classes - can't proceed " + toString)
+ "\n\tto non-empty list of exception classes - can't proceed " + toString)
} else {
if (_supervisor.isDefined) _supervisor.get ! Exit(dead, reason) // if 'trapExit' is not defined then pass the Exit on
}
@@ -997,14 +994,14 @@ trait Actor extends TransactionManagement {
}
}
- private[Actor] def restart(reason: Throwable) = synchronized {
+ private[Actor] def restart(reason: Throwable) = withLock {
preRestart(reason)
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
postRestart(reason)
_isKilled = false
}
- private[akka] def registerSupervisorAsRemoteActor: Option[String] = synchronized {
+ private[akka] def registerSupervisorAsRemoteActor: Option[String] = withLock {
if (_supervisor.isDefined) {
RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(this)
Some(_supervisor.get.uuid)
@@ -1021,26 +1018,26 @@ trait Actor extends TransactionManagement {
private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {
if (!message.isInstanceOf[String] &&
- !message.isInstanceOf[Byte] &&
- !message.isInstanceOf[Int] &&
- !message.isInstanceOf[Long] &&
- !message.isInstanceOf[Float] &&
- !message.isInstanceOf[Double] &&
- !message.isInstanceOf[Boolean] &&
- !message.isInstanceOf[Char] &&
- !message.isInstanceOf[Tuple2[_, _]] &&
- !message.isInstanceOf[Tuple3[_, _, _]] &&
- !message.isInstanceOf[Tuple4[_, _, _, _]] &&
- !message.isInstanceOf[Tuple5[_, _, _, _, _]] &&
- !message.isInstanceOf[Tuple6[_, _, _, _, _, _]] &&
- !message.isInstanceOf[Tuple7[_, _, _, _, _, _, _]] &&
- !message.isInstanceOf[Tuple8[_, _, _, _, _, _, _, _]] &&
- !message.getClass.isArray &&
- !message.isInstanceOf[List[_]] &&
- !message.isInstanceOf[scala.collection.immutable.Map[_, _]] &&
- !message.isInstanceOf[scala.collection.immutable.Set[_]] &&
- !message.isInstanceOf[scala.collection.immutable.Tree[_, _]] &&
- !message.getClass.isAnnotationPresent(Annotations.immutable)) {
+ !message.isInstanceOf[Byte] &&
+ !message.isInstanceOf[Int] &&
+ !message.isInstanceOf[Long] &&
+ !message.isInstanceOf[Float] &&
+ !message.isInstanceOf[Double] &&
+ !message.isInstanceOf[Boolean] &&
+ !message.isInstanceOf[Char] &&
+ !message.isInstanceOf[Tuple2[_, _]] &&
+ !message.isInstanceOf[Tuple3[_, _, _]] &&
+ !message.isInstanceOf[Tuple4[_, _, _, _]] &&
+ !message.isInstanceOf[Tuple5[_, _, _, _, _]] &&
+ !message.isInstanceOf[Tuple6[_, _, _, _, _, _]] &&
+ !message.isInstanceOf[Tuple7[_, _, _, _, _, _, _]] &&
+ !message.isInstanceOf[Tuple8[_, _, _, _, _, _, _, _]] &&
+ !message.getClass.isArray &&
+ !message.isInstanceOf[List[_]] &&
+ !message.isInstanceOf[scala.collection.immutable.Map[_, _]] &&
+ !message.isInstanceOf[scala.collection.immutable.Set[_]] &&
+ !message.isInstanceOf[scala.collection.immutable.Tree[_, _]] &&
+ !message.getClass.isAnnotationPresent(Annotations.immutable)) {
Serializer.Java.deepClone(message)
} else message
} else message
@@ -1053,8 +1050,8 @@ trait Actor extends TransactionManagement {
override def equals(that: Any): Boolean = {
that != null &&
- that.isInstanceOf[Actor] &&
- that.asInstanceOf[Actor]._uuid == _uuid
+ that.isInstanceOf[Actor] &&
+ that.asInstanceOf[Actor]._uuid == _uuid
}
override def toString(): String = "Actor[" + id + ":" + uuid + "]"
diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
index 3aba413673..3342c3a7f0 100644
--- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
@@ -22,25 +22,35 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
init
def dispatch(invocation: MessageInvocation) = if (active) {
- if (!invocation.receiver._isDispatching) {
- executor.execute(new Runnable() {
- def run = {
- processMailbox(invocation)
- stealAndScheduleWork(invocation.receiver)
- }
- })
- }
+ // TODO: detect blocking with trylock ?! -> good idea... lets try that
+ executor.execute(new Runnable() {
+ def run = {
+ processMailbox(invocation)
+ stealAndScheduleWork(invocation.receiver)
+ }
+ })
} else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started")
/**
* Process the messages in the mailbox of the receiver of the invocation.
*/
private def processMailbox(invocation: MessageInvocation) = {
- var messageInvocation = invocation.receiver._mailbox.poll
- while (messageInvocation != null) {
- log.debug("[%s] is processing [%s] in [%s]", invocation.receiver, messageInvocation.message, Thread.currentThread.getName)
- messageInvocation.invoke
- messageInvocation = invocation.receiver._mailbox.poll
+ val lockAcquired = invocation.receiver.lock.tryLock
+ if (lockAcquired) {
+ log.debug("[%s] has acquired lock for [%s] in [%s]", invocation.receiver, invocation.message, Thread.currentThread.getName)
+ try {
+ var messageInvocation = invocation.receiver._mailbox.poll
+ while (messageInvocation != null) {
+ log.debug("[%s] is processing [%s] in [%s]", invocation.receiver, messageInvocation.message, Thread.currentThread.getName)
+ messageInvocation.invoke
+ messageInvocation = invocation.receiver._mailbox.poll
+ }
+ } finally {
+ invocation.receiver.lock.unlock
+ }
+ } else {
+ // lock not acquired -> other dispatcher was busy -> no need to do anything
+ log.debug("[%s] has NOT acquired lock for [%s] in [%s]", invocation.receiver, invocation.message, Thread.currentThread.getName)
}
}
@@ -71,15 +81,19 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
return None
}
+
+ override def register(actor: Actor) = {
+ super.register(actor)
+ executor.execute(new Runnable() {
+ def run = {
+ stealAndScheduleWork(actor)
+ }
+ })
+ actor // TODO: why is this necessary?
+ }
+
def start = if (!active) {
active = true
- // TODO: prestart
- // executor.execute(new Runnable() {
- // def run = {
- // // TODO: how to know which actor started me?
- // // stealWork()
- // }
- // })
}
def shutdown = if (active) {
diff --git a/akka-core/src/test/scala/WorkStealingTest.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala
similarity index 91%
rename from akka-core/src/test/scala/WorkStealingTest.scala
rename to akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala
index 9eddd5d392..bc4d62ff08 100644
--- a/akka-core/src/test/scala/WorkStealingTest.scala
+++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala
@@ -8,7 +8,7 @@ import se.scalablesolutions.akka.dispatch.Dispatchers
import java.util.Random
-class WorkStealingTest extends JUnitSuite with MustMatchers with ActorTestUtil {
+class ExecutorBasedEventDrivenWorkStealingDispatcherTest extends JUnitSuite with MustMatchers with ActorTestUtil {
val finishedCounter = new CountDownLatch(101)
@@ -18,7 +18,10 @@ class WorkStealingTest extends JUnitSuite with MustMatchers with ActorTestUtil {
val poolDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")
class SlowActor extends Actor {
+
messageDispatcher = poolDispatcher
+// id = "SlowActor"
+
val rnd = new Random
def receive = {
case x: Int => {
@@ -31,7 +34,10 @@ class WorkStealingTest extends JUnitSuite with MustMatchers with ActorTestUtil {
}
class FastActor extends Actor {
+
messageDispatcher = poolDispatcher
+// id = "FastActor"
+
def receive = {
case x: Int => {
// println("f processing " + x)