diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala index 6f992d0aee..5194098799 100644 --- a/akka-core/src/main/scala/stm/Transaction.scala +++ b/akka-core/src/main/scala/stm/Transaction.scala @@ -6,7 +6,6 @@ package se.scalablesolutions.akka.stm import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.TimeUnit import javax.transaction.{TransactionManager, UserTransaction, Status, TransactionSynchronizationRegistry} @@ -19,9 +18,8 @@ import org.multiverse.api.{Transaction => MultiverseTransaction} import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent} import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance import org.multiverse.api.ThreadLocalTransaction._ -import org.multiverse.templates.{TransactionTemplate, OrElseTemplate} -import org.multiverse.api.backoff.ExponentialBackoffPolicy -import org.multiverse.stms.alpha.AlphaStm +import org.multiverse.templates.{TransactionalCallable, OrElseTemplate} +import org.multiverse.api.{TraceLevel => MultiverseTraceLevel} import org.multiverse.api.StmUtils class NoTransactionInScopeException extends RuntimeException @@ -50,16 +48,18 @@ object Transaction { */ object Local extends TransactionManagement with Logging { - /** - * See ScalaDoc on Transaction.Local class. - */ - def atomic[T](body: => T): T = { - new TransactionTemplate[T]() { - def execute(mtx: MultiverseTransaction): T = { - Transaction.attach + object DefaultLocalTransactionConfig extends TransactionConfig + object DefaultLocalTransactionFactory extends TransactionFactory(DefaultLocalTransactionConfig, "DefaultLocalTransaction") + + def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T = atomic(factory)(body) + + def atomic[T](factory: TransactionFactory)(body: => T): T = { + factory.boilerplate.execute(new TransactionalCallable[T]() { + def call(mtx: MultiverseTransaction): T = { + factory.addHooks body } - }.execute() + }) } } @@ -68,8 +68,7 @@ object Transaction { * You have to use these if you do need to have one transaction span multiple threads (or Actors). *
* Example of atomic transaction management using the atomic block. - * - * Here are some examples (assuming implicit transaction family name in scope): + * : *
* import se.scalablesolutions.akka.stm.Transaction.Global._
*
@@ -82,15 +81,16 @@ object Transaction {
*/
object Global extends TransactionManagement with Logging {
- /**
- * See ScalaDoc on Transaction.Global class.
- */
- def atomic[T](body: => T): T = {
- var isTopLevelTransaction = false
- new TransactionTemplate[T]() {
- def execute(mtx: MultiverseTransaction): T = {
+ object DefaultGlobalTransactionConfig extends TransactionConfig
+ object DefaultGlobalTransactionFactory extends TransactionFactory(DefaultGlobalTransactionConfig, "DefaultGlobalTransaction")
+
+ def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T = atomic(factory)(body)
+
+ def atomic[T](factory: TransactionFactory)(body: => T): T = {
+ factory.boilerplate.execute(new TransactionalCallable[T]() {
+ def call(mtx: MultiverseTransaction): T = {
if (!isTransactionSetInScope) createNewTransactionSet
- Transaction.attach
+ factory.addHooks
val result = body
val txSet = getTransactionSetInScope
log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet)
@@ -99,7 +99,7 @@ object Transaction {
clearTransaction
result
}
- }.execute()
+ })
}
}
@@ -124,7 +124,7 @@ object Transaction {
/**
* Attach an Akka-specific Transaction to the current Multiverse transaction.
- * Must be called within a Multiverse transaction.
+ * Must be called within a Multiverse transaction. Used by TransactionFactory.addHooks
*/
private[akka] def attach = {
val mtx = getRequiredThreadLocalTransaction
@@ -140,6 +140,16 @@ object Transaction {
}
})
}
+
+ /**
+ * Mapping to Multiverse TraceLevel.
+ */
+ object TraceLevel {
+ val None = MultiverseTraceLevel.none
+ val Coarse = MultiverseTraceLevel.course // mispelling?
+ val Course = MultiverseTraceLevel.course
+ val Fine = MultiverseTraceLevel.fine
+ }
}
/**
@@ -278,3 +288,4 @@ trait Committable {
trait Abortable {
def abort: Unit
}
+
diff --git a/akka-core/src/main/scala/stm/TransactionFactory.scala b/akka-core/src/main/scala/stm/TransactionFactory.scala
new file mode 100644
index 0000000000..9147e3ce36
--- /dev/null
+++ b/akka-core/src/main/scala/stm/TransactionFactory.scala
@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package se.scalablesolutions.akka.stm
+
+import se.scalablesolutions.akka.config.Config._
+import se.scalablesolutions.akka.util.Duration
+
+import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
+import org.multiverse.stms.alpha.AlphaStm
+import org.multiverse.templates.TransactionBoilerplate
+import org.multiverse.api.TraceLevel
+
+/**
+ * For configuring multiverse transactions.
+ */
+object TransactionConfig {
+ val FAMILY_NAME = "DefaultTransaction"
+ val READONLY = false
+ val MAX_RETRIES = config.getInt("akka.stm.max-retries", 1000)
+ val TIMEOUT = config.getLong("akka.stm.timeout", Long.MaxValue)
+ val TIME_UNIT = config.getString("akka.stm.time-unit", "seconds")
+ val TRACK_READS = config.getBool("akka.stm.track-reads", false)
+ val WRITE_SKEW = config.getBool("akka.stm.write-skew", true)
+ val EXPLICIT_RETRIES = config.getBool("akka.stm.explicit-retries", false)
+ val INTERRUPTIBLE = config.getBool("akka.stm.interruptible", false)
+ val SPECULATIVE = config.getBool("akka.stm.speculative", false)
+ val QUICK_RELEASE = config.getBool("akka.stm.quick-release", true)
+ val TRACE_LEVEL = traceLevel(config.getString("akka.stm.trace-level", "none"))
+ val HOOKS = config.getBool("akka.stm.hooks", true)
+
+ val DefaultTimeout = Duration(TIMEOUT, TIME_UNIT)
+
+ def traceLevel(level: String) = level.toLowerCase match {
+ case "coarse" | "course" => Transaction.TraceLevel.Coarse
+ case "fine" => Transaction.TraceLevel.Fine
+ case _ => Transaction.TraceLevel.None
+ }
+
+ def apply(familyName: String = FAMILY_NAME,
+ readonly: Boolean = READONLY,
+ maxRetries: Int = MAX_RETRIES,
+ timeout: Duration = DefaultTimeout,
+ trackReads: Boolean = TRACK_READS,
+ writeSkew: Boolean = WRITE_SKEW,
+ explicitRetries: Boolean = EXPLICIT_RETRIES,
+ interruptible: Boolean = INTERRUPTIBLE,
+ speculative: Boolean = SPECULATIVE,
+ quickRelease: Boolean = QUICK_RELEASE,
+ traceLevel: TraceLevel = TRACE_LEVEL,
+ hooks: Boolean = HOOKS) = {
+ new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew,
+ explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks)
+ }
+}
+
+/**
+ * For configuring multiverse transactions.
+ */
+class TransactionConfig(val familyName: String = TransactionConfig.FAMILY_NAME,
+ val readonly: Boolean = TransactionConfig.READONLY,
+ val maxRetries: Int = TransactionConfig.MAX_RETRIES,
+ val timeout: Duration = TransactionConfig.DefaultTimeout,
+ val trackReads: Boolean = TransactionConfig.TRACK_READS,
+ val writeSkew: Boolean = TransactionConfig.WRITE_SKEW,
+ val explicitRetries: Boolean = TransactionConfig.EXPLICIT_RETRIES,
+ val interruptible: Boolean = TransactionConfig.INTERRUPTIBLE,
+ val speculative: Boolean = TransactionConfig.SPECULATIVE,
+ val quickRelease: Boolean = TransactionConfig.QUICK_RELEASE,
+ val traceLevel: TraceLevel = TransactionConfig.TRACE_LEVEL,
+ val hooks: Boolean = TransactionConfig.HOOKS)
+
+object DefaultTransactionConfig extends TransactionConfig
+
+/**
+ * Wrapper for transaction config, factory, and boilerplate. Used by atomic.
+ */
+object TransactionFactory {
+ def apply(config: TransactionConfig) = new TransactionFactory(config)
+
+ def apply(config: TransactionConfig, defaultName: String) = new TransactionFactory(config, defaultName)
+
+ def apply(familyName: String = TransactionConfig.FAMILY_NAME,
+ readonly: Boolean = TransactionConfig.READONLY,
+ maxRetries: Int = TransactionConfig.MAX_RETRIES,
+ timeout: Duration = TransactionConfig.DefaultTimeout,
+ trackReads: Boolean = TransactionConfig.TRACK_READS,
+ writeSkew: Boolean = TransactionConfig.WRITE_SKEW,
+ explicitRetries: Boolean = TransactionConfig.EXPLICIT_RETRIES,
+ interruptible: Boolean = TransactionConfig.INTERRUPTIBLE,
+ speculative: Boolean = TransactionConfig.SPECULATIVE,
+ quickRelease: Boolean = TransactionConfig.QUICK_RELEASE,
+ traceLevel: TraceLevel = TransactionConfig.TRACE_LEVEL,
+ hooks: Boolean = TransactionConfig.HOOKS) = {
+ val config = new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew,
+ explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks)
+ new TransactionFactory(config)
+ }
+}
+
+/**
+ * Wrapper for transaction config, factory, and boilerplate. Used by atomic.
+ */
+class TransactionFactory(val config: TransactionConfig = DefaultTransactionConfig, defaultName: String = TransactionConfig.FAMILY_NAME) {
+ self =>
+
+ // use the config family name if it's been set, otherwise defaultName - used by actors to set class name as default
+ val familyName = if (config.familyName != TransactionConfig.FAMILY_NAME) config.familyName else defaultName
+
+ val factory = (getGlobalStmInstance().asInstanceOf[AlphaStm].getTransactionFactoryBuilder()
+ .setFamilyName(familyName)
+ .setReadonly(config.readonly)
+ .setMaxRetries(config.maxRetries)
+ .setTimeoutNs(config.timeout.toNanos)
+ .setReadTrackingEnabled(config.trackReads)
+ .setWriteSkewAllowed(config.writeSkew)
+ .setExplicitRetryAllowed(config.explicitRetries)
+ .setInterruptible(config.interruptible)
+ .setSpeculativeConfigurationEnabled(config.speculative)
+ .setQuickReleaseEnabled(config.quickRelease)
+ .setTraceLevel(config.traceLevel)
+ .build())
+
+ val boilerplate = new TransactionBoilerplate(factory)
+
+ def addHooks = if (config.hooks) Transaction.attach
+}
diff --git a/akka-core/src/test/scala/RefSpec.scala b/akka-core/src/test/scala/RefSpec.scala
index f30f008619..3b1a1fe3ae 100644
--- a/akka-core/src/test/scala/RefSpec.scala
+++ b/akka-core/src/test/scala/RefSpec.scala
@@ -29,7 +29,7 @@ class RefSpec extends Spec with ShouldMatchers {
val ref = Ref(3)
try {
- atomic {
+ atomic(DefaultLocalTransactionFactory) {
ref.swap(5)
throw new Exception
}
diff --git a/akka-core/src/test/scala/StmSpec.scala b/akka-core/src/test/scala/StmSpec.scala
index fb560f2ac4..f08116f4af 100644
--- a/akka-core/src/test/scala/StmSpec.scala
+++ b/akka-core/src/test/scala/StmSpec.scala
@@ -70,7 +70,7 @@ class StmSpec extends
ref.get.getOrElse(0)
}
try {
- atomic {
+ atomic(DefaultLocalTransactionFactory) {
increment
increment
throw new Exception