diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 4244d4b7c2..309d45600a 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -20,7 +20,7 @@ import org.multiverse.api.ThreadLocalTransaction._ import java.util.{Deque, HashSet} import java.net.InetSocketAddress -import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingDeque} +import java.util.concurrent.LinkedBlockingDeque import java.util.concurrent.locks.ReentrantLock /** @@ -84,7 +84,7 @@ object Actor extends Logging { *
* import Actor._
*
- * val a = actor {
+ * val a = actor {
* case msg => ... // handle message
* }
*
@@ -101,9 +101,9 @@ object Actor extends Logging {
*
* import Actor._
*
- * val a = actor {
+ * val a = actor {
* ... // init stuff
- * } receive {
+ * } receive {
* case msg => ... // handle message
* }
*
@@ -131,7 +131,7 @@ object Actor extends Logging {
*
* import Actor._
*
- * spawn {
+ * spawn {
* ... // do stuff
* }
*
@@ -154,7 +154,7 @@ object Actor extends Logging {
*
* import Actor._
*
- * val a = actor(LifeCycle(Temporary)) {
+ * val a = actor(LifeCycle(Temporary)) {
* case msg => ... // handle message
* }
*
@@ -172,7 +172,7 @@ object Actor extends Logging {
*
* import Actor._
*
- * val a = actor("localhost", 9999) {
+ * val a = actor("localhost", 9999) {
* case msg => ... // handle message
* }
*
@@ -208,6 +208,7 @@ trait Actor extends TransactionManagement {
// private fields
// ====================================
+
@volatile private[this] var _isRunning = false
@volatile private[this] var _isSuspended = true
@volatile private[this] var _isShutDown = false
@@ -343,7 +344,7 @@ trait Actor extends TransactionManagement {
*
* Example code:
*
- * def receive = {
+ * def receive = {
* case Ping =>
* println("got a ping")
* reply("pong")
@@ -402,19 +403,20 @@ trait Actor extends TransactionManagement {
// ==== API ====
// =============
- def withLock[B](f: => B): B = {
- lock.lock
- try {
- return f
- } finally {
- lock.unlock
- }
- }
+ // private def withLock[B](f: => B): B = {
+ // lock.lock
+ // try {
+ // return f
+ // } finally {
+ // lock.unlock
+ // }
+ // }
/**
* Starts up the actor and its message queue.
*/
- def start: Actor = withLock {
+ def start: Actor = {lock.lock;
+ try {
if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'exit'")
if (!_isRunning) {
if (messageDispatcher.isShutdown &&
@@ -429,7 +431,7 @@ trait Actor extends TransactionManagement {
Actor.log.debug("[%s] has started", toString)
ActorRegistry.register(this)
this
- }
+ } finally {lock.unlock}}
/**
* Shuts down the actor its dispatcher and message queue.
@@ -440,17 +442,21 @@ trait Actor extends TransactionManagement {
/**
* Shuts down the actor its dispatcher and message queue.
*/
- def stop = withLock {
- if (_isRunning) {
- messageDispatcher.unregister(this)
- if (messageDispatcher.canBeShutDown) messageDispatcher.shutdown // shut down in the dispatcher's references is zero
- _isRunning = false
- _isShutDown = true
- shutdown
- ActorRegistry.unregister(this)
- _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid))
- }
+ def stop = {
+ lock.lock
+ try {
+ if (_isRunning) {
+ messageDispatcher.unregister(this)
+ if (messageDispatcher.canBeShutDown) messageDispatcher.shutdown // shut down in the dispatcher's references is zero
+ _isRunning = false
+ _isShutDown = true
+ shutdown
+ ActorRegistry.unregister(this)
+ _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid))
+ }
+ } finally {lock.unlock}
}
+
def isRunning = _isRunning
/**
@@ -604,7 +610,8 @@ trait Actor extends TransactionManagement {
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
- def dispatcher_=(md: MessageDispatcher): Unit = withLock {
+ def dispatcher_=(md: MessageDispatcher): Unit = {lock.lock;
+ try {
if (!_isRunning) {
messageDispatcher.unregister(this)
messageDispatcher = md
@@ -612,7 +619,7 @@ trait Actor extends TransactionManagement {
_isEventBased = messageDispatcher.isInstanceOf[ExecutorBasedEventDrivenDispatcher]
} else throw new IllegalArgumentException(
"Can not swap dispatcher for " + toString + " after it has been started")
- }
+ } finally {lock.unlock}}
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
@@ -648,11 +655,12 @@ trait Actor extends TransactionManagement {
* TransactionManagement.disableTransactions
*
*/
- def makeTransactionRequired = withLock {
+ def makeTransactionRequired = {lock.lock;
+ try {
if (_isRunning) throw new IllegalArgumentException(
"Can not make actor transaction required after it has been started")
else isTransactionRequiresNew = true
- }
+ } finally {lock.unlock}}
/**
* Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
@@ -872,7 +880,8 @@ trait Actor extends TransactionManagement {
/**
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
*/
- private[akka] def invoke(messageHandle: MessageInvocation) = withLock {
+ private[akka] def invoke(messageHandle: MessageInvocation) = {lock.lock;
+ try {
try {
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle)
@@ -881,7 +890,7 @@ trait Actor extends TransactionManagement {
Actor.log.error(e, "Could not invoke actor [%s]", this)
throw e
}
- }
+ } finally {lock.unlock}}
private def dispatch[T](messageHandle: MessageInvocation) = {
setTransaction(messageHandle.tx)
@@ -994,19 +1003,21 @@ trait Actor extends TransactionManagement {
}
}
- private[Actor] def restart(reason: Throwable) = withLock {
+ private[Actor] def restart(reason: Throwable) = {lock.lock;
+ try {
preRestart(reason)
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
postRestart(reason)
_isKilled = false
- }
+ } finally {lock.unlock}}
- private[akka] def registerSupervisorAsRemoteActor: Option[String] = withLock {
+ private[akka] def registerSupervisorAsRemoteActor: Option[String] = {lock.lock;
+ try {
if (_supervisor.isDefined) {
RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(this)
Some(_supervisor.get.uuid)
} else None
- }
+ } finally {lock.unlock}}
protected def getLinkedActors: HashSet[Actor] = {
if (_linkedActors.isEmpty) {
diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala
index bc4d62ff08..e362a59711 100644
--- a/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala
+++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala
@@ -20,7 +20,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherTest extends JUnitSuite with
class SlowActor extends Actor {
messageDispatcher = poolDispatcher
-// id = "SlowActor"
+ id = "SlowActor"
val rnd = new Random
def receive = {
@@ -36,7 +36,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherTest extends JUnitSuite with
class FastActor extends Actor {
messageDispatcher = poolDispatcher
-// id = "FastActor"
+ id = "FastActor"
def receive = {
case x: Int => {