diff --git a/akka-core/src/main/scala/stm/JtaTransactionManagerDetector.scala b/akka-core/src/main/scala/stm/JtaTransactionManagerDetector.scala deleted file mode 100644 index fb78bf922a..0000000000 --- a/akka-core/src/main/scala/stm/JtaTransactionManagerDetector.scala +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.stm - -import javax.transaction.{TransactionManager, UserTransaction, SystemException} -import javax.naming.{InitialContext, Context, NamingException} - -import se.scalablesolutions.akka.config.Config._ -import se.scalablesolutions.akka.util.Logging - -/** - * @author Jonas Bonér - */ -object JtaTransactionManagerDetector extends Logging { - val DEFAULT_USER_TRANSACTION_NAME = "java:comp/UserTransaction" - val FALLBACK_TRANSACTION_MANAGER_NAMES = List("java:comp/TransactionManager", - "java:appserver/TransactionManager", - "java:pm/TransactionManager", - "java:/TransactionManager") - val DEFAULT_TRANSACTION_SYNCHRONIZATION_REGISTRY_NAME = "java:comp/TransactionSynchronizationRegistry" - val TRANSACTION_SYNCHRONIZATION_REGISTRY_CLASS_NAME = "javax.transaction.TransactionSynchronizationRegistry" - - def findUserTransaction: Option[UserTransaction] = { - val located = createInitialContext.lookup(DEFAULT_USER_TRANSACTION_NAME) - if (located eq null) None - else { - log.info("JTA UserTransaction detected [%s]", located) - Some(located.asInstanceOf[UserTransaction]) - } - } - - def findTransactionManager: Option[TransactionManager] = { - val context = createInitialContext - val tms = for { - name <- FALLBACK_TRANSACTION_MANAGER_NAMES - tm = context.lookup(name) - if tm ne null - } yield tm - tms match { - case Nil => None - case tm :: _ => - log.info("JTA TransactionManager detected [%s]", tm) - Some(tm.asInstanceOf[TransactionManager]) - } - } - - private def createInitialContext = new InitialContext(new java.util.Hashtable) -} diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala index d670060a45..7fbe9898e6 100644 --- a/akka-core/src/main/scala/stm/Transaction.scala +++ b/akka-core/src/main/scala/stm/Transaction.scala @@ -297,31 +297,24 @@ object Transaction { * @author Jonas Bonér */ @serializable class Transaction extends Logging { - import JtaTransactionManagerDetector._ val JTA_AWARE = config.getBool("akka.stm.jta-aware", false) - val jta: Either[Option[UserTransaction], Option[TransactionManager]] = if (JTA_AWARE) { - findUserTransaction match { - case None => Right(findTransactionManager) - case tm => Left(tm) - } - } else Left(None) - + val id = Transaction.idFactory.incrementAndGet @volatile private[this] var status: TransactionStatus = TransactionStatus.New private[akka] var transaction: Option[MultiverseTransaction] = None private[this] val persistentStateMap = new HashMap[String, Committable] private[akka] val depth = new AtomicInteger(0) - + + val tc: Option[TransactionContainer] = + if (JTA_AWARE) Some(TransactionContainer()) + else None + log.trace("Creating %s", toString) // --- public methods --------- def begin = synchronized { - jta match { - case Left(Some(userTx)) => if (!isJtaTxActive(userTx.getStatus)) userTx.begin - case Right(Some(txMan)) => if (!isJtaTxActive(txMan.getStatus)) txMan.begin - case _ => {} // do nothing - } + tc.foreach(_.begin) } def commit = synchronized { @@ -330,20 +323,12 @@ object Transaction { persistentStateMap.valuesIterator.foreach(_.commit) } status = TransactionStatus.Completed - jta match { - case Left(Some(userTx)) => if (isJtaTxActive(userTx.getStatus)) userTx.commit - case Right(Some(txMan)) => if (isJtaTxActive(txMan.getStatus)) txMan.commit - case _ => {} // do nothing - } + tc.foreach(_.commit) } def abort = synchronized { log.trace("Aborting transaction %s", toString) - jta match { - case Left(Some(userTx)) => if (isJtaTxActive(userTx.getStatus)) userTx.rollback - case Right(Some(txMan)) => if (isJtaTxActive(txMan.getStatus)) txMan.rollback - case _ => {} // do nothing - } + tc.foreach(_.rollback) } def isNew = synchronized { status == TransactionStatus.New } diff --git a/akka-core/src/main/scala/stm/TransactionContainer.scala b/akka-core/src/main/scala/stm/TransactionContainer.scala new file mode 100644 index 0000000000..19c7c54255 --- /dev/null +++ b/akka-core/src/main/scala/stm/TransactionContainer.scala @@ -0,0 +1,160 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.stm + +import javax.transaction.{TransactionManager, UserTransaction, Transaction => JtaTransaction, SystemException, Status} +import javax.naming.{InitialContext, Context, NamingException} + +import se.scalablesolutions.akka.config.Config._ +import se.scalablesolutions.akka.util.Logging + +/** + * JTA transaction container holding either a UserTransaction or a TransactionManager. + *

+ * The TransactionContainer is created using the factory val container = TransactionContainer() + * + * @author Jonas Bonér + */ +class TransactionContainer private (val tm: Either[Option[UserTransaction], Option[TransactionManager]]) { + + def begin = tm match { + case Left(Some(userTx)) => userTx.begin + case Right(Some(txMan)) => txMan.begin + case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope") + } + + def commit = tm match { + case Left(Some(userTx)) => userTx.commit + case Right(Some(txMan)) => txMan.commit + case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope") + } + + def rollback = tm match { + case Left(Some(userTx)) => userTx.rollback + case Right(Some(txMan)) => txMan.rollback + case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope") + } + + def getStatus = tm match { + case Left(Some(userTx)) => userTx.getStatus + case Right(Some(txMan)) => txMan.getStatus + case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope") + } + + def isInExistingTransaction = tm match { + case Left(Some(userTx)) => userTx.getStatus == Status.STATUS_ACTIVE + case Right(Some(txMan)) => txMan.getStatus == Status.STATUS_ACTIVE + case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope") + } + + def isRollbackOnly = tm match { + case Left(Some(userTx)) => userTx.getStatus == Status.STATUS_MARKED_ROLLBACK + case Right(Some(txMan)) => txMan.getStatus == Status.STATUS_MARKED_ROLLBACK + case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope") + } + + def setRollbackOnly = tm match { + case Left(Some(userTx)) => userTx.setRollbackOnly + case Right(Some(txMan)) => txMan.setRollbackOnly + case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope") + } + /* + def getTransaction: JtaTransaction = tm match { + case Left(Some(userTx)) => userTx + case Right(Some(txMan)) => txMan.getTransaction + case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope") + } + */ + def suspend = tm match { + case Right(Some(txMan)) => txMan.suspend + case _ => throw new IllegalStateException("Does not have a TransactionManager in scope") + } + + def resume(tx: JtaTransaction) = tm match { + case Right(Some(txMan)) => txMan.resume(tx) + case _ => throw new IllegalStateException("Does not have a TransactionManager in scope") + } +} + +/** + * Detects if there is a UserTransaction or TransactionManager available in the JNDI. + * + * @author Jonas Bonér + */ +object TransactionContainer extends Logging { + val AKKA_JTA_TRANSACTION_SERVICE_CLASS = "se.scalablesolutions.akka.jta.AtomikosTransactionService" + val DEFAULT_USER_TRANSACTION_NAME = "java:comp/UserTransaction" + val FALLBACK_TRANSACTION_MANAGER_NAMES = "java:comp/TransactionManager" :: + "java:appserver/TransactionManager" :: + "java:pm/TransactionManager" :: + "java:/TransactionManager" :: Nil + val DEFAULT_TRANSACTION_SYNCHRONIZATION_REGISTRY_NAME = "java:comp/TransactionSynchronizationRegistry" + val TRANSACTION_SYNCHRONIZATION_REGISTRY_CLASS_NAME = "javax.transaction.TransactionSynchronizationRegistry" + val JTA_PROVIDER = config.getString("akka.stm.jta.provider", "from-jndi") + + def apply(tm: Either[Option[UserTransaction], Option[TransactionManager]]) = new TransactionContainer(tm) + + def apply(): TransactionContainer = + JTA_PROVIDER match { + case "from-jndi" => + new TransactionContainer(findUserTransaction match { + case None => Right(findTransactionManager) + case tm => Left(tm) + }) + case "atomikos" => + try { + Class.forName(AKKA_JTA_TRANSACTION_SERVICE_CLASS) + .newInstance.asInstanceOf[TransactionService] + .transactionContainer + } catch { + case e: ClassNotFoundException => + throw new IllegalStateException( + "JTA provider defined as 'atomikos', but the AtomikosTransactionService classes can not be found." + + "\n\tPlease make sure you have 'akka-jta' JAR and its dependencies on your classpath.") + } + case _ => + throw new IllegalStateException( + "No UserTransaction on TransactionManager could be found in scope." + + "\n\tEither add 'akka-jta' to the classpath or make sure there is a" + + "\n\tTransactionManager or UserTransaction defined in the JNDI.") + + } + + def findUserTransaction: Option[UserTransaction] = { + val located = createInitialContext.lookup(DEFAULT_USER_TRANSACTION_NAME) + if (located eq null) None + else { + log.info("JTA UserTransaction detected [%s]", located) + Some(located.asInstanceOf[UserTransaction]) + } + } + + def findTransactionManager: Option[TransactionManager] = { + val context = createInitialContext + val tms = for { + name <- FALLBACK_TRANSACTION_MANAGER_NAMES + tm = context.lookup(name) + if tm ne null + } yield tm + tms match { + case Nil => None + case tm :: _ => + log.info("JTA TransactionManager detected [%s]", tm) + Some(tm.asInstanceOf[TransactionManager]) + } + } + + private def createInitialContext = new InitialContext(new java.util.Hashtable) +} + +/** + * JTA Transaction service. + * + * @author Jonas Bonér + */ +trait TransactionService { + def transactionContainer: TransactionContainer +} + diff --git a/akka-jta/src/main/scala/AtomikosTransactionService.scala b/akka-jta/src/main/scala/AtomikosTransactionService.scala index 1daa08a204..d59c60961b 100644 --- a/akka-jta/src/main/scala/AtomikosTransactionService.scala +++ b/akka-jta/src/main/scala/AtomikosTransactionService.scala @@ -10,19 +10,22 @@ import com.atomikos.icatch.jta.{J2eeTransactionManager, J2eeUserTransaction} import com.atomikos.icatch.config.{TSInitInfo, UserTransactionService, UserTransactionServiceImp} import se.scalablesolutions.akka.config.Config._ +import se.scalablesolutions.akka.stm.{TransactionService, TransactionContainer} + +object AtomikosTransactionService extends AtomikosTransactionService /** * Atomikos implementation of the transaction service trait. * * @author Jonas Bonér */ -object AtomikosTransactionService extends TransactionService with TransactionProtocol { +class AtomikosTransactionService extends TransactionService with TransactionProtocol { - val JTA_TRANSACTION_TIMEOUT = config.getInt("akka.jta.timeout", 60) + val JTA_TRANSACTION_TIMEOUT: Int = config.getInt("akka.stm.jta.timeout", 60000) / 1000 private val txService: UserTransactionService = new UserTransactionServiceImp private val info: TSInitInfo = txService.createTSInitInfo - val transactionManager = + val transactionContainer: TransactionContainer = TransactionContainer(Right(Some( try { txService.init(info) val tm: TransactionManager = new J2eeTransactionManager @@ -31,7 +34,7 @@ object AtomikosTransactionService extends TransactionService with TransactionPro } catch { case e => throw new SystemException("Could not create a new Atomikos J2EE Transaction Manager, due to: " + e.toString) } - + ))) // TODO: gracefully shutdown of the TM //txService.shutdown(false) } diff --git a/akka-jta/src/main/scala/TransactionContext.scala b/akka-jta/src/main/scala/TransactionContext.scala index 218bd93828..0f6ae29454 100644 --- a/akka-jta/src/main/scala/TransactionContext.scala +++ b/akka-jta/src/main/scala/TransactionContext.scala @@ -6,18 +6,10 @@ package se.scalablesolutions.akka.jta import javax.transaction.{Transaction, Status, TransactionManager} +import se.scalablesolutions.akka.stm.{TransactionService, TransactionContainer} import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.config.Config._ -/** - * JTA Transaction service. - * - * @author Jonas Bonér - */ -trait TransactionService { - def transactionManager: TransactionManager -} - /** * Base monad for the transaction monad implementations. * @@ -39,11 +31,6 @@ trait TransactionMonad { // JTA Transaction definitions // ----------------------------- - /** - * Returns the current Transaction. - */ - def getTransaction: Transaction = TransactionContext.getTransactionManager.getTransaction - /** * Marks the current transaction as doomed. */ @@ -108,13 +95,8 @@ trait TransactionMonad { * @author Jonas Bonér */ object TransactionContext extends TransactionProtocol with Logging { - val TRANSACTION_PROVIDER = config.getString("akka.jta.transaction-provider", "atomikos") - - private implicit val defaultTransactionService = TRANSACTION_PROVIDER match { - case "atomikos" => AtomikosTransactionService - case _ => throw new IllegalArgumentException("Transaction provider [" + TRANSACTION_PROVIDER + "] is not supported") - } - private[TransactionContext] val stack = new scala.util.DynamicVariable(new TransactionContext) + implicit val tc = TransactionContainer() + private[TransactionContext] val stack = new scala.util.DynamicVariable(new TransactionContext(tc)) object Required extends TransactionMonad { def map[T](f: TransactionMonad => T): T = withTxRequired { f(this) } @@ -157,9 +139,7 @@ object TransactionContext extends TransactionProtocol with Logging { private[jta] def isRollbackOnly = current.isRollbackOnly - private[jta] def getTransactionManager: TransactionManager = current.getTransactionManager - - private[jta] def getTransaction: Transaction = current.getTransactionManager.getTransaction + private[jta] def getTransactionContainer: TransactionContainer = current.getTransactionContainer private[this] def current = stack.value @@ -171,14 +151,14 @@ object TransactionContext extends TransactionProtocol with Logging { */ private[jta] def withNewContext[T](body: => T): T = { val suspendedTx: Option[Transaction] = - if (isInExistingTransaction(getTransactionManager)) { + if (getTransactionContainer.isInExistingTransaction) { log.debug("Suspending TX") - Some(getTransactionManager.suspend) + Some(getTransactionContainer.suspend) } else None - val result = stack.withValue(new TransactionContext) { body } + val result = stack.withValue(new TransactionContext(tc)) { body } if (suspendedTx.isDefined) { log.debug("Resuming TX") - getTransactionManager.resume(suspendedTx.get) + getTransactionContainer.resume(suspendedTx.get) } result } @@ -189,9 +169,8 @@ object TransactionContext extends TransactionProtocol with Logging { * * @author Jonas Bonér */ -class TransactionContext(private implicit val transactionService: TransactionService) { - val tm: TransactionManager = transactionService.transactionManager - private def setRollbackOnly = tm.setRollbackOnly - private def isRollbackOnly: Boolean = tm.getStatus == Status.STATUS_MARKED_ROLLBACK - private def getTransactionManager: TransactionManager = tm +class TransactionContext(val tc: TransactionContainer) { + private def setRollbackOnly = tc.setRollbackOnly + private def isRollbackOnly: Boolean = tc.getStatus == Status.STATUS_MARKED_ROLLBACK + private def getTransactionContainer: TransactionContainer = tc } diff --git a/akka-jta/src/main/scala/TransactionProtocol.scala b/akka-jta/src/main/scala/TransactionProtocol.scala index 4a4c263eb1..837955425e 100644 --- a/akka-jta/src/main/scala/TransactionProtocol.scala +++ b/akka-jta/src/main/scala/TransactionProtocol.scala @@ -5,6 +5,7 @@ package se.scalablesolutions.akka.jta import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.stm.TransactionContainer import javax.naming.{NamingException, Context, InitialContext} import javax.transaction.{ @@ -71,7 +72,7 @@ trait TransactionProtocol extends Logging { *

    * override def joinTransaction = {
    *   val em = TransactionContext.getEntityManager
-   *   val tm = TransactionContext.getTransactionManager
+   *   val tm = TransactionContext.getTransactionContainer
    *   val closeAtTxCompletion: Boolean) 
    *   tm.getTransaction.registerSynchronization(new javax.transaction.Synchronization() {
    *     def beforeCompletion = {
@@ -109,7 +110,7 @@ trait TransactionProtocol extends Logging {
    * Here is an example on how to handle JPA exceptions.
    * 
    * 
-   * def handleException(tm: TransactionManager, e: Exception) = {
+   * def handleException(tm: TransactionContainer, e: Exception) = {
    *   if (isInExistingTransaction(tm)) {
    *     // Do not roll back in case of NoResultException or NonUniqueResultException
    *     if (!e.isInstanceOf[NoResultException] &&
@@ -122,7 +123,7 @@ trait TransactionProtocol extends Logging {
    * }
    * 
*/ - def handleException(tm: TransactionManager, e: Exception) = { + def handleException(tm: TransactionContainer, e: Exception) = { tm.setRollbackOnly throw e } @@ -133,7 +134,7 @@ trait TransactionProtocol extends Logging { * Creates a new transaction if no transaction is active in scope, else joins the outer transaction. */ def withTxRequired[T](body: => T): T = { - val tm = TransactionContext.getTransactionManager + val tm = TransactionContext.getTransactionContainer if (!isInExistingTransaction(tm)) { tm.begin try { @@ -154,7 +155,7 @@ trait TransactionProtocol extends Logging { * commits or rollbacks new transaction, finally resumes previous transaction. */ def withTxRequiresNew[T](body: => T): T = TransactionContext.withNewContext { - val tm = TransactionContext.getTransactionManager + val tm = TransactionContext.getTransactionContainer tm.begin try { joinTransaction @@ -191,7 +192,7 @@ trait TransactionProtocol extends Logging { * Throws a TransactionRequiredException if there is no transaction active in scope. */ def withTxMandatory[T](body: => T): T = { - if (!isInExistingTransaction(TransactionContext.getTransactionManager)) + if (!isInExistingTransaction(TransactionContext.getTransactionContainer)) throw new TransactionRequiredException("No active TX at method with TX type set to MANDATORY") body } @@ -202,12 +203,12 @@ trait TransactionProtocol extends Logging { * Throws a SystemException in case of an existing transaction in scope. */ def withTxNever[T](body: => T): T = { - if (isInExistingTransaction(TransactionContext.getTransactionManager)) + if (isInExistingTransaction(TransactionContext.getTransactionContainer)) throw new SystemException("Detected active TX at method with TX type set to NEVER") body } - protected def commitOrRollBack(tm: TransactionManager) = { + protected def commitOrRollBack(tm: TransactionContainer) = { if (isInExistingTransaction(tm)) { if (isRollbackOnly(tm)) { log.debug("Rolling back TX marked as ROLLBACK_ONLY") @@ -229,7 +230,7 @@ trait TransactionProtocol extends Logging { * @param tm the transaction manager * @return boolean */ - protected def isInExistingTransaction(tm: TransactionManager): Boolean = + protected def isInExistingTransaction(tm: TransactionContainer): Boolean = tm.getStatus != Status.STATUS_NO_TRANSACTION /** @@ -238,7 +239,7 @@ trait TransactionProtocol extends Logging { * @param tm the transaction manager * @return boolean */ - protected def isRollbackOnly(tm: TransactionManager): Boolean = + protected def isRollbackOnly(tm: TransactionContainer): Boolean = tm.getStatus == Status.STATUS_MARKED_ROLLBACK /** diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 8e4be0bab4..d2c5ed3c73 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -17,7 +17,7 @@ version = "0.9" - # FQN to the class doing initial active object/actor + # FQN (Fully Qualified Name) to the class doing initial active object/actor # supervisor bootstrap, should be defined in default constructor boot = ["sample.camel.Boot", "sample.rest.java.Boot", @@ -31,17 +31,18 @@ service = on - fair = on # should transactions be fair or non-fair (non fair yield better performance) - max-nr-of-retries = 1000 # max nr of retries of a failing transaction before giving up - timeout = 10000 # transaction timeout; if transaction has not committed within the timeout then it is aborted - jta-aware = off # 'on' means that if there JTA Transaction Manager available then the STM will begin (or join), commit or rollback the JTA transaction. Default is 'off' + fair = on # should transactions be fair or non-fair (non fair yield better performance) + max-nr-of-retries = 1000 # max nr of retries of a failing transaction before giving up + timeout = 10000 # transaction timeout; if transaction has not committed within the timeout then it is aborted + jta-aware = off # 'on' means that if there JTA Transaction Manager available then the STM will + # begin (or join), commit or rollback the JTA transaction. Default is 'off'. - service = off # 'on' means that if there is a running JTA transaction then the STM will participate in it. Default is 'off' - timeout = 60 # timeout in seconds + provider = "from-jndi" # Options: "from-jndi", "atomikos" + timeout = 60000 - + service = on hostname = "localhost"