replaced synchronization in actor with explicit lock. Use tryLock in the dispatcher to give up immediately when the lock is already held.

This commit is contained in:
Jan Van Besien 2010-03-03 22:55:46 +01:00
parent 71155bda7a
commit c2d3680a27
4 changed files with 149 additions and 121 deletions

View file

@ -108,4 +108,15 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>-agentlib:yjpagent=monitors</argLine>
</configuration>
</plugin>
</plugins>
</build>
</project>

View file

@ -21,6 +21,7 @@ import org.multiverse.api.ThreadLocalTransaction._
import java.util.{Deque, HashSet}
import java.net.InetSocketAddress
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingDeque}
import java.util.concurrent.locks.ReentrantLock
/**
* Implements the Transactor abstraction. E.g. a transactional actor.
@ -72,7 +73,7 @@ object Actor extends Logging {
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 9999)
object Sender{
object Sender {
implicit val Self: Option[Actor] = None
}
@ -83,7 +84,7 @@ object Actor extends Logging {
* <pre>
* import Actor._
*
* val a = actor {
* val a = actor {
* case msg => ... // handle message
* }
* </pre>
@ -100,9 +101,9 @@ object Actor extends Logging {
* <pre>
* import Actor._
*
* val a = actor {
* val a = actor {
* ... // init stuff
* } receive {
* } receive {
* case msg => ... // handle message
* }
* </pre>
@ -130,7 +131,7 @@ object Actor extends Logging {
* <pre>
* import Actor._
*
* spawn {
* spawn {
* ... // do stuff
* }
* </pre>
@ -153,7 +154,7 @@ object Actor extends Logging {
* <pre>
* import Actor._
*
* val a = actor(LifeCycle(Temporary)) {
* val a = actor(LifeCycle(Temporary)) {
* case msg => ... // handle message
* }
* </pre>
@ -171,7 +172,7 @@ object Actor extends Logging {
* <pre>
* import Actor._
*
* val a = actor("localhost", 9999) {
* val a = actor("localhost", 9999) {
* case msg => ... // handle message
* }
* </pre>
@ -213,15 +214,7 @@ trait Actor extends TransactionManagement {
@volatile private[this] var _isEventBased: Boolean = false
@volatile private[akka] var _isKilled = false
/**
* True if a dispatcher is currently dispatching a message on this actor, false otherwise.
* <p/>
* This flag is guaranteed to be seen as true by other threads only if a dispatcher is really dispatching messages on it.
* A thread might however sometimes see this flag as false, even though a dispatcher is still dispatching messages on it.
* <p/>
* In other words, the flag can be used safely to decide that no extra dispatching is required (if the flag is true).
*/
@volatile private[akka] var _isDispatching = false
private[akka] val lock = new ReentrantLock
private var _hotswap: Option[PartialFunction[Any, Unit]] = None
private[akka] var _remoteAddress: Option[InetSocketAddress] = None
@ -350,7 +343,7 @@ trait Actor extends TransactionManagement {
* <p/>
* Example code:
* <pre>
* def receive = {
* def receive = {
* case Ping =>
* println("got a ping")
* reply("pong")
@ -409,20 +402,29 @@ trait Actor extends TransactionManagement {
// ==== API ====
// =============
def withLock[B](f: => B): B = {
lock.lock
try {
return f
} finally {
lock.unlock
}
}
/**
* Starts up the actor and its message queue.
*/
def start: Actor = synchronized {
def start: Actor = withLock {
if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'exit'")
if (!_isRunning) {
if (messageDispatcher.isShutdown &&
messageDispatcher.isInstanceOf[Dispatchers.globalExecutorBasedEventDrivenDispatcher.type]) {
if (messageDispatcher.isShutdown &&
messageDispatcher.isInstanceOf[Dispatchers.globalExecutorBasedEventDrivenDispatcher.type]) {
messageDispatcher.asInstanceOf[ExecutorBasedEventDrivenDispatcher].init
}
messageDispatcher.register(this)
messageDispatcher.start
_isRunning = true
init
init
}
Actor.log.debug("[%s] has started", toString)
ActorRegistry.register(this)
@ -438,7 +440,7 @@ trait Actor extends TransactionManagement {
/**
* Shuts down the actor its dispatcher and message queue.
*/
def stop = synchronized {
def stop = withLock {
if (_isRunning) {
messageDispatcher.unregister(this)
if (messageDispatcher.canBeShutDown) messageDispatcher.shutdown // shut down in the dispatcher's references is zero
@ -449,7 +451,6 @@ trait Actor extends TransactionManagement {
_remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid))
}
}
def isRunning = _isRunning
/**
@ -554,7 +555,7 @@ trait Actor extends TransactionManagement {
} else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
/**
* Forwards the message and passes the original sender actor as the sender.
* <p/>
@ -583,12 +584,12 @@ trait Actor extends TransactionManagement {
case None =>
throw new IllegalStateException(
"\n\tNo sender in scope, can't reply. " +
"\n\tYou have probably used the '!' method to either; " +
"\n\t\t1. Send a message to a remote actor which does not have a contact address." +
"\n\t\t2. Send a message from an instance that is *not* an actor" +
"\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
"\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
"\n\tthat will be bound by the argument passed to 'reply'. Alternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.")
"\n\tYou have probably used the '!' method to either; " +
"\n\t\t1. Send a message to a remote actor which does not have a contact address." +
"\n\t\t2. Send a message from an instance that is *not* an actor" +
"\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
"\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
"\n\tthat will be bound by the argument passed to 'reply'. Alternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.")
case Some(future) =>
future.completeWithResult(message)
}
@ -603,7 +604,7 @@ trait Actor extends TransactionManagement {
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
def dispatcher_=(md: MessageDispatcher): Unit = synchronized {
def dispatcher_=(md: MessageDispatcher): Unit = withLock {
if (!_isRunning) {
messageDispatcher.unregister(this)
messageDispatcher = md
@ -647,7 +648,7 @@ trait Actor extends TransactionManagement {
* TransactionManagement.disableTransactions
* </pre>
*/
def makeTransactionRequired = synchronized {
def makeTransactionRequired = withLock {
if (_isRunning) throw new IllegalArgumentException(
"Can not make actor transaction required after it has been started")
else isTransactionRequiresNew = true
@ -798,27 +799,27 @@ trait Actor extends TransactionManagement {
protected[akka] def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = {
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(this.getClass.getName)
.setTimeout(this.timeout)
.setUuid(this.id)
.setIsActor(true)
.setIsOneWay(true)
.setIsEscaped(false)
.setId(RemoteRequestIdFactory.nextId)
.setTarget(this.getClass.getName)
.setTimeout(this.timeout)
.setUuid(this.id)
.setIsActor(true)
.setIsOneWay(true)
.setIsEscaped(false)
val id = registerSupervisorAsRemoteActor
if(id.isDefined)
if (id.isDefined)
requestBuilder.setSupervisorUuid(id.get)
// set the source fields used to reply back to the original sender
// (i.e. not the remote proxy actor)
if(sender.isDefined) {
if (sender.isDefined) {
val s = sender.get
requestBuilder.setSourceTarget(s.getClass.getName)
requestBuilder.setSourceUuid(s.uuid)
val (host,port) = s._replyToAddress.map(a => (a.getHostName,a.getPort)).getOrElse((Actor.HOSTNAME,Actor.PORT))
val (host, port) = s._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT))
log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port)
requestBuilder.setSourceHostname(host)
@ -831,25 +832,25 @@ trait Actor extends TransactionManagement {
if (_isEventBased) {
_mailbox.add(invocation)
if (_isSuspended) invocation.send
}
}
else
invocation.send
}
}
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
senderFuture: Option[CompletableFuture]): CompletableFuture = {
message: Any,
timeout: Long,
senderFuture: Option[CompletableFuture]): CompletableFuture = {
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(this.getClass.getName)
.setTimeout(this.timeout)
.setUuid(this.id)
.setIsActor(true)
.setIsOneWay(false)
.setIsEscaped(false)
.setId(RemoteRequestIdFactory.nextId)
.setTarget(this.getClass.getName)
.setTimeout(this.timeout)
.setUuid(this.id)
.setIsActor(true)
.setIsOneWay(false)
.setIsEscaped(false)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
@ -858,7 +859,7 @@ trait Actor extends TransactionManagement {
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
} else {
val future = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture(timeout)
else new DefaultCompletableFuture(timeout)
val invocation = new MessageInvocation(this, message, Some(future), None, currentTransaction.get)
if (_isEventBased) {
_mailbox.add(invocation)
@ -871,18 +872,14 @@ trait Actor extends TransactionManagement {
/**
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
*/
private[akka] def invoke(messageHandle: MessageInvocation) = {
_isDispatching = true
synchronized {
try {
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle)
} catch {
case e =>
Actor.log.error(e, "Could not invoke actor [%s]", this)
throw e
}
_isDispatching = false
private[akka] def invoke(messageHandle: MessageInvocation) = withLock {
try {
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle)
} catch {
case e =>
Actor.log.error(e, "Could not invoke actor [%s]", this)
throw e
}
}
@ -921,7 +918,7 @@ trait Actor extends TransactionManagement {
if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
else throw new IllegalArgumentException(
"Actor " + toString + " could not process message [" + message + "]" +
"\n\tsince no matching 'case' clause in its 'receive' method could be found")
"\n\tsince no matching 'case' clause in its 'receive' method could be found")
} finally {
decrementTransaction
}
@ -931,8 +928,8 @@ trait Actor extends TransactionManagement {
if (isTransactionRequiresNew && !isTransactionInScope) {
if (senderFuture.isEmpty) throw new StmException(
"Can't continue transaction in a one-way fire-forget message send" +
"\n\tE.g. using Actor '!' method or Active Object 'void' method" +
"\n\tPlease use the Actor '!!' method or Active Object method with non-void return type")
"\n\tE.g. using Actor '!' method or Active Object 'void' method" +
"\n\tPlease use the Actor '!!' method or Active Object method with non-void return type")
atomic {
proceed
}
@ -956,23 +953,23 @@ trait Actor extends TransactionManagement {
private def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive)
private val lifeCycles: PartialFunction[Any, Unit] = {
case HotSwap(code) => _hotswap = code
case Restart(reason) => restart(reason)
case HotSwap(code) => _hotswap = code
case Restart(reason) => restart(reason)
case Exit(dead, reason) => handleTrapExit(dead, reason)
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
}
private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = {
if (trapExit.exists(_.isAssignableFrom(reason.getClass))) {
if (faultHandler.isDefined) {
faultHandler.get match {
// FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy
// FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy
case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => restartLinkedActors(reason)
case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.restart(reason)
}
} else throw new IllegalStateException(
"No 'faultHandler' defined for an actor with the 'trapExit' member field defined " +
"\n\tto non-empty list of exception classes - can't proceed " + toString)
"\n\tto non-empty list of exception classes - can't proceed " + toString)
} else {
if (_supervisor.isDefined) _supervisor.get ! Exit(dead, reason) // if 'trapExit' is not defined then pass the Exit on
}
@ -997,14 +994,14 @@ trait Actor extends TransactionManagement {
}
}
private[Actor] def restart(reason: Throwable) = synchronized {
private[Actor] def restart(reason: Throwable) = withLock {
preRestart(reason)
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
postRestart(reason)
_isKilled = false
}
private[akka] def registerSupervisorAsRemoteActor: Option[String] = synchronized {
private[akka] def registerSupervisorAsRemoteActor: Option[String] = withLock {
if (_supervisor.isDefined) {
RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(this)
Some(_supervisor.get.uuid)
@ -1021,26 +1018,26 @@ trait Actor extends TransactionManagement {
private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {
if (!message.isInstanceOf[String] &&
!message.isInstanceOf[Byte] &&
!message.isInstanceOf[Int] &&
!message.isInstanceOf[Long] &&
!message.isInstanceOf[Float] &&
!message.isInstanceOf[Double] &&
!message.isInstanceOf[Boolean] &&
!message.isInstanceOf[Char] &&
!message.isInstanceOf[Tuple2[_, _]] &&
!message.isInstanceOf[Tuple3[_, _, _]] &&
!message.isInstanceOf[Tuple4[_, _, _, _]] &&
!message.isInstanceOf[Tuple5[_, _, _, _, _]] &&
!message.isInstanceOf[Tuple6[_, _, _, _, _, _]] &&
!message.isInstanceOf[Tuple7[_, _, _, _, _, _, _]] &&
!message.isInstanceOf[Tuple8[_, _, _, _, _, _, _, _]] &&
!message.getClass.isArray &&
!message.isInstanceOf[List[_]] &&
!message.isInstanceOf[scala.collection.immutable.Map[_, _]] &&
!message.isInstanceOf[scala.collection.immutable.Set[_]] &&
!message.isInstanceOf[scala.collection.immutable.Tree[_, _]] &&
!message.getClass.isAnnotationPresent(Annotations.immutable)) {
!message.isInstanceOf[Byte] &&
!message.isInstanceOf[Int] &&
!message.isInstanceOf[Long] &&
!message.isInstanceOf[Float] &&
!message.isInstanceOf[Double] &&
!message.isInstanceOf[Boolean] &&
!message.isInstanceOf[Char] &&
!message.isInstanceOf[Tuple2[_, _]] &&
!message.isInstanceOf[Tuple3[_, _, _]] &&
!message.isInstanceOf[Tuple4[_, _, _, _]] &&
!message.isInstanceOf[Tuple5[_, _, _, _, _]] &&
!message.isInstanceOf[Tuple6[_, _, _, _, _, _]] &&
!message.isInstanceOf[Tuple7[_, _, _, _, _, _, _]] &&
!message.isInstanceOf[Tuple8[_, _, _, _, _, _, _, _]] &&
!message.getClass.isArray &&
!message.isInstanceOf[List[_]] &&
!message.isInstanceOf[scala.collection.immutable.Map[_, _]] &&
!message.isInstanceOf[scala.collection.immutable.Set[_]] &&
!message.isInstanceOf[scala.collection.immutable.Tree[_, _]] &&
!message.getClass.isAnnotationPresent(Annotations.immutable)) {
Serializer.Java.deepClone(message)
} else message
} else message
@ -1053,8 +1050,8 @@ trait Actor extends TransactionManagement {
override def equals(that: Any): Boolean = {
that != null &&
that.isInstanceOf[Actor] &&
that.asInstanceOf[Actor]._uuid == _uuid
that.isInstanceOf[Actor] &&
that.asInstanceOf[Actor]._uuid == _uuid
}
override def toString(): String = "Actor[" + id + ":" + uuid + "]"

View file

@ -22,25 +22,35 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
init
def dispatch(invocation: MessageInvocation) = if (active) {
if (!invocation.receiver._isDispatching) {
executor.execute(new Runnable() {
def run = {
processMailbox(invocation)
stealAndScheduleWork(invocation.receiver)
}
})
}
// TODO: detect blocking with trylock ?! -> good idea... lets try that
executor.execute(new Runnable() {
def run = {
processMailbox(invocation)
stealAndScheduleWork(invocation.receiver)
}
})
} else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started")
/**
* Process the messages in the mailbox of the receiver of the invocation.
*/
private def processMailbox(invocation: MessageInvocation) = {
var messageInvocation = invocation.receiver._mailbox.poll
while (messageInvocation != null) {
log.debug("[%s] is processing [%s] in [%s]", invocation.receiver, messageInvocation.message, Thread.currentThread.getName)
messageInvocation.invoke
messageInvocation = invocation.receiver._mailbox.poll
val lockAcquired = invocation.receiver.lock.tryLock
if (lockAcquired) {
log.debug("[%s] has acquired lock for [%s] in [%s]", invocation.receiver, invocation.message, Thread.currentThread.getName)
try {
var messageInvocation = invocation.receiver._mailbox.poll
while (messageInvocation != null) {
log.debug("[%s] is processing [%s] in [%s]", invocation.receiver, messageInvocation.message, Thread.currentThread.getName)
messageInvocation.invoke
messageInvocation = invocation.receiver._mailbox.poll
}
} finally {
invocation.receiver.lock.unlock
}
} else {
// lock not acquired -> other dispatcher was busy -> no need to do anything
log.debug("[%s] has NOT acquired lock for [%s] in [%s]", invocation.receiver, invocation.message, Thread.currentThread.getName)
}
}
@ -71,15 +81,19 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
return None
}
override def register(actor: Actor) = {
super.register(actor)
executor.execute(new Runnable() {
def run = {
stealAndScheduleWork(actor)
}
})
actor // TODO: why is this necessary?
}
def start = if (!active) {
active = true
// TODO: prestart
// executor.execute(new Runnable() {
// def run = {
// // TODO: how to know which actor started me?
// // stealWork()
// }
// })
}
def shutdown = if (active) {

View file

@ -8,7 +8,7 @@ import se.scalablesolutions.akka.dispatch.Dispatchers
import java.util.Random
class WorkStealingTest extends JUnitSuite with MustMatchers with ActorTestUtil {
class ExecutorBasedEventDrivenWorkStealingDispatcherTest extends JUnitSuite with MustMatchers with ActorTestUtil {
val finishedCounter = new CountDownLatch(101)
@ -18,7 +18,10 @@ class WorkStealingTest extends JUnitSuite with MustMatchers with ActorTestUtil {
val poolDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")
class SlowActor extends Actor {
messageDispatcher = poolDispatcher
// id = "SlowActor"
val rnd = new Random
def receive = {
case x: Int => {
@ -31,7 +34,10 @@ class WorkStealingTest extends JUnitSuite with MustMatchers with ActorTestUtil {
}
class FastActor extends Actor {
messageDispatcher = poolDispatcher
// id = "FastActor"
def receive = {
case x: Int => {
// println("f processing " + x)