diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index efd635c21a..f466333388 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -8,7 +8,7 @@ import se.scalablesolutions.akka.dispatch._ import se.scalablesolutions.akka.config.Config._ import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy} import se.scalablesolutions.akka.config.ScalaConfig._ -import se.scalablesolutions.akka.stm.Transaction._ +import se.scalablesolutions.akka.stm.Transaction.Global._ import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.TransactionManagement import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala index 29d4c586e9..16a3cd0e5f 100644 --- a/akka-core/src/main/scala/stm/Transaction.scala +++ b/akka-core/src/main/scala/stm/Transaction.scala @@ -12,7 +12,7 @@ import scala.collection.mutable.HashMap import se.scalablesolutions.akka.util.Logging -import org.multiverse.api.{Transaction => MultiverseTransaction} +import org.multiverse.api.{Transaction => MultiverseTransaction, TransactionLifecycleListener, TransactionLifecycleEvent} import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance import org.multiverse.api.ThreadLocalTransaction._ import org.multiverse.templates.{TransactionTemplate, OrElseTemplate} @@ -22,172 +22,258 @@ import org.multiverse.stms.alpha.AlphaStm class NoTransactionInScopeException extends RuntimeException class TransactionRetryException(message: String) extends RuntimeException(message) -/** - * Example of atomic transaction management using the atomic block. - * These blocks takes an implicit argument String defining the transaction family name. - * If these blocks are used from within an Actor then the name is automatically resolved, if not either: - * 1. define an implicit String with the name in the same scope - * 2. pass in the name explicitly - * - * Here are some examples (assuming implicit transaction family name in scope): - *
- * import se.scalablesolutions.akka.stm.Transaction._
- *
- * atomic {
- * .. // do something within a transaction
- * }
- *
- *
- * Example of atomic transaction management using atomic block with retry count:
- *
- * import se.scalablesolutions.akka.stm.Transaction._
- *
- * atomic(maxNrOfRetries) {
- * .. // do something within a transaction
- * }
- *
- *
- * Example of atomically-orElse transaction management.
- * Which is a good way to reduce contention and transaction collisions.
- *
- * import se.scalablesolutions.akka.stm.Transaction._
- *
- * atomically {
- * .. // try to do something
- * } orElse {
- * .. // if transaction clashes try do do something else to minimize contention
- * }
- *
- *
- * Example of atomic transaction management using for comprehensions (monadic):
- *
- *
- * import se.scalablesolutions.akka.stm.Transaction._
- * for (tx <- Transaction) {
- * ... // do transactional stuff
- * }
- *
- * val result = for (tx <- Transaction) yield {
- * ... // do transactional stuff yielding a result
- * }
- *
- *
- * Example of using Transaction and TransactionalRef in for comprehensions (monadic):
- *
- *
- * // For example, if you have a List with TransactionalRef
- * val refs: List[TransactionalRef] = ...
- *
- * // You can use them together with Transaction in a for comprehension since
- * // TransactionalRef is also monadic
- * for {
- * tx <- Transaction
- * ref <- refs
- * } {
- * ... // use the ref inside a transaction
- * }
- *
- * val result = for {
- * tx <- Transaction
- * ref <- refs
- * } yield {
- * ... // use the ref inside a transaction, yield a result
- * }
- *
- *
- * @author Jonas Bonér
- */
-object Transaction extends TransactionManagement with Logging {
+object Transaction {
val idFactory = new AtomicLong(-1L)
- /**
- * See ScalaDoc on Transaction class.
- */
- def map[T](f: => T): T = atomic {f}
-
- /**
- * See ScalaDoc on Transaction class.
- */
- def flatMap[T](f: => T): T = atomic {f}
-
- /**
- * See ScalaDoc on Transaction class.
- */
- def foreach(f: => Unit): Unit = atomic {f}
-
- /**
- * See ScalaDoc on Transaction class.
- */
- def atomic[T](body: => T): T = {
- var isTopLevelTransaction = true
- new TransactionTemplate[T]() {
- def execute(mtx: MultiverseTransaction): T = {
- val result = body
-
- val txSet = getTransactionSetInScope
- log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet)
- txSet.joinCommit(mtx)
-
- // FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
- //getTransactionSetInScope.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
-
- clearTransaction
- result
- }
-
- override def onStart(mtx: MultiverseTransaction) = {
- val txSet =
- if (!isTransactionSetInScope) {
- isTopLevelTransaction = true
- createNewTransactionSet
- } else getTransactionSetInScope
- val tx = new Transaction
- tx.transaction = Some(mtx)
- setTransaction(Some(tx))
-
- txSet.registerOnCommitTask(new Runnable() {
- def run = tx.commit
- })
- txSet.registerOnAbortTask(new Runnable() {
- def run = tx.abort
- })
- }
- }.execute()
- }
-
- /**
- * See ScalaDoc on class.
- */
- def atomically[A](firstBody: => A) = elseBody(firstBody)
-
- /**
- * Should only be used together with atomically to form atomically-orElse constructs.
- * See ScalaDoc on class.
- */
- def elseBody[A](firstBody: => A) = new {
- def orElse(secondBody: => A) = new OrElseTemplate[A] {
- def run(t: MultiverseTransaction) = firstBody
- def orelserun(t: MultiverseTransaction) = secondBody
- }.execute()
- }
-
/**
* Creates a STM atomic transaction and by-passes all transactions hooks
* such as persistence etc.
- *
+ *
* Only for internal usage.
*/
private[akka] def atomic0[T](body: => T): T = new TransactionTemplate[T]() {
def execute(mtx: MultiverseTransaction): T = body
}.execute()
+
+ /**
+ * Module for "local" transaction management, local in the context of threads.
+ * You should only use these if you do not need to have one transaction span
+ * multiple threads (or Actors).
+ *
+ * Example of atomic transaction management using the atomic block.
+ *
+ *
+ * import se.scalablesolutions.akka.stm.Transaction.Local._
+ *
+ * atomic {
+ * .. // do something within a transaction
+ * }
+ *
+ *
+ * Example of atomically-orElse transaction management.
+ * Which is a good way to reduce contention and transaction collisions.
+ *
+ * import se.scalablesolutions.akka.stm.Transaction.Local._
+ *
+ * atomically {
+ * .. // try to do something
+ * } orElse {
+ * .. // if transaction clashes try do do something else to minimize contention
+ * }
+ *
+ *
+ * Example of atomic transaction management using for comprehensions (monadic):
+ *
+ *
+ * import se.scalablesolutions.akka.stm.Transaction.Local._
+ * for (tx <- Transaction.Local) {
+ * ... // do transactional stuff
+ * }
+ *
+ * val result = for (tx <- Transaction.Local) yield {
+ * ... // do transactional stuff yielding a result
+ * }
+ *
+ *
+ * Example of using Transaction and TransactionalRef in for comprehensions (monadic):
+ *
+ *
+ * // For example, if you have a List with TransactionalRef
+ * val refs: List[TransactionalRef] = ...
+ *
+ * // You can use them together with Transaction in a for comprehension since
+ * // TransactionalRef is also monadic
+ * for {
+ * tx <- Transaction.Local
+ * ref <- refs
+ * } {
+ * ... // use the ref inside a transaction
+ * }
+ *
+ * val result = for {
+ * tx <- Transaction.Local
+ * ref <- refs
+ * } yield {
+ * ... // use the ref inside a transaction, yield a result
+ * }
+ *
+ *
+ * @author Jonas Bonér
+ */
+ object Local extends TransactionManagement with Logging {
+
+ /**
+ * See ScalaDoc on Transaction.Local class.
+ */
+ def map[T](f: => T): T = atomic {f}
+
+ /**
+ * See ScalaDoc on Transaction.Local class.
+ */
+ def flatMap[T](f: => T): T = atomic {f}
+
+ /**
+ * See ScalaDoc on Transaction.Local class.
+ */
+ def foreach(f: => Unit): Unit = atomic {f}
+
+ /**
+ * See ScalaDoc on Transaction.Local class.
+ */
+ def atomic[T](body: => T): T = {
+ new TransactionTemplate[T]() {
+ def execute(mtx: MultiverseTransaction): T = body
+
+ override def onStart(mtx: MultiverseTransaction) = {
+ val tx = new Transaction
+ tx.transaction = Some(mtx)
+ setTransaction(Some(tx))
+ mtx.registerLifecycleListener(new TransactionLifecycleListener() {
+ def notify(tx: MultiverseTransaction, event: TransactionLifecycleEvent) = event.name match {
+ case "postCommit" => tx.commit
+ case "postAbort" => tx.abort
+ case _ => {}
+ }
+ })
+ }
+ }.execute()
+ }
+
+ /**
+ * See ScalaDoc on Transaction.Local class.
+ */
+ def atomically[A](firstBody: => A) = elseBody(firstBody)
+
+ /**
+ * Should only be used together with atomically to form atomically-orElse constructs.
+ * See ScalaDoc on class.
+ */
+ def elseBody[A](firstBody: => A) = new {
+ def orElse(secondBody: => A) = new OrElseTemplate[A] {
+ def run(t: MultiverseTransaction) = firstBody
+ def orelserun(t: MultiverseTransaction) = secondBody
+ }.execute()
+ }
+ }
+
+ /**
+ * Module for "global" transaction management, global in the context of multiple threads.
+ * You have to use these if you do need to have one transaction span multiple threads (or Actors).
+ *
+ * Example of atomic transaction management using the atomic block.
+ *
+ * Here are some examples (assuming implicit transaction family name in scope):
+ *
+ * import se.scalablesolutions.akka.stm.Transaction.Global._
+ *
+ * atomic {
+ * .. // do something within a transaction
+ * }
+ *
+ *
+ * Example of atomic transaction management using for comprehensions (monadic):
+ *
+ *
+ * import se.scalablesolutions.akka.stm.Transaction
+ * for (tx <- Transaction.Global) {
+ * ... // do transactional stuff
+ * }
+ *
+ * val result = for (tx <- Transaction.Global) yield {
+ * ... // do transactional stuff yielding a result
+ * }
+ *
+ *
+ * Example of using Transaction and TransactionalRef in for comprehensions (monadic):
+ *
+ *
+ * // For example, if you have a List with TransactionalRef
+ * val refs: List[TransactionalRef] = ...
+ *
+ * // You can use them together with Transaction in a for comprehension since
+ * // TransactionalRef is also monadic
+ * for {
+ * tx <- Transaction.Global
+ * ref <- refs
+ * } {
+ * ... // use the ref inside a transaction
+ * }
+ *
+ * val result = for {
+ * tx <- Transaction.Global
+ * ref <- refs
+ * } yield {
+ * ... // use the ref inside a transaction, yield a result
+ * }
+ *
+ *
+ * @author Jonas Bonér
+ */
+ object Global extends TransactionManagement with Logging {
+
+ /**
+ * See ScalaDoc on Transaction.Global class.
+ */
+ def map[T](f: => T): T = atomic {f}
+
+ /**
+ * See ScalaDoc on Transaction.Global class.
+ */
+ def flatMap[T](f: => T): T = atomic {f}
+
+ /**
+ * See ScalaDoc on Transaction.Global class.
+ */
+ def foreach(f: => Unit): Unit = atomic {f}
+
+ /**
+ * See ScalaDoc on Transaction.Global class.
+ */
+ def atomic[T](body: => T): T = {
+ var isTopLevelTransaction = false
+ new TransactionTemplate[T]() {
+ def execute(mtx: MultiverseTransaction): T = {
+ val result = body
+
+ val txSet = getTransactionSetInScope
+ log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet)
+ txSet.joinCommit(mtx)
+
+ // FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
+ //getTransactionSetInScope.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
+
+ clearTransaction
+ result
+ }
+
+ override def onStart(mtx: MultiverseTransaction) = {
+ val txSet =
+ if (!isTransactionSetInScope) {
+ isTopLevelTransaction = true
+ createNewTransactionSet
+ } else getTransactionSetInScope
+ val tx = new Transaction
+ tx.transaction = Some(mtx)
+ setTransaction(Some(tx))
+
+ txSet.registerOnCommitTask(new Runnable() {
+ def run = tx.commit
+ })
+ txSet.registerOnAbortTask(new Runnable() {
+ def run = tx.abort
+ })
+ }
+ }.execute()
+ }
+ }
}
/**
+ * The Akka specific Transaction class, keeping track of persistent data structures (as in on-disc).
+ *
* @author Jonas Bonér
*/
@serializable class Transaction extends Logging {
- import Transaction._
-
val id = Transaction.idFactory.incrementAndGet
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
private[akka] var transaction: Option[MultiverseTransaction] = None
@@ -199,15 +285,15 @@ object Transaction extends TransactionManagement with Logging {
// --- public methods ---------
def commit = synchronized {
- log.trace("Committing transaction %s", toString)
- atomic0 {
+ log.trace("Committing transaction %s", toString)
+ Transaction.atomic0 {
persistentStateMap.valuesIterator.foreach(_.commit)
}
status = TransactionStatus.Completed
}
def abort = synchronized {
- log.trace("Aborting transaction %s", toString)
+ log.trace("Aborting transaction %s", toString)
}
def isNew = synchronized { status == TransactionStatus.New }
@@ -244,7 +330,7 @@ object Transaction extends TransactionManagement with Logging {
throw new IllegalStateException(
"Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
- // For reinitialize transaction after sending it over the wire
+ // For reinitialize transaction after sending it over the wire
/* private[akka] def reinit = synchronized {
import net.lag.logging.{Logger, Level}
if (log eq null) {
diff --git a/akka-core/src/main/scala/stm/TransactionManagement.scala b/akka-core/src/main/scala/stm/TransactionManagement.scala
index 48c8c7dd95..401c2379ee 100644
--- a/akka-core/src/main/scala/stm/TransactionManagement.scala
+++ b/akka-core/src/main/scala/stm/TransactionManagement.scala
@@ -4,6 +4,8 @@
package se.scalablesolutions.akka.stm
+import se.scalablesolutions.akka.util.Logging
+
import java.util.concurrent.atomic.AtomicBoolean
import org.multiverse.api.ThreadLocalTransaction._
@@ -49,9 +51,10 @@ object TransactionManagement extends TransactionManagement {
}
}
-trait TransactionManagement {
+trait TransactionManagement extends Logging {
private[akka] def createNewTransactionSet: CountDownCommitBarrier = {
+ log.trace("Creating new transaction set")
val txSet = new CountDownCommitBarrier(1, TransactionManagement.FAIR_TRANSACTIONS)
TransactionManagement.transactionSet.set(Some(txSet))
txSet
@@ -63,9 +66,13 @@ trait TransactionManagement {
private[akka] def setTransaction(tx: Option[Transaction]) =
if (tx.isDefined) TransactionManagement.transaction.set(tx)
- private[akka] def clearTransactionSet = TransactionManagement.transactionSet.set(None)
+ private[akka] def clearTransactionSet = {
+ log.trace("Clearing transaction set")
+ TransactionManagement.transactionSet.set(None)
+ }
private[akka] def clearTransaction = {
+ log.trace("Clearing transaction")
TransactionManagement.transaction.set(None)
setThreadLocalTransaction(null)
}
diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala
index a8b5da83b9..7afb3fb6bb 100644
--- a/akka-core/src/main/scala/stm/TransactionalState.scala
+++ b/akka-core/src/main/scala/stm/TransactionalState.scala
@@ -4,7 +4,6 @@
package se.scalablesolutions.akka.stm
-import se.scalablesolutions.akka.stm.Transaction.atomic
import se.scalablesolutions.akka.util.UUID
import org.multiverse.stms.alpha.AlphaRef
diff --git a/akka-core/src/test/scala/AgentTest.scala b/akka-core/src/test/scala/AgentTest.scala
index 01818e1938..b378b67207 100644
--- a/akka-core/src/test/scala/AgentTest.scala
+++ b/akka-core/src/test/scala/AgentTest.scala
@@ -2,7 +2,7 @@ package se.scalablesolutions.akka.actor
import _root_.java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.Actor.transactor
-import se.scalablesolutions.akka.stm.Transaction.atomic
+import se.scalablesolutions.akka.stm.Transaction.Global.atomic
import se.scalablesolutions.akka.util.Logging
import org.scalatest.Suite
diff --git a/akka-core/src/test/scala/ForwardActor.scala b/akka-core/src/test/scala/ForwardActorSpec.scala
similarity index 90%
rename from akka-core/src/test/scala/ForwardActor.scala
rename to akka-core/src/test/scala/ForwardActorSpec.scala
index 0e8f606b27..575bbab7dd 100644
--- a/akka-core/src/test/scala/ForwardActor.scala
+++ b/akka-core/src/test/scala/ForwardActorSpec.scala
@@ -5,7 +5,7 @@ import org.scalatest.junit.JUnitSuite
import org.junit.Test
-class ForwardActorTest extends JUnitSuite {
+class ForwardActorSpec extends JUnitSuite {
object ForwardState {
var sender: Actor = null
@@ -57,7 +57,7 @@ class ForwardActorTest extends JUnitSuite {
def shouldForwardActorReferenceWhenInvokingForwardOnBang = {
val senderActor = new BangSenderActor
senderActor.start
- assert(ForwardState.finished.await(1, TimeUnit.SECONDS))
+ assert(ForwardState.finished.await(2, TimeUnit.SECONDS))
assert(ForwardState.sender ne null)
assert(senderActor === ForwardState.sender)
}
@@ -66,6 +66,6 @@ class ForwardActorTest extends JUnitSuite {
def shouldForwardActorReferenceWhenInvokingForwardOnBangBang = {
val senderActor = new BangBangSenderActor
senderActor.start
- assert(ForwardState.finished.await(1, TimeUnit.SECONDS))
+ assert(ForwardState.finished.await(2, TimeUnit.SECONDS))
}
}
diff --git a/akka-core/src/test/scala/StmSpec.scala b/akka-core/src/test/scala/StmSpec.scala
new file mode 100644
index 0000000000..121407426a
--- /dev/null
+++ b/akka-core/src/test/scala/StmSpec.scala
@@ -0,0 +1,81 @@
+package se.scalablesolutions.akka.actor
+
+import se.scalablesolutions.akka.stm.Transaction.Local._
+import se.scalablesolutions.akka.stm._
+
+import org.scalatest.Spec
+import org.scalatest.Assertions
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+@RunWith(classOf[JUnitRunner])
+class StmSpec extends
+ Spec with
+ ShouldMatchers with
+ BeforeAndAfterAll {
+
+ describe("STM outside actors") {
+ it("should be able to do multiple consecutive atomic {..} statements") {
+
+ lazy val ref = TransactionalState.newRef[Int]
+
+ def increment = atomic {
+ ref.swap(ref.get.getOrElse(0) + 1)
+ }
+
+ def total: Int = atomic {
+ ref.get.getOrElse(0)
+ }
+
+ increment
+ increment
+ increment
+ total should equal(3)
+ }
+
+ it("should be able to do nested atomic {..} statements") {
+
+ lazy val ref = TransactionalState.newRef[Int]
+
+ def increment = atomic {
+ ref.swap(ref.get.getOrElse(0) + 1)
+ }
+ def total: Int = atomic {
+ ref.get.getOrElse(0)
+ }
+
+ atomic {
+ increment
+ increment
+ }
+ atomic {
+ increment
+ total should equal(3)
+ }
+ }
+
+ it("should roll back failing nested atomic {..} statements") {
+
+ lazy val ref = TransactionalState.newRef[Int]
+
+ def increment = atomic {
+ ref.swap(ref.get.getOrElse(0) + 1)
+ }
+ def total: Int = atomic {
+ ref.get.getOrElse(0)
+ }
+ try {
+ atomic {
+ increment
+ increment
+ throw new Exception
+ }
+ } catch {
+ case e => {}
+ }
+ total should equal(0)
+ }
+ }
+}
diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala
index cd343ebaca..9243c8486f 100644
--- a/akka-kernel/src/main/scala/Kernel.scala
+++ b/akka-kernel/src/main/scala/Kernel.scala
@@ -35,4 +35,4 @@ object Kernel extends AkkaLoader {
case x: BootableRemoteActorService => x.startRemoteService
case _ =>
})
-}
\ No newline at end of file
+}
diff --git a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala
index 6afd7baaf2..6d4bf9679e 100644
--- a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala
+++ b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala
@@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB