2009-06-29 15:01:20 +02:00
|
|
|
/**
|
2009-12-27 16:01:53 +01:00
|
|
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
2009-06-29 15:01:20 +02:00
|
|
|
*/
|
|
|
|
|
|
2009-09-02 09:10:21 +02:00
|
|
|
package se.scalablesolutions.akka.stm
|
2009-06-29 15:01:20 +02:00
|
|
|
|
2010-04-05 11:53:43 +02:00
|
|
|
import se.scalablesolutions.akka.util.Logging
|
|
|
|
|
|
2009-06-29 15:01:20 +02:00
|
|
|
import java.util.concurrent.atomic.AtomicBoolean
|
2010-07-13 12:37:11 +02:00
|
|
|
import java.util.concurrent.TimeUnit
|
2009-06-29 15:01:20 +02:00
|
|
|
|
2010-06-18 17:13:10 +12:00
|
|
|
import org.multiverse.api.{StmUtils => MultiverseStmUtils}
|
2009-11-25 20:37:00 +01:00
|
|
|
import org.multiverse.api.ThreadLocalTransaction._
|
2010-06-14 21:16:35 +12:00
|
|
|
import org.multiverse.api.{Transaction => MultiverseTransaction}
|
2010-02-23 19:49:01 +01:00
|
|
|
import org.multiverse.commitbarriers.CountDownCommitBarrier
|
2010-06-14 21:16:35 +12:00
|
|
|
import org.multiverse.templates.{TransactionalCallable, OrElseTemplate}
|
2009-08-15 22:44:29 +02:00
|
|
|
|
2010-07-13 12:37:11 +02:00
|
|
|
/**
 * Unchecked exception that carries only a detail message.
 *
 * NOTE(review): the name suggests it signals an aborted transaction set, but nothing in
 * the visible part of this file throws it -- confirm against callers.
 *
 * @param msg detail message forwarded to the RuntimeException constructor
 */
class TransactionSetAbortedException(msg: String) extends RuntimeException(msg)
|
2009-06-29 15:01:20 +02:00
|
|
|
|
2010-07-13 12:37:11 +02:00
|
|
|
// TODO Should we remove TransactionAwareWrapperException? Not used anywhere yet.
|
2010-02-23 19:49:01 +01:00
|
|
|
/**
 * Runtime exception that wraps another throwable and carries an optional transaction
 * alongside it.
 *
 * @param cause the wrapped throwable; also handed to RuntimeException as its cause
 * @param tx    optional transaction stored with the exception
 */
class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {

  /** Renders as `TransactionAwareWrapperException[<cause>, <tx>]`. */
  override def toString: String =
    List[Any](cause, tx).mkString("TransactionAwareWrapperException[", ", ", "]")
}
|
|
|
|
|
|
2010-07-13 12:37:11 +02:00
|
|
|
/**
|
|
|
|
|
* Internal helper methods and properties for transaction management.
|
|
|
|
|
*/
|
2009-10-17 00:37:56 +02:00
|
|
|
object TransactionManagement extends TransactionManagement {
  import se.scalablesolutions.akka.config.Config._

  // FIXME move to stm.global.fair?
  /** Read from the `akka.stm.fair` configuration key, with `true` passed as the fallback argument. */
  val FAIR_TRANSACTIONS = config.getBool("akka.stm.fair", true)

  /** Thread-local slot for the current transaction set (a commit barrier); initially `None`. */
  private[akka] val transactionSet: ThreadLocal[Option[CountDownCommitBarrier]] =
    new ThreadLocal[Option[CountDownCommitBarrier]]() {
      override protected def initialValue: Option[CountDownCommitBarrier] = None
    }

  /** Thread-local slot for the current transaction; initially `None`. */
  private[akka] val transaction: ThreadLocal[Option[Transaction]] =
    new ThreadLocal[Option[Transaction]]() {
      override protected def initialValue: Option[Transaction] = None
    }

  /**
   * Returns the transaction set stored for the calling thread.
   *
   * @throws StmConfigurationException if the slot holds `None` or `null`
   */
  private[akka] def getTransactionSet: CountDownCommitBarrier =
    transactionSet.get match {
      case Some(barrier) => barrier
      // the wildcard covers both None and a null stored in the thread-local
      case _ => throw new StmConfigurationException("No Transaction set in scope")
    }

  /**
   * Returns the transaction stored for the calling thread.
   *
   * @throws StmConfigurationException if the slot holds `None` or `null`
   */
  private[akka] def getTransaction: Transaction =
    transaction.get match {
      case Some(current) => current
      // the wildcard covers both None and a null stored in the thread-local
      case _ => throw new StmConfigurationException("No Transaction in scope")
    }
}
|
|
|
|
|
|
2010-07-13 12:37:11 +02:00
|
|
|
/**
|
|
|
|
|
* Internal helper methods for transaction management.
|
|
|
|
|
*/
|
2010-04-06 12:45:09 +02:00
|
|
|
trait TransactionManagement {

  /**
   * Builds a `CountDownCommitBarrier(1, FAIR_TRANSACTIONS)`, stores it in the
   * thread-local transaction-set slot and returns it.
   */
  private[akka] def createNewTransactionSet: CountDownCommitBarrier = {
    val barrier = new CountDownCommitBarrier(1, TransactionManagement.FAIR_TRANSACTIONS)
    TransactionManagement.transactionSet.set(Some(barrier))
    barrier
  }

  /** Stores `txSet` in the thread-local slot, but only when it is defined; `None` is ignored. */
  private[akka] def setTransactionSet(txSet: Option[CountDownCommitBarrier]): Unit =
    txSet.foreach(_ => TransactionManagement.transactionSet.set(txSet))

  /** Stores `tx` in the thread-local slot, but only when it is defined; `None` is ignored. */
  private[akka] def setTransaction(tx: Option[Transaction]): Unit =
    tx.foreach(_ => TransactionManagement.transaction.set(tx))

  /** Resets the thread-local transaction-set slot to `None`. */
  private[akka] def clearTransactionSet: Unit =
    TransactionManagement.transactionSet.set(None)

  /**
   * Resets the thread-local transaction slot to `None` and then passes `null` to
   * Multiverse's `setThreadLocalTransaction`.
   */
  private[akka] def clearTransaction: Unit = {
    TransactionManagement.transaction.set(None)
    setThreadLocalTransaction(null)
  }

  /** Delegates to `TransactionManagement.getTransactionSet`. */
  private[akka] def getTransactionSetInScope: CountDownCommitBarrier =
    TransactionManagement.getTransactionSet

  /** Delegates to `TransactionManagement.getTransaction`. */
  private[akka] def getTransactionInScope: Transaction =
    TransactionManagement.getTransaction

  /** `true` when the thread-local transaction-set slot holds a `Some`; `false` for `None` or `null`. */
  private[akka] def isTransactionSetInScope: Boolean =
    TransactionManagement.transactionSet.get match {
      case Some(_) => true
      case _       => false
    }

  /** `true` when the thread-local transaction slot holds a `Some`; `false` for `None` or `null`. */
  private[akka] def isTransactionInScope: Boolean =
    TransactionManagement.transaction.get match {
      case Some(_) => true
      case _       => false
    }
}
|
2010-06-14 21:16:35 +12:00
|
|
|
|
2010-06-18 17:13:10 +12:00
|
|
|
/**
|
|
|
|
|
* Local transaction management, local in the context of threads.
|
|
|
|
|
* Use this if you do <b>not</b> need to have one transaction span
|
|
|
|
|
* multiple threads (or Actors).
|
|
|
|
|
* <p/>
|
|
|
|
|
* Example of atomic transaction management using the atomic block.
|
|
|
|
|
* <p/>
|
|
|
|
|
* <pre>
|
|
|
|
|
* import se.scalablesolutions.akka.stm.local._
|
|
|
|
|
*
|
|
|
|
|
* atomic {
|
|
|
|
|
* // do something within a transaction
|
|
|
|
|
* }
|
|
|
|
|
* </pre>
|
|
|
|
|
*/
|
2010-06-14 21:16:35 +12:00
|
|
|
class LocalStm extends TransactionManagement with Logging {

  /** Configuration built with `TransactionConfig()` and no arguments. */
  val DefaultLocalTransactionConfig = TransactionConfig()

  /** Factory built from the default local configuration, named "DefaultLocalTransaction". */
  val DefaultLocalTransactionFactory = TransactionFactory(DefaultLocalTransactionConfig, "DefaultLocalTransaction")

  /**
   * Runs `body` through the two-argument `atomic`, using the implicit factory in scope
   * or `DefaultLocalTransactionFactory` when none is supplied.
   */
  def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T =
    atomic(factory)(body)

  /**
   * Hands a `TransactionalCallable` to `factory.boilerplate.execute`. The callable calls
   * `factory.addHooks`, evaluates `body`, trace-logs the transaction and returns the
   * value `body` produced.
   *
   * @param factory supplies the boilerplate that executes the callable, and the hooks
   * @param body    the code to evaluate inside the callable
   * @return whatever `execute` returns for the callable
   */
  def atomic[T](factory: TransactionFactory)(body: => T): T = {
    // fetch the boilerplate first so the evaluation order matches a direct chained call
    val template = factory.boilerplate
    val callable = new TransactionalCallable[T]() {
      def call(mtx: MultiverseTransaction): T = {
        factory.addHooks
        val outcome = body
        log.ifTrace("Committing local transaction [" + mtx + "]")
        outcome
      }
    }
    template.execute(callable)
  }
}
|
|
|
|
|
|
2010-06-18 17:13:10 +12:00
|
|
|
/**
|
|
|
|
|
* Global transaction management, global in the context of multiple threads.
|
2010-06-20 16:40:18 +12:00
|
|
|
* Use this if you need to have one transaction span multiple threads (or Actors).
|
2010-06-18 17:13:10 +12:00
|
|
|
* <p/>
|
|
|
|
|
* Example of atomic transaction management using the atomic block:
|
|
|
|
|
* <p/>
|
|
|
|
|
* <pre>
|
|
|
|
|
* import se.scalablesolutions.akka.stm.global._
|
|
|
|
|
*
|
|
|
|
|
* atomic {
|
|
|
|
|
* // do something within a transaction
|
|
|
|
|
* }
|
|
|
|
|
* </pre>
|
|
|
|
|
*/
|
2010-06-14 21:16:35 +12:00
|
|
|
class GlobalStm extends TransactionManagement with Logging {

  /** Configuration built with `TransactionConfig()` and no arguments. */
  val DefaultGlobalTransactionConfig = TransactionConfig()

  /** Factory built from the default global configuration, named "DefaultGlobalTransaction". */
  val DefaultGlobalTransactionFactory = TransactionFactory(DefaultGlobalTransactionConfig, "DefaultGlobalTransaction")

  /**
   * Runs `body` through the two-argument `atomic`, using the implicit factory in scope
   * or `DefaultGlobalTransactionFactory` when none is supplied.
   */
  def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T =
    atomic(factory)(body)

  /**
   * Hands a `TransactionalCallable` to `factory.boilerplate.execute`. The callable:
   *  1. creates a new transaction set when none is in scope for the calling thread,
   *  2. calls `factory.addHooks`,
   *  3. evaluates `body`,
   *  4. tries to join the commit of the transaction set in scope, waiting at most
   *     `TransactionConfig.TIMEOUT` milliseconds,
   *  5. returns the value `body` produced.
   *
   * An `IllegalStateException` raised while joining is deliberately not propagated (see
   * the comment in the body); it is reported at trace level so it is no longer invisible.
   *
   * @param factory supplies the boilerplate that executes the callable, and the hooks
   * @param body    the code to evaluate inside the callable
   * @return whatever `execute` returns for the callable
   */
  def atomic[T](factory: TransactionFactory)(body: => T): T = {
    factory.boilerplate.execute(new TransactionalCallable[T]() {
      def call(mtx: MultiverseTransaction): T = {
        if (!isTransactionSetInScope) createNewTransactionSet
        factory.addHooks
        val result = body
        val txSet = getTransactionSetInScope
        log.ifTrace("Committing global transaction [" + mtx + "]\n\tand joining transaction set [" + txSet + "]")
        // Need to catch IllegalStateException until we have fix in Multiverse, since it throws it by mistake
        try {
          txSet.tryJoinCommit(mtx, TransactionConfig.TIMEOUT, TimeUnit.MILLISECONDS)
        } catch {
          case e: IllegalStateException =>
            // still swallowed on purpose, but leave a trace so the workaround can be diagnosed
            log.ifTrace("Ignored IllegalStateException while joining transaction set [" + txSet + "]: " + e)
        }
        result
      }
    })
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Convenience wrappers around Multiverse's `StmUtils` and `OrElseTemplate`,
 * intended for use inside an atomic block.
 */
trait StmUtil {

  /**
   * Schedule a deferred task on the thread local transaction (use within an atomic).
   * This is executed when the transaction commits.
   *
   * @param body code wrapped in a `Runnable`; its result value is discarded
   */
  def deferred[T](body: => T): Unit =
    MultiverseStmUtils.scheduleDeferredTask(new Runnable { def run(): Unit = { body } })

  /**
   * Schedule a compensating task on the thread local transaction (use within an atomic).
   * This is executed when the transaction aborts.
   *
   * @param body code wrapped in a `Runnable`; its result value is discarded
   */
  def compensating[T](body: => T): Unit =
    MultiverseStmUtils.scheduleCompensatingTask(new Runnable { def run(): Unit = { body } })

  /**
   * STM retry for blocking transactions (use within an atomic).
   * Can be used to wait for a condition.
   */
  def retry = MultiverseStmUtils.retry

  /**
   * Use either-orElse to combine two blocking transactions.
   * Usage:
   * <pre>
   * either {
   *   ...
   * } orElse {
   *   ...
   * }
   * </pre>
   *
   * @param firstBody the first alternative; not evaluated until `orElse` runs the template
   * @return a builder whose `orElse` supplies the second alternative and executes both
   */
  def either[T](firstBody: => T): EitherOrElse[T] = new EitherOrElse[T](firstBody)

  /**
   * First half of an either-orElse expression. A named class is used instead of an
   * anonymous structural type so that calling `orElse` is an ordinary method call
   * rather than a reflective one.
   *
   * @param firstBody the alternative passed to `OrElseTemplate.either`
   */
  class EitherOrElse[T](firstBody: => T) {

    /**
     * Builds an `OrElseTemplate` whose `either` evaluates `firstBody` and whose
     * `orelse` evaluates `secondBody`, then executes it.
     *
     * @param secondBody the alternative passed to `OrElseTemplate.orelse`
     * @return the result of `OrElseTemplate.execute()`
     */
    def orElse(secondBody: => T) = new OrElseTemplate[T] {
      def either(mtx: MultiverseTransaction) = firstBody
      def orelse(mtx: MultiverseTransaction) = secondBody
    }.execute()
  }
}
|