Added stm local and global package objects

This commit is contained in:
Peter Vlugter 2010-06-14 21:16:35 +12:00
parent ba0b5033c6
commit f8ca6b94c4
3 changed files with 90 additions and 59 deletions

View file

@ -16,11 +16,8 @@ import se.scalablesolutions.akka.config.Config._
import org.multiverse.api.{Transaction => MultiverseTransaction} import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent} import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent}
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
import org.multiverse.api.ThreadLocalTransaction._ import org.multiverse.api.ThreadLocalTransaction._
import org.multiverse.templates.{TransactionalCallable, OrElseTemplate}
import org.multiverse.api.{TraceLevel => MultiverseTraceLevel} import org.multiverse.api.{TraceLevel => MultiverseTraceLevel}
import org.multiverse.api.StmUtils
class NoTransactionInScopeException extends RuntimeException class NoTransactionInScopeException extends RuntimeException
class TransactionRetryException(message: String) extends RuntimeException(message) class TransactionRetryException(message: String) extends RuntimeException(message)
@ -46,22 +43,8 @@ object Transaction {
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object Local extends TransactionManagement with Logging { @deprecated("Use the akka.stm.local package object instead.")
object Local extends LocalStm
object DefaultLocalTransactionConfig extends TransactionConfig
object DefaultLocalTransactionFactory extends TransactionFactory(DefaultLocalTransactionConfig, "DefaultLocalTransaction")
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T = atomic(factory)(body)
def atomic[T](factory: TransactionFactory)(body: => T): T = {
factory.boilerplate.execute(new TransactionalCallable[T]() {
def call(mtx: MultiverseTransaction): T = {
factory.addHooks
body
}
})
}
}
/** /**
* Module for "global" transaction management, global in the context of multiple threads. * Module for "global" transaction management, global in the context of multiple threads.
@ -79,48 +62,13 @@ object Transaction {
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object Global extends TransactionManagement with Logging { @deprecated("Use the akka.stm.global package object instead.")
object Global extends GlobalStm
object DefaultGlobalTransactionConfig extends TransactionConfig
object DefaultGlobalTransactionFactory extends TransactionFactory(DefaultGlobalTransactionConfig, "DefaultGlobalTransaction")
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T = atomic(factory)(body)
def atomic[T](factory: TransactionFactory)(body: => T): T = {
factory.boilerplate.execute(new TransactionalCallable[T]() {
def call(mtx: MultiverseTransaction): T = {
if (!isTransactionSetInScope) createNewTransactionSet
factory.addHooks
val result = body
val txSet = getTransactionSetInScope
log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet)
// FIXME ? txSet.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
txSet.joinCommit(mtx)
clearTransaction
result
}
})
}
}
/** /**
* TODO: document * TODO: document
*/ */
object Util { object Util extends StmUtil
def deferred[T](body: => T): Unit = StmUtils.scheduleDeferredTask(new Runnable { def run = body })
def compensating[T](body: => T): Unit = StmUtils.scheduleCompensatingTask(new Runnable { def run = body })
def retry = StmUtils.retry
def either[T](firstBody: => T) = new {
def orElse(secondBody: => T) = new OrElseTemplate[T] {
def either(mtx: MultiverseTransaction) = firstBody
def orelse(mtx: MultiverseTransaction) = secondBody
}.execute()
}
}
/** /**
* Attach an Akka-specific Transaction to the current Multiverse transaction. * Attach an Akka-specific Transaction to the current Multiverse transaction.

View file

@ -8,8 +8,11 @@ import se.scalablesolutions.akka.util.Logging
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import org.multiverse.api.StmUtils
import org.multiverse.api.ThreadLocalTransaction._ import org.multiverse.api.ThreadLocalTransaction._
import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.commitbarriers.CountDownCommitBarrier import org.multiverse.commitbarriers.CountDownCommitBarrier
import org.multiverse.templates.{TransactionalCallable, OrElseTemplate}
class StmException(msg: String) extends RuntimeException(msg) class StmException(msg: String) extends RuntimeException(msg)
@ -20,7 +23,9 @@ class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Tran
object TransactionManagement extends TransactionManagement { object TransactionManagement extends TransactionManagement {
import se.scalablesolutions.akka.config.Config._ import se.scalablesolutions.akka.config.Config._
// is this needed?
val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", true)) val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", true))
// move to stm.global.fair?
val FAIR_TRANSACTIONS = config.getBool("akka.stm.fair", true) val FAIR_TRANSACTIONS = config.getBool("akka.stm.fair", true)
def isTransactionalityEnabled = TRANSACTION_ENABLED.get def isTransactionalityEnabled = TRANSACTION_ENABLED.get
@ -85,3 +90,72 @@ trait TransactionManagement {
(option ne null) && option.isDefined (option ne null) && option.isDefined
} }
} }
class LocalStm extends TransactionManagement with Logging {
val DefaultLocalTransactionConfig = TransactionConfig()
val DefaultLocalTransactionFactory = TransactionFactory(DefaultLocalTransactionConfig, "DefaultLocalTransaction")
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T = atomic(factory)(body)
def atomic[T](factory: TransactionFactory)(body: => T): T = {
factory.boilerplate.execute(new TransactionalCallable[T]() {
def call(mtx: MultiverseTransaction): T = {
factory.addHooks
body
}
})
}
}
class GlobalStm extends TransactionManagement with Logging {
val DefaultGlobalTransactionConfig = TransactionConfig()
val DefaultGlobalTransactionFactory = TransactionFactory(DefaultGlobalTransactionConfig, "DefaultGlobalTransaction")
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T = atomic(factory)(body)
def atomic[T](factory: TransactionFactory)(body: => T): T = {
factory.boilerplate.execute(new TransactionalCallable[T]() {
def call(mtx: MultiverseTransaction): T = {
if (!isTransactionSetInScope) createNewTransactionSet
factory.addHooks
val result = body
val txSet = getTransactionSetInScope
log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet)
// FIXME ? txSet.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
txSet.joinCommit(mtx)
clearTransaction
result
}
})
}
}
trait StmUtil {
def deferred[T](body: => T): Unit = StmUtils.scheduleDeferredTask(new Runnable { def run = body })
def compensating[T](body: => T): Unit = StmUtils.scheduleCompensatingTask(new Runnable { def run = body })
def retry = StmUtils.retry
def either[T](firstBody: => T) = new {
def orElse(secondBody: => T) = new OrElseTemplate[T] {
def either(mtx: MultiverseTransaction) = firstBody
def orelse(mtx: MultiverseTransaction) = secondBody
}.execute()
}
}
trait StmCommon {
type TransactionConfig = se.scalablesolutions.akka.stm.TransactionConfig
val TransactionConfig = se.scalablesolutions.akka.stm.TransactionConfig
type TransactionFactory = se.scalablesolutions.akka.stm.TransactionFactory
val TransactionFactory = se.scalablesolutions.akka.stm.TransactionFactory
type Ref[T] = se.scalablesolutions.akka.stm.Ref[T]
val Ref = se.scalablesolutions.akka.stm.Ref
}

View file

@ -0,0 +1,9 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
package object local extends LocalStm with StmUtil with StmCommon
package object global extends GlobalStm with StmUtil with StmCommon