pekko/akka-core/src/main/scala/stm/TransactionManagement.scala

187 lines
6.3 KiB
Scala
Raw Normal View History

/**
 * Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
 */
2009-09-02 09:10:21 +02:00
package se.scalablesolutions.akka.stm
import se.scalablesolutions.akka.util.Logging
import java.util.concurrent.atomic.AtomicBoolean
2010-06-18 17:13:10 +12:00
import org.multiverse.api.{StmUtils => MultiverseStmUtils}
2009-11-25 20:37:00 +01:00
import org.multiverse.api.ThreadLocalTransaction._
import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.commitbarriers.CountDownCommitBarrier
import org.multiverse.templates.{TransactionalCallable, OrElseTemplate}
/**
 * Runtime exception type declared for the STM package.
 * It adds nothing beyond forwarding `msg` to RuntimeException.
 *
 * @param msg the exception message
 */
class StmException(msg: String) extends RuntimeException(msg)
/**
 * Runtime exception that carries a throwable together with an optional Transaction.
 *
 * @param cause the wrapped throwable; it is also handed to RuntimeException as the cause
 * @param tx    the transaction carried along with the failure, or None
 */
class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
  // Renders as "TransactionAwareWrapperException[<cause>, <tx>]".
  override def toString =
    List[Any](cause, tx).mkString("TransactionAwareWrapperException[", ", ", "]")
}
/**
 * Companion object of the TransactionManagement trait. It owns the two thread-local
 * slots (current transaction set and current transaction) that the trait's helper
 * methods read and write, plus the fairness flag read from the configuration.
 */
object TransactionManagement extends TransactionManagement {
  import se.scalablesolutions.akka.config.Config._

  // move to stm.global.fair?
  val FAIR_TRANSACTIONS = config.getBool("akka.stm.fair", true)

  /** Thread-local slot for the current transaction set; every thread starts with None. */
  private[akka] val transactionSet = new ThreadLocal[Option[CountDownCommitBarrier]]() {
    override protected def initialValue: Option[CountDownCommitBarrier] = None
  }

  /** Thread-local slot for the current transaction; every thread starts with None. */
  private[akka] val transaction = new ThreadLocal[Option[Transaction]]() {
    override protected def initialValue: Option[Transaction] = None
  }

  /**
   * Returns the transaction set stored for the calling thread.
   * Throws StmConfigurationException when the slot holds null or None.
   */
  private[akka] def getTransactionSet: CountDownCommitBarrier =
    transactionSet.get match {
      case Some(barrier) => barrier
      // the catch-all covers both None and a null slot value
      case _ => throw new StmConfigurationException("No Transaction set in scope")
    }

  /**
   * Returns the transaction stored for the calling thread.
   * Throws StmConfigurationException when the slot holds null or None.
   */
  private[akka] def getTransaction: Transaction =
    transaction.get match {
      case Some(current) => current
      // the catch-all covers both None and a null slot value
      case _ => throw new StmConfigurationException("No Transaction in scope")
    }
}
/**
 * Helper methods for manipulating the thread-local transaction set and transaction
 * held by the TransactionManagement companion object.
 */
trait TransactionManagement {

  /**
   * Creates a CountDownCommitBarrier with one party (fairness taken from
   * FAIR_TRANSACTIONS), stores it as the calling thread's transaction set and returns it.
   */
  private[akka] def createNewTransactionSet: CountDownCommitBarrier = {
    val barrier = new CountDownCommitBarrier(1, TransactionManagement.FAIR_TRANSACTIONS)
    TransactionManagement.transactionSet.set(Some(barrier))
    barrier
  }

  /** Stores `txSet` as the calling thread's transaction set; an empty option is ignored. */
  private[akka] def setTransactionSet(txSet: Option[CountDownCommitBarrier]): Unit =
    txSet.foreach(_ => TransactionManagement.transactionSet.set(txSet))

  /** Stores `tx` as the calling thread's transaction; an empty option is ignored. */
  private[akka] def setTransaction(tx: Option[Transaction]): Unit =
    tx.foreach(_ => TransactionManagement.transaction.set(tx))

  /** Resets the calling thread's transaction set to None. */
  private[akka] def clearTransactionSet: Unit =
    TransactionManagement.transactionSet.set(None)

  /**
   * Resets the calling thread's transaction to None and also nulls the
   * thread-local transaction via setThreadLocalTransaction.
   */
  private[akka] def clearTransaction = {
    TransactionManagement.transaction.set(None)
    setThreadLocalTransaction(null)
  }

  /** The calling thread's transaction set; delegates to TransactionManagement.getTransactionSet. */
  private[akka] def getTransactionSetInScope = TransactionManagement.getTransactionSet

  /** The calling thread's transaction; delegates to TransactionManagement.getTransaction. */
  private[akka] def getTransactionInScope = TransactionManagement.getTransaction

  /** True when the calling thread's transaction-set slot holds a defined (non-null) option. */
  private[akka] def isTransactionSetInScope =
    TransactionManagement.transactionSet.get match {
      case Some(_) => true
      case _ => false
    }

