diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala index 7539c72ae0..06e2b78ac2 100644 --- a/akka-core/src/main/scala/stm/Transaction.scala +++ b/akka-core/src/main/scala/stm/Transaction.scala @@ -16,11 +16,8 @@ import se.scalablesolutions.akka.config.Config._ import org.multiverse.api.{Transaction => MultiverseTransaction} import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent} -import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance import org.multiverse.api.ThreadLocalTransaction._ -import org.multiverse.templates.{TransactionalCallable, OrElseTemplate} import org.multiverse.api.{TraceLevel => MultiverseTraceLevel} -import org.multiverse.api.StmUtils class NoTransactionInScopeException extends RuntimeException class TransactionRetryException(message: String) extends RuntimeException(message) @@ -46,22 +43,8 @@ object Transaction { * * @author Jonas Bonér */ - object Local extends TransactionManagement with Logging { - - object DefaultLocalTransactionConfig extends TransactionConfig - object DefaultLocalTransactionFactory extends TransactionFactory(DefaultLocalTransactionConfig, "DefaultLocalTransaction") - - def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T = atomic(factory)(body) - - def atomic[T](factory: TransactionFactory)(body: => T): T = { - factory.boilerplate.execute(new TransactionalCallable[T]() { - def call(mtx: MultiverseTransaction): T = { - factory.addHooks - body - } - }) - } - } + @deprecated("Use the akka.stm.local package object instead.") + object Local extends LocalStm /** * Module for "global" transaction management, global in the context of multiple threads. @@ -79,48 +62,13 @@ object Transaction { * * @author Jonas Bonér */ - object Global extends TransactionManagement with Logging { - - object DefaultGlobalTransactionConfig extends TransactionConfig - object DefaultGlobalTransactionFactory extends TransactionFactory(DefaultGlobalTransactionConfig, "DefaultGlobalTransaction") - - def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T = atomic(factory)(body) - - def atomic[T](factory: TransactionFactory)(body: => T): T = { - factory.boilerplate.execute(new TransactionalCallable[T]() { - def call(mtx: MultiverseTransaction): T = { - if (!isTransactionSetInScope) createNewTransactionSet - factory.addHooks - val result = body - val txSet = getTransactionSetInScope - log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet) - // FIXME ? txSet.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS) - txSet.joinCommit(mtx) - clearTransaction - result - } - }) - } - } + @deprecated("Use the akka.stm.global package object instead.") + object Global extends GlobalStm /** * TODO: document */ - object Util { - - def deferred[T](body: => T): Unit = StmUtils.scheduleDeferredTask(new Runnable { def run = body }) - - def compensating[T](body: => T): Unit = StmUtils.scheduleCompensatingTask(new Runnable { def run = body }) - - def retry = StmUtils.retry - - def either[T](firstBody: => T) = new { - def orElse(secondBody: => T) = new OrElseTemplate[T] { - def either(mtx: MultiverseTransaction) = firstBody - def orelse(mtx: MultiverseTransaction) = secondBody - }.execute() - } - } + object Util extends StmUtil /** * Attach an Akka-specific Transaction to the current Multiverse transaction. diff --git a/akka-core/src/main/scala/stm/TransactionManagement.scala b/akka-core/src/main/scala/stm/TransactionManagement.scala index dd9a4e28df..c92ddc6b51 100644 --- a/akka-core/src/main/scala/stm/TransactionManagement.scala +++ b/akka-core/src/main/scala/stm/TransactionManagement.scala @@ -8,8 +8,11 @@ import se.scalablesolutions.akka.util.Logging import java.util.concurrent.atomic.AtomicBoolean +import org.multiverse.api.StmUtils import org.multiverse.api.ThreadLocalTransaction._ +import org.multiverse.api.{Transaction => MultiverseTransaction} import org.multiverse.commitbarriers.CountDownCommitBarrier +import org.multiverse.templates.{TransactionalCallable, OrElseTemplate} class StmException(msg: String) extends RuntimeException(msg) @@ -20,8 +23,10 @@ class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Tran object TransactionManagement extends TransactionManagement { import se.scalablesolutions.akka.config.Config._ - val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", true)) - val FAIR_TRANSACTIONS = config.getBool("akka.stm.fair", true) + // is this needed? + val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", true)) + // move to stm.global.fair? + val FAIR_TRANSACTIONS = config.getBool("akka.stm.fair", true) def isTransactionalityEnabled = TRANSACTION_ENABLED.get @@ -85,3 +90,72 @@ trait TransactionManagement { (option ne null) && option.isDefined } } + +class LocalStm extends TransactionManagement with Logging { + + val DefaultLocalTransactionConfig = TransactionConfig() + val DefaultLocalTransactionFactory = TransactionFactory(DefaultLocalTransactionConfig, "DefaultLocalTransaction") + + def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T = atomic(factory)(body) + + def atomic[T](factory: TransactionFactory)(body: => T): T = { + factory.boilerplate.execute(new TransactionalCallable[T]() { + def call(mtx: MultiverseTransaction): T = { + factory.addHooks + body + } + }) + } +} + +class GlobalStm extends TransactionManagement with Logging { + + val DefaultGlobalTransactionConfig = TransactionConfig() + val DefaultGlobalTransactionFactory = TransactionFactory(DefaultGlobalTransactionConfig, "DefaultGlobalTransaction") + + def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T = atomic(factory)(body) + + def atomic[T](factory: TransactionFactory)(body: => T): T = { + factory.boilerplate.execute(new TransactionalCallable[T]() { + def call(mtx: MultiverseTransaction): T = { + if (!isTransactionSetInScope) createNewTransactionSet + factory.addHooks + val result = body + val txSet = getTransactionSetInScope + log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet) + // FIXME ? txSet.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS) + txSet.joinCommit(mtx) + clearTransaction + result + } + }) + } +} + +trait StmUtil { + + def deferred[T](body: => T): Unit = StmUtils.scheduleDeferredTask(new Runnable { def run = body }) + + def compensating[T](body: => T): Unit = StmUtils.scheduleCompensatingTask(new Runnable { def run = body }) + + def retry = StmUtils.retry + + def either[T](firstBody: => T) = new { + def orElse(secondBody: => T) = new OrElseTemplate[T] { + def either(mtx: MultiverseTransaction) = firstBody + def orelse(mtx: MultiverseTransaction) = secondBody + }.execute() + } +} + +trait StmCommon { + type TransactionConfig = se.scalablesolutions.akka.stm.TransactionConfig + val TransactionConfig = se.scalablesolutions.akka.stm.TransactionConfig + + type TransactionFactory = se.scalablesolutions.akka.stm.TransactionFactory + val TransactionFactory = se.scalablesolutions.akka.stm.TransactionFactory + + type Ref[T] = se.scalablesolutions.akka.stm.Ref[T] + val Ref = se.scalablesolutions.akka.stm.Ref +} + diff --git a/akka-core/src/main/scala/stm/packages.scala b/akka-core/src/main/scala/stm/packages.scala new file mode 100644 index 0000000000..4e4810573d --- /dev/null +++ b/akka-core/src/main/scala/stm/packages.scala @@ -0,0 +1,9 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.stm + +package object local extends LocalStm with StmUtil with StmCommon + +package object global extends GlobalStm with StmUtil with StmCommon