From ef4f525b52033304dda02c98846e2d9e590c0d77 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Thu, 10 Jun 2010 15:49:26 +1200 Subject: [PATCH] Configurable TransactionFactory --- .../src/main/scala/stm/Transaction.scala | 59 ++++---- .../main/scala/stm/TransactionFactory.scala | 128 ++++++++++++++++++ akka-core/src/test/scala/RefSpec.scala | 2 +- akka-core/src/test/scala/StmSpec.scala | 2 +- 4 files changed, 165 insertions(+), 26 deletions(-) create mode 100644 akka-core/src/main/scala/stm/TransactionFactory.scala diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala index 6f992d0aee..5194098799 100644 --- a/akka-core/src/main/scala/stm/Transaction.scala +++ b/akka-core/src/main/scala/stm/Transaction.scala @@ -6,7 +6,6 @@ package se.scalablesolutions.akka.stm import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.TimeUnit import javax.transaction.{TransactionManager, UserTransaction, Status, TransactionSynchronizationRegistry} @@ -19,9 +18,8 @@ import org.multiverse.api.{Transaction => MultiverseTransaction} import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent} import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance import org.multiverse.api.ThreadLocalTransaction._ -import org.multiverse.templates.{TransactionTemplate, OrElseTemplate} -import org.multiverse.api.backoff.ExponentialBackoffPolicy -import org.multiverse.stms.alpha.AlphaStm +import org.multiverse.templates.{TransactionalCallable, OrElseTemplate} +import org.multiverse.api.{TraceLevel => MultiverseTraceLevel} import org.multiverse.api.StmUtils class NoTransactionInScopeException extends RuntimeException @@ -50,16 +48,18 @@ object Transaction { */ object Local extends TransactionManagement with Logging { - /** - * See ScalaDoc on Transaction.Local class. - */ - def atomic[T](body: => T): T = { - new TransactionTemplate[T]() { - def execute(mtx: MultiverseTransaction): T = { - Transaction.attach + object DefaultLocalTransactionConfig extends TransactionConfig + object DefaultLocalTransactionFactory extends TransactionFactory(DefaultLocalTransactionConfig, "DefaultLocalTransaction") + + def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T = atomic(factory)(body) + + def atomic[T](factory: TransactionFactory)(body: => T): T = { + factory.boilerplate.execute(new TransactionalCallable[T]() { + def call(mtx: MultiverseTransaction): T = { + factory.addHooks body } - }.execute() + }) } } @@ -68,8 +68,7 @@ object Transaction { * You have to use these if you do need to have one transaction span multiple threads (or Actors). *

* Example of atomic transaction management using the atomic block. - *

- * Here are some examples (assuming implicit transaction family name in scope): + *

: *

    * import se.scalablesolutions.akka.stm.Transaction.Global._
    *
@@ -82,15 +81,16 @@ object Transaction {
    */
   object Global extends TransactionManagement with Logging {
 
-    /**
-     * See ScalaDoc on Transaction.Global class.
-     */
-    def atomic[T](body: => T): T = {
-      var isTopLevelTransaction = false
-      new TransactionTemplate[T]() {
-        def execute(mtx: MultiverseTransaction): T = {
+    object DefaultGlobalTransactionConfig extends TransactionConfig
+    object DefaultGlobalTransactionFactory extends TransactionFactory(DefaultGlobalTransactionConfig, "DefaultGlobalTransaction")
+
+    def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T = atomic(factory)(body)
+    
+    def atomic[T](factory: TransactionFactory)(body: => T): T = {
+      factory.boilerplate.execute(new TransactionalCallable[T]() {
+        def call(mtx: MultiverseTransaction): T = {
           if (!isTransactionSetInScope) createNewTransactionSet
-          Transaction.attach
+          factory.addHooks
           val result = body
           val txSet = getTransactionSetInScope
           log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet)
@@ -99,7 +99,7 @@ object Transaction {
           clearTransaction
           result
         }
-      }.execute()
+      })
     }
   }
 
@@ -124,7 +124,7 @@ object Transaction {
 
   /**
    * Attach an Akka-specific Transaction to the current Multiverse transaction.
-   * Must be called within a Multiverse transaction.
+   * Must be called within a Multiverse transaction. Used by TransactionFactory.addHooks
    */ 
   private[akka] def attach = {
     val mtx = getRequiredThreadLocalTransaction
@@ -140,6 +140,16 @@ object Transaction {
       }
     })
   }
+
+  /**
+   * Mapping to Multiverse TraceLevel.
+   */ 
+  object TraceLevel {
+    val None = MultiverseTraceLevel.none
+    val Coarse = MultiverseTraceLevel.course // mispelling?
+    val Course = MultiverseTraceLevel.course
+    val Fine = MultiverseTraceLevel.fine    
+  }
 }
 
 /**
@@ -278,3 +288,4 @@ trait Committable {
 trait Abortable {
   def abort: Unit
 }
+
diff --git a/akka-core/src/main/scala/stm/TransactionFactory.scala b/akka-core/src/main/scala/stm/TransactionFactory.scala
new file mode 100644
index 0000000000..9147e3ce36
--- /dev/null
+++ b/akka-core/src/main/scala/stm/TransactionFactory.scala
@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB 
+ */
+
+package se.scalablesolutions.akka.stm
+
+import se.scalablesolutions.akka.config.Config._
+import se.scalablesolutions.akka.util.Duration
+
+import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
+import org.multiverse.stms.alpha.AlphaStm
+import org.multiverse.templates.TransactionBoilerplate
+import org.multiverse.api.TraceLevel
+
+/**
+ * For configuring multiverse transactions.
+ */ 
+object TransactionConfig {
+  val FAMILY_NAME      = "DefaultTransaction"
+  val READONLY         = false
+  val MAX_RETRIES      = config.getInt("akka.stm.max-retries", 1000)
+  val TIMEOUT          = config.getLong("akka.stm.timeout", Long.MaxValue)
+  val TIME_UNIT        = config.getString("akka.stm.time-unit", "seconds")
+  val TRACK_READS      = config.getBool("akka.stm.track-reads", false)
+  val WRITE_SKEW       = config.getBool("akka.stm.write-skew", true)
+  val EXPLICIT_RETRIES = config.getBool("akka.stm.explicit-retries", false)
+  val INTERRUPTIBLE    = config.getBool("akka.stm.interruptible", false)
+  val SPECULATIVE      = config.getBool("akka.stm.speculative", false)
+  val QUICK_RELEASE    = config.getBool("akka.stm.quick-release", true)
+  val TRACE_LEVEL      = traceLevel(config.getString("akka.stm.trace-level", "none"))
+  val HOOKS            = config.getBool("akka.stm.hooks", true)
+
+  val DefaultTimeout = Duration(TIMEOUT, TIME_UNIT)
+
+  def traceLevel(level: String) = level.toLowerCase match {
+    case "coarse" | "course" => Transaction.TraceLevel.Coarse
+    case "fine" => Transaction.TraceLevel.Fine
+    case _ => Transaction.TraceLevel.None
+  }
+
+  def apply(familyName: String       = FAMILY_NAME,
+            readonly: Boolean        = READONLY,
+            maxRetries: Int          = MAX_RETRIES,
+            timeout: Duration        = DefaultTimeout,
+            trackReads: Boolean      = TRACK_READS,
+            writeSkew: Boolean       = WRITE_SKEW,
+            explicitRetries: Boolean = EXPLICIT_RETRIES,
+            interruptible: Boolean   = INTERRUPTIBLE,
+            speculative: Boolean     = SPECULATIVE,
+            quickRelease: Boolean    = QUICK_RELEASE,
+            traceLevel: TraceLevel   = TRACE_LEVEL,
+            hooks: Boolean           = HOOKS) = {
+    new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew,
+                          explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks)
+  }
+}
+
+/**
+ * For configuring multiverse transactions.
+ */
+class TransactionConfig(val familyName: String       = TransactionConfig.FAMILY_NAME,
+                        val readonly: Boolean        = TransactionConfig.READONLY,
+                        val maxRetries: Int          = TransactionConfig.MAX_RETRIES,
+                        val timeout: Duration        = TransactionConfig.DefaultTimeout,
+                        val trackReads: Boolean      = TransactionConfig.TRACK_READS,
+                        val writeSkew: Boolean       = TransactionConfig.WRITE_SKEW,
+                        val explicitRetries: Boolean = TransactionConfig.EXPLICIT_RETRIES,
+                        val interruptible: Boolean   = TransactionConfig.INTERRUPTIBLE,
+                        val speculative: Boolean     = TransactionConfig.SPECULATIVE,
+                        val quickRelease: Boolean    = TransactionConfig.QUICK_RELEASE,
+                        val traceLevel: TraceLevel   = TransactionConfig.TRACE_LEVEL,
+                        val hooks: Boolean           = TransactionConfig.HOOKS) 
+
+object DefaultTransactionConfig extends TransactionConfig
+
+/**
+ * Wrapper for transaction config, factory, and boilerplate. Used by atomic.
+ */ 
+object TransactionFactory {
+  def apply(config: TransactionConfig) = new TransactionFactory(config)
+
+  def apply(config: TransactionConfig, defaultName: String) = new TransactionFactory(config, defaultName)
+
+  def apply(familyName: String       = TransactionConfig.FAMILY_NAME,
+            readonly: Boolean        = TransactionConfig.READONLY,
+            maxRetries: Int          = TransactionConfig.MAX_RETRIES,
+            timeout: Duration        = TransactionConfig.DefaultTimeout,
+            trackReads: Boolean      = TransactionConfig.TRACK_READS,
+            writeSkew: Boolean       = TransactionConfig.WRITE_SKEW,
+            explicitRetries: Boolean = TransactionConfig.EXPLICIT_RETRIES,
+            interruptible: Boolean   = TransactionConfig.INTERRUPTIBLE,
+            speculative: Boolean     = TransactionConfig.SPECULATIVE,
+            quickRelease: Boolean    = TransactionConfig.QUICK_RELEASE,
+            traceLevel: TraceLevel   = TransactionConfig.TRACE_LEVEL,
+            hooks: Boolean           = TransactionConfig.HOOKS) = {
+    val config = new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew,
+                                        explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks)
+    new TransactionFactory(config)
+  }
+}
+
+/**
+ * Wrapper for transaction config, factory, and boilerplate. Used by atomic.
+ */ 
+class TransactionFactory(val config: TransactionConfig = DefaultTransactionConfig, defaultName: String = TransactionConfig.FAMILY_NAME) {
+  self =>
+
+  // use the config family name if it's been set, otherwise defaultName - used by actors to set class name as default
+  val familyName = if (config.familyName != TransactionConfig.FAMILY_NAME) config.familyName else defaultName
+
+  val factory = (getGlobalStmInstance().asInstanceOf[AlphaStm].getTransactionFactoryBuilder()
+                 .setFamilyName(familyName)
+                 .setReadonly(config.readonly)
+                 .setMaxRetries(config.maxRetries)
+                 .setTimeoutNs(config.timeout.toNanos)
+                 .setReadTrackingEnabled(config.trackReads)
+                 .setWriteSkewAllowed(config.writeSkew)
+                 .setExplicitRetryAllowed(config.explicitRetries)
+                 .setInterruptible(config.interruptible)
+                 .setSpeculativeConfigurationEnabled(config.speculative)
+                 .setQuickReleaseEnabled(config.quickRelease)
+                 .setTraceLevel(config.traceLevel)
+                 .build())
+
+  val boilerplate = new TransactionBoilerplate(factory)
+
+  def addHooks = if (config.hooks) Transaction.attach
+}
diff --git a/akka-core/src/test/scala/RefSpec.scala b/akka-core/src/test/scala/RefSpec.scala
index f30f008619..3b1a1fe3ae 100644
--- a/akka-core/src/test/scala/RefSpec.scala
+++ b/akka-core/src/test/scala/RefSpec.scala
@@ -29,7 +29,7 @@ class RefSpec extends Spec with ShouldMatchers {
       val ref = Ref(3)
 
       try {
-        atomic {
+        atomic(DefaultLocalTransactionFactory) {
           ref.swap(5)
           throw new Exception
         }
diff --git a/akka-core/src/test/scala/StmSpec.scala b/akka-core/src/test/scala/StmSpec.scala
index fb560f2ac4..f08116f4af 100644
--- a/akka-core/src/test/scala/StmSpec.scala
+++ b/akka-core/src/test/scala/StmSpec.scala
@@ -70,7 +70,7 @@ class StmSpec extends
         ref.get.getOrElse(0)
       }
       try {
-        atomic {
+        atomic(DefaultLocalTransactionFactory) {
           increment
           increment
           throw new Exception