  /** True when the calling thread's transaction slot holds a defined (non-null) option. */
  private[akka] def isTransactionInScope =
    TransactionManagement.transaction.get match {
      case Some(_) => true
      case _ => false
    }
}
2010-06-18 17:13:10 +12:00
/**
 * Local transaction management, local in the context of threads.
 * Use this if you do <b>not</b> need to have one transaction span
 * multiple threads (or Actors).
 * <p/>
 * Example of atomic transaction management using the atomic block.
 * <p/>
 * <pre>
 * import se.scalablesolutions.akka.stm.local._
 *
 * atomic {
 *   // do something within a transaction
 * }
 * </pre>
 */
class LocalStm extends TransactionManagement with Logging {

  val DefaultLocalTransactionConfig = TransactionConfig()
  val DefaultLocalTransactionFactory = TransactionFactory(DefaultLocalTransactionConfig, "DefaultLocalTransaction")

  /**
   * Runs `body` atomically using the implicit factory, which defaults to
   * DefaultLocalTransactionFactory. Delegates to the explicit-factory overload.
   */
  def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T = atomic(factory)(body)

  /**
   * Runs `body` through the factory's boilerplate. Inside the callable,
   * factory.addHooks is invoked first and the value of `body` is the result.
   */
  def atomic[T](factory: TransactionFactory)(body: => T): T = {
    // read the boilerplate before building the callable to keep the original evaluation order
    val template = factory.boilerplate
    val work = new TransactionalCallable[T]() {
      def call(mtx: MultiverseTransaction): T = {
        factory.addHooks
        body
      }
    }
    template.execute(work)
  }
}
2010-06-18 17:13:10 +12:00
/**
 * Global transaction management, global in the context of multiple threads.
 * Use this if you need to have one transaction span multiple threads (or Actors).
 * <p/>
 * Example of atomic transaction management using the atomic block:
 * <p/>
 * <pre>
 * import se.scalablesolutions.akka.stm.global._
 *
 * atomic {
 *   // do something within a transaction
 * }
 * </pre>
 */
class GlobalStm extends TransactionManagement with Logging {
// Default configuration and the factory built from it; the factory is the
// default value of the implicit parameter of the first atomic overload.
val DefaultGlobalTransactionConfig = TransactionConfig()
val DefaultGlobalTransactionFactory = TransactionFactory(DefaultGlobalTransactionConfig, "DefaultGlobalTransaction")
// Runs `body` atomically with the implicit factory; delegates to the explicit-factory overload.
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T = atomic(factory)(body)
// Runs `body` through the factory's boilerplate and commits by joining the
// calling thread's transaction set. The statement order below is significant.
def atomic[T](factory: TransactionFactory)(body: => T): T = {
factory.boilerplate.execute(new TransactionalCallable[T]() {
def call(mtx: MultiverseTransaction): T = {
// Make sure a transaction set exists for this thread before the body runs.
if (!isTransactionSetInScope) createNewTransactionSet
factory.addHooks
val result = body
// Throws StmConfigurationException if no transaction set is in scope at this point.
val txSet = getTransactionSetInScope
log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet)
// FIXME ? txSet.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
// NOTE(review): joinCommit presumably blocks until the other members of the set commit; confirm against Multiverse docs.
txSet.joinCommit(mtx)
// Resets the thread-local transaction only; the transaction set is not cleared here.
// NOTE(review): clearTransaction is skipped if body or joinCommit throws; confirm this is intended.
clearTransaction
result
}
})
}
}
/**
 * Convenience helpers intended for use inside an atomic block: deferred and
 * compensating tasks, blocking retry, and either-orElse composition.
 */
trait StmUtil {

  /**
   * Schedule a deferred task on the thread local transaction (use within an atomic).
   * This is executed when the transaction commits.
   *
   * @param body the task; its result value is discarded
   */
  def deferred[T](body: => T): Unit = MultiverseStmUtils.scheduleDeferredTask(new Runnable { def run = body })

  /**
   * Schedule a compensating task on the thread local transaction (use within an atomic).
   * This is executed when the transaction aborts.
   *
   * @param body the task; its result value is discarded
   */
  def compensating[T](body: => T): Unit = MultiverseStmUtils.scheduleCompensatingTask(new Runnable { def run = body })

  /**
   * STM retry for blocking transactions (use within an atomic).
   * Can be used to wait for a condition.
   */
  def retry = MultiverseStmUtils.retry

  /**
   * First half of an either-orElse expression, returned by `either`.
   *
   * This is a named class rather than an anonymous structural type so that
   * `orElse` is an ordinary method call instead of a reflective one.
   *
   * @param firstBody thunk for the first alternative; evaluated each time the
   *                  template invokes its `either` callback
   */
  final class EitherOrElse[T](firstBody: () => T) {
    /**
     * Supplies the second alternative and executes the combined OrElseTemplate.
     *
     * @param secondBody the alternative evaluated by the template's `orelse` callback
     * @return the result of executing the template
     */
    def orElse(secondBody: => T) = new OrElseTemplate[T] {
      def either(mtx: MultiverseTransaction) = firstBody()
      def orelse(mtx: MultiverseTransaction) = secondBody
    }.execute()
  }

  /**
   * Use either-orElse to combine two blocking transactions.
   *
   * Usage: `either { first } orElse { second }`. Nothing is evaluated until
   * `orElse` is called.
   */
  def either[T](firstBody: => T): EitherOrElse[T] = new EitherOrElse[T](() => firstBody)
}