From 74f7d474ab2919e378a5d496a8ea1c205652e27f Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Wed, 23 Mar 2011 18:33:44 +1300 Subject: [PATCH] Remove akka-specific transaction and hooks --- .../src/main/scala/Ants.scala | 6 +- akka-stm/src/main/scala/akka/stm/Ref.scala | 7 + akka-stm/src/main/scala/akka/stm/Stm.scala | 5 +- .../src/main/scala/akka/stm/Transaction.scala | 252 ------------------ .../scala/akka/stm/TransactionFactory.scala | 18 +- .../akka/stm/TransactionFactoryBuilder.scala | 8 +- .../scala/akka/transactor/Coordinated.scala | 1 - .../src/test/scala/config/ConfigSpec.scala | 2 - config/akka-reference.conf | 3 - 9 files changed, 18 insertions(+), 284 deletions(-) delete mode 100644 akka-stm/src/main/scala/akka/stm/Transaction.scala diff --git a/akka-samples/akka-sample-ants/src/main/scala/Ants.scala b/akka-samples/akka-sample-ants/src/main/scala/Ants.scala index 79b14d2e78..1eb38d6f0f 100644 --- a/akka-samples/akka-sample-ants/src/main/scala/Ants.scala +++ b/akka-samples/akka-sample-ants/src/main/scala/Ants.scala @@ -67,7 +67,7 @@ object World { lazy val ants = setup lazy val evaporator = actorOf[Evaporator].start - private val snapshotFactory = TransactionFactory(readonly = true, familyName = "snapshot", hooks = false) + private val snapshotFactory = TransactionFactory(readonly = true, familyName = "snapshot") def snapshot = atomic(snapshotFactory) { Array.tabulate(Dim, Dim)(place(_, _).opt) } @@ -138,7 +138,7 @@ class AntActor(initLoc: (Int, Int)) extends WorldActor { val locRef = Ref(initLoc) val name = "ant-from-" + initLoc._1 + "-" + initLoc._2 - implicit val txFactory = TransactionFactory(familyName = name, hooks = false) + implicit val txFactory = TransactionFactory(familyName = name) val homing = (p: Place) => p.pher + (100 * (if (p.home) 0 else 1)) val foraging = (p: Place) => p.pher + p.food @@ -210,7 +210,7 @@ class Evaporator extends WorldActor { import Config._ import World._ - implicit val txFactory = TransactionFactory(familyName = "evaporator", hooks = false) + implicit val txFactory = TransactionFactory(familyName = "evaporator") val evaporate = (pher: Float) => pher * EvapRate def act = for (x <- 0 until Dim; y <- 0 until Dim) { diff --git a/akka-stm/src/main/scala/akka/stm/Ref.scala b/akka-stm/src/main/scala/akka/stm/Ref.scala index d6ef09bccd..74b1bf5a9e 100644 --- a/akka-stm/src/main/scala/akka/stm/Ref.scala +++ b/akka-stm/src/main/scala/akka/stm/Ref.scala @@ -8,6 +8,13 @@ import akka.actor.{newUuid, Uuid} import org.multiverse.transactional.refs.BasicRef +/** + * Common trait for all the transactional objects. + */ +@serializable trait Transactional { + val uuid: String +} + /** * Transactional managed reference. See the companion class for more information. */ diff --git a/akka-stm/src/main/scala/akka/stm/Stm.scala b/akka-stm/src/main/scala/akka/stm/Stm.scala index be511f0e2c..6e949b1ded 100644 --- a/akka-stm/src/main/scala/akka/stm/Stm.scala +++ b/akka-stm/src/main/scala/akka/stm/Stm.scala @@ -48,10 +48,7 @@ trait Stm { def atomic[T](factory: TransactionFactory)(body: => T): T = { factory.boilerplate.execute(new TransactionalCallable[T]() { - def call(mtx: MultiverseTransaction): T = { - factory.addHooks - body - } + def call(mtx: MultiverseTransaction): T = body }) } } diff --git a/akka-stm/src/main/scala/akka/stm/Transaction.scala b/akka-stm/src/main/scala/akka/stm/Transaction.scala deleted file mode 100644 index b2f0caaf07..0000000000 --- a/akka-stm/src/main/scala/akka/stm/Transaction.scala +++ /dev/null @@ -1,252 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.stm - -import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.atomic.AtomicInteger - -import scala.collection.mutable.HashMap - -import akka.util.ReflectiveAccess -import akka.config.Config._ -import akka.config.ModuleNotAvailableException -import akka.AkkaException - -import org.multiverse.api.{Transaction => MultiverseTransaction} -import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent} -import org.multiverse.api.ThreadLocalTransaction._ -import org.multiverse.api.{PropagationLevel => MultiversePropagationLevel} -import org.multiverse.api.{TraceLevel => MultiverseTraceLevel} - -class NoTransactionInScopeException extends AkkaException("No transaction in scope") -class TransactionRetryException(message: String) extends AkkaException(message) -class StmConfigurationException(message: String) extends AkkaException(message) - - -/** - * Internal helper methods for managing Akka-specific transaction. - */ -object TransactionManagement extends TransactionManagement { - private[akka] val transaction = new ThreadLocal[Option[Transaction]]() { - override protected def initialValue: Option[Transaction] = None - } - - private[akka] def getTransaction: Transaction = { - val option = transaction.get - if ((option eq null) || option.isEmpty) throw new StmConfigurationException("No Transaction in scope") - option.get - } -} - -/** - * Internal helper methods for managing Akka-specific transaction. - */ -trait TransactionManagement { - private[akka] def setTransaction(tx: Option[Transaction]) = - if (tx.isDefined) TransactionManagement.transaction.set(tx) - - private[akka] def clearTransaction = { - TransactionManagement.transaction.set(None) - setThreadLocalTransaction(null) - } - - private[akka] def getTransactionInScope = TransactionManagement.getTransaction - - private[akka] def isTransactionInScope = { - val option = TransactionManagement.transaction.get - (option ne null) && option.isDefined - } -} - -object Transaction { - val idFactory = new AtomicLong(-1L) - - /** - * Attach an Akka-specific Transaction to the current Multiverse transaction. - * Must be called within a Multiverse transaction. Used by TransactionFactory.addHooks - */ - private[akka] def attach = { - val mtx = getRequiredThreadLocalTransaction - val tx = new Transaction - tx.begin - tx.transaction = Some(mtx) - TransactionManagement.transaction.set(Some(tx)) - mtx.registerLifecycleListener(new TransactionLifecycleListener() { - def notify(mtx: MultiverseTransaction, event: TransactionLifecycleEvent) = event match { - case TransactionLifecycleEvent.PostCommit => tx.commitJta - case TransactionLifecycleEvent.PreCommit => tx.commitPersistentState - case TransactionLifecycleEvent.PostAbort => tx.abort - case _ => {} - } - }) - } -} - -/** - * The Akka-specific Transaction class. - * For integration with persistence modules and JTA support. - */ -@serializable class Transaction { - val JTA_AWARE = config.getBool("akka.stm.jta-aware", false) - val STATE_RETRIES = config.getInt("akka.storage.max-retries",10) - - val id = Transaction.idFactory.incrementAndGet - @volatile private[this] var status: TransactionStatus = TransactionStatus.New - private[akka] var transaction: Option[MultiverseTransaction] = None - private[this] val persistentStateMap = new HashMap[String, Committable with Abortable] - private[akka] val depth = new AtomicInteger(0) - - val jta: Option[ReflectiveJtaModule.TransactionContainer] = - if (JTA_AWARE) Some(ReflectiveJtaModule.createTransactionContainer) - else None - - // --- public methods --------- - - def begin = synchronized { - jta.foreach { _.beginWithStmSynchronization(this) } - } - - def commitPersistentState = synchronized { - retry(STATE_RETRIES){ - persistentStateMap.valuesIterator.foreach(_.commit) - persistentStateMap.clear - } - status = TransactionStatus.Completed - } - - def commitJta = synchronized { - jta.foreach(_.commit) - } - - def abort = synchronized { - jta.foreach(_.rollback) - persistentStateMap.valuesIterator.foreach(_.abort) - persistentStateMap.clear - } - - def retry(tries:Int)(block: => Unit):Unit={ - if(tries==0){ - throw new TransactionRetryException("Exhausted Retries while committing persistent state") - } - try{ - block - } catch{ - case e:Exception=>{ - retry(tries-1){block} - } - } - } - - def isNew = synchronized { status == TransactionStatus.New } - - def isActive = synchronized { status == TransactionStatus.Active } - - def isCompleted = synchronized { status == TransactionStatus.Completed } - - def isAborted = synchronized { status == TransactionStatus.Aborted } - - // --- internal methods --------- - - //private def isJtaTxActive(status: Int) = status == Status.STATUS_ACTIVE - - private[akka] def status_? = status - - private[akka] def increment = depth.incrementAndGet - - private[akka] def decrement = depth.decrementAndGet - - private[akka] def isTopLevel = depth.get == 0 - //when calling this method, make sure to prefix the uuid with the type so you - //have no possibility of kicking a diffferent type with the same uuid out of a transction - private[akka] def register(uuid: String, storage: Committable with Abortable) = { - if(persistentStateMap.getOrElseUpdate(uuid, {storage}) ne storage){ - throw new IllegalStateException("attempted to register an instance of persistent data structure for id [%s] when there is already a different instance registered".format(uuid)) - } - } - - private def ensureIsActive = if (status != TransactionStatus.Active) - throw new StmConfigurationException( - "Expected ACTIVE transaction - current status [" + status + "]: " + toString) - - private def ensureIsActiveOrAborted = - if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted)) - throw new StmConfigurationException( - "Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString) - - private def ensureIsActiveOrNew = - if (!(status == TransactionStatus.Active || status == TransactionStatus.New)) - throw new StmConfigurationException( - "Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString) - - override def equals(that: Any): Boolean = synchronized { - that.isInstanceOf[Transaction] && - that.asInstanceOf[Transaction].id == this.id - } - - override def hashCode: Int = synchronized { id.toInt } - - override def toString = synchronized { "Transaction[" + id + ", " + status + "]" } -} - -@serializable sealed abstract class TransactionStatus - -object TransactionStatus { - case object New extends TransactionStatus - case object Active extends TransactionStatus - case object Aborted extends TransactionStatus - case object Completed extends TransactionStatus -} - -/** - * Common trait for all the transactional objects: - * Ref, TransactionalMap, TransactionalVector, - * PersistentRef, PersistentMap, PersistentVector, PersistentQueue, PersistentSortedSet - */ -@serializable trait Transactional { - val uuid: String -} - -/** - * Used for integration with the persistence modules. - */ -trait Committable { - def commit(): Unit -} - -/** - * Used for integration with the persistence modules. - */ -trait Abortable { - def abort(): Unit -} - -/** - * Used internally for reflective access to the JTA module. - * Allows JTA integration to work when akka-jta.jar is on the classpath. - */ -object ReflectiveJtaModule { - type TransactionContainerObject = { - def apply(): TransactionContainer - } - - type TransactionContainer = { - def beginWithStmSynchronization(transaction: Transaction): Unit - def commit: Unit - def rollback: Unit - } - - lazy val isJtaEnabled = transactionContainerObjectInstance.isDefined - - def ensureJtaEnabled = if (!isJtaEnabled) throw new ModuleNotAvailableException( - "Can't load the JTA module, make sure that akka-jta.jar is on the classpath") - - val transactionContainerObjectInstance: Option[TransactionContainerObject] = - ReflectiveAccess.getObjectFor("akka.jta.TransactionContainer$") - - def createTransactionContainer: TransactionContainer = { - ensureJtaEnabled - transactionContainerObjectInstance.get.apply.asInstanceOf[TransactionContainer] - } -} diff --git a/akka-stm/src/main/scala/akka/stm/TransactionFactory.scala b/akka-stm/src/main/scala/akka/stm/TransactionFactory.scala index 0bb0caa494..d04e017a6b 100644 --- a/akka-stm/src/main/scala/akka/stm/TransactionFactory.scala +++ b/akka-stm/src/main/scala/akka/stm/TransactionFactory.scala @@ -32,7 +32,6 @@ object TransactionConfig { val QUICK_RELEASE = config.getBool("akka.stm.quick-release", true) val PROPAGATION = propagation(config.getString("akka.stm.propagation", "requires")) val TRACE_LEVEL = traceLevel(config.getString("akka.stm.trace-level", "none")) - val HOOKS = config.getBool("akka.stm.hooks", true) val DefaultTimeout = Duration(TIMEOUT, TIME_UNIT) @@ -65,7 +64,6 @@ object TransactionConfig { * @param quickRelease Whether locks should be released as quickly as possible (before whole commit). * @param propagation For controlling how nested transactions behave. * @param traceLevel Transaction trace level. - * @param hooks Whether hooks for persistence modules and JTA should be added to the transaction. */ def apply(familyName: String = FAMILY_NAME, readonly: JBoolean = READONLY, @@ -78,10 +76,9 @@ object TransactionConfig { speculative: Boolean = SPECULATIVE, quickRelease: Boolean = QUICK_RELEASE, propagation: MPropagation = PROPAGATION, - traceLevel: MTraceLevel = TRACE_LEVEL, - hooks: Boolean = HOOKS) = { + traceLevel: MTraceLevel = TRACE_LEVEL) = { new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, - interruptible, speculative, quickRelease, propagation, traceLevel, hooks) + interruptible, speculative, quickRelease, propagation, traceLevel) } } @@ -100,7 +97,6 @@ object TransactionConfig { *

quickRelease - Whether locks should be released as quickly as possible (before whole commit). *

propagation - For controlling how nested transactions behave. *

traceLevel - Transaction trace level. - *

hooks - Whether hooks for persistence modules and JTA should be added to the transaction. */ class TransactionConfig(val familyName: String = TransactionConfig.FAMILY_NAME, val readonly: JBoolean = TransactionConfig.READONLY, @@ -113,8 +109,7 @@ class TransactionConfig(val familyName: String = TransactionConfig.FAMILY val speculative: Boolean = TransactionConfig.SPECULATIVE, val quickRelease: Boolean = TransactionConfig.QUICK_RELEASE, val propagation: MPropagation = TransactionConfig.PROPAGATION, - val traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL, - val hooks: Boolean = TransactionConfig.HOOKS) + val traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL) object DefaultTransactionConfig extends TransactionConfig @@ -137,11 +132,10 @@ object TransactionFactory { speculative: Boolean = TransactionConfig.SPECULATIVE, quickRelease: Boolean = TransactionConfig.QUICK_RELEASE, propagation: MPropagation = TransactionConfig.PROPAGATION, - traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL, - hooks: Boolean = TransactionConfig.HOOKS) = { + traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL) = { val config = new TransactionConfig( familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, - interruptible, speculative, quickRelease, propagation, traceLevel, hooks) + interruptible, speculative, quickRelease, propagation, traceLevel) new TransactionFactory(config) } } @@ -199,8 +193,6 @@ class TransactionFactory( } val boilerplate = new TransactionBoilerplate(factory) - - def addHooks = if (config.hooks) Transaction.attach } /** diff --git a/akka-stm/src/main/scala/akka/stm/TransactionFactoryBuilder.scala b/akka-stm/src/main/scala/akka/stm/TransactionFactoryBuilder.scala index 0765652c6a..b71f68c375 100644 --- a/akka-stm/src/main/scala/akka/stm/TransactionFactoryBuilder.scala +++ b/akka-stm/src/main/scala/akka/stm/TransactionFactoryBuilder.scala @@ -27,7 +27,6 @@ class TransactionConfigBuilder { var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE var propagation: MPropagation = TransactionConfig.PROPAGATION var traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL - var hooks: Boolean = TransactionConfig.HOOKS def setFamilyName(familyName: String) = { this.familyName = familyName; this } def setReadonly(readonly: JBoolean) = { this.readonly = readonly; this } @@ -41,11 +40,10 @@ class TransactionConfigBuilder { def setQuickRelease(quickRelease: Boolean) = { this.quickRelease = quickRelease; this } def setPropagation(propagation: MPropagation) = { this.propagation = propagation; this } def setTraceLevel(traceLevel: MTraceLevel) = { this.traceLevel = traceLevel; this } - def setHooks(hooks: Boolean) = { this.hooks = hooks; this } def build() = new TransactionConfig( familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, - interruptible, speculative, quickRelease, propagation, traceLevel, hooks) + interruptible, speculative, quickRelease, propagation, traceLevel) } /** @@ -64,7 +62,6 @@ class TransactionFactoryBuilder { var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE var propagation: MPropagation = TransactionConfig.PROPAGATION var traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL - var hooks: Boolean = TransactionConfig.HOOKS def setFamilyName(familyName: String) = { this.familyName = familyName; this } def setReadonly(readonly: JBoolean) = { this.readonly = readonly; this } @@ -78,12 +75,11 @@ class TransactionFactoryBuilder { def setQuickRelease(quickRelease: Boolean) = { this.quickRelease = quickRelease; this } def setPropagation(propagation: MPropagation) = { this.propagation = propagation; this } def setTraceLevel(traceLevel: MTraceLevel) = { this.traceLevel = traceLevel; this } - def setHooks(hooks: Boolean) = { this.hooks = hooks; this } def build() = { val config = new TransactionConfig( familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, - interruptible, speculative, quickRelease, propagation, traceLevel, hooks) + interruptible, speculative, quickRelease, propagation, traceLevel) new TransactionFactory(config) } } diff --git a/akka-stm/src/main/scala/akka/transactor/Coordinated.scala b/akka-stm/src/main/scala/akka/transactor/Coordinated.scala index 2de7747607..8ce08cf624 100644 --- a/akka-stm/src/main/scala/akka/transactor/Coordinated.scala +++ b/akka-stm/src/main/scala/akka/transactor/Coordinated.scala @@ -129,7 +129,6 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) { def atomic[T](factory: TransactionFactory)(body: => T): T = { factory.boilerplate.execute(new TransactionalCallable[T]() { def call(mtx: MultiverseTransaction): T = { - factory.addHooks val result = body val timeout = factory.config.timeout barrier.tryJoinCommit(mtx, timeout.length, timeout.unit) diff --git a/akka-stm/src/test/scala/config/ConfigSpec.scala b/akka-stm/src/test/scala/config/ConfigSpec.scala index 69f76eb89b..4108a99d63 100644 --- a/akka-stm/src/test/scala/config/ConfigSpec.scala +++ b/akka-stm/src/test/scala/config/ConfigSpec.scala @@ -20,9 +20,7 @@ class ConfigSpec extends WordSpec with MustMatchers { getBool("akka.stm.blocking-allowed") must equal(Some(false)) getBool("akka.stm.fair") must equal(Some(true)) - getBool("akka.stm.hooks") must equal(Some(true)) getBool("akka.stm.interruptible") must equal(Some(false)) - getBool("akka.stm.jta-aware") must equal(Some(false)) getInt("akka.stm.max-retries") must equal(Some(1000)) getString("akka.stm.propagation") must equal(Some("requires")) getBool("akka.stm.quick-release") must equal(Some(true)) diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 458f937e5e..3282a3e2c7 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -71,9 +71,6 @@ akka { quick-release = true propagation = "requires" trace-level = "none" - hooks = true - jta-aware = off # Option 'on' means that if there JTA Transaction Manager available then the STM will - # begin (or join), commit or rollback the JTA transaction. Default is 'off'. } jta {