From b4176e276a9a5d4e6dce8eeeb267e7882a461427 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 20 Apr 2010 14:53:45 +0200 Subject: [PATCH] Added STM Synchronization registration to JNDI TransactionSynchronizationRegistry --- .../{TransactionContainer.scala => JTA.scala} | 163 +++++++++++------- .../src/main/scala/stm/Transaction.scala | 19 +- 2 files changed, 112 insertions(+), 70 deletions(-) rename akka-core/src/main/scala/stm/{TransactionContainer.scala => JTA.scala} (81%) diff --git a/akka-core/src/main/scala/stm/TransactionContainer.scala b/akka-core/src/main/scala/stm/JTA.scala similarity index 81% rename from akka-core/src/main/scala/stm/TransactionContainer.scala rename to akka-core/src/main/scala/stm/JTA.scala index 09da21a494..5be0460681 100644 --- a/akka-core/src/main/scala/stm/TransactionContainer.scala +++ b/akka-core/src/main/scala/stm/JTA.scala @@ -4,12 +4,98 @@ package se.scalablesolutions.akka.stm -import javax.transaction.{TransactionManager, UserTransaction, Transaction => JtaTransaction, SystemException, Status} +import javax.transaction.{TransactionManager, UserTransaction, Transaction => JtaTransaction, SystemException, Status, Synchronization, TransactionSynchronizationRegistry} import javax.naming.{InitialContext, Context, NamingException} import se.scalablesolutions.akka.config.Config._ import se.scalablesolutions.akka.util.Logging +/** + * Detects if there is a UserTransaction or TransactionManager available in the JNDI. + * + * @author Jonas Bonér + */ +object TransactionContainer extends Logging { + val AKKA_JTA_TRANSACTION_SERVICE_CLASS = "se.scalablesolutions.akka.jta.AtomikosTransactionService" + val DEFAULT_USER_TRANSACTION_NAME = "java:comp/UserTransaction" + val FALLBACK_TRANSACTION_MANAGER_NAMES = "java:comp/TransactionManager" :: + "java:appserver/TransactionManager" :: + "java:pm/TransactionManager" :: + "java:/TransactionManager" :: Nil + val DEFAULT_TRANSACTION_SYNCHRONIZATION_REGISTRY_NAME = "java:comp/TransactionSynchronizationRegistry" + + val JTA_PROVIDER = config.getString("akka.jta.provider", "from-jndi") + + private var synchronizationRegistry: Option[TransactionSynchronizationRegistry] = None + + def apply(tm: Either[Option[UserTransaction], Option[TransactionManager]]) = new TransactionContainer(tm) + + def apply(): TransactionContainer = + JTA_PROVIDER match { + case "from-jndi" => + new TransactionContainer(findUserTransaction match { + case None => Right(findTransactionManager) + case tm => Left(tm) + }) + case "atomikos" => + try { + Class.forName(AKKA_JTA_TRANSACTION_SERVICE_CLASS) + .newInstance.asInstanceOf[TransactionService] + .transactionContainer + } catch { + case e: ClassNotFoundException => + throw new IllegalStateException( + "JTA provider defined as 'atomikos', but the AtomikosTransactionService classes can not be found." + + "\n\tPlease make sure you have 'akka-jta' JAR and its dependencies on your classpath.") + } + case _ => + throw new IllegalStateException( + "No UserTransaction on TransactionManager could be found in scope." + + "\n\tEither add 'akka-jta' to the classpath or make sure there is a" + + "\n\tTransactionManager or UserTransaction defined in the JNDI.") + + } + + def findUserTransaction: Option[UserTransaction] = { + val located = createInitialContext.lookup(DEFAULT_USER_TRANSACTION_NAME) + if (located eq null) None + else { + log.info("JTA UserTransaction detected [%s]", located) + Some(located.asInstanceOf[UserTransaction]) + } + } + + def findSynchronizationRegistry: Option[TransactionSynchronizationRegistry] = synchronized { + if (synchronizationRegistry.isDefined) synchronizationRegistry + else { + val located = createInitialContext.lookup(DEFAULT_TRANSACTION_SYNCHRONIZATION_REGISTRY_NAME) + if (located eq null) None + else { + log.info("JTA TransactionSynchronizationRegistry detected [%s]", located) + synchronizationRegistry = Some(located.asInstanceOf[TransactionSynchronizationRegistry]) + synchronizationRegistry + } + } + } + + def findTransactionManager: Option[TransactionManager] = { + val context = createInitialContext + val tms = for { + name <- FALLBACK_TRANSACTION_MANAGER_NAMES + tm = context.lookup(name) + if tm ne null + } yield tm + tms match { + case Nil => None + case tm :: _ => + log.info("JTA TransactionManager detected [%s]", tm) + Some(tm.asInstanceOf[TransactionManager]) + } + } + + private def createInitialContext = new InitialContext(new java.util.Hashtable) +} + /** * JTA transaction container holding either a UserTransaction or a TransactionManager. *

@@ -18,7 +104,6 @@ import se.scalablesolutions.akka.util.Logging * @author Jonas Bonér */ class TransactionContainer private (val tm: Either[Option[UserTransaction], Option[TransactionManager]]) { - def begin = tm match { case Left(Some(userTx)) => userTx.begin case Right(Some(txMan)) => txMan.begin @@ -73,74 +158,22 @@ class TransactionContainer private (val tm: Either[Option[UserTransaction], Opti } /** - * Detects if there is a UserTransaction or TransactionManager available in the JNDI. - * + * STM Synchronization class for synchronizing with the JTA TransactionManager. + * * @author Jonas Bonér */ -object TransactionContainer extends Logging { - val AKKA_JTA_TRANSACTION_SERVICE_CLASS = "se.scalablesolutions.akka.jta.AtomikosTransactionService" - val DEFAULT_USER_TRANSACTION_NAME = "java:comp/UserTransaction" - val FALLBACK_TRANSACTION_MANAGER_NAMES = "java:comp/TransactionManager" :: - "java:appserver/TransactionManager" :: - "java:pm/TransactionManager" :: - "java:/TransactionManager" :: Nil - val DEFAULT_TRANSACTION_SYNCHRONIZATION_REGISTRY_NAME = "java:comp/TransactionSynchronizationRegistry" - val TRANSACTION_SYNCHRONIZATION_REGISTRY_CLASS_NAME = "javax.transaction.TransactionSynchronizationRegistry" - val JTA_PROVIDER = config.getString("akka.jta.provider", "from-jndi") - - def apply(tm: Either[Option[UserTransaction], Option[TransactionManager]]) = new TransactionContainer(tm) - - def apply(): TransactionContainer = - JTA_PROVIDER match { - case "from-jndi" => - new TransactionContainer(findUserTransaction match { - case None => Right(findTransactionManager) - case tm => Left(tm) - }) - case "atomikos" => - try { - Class.forName(AKKA_JTA_TRANSACTION_SERVICE_CLASS) - .newInstance.asInstanceOf[TransactionService] - .transactionContainer - } catch { - case e: ClassNotFoundException => - throw new IllegalStateException( - "JTA provider defined as 'atomikos', but the AtomikosTransactionService classes can not be found." + - "\n\tPlease make sure you have 'akka-jta' JAR and its dependencies on your classpath.") - } - case _ => - throw new IllegalStateException( - "No UserTransaction on TransactionManager could be found in scope." + - "\n\tEither add 'akka-jta' to the classpath or make sure there is a" + - "\n\tTransactionManager or UserTransaction defined in the JNDI.") - - } - - def findUserTransaction: Option[UserTransaction] = { - val located = createInitialContext.lookup(DEFAULT_USER_TRANSACTION_NAME) - if (located eq null) None - else { - log.info("JTA UserTransaction detected [%s]", located) - Some(located.asInstanceOf[UserTransaction]) +class StmSynchronization(tc: TransactionContainer, tx: Transaction) extends Synchronization with Logging { + def beforeCompletion = { + val status = tc.getStatus + if (status != Status.STATUS_ROLLEDBACK && + status != Status.STATUS_ROLLING_BACK && + status != Status.STATUS_MARKED_ROLLBACK) { + log.debug("JTA transaction has failed, abort STM transaction") + tx.transaction.foreach(_.abort) // abort multiverse tx } } - def findTransactionManager: Option[TransactionManager] = { - val context = createInitialContext - val tms = for { - name <- FALLBACK_TRANSACTION_MANAGER_NAMES - tm = context.lookup(name) - if tm ne null - } yield tm - tms match { - case Nil => None - case tm :: _ => - log.info("JTA TransactionManager detected [%s]", tm) - Some(tm.asInstanceOf[TransactionManager]) - } - } - - private def createInitialContext = new InitialContext(new java.util.Hashtable) + def afterCompletion(status: Int) = {} } /** diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala index 7fbe9898e6..97d588f400 100644 --- a/akka-core/src/main/scala/stm/Transaction.scala +++ b/akka-core/src/main/scala/stm/Transaction.scala @@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.TimeUnit -import javax.transaction.{TransactionManager, UserTransaction, Status} +import javax.transaction.{TransactionManager, UserTransaction, Status, TransactionSynchronizationRegistry} import scala.collection.mutable.HashMap @@ -305,7 +305,7 @@ object Transaction { private[this] val persistentStateMap = new HashMap[String, Committable] private[akka] val depth = new AtomicInteger(0) - val tc: Option[TransactionContainer] = + val jta: Option[TransactionContainer] = if (JTA_AWARE) Some(TransactionContainer()) else None @@ -314,7 +314,16 @@ object Transaction { // --- public methods --------- def begin = synchronized { - tc.foreach(_.begin) + jta.foreach { txContainer => + txContainer.begin + TransactionContainer.findSynchronizationRegistry match { + case Some(registry) => + registry.asInstanceOf[TransactionSynchronizationRegistry].registerInterposedSynchronization( + new StmSynchronization(txContainer, this)) + case None => + log.warning("Cannot find TransactionSynchronizationRegistry in JNDI, can't register STM synchronization") + } + } } def commit = synchronized { @@ -323,12 +332,12 @@ object Transaction { persistentStateMap.valuesIterator.foreach(_.commit) } status = TransactionStatus.Completed - tc.foreach(_.commit) + jta.foreach(_.commit) } def abort = synchronized { log.trace("Aborting transaction %s", toString) - tc.foreach(_.rollback) + jta.foreach(_.rollback) } def isNew = synchronized { status == TransactionStatus.New }