diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala index 7fbe9898e6..97d588f400 100644 --- a/akka-core/src/main/scala/stm/Transaction.scala +++ b/akka-core/src/main/scala/stm/Transaction.scala @@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.TimeUnit -import javax.transaction.{TransactionManager, UserTransaction, Status} +import javax.transaction.{TransactionManager, UserTransaction, Status, TransactionSynchronizationRegistry} import scala.collection.mutable.HashMap @@ -305,7 +305,7 @@ object Transaction { private[this] val persistentStateMap = new HashMap[String, Committable] private[akka] val depth = new AtomicInteger(0) - val tc: Option[TransactionContainer] = + val jta: Option[TransactionContainer] = if (JTA_AWARE) Some(TransactionContainer()) else None @@ -314,7 +314,16 @@ object Transaction { // --- public methods --------- def begin = synchronized { - tc.foreach(_.begin) + jta.foreach { txContainer => + txContainer.begin + TransactionContainer.findSynchronizationRegistry match { + case Some(registry) => + registry.asInstanceOf[TransactionSynchronizationRegistry].registerInterposedSynchronization( + new StmSynchronization(txContainer, this)) + case None => + log.warning("Cannot find TransactionSynchronizationRegistry in JNDI, can't register STM synchronization") + } + } } def commit = synchronized { @@ -323,12 +332,12 @@ object Transaction { persistentStateMap.valuesIterator.foreach(_.commit) } status = TransactionStatus.Completed - tc.foreach(_.commit) + jta.foreach(_.commit) } def abort = synchronized { log.trace("Aborting transaction %s", toString) - tc.foreach(_.rollback) + jta.foreach(_.rollback) } def isNew = synchronized { status == TransactionStatus.New } diff --git a/akka-core/src/main/scala/stm/TransactionContainer.scala b/akka-core/src/main/scala/stm/TransactionContainer.scala deleted file mode 100644 index 09da21a494..0000000000 --- a/akka-core/src/main/scala/stm/TransactionContainer.scala +++ /dev/null @@ -1,154 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.stm - -import javax.transaction.{TransactionManager, UserTransaction, Transaction => JtaTransaction, SystemException, Status} -import javax.naming.{InitialContext, Context, NamingException} - -import se.scalablesolutions.akka.config.Config._ -import se.scalablesolutions.akka.util.Logging - -/** - * JTA transaction container holding either a UserTransaction or a TransactionManager. - *

- * The TransactionContainer is created using the factory val container = TransactionContainer() - * - * @author Jonas Bonér - */ -class TransactionContainer private (val tm: Either[Option[UserTransaction], Option[TransactionManager]]) { - - def begin = tm match { - case Left(Some(userTx)) => userTx.begin - case Right(Some(txMan)) => txMan.begin - case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope") - } - - def commit = tm match { - case Left(Some(userTx)) => userTx.commit - case Right(Some(txMan)) => txMan.commit - case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope") - } - - def rollback = tm match { - case Left(Some(userTx)) => userTx.rollback - case Right(Some(txMan)) => txMan.rollback - case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope") - } - - def getStatus = tm match { - case Left(Some(userTx)) => userTx.getStatus - case Right(Some(txMan)) => txMan.getStatus - case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope") - } - - def isInExistingTransaction = tm match { - case Left(Some(userTx)) => userTx.getStatus == Status.STATUS_ACTIVE - case Right(Some(txMan)) => txMan.getStatus == Status.STATUS_ACTIVE - case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope") - } - - def isRollbackOnly = tm match { - case Left(Some(userTx)) => userTx.getStatus == Status.STATUS_MARKED_ROLLBACK - case Right(Some(txMan)) => txMan.getStatus == Status.STATUS_MARKED_ROLLBACK - case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope") - } - - def setRollbackOnly = tm match { - case Left(Some(userTx)) => userTx.setRollbackOnly - case Right(Some(txMan)) => txMan.setRollbackOnly - case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope") - } - - def suspend = tm match { - case Right(Some(txMan)) => txMan.suspend - case _ => throw new IllegalStateException("Does not have a TransactionManager in scope") - } - - def resume(tx: JtaTransaction) = tm match { - case Right(Some(txMan)) => txMan.resume(tx) - case _ => throw new IllegalStateException("Does not have a TransactionManager in scope") - } -} - -/** - * Detects if there is a UserTransaction or TransactionManager available in the JNDI. - * - * @author Jonas Bonér - */ -object TransactionContainer extends Logging { - val AKKA_JTA_TRANSACTION_SERVICE_CLASS = "se.scalablesolutions.akka.jta.AtomikosTransactionService" - val DEFAULT_USER_TRANSACTION_NAME = "java:comp/UserTransaction" - val FALLBACK_TRANSACTION_MANAGER_NAMES = "java:comp/TransactionManager" :: - "java:appserver/TransactionManager" :: - "java:pm/TransactionManager" :: - "java:/TransactionManager" :: Nil - val DEFAULT_TRANSACTION_SYNCHRONIZATION_REGISTRY_NAME = "java:comp/TransactionSynchronizationRegistry" - val TRANSACTION_SYNCHRONIZATION_REGISTRY_CLASS_NAME = "javax.transaction.TransactionSynchronizationRegistry" - val JTA_PROVIDER = config.getString("akka.jta.provider", "from-jndi") - - def apply(tm: Either[Option[UserTransaction], Option[TransactionManager]]) = new TransactionContainer(tm) - - def apply(): TransactionContainer = - JTA_PROVIDER match { - case "from-jndi" => - new TransactionContainer(findUserTransaction match { - case None => Right(findTransactionManager) - case tm => Left(tm) - }) - case "atomikos" => - try { - Class.forName(AKKA_JTA_TRANSACTION_SERVICE_CLASS) - .newInstance.asInstanceOf[TransactionService] - .transactionContainer - } catch { - case e: ClassNotFoundException => - throw new IllegalStateException( - "JTA provider defined as 'atomikos', but the AtomikosTransactionService classes can not be found." + - "\n\tPlease make sure you have 'akka-jta' JAR and its dependencies on your classpath.") - } - case _ => - throw new IllegalStateException( - "No UserTransaction on TransactionManager could be found in scope." + - "\n\tEither add 'akka-jta' to the classpath or make sure there is a" + - "\n\tTransactionManager or UserTransaction defined in the JNDI.") - - } - - def findUserTransaction: Option[UserTransaction] = { - val located = createInitialContext.lookup(DEFAULT_USER_TRANSACTION_NAME) - if (located eq null) None - else { - log.info("JTA UserTransaction detected [%s]", located) - Some(located.asInstanceOf[UserTransaction]) - } - } - - def findTransactionManager: Option[TransactionManager] = { - val context = createInitialContext - val tms = for { - name <- FALLBACK_TRANSACTION_MANAGER_NAMES - tm = context.lookup(name) - if tm ne null - } yield tm - tms match { - case Nil => None - case tm :: _ => - log.info("JTA TransactionManager detected [%s]", tm) - Some(tm.asInstanceOf[TransactionManager]) - } - } - - private def createInitialContext = new InitialContext(new java.util.Hashtable) -} - -/** - * JTA Transaction service. - * - * @author Jonas Bonér - */ -trait TransactionService { - def transactionContainer: TransactionContainer -} -