Folding 3 volatiles into 1, all transactor-based stuff

This commit is contained in:
Viktor Klang 2010-10-09 15:13:55 +02:00
parent 33593e8f04
commit fa0db6cbcd
2 changed files with 45 additions and 39 deletions

View file

@ -29,7 +29,8 @@ import java.lang.reflect.Field
import scala.reflect.BeanProperty
import scala.collection.immutable.Stack
object ActorRefStatus {
private[akka] object ActorRefInternals {
/** LifeCycles for ActorRefs
*/
private[akka] sealed trait StatusType
@ -37,8 +38,13 @@ object ActorRefStatus {
object RUNNING extends StatusType
object BEING_RESTARTED extends StatusType
object SHUTDOWN extends StatusType
case class TransactorConfig(factory: Option[TransactionFactory] = None, config: TransactionConfig = DefaultGlobalTransactionConfig)
val DefaultTransactorConfig = TransactorConfig()
val NoTransactionConfig = TransactorConfig()
}
/**
* ActorRef is an immutable and serializable handle to an Actor.
* <p/>
@ -77,7 +83,7 @@ trait ActorRef extends ActorRefShared with TransactionManagement with Logging wi
@volatile
protected[akka] var _uuid = newUuid
@volatile
protected[this] var _status: ActorRefStatus.StatusType = ActorRefStatus.UNSTARTED
protected[this] var _status: ActorRefInternals.StatusType = ActorRefInternals.UNSTARTED
@volatile
protected[akka] var _homeAddress = new InetSocketAddress(RemoteServerModule.HOSTNAME, RemoteServerModule.PORT)
@volatile
@ -172,22 +178,18 @@ trait ActorRef extends ActorRefShared with TransactionManagement with Logging wi
* start if there is no one running, else it joins the existing transaction.
*/
@volatile
protected[akka] var isTransactor = false
protected[akka] var transactorConfig = ActorRefInternals.NoTransactionConfig
/**
* Configuration for TransactionFactory. User overridable.
* Returns true if this Actor is a Transactor
*/
@volatile
protected[akka] var _transactionConfig: TransactionConfig = DefaultGlobalTransactionConfig
def isTransactor: Boolean = {
val c = transactorConfig
(c ne ActorRefInternals.NoTransactionConfig) && (c ne null) //Could possibly be null if called before var init
}
/**
* TransactionFactory to be used for atomic when isTransactor. Configuration is overridable.
*/
@volatile
private[akka] var _transactionFactory: Option[TransactionFactory] = None
/**
* This is a reference to the message currently being processed by the actor
* This is a reference to the message currently being processed by the actor
*/
@volatile
protected[akka] var currentMessage: MessageInvocation = null
@ -220,25 +222,25 @@ trait ActorRef extends ActorRefShared with TransactionManagement with Logging wi
/**
* Is the actor being restarted?
*/
def isBeingRestarted: Boolean = _status == ActorRefStatus.BEING_RESTARTED
def isBeingRestarted: Boolean = _status == ActorRefInternals.BEING_RESTARTED
/**
* Is the actor running?
*/
def isRunning: Boolean = _status match {
case ActorRefStatus.BEING_RESTARTED | ActorRefStatus.RUNNING => true
case ActorRefInternals.BEING_RESTARTED | ActorRefInternals.RUNNING => true
case _ => false
}
/**
* Is the actor shut down?
*/
def isShutdown: Boolean = _status == ActorRefStatus.SHUTDOWN
def isShutdown: Boolean = _status == ActorRefInternals.SHUTDOWN
/**
* Is the actor ever started?
*/
def isUnstarted: Boolean = _status == ActorRefStatus.UNSTARTED
def isUnstarted: Boolean = _status == ActorRefInternals.UNSTARTED
/**
* Is the actor able to handle the message passed in as arguments?
@ -677,7 +679,7 @@ class LocalActorRef private[akka] (
_uuid = __uuid
id = __id
homeAddress = (__hostname, __port)
isTransactor = __isTransactor
transactorConfig = if (__isTransactor) ActorRefInternals.DefaultTransactorConfig else ActorRefInternals.NoTransactionConfig
timeout = __timeout
receiveTimeout = __receiveTimeout
lifeCycle = __lifeCycle
@ -745,7 +747,10 @@ class LocalActorRef private[akka] (
* However, it will always participate in an existing transaction.
*/
def makeTransactionRequired() = guard.withGuard {
if (!isRunning || isBeingRestarted) isTransactor = true
if (!isRunning || isBeingRestarted) {
if (transactorConfig eq ActorRefInternals.NoTransactionConfig)
transactorConfig = ActorRefInternals.DefaultTransactorConfig
}
else throw new ActorInitializationException(
"Can not make actor transaction required after it has been started")
}
@ -753,8 +758,8 @@ class LocalActorRef private[akka] (
/**
* Sets the transaction configuration for this actor. Needs to be invoked before the actor is started.
*/
def transactionConfig_=(config: TransactionConfig) = guard.withGuard {
if (!isRunning || isBeingRestarted) _transactionConfig = config
def transactionConfig_=(configuration: TransactionConfig) = guard.withGuard {
if (!isRunning || isBeingRestarted) transactorConfig = transactorConfig.copy(config = configuration)
else throw new ActorInitializationException(
"Cannot set transaction configuration for actor after it has been started")
}
@ -762,7 +767,7 @@ class LocalActorRef private[akka] (
/**
* Get the transaction configuration for this actor.
*/
def transactionConfig: TransactionConfig = _transactionConfig
def transactionConfig: TransactionConfig = transactorConfig.config
/**
* Set the contact address for this actor. This is used for replying to messages
@ -785,10 +790,10 @@ class LocalActorRef private[akka] (
if (!isRunning) {
dispatcher.register(this)
dispatcher.start
if (isTransactor) {
_transactionFactory = Some(TransactionFactory(_transactionConfig, id))
}
_status = ActorRefStatus.RUNNING
if (isTransactor)
transactorConfig = transactorConfig.copy(factory = Some(TransactionFactory(transactorConfig.config, id)))
_status = ActorRefInternals.RUNNING
//If we are not currently creating this ActorRef instance
if ((actorInstance ne null) && (actorInstance.get ne null))
@ -807,8 +812,8 @@ class LocalActorRef private[akka] (
receiveTimeout = None
cancelReceiveTimeout
dispatcher.unregister(this)
_transactionFactory = None
_status = ActorRefStatus.SHUTDOWN
transactorConfig = transactorConfig.copy(factory = None)
_status = ActorRefInternals.SHUTDOWN
actor.postStop
ActorRegistry.unregister(this)
if (isRemotingEnabled) {
@ -1066,7 +1071,7 @@ class LocalActorRef private[akka] (
stop
} else {
_status = ActorRefStatus.BEING_RESTARTED
_status = ActorRefInternals.BEING_RESTARTED
val failedActor = actorInstance.get
guard.withGuard {
lifeCycle match {
@ -1081,7 +1086,7 @@ class LocalActorRef private[akka] (
if (isProxyableDispatcher(failedActor)) restartProxyableDispatcher(failedActor, reason)
else restartActor(failedActor, reason)
_status = ActorRefStatus.RUNNING
_status = ActorRefInternals.RUNNING
// update restart parameters
if (maxNrOfRetries.isDefined && maxNrOfRetriesCount % maxNrOfRetries.get == 0 && maxNrOfRetriesCount != 0)
@ -1179,12 +1184,13 @@ class LocalActorRef private[akka] (
private def dispatch[T](messageHandle: MessageInvocation) = {
Actor.log.trace("Invoking actor with message: %s\n", messageHandle)
val message = messageHandle.message //serializeMessage(messageHandle.message)
val isXactor = isTransactor
var topLevelTransaction = false
val txSet: Option[CountDownCommitBarrier] =
if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet
else {
topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx
if (isTransactor) {
if (isXactor) {
Actor.log.trace("Creating a new transaction set (top-level transaction)\n\tfor actor " + toString +
"\n\twith message " + messageHandle)
Some(createNewTransactionSet)
@ -1194,8 +1200,8 @@ class LocalActorRef private[akka] (
try {
cancelReceiveTimeout // FIXME: leave this here?
if (isTransactor) {
val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory)
if (isXactor) {
val txFactory = transactorConfig.factory.getOrElse(DefaultGlobalTransactionFactory)
atomic(txFactory) {
actor(message)
setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
@ -1236,7 +1242,7 @@ class LocalActorRef private[akka] (
private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = {
Actor.log.error(reason, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
_status = ActorRefStatus.BEING_RESTARTED
_status = ActorRefInternals.BEING_RESTARTED
// abort transaction set
if (isTransactionSetInScope) {
val txSet = getTransactionSetInScope
@ -1373,13 +1379,13 @@ private[akka] case class RemoteActorRef private[akka] (
}
def start: ActorRef = synchronized {
_status = ActorRefStatus.RUNNING
_status = ActorRefInternals.RUNNING
this
}
def stop: Unit = synchronized {
if (_status == ActorRefStatus.RUNNING) {
_status = ActorRefStatus.SHUTDOWN
if (_status == ActorRefInternals.RUNNING) {
_status = ActorRefInternals.SHUTDOWN
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
}
}

View file

@ -206,12 +206,12 @@ private[akka] object AsyncCallbackAdapter {
private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCallback) extends ActorRef with ScalaActorRef {
def start = {
_status = ActorRefStatus.RUNNING
_status = ActorRefInternals.RUNNING
this
}
def stop() = {
_status = ActorRefStatus.SHUTDOWN
_status = ActorRefInternals.SHUTDOWN
}
/**