From 02b440313803dcec386e444bcd44db2e295048d2 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Thu, 10 Jun 2010 16:01:23 +1200 Subject: [PATCH] Updated actor ref to use transaction factory --- akka-core/src/main/scala/actor/ActorRef.scala | 46 ++++++++++++++++++- 1 file changed, 44 insertions(+), 2 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index b9bcd08938..5413f4fc2a 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -9,6 +9,7 @@ import se.scalablesolutions.akka.config.Config.config import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy} import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.stm.Transaction.Global._ +import se.scalablesolutions.akka.stm.{TransactionConfig, TransactionFactory, TransactionManagement} import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.TransactionManagement import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ @@ -283,7 +284,17 @@ trait ActorRef extends TransactionManagement { */ @volatile protected[akka] var isTransactor = false - /**v + /** + * Configuration for TransactionFactory. User overridable. + */ + protected[akka] var _transactionConfig: TransactionConfig = DefaultGlobalTransactionConfig + + /** + * TransactionFactory to be used for atomic when isTransactor. Configuration is overridable. + */ + private[akka] var _transactionFactory: Option[TransactionFactory] = None + + /** * This lock ensures thread safety in the dispatching: only one message can * be dispatched at once on the actor. */ @@ -500,6 +511,16 @@ trait ActorRef extends TransactionManagement { */ def makeTransactionRequired: Unit + /** + * Sets the transaction configuration for this actor. Needs to be invoked before the actor is started. + */ + def transactionConfig_=(config: TransactionConfig): Unit + + /** + * Get the transaction configuration for this actor. + */ + def transactionConfig: TransactionConfig + /** * Returns the home address and port for this actor. */ @@ -870,6 +891,20 @@ sealed class LocalActorRef private[akka]( "Can not make actor transaction required after it has been started") } + /** + * Sets the transaction configuration for this actor. Needs to be invoked before the actor is started. + */ + def transactionConfig_=(config: TransactionConfig) = guard.withGuard { + if (!isRunning || isBeingRestarted) _transactionConfig = config + else throw new ActorInitializationException( + "Cannot set transaction configuration for actor after it has been started") + } + + /** + * Get the transaction configuration for this actor. + */ + def transactionConfig: TransactionConfig = guard.withGuard { _transactionConfig } + /** * Set the contact address for this actor. This is used for replying to messages * sent asynchronously when no reply channel exists. @@ -891,6 +926,9 @@ sealed class LocalActorRef private[akka]( if (!isRunning) { dispatcher.register(this) dispatcher.start + if (isTransactor) { + _transactionFactory = Some(TransactionFactory(_transactionConfig, actorClass.getName)) + } _isRunning = true if (!isInInitialization) initializeActorInstance else runActorInitialization = true @@ -904,6 +942,7 @@ sealed class LocalActorRef private[akka]( def stop = guard.withGuard { if (isRunning) { dispatcher.unregister(this) + _transactionFactory = None _isRunning = false _isShutDown = true actor.shutdown @@ -1212,7 +1251,8 @@ sealed class LocalActorRef private[akka]( try { if (isTransactor) { - atomic { + val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory) + atomic(txFactory) { actor.base(message) setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit } @@ -1452,6 +1492,8 @@ private[akka] case class RemoteActorRef private[akka] ( def dispatcher_=(md: MessageDispatcher): Unit = unsupported def dispatcher: MessageDispatcher = unsupported def makeTransactionRequired: Unit = unsupported + def transactionConfig_=(config: TransactionConfig): Unit = unsupported + def transactionConfig: TransactionConfig = unsupported def makeRemote(hostname: String, port: Int): Unit = unsupported def makeRemote(address: InetSocketAddress): Unit = unsupported def homeAddress_=(address: InetSocketAddress): Unit = unsupported