Had to remove the withLock method, otherwize java.lang.AbstractMethodError at runtime. The work stealing now actually works and gives a real improvement. Actors seem to be stealing work multiple times (going back and forth between actors) though... might need to tweak that.

This commit is contained in:
Jan Van Besien 2010-03-03 23:26:33 +01:00
parent c2d3680a27
commit 809821fc8e
2 changed files with 51 additions and 40 deletions

View file

@ -20,7 +20,7 @@ import org.multiverse.api.ThreadLocalTransaction._
import java.util.{Deque, HashSet}
import java.net.InetSocketAddress
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingDeque}
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.locks.ReentrantLock
/**
@ -84,7 +84,7 @@ object Actor extends Logging {
* <pre>
* import Actor._
*
* val a = actor {
* val a = actor {
* case msg => ... // handle message
* }
* </pre>
@ -101,9 +101,9 @@ object Actor extends Logging {
* <pre>
* import Actor._
*
* val a = actor {
* val a = actor {
* ... // init stuff
* } receive {
* } receive {
* case msg => ... // handle message
* }
* </pre>
@ -131,7 +131,7 @@ object Actor extends Logging {
* <pre>
* import Actor._
*
* spawn {
* spawn {
* ... // do stuff
* }
* </pre>
@ -154,7 +154,7 @@ object Actor extends Logging {
* <pre>
* import Actor._
*
* val a = actor(LifeCycle(Temporary)) {
* val a = actor(LifeCycle(Temporary)) {
* case msg => ... // handle message
* }
* </pre>
@ -172,7 +172,7 @@ object Actor extends Logging {
* <pre>
* import Actor._
*
* val a = actor("localhost", 9999) {
* val a = actor("localhost", 9999) {
* case msg => ... // handle message
* }
* </pre>
@ -208,6 +208,7 @@ trait Actor extends TransactionManagement {
// private fields
// ====================================
@volatile private[this] var _isRunning = false
@volatile private[this] var _isSuspended = true
@volatile private[this] var _isShutDown = false
@ -343,7 +344,7 @@ trait Actor extends TransactionManagement {
* <p/>
* Example code:
* <pre>
* def receive = {
* def receive = {
* case Ping =>
* println("got a ping")
* reply("pong")
@ -402,19 +403,20 @@ trait Actor extends TransactionManagement {
// ==== API ====
// =============
def withLock[B](f: => B): B = {
lock.lock
try {
return f
} finally {
lock.unlock
}
}
// private def withLock[B](f: => B): B = {
// lock.lock
// try {
// return f
// } finally {
// lock.unlock
// }
// }
/**
* Starts up the actor and its message queue.
*/
def start: Actor = withLock {
def start: Actor = {lock.lock;
try {
if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'exit'")
if (!_isRunning) {
if (messageDispatcher.isShutdown &&
@ -429,7 +431,7 @@ trait Actor extends TransactionManagement {
Actor.log.debug("[%s] has started", toString)
ActorRegistry.register(this)
this
}
} finally {lock.unlock}}
/**
* Shuts down the actor its dispatcher and message queue.
@ -440,17 +442,21 @@ trait Actor extends TransactionManagement {
/**
* Shuts down the actor its dispatcher and message queue.
*/
def stop = withLock {
if (_isRunning) {
messageDispatcher.unregister(this)
if (messageDispatcher.canBeShutDown) messageDispatcher.shutdown // shut down in the dispatcher's references is zero
_isRunning = false
_isShutDown = true
shutdown
ActorRegistry.unregister(this)
_remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid))
}
def stop = {
lock.lock
try {
if (_isRunning) {
messageDispatcher.unregister(this)
if (messageDispatcher.canBeShutDown) messageDispatcher.shutdown // shut down in the dispatcher's references is zero
_isRunning = false
_isShutDown = true
shutdown
ActorRegistry.unregister(this)
_remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid))
}
} finally {lock.unlock}
}
def isRunning = _isRunning
/**
@ -604,7 +610,8 @@ trait Actor extends TransactionManagement {
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
def dispatcher_=(md: MessageDispatcher): Unit = withLock {
def dispatcher_=(md: MessageDispatcher): Unit = {lock.lock;
try {
if (!_isRunning) {
messageDispatcher.unregister(this)
messageDispatcher = md
@ -612,7 +619,7 @@ trait Actor extends TransactionManagement {
_isEventBased = messageDispatcher.isInstanceOf[ExecutorBasedEventDrivenDispatcher]
} else throw new IllegalArgumentException(
"Can not swap dispatcher for " + toString + " after it has been started")
}
} finally {lock.unlock}}
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
@ -648,11 +655,12 @@ trait Actor extends TransactionManagement {
* TransactionManagement.disableTransactions
* </pre>
*/
def makeTransactionRequired = withLock {
def makeTransactionRequired = {lock.lock;
try {
if (_isRunning) throw new IllegalArgumentException(
"Can not make actor transaction required after it has been started")
else isTransactionRequiresNew = true
}
} finally {lock.unlock}}
/**
* Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
@ -872,7 +880,8 @@ trait Actor extends TransactionManagement {
/**
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
*/
private[akka] def invoke(messageHandle: MessageInvocation) = withLock {
private[akka] def invoke(messageHandle: MessageInvocation) = {lock.lock;
try {
try {
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle)
@ -881,7 +890,7 @@ trait Actor extends TransactionManagement {
Actor.log.error(e, "Could not invoke actor [%s]", this)
throw e
}
}
} finally {lock.unlock}}
private def dispatch[T](messageHandle: MessageInvocation) = {
setTransaction(messageHandle.tx)
@ -994,19 +1003,21 @@ trait Actor extends TransactionManagement {
}
}
private[Actor] def restart(reason: Throwable) = withLock {
private[Actor] def restart(reason: Throwable) = {lock.lock;
try {
preRestart(reason)
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
postRestart(reason)
_isKilled = false
}
} finally {lock.unlock}}
private[akka] def registerSupervisorAsRemoteActor: Option[String] = withLock {
private[akka] def registerSupervisorAsRemoteActor: Option[String] = {lock.lock;
try {
if (_supervisor.isDefined) {
RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(this)
Some(_supervisor.get.uuid)
} else None
}
} finally {lock.unlock}}
protected def getLinkedActors: HashSet[Actor] = {
if (_linkedActors.isEmpty) {

View file

@ -20,7 +20,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherTest extends JUnitSuite with
class SlowActor extends Actor {
messageDispatcher = poolDispatcher
// id = "SlowActor"
id = "SlowActor"
val rnd = new Random
def receive = {
@ -36,7 +36,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherTest extends JUnitSuite with
class FastActor extends Actor {
messageDispatcher = poolDispatcher
// id = "FastActor"
id = "FastActor"
def receive = {
case x: Int => {