/** * Copyright (C) 2009-2010 Scalable Solutions AB */ package se.scalablesolutions.akka.stm import se.scalablesolutions.akka.util.Logging import java.util.concurrent.atomic.AtomicBoolean import org.multiverse.api.{StmUtils => MultiverseStmUtils} import org.multiverse.api.ThreadLocalTransaction._ import org.multiverse.api.{Transaction => MultiverseTransaction} import org.multiverse.commitbarriers.CountDownCommitBarrier import org.multiverse.templates.{TransactionalCallable, OrElseTemplate} class StmException(msg: String) extends RuntimeException(msg) class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) { override def toString = "TransactionAwareWrapperException[" + cause + ", " + tx + "]" } object TransactionManagement extends TransactionManagement { import se.scalablesolutions.akka.config.Config._ // move to stm.global.fair? val FAIR_TRANSACTIONS = config.getBool("akka.stm.fair", true) private[akka] val transactionSet = new ThreadLocal[Option[CountDownCommitBarrier]]() { override protected def initialValue: Option[CountDownCommitBarrier] = None } private[akka] val transaction = new ThreadLocal[Option[Transaction]]() { override protected def initialValue: Option[Transaction] = None } private[akka] def getTransactionSet: CountDownCommitBarrier = { val option = transactionSet.get if ((option eq null) || option.isEmpty) throw new StmConfigurationException("No Transaction set in scope") else option.get } private[akka] def getTransaction: Transaction = { val option = transaction.get if ((option eq null) || option.isEmpty) throw new StmConfigurationException("No Transaction in scope") option.get } } trait TransactionManagement { private[akka] def createNewTransactionSet: CountDownCommitBarrier = { val txSet = new CountDownCommitBarrier(1, TransactionManagement.FAIR_TRANSACTIONS) TransactionManagement.transactionSet.set(Some(txSet)) txSet } private[akka] def setTransactionSet(txSet: Option[CountDownCommitBarrier]) = if (txSet.isDefined) TransactionManagement.transactionSet.set(txSet) private[akka] def setTransaction(tx: Option[Transaction]) = if (tx.isDefined) TransactionManagement.transaction.set(tx) private[akka] def clearTransactionSet = { TransactionManagement.transactionSet.set(None) } private[akka] def clearTransaction = { TransactionManagement.transaction.set(None) setThreadLocalTransaction(null) } private[akka] def getTransactionSetInScope = TransactionManagement.getTransactionSet private[akka] def getTransactionInScope = TransactionManagement.getTransaction private[akka] def isTransactionSetInScope = { val option = TransactionManagement.transactionSet.get (option ne null) && option.isDefined } private[akka] def isTransactionInScope = { val option = TransactionManagement.transaction.get (option ne null) && option.isDefined } } /** * Local transaction management, local in the context of threads. * Use this if you do not need to have one transaction span * multiple threads (or Actors). *

* Example of atomic transaction management using the atomic block. *

*

 * import se.scalablesolutions.akka.stm.local._
 *
 * atomic  {
 *   // do something within a transaction
 * }
 * 
*/ class LocalStm extends TransactionManagement with Logging { val DefaultLocalTransactionConfig = TransactionConfig() val DefaultLocalTransactionFactory = TransactionFactory(DefaultLocalTransactionConfig, "DefaultLocalTransaction") def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T = atomic(factory)(body) def atomic[T](factory: TransactionFactory)(body: => T): T = { factory.boilerplate.execute(new TransactionalCallable[T]() { def call(mtx: MultiverseTransaction): T = { factory.addHooks body } }) } } /** * Global transaction management, global in the context of multiple threads. * Use this if you need to have one transaction span multiple threads (or Actors). *

* Example of atomic transaction management using the atomic block: *

*

 * import se.scalablesolutions.akka.stm.global._
 *
 * atomic  {
 *   // do something within a transaction
 * }
 * 
*/ class GlobalStm extends TransactionManagement with Logging { val DefaultGlobalTransactionConfig = TransactionConfig() val DefaultGlobalTransactionFactory = TransactionFactory(DefaultGlobalTransactionConfig, "DefaultGlobalTransaction") def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T = atomic(factory)(body) def atomic[T](factory: TransactionFactory)(body: => T): T = { factory.boilerplate.execute(new TransactionalCallable[T]() { def call(mtx: MultiverseTransaction): T = { if (!isTransactionSetInScope) createNewTransactionSet factory.addHooks val result = body val txSet = getTransactionSetInScope log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet) // FIXME ? txSet.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS) try { txSet.joinCommit(mtx) } catch { case e: IllegalStateException => {} } clearTransaction result } }) } } trait StmUtil { /** * Schedule a deferred task on the thread local transaction (use within an atomic). * This is executed when the transaction commits. */ def deferred[T](body: => T): Unit = MultiverseStmUtils.scheduleDeferredTask(new Runnable { def run = body }) /** * Schedule a compensating task on the thread local transaction (use within an atomic). * This is executed when the transaction aborts. */ def compensating[T](body: => T): Unit = MultiverseStmUtils.scheduleCompensatingTask(new Runnable { def run = body }) /** * STM retry for blocking transactions (use within an atomic). * Can be used to wait for a condition. */ def retry = MultiverseStmUtils.retry /** * Use either-orElse to combine two blocking transactions. */ def either[T](firstBody: => T) = new { def orElse(secondBody: => T) = new OrElseTemplate[T] { def either(mtx: MultiverseTransaction) = firstBody def orelse(mtx: MultiverseTransaction) = secondBody }.execute() } }