jta-enabled stm

This commit is contained in:
Jonas Bonér 2010-04-17 19:28:14 +02:00
parent fa50bdae47
commit 908156ea13
4 changed files with 40 additions and 6 deletions

View file

@ -8,9 +8,12 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.TimeUnit
import javax.transaction.{TransactionManager, UserTransaction, Status}
import scala.collection.mutable.HashMap
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.Config._
import org.multiverse.api.{Transaction => MultiverseTransaction, TransactionLifecycleListener, TransactionLifecycleEvent}
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
@ -272,9 +275,9 @@ object Transaction {
createNewTransactionSet
} else getTransactionSetInScope
val tx = new Transaction
tx.begin
tx.transaction = Some(mtx)
setTransaction(Some(tx))
txSet.registerOnCommitTask(new Runnable() {
def run = tx.commit
})
@ -288,11 +291,20 @@ object Transaction {
}
/**
* The Akka specific Transaction class, keeping track of persistent data structures (as in on-disc).
* The Akka specific Transaction class, keeping track of persistent data structures (as in on-disc)
* and JTA support.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@serializable class Transaction extends Logging {
val JTA_AWARE = config.getBool("akka.stm.jta-aware", false)
val jta: Either[Option[UserTransaction], Option[TransactionManager]] = if (JTA_AWARE) {
JtaTransactionManagerDetector.findUserTransaction match {
case None => Right(JtaTransactionManagerDetector.findTransactionManager)
case tm => Left(tm)
}
} else Left(None)
val id = Transaction.idFactory.incrementAndGet
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
private[akka] var transaction: Option[MultiverseTransaction] = None
@ -303,16 +315,34 @@ object Transaction {
// --- public methods ---------
def begin = synchronized {
jta match {
case Left(Some(userTx)) => if (!isJtaTxActive(userTx.getStatus)) userTx.begin
case Right(Some(txMan)) => if (!isJtaTxActive(txMan.getStatus)) txMan.begin
case _ => {} // do nothing
}
}
def commit = synchronized {
log.trace("Committing transaction %s", toString)
Transaction.atomic0 {
persistentStateMap.valuesIterator.foreach(_.commit)
}
status = TransactionStatus.Completed
jta match {
case Left(Some(userTx)) => if (isJtaTxActive(userTx.getStatus)) userTx.commit
case Right(Some(txMan)) => if (isJtaTxActive(txMan.getStatus)) txMan.commit
case _ => {} // do nothing
}
}
def abort = synchronized {
log.trace("Aborting transaction %s", toString)
jta match {
case Left(Some(userTx)) => if (isJtaTxActive(userTx.getStatus)) userTx.rollback
case Right(Some(txMan)) => if (isJtaTxActive(txMan.getStatus)) txMan.rollback
case _ => {} // do nothing
}
}
def isNew = synchronized { status == TransactionStatus.New }
@ -325,6 +355,8 @@ object Transaction {
// --- internal methods ---------
private def isJtaTxActive(status: Int) = status == Status.STATUS_ACTIVE
private[akka] def status_? = status
private[akka] def increment = depth.incrementAndGet