Configurable TransactionFactory

This commit is contained in:
Peter Vlugter 2010-06-10 15:49:26 +12:00
parent 9c026ad27d
commit ef4f525b52
4 changed files with 165 additions and 26 deletions

View file

@ -6,7 +6,6 @@ package se.scalablesolutions.akka.stm
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.TimeUnit
import javax.transaction.{TransactionManager, UserTransaction, Status, TransactionSynchronizationRegistry}
@ -19,9 +18,8 @@ import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent}
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
import org.multiverse.api.ThreadLocalTransaction._
import org.multiverse.templates.{TransactionTemplate, OrElseTemplate}
import org.multiverse.api.backoff.ExponentialBackoffPolicy
import org.multiverse.stms.alpha.AlphaStm
import org.multiverse.templates.{TransactionalCallable, OrElseTemplate}
import org.multiverse.api.{TraceLevel => MultiverseTraceLevel}
import org.multiverse.api.StmUtils
class NoTransactionInScopeException extends RuntimeException
@ -50,16 +48,18 @@ object Transaction {
*/
object Local extends TransactionManagement with Logging {
/**
* See ScalaDoc on Transaction.Local class.
*/
def atomic[T](body: => T): T = {
new TransactionTemplate[T]() {
def execute(mtx: MultiverseTransaction): T = {
Transaction.attach
object DefaultLocalTransactionConfig extends TransactionConfig
object DefaultLocalTransactionFactory extends TransactionFactory(DefaultLocalTransactionConfig, "DefaultLocalTransaction")
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T = atomic(factory)(body)
def atomic[T](factory: TransactionFactory)(body: => T): T = {
factory.boilerplate.execute(new TransactionalCallable[T]() {
def call(mtx: MultiverseTransaction): T = {
factory.addHooks
body
}
}.execute()
})
}
}
@ -68,8 +68,7 @@ object Transaction {
* You have to use these if you do need to have one transaction span multiple threads (or Actors).
* <p/>
* Example of atomic transaction management using the atomic block.
* <p/>
* Here are some examples (assuming implicit transaction family name in scope):
* <p/>:
* <pre>
* import se.scalablesolutions.akka.stm.Transaction.Global._
*
@ -82,15 +81,16 @@ object Transaction {
*/
object Global extends TransactionManagement with Logging {
/**
* See ScalaDoc on Transaction.Global class.
*/
def atomic[T](body: => T): T = {
var isTopLevelTransaction = false
new TransactionTemplate[T]() {
def execute(mtx: MultiverseTransaction): T = {
object DefaultGlobalTransactionConfig extends TransactionConfig
object DefaultGlobalTransactionFactory extends TransactionFactory(DefaultGlobalTransactionConfig, "DefaultGlobalTransaction")
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T = atomic(factory)(body)
def atomic[T](factory: TransactionFactory)(body: => T): T = {
factory.boilerplate.execute(new TransactionalCallable[T]() {
def call(mtx: MultiverseTransaction): T = {
if (!isTransactionSetInScope) createNewTransactionSet
Transaction.attach
factory.addHooks
val result = body
val txSet = getTransactionSetInScope
log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet)
@ -99,7 +99,7 @@ object Transaction {
clearTransaction
result
}
}.execute()
})
}
}
@ -124,7 +124,7 @@ object Transaction {
/**
* Attach an Akka-specific Transaction to the current Multiverse transaction.
* Must be called within a Multiverse transaction.
* Must be called within a Multiverse transaction. Used by TransactionFactory.addHooks
*/
private[akka] def attach = {
val mtx = getRequiredThreadLocalTransaction
@ -140,6 +140,16 @@ object Transaction {
}
})
}
/**
* Mapping to Multiverse TraceLevel.
*/
object TraceLevel {
val None = MultiverseTraceLevel.none
val Coarse = MultiverseTraceLevel.course // mispelling?
val Course = MultiverseTraceLevel.course
val Fine = MultiverseTraceLevel.fine
}
}
/**
@ -278,3 +288,4 @@ trait Committable {
trait Abortable {
def abort: Unit
}

View file

@ -0,0 +1,128 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.util.Duration
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
import org.multiverse.stms.alpha.AlphaStm
import org.multiverse.templates.TransactionBoilerplate
import org.multiverse.api.TraceLevel
/**
* For configuring multiverse transactions.
*/
object TransactionConfig {
val FAMILY_NAME = "DefaultTransaction"
val READONLY = false
val MAX_RETRIES = config.getInt("akka.stm.max-retries", 1000)
val TIMEOUT = config.getLong("akka.stm.timeout", Long.MaxValue)
val TIME_UNIT = config.getString("akka.stm.time-unit", "seconds")
val TRACK_READS = config.getBool("akka.stm.track-reads", false)
val WRITE_SKEW = config.getBool("akka.stm.write-skew", true)
val EXPLICIT_RETRIES = config.getBool("akka.stm.explicit-retries", false)
val INTERRUPTIBLE = config.getBool("akka.stm.interruptible", false)
val SPECULATIVE = config.getBool("akka.stm.speculative", false)
val QUICK_RELEASE = config.getBool("akka.stm.quick-release", true)
val TRACE_LEVEL = traceLevel(config.getString("akka.stm.trace-level", "none"))
val HOOKS = config.getBool("akka.stm.hooks", true)
val DefaultTimeout = Duration(TIMEOUT, TIME_UNIT)
def traceLevel(level: String) = level.toLowerCase match {
case "coarse" | "course" => Transaction.TraceLevel.Coarse
case "fine" => Transaction.TraceLevel.Fine
case _ => Transaction.TraceLevel.None
}
def apply(familyName: String = FAMILY_NAME,
readonly: Boolean = READONLY,
maxRetries: Int = MAX_RETRIES,
timeout: Duration = DefaultTimeout,
trackReads: Boolean = TRACK_READS,
writeSkew: Boolean = WRITE_SKEW,
explicitRetries: Boolean = EXPLICIT_RETRIES,
interruptible: Boolean = INTERRUPTIBLE,
speculative: Boolean = SPECULATIVE,
quickRelease: Boolean = QUICK_RELEASE,
traceLevel: TraceLevel = TRACE_LEVEL,
hooks: Boolean = HOOKS) = {
new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew,
explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks)
}
}
/**
* For configuring multiverse transactions.
*/
class TransactionConfig(val familyName: String = TransactionConfig.FAMILY_NAME,
val readonly: Boolean = TransactionConfig.READONLY,
val maxRetries: Int = TransactionConfig.MAX_RETRIES,
val timeout: Duration = TransactionConfig.DefaultTimeout,
val trackReads: Boolean = TransactionConfig.TRACK_READS,
val writeSkew: Boolean = TransactionConfig.WRITE_SKEW,
val explicitRetries: Boolean = TransactionConfig.EXPLICIT_RETRIES,
val interruptible: Boolean = TransactionConfig.INTERRUPTIBLE,
val speculative: Boolean = TransactionConfig.SPECULATIVE,
val quickRelease: Boolean = TransactionConfig.QUICK_RELEASE,
val traceLevel: TraceLevel = TransactionConfig.TRACE_LEVEL,
val hooks: Boolean = TransactionConfig.HOOKS)
object DefaultTransactionConfig extends TransactionConfig
/**
* Wrapper for transaction config, factory, and boilerplate. Used by atomic.
*/
object TransactionFactory {
def apply(config: TransactionConfig) = new TransactionFactory(config)
def apply(config: TransactionConfig, defaultName: String) = new TransactionFactory(config, defaultName)
def apply(familyName: String = TransactionConfig.FAMILY_NAME,
readonly: Boolean = TransactionConfig.READONLY,
maxRetries: Int = TransactionConfig.MAX_RETRIES,
timeout: Duration = TransactionConfig.DefaultTimeout,
trackReads: Boolean = TransactionConfig.TRACK_READS,
writeSkew: Boolean = TransactionConfig.WRITE_SKEW,
explicitRetries: Boolean = TransactionConfig.EXPLICIT_RETRIES,
interruptible: Boolean = TransactionConfig.INTERRUPTIBLE,
speculative: Boolean = TransactionConfig.SPECULATIVE,
quickRelease: Boolean = TransactionConfig.QUICK_RELEASE,
traceLevel: TraceLevel = TransactionConfig.TRACE_LEVEL,
hooks: Boolean = TransactionConfig.HOOKS) = {
val config = new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew,
explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks)
new TransactionFactory(config)
}
}
/**
* Wrapper for transaction config, factory, and boilerplate. Used by atomic.
*/
class TransactionFactory(val config: TransactionConfig = DefaultTransactionConfig, defaultName: String = TransactionConfig.FAMILY_NAME) {
self =>
// use the config family name if it's been set, otherwise defaultName - used by actors to set class name as default
val familyName = if (config.familyName != TransactionConfig.FAMILY_NAME) config.familyName else defaultName
val factory = (getGlobalStmInstance().asInstanceOf[AlphaStm].getTransactionFactoryBuilder()
.setFamilyName(familyName)
.setReadonly(config.readonly)
.setMaxRetries(config.maxRetries)
.setTimeoutNs(config.timeout.toNanos)
.setReadTrackingEnabled(config.trackReads)
.setWriteSkewAllowed(config.writeSkew)
.setExplicitRetryAllowed(config.explicitRetries)
.setInterruptible(config.interruptible)
.setSpeculativeConfigurationEnabled(config.speculative)
.setQuickReleaseEnabled(config.quickRelease)
.setTraceLevel(config.traceLevel)
.build())
val boilerplate = new TransactionBoilerplate(factory)
def addHooks = if (config.hooks) Transaction.attach
}

View file

@ -29,7 +29,7 @@ class RefSpec extends Spec with ShouldMatchers {
val ref = Ref(3)
try {
atomic {
atomic(DefaultLocalTransactionFactory) {
ref.swap(5)
throw new Exception
}

View file

@ -70,7 +70,7 @@ class StmSpec extends
ref.get.getOrElse(0)
}
try {
atomic {
atomic(DefaultLocalTransactionFactory) {
increment
increment
throw new Exception