diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala
index 73d6691644..b4b721eebd 100644
--- a/akka-actors/src/main/scala/actor/Actor.scala
+++ b/akka-actors/src/main/scala/actor/Actor.scala
@@ -20,6 +20,7 @@ import se.scalablesolutions.akka.util.Helpers.ReadWriteLock
import se.scalablesolutions.akka.util.Logging
import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
+import org.codehaus.aspectwerkz.proxy.Uuid
import org.multiverse.utils.ThreadLocalTransaction._
@@ -54,28 +55,25 @@ object Actor {
}
/**
+ * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model'
* @author Jonas Bonér
*/
trait Actor extends Logging with TransactionManagement {
ActorRegistry.register(this)
+
+ // FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait
+ val uuid = Uuid.newUuid.toString
- @volatile private[this] var isRunning: Boolean = false
- private[this] val remoteFlagLock = new ReadWriteLock
- private[this] val transactionalFlagLock = new ReadWriteLock
-
- private var hotswap: Option[PartialFunction[Any, Unit]] = None
- private var config: Option[AnyRef] = None
-
- @volatile protected[this] var isTransactionRequiresNew = false
- @volatile protected[this] var remoteAddress: Option[InetSocketAddress] = None
- @volatile protected[akka] var supervisor: Option[Actor] = None
-
- protected[akka] var mailbox: MessageQueue = _
- protected[this] var senderFuture: Option[CompletableFutureResult] = None
- protected[this] val linkedActors = new HashSet[Actor]
- protected[actor] var lifeCycleConfig: Option[LifeCycle] = None
-
- val name = this.getClass.getName
+ // private fields
+ @volatile private var _isRunning: Boolean = false
+ private var _hotswap: Option[PartialFunction[Any, Unit]] = None
+ private var _config: Option[AnyRef] = None
+ private val _remoteFlagLock = new ReadWriteLock
+ private var _senderFuture: Option[CompletableFutureResult] = None
+ private var _remoteAddress: Option[InetSocketAddress] = None
+ private[akka] val _linkedActors = new HashSet[Actor]
+ private[akka] var _mailbox: MessageQueue = _
+ private[akka] var _supervisor: Option[Actor] = None
// ====================================
// ==== USER CALLBACKS TO OVERRIDE ====
@@ -88,6 +86,15 @@ trait Actor extends Logging with TransactionManagement {
*/
@volatile var timeout: Long = Actor.TIMEOUT
+ /**
+ * User overridable callback/setting.
+ *
+ * Defines the life-cycle for a supervised actor.
+ *
+ * Needs to be set if the actor is supervised programmatically.
+ */
+ @volatile var lifeCycleConfig: Option[LifeCycle] = None
+
/**
* User overridable callback/setting.
*
@@ -110,7 +117,7 @@ trait Actor extends Logging with TransactionManagement {
*/
protected[akka] var messageDispatcher: MessageDispatcher = {
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher(getClass.getName)
- mailbox = dispatcher.messageQueue
+ _mailbox = dispatcher.messageQueue
dispatcher.registerHandler(this, new ActorMessageInvoker(this))
dispatcher
}
@@ -129,6 +136,14 @@ trait Actor extends Logging with TransactionManagement {
*/
protected[this] var trapExit: Boolean = false
+ /**
+ * User overridable callback/setting.
+ *
+ * Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should
+ * start if there is no one running, else it joins the existing transaction.
+ */
+ @volatile protected var isTransactionRequiresNew = false
+
/**
* User overridable callback/setting.
*
@@ -213,9 +228,9 @@ trait Actor extends Logging with TransactionManagement {
* Starts up the actor and its message queue.
*/
def start = synchronized {
- if (!isRunning) {
+ if (!_isRunning) {
messageDispatcher.start
- isRunning = true
+ _isRunning = true
//if (isTransactional) this !! TransactionalInit
}
log.info("[%s] has started", toString)
@@ -225,11 +240,11 @@ trait Actor extends Logging with TransactionManagement {
* Stops the actor and its message queue.
*/
def stop = synchronized {
- if (isRunning) {
+ if (_isRunning) {
dispatcher.unregisterHandler(this)
if (dispatcher.isInstanceOf[ThreadBasedDispatcher]) dispatcher.shutdown
// FIXME: Need to do reference count to know if EventBasedThreadPoolDispatcher and EventBasedSingleThreadDispatcher can be shut down
- isRunning = false
+ _isRunning = false
shutdown
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
}
@@ -238,7 +253,7 @@ trait Actor extends Logging with TransactionManagement {
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
*/
def !(message: AnyRef) =
- if (isRunning) postMessageToMailbox(message)
+ if (_isRunning) postMessageToMailbox(message)
else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
/**
@@ -251,7 +266,7 @@ trait Actor extends Logging with TransactionManagement {
* If you are sending messages using !! then you have to use reply(..)
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
- def !: Option[T] = if (isRunning) {
+ def !: Option[T] = if (_isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
val isActiveObject = message.isInstanceOf[Invocation]
if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.completeWithResult(None)
@@ -283,7 +298,7 @@ trait Actor extends Logging with TransactionManagement {
* NOTE:
* Should be used with care (almost never), since very dangerous (will block a thread indefinitely if no reply).
*/
- def !?[T](message: AnyRef): T = if (isRunning) {
+ def !?[T](message: AnyRef): T = if (_isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0)
future.awaitBlocking
getResultOrThrowException(future).get
@@ -297,7 +312,7 @@ trait Actor extends Logging with TransactionManagement {
* Does only work together with the actor !! method and/or active objects not annotated
* with @oneway.
*/
- protected[this] def reply(message: AnyRef) = senderFuture match {
+ protected[this] def reply(message: AnyRef) = _senderFuture match {
case None => throw new IllegalStateException(
"\n\tNo sender in scope, can't reply. " +
"\n\tHave you used the '!' message send or the '@oneway' active object annotation? " +
@@ -311,9 +326,9 @@ trait Actor extends Logging with TransactionManagement {
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
def dispatcher_=(dispatcher: MessageDispatcher): Unit = synchronized {
- if (!isRunning) {
+ if (!_isRunning) {
messageDispatcher = dispatcher
- mailbox = messageDispatcher.messageQueue
+ _mailbox = messageDispatcher.messageQueue
messageDispatcher.registerHandler(this, new ActorMessageInvoker(this))
} else throw new IllegalArgumentException("Can not swap dispatcher for " + toString + " after it has been started")
}
@@ -321,15 +336,15 @@ trait Actor extends Logging with TransactionManagement {
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
- def makeRemote(hostname: String, port: Int): Unit = remoteFlagLock.withWriteLock {
+ def makeRemote(hostname: String, port: Int): Unit = _remoteFlagLock.withWriteLock {
makeRemote(new InetSocketAddress(hostname, port))
}
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
- def makeRemote(address: InetSocketAddress): Unit = remoteFlagLock.withWriteLock {
- remoteAddress = Some(address)
+ def makeRemote(address: InetSocketAddress): Unit = _remoteFlagLock.withWriteLock {
+ _remoteAddress = Some(address)
}
/**
@@ -341,7 +356,7 @@ trait Actor extends Logging with TransactionManagement {
*
*/
def makeTransactionRequired = synchronized {
- if (isRunning) throw new IllegalArgumentException("Can not make actor transaction required after it has been started")
+ if (_isRunning) throw new IllegalArgumentException("Can not make actor transaction required after it has been started")
else isTransactionRequiresNew = true
}
@@ -352,10 +367,10 @@ trait Actor extends Logging with TransactionManagement {
* To be invoked from within the actor itself.
*/
protected[this] def link(actor: Actor) = {
- if (isRunning) {
- linkedActors.add(actor)
- if (actor.supervisor.isDefined) throw new IllegalStateException("Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails")
- actor.supervisor = Some(this)
+ if (_isRunning) {
+ _linkedActors.add(actor)
+ if (actor._supervisor.isDefined) throw new IllegalStateException("Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails")
+ actor._supervisor = Some(this)
log.debug("Linking actor [%s] to actor [%s]", actor, this)
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
}
@@ -366,10 +381,10 @@ trait Actor extends Logging with TransactionManagement {
* To be invoked from within the actor itself.
*/
protected[this] def unlink(actor: Actor) = {
- if (isRunning) {
- if (!linkedActors.contains(actor)) throw new IllegalStateException("Actor [" + actor + "] is not a linked actor, can't unlink")
- linkedActors.remove(actor)
- actor.supervisor = None
+ if (_isRunning) {
+ if (!_linkedActors.contains(actor)) throw new IllegalStateException("Actor [" + actor + "] is not a linked actor, can't unlink")
+ _linkedActors.remove(actor)
+ actor._supervisor = None
log.debug("Unlinking actor [%s] from actor [%s]", actor, this)
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
}
@@ -404,7 +419,7 @@ trait Actor extends Logging with TransactionManagement {
val actor = actorClass.newInstance.asInstanceOf[T]
if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) {
actor.dispatcher = dispatcher
- actor.mailbox = mailbox
+ actor._mailbox = _mailbox
}
actor.start
actor
@@ -420,7 +435,7 @@ trait Actor extends Logging with TransactionManagement {
actor.makeRemote(hostname, port)
if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) {
actor.dispatcher = dispatcher
- actor.mailbox = mailbox
+ actor._mailbox = _mailbox
}
actor.start
actor
@@ -453,8 +468,8 @@ trait Actor extends Logging with TransactionManagement {
// ==== IMPLEMENTATION DETAILS ====
// ================================
- private def postMessageToMailbox(message: AnyRef): Unit = remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
- if (remoteAddress.isDefined) {
+ private def postMessageToMailbox(message: AnyRef): Unit = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
+ if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(this.getClass.getName)
@@ -465,15 +480,15 @@ trait Actor extends Logging with TransactionManagement {
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
- RemoteClient.clientFor(remoteAddress.get).send(requestBuilder.build)
+ RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build)
} else {
val handle = new MessageInvocation(this, message, None, currentTransaction.get)
handle.send
}
}
- private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
- if (remoteAddress.isDefined) {
+ private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
+ if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(this.getClass.getName)
@@ -484,7 +499,7 @@ trait Actor extends Logging with TransactionManagement {
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
- val future = RemoteClient.clientFor(remoteAddress.get).send(requestBuilder.build)
+ val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build)
if (future.isDefined) future.get
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
} else {
@@ -509,13 +524,13 @@ trait Actor extends Logging with TransactionManagement {
val message = messageHandle.message //serializeMessage(messageHandle.message)
val future = messageHandle.future
try {
- senderFuture = future
+ _senderFuture = future
if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString)
} catch {
case e =>
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
- if (supervisor.isDefined) supervisor.get ! Exit(this, e)
+ if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
if (future.isDefined) future.get.completeWithException(this, e)
else e.printStackTrace
} finally {
@@ -540,9 +555,9 @@ trait Actor extends Logging with TransactionManagement {
}
try {
- senderFuture = future
+ _senderFuture = future
if (isTransactionRequiresNew && !isTransactionInScope) {
- if (senderFuture.isEmpty) throw new StmException(
+ if (_senderFuture.isEmpty) throw new StmException(
"\n\tCan't continue transaction in a one-way fire-forget message send" +
"\n\tE.g. using Actor '!' method or Active Object 'void' method" +
"\n\tPlease use the Actor '!!', '!?' methods or Active Object method with non-void return type")
@@ -560,7 +575,7 @@ trait Actor extends Logging with TransactionManagement {
clearTransaction // need to clear currentTransaction before call to supervisor
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
- if (supervisor.isDefined) supervisor.get ! Exit(this, e)
+ if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
} finally {
clearTransaction
}
@@ -570,11 +585,11 @@ trait Actor extends Logging with TransactionManagement {
if (future.exception.isDefined) throw future.exception.get._2
else future.result.asInstanceOf[Option[T]]
- private def base: PartialFunction[Any, Unit] = lifeCycle orElse (hotswap getOrElse receive)
+ private def base: PartialFunction[Any, Unit] = lifeCycle orElse (_hotswap getOrElse receive)
private val lifeCycle: PartialFunction[Any, Unit] = {
case Init(config) => init(config)
- case HotSwap(code) => hotswap = code
+ case HotSwap(code) => _hotswap = code
case Restart(reason) => restart(reason)
case Exit(dead, reason) => handleTrapExit(dead, reason)
// case TransactionalInit => initTransactionalState
@@ -590,12 +605,12 @@ trait Actor extends Logging with TransactionManagement {
}
} else throw new IllegalStateException("No 'faultHandler' defined for actor with the 'trapExit' flag set to true - can't proceed " + toString)
} else {
- if (supervisor.isDefined) supervisor.get ! Exit(dead, reason) // if 'trapExit' is not defined then pass the Exit on
+ if (_supervisor.isDefined) _supervisor.get ! Exit(dead, reason) // if 'trapExit' is not defined then pass the Exit on
}
}
private[this] def restartLinkedActors(reason: AnyRef) =
- linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_.restart(reason))
+ _linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_.restart(reason))
private[Actor] def restart(reason: AnyRef) = synchronized {
lifeCycleConfig match {
@@ -605,9 +620,9 @@ trait Actor extends Logging with TransactionManagement {
case Some(LifeCycle(scope, shutdownTime, _)) => {
scope match {
case Permanent => {
- preRestart(reason, config)
+ preRestart(reason, _config)
log.info("Restarting actor [%s] configured as PERMANENT.", id)
- postRestart(reason, config)
+ postRestart(reason, _config)
}
case Temporary =>
@@ -626,16 +641,16 @@ trait Actor extends Logging with TransactionManagement {
}
private[akka] def registerSupervisorAsRemoteActor: Option[String] = synchronized {
- if (supervisor.isDefined) {
- RemoteClient.clientFor(remoteAddress.get).registerSupervisorForActor(this)
- Some(supervisor.get.uuid)
+ if (_supervisor.isDefined) {
+ RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(this)
+ Some(_supervisor.get.uuid)
} else None
}
private[akka] def swapDispatcher(disp: MessageDispatcher) = synchronized {
messageDispatcher = disp
- mailbox = messageDispatcher.messageQueue
+ _mailbox = messageDispatcher.messageQueue
messageDispatcher.registerHandler(this, new ActorMessageInvoker(this))
}
diff --git a/akka-actors/src/main/scala/actor/Supervisor.scala b/akka-actors/src/main/scala/actor/Supervisor.scala
index 0c02bccc67..3b5236625d 100644
--- a/akka-actors/src/main/scala/actor/Supervisor.scala
+++ b/akka-actors/src/main/scala/actor/Supervisor.scala
@@ -127,10 +127,10 @@ class Supervisor private[akka] (handler: FaultHandlingStrategy) extends Actor wi
protected def receive: PartialFunction[Any, Unit] = {
case StartSupervisor =>
- linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.start; log.info("Starting actor: %s", actor) }
+ _linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.start; log.info("Starting actor: %s", actor) }
case StopSupervisor =>
- linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.stop; log.info("Stopping actor: %s", actor) }
+ _linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.stop; log.info("Stopping actor: %s", actor) }
log.info("Stopping supervisor: %s", this)
stop
}
diff --git a/akka-actors/src/main/scala/dispatch/Reactor.scala b/akka-actors/src/main/scala/dispatch/Reactor.scala
index 2949bf1456..36be55a391 100644
--- a/akka-actors/src/main/scala/dispatch/Reactor.scala
+++ b/akka-actors/src/main/scala/dispatch/Reactor.scala
@@ -46,7 +46,7 @@ class MessageInvocation(val receiver: Actor,
private [akka] val nrOfDeliveryAttempts = new AtomicInteger(0)
def send = synchronized {
- receiver.mailbox.append(this)
+ receiver._mailbox.append(this)
nrOfDeliveryAttempts.incrementAndGet
}
diff --git a/akka-actors/src/main/scala/nio/RemoteClient.scala b/akka-actors/src/main/scala/nio/RemoteClient.scala
index 6ac441dfa7..c25782fe31 100644
--- a/akka-actors/src/main/scala/nio/RemoteClient.scala
+++ b/akka-actors/src/main/scala/nio/RemoteClient.scala
@@ -108,12 +108,12 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
} else throw new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
def registerSupervisorForActor(actor: Actor) =
- if (!actor.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actor + " since it is not under supervision")
- else supervisors.putIfAbsent(actor.supervisor.get.uuid, actor)
+ if (!actor._supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actor + " since it is not under supervision")
+ else supervisors.putIfAbsent(actor._supervisor.get.uuid, actor)
def deregisterSupervisorForActor(actor: Actor) =
- if (!actor.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actor + " since it is not under supervision")
- else supervisors.remove(actor.supervisor.get.uuid)
+ if (!actor._supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actor + " since it is not under supervision")
+ else supervisors.remove(actor._supervisor.get.uuid)
def deregisterSupervisorWithUuid(uuid: String) = supervisors.remove(uuid)
}
@@ -169,8 +169,8 @@ class RemoteClientHandler(val name: String,
val supervisorUuid = reply.getSupervisorUuid
if (!supervisors.containsKey(supervisorUuid)) throw new IllegalStateException("Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found")
val supervisedActor = supervisors.get(supervisorUuid)
- if (!supervisedActor.supervisor.isDefined) throw new IllegalStateException("Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
- else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply))
+ if (!supervisedActor._supervisor.isDefined) throw new IllegalStateException("Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
+ else supervisedActor._supervisor.get ! Exit(supervisedActor, parseException(reply))
}
future.completeWithException(null, parseException(reply))
}
diff --git a/akka-actors/src/main/scala/stm/TransactionManagement.scala b/akka-actors/src/main/scala/stm/TransactionManagement.scala
index f8bf35c10d..ad6c49463e 100644
--- a/akka-actors/src/main/scala/stm/TransactionManagement.scala
+++ b/akka-actors/src/main/scala/stm/TransactionManagement.scala
@@ -8,8 +8,6 @@ import java.util.concurrent.atomic.AtomicBoolean
import se.scalablesolutions.akka.util.Logging
-import org.codehaus.aspectwerkz.proxy.Uuid
-
import scala.collection.mutable.HashSet
import org.multiverse.utils.ThreadLocalTransaction._
@@ -38,9 +36,6 @@ object TransactionManagement extends TransactionManagement {
}
trait TransactionManagement extends Logging {
- // FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait
- var uuid = Uuid.newUuid.toString
-
import TransactionManagement.currentTransaction
private[akka] val activeTransactions = new HashSet[Transaction]
diff --git a/akka-actors/src/test/scala/EventBasedSingleThreadActorTest.scala b/akka-actors/src/test/scala/EventBasedSingleThreadActorTest.scala
index 150c9ee8e4..d8a3917293 100644
--- a/akka-actors/src/test/scala/EventBasedSingleThreadActorTest.scala
+++ b/akka-actors/src/test/scala/EventBasedSingleThreadActorTest.scala
@@ -11,7 +11,7 @@ class EventBasedSingleThreadActorTest extends JUnitSuite {
private val unit = TimeUnit.MILLISECONDS
class TestActor extends Actor {
- dispatcher = Dispatchers.newEventBasedSingleThreadDispatcher(name)
+ dispatcher = Dispatchers.newEventBasedSingleThreadDispatcher(uuid)
def receive: PartialFunction[Any, Unit] = {
case "Hello" =>