diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index 8253bb6d10..b227a6ffe2 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -29,7 +29,8 @@ import java.lang.reflect.Field import scala.reflect.BeanProperty import scala.collection.immutable.Stack -object ActorRefStatus { +private[akka] object ActorRefInternals { + /** LifeCycles for ActorRefs */ private[akka] sealed trait StatusType @@ -37,8 +38,13 @@ object ActorRefStatus { object RUNNING extends StatusType object BEING_RESTARTED extends StatusType object SHUTDOWN extends StatusType + + case class TransactorConfig(factory: Option[TransactionFactory] = None, config: TransactionConfig = DefaultGlobalTransactionConfig) + val DefaultTransactorConfig = TransactorConfig() + val NoTransactionConfig = TransactorConfig() } + /** * ActorRef is an immutable and serializable handle to an Actor. *
@@ -77,7 +83,7 @@ trait ActorRef extends ActorRefShared with TransactionManagement with Logging wi @volatile protected[akka] var _uuid = newUuid @volatile - protected[this] var _status: ActorRefStatus.StatusType = ActorRefStatus.UNSTARTED + protected[this] var _status: ActorRefInternals.StatusType = ActorRefInternals.UNSTARTED @volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServerModule.HOSTNAME, RemoteServerModule.PORT) @volatile @@ -172,22 +178,18 @@ trait ActorRef extends ActorRefShared with TransactionManagement with Logging wi * start if there is no one running, else it joins the existing transaction. */ @volatile - protected[akka] var isTransactor = false + protected[akka] var transactorConfig = ActorRefInternals.NoTransactionConfig /** - * Configuration for TransactionFactory. User overridable. + * Returns true if this Actor is a Transactor */ - @volatile - protected[akka] var _transactionConfig: TransactionConfig = DefaultGlobalTransactionConfig + def isTransactor: Boolean = { + val c = transactorConfig + (c ne ActorRefInternals.NoTransactionConfig) && (c ne null) //Could possibly be null if called before var init + } /** - * TransactionFactory to be used for atomic when isTransactor. Configuration is overridable. - */ - @volatile - private[akka] var _transactionFactory: Option[TransactionFactory] = None - - /** - * This is a reference to the message currently being processed by the actor + * This is a reference to the message currently being processed by the actor */ @volatile protected[akka] var currentMessage: MessageInvocation = null @@ -220,25 +222,25 @@ trait ActorRef extends ActorRefShared with TransactionManagement with Logging wi /** * Is the actor being restarted? */ - def isBeingRestarted: Boolean = _status == ActorRefStatus.BEING_RESTARTED + def isBeingRestarted: Boolean = _status == ActorRefInternals.BEING_RESTARTED /** * Is the actor running? */ def isRunning: Boolean = _status match { - case ActorRefStatus.BEING_RESTARTED | ActorRefStatus.RUNNING => true + case ActorRefInternals.BEING_RESTARTED | ActorRefInternals.RUNNING => true case _ => false } /** * Is the actor shut down? */ - def isShutdown: Boolean = _status == ActorRefStatus.SHUTDOWN + def isShutdown: Boolean = _status == ActorRefInternals.SHUTDOWN /** * Is the actor ever started? */ - def isUnstarted: Boolean = _status == ActorRefStatus.UNSTARTED + def isUnstarted: Boolean = _status == ActorRefInternals.UNSTARTED /** * Is the actor able to handle the message passed in as arguments? @@ -677,7 +679,7 @@ class LocalActorRef private[akka] ( _uuid = __uuid id = __id homeAddress = (__hostname, __port) - isTransactor = __isTransactor + transactorConfig = if (__isTransactor) ActorRefInternals.DefaultTransactorConfig else ActorRefInternals.NoTransactionConfig timeout = __timeout receiveTimeout = __receiveTimeout lifeCycle = __lifeCycle @@ -745,7 +747,10 @@ class LocalActorRef private[akka] ( * However, it will always participate in an existing transaction. */ def makeTransactionRequired() = guard.withGuard { - if (!isRunning || isBeingRestarted) isTransactor = true + if (!isRunning || isBeingRestarted) { + if (transactorConfig eq ActorRefInternals.NoTransactionConfig) + transactorConfig = ActorRefInternals.DefaultTransactorConfig + } else throw new ActorInitializationException( "Can not make actor transaction required after it has been started") } @@ -753,8 +758,8 @@ class LocalActorRef private[akka] ( /** * Sets the transaction configuration for this actor. Needs to be invoked before the actor is started. */ - def transactionConfig_=(config: TransactionConfig) = guard.withGuard { - if (!isRunning || isBeingRestarted) _transactionConfig = config + def transactionConfig_=(configuration: TransactionConfig) = guard.withGuard { + if (!isRunning || isBeingRestarted) transactorConfig = transactorConfig.copy(config = configuration) else throw new ActorInitializationException( "Cannot set transaction configuration for actor after it has been started") } @@ -762,7 +767,7 @@ class LocalActorRef private[akka] ( /** * Get the transaction configuration for this actor. */ - def transactionConfig: TransactionConfig = _transactionConfig + def transactionConfig: TransactionConfig = transactorConfig.config /** * Set the contact address for this actor. This is used for replying to messages @@ -785,10 +790,10 @@ class LocalActorRef private[akka] ( if (!isRunning) { dispatcher.register(this) dispatcher.start - if (isTransactor) { - _transactionFactory = Some(TransactionFactory(_transactionConfig, id)) - } - _status = ActorRefStatus.RUNNING + if (isTransactor) + transactorConfig = transactorConfig.copy(factory = Some(TransactionFactory(transactorConfig.config, id))) + + _status = ActorRefInternals.RUNNING //If we are not currently creating this ActorRef instance if ((actorInstance ne null) && (actorInstance.get ne null)) @@ -807,8 +812,8 @@ class LocalActorRef private[akka] ( receiveTimeout = None cancelReceiveTimeout dispatcher.unregister(this) - _transactionFactory = None - _status = ActorRefStatus.SHUTDOWN + transactorConfig = transactorConfig.copy(factory = None) + _status = ActorRefInternals.SHUTDOWN actor.postStop ActorRegistry.unregister(this) if (isRemotingEnabled) { @@ -1066,7 +1071,7 @@ class LocalActorRef private[akka] ( stop } else { - _status = ActorRefStatus.BEING_RESTARTED + _status = ActorRefInternals.BEING_RESTARTED val failedActor = actorInstance.get guard.withGuard { lifeCycle match { @@ -1081,7 +1086,7 @@ class LocalActorRef private[akka] ( if (isProxyableDispatcher(failedActor)) restartProxyableDispatcher(failedActor, reason) else restartActor(failedActor, reason) - _status = ActorRefStatus.RUNNING + _status = ActorRefInternals.RUNNING // update restart parameters if (maxNrOfRetries.isDefined && maxNrOfRetriesCount % maxNrOfRetries.get == 0 && maxNrOfRetriesCount != 0) @@ -1179,12 +1184,13 @@ class LocalActorRef private[akka] ( private def dispatch[T](messageHandle: MessageInvocation) = { Actor.log.trace("Invoking actor with message: %s\n", messageHandle) val message = messageHandle.message //serializeMessage(messageHandle.message) + val isXactor = isTransactor var topLevelTransaction = false val txSet: Option[CountDownCommitBarrier] = if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet else { topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx - if (isTransactor) { + if (isXactor) { Actor.log.trace("Creating a new transaction set (top-level transaction)\n\tfor actor " + toString + "\n\twith message " + messageHandle) Some(createNewTransactionSet) @@ -1194,8 +1200,8 @@ class LocalActorRef private[akka] ( try { cancelReceiveTimeout // FIXME: leave this here? - if (isTransactor) { - val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory) + if (isXactor) { + val txFactory = transactorConfig.factory.getOrElse(DefaultGlobalTransactionFactory) atomic(txFactory) { actor(message) setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit @@ -1236,7 +1242,7 @@ class LocalActorRef private[akka] ( private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = { Actor.log.error(reason, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) - _status = ActorRefStatus.BEING_RESTARTED + _status = ActorRefInternals.BEING_RESTARTED // abort transaction set if (isTransactionSetInScope) { val txSet = getTransactionSetInScope @@ -1373,13 +1379,13 @@ private[akka] case class RemoteActorRef private[akka] ( } def start: ActorRef = synchronized { - _status = ActorRefStatus.RUNNING + _status = ActorRefInternals.RUNNING this } def stop: Unit = synchronized { - if (_status == ActorRefStatus.RUNNING) { - _status = ActorRefStatus.SHUTDOWN + if (_status == ActorRefInternals.RUNNING) { + _status = ActorRefInternals.SHUTDOWN postMessageToMailbox(RemoteActorSystemMessage.Stop, None) } } diff --git a/akka-camel/src/main/scala/component/ActorComponent.scala b/akka-camel/src/main/scala/component/ActorComponent.scala index 297a4c3a84..962bc6c8a9 100644 --- a/akka-camel/src/main/scala/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/component/ActorComponent.scala @@ -206,12 +206,12 @@ private[akka] object AsyncCallbackAdapter { private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCallback) extends ActorRef with ScalaActorRef { def start = { - _status = ActorRefStatus.RUNNING + _status = ActorRefInternals.RUNNING this } def stop() = { - _status = ActorRefStatus.SHUTDOWN + _status = ActorRefInternals.SHUTDOWN } /**