diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 42745c733f..ec96b87bf2 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -968,10 +968,12 @@ sealed class LocalActorRef private[akka]( } private def joinTransaction(message: Any) = if (isTransactionSetInScope) { - // FIXME test to run bench without this trace call - Actor.log.trace("Joining transaction set [%s];\n\tactor %s\n\twith message [%s]", - getTransactionSetInScope, toString, message) - getTransactionSetInScope.incParties + import org.multiverse.api.ThreadLocalTransaction + val txSet = getTransactionSetInScope + Actor.log.trace("Joining transaction set [%s];\n\tactor %s\n\twith message [%s]", txSet, toString, message) // FIXME test to run bench without this trace call + val mtx = ThreadLocalTransaction.getThreadLocalTransaction + if ((mtx eq null) || mtx.getStatus.isDead) txSet.incParties + else txSet.incParties(mtx, 1) } /** @@ -1049,7 +1051,9 @@ sealed class LocalActorRef private[akka]( _isBeingRestarted = true // abort transaction set if (isTransactionSetInScope) try { - getTransactionSetInScope.abort + val txSet = getTransactionSetInScope + Actor.log.debug("Aborting transaction set [%s]", txSet) + txSet.abort } catch { case e: IllegalStateException => {} } Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala index 787896682d..dfd6c53fdf 100644 --- a/akka-core/src/main/scala/stm/Transaction.scala +++ b/akka-core/src/main/scala/stm/Transaction.scala @@ -250,6 +250,10 @@ object Transaction { */ def foreach(f: => Unit): Unit = atomic {f} + +// FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS) +//getTransactionSetInScope.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS) + /** * See ScalaDoc on Transaction.Global class. */ @@ -262,10 +266,6 @@ object Transaction { val txSet = getTransactionSetInScope log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet) txSet.joinCommit(mtx) - - // FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS) - //getTransactionSetInScope.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS) - clearTransaction result } @@ -280,16 +280,15 @@ object Transaction { tx.begin tx.transaction = Some(mtx) setTransaction(Some(tx)) - txSet.registerOnCommitTask(new Runnable() { - def run = { - log.trace("=========> Committing transaction [%s]", mtx) - tx.commit - } - }) - txSet.registerOnAbortTask(new Runnable() { - def run = { - log.trace("=========> Aborting transaction [%s]", mtx) - tx.abort + mtx.registerLifecycleListener(new TransactionLifecycleListener() { + def notify(mtx: MultiverseTransaction, event: TransactionLifecycleEvent) = event.name match { + case "postCommit" => + log.trace("Committing transaction [%s]", mtx) + tx.commit + case "postAbort" => + log.trace("Aborting transaction [%s]", mtx) + tx.abort + case _ => {} } }) } diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala index 2575194090..e3e3f4ac7f 100644 --- a/akka-core/src/main/scala/stm/TransactionalState.scala +++ b/akka-core/src/main/scala/stm/TransactionalState.scala @@ -205,7 +205,7 @@ class TransactionalRef[T](initialOpt: Option[T] = None) extends Transactional { } private def ensureIsInTransaction = - if (getThreadLocalTransaction eq null) throw new NoTransactionInScopeException + ()// if (getThreadLocalTransaction eq null) throw new NoTransactionInScopeException private def ensureNotNull = if (ref.isNull) throw new RuntimeException("Cannot alter Ref's value when it is null") diff --git a/akka-core/src/test/scala/StmSpec.scala b/akka-core/src/test/scala/StmSpec.scala index 28ec49a338..401545d50c 100644 --- a/akka-core/src/test/scala/StmSpec.scala +++ b/akka-core/src/test/scala/StmSpec.scala @@ -101,6 +101,52 @@ class StmSpec extends } } } + /* + describe("Multiverse API") { + it("should blablabla") { + + import org.multiverse.api.programmatic._ +// import org.multiverse.api._ + import org.multiverse.templates._ + import java.util.concurrent.atomic._ + import se.scalablesolutions.akka.stm.Ref + import org.multiverse.api.{GlobalStmInstance, ThreadLocalTransaction, Transaction => MultiverseTransaction} + import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent} + import org.multiverse.commitbarriers._ + + def createRef[T]: ProgrammaticReference[T] = GlobalStmInstance + .getGlobalStmInstance + .getProgrammaticReferenceFactoryBuilder + .build + .atomicCreateReference(null.asInstanceOf[T]) + + val ref1 = Ref(0)//createRef[Int] + val ref2 = Ref(0)//createRef[Int] + + val committedCount = new AtomicInteger + val abortedCount = new AtomicInteger + val barrierHolder = new AtomicReference[CountDownCommitBarrier] + + val template = new TransactionTemplate[Int]() { + override def onStart(tx: MultiverseTransaction) = barrierHolder.set(new CountDownCommitBarrier(1)) + override def execute(tx: MultiverseTransaction): Int = { + ref1.swap(ref1.get.get + 1) + ref2.swap(ref2.get.get + 1) + barrierHolder.get.joinCommit(tx) + null.asInstanceOf[Int] + } + override def onPostCommit = committedCount.incrementAndGet + override def onPostAbort = abortedCount.incrementAndGet + } + template.execute + + ref1.get.get should equal(1) + ref2.get.get should equal(1) + committedCount.get should equal(1) + abortedCount.get should equal(2) + } + } + */ } object GlobalTransactionVectorTestActor { diff --git a/akka-http/src/main/scala/AkkaLoader.scala b/akka-http/src/main/scala/AkkaLoader.scala index 8513f5ea8c..431758e2c7 100644 --- a/akka-http/src/main/scala/AkkaLoader.scala +++ b/akka-http/src/main/scala/AkkaLoader.scala @@ -47,7 +47,6 @@ class AkkaLoader extends Logging { private def printBanner = { log.info( """ -================================================== t t t t t t tt t @@ -70,9 +69,10 @@ class AkkaLoader extends Logging { ttt ttt ttt ttt ttt ttt ttt ttt ttt tt ttt ttt ttt ttt ttt ttt tttttttt ttt ttt ttt ttt tttttttt + ================================================== """) - log.info(" Running version %s", Config.VERSION) + log.info(" Running version %s", Config.VERSION) log.info("==================================================") } }