Fixed issue with CommitBarrier and its registered callbacks + Added compensating 'barrier.decParties' to each 'barrier.incParties'

This commit is contained in:
Jonas Bonér 2010-05-28 23:50:08 +02:00
parent 89bf596f38
commit c22e564b74
5 changed files with 71 additions and 22 deletions

View file

@ -968,10 +968,12 @@ sealed class LocalActorRef private[akka](
}
private def joinTransaction(message: Any) = if (isTransactionSetInScope) {
// FIXME test to run bench without this trace call
Actor.log.trace("Joining transaction set [%s];\n\tactor %s\n\twith message [%s]",
getTransactionSetInScope, toString, message)
getTransactionSetInScope.incParties
import org.multiverse.api.ThreadLocalTransaction
val txSet = getTransactionSetInScope
Actor.log.trace("Joining transaction set [%s];\n\tactor %s\n\twith message [%s]", txSet, toString, message) // FIXME test to run bench without this trace call
val mtx = ThreadLocalTransaction.getThreadLocalTransaction
if ((mtx eq null) || mtx.getStatus.isDead) txSet.incParties
else txSet.incParties(mtx, 1)
}
/**
@ -1049,7 +1051,9 @@ sealed class LocalActorRef private[akka](
_isBeingRestarted = true
// abort transaction set
if (isTransactionSetInScope) try {
getTransactionSetInScope.abort
val txSet = getTransactionSetInScope
Actor.log.debug("Aborting transaction set [%s]", txSet)
txSet.abort
} catch { case e: IllegalStateException => {} }
Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)

View file

@ -250,6 +250,10 @@ object Transaction {
*/
def foreach(f: => Unit): Unit = atomic {f}
// FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
//getTransactionSetInScope.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
/**
* See ScalaDoc on Transaction.Global class.
*/
@ -262,10 +266,6 @@ object Transaction {
val txSet = getTransactionSetInScope
log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet)
txSet.joinCommit(mtx)
// FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
//getTransactionSetInScope.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
clearTransaction
result
}
@ -280,16 +280,15 @@ object Transaction {
tx.begin
tx.transaction = Some(mtx)
setTransaction(Some(tx))
txSet.registerOnCommitTask(new Runnable() {
def run = {
log.trace("=========> Committing transaction [%s]", mtx)
mtx.registerLifecycleListener(new TransactionLifecycleListener() {
def notify(mtx: MultiverseTransaction, event: TransactionLifecycleEvent) = event.name match {
case "postCommit" =>
log.trace("Committing transaction [%s]", mtx)
tx.commit
}
})
txSet.registerOnAbortTask(new Runnable() {
def run = {
log.trace("=========> Aborting transaction [%s]", mtx)
case "postAbort" =>
log.trace("Aborting transaction [%s]", mtx)
tx.abort
case _ => {}
}
})
}

View file

@ -205,7 +205,7 @@ class TransactionalRef[T](initialOpt: Option[T] = None) extends Transactional {
}
private def ensureIsInTransaction =
if (getThreadLocalTransaction eq null) throw new NoTransactionInScopeException
()// if (getThreadLocalTransaction eq null) throw new NoTransactionInScopeException
private def ensureNotNull =
if (ref.isNull) throw new RuntimeException("Cannot alter Ref's value when it is null")

View file

@ -101,6 +101,52 @@ class StmSpec extends
}
}
}
/*
describe("Multiverse API") {
it("should blablabla") {
import org.multiverse.api.programmatic._
// import org.multiverse.api._
import org.multiverse.templates._
import java.util.concurrent.atomic._
import se.scalablesolutions.akka.stm.Ref
import org.multiverse.api.{GlobalStmInstance, ThreadLocalTransaction, Transaction => MultiverseTransaction}
import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent}
import org.multiverse.commitbarriers._
def createRef[T]: ProgrammaticReference[T] = GlobalStmInstance
.getGlobalStmInstance
.getProgrammaticReferenceFactoryBuilder
.build
.atomicCreateReference(null.asInstanceOf[T])
val ref1 = Ref(0)//createRef[Int]
val ref2 = Ref(0)//createRef[Int]
val committedCount = new AtomicInteger
val abortedCount = new AtomicInteger
val barrierHolder = new AtomicReference[CountDownCommitBarrier]
val template = new TransactionTemplate[Int]() {
override def onStart(tx: MultiverseTransaction) = barrierHolder.set(new CountDownCommitBarrier(1))
override def execute(tx: MultiverseTransaction): Int = {
ref1.swap(ref1.get.get + 1)
ref2.swap(ref2.get.get + 1)
barrierHolder.get.joinCommit(tx)
null.asInstanceOf[Int]
}
override def onPostCommit = committedCount.incrementAndGet
override def onPostAbort = abortedCount.incrementAndGet
}
template.execute
ref1.get.get should equal(1)
ref2.get.get should equal(1)
committedCount.get should equal(1)
abortedCount.get should equal(2)
}
}
*/
}
object GlobalTransactionVectorTestActor {

View file

@ -47,7 +47,6 @@ class AkkaLoader extends Logging {
private def printBanner = {
log.info(
"""
==================================================
t
t t t
t t tt t
@ -70,6 +69,7 @@ class AkkaLoader extends Logging {
ttt ttt ttt ttt ttt ttt ttt ttt
ttt tt ttt ttt ttt ttt ttt ttt
tttttttt ttt ttt ttt ttt tttttttt
==================================================
""")
log.info(" Running version %s", Config.VERSION)