Added STM Synchronization registration to JNDI TransactionSynchronizationRegistry

This commit is contained in:
Jonas Bonér 2010-04-20 14:53:45 +02:00
parent 251899dfde
commit fec67a8fc1
2 changed files with 14 additions and 159 deletions

View file

@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.TimeUnit
import javax.transaction.{TransactionManager, UserTransaction, Status}
import javax.transaction.{TransactionManager, UserTransaction, Status, TransactionSynchronizationRegistry}
import scala.collection.mutable.HashMap
@ -305,7 +305,7 @@ object Transaction {
private[this] val persistentStateMap = new HashMap[String, Committable]
private[akka] val depth = new AtomicInteger(0)
val tc: Option[TransactionContainer] =
val jta: Option[TransactionContainer] =
if (JTA_AWARE) Some(TransactionContainer())
else None
@ -314,7 +314,16 @@ object Transaction {
// --- public methods ---------
def begin = synchronized {
tc.foreach(_.begin)
jta.foreach { txContainer =>
txContainer.begin
TransactionContainer.findSynchronizationRegistry match {
case Some(registry) =>
registry.asInstanceOf[TransactionSynchronizationRegistry].registerInterposedSynchronization(
new StmSynchronization(txContainer, this))
case None =>
log.warning("Cannot find TransactionSynchronizationRegistry in JNDI, can't register STM synchronization")
}
}
}
def commit = synchronized {
@ -323,12 +332,12 @@ object Transaction {
persistentStateMap.valuesIterator.foreach(_.commit)
}
status = TransactionStatus.Completed
tc.foreach(_.commit)
jta.foreach(_.commit)
}
def abort = synchronized {
log.trace("Aborting transaction %s", toString)
tc.foreach(_.rollback)
jta.foreach(_.rollback)
}
def isNew = synchronized { status == TransactionStatus.New }

View file

@ -1,154 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
import javax.transaction.{TransactionManager, UserTransaction, Transaction => JtaTransaction, SystemException, Status}
import javax.naming.{InitialContext, Context, NamingException}
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.util.Logging
/**
* JTA transaction container holding either a UserTransaction or a TransactionManager.
* <p/>
* The TransactionContainer is created using the factory <tt>val container = TransactionContainer()</tt>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class TransactionContainer private (val tm: Either[Option[UserTransaction], Option[TransactionManager]]) {
def begin = tm match {
case Left(Some(userTx)) => userTx.begin
case Right(Some(txMan)) => txMan.begin
case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope")
}
def commit = tm match {
case Left(Some(userTx)) => userTx.commit
case Right(Some(txMan)) => txMan.commit
case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope")
}
def rollback = tm match {
case Left(Some(userTx)) => userTx.rollback
case Right(Some(txMan)) => txMan.rollback
case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope")
}
def getStatus = tm match {
case Left(Some(userTx)) => userTx.getStatus
case Right(Some(txMan)) => txMan.getStatus
case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope")
}
def isInExistingTransaction = tm match {
case Left(Some(userTx)) => userTx.getStatus == Status.STATUS_ACTIVE
case Right(Some(txMan)) => txMan.getStatus == Status.STATUS_ACTIVE
case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope")
}
def isRollbackOnly = tm match {
case Left(Some(userTx)) => userTx.getStatus == Status.STATUS_MARKED_ROLLBACK
case Right(Some(txMan)) => txMan.getStatus == Status.STATUS_MARKED_ROLLBACK
case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope")
}
def setRollbackOnly = tm match {
case Left(Some(userTx)) => userTx.setRollbackOnly
case Right(Some(txMan)) => txMan.setRollbackOnly
case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope")
}
def suspend = tm match {
case Right(Some(txMan)) => txMan.suspend
case _ => throw new IllegalStateException("Does not have a TransactionManager in scope")
}
def resume(tx: JtaTransaction) = tm match {
case Right(Some(txMan)) => txMan.resume(tx)
case _ => throw new IllegalStateException("Does not have a TransactionManager in scope")
}
}
/**
* Detects if there is a UserTransaction or TransactionManager available in the JNDI.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object TransactionContainer extends Logging {
val AKKA_JTA_TRANSACTION_SERVICE_CLASS = "se.scalablesolutions.akka.jta.AtomikosTransactionService"
val DEFAULT_USER_TRANSACTION_NAME = "java:comp/UserTransaction"
val FALLBACK_TRANSACTION_MANAGER_NAMES = "java:comp/TransactionManager" ::
"java:appserver/TransactionManager" ::
"java:pm/TransactionManager" ::
"java:/TransactionManager" :: Nil
val DEFAULT_TRANSACTION_SYNCHRONIZATION_REGISTRY_NAME = "java:comp/TransactionSynchronizationRegistry"
val TRANSACTION_SYNCHRONIZATION_REGISTRY_CLASS_NAME = "javax.transaction.TransactionSynchronizationRegistry"
val JTA_PROVIDER = config.getString("akka.jta.provider", "from-jndi")
def apply(tm: Either[Option[UserTransaction], Option[TransactionManager]]) = new TransactionContainer(tm)
def apply(): TransactionContainer =
JTA_PROVIDER match {
case "from-jndi" =>
new TransactionContainer(findUserTransaction match {
case None => Right(findTransactionManager)
case tm => Left(tm)
})
case "atomikos" =>
try {
Class.forName(AKKA_JTA_TRANSACTION_SERVICE_CLASS)
.newInstance.asInstanceOf[TransactionService]
.transactionContainer
} catch {
case e: ClassNotFoundException =>
throw new IllegalStateException(
"JTA provider defined as 'atomikos', but the AtomikosTransactionService classes can not be found." +
"\n\tPlease make sure you have 'akka-jta' JAR and its dependencies on your classpath.")
}
case _ =>
throw new IllegalStateException(
"No UserTransaction on TransactionManager could be found in scope." +
"\n\tEither add 'akka-jta' to the classpath or make sure there is a" +
"\n\tTransactionManager or UserTransaction defined in the JNDI.")
}
def findUserTransaction: Option[UserTransaction] = {
val located = createInitialContext.lookup(DEFAULT_USER_TRANSACTION_NAME)
if (located eq null) None
else {
log.info("JTA UserTransaction detected [%s]", located)
Some(located.asInstanceOf[UserTransaction])
}
}
def findTransactionManager: Option[TransactionManager] = {
val context = createInitialContext
val tms = for {
name <- FALLBACK_TRANSACTION_MANAGER_NAMES
tm = context.lookup(name)
if tm ne null
} yield tm
tms match {
case Nil => None
case tm :: _ =>
log.info("JTA TransactionManager detected [%s]", tm)
Some(tm.asInstanceOf[TransactionManager])
}
}
private def createInitialContext = new InitialContext(new java.util.Hashtable)
}
/**
* JTA Transaction service.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait TransactionService {
def transactionContainer: TransactionContainer
}