diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala index 9581d46546..16a3cd0e5f 100644 --- a/akka-core/src/main/scala/stm/Transaction.scala +++ b/akka-core/src/main/scala/stm/Transaction.scala @@ -28,7 +28,7 @@ object Transaction { /** * Creates a STM atomic transaction and by-passes all transactions hooks * such as persistence etc. - * + * * Only for internal usage. */ private[akka] def atomic0[T](body: => T): T = new TransactionTemplate[T]() { @@ -37,7 +37,7 @@ object Transaction { /** * Module for "local" transaction management, local in the context of threads. - * You should only use these if you do not need to have one transaction span + * You should only use these if you do not need to have one transaction span * multiple threads (or Actors). *

* Example of atomic transaction management using the atomic block. @@ -50,7 +50,7 @@ object Transaction { * } * * - * Example of atomically-orElse transaction management. + * Example of atomically-orElse transaction management. * Which is a good way to reduce contention and transaction collisions. *

    * import se.scalablesolutions.akka.stm.Transaction.Local._
@@ -66,11 +66,11 @@ object Transaction {
    *
    * 
    * import se.scalablesolutions.akka.stm.Transaction.Local._
-   * for (tx <- Transaction)  {
+   * for (tx <- Transaction.Local)  {
    *   ... // do transactional stuff
    * }
    *
-   * val result = for (tx <- Transaction) yield  {
+   * val result = for (tx <- Transaction.Local) yield  {
    *   ... // do transactional stuff yielding a result
    * }
    * 
@@ -84,14 +84,14 @@ object Transaction { * // You can use them together with Transaction in a for comprehension since * // TransactionalRef is also monadic * for { - * tx <- Transaction + * tx <- Transaction.Local * ref <- refs * } { * ... // use the ref inside a transaction * } * * val result = for { - * tx <- Transaction + * tx <- Transaction.Local * ref <- refs * } yield { * ... // use the ref inside a transaction, yield a result @@ -102,58 +102,58 @@ object Transaction { */ object Local extends TransactionManagement with Logging { - /** - * See ScalaDoc on Transaction class. - */ - def map[T](f: => T): T = atomic {f} + /** + * See ScalaDoc on Transaction.Local class. + */ + def map[T](f: => T): T = atomic {f} - /** - * See ScalaDoc on Transaction class. - */ - def flatMap[T](f: => T): T = atomic {f} + /** + * See ScalaDoc on Transaction.Local class. + */ + def flatMap[T](f: => T): T = atomic {f} - /** - * See ScalaDoc on Transaction class. - */ - def foreach(f: => Unit): Unit = atomic {f} + /** + * See ScalaDoc on Transaction.Local class. + */ + def foreach(f: => Unit): Unit = atomic {f} - /** - * See ScalaDoc on class. - */ - def atomic[T](body: => T): T = { - new TransactionTemplate[T]() { - def execute(mtx: MultiverseTransaction): T = body + /** + * See ScalaDoc on Transaction.Local class. + */ + def atomic[T](body: => T): T = { + new TransactionTemplate[T]() { + def execute(mtx: MultiverseTransaction): T = body - override def onStart(mtx: MultiverseTransaction) = { - val tx = new Transaction - tx.transaction = Some(mtx) - setTransaction(Some(tx)) - mtx.registerLifecycleListener(new TransactionLifecycleListener() { - def notify(tx: MultiverseTransaction, event: TransactionLifecycleEvent) = event.name match { - case "postCommit" => tx.commit - case "postAbort" => tx.abort - case _ => {} - } - }) - } - }.execute() - } + override def onStart(mtx: MultiverseTransaction) = { + val tx = new Transaction + tx.transaction = Some(mtx) + setTransaction(Some(tx)) + mtx.registerLifecycleListener(new TransactionLifecycleListener() { + def notify(tx: MultiverseTransaction, event: TransactionLifecycleEvent) = event.name match { + case "postCommit" => tx.commit + case "postAbort" => tx.abort + case _ => {} + } + }) + } + }.execute() + } - /** - * See ScalaDoc on class. - */ - def atomically[A](firstBody: => A) = elseBody(firstBody) + /** + * See ScalaDoc on Transaction.Local class. + */ + def atomically[A](firstBody: => A) = elseBody(firstBody) - /** - * Should only be used together with atomically to form atomically-orElse constructs. - * See ScalaDoc on class. - */ - def elseBody[A](firstBody: => A) = new { - def orElse(secondBody: => A) = new OrElseTemplate[A] { - def run(t: MultiverseTransaction) = firstBody - def orelserun(t: MultiverseTransaction) = secondBody - }.execute() - } + /** + * Should only be used together with atomically to form atomically-orElse constructs. + * See ScalaDoc on class. + */ + def elseBody[A](firstBody: => A) = new { + def orElse(secondBody: => A) = new OrElseTemplate[A] { + def run(t: MultiverseTransaction) = firstBody + def orelserun(t: MultiverseTransaction) = secondBody + }.execute() + } } /** @@ -162,7 +162,7 @@ object Transaction { *

* Example of atomic transaction management using the atomic block. *

- * Here are some examples (assuming implicit transaction family name in scope): + * Here are some examples (assuming implicit transaction family name in scope): *

    * import se.scalablesolutions.akka.stm.Transaction.Global._
    *
@@ -174,12 +174,12 @@ object Transaction {
    * Example of atomic transaction management using for comprehensions (monadic):
    *
    * 
-   * import se.scalablesolutions.akka.stm.Transaction.Global_
-   * for (tx <- Transaction)  {
+   * import se.scalablesolutions.akka.stm.Transaction
+   * for (tx <- Transaction.Global)  {
    *   ... // do transactional stuff
    * }
    *
-   * val result = for (tx <- Transaction) yield  {
+   * val result = for (tx <- Transaction.Global) yield  {
    *   ... // do transactional stuff yielding a result
    * }
    * 
@@ -193,14 +193,14 @@ object Transaction { * // You can use them together with Transaction in a for comprehension since * // TransactionalRef is also monadic * for { - * tx <- Transaction + * tx <- Transaction.Global * ref <- refs * } { * ... // use the ref inside a transaction * } * * val result = for { - * tx <- Transaction + * tx <- Transaction.Global * ref <- refs * } yield { * ... // use the ref inside a transaction, yield a result @@ -210,64 +210,67 @@ object Transaction { * @author Jonas Bonér */ object Global extends TransactionManagement with Logging { - /** - * See ScalaDoc on Transaction class. - */ - def map[T](f: => T): T = atomic {f} - /** - * See ScalaDoc on Transaction class. - */ - def flatMap[T](f: => T): T = atomic {f} + /** + * See ScalaDoc on Transaction.Global class. + */ + def map[T](f: => T): T = atomic {f} - /** - * See ScalaDoc on Transaction class. - */ - def foreach(f: => Unit): Unit = atomic {f} + /** + * See ScalaDoc on Transaction.Global class. + */ + def flatMap[T](f: => T): T = atomic {f} - /** - * See ScalaDoc on Transaction class. - */ - def atomic[T](body: => T): T = { - var isTopLevelTransaction = false - new TransactionTemplate[T]() { - def execute(mtx: MultiverseTransaction): T = { - val result = body + /** + * See ScalaDoc on Transaction.Global class. + */ + def foreach(f: => Unit): Unit = atomic {f} - val txSet = getTransactionSetInScope - log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet) - txSet.joinCommit(mtx) + /** + * See ScalaDoc on Transaction.Global class. + */ + def atomic[T](body: => T): T = { + var isTopLevelTransaction = false + new TransactionTemplate[T]() { + def execute(mtx: MultiverseTransaction): T = { + val result = body - // FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS) - //getTransactionSetInScope.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS) + val txSet = getTransactionSetInScope + log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet) + txSet.joinCommit(mtx) - clearTransaction - result - } + // FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS) + //getTransactionSetInScope.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS) - override def onStart(mtx: MultiverseTransaction) = { - val txSet = - if (!isTransactionSetInScope) { - isTopLevelTransaction = true - createNewTransactionSet - } else getTransactionSetInScope - val tx = new Transaction - tx.transaction = Some(mtx) - setTransaction(Some(tx)) + clearTransaction + result + } - txSet.registerOnCommitTask(new Runnable() { - def run = tx.commit - }) - txSet.registerOnAbortTask(new Runnable() { - def run = tx.abort - }) - } - }.execute() - } + override def onStart(mtx: MultiverseTransaction) = { + val txSet = + if (!isTransactionSetInScope) { + isTopLevelTransaction = true + createNewTransactionSet + } else getTransactionSetInScope + val tx = new Transaction + tx.transaction = Some(mtx) + setTransaction(Some(tx)) + + txSet.registerOnCommitTask(new Runnable() { + def run = tx.commit + }) + txSet.registerOnAbortTask(new Runnable() { + def run = tx.abort + }) + } + }.execute() + } } } /** + * The Akka specific Transaction class, keeping track of persistent data structures (as in on-disc). + * * @author Jonas Bonér */ @serializable class Transaction extends Logging { @@ -282,7 +285,7 @@ object Transaction { // --- public methods --------- def commit = synchronized { - log.trace("Committing transaction %s", toString) + log.trace("Committing transaction %s", toString) Transaction.atomic0 { persistentStateMap.valuesIterator.foreach(_.commit) } @@ -290,7 +293,7 @@ object Transaction { } def abort = synchronized { - log.trace("Aborting transaction %s", toString) + log.trace("Aborting transaction %s", toString) } def isNew = synchronized { status == TransactionStatus.New } @@ -327,7 +330,7 @@ object Transaction { throw new IllegalStateException( "Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString) - // For reinitialize transaction after sending it over the wire + // For reinitialize transaction after sending it over the wire /* private[akka] def reinit = synchronized { import net.lag.logging.{Logger, Level} if (log eq null) { diff --git a/akka-core/src/test/scala/ForwardActor.scala b/akka-core/src/test/scala/ForwardActorSpec.scala similarity index 90% rename from akka-core/src/test/scala/ForwardActor.scala rename to akka-core/src/test/scala/ForwardActorSpec.scala index 0e8f606b27..575bbab7dd 100644 --- a/akka-core/src/test/scala/ForwardActor.scala +++ b/akka-core/src/test/scala/ForwardActorSpec.scala @@ -5,7 +5,7 @@ import org.scalatest.junit.JUnitSuite import org.junit.Test -class ForwardActorTest extends JUnitSuite { +class ForwardActorSpec extends JUnitSuite { object ForwardState { var sender: Actor = null @@ -57,7 +57,7 @@ class ForwardActorTest extends JUnitSuite { def shouldForwardActorReferenceWhenInvokingForwardOnBang = { val senderActor = new BangSenderActor senderActor.start - assert(ForwardState.finished.await(1, TimeUnit.SECONDS)) + assert(ForwardState.finished.await(2, TimeUnit.SECONDS)) assert(ForwardState.sender ne null) assert(senderActor === ForwardState.sender) } @@ -66,6 +66,6 @@ class ForwardActorTest extends JUnitSuite { def shouldForwardActorReferenceWhenInvokingForwardOnBangBang = { val senderActor = new BangBangSenderActor senderActor.start - assert(ForwardState.finished.await(1, TimeUnit.SECONDS)) + assert(ForwardState.finished.await(2, TimeUnit.SECONDS)) } }