From 3d7eecb243c3d2be76275c48bbbffb135bc17dea Mon Sep 17 00:00:00 2001 From: jboner Date: Wed, 28 Oct 2009 13:20:28 +0100 Subject: [PATCH] cleaned up actor field access modifiers and prefixed internal fields with _ to avoid name clashes --- akka-actors/src/main/scala/actor/Actor.scala | 141 ++++++++++-------- .../src/main/scala/actor/Supervisor.scala | 4 +- .../src/main/scala/dispatch/Reactor.scala | 2 +- .../src/main/scala/nio/RemoteClient.scala | 12 +- .../scala/stm/TransactionManagement.scala | 5 - .../EventBasedSingleThreadActorTest.scala | 2 +- 6 files changed, 88 insertions(+), 78 deletions(-) diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index 73d6691644..b4b721eebd 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -20,6 +20,7 @@ import se.scalablesolutions.akka.util.Helpers.ReadWriteLock import se.scalablesolutions.akka.util.Logging import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint} +import org.codehaus.aspectwerkz.proxy.Uuid import org.multiverse.utils.ThreadLocalTransaction._ @@ -54,28 +55,25 @@ object Actor { } /** + * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model' * @author Jonas Bonér */ trait Actor extends Logging with TransactionManagement { ActorRegistry.register(this) + + // FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait + val uuid = Uuid.newUuid.toString - @volatile private[this] var isRunning: Boolean = false - private[this] val remoteFlagLock = new ReadWriteLock - private[this] val transactionalFlagLock = new ReadWriteLock - - private var hotswap: Option[PartialFunction[Any, Unit]] = None - private var config: Option[AnyRef] = None - - @volatile protected[this] var isTransactionRequiresNew = false - @volatile protected[this] var remoteAddress: Option[InetSocketAddress] = None - @volatile protected[akka] var supervisor: Option[Actor] = None - - protected[akka] var mailbox: MessageQueue = _ - protected[this] var senderFuture: Option[CompletableFutureResult] = None - protected[this] val linkedActors = new HashSet[Actor] - protected[actor] var lifeCycleConfig: Option[LifeCycle] = None - - val name = this.getClass.getName + // private fields + @volatile private var _isRunning: Boolean = false + private var _hotswap: Option[PartialFunction[Any, Unit]] = None + private var _config: Option[AnyRef] = None + private val _remoteFlagLock = new ReadWriteLock + private var _senderFuture: Option[CompletableFutureResult] = None + private var _remoteAddress: Option[InetSocketAddress] = None + private[akka] val _linkedActors = new HashSet[Actor] + private[akka] var _mailbox: MessageQueue = _ + private[akka] var _supervisor: Option[Actor] = None // ==================================== // ==== USER CALLBACKS TO OVERRIDE ==== @@ -88,6 +86,15 @@ trait Actor extends Logging with TransactionManagement { */ @volatile var timeout: Long = Actor.TIMEOUT + /** + * User overridable callback/setting. + * + * Defines the life-cycle for a supervised actor. + * + * Needs to be set if the actor is supervised programmatically. + */ + @volatile var lifeCycleConfig: Option[LifeCycle] = None + /** * User overridable callback/setting. * @@ -110,7 +117,7 @@ trait Actor extends Logging with TransactionManagement { */ protected[akka] var messageDispatcher: MessageDispatcher = { val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher(getClass.getName) - mailbox = dispatcher.messageQueue + _mailbox = dispatcher.messageQueue dispatcher.registerHandler(this, new ActorMessageInvoker(this)) dispatcher } @@ -129,6 +136,14 @@ trait Actor extends Logging with TransactionManagement { */ protected[this] var trapExit: Boolean = false + /** + * User overridable callback/setting. + * + * Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should + * start if there is no one running, else it joins the existing transaction. + */ + @volatile protected var isTransactionRequiresNew = false + /** * User overridable callback/setting. * @@ -213,9 +228,9 @@ trait Actor extends Logging with TransactionManagement { * Starts up the actor and its message queue. */ def start = synchronized { - if (!isRunning) { + if (!_isRunning) { messageDispatcher.start - isRunning = true + _isRunning = true //if (isTransactional) this !! TransactionalInit } log.info("[%s] has started", toString) @@ -225,11 +240,11 @@ trait Actor extends Logging with TransactionManagement { * Stops the actor and its message queue. */ def stop = synchronized { - if (isRunning) { + if (_isRunning) { dispatcher.unregisterHandler(this) if (dispatcher.isInstanceOf[ThreadBasedDispatcher]) dispatcher.shutdown // FIXME: Need to do reference count to know if EventBasedThreadPoolDispatcher and EventBasedSingleThreadDispatcher can be shut down - isRunning = false + _isRunning = false shutdown } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } @@ -238,7 +253,7 @@ trait Actor extends Logging with TransactionManagement { * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. */ def !(message: AnyRef) = - if (isRunning) postMessageToMailbox(message) + if (_isRunning) postMessageToMailbox(message) else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") /** @@ -251,7 +266,7 @@ trait Actor extends Logging with TransactionManagement { * If you are sending messages using !! then you have to use reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def !![T](message: AnyRef, timeout: Long): Option[T] = if (isRunning) { + def !![T](message: AnyRef, timeout: Long): Option[T] = if (_isRunning) { val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout) val isActiveObject = message.isInstanceOf[Invocation] if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.completeWithResult(None) @@ -283,7 +298,7 @@ trait Actor extends Logging with TransactionManagement { * NOTE: * Should be used with care (almost never), since very dangerous (will block a thread indefinitely if no reply). */ - def !?[T](message: AnyRef): T = if (isRunning) { + def !?[T](message: AnyRef): T = if (_isRunning) { val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0) future.awaitBlocking getResultOrThrowException(future).get @@ -297,7 +312,7 @@ trait Actor extends Logging with TransactionManagement { * Does only work together with the actor !! method and/or active objects not annotated * with @oneway. */ - protected[this] def reply(message: AnyRef) = senderFuture match { + protected[this] def reply(message: AnyRef) = _senderFuture match { case None => throw new IllegalStateException( "\n\tNo sender in scope, can't reply. " + "\n\tHave you used the '!' message send or the '@oneway' active object annotation? " + @@ -311,9 +326,9 @@ trait Actor extends Logging with TransactionManagement { * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. */ def dispatcher_=(dispatcher: MessageDispatcher): Unit = synchronized { - if (!isRunning) { + if (!_isRunning) { messageDispatcher = dispatcher - mailbox = messageDispatcher.messageQueue + _mailbox = messageDispatcher.messageQueue messageDispatcher.registerHandler(this, new ActorMessageInvoker(this)) } else throw new IllegalArgumentException("Can not swap dispatcher for " + toString + " after it has been started") } @@ -321,15 +336,15 @@ trait Actor extends Logging with TransactionManagement { /** * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. */ - def makeRemote(hostname: String, port: Int): Unit = remoteFlagLock.withWriteLock { + def makeRemote(hostname: String, port: Int): Unit = _remoteFlagLock.withWriteLock { makeRemote(new InetSocketAddress(hostname, port)) } /** * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. */ - def makeRemote(address: InetSocketAddress): Unit = remoteFlagLock.withWriteLock { - remoteAddress = Some(address) + def makeRemote(address: InetSocketAddress): Unit = _remoteFlagLock.withWriteLock { + _remoteAddress = Some(address) } /** @@ -341,7 +356,7 @@ trait Actor extends Logging with TransactionManagement { * */ def makeTransactionRequired = synchronized { - if (isRunning) throw new IllegalArgumentException("Can not make actor transaction required after it has been started") + if (_isRunning) throw new IllegalArgumentException("Can not make actor transaction required after it has been started") else isTransactionRequiresNew = true } @@ -352,10 +367,10 @@ trait Actor extends Logging with TransactionManagement { * To be invoked from within the actor itself. */ protected[this] def link(actor: Actor) = { - if (isRunning) { - linkedActors.add(actor) - if (actor.supervisor.isDefined) throw new IllegalStateException("Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails") - actor.supervisor = Some(this) + if (_isRunning) { + _linkedActors.add(actor) + if (actor._supervisor.isDefined) throw new IllegalStateException("Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails") + actor._supervisor = Some(this) log.debug("Linking actor [%s] to actor [%s]", actor, this) } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } @@ -366,10 +381,10 @@ trait Actor extends Logging with TransactionManagement { * To be invoked from within the actor itself. */ protected[this] def unlink(actor: Actor) = { - if (isRunning) { - if (!linkedActors.contains(actor)) throw new IllegalStateException("Actor [" + actor + "] is not a linked actor, can't unlink") - linkedActors.remove(actor) - actor.supervisor = None + if (_isRunning) { + if (!_linkedActors.contains(actor)) throw new IllegalStateException("Actor [" + actor + "] is not a linked actor, can't unlink") + _linkedActors.remove(actor) + actor._supervisor = None log.debug("Unlinking actor [%s] from actor [%s]", actor, this) } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } @@ -404,7 +419,7 @@ trait Actor extends Logging with TransactionManagement { val actor = actorClass.newInstance.asInstanceOf[T] if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) { actor.dispatcher = dispatcher - actor.mailbox = mailbox + actor._mailbox = _mailbox } actor.start actor @@ -420,7 +435,7 @@ trait Actor extends Logging with TransactionManagement { actor.makeRemote(hostname, port) if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) { actor.dispatcher = dispatcher - actor.mailbox = mailbox + actor._mailbox = _mailbox } actor.start actor @@ -453,8 +468,8 @@ trait Actor extends Logging with TransactionManagement { // ==== IMPLEMENTATION DETAILS ==== // ================================ - private def postMessageToMailbox(message: AnyRef): Unit = remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime - if (remoteAddress.isDefined) { + private def postMessageToMailbox(message: AnyRef): Unit = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime + if (_remoteAddress.isDefined) { val requestBuilder = RemoteRequest.newBuilder .setId(RemoteRequestIdFactory.nextId) .setTarget(this.getClass.getName) @@ -465,15 +480,15 @@ trait Actor extends Logging with TransactionManagement { val id = registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) RemoteProtocolBuilder.setMessage(message, requestBuilder) - RemoteClient.clientFor(remoteAddress.get).send(requestBuilder.build) + RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build) } else { val handle = new MessageInvocation(this, message, None, currentTransaction.get) handle.send } } - private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime - if (remoteAddress.isDefined) { + private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime + if (_remoteAddress.isDefined) { val requestBuilder = RemoteRequest.newBuilder .setId(RemoteRequestIdFactory.nextId) .setTarget(this.getClass.getName) @@ -484,7 +499,7 @@ trait Actor extends Logging with TransactionManagement { RemoteProtocolBuilder.setMessage(message, requestBuilder) val id = registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) - val future = RemoteClient.clientFor(remoteAddress.get).send(requestBuilder.build) + val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build) if (future.isDefined) future.get else throw new IllegalStateException("Expected a future from remote call to actor " + toString) } else { @@ -509,13 +524,13 @@ trait Actor extends Logging with TransactionManagement { val message = messageHandle.message //serializeMessage(messageHandle.message) val future = messageHandle.future try { - senderFuture = future + _senderFuture = future if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString) } catch { case e => // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client - if (supervisor.isDefined) supervisor.get ! Exit(this, e) + if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) if (future.isDefined) future.get.completeWithException(this, e) else e.printStackTrace } finally { @@ -540,9 +555,9 @@ trait Actor extends Logging with TransactionManagement { } try { - senderFuture = future + _senderFuture = future if (isTransactionRequiresNew && !isTransactionInScope) { - if (senderFuture.isEmpty) throw new StmException( + if (_senderFuture.isEmpty) throw new StmException( "\n\tCan't continue transaction in a one-way fire-forget message send" + "\n\tE.g. using Actor '!' method or Active Object 'void' method" + "\n\tPlease use the Actor '!!', '!?' methods or Active Object method with non-void return type") @@ -560,7 +575,7 @@ trait Actor extends Logging with TransactionManagement { clearTransaction // need to clear currentTransaction before call to supervisor // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client - if (supervisor.isDefined) supervisor.get ! Exit(this, e) + if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) } finally { clearTransaction } @@ -570,11 +585,11 @@ trait Actor extends Logging with TransactionManagement { if (future.exception.isDefined) throw future.exception.get._2 else future.result.asInstanceOf[Option[T]] - private def base: PartialFunction[Any, Unit] = lifeCycle orElse (hotswap getOrElse receive) + private def base: PartialFunction[Any, Unit] = lifeCycle orElse (_hotswap getOrElse receive) private val lifeCycle: PartialFunction[Any, Unit] = { case Init(config) => init(config) - case HotSwap(code) => hotswap = code + case HotSwap(code) => _hotswap = code case Restart(reason) => restart(reason) case Exit(dead, reason) => handleTrapExit(dead, reason) // case TransactionalInit => initTransactionalState @@ -590,12 +605,12 @@ trait Actor extends Logging with TransactionManagement { } } else throw new IllegalStateException("No 'faultHandler' defined for actor with the 'trapExit' flag set to true - can't proceed " + toString) } else { - if (supervisor.isDefined) supervisor.get ! Exit(dead, reason) // if 'trapExit' is not defined then pass the Exit on + if (_supervisor.isDefined) _supervisor.get ! Exit(dead, reason) // if 'trapExit' is not defined then pass the Exit on } } private[this] def restartLinkedActors(reason: AnyRef) = - linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_.restart(reason)) + _linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_.restart(reason)) private[Actor] def restart(reason: AnyRef) = synchronized { lifeCycleConfig match { @@ -605,9 +620,9 @@ trait Actor extends Logging with TransactionManagement { case Some(LifeCycle(scope, shutdownTime, _)) => { scope match { case Permanent => { - preRestart(reason, config) + preRestart(reason, _config) log.info("Restarting actor [%s] configured as PERMANENT.", id) - postRestart(reason, config) + postRestart(reason, _config) } case Temporary => @@ -626,16 +641,16 @@ trait Actor extends Logging with TransactionManagement { } private[akka] def registerSupervisorAsRemoteActor: Option[String] = synchronized { - if (supervisor.isDefined) { - RemoteClient.clientFor(remoteAddress.get).registerSupervisorForActor(this) - Some(supervisor.get.uuid) + if (_supervisor.isDefined) { + RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(this) + Some(_supervisor.get.uuid) } else None } private[akka] def swapDispatcher(disp: MessageDispatcher) = synchronized { messageDispatcher = disp - mailbox = messageDispatcher.messageQueue + _mailbox = messageDispatcher.messageQueue messageDispatcher.registerHandler(this, new ActorMessageInvoker(this)) } diff --git a/akka-actors/src/main/scala/actor/Supervisor.scala b/akka-actors/src/main/scala/actor/Supervisor.scala index 0c02bccc67..3b5236625d 100644 --- a/akka-actors/src/main/scala/actor/Supervisor.scala +++ b/akka-actors/src/main/scala/actor/Supervisor.scala @@ -127,10 +127,10 @@ class Supervisor private[akka] (handler: FaultHandlingStrategy) extends Actor wi protected def receive: PartialFunction[Any, Unit] = { case StartSupervisor => - linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.start; log.info("Starting actor: %s", actor) } + _linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.start; log.info("Starting actor: %s", actor) } case StopSupervisor => - linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.stop; log.info("Stopping actor: %s", actor) } + _linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.stop; log.info("Stopping actor: %s", actor) } log.info("Stopping supervisor: %s", this) stop } diff --git a/akka-actors/src/main/scala/dispatch/Reactor.scala b/akka-actors/src/main/scala/dispatch/Reactor.scala index 2949bf1456..36be55a391 100644 --- a/akka-actors/src/main/scala/dispatch/Reactor.scala +++ b/akka-actors/src/main/scala/dispatch/Reactor.scala @@ -46,7 +46,7 @@ class MessageInvocation(val receiver: Actor, private [akka] val nrOfDeliveryAttempts = new AtomicInteger(0) def send = synchronized { - receiver.mailbox.append(this) + receiver._mailbox.append(this) nrOfDeliveryAttempts.incrementAndGet } diff --git a/akka-actors/src/main/scala/nio/RemoteClient.scala b/akka-actors/src/main/scala/nio/RemoteClient.scala index 6ac441dfa7..c25782fe31 100644 --- a/akka-actors/src/main/scala/nio/RemoteClient.scala +++ b/akka-actors/src/main/scala/nio/RemoteClient.scala @@ -108,12 +108,12 @@ class RemoteClient(hostname: String, port: Int) extends Logging { } else throw new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.") def registerSupervisorForActor(actor: Actor) = - if (!actor.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actor + " since it is not under supervision") - else supervisors.putIfAbsent(actor.supervisor.get.uuid, actor) + if (!actor._supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actor + " since it is not under supervision") + else supervisors.putIfAbsent(actor._supervisor.get.uuid, actor) def deregisterSupervisorForActor(actor: Actor) = - if (!actor.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actor + " since it is not under supervision") - else supervisors.remove(actor.supervisor.get.uuid) + if (!actor._supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actor + " since it is not under supervision") + else supervisors.remove(actor._supervisor.get.uuid) def deregisterSupervisorWithUuid(uuid: String) = supervisors.remove(uuid) } @@ -169,8 +169,8 @@ class RemoteClientHandler(val name: String, val supervisorUuid = reply.getSupervisorUuid if (!supervisors.containsKey(supervisorUuid)) throw new IllegalStateException("Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") val supervisedActor = supervisors.get(supervisorUuid) - if (!supervisedActor.supervisor.isDefined) throw new IllegalStateException("Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") - else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply)) + if (!supervisedActor._supervisor.isDefined) throw new IllegalStateException("Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") + else supervisedActor._supervisor.get ! Exit(supervisedActor, parseException(reply)) } future.completeWithException(null, parseException(reply)) } diff --git a/akka-actors/src/main/scala/stm/TransactionManagement.scala b/akka-actors/src/main/scala/stm/TransactionManagement.scala index f8bf35c10d..ad6c49463e 100644 --- a/akka-actors/src/main/scala/stm/TransactionManagement.scala +++ b/akka-actors/src/main/scala/stm/TransactionManagement.scala @@ -8,8 +8,6 @@ import java.util.concurrent.atomic.AtomicBoolean import se.scalablesolutions.akka.util.Logging -import org.codehaus.aspectwerkz.proxy.Uuid - import scala.collection.mutable.HashSet import org.multiverse.utils.ThreadLocalTransaction._ @@ -38,9 +36,6 @@ object TransactionManagement extends TransactionManagement { } trait TransactionManagement extends Logging { - // FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait - var uuid = Uuid.newUuid.toString - import TransactionManagement.currentTransaction private[akka] val activeTransactions = new HashSet[Transaction] diff --git a/akka-actors/src/test/scala/EventBasedSingleThreadActorTest.scala b/akka-actors/src/test/scala/EventBasedSingleThreadActorTest.scala index 150c9ee8e4..d8a3917293 100644 --- a/akka-actors/src/test/scala/EventBasedSingleThreadActorTest.scala +++ b/akka-actors/src/test/scala/EventBasedSingleThreadActorTest.scala @@ -11,7 +11,7 @@ class EventBasedSingleThreadActorTest extends JUnitSuite { private val unit = TimeUnit.MILLISECONDS class TestActor extends Actor { - dispatcher = Dispatchers.newEventBasedSingleThreadDispatcher(name) + dispatcher = Dispatchers.newEventBasedSingleThreadDispatcher(uuid) def receive: PartialFunction[Any, Unit] = { case "Hello" =>