Updated actor ref to use transaction factory
This commit is contained in:
parent
c5caf5832c
commit
02b4403138
1 changed files with 44 additions and 2 deletions
|
|
@ -9,6 +9,7 @@ import se.scalablesolutions.akka.config.Config.config
|
|||
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.stm.Transaction.Global._
|
||||
import se.scalablesolutions.akka.stm.{TransactionConfig, TransactionFactory, TransactionManagement}
|
||||
import se.scalablesolutions.akka.stm.TransactionManagement._
|
||||
import se.scalablesolutions.akka.stm.TransactionManagement
|
||||
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
|
||||
|
|
@ -283,7 +284,17 @@ trait ActorRef extends TransactionManagement {
|
|||
*/
|
||||
@volatile protected[akka] var isTransactor = false
|
||||
|
||||
/**v
|
||||
/**
|
||||
* Configuration for TransactionFactory. User overridable.
|
||||
*/
|
||||
protected[akka] var _transactionConfig: TransactionConfig = DefaultGlobalTransactionConfig
|
||||
|
||||
/**
|
||||
* TransactionFactory to be used for atomic when isTransactor. Configuration is overridable.
|
||||
*/
|
||||
private[akka] var _transactionFactory: Option[TransactionFactory] = None
|
||||
|
||||
/**
|
||||
* This lock ensures thread safety in the dispatching: only one message can
|
||||
* be dispatched at once on the actor.
|
||||
*/
|
||||
|
|
@ -500,6 +511,16 @@ trait ActorRef extends TransactionManagement {
|
|||
*/
|
||||
def makeTransactionRequired: Unit
|
||||
|
||||
/**
|
||||
* Sets the transaction configuration for this actor. Needs to be invoked before the actor is started.
|
||||
*/
|
||||
def transactionConfig_=(config: TransactionConfig): Unit
|
||||
|
||||
/**
|
||||
* Get the transaction configuration for this actor.
|
||||
*/
|
||||
def transactionConfig: TransactionConfig
|
||||
|
||||
/**
|
||||
* Returns the home address and port for this actor.
|
||||
*/
|
||||
|
|
@ -870,6 +891,20 @@ sealed class LocalActorRef private[akka](
|
|||
"Can not make actor transaction required after it has been started")
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the transaction configuration for this actor. Needs to be invoked before the actor is started.
|
||||
*/
|
||||
def transactionConfig_=(config: TransactionConfig) = guard.withGuard {
|
||||
if (!isRunning || isBeingRestarted) _transactionConfig = config
|
||||
else throw new ActorInitializationException(
|
||||
"Cannot set transaction configuration for actor after it has been started")
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the transaction configuration for this actor.
|
||||
*/
|
||||
def transactionConfig: TransactionConfig = guard.withGuard { _transactionConfig }
|
||||
|
||||
/**
|
||||
* Set the contact address for this actor. This is used for replying to messages
|
||||
* sent asynchronously when no reply channel exists.
|
||||
|
|
@ -891,6 +926,9 @@ sealed class LocalActorRef private[akka](
|
|||
if (!isRunning) {
|
||||
dispatcher.register(this)
|
||||
dispatcher.start
|
||||
if (isTransactor) {
|
||||
_transactionFactory = Some(TransactionFactory(_transactionConfig, actorClass.getName))
|
||||
}
|
||||
_isRunning = true
|
||||
if (!isInInitialization) initializeActorInstance
|
||||
else runActorInitialization = true
|
||||
|
|
@ -904,6 +942,7 @@ sealed class LocalActorRef private[akka](
|
|||
def stop = guard.withGuard {
|
||||
if (isRunning) {
|
||||
dispatcher.unregister(this)
|
||||
_transactionFactory = None
|
||||
_isRunning = false
|
||||
_isShutDown = true
|
||||
actor.shutdown
|
||||
|
|
@ -1212,7 +1251,8 @@ sealed class LocalActorRef private[akka](
|
|||
|
||||
try {
|
||||
if (isTransactor) {
|
||||
atomic {
|
||||
val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory)
|
||||
atomic(txFactory) {
|
||||
actor.base(message)
|
||||
setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
|
||||
}
|
||||
|
|
@ -1452,6 +1492,8 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
|
||||
def dispatcher: MessageDispatcher = unsupported
|
||||
def makeTransactionRequired: Unit = unsupported
|
||||
def transactionConfig_=(config: TransactionConfig): Unit = unsupported
|
||||
def transactionConfig: TransactionConfig = unsupported
|
||||
def makeRemote(hostname: String, port: Int): Unit = unsupported
|
||||
def makeRemote(address: InetSocketAddress): Unit = unsupported
|
||||
def homeAddress_=(address: InetSocketAddress): Unit = unsupported
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue