Updated actor ref to use transaction factory

This commit is contained in:
Peter Vlugter 2010-06-10 16:01:23 +12:00
parent ef4f525b52
commit 35743ee78c

View file

@ -9,6 +9,7 @@ import se.scalablesolutions.akka.config.Config.config
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.stm.Transaction.Global._
import se.scalablesolutions.akka.stm.{TransactionConfig, TransactionFactory, TransactionManagement}
import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.TransactionManagement
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
@ -283,7 +284,17 @@ trait ActorRef extends TransactionManagement {
*/
@volatile protected[akka] var isTransactor = false
/**v
/**
* Configuration for TransactionFactory. User overridable.
*/
protected[akka] var _transactionConfig: TransactionConfig = DefaultGlobalTransactionConfig
/**
* TransactionFactory to be used for atomic when isTransactor. Configuration is overridable.
*/
private[akka] var _transactionFactory: Option[TransactionFactory] = None
/**
* This lock ensures thread safety in the dispatching: only one message can
* be dispatched at once on the actor.
*/
@ -500,6 +511,16 @@ trait ActorRef extends TransactionManagement {
*/
def makeTransactionRequired: Unit
/**
* Sets the transaction configuration for this actor. Needs to be invoked before the actor is started.
*/
def transactionConfig_=(config: TransactionConfig): Unit
/**
* Get the transaction configuration for this actor.
*/
def transactionConfig: TransactionConfig
/**
* Returns the home address and port for this actor.
*/
@ -870,6 +891,20 @@ sealed class LocalActorRef private[akka](
"Can not make actor transaction required after it has been started")
}
/**
* Sets the transaction configuration for this actor. Needs to be invoked before the actor is started.
*/
def transactionConfig_=(config: TransactionConfig) = guard.withGuard {
if (!isRunning || isBeingRestarted) _transactionConfig = config
else throw new ActorInitializationException(
"Cannot set transaction configuration for actor after it has been started")
}
/**
* Get the transaction configuration for this actor.
*/
def transactionConfig: TransactionConfig = guard.withGuard { _transactionConfig }
/**
* Set the contact address for this actor. This is used for replying to messages
* sent asynchronously when no reply channel exists.
@ -891,6 +926,9 @@ sealed class LocalActorRef private[akka](
if (!isRunning) {
dispatcher.register(this)
dispatcher.start
if (isTransactor) {
_transactionFactory = Some(TransactionFactory(_transactionConfig, actorClass.getName))
}
_isRunning = true
if (!isInInitialization) initializeActorInstance
else runActorInitialization = true
@ -904,6 +942,7 @@ sealed class LocalActorRef private[akka](
def stop = guard.withGuard {
if (isRunning) {
dispatcher.unregister(this)
_transactionFactory = None
_isRunning = false
_isShutDown = true
actor.shutdown
@ -1212,7 +1251,8 @@ sealed class LocalActorRef private[akka](
try {
if (isTransactor) {
atomic {
val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory)
atomic(txFactory) {
actor.base(message)
setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
}
@ -1452,6 +1492,8 @@ private[akka] case class RemoteActorRef private[akka] (
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
def dispatcher: MessageDispatcher = unsupported
def makeTransactionRequired: Unit = unsupported
def transactionConfig_=(config: TransactionConfig): Unit = unsupported
def transactionConfig: TransactionConfig = unsupported
def makeRemote(hostname: String, port: Int): Unit = unsupported
def makeRemote(address: InetSocketAddress): Unit = unsupported
def homeAddress_=(address: InetSocketAddress): Unit = unsupported