org.scala-tools
javautils
diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala
index 41b2673bfa..a6d9231b10 100644
--- a/akka-core/src/main/scala/actor/Actor.scala
+++ b/akka-core/src/main/scala/actor/Actor.scala
@@ -911,10 +911,10 @@ trait Actor extends TransactionManagement {
try {
if (isTransactionRequiresNew && !isTransactionInScope) {
- if (senderFuture.isEmpty) throw new StmException(
- "Can't continue transaction in a one-way fire-forget message send" +
- "\n\tE.g. using Actor '!' method or Active Object 'void' method" +
- "\n\tPlease use the Actor '!!' method or Active Object method with non-void return type")
+ //if (senderFuture.isEmpty) throw new StmException(
+ // "Can't continue transaction in a one-way fire-forget message send" +
+ // "\n\tE.g. using Actor '!' method or Active Object 'void' method" +
+ // "\n\tPlease use the Actor '!!' method or Active Object method with non-void return type")
atomic {
proceed
}
diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala
index 1637b4c906..d535f98433 100644
--- a/akka-core/src/main/scala/stm/Transaction.scala
+++ b/akka-core/src/main/scala/stm/Transaction.scala
@@ -13,9 +13,10 @@ import se.scalablesolutions.akka.util.Logging
import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
import org.multiverse.api.ThreadLocalTransaction._
-import org.multiverse.templates.OrElseTemplate
+import org.multiverse.commitbarriers.VetoCommitBarrier
import scala.collection.mutable.HashMap
+import org.multiverse.templates.{TransactionTemplate, OrElseTemplate}
class NoTransactionInScopeException extends RuntimeException
class TransactionRetryException(message: String) extends RuntimeException(message)
@@ -30,8 +31,8 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
* Here are some examples (assuming implicit transaction family name in scope):
*
* import se.scalablesolutions.akka.stm.Transaction._
- *
- * atomic {
+ *
+ * atomic {
* .. // do something within a transaction
* }
*
@@ -39,8 +40,8 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
* Example of atomic transaction management using atomic block with retry count:
*
* import se.scalablesolutions.akka.stm.Transaction._
- *
- * atomic(maxNrOfRetries) {
+ *
+ * atomic(maxNrOfRetries) {
* .. // do something within a transaction
* }
*
@@ -49,10 +50,10 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
* Which is a good way to reduce contention and transaction collisions.
*
* import se.scalablesolutions.akka.stm.Transaction._
- *
- * atomically {
+ *
+ * atomically {
* .. // try to do something
- * } orElse {
+ * } orElse {
* .. // if transaction clashes try do do something else to minimize contention
* }
*
@@ -61,11 +62,11 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
*
*
* import se.scalablesolutions.akka.stm.Transaction._
- * for (tx <- Transaction) {
+ * for (tx <- Transaction) {
* ... // do transactional stuff
* }
*
- * val result = for (tx <- Transaction) yield {
+ * val result = for (tx <- Transaction) yield {
* ... // do transactional stuff yielding a result
* }
*
@@ -78,17 +79,17 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
*
* // You can use them together with Transaction in a for comprehension since
* // TransactionalRef is also monadic
- * for {
+ * for {
* tx <- Transaction
* ref <- refs
* } {
* ... // use the ref inside a transaction
* }
*
- * val result = for {
+ * val result = for {
* tx <- Transaction
* ref <- refs
- * } yield {
+ * } yield {
* ... // use the ref inside a transaction, yield a result
* }
*
@@ -101,57 +102,61 @@ object Transaction extends TransactionManagement {
/**
* See ScalaDoc on class.
*/
- def map[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic { f(getTransactionInScope) }
+ def map[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic {f(getTransactionInScope)}
/**
* See ScalaDoc on class.
*/
- def flatMap[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic { f(getTransactionInScope) }
+ def flatMap[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic {f(getTransactionInScope)}
/**
* See ScalaDoc on class.
*/
- def foreach(f: Transaction => Unit)(implicit transactionFamilyName: String): Unit = atomic { f(getTransactionInScope) }
+ def foreach(f: Transaction => Unit)(implicit transactionFamilyName: String): Unit = atomic {f(getTransactionInScope)}
/**
* Creates a "pure" STM atomic transaction and by-passes all transactions hooks
* such as persistence etc.
* Only for internal usage.
*/
- private[akka] def pureAtomic[T](body: => T): T = new AtomicTemplate[T](
- getGlobalStmInstance, "internal", false, false, TransactionManagement.MAX_NR_OF_RETRIES) {
+ private[akka] def pureAtomic[T](body: => T): T = new TransactionTemplate[T]() {
def execute(mtx: MultiverseTransaction): T = body
}.execute()
/**
* See ScalaDoc on class.
*/
- def atomic[T](body: => T)(implicit transactionFamilyName: String): T = new AtomicTemplate[T](
- getGlobalStmInstance, transactionFamilyName, false, false, TransactionManagement.MAX_NR_OF_RETRIES) {
- def execute(mtx: MultiverseTransaction): T = body
- override def postStart(mtx: MultiverseTransaction) = {
- val tx = new Transaction
- tx.transaction = Some(mtx)
- setTransaction(Some(tx))
- }
- override def postCommit = {
- if (isTransactionInScope) getTransactionInScope.commit
- else throw new IllegalStateException("No transaction in scope")
- }
- }.execute()
+ def atomic[T](body: => T)(implicit transactionFamilyName: String): T = {
+ new TransactionTemplate[T]() { // FIXME take factory
+ def execute(mtx: MultiverseTransaction): T = body
+
+ override def onStart(mtx: MultiverseTransaction) = {
+ val tx = new Transaction
+ tx.transaction = Some(mtx)
+ setTransaction(Some(tx))
+ }
+
+ override def onPostCommit = {
+ if (isTransactionInScope) getTransactionInScope.commit
+ else throw new IllegalStateException("No transaction in scope")
+ }
+ }.execute()
+ }
/**
* See ScalaDoc on class.
*/
def atomic[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = {
- new AtomicTemplate[T](getGlobalStmInstance, transactionFamilyName, false, false, retryCount) {
+ new TransactionTemplate[T]() { // FIXME take factory
def execute(mtx: MultiverseTransaction): T = body
- override def postStart(mtx: MultiverseTransaction) = {
+
+ override def onStart(mtx: MultiverseTransaction) = {
val tx = new Transaction
tx.transaction = Some(mtx)
setTransaction(Some(tx))
}
- override def postCommit = {
+
+ override def onPostCommit = {
if (isTransactionInScope) getTransactionInScope.commit
else throw new IllegalStateException("No transaction in scope")
}
@@ -162,14 +167,16 @@ object Transaction extends TransactionManagement {
* See ScalaDoc on class.
*/
def atomicReadOnly[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = {
- new AtomicTemplate[T](getGlobalStmInstance, transactionFamilyName, false, true, retryCount) {
+ new TransactionTemplate[T]() { // FIXME take factory
def execute(mtx: MultiverseTransaction): T = body
- override def postStart(mtx: MultiverseTransaction) = {
+
+ override def onStart(mtx: MultiverseTransaction) = {
val tx = new Transaction
tx.transaction = Some(mtx)
setTransaction(Some(tx))
}
- override def postCommit = {
+
+ override def onPostCommit = {
if (isTransactionInScope) getTransactionInScope.commit
else throw new IllegalStateException("No transaction in scope")
}
@@ -180,14 +187,16 @@ object Transaction extends TransactionManagement {
* See ScalaDoc on class.
*/
def atomicReadOnly[T](body: => T): T = {
- new AtomicTemplate[T](true) {
+ new TransactionTemplate[T]() { // FIXME take factory
def execute(mtx: MultiverseTransaction): T = body
- override def postStart(mtx: MultiverseTransaction) = {
+
+ override def onStart(mtx: MultiverseTransaction) = {
val tx = new Transaction
tx.transaction = Some(mtx)
setTransaction(Some(tx))
}
- override def postCommit = {
+
+ override def onPostCommit = {
if (isTransactionInScope) getTransactionInScope.commit
else throw new IllegalStateException("No transaction in scope")
}
@@ -206,6 +215,7 @@ object Transaction extends TransactionManagement {
def elseBody[A](firstBody: => A) = new {
def orElse(secondBody: => A) = new OrElseTemplate[A] {
def run(t: MultiverseTransaction) = firstBody
+
def orelserun(t: MultiverseTransaction) = secondBody
}.execute()
}
@@ -216,30 +226,38 @@ object Transaction extends TransactionManagement {
*/
@serializable class Transaction extends Logging {
import Transaction._
-
+
val id = Transaction.idFactory.incrementAndGet
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
private[akka] var transaction: Option[MultiverseTransaction] = None
private[this] val persistentStateMap = new HashMap[String, Committable]
private[akka] val depth = new AtomicInteger(0)
-
+ //private[akka] val transactionSet = new VetoCommitBarrier
+
+ //transactionSet.vetoCommit(this)
+
// --- public methods ---------
+ def abort = synchronized {
+ // transactionSet.abort
+ }
+
def commit = synchronized {
pureAtomic {
persistentStateMap.values.foreach(_.commit)
TransactionManagement.clearTransaction
}
+ //transactionSet.vetoCommit(this)
status = TransactionStatus.Completed
}
- def isNew = synchronized { status == TransactionStatus.New }
+ def isNew = synchronized {status == TransactionStatus.New}
- def isActive = synchronized { status == TransactionStatus.Active }
+ def isActive = synchronized {status == TransactionStatus.Active}
- def isCompleted = synchronized { status == TransactionStatus.Completed }
+ def isCompleted = synchronized {status == TransactionStatus.Completed}
- def isAborted = synchronized { status == TransactionStatus.Aborted }
+ def isAborted = synchronized {status == TransactionStatus.Aborted}
// --- internal methods ---------
@@ -259,13 +277,13 @@ object Transaction extends TransactionManagement {
private def ensureIsActiveOrAborted =
if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted))
- throw new IllegalStateException(
- "Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString)
+ throw new IllegalStateException(
+ "Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString)
private def ensureIsActiveOrNew =
if (!(status == TransactionStatus.Active || status == TransactionStatus.New))
- throw new IllegalStateException(
- "Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
+ throw new IllegalStateException(
+ "Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
// For reinitialize transaction after sending it over the wire
private[akka] def reinit = synchronized {
@@ -277,14 +295,14 @@ object Transaction extends TransactionManagement {
}
override def equals(that: Any): Boolean = synchronized {
- that != null &&
- that.isInstanceOf[Transaction] &&
- that.asInstanceOf[Transaction].id == this.id
+ that != null &&
+ that.isInstanceOf[Transaction] &&
+ that.asInstanceOf[Transaction].id == this.id
}
-
- override def hashCode(): Int = synchronized { id.toInt }
-
- override def toString(): String = synchronized { "Transaction[" + id + ", " + status + "]" }
+
+ override def hashCode(): Int = synchronized {id.toInt}
+
+ override def toString(): String = synchronized {"Transaction[" + id + ", " + status + "]"}
}
/**
diff --git a/akka-core/src/main/scala/stm/TransactionManagement.scala b/akka-core/src/main/scala/stm/TransactionManagement.scala
index 2dd7ed9c79..1f2ede3024 100644
--- a/akka-core/src/main/scala/stm/TransactionManagement.scala
+++ b/akka-core/src/main/scala/stm/TransactionManagement.scala
@@ -26,7 +26,7 @@ object TransactionManagement extends TransactionManagement {
def isTransactionalityEnabled = TRANSACTION_ENABLED.get
def disableTransactions = TRANSACTION_ENABLED.set(false)
- private[akka] val currentTransaction: ThreadLocal[Option[Transaction]] = new ThreadLocal[Option[Transaction]]() {
+ private[akka] val currentTransaction = new ThreadLocal[Option[Transaction]]() {
override protected def initialValue: Option[Transaction] = None
}
}
@@ -52,6 +52,8 @@ trait TransactionManagement extends Logging {
private[akka] def isTransactionInScope = currentTransaction.get.isDefined
+ private[akka] def isTransactionTopLevel = if (isTransactionInScope) getTransactionInScope.isTopLevel
+
private[akka] def incrementTransaction = if (isTransactionInScope) getTransactionInScope.increment
private[akka] def decrementTransaction = if (isTransactionInScope) getTransactionInScope.decrement
diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala
index 6003a89f89..195b134271 100644
--- a/akka-core/src/main/scala/stm/TransactionalState.scala
+++ b/akka-core/src/main/scala/stm/TransactionalState.scala
@@ -8,8 +8,7 @@ import se.scalablesolutions.akka.stm.Transaction.atomic
import se.scalablesolutions.akka.stm.NoTransactionInScopeException
import se.scalablesolutions.akka.collection._
import se.scalablesolutions.akka.util.UUID
-
-import org.multiverse.datastructures.refs.manual.Ref;
+import org.multiverse.stms.alpha.AlphaRef
/**
* Example Scala usage:
@@ -78,7 +77,7 @@ class TransactionalRef[T] extends Transactional {
implicit val txInitName = "TransactionalRef:Init"
val uuid = UUID.newUuid.toString
- private[this] val ref: Ref[T] = atomic { new Ref }
+ private[this] val ref: AlphaRef[T] = atomic { new AlphaRef }
def swap(elem: T) = {
ensureIsInTransaction
diff --git a/akka-util-java/pom.xml b/akka-util-java/pom.xml
index 3daa008792..6db055b223 100644
--- a/akka-util-java/pom.xml
+++ b/akka-util-java/pom.xml
@@ -24,34 +24,6 @@
protobuf-java
2.2.0
-