Finalized the JTA support

This commit is contained in:
Jonas Bonér 2010-04-20 11:44:26 +02:00
parent 085d472364
commit dbc9125177
7 changed files with 208 additions and 129 deletions

View file

@ -1,50 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
import javax.transaction.{TransactionManager, UserTransaction, SystemException}
import javax.naming.{InitialContext, Context, NamingException}
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.util.Logging
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object JtaTransactionManagerDetector extends Logging {
val DEFAULT_USER_TRANSACTION_NAME = "java:comp/UserTransaction"
val FALLBACK_TRANSACTION_MANAGER_NAMES = List("java:comp/TransactionManager",
"java:appserver/TransactionManager",
"java:pm/TransactionManager",
"java:/TransactionManager")
val DEFAULT_TRANSACTION_SYNCHRONIZATION_REGISTRY_NAME = "java:comp/TransactionSynchronizationRegistry"
val TRANSACTION_SYNCHRONIZATION_REGISTRY_CLASS_NAME = "javax.transaction.TransactionSynchronizationRegistry"
def findUserTransaction: Option[UserTransaction] = {
val located = createInitialContext.lookup(DEFAULT_USER_TRANSACTION_NAME)
if (located eq null) None
else {
log.info("JTA UserTransaction detected [%s]", located)
Some(located.asInstanceOf[UserTransaction])
}
}
def findTransactionManager: Option[TransactionManager] = {
val context = createInitialContext
val tms = for {
name <- FALLBACK_TRANSACTION_MANAGER_NAMES
tm = context.lookup(name)
if tm ne null
} yield tm
tms match {
case Nil => None
case tm :: _ =>
log.info("JTA TransactionManager detected [%s]", tm)
Some(tm.asInstanceOf[TransactionManager])
}
}
private def createInitialContext = new InitialContext(new java.util.Hashtable)
}

View file

@ -297,14 +297,7 @@ object Transaction {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@serializable class Transaction extends Logging {
import JtaTransactionManagerDetector._
val JTA_AWARE = config.getBool("akka.stm.jta-aware", false)
val jta: Either[Option[UserTransaction], Option[TransactionManager]] = if (JTA_AWARE) {
findUserTransaction match {
case None => Right(findTransactionManager)
case tm => Left(tm)
}
} else Left(None)
val id = Transaction.idFactory.incrementAndGet
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
@ -312,16 +305,16 @@ object Transaction {
private[this] val persistentStateMap = new HashMap[String, Committable]
private[akka] val depth = new AtomicInteger(0)
val tc: Option[TransactionContainer] =
if (JTA_AWARE) Some(TransactionContainer())
else None
log.trace("Creating %s", toString)
// --- public methods ---------
def begin = synchronized {
jta match {
case Left(Some(userTx)) => if (!isJtaTxActive(userTx.getStatus)) userTx.begin
case Right(Some(txMan)) => if (!isJtaTxActive(txMan.getStatus)) txMan.begin
case _ => {} // do nothing
}
tc.foreach(_.begin)
}
def commit = synchronized {
@ -330,20 +323,12 @@ object Transaction {
persistentStateMap.valuesIterator.foreach(_.commit)
}
status = TransactionStatus.Completed
jta match {
case Left(Some(userTx)) => if (isJtaTxActive(userTx.getStatus)) userTx.commit
case Right(Some(txMan)) => if (isJtaTxActive(txMan.getStatus)) txMan.commit
case _ => {} // do nothing
}
tc.foreach(_.commit)
}
def abort = synchronized {
log.trace("Aborting transaction %s", toString)
jta match {
case Left(Some(userTx)) => if (isJtaTxActive(userTx.getStatus)) userTx.rollback
case Right(Some(txMan)) => if (isJtaTxActive(txMan.getStatus)) txMan.rollback
case _ => {} // do nothing
}
tc.foreach(_.rollback)
}
def isNew = synchronized { status == TransactionStatus.New }

View file

@ -0,0 +1,160 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
import javax.transaction.{TransactionManager, UserTransaction, Transaction => JtaTransaction, SystemException, Status}
import javax.naming.{InitialContext, Context, NamingException}
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.util.Logging
/**
* JTA transaction container holding either a UserTransaction or a TransactionManager.
* <p/>
* The TransactionContainer is created using the factory <tt>val container = TransactionContainer()</tt>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class TransactionContainer private (val tm: Either[Option[UserTransaction], Option[TransactionManager]]) {
def begin = tm match {
case Left(Some(userTx)) => userTx.begin
case Right(Some(txMan)) => txMan.begin
case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope")
}
def commit = tm match {
case Left(Some(userTx)) => userTx.commit
case Right(Some(txMan)) => txMan.commit
case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope")
}
def rollback = tm match {
case Left(Some(userTx)) => userTx.rollback
case Right(Some(txMan)) => txMan.rollback
case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope")
}
def getStatus = tm match {
case Left(Some(userTx)) => userTx.getStatus
case Right(Some(txMan)) => txMan.getStatus
case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope")
}
def isInExistingTransaction = tm match {
case Left(Some(userTx)) => userTx.getStatus == Status.STATUS_ACTIVE
case Right(Some(txMan)) => txMan.getStatus == Status.STATUS_ACTIVE
case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope")
}
def isRollbackOnly = tm match {
case Left(Some(userTx)) => userTx.getStatus == Status.STATUS_MARKED_ROLLBACK
case Right(Some(txMan)) => txMan.getStatus == Status.STATUS_MARKED_ROLLBACK
case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope")
}
def setRollbackOnly = tm match {
case Left(Some(userTx)) => userTx.setRollbackOnly
case Right(Some(txMan)) => txMan.setRollbackOnly
case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope")
}
/*
def getTransaction: JtaTransaction = tm match {
case Left(Some(userTx)) => userTx
case Right(Some(txMan)) => txMan.getTransaction
case _ => throw new IllegalStateException("Does not have a UserTransaction or TransactionManager in scope")
}
*/
def suspend = tm match {
case Right(Some(txMan)) => txMan.suspend
case _ => throw new IllegalStateException("Does not have a TransactionManager in scope")
}
def resume(tx: JtaTransaction) = tm match {
case Right(Some(txMan)) => txMan.resume(tx)
case _ => throw new IllegalStateException("Does not have a TransactionManager in scope")
}
}
/**
* Detects if there is a UserTransaction or TransactionManager available in the JNDI.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object TransactionContainer extends Logging {
val AKKA_JTA_TRANSACTION_SERVICE_CLASS = "se.scalablesolutions.akka.jta.AtomikosTransactionService"
val DEFAULT_USER_TRANSACTION_NAME = "java:comp/UserTransaction"
val FALLBACK_TRANSACTION_MANAGER_NAMES = "java:comp/TransactionManager" ::
"java:appserver/TransactionManager" ::
"java:pm/TransactionManager" ::
"java:/TransactionManager" :: Nil
val DEFAULT_TRANSACTION_SYNCHRONIZATION_REGISTRY_NAME = "java:comp/TransactionSynchronizationRegistry"
val TRANSACTION_SYNCHRONIZATION_REGISTRY_CLASS_NAME = "javax.transaction.TransactionSynchronizationRegistry"
val JTA_PROVIDER = config.getString("akka.stm.jta.provider", "from-jndi")
def apply(tm: Either[Option[UserTransaction], Option[TransactionManager]]) = new TransactionContainer(tm)
def apply(): TransactionContainer =
JTA_PROVIDER match {
case "from-jndi" =>
new TransactionContainer(findUserTransaction match {
case None => Right(findTransactionManager)
case tm => Left(tm)
})
case "atomikos" =>
try {
Class.forName(AKKA_JTA_TRANSACTION_SERVICE_CLASS)
.newInstance.asInstanceOf[TransactionService]
.transactionContainer
} catch {
case e: ClassNotFoundException =>
throw new IllegalStateException(
"JTA provider defined as 'atomikos', but the AtomikosTransactionService classes can not be found." +
"\n\tPlease make sure you have 'akka-jta' JAR and its dependencies on your classpath.")
}
case _ =>
throw new IllegalStateException(
"No UserTransaction on TransactionManager could be found in scope." +
"\n\tEither add 'akka-jta' to the classpath or make sure there is a" +
"\n\tTransactionManager or UserTransaction defined in the JNDI.")
}
def findUserTransaction: Option[UserTransaction] = {
val located = createInitialContext.lookup(DEFAULT_USER_TRANSACTION_NAME)
if (located eq null) None
else {
log.info("JTA UserTransaction detected [%s]", located)
Some(located.asInstanceOf[UserTransaction])
}
}
def findTransactionManager: Option[TransactionManager] = {
val context = createInitialContext
val tms = for {
name <- FALLBACK_TRANSACTION_MANAGER_NAMES
tm = context.lookup(name)
if tm ne null
} yield tm
tms match {
case Nil => None
case tm :: _ =>
log.info("JTA TransactionManager detected [%s]", tm)
Some(tm.asInstanceOf[TransactionManager])
}
}
private def createInitialContext = new InitialContext(new java.util.Hashtable)
}
/**
* JTA Transaction service.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait TransactionService {
def transactionContainer: TransactionContainer
}

View file

@ -10,19 +10,22 @@ import com.atomikos.icatch.jta.{J2eeTransactionManager, J2eeUserTransaction}
import com.atomikos.icatch.config.{TSInitInfo, UserTransactionService, UserTransactionServiceImp}
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.stm.{TransactionService, TransactionContainer}
object AtomikosTransactionService extends AtomikosTransactionService
/**
* Atomikos implementation of the transaction service trait.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object AtomikosTransactionService extends TransactionService with TransactionProtocol {
class AtomikosTransactionService extends TransactionService with TransactionProtocol {
val JTA_TRANSACTION_TIMEOUT = config.getInt("akka.jta.timeout", 60)
val JTA_TRANSACTION_TIMEOUT: Int = config.getInt("akka.stm.jta.timeout", 60000) / 1000
private val txService: UserTransactionService = new UserTransactionServiceImp
private val info: TSInitInfo = txService.createTSInitInfo
val transactionManager =
val transactionContainer: TransactionContainer = TransactionContainer(Right(Some(
try {
txService.init(info)
val tm: TransactionManager = new J2eeTransactionManager
@ -31,7 +34,7 @@ object AtomikosTransactionService extends TransactionService with TransactionPro
} catch {
case e => throw new SystemException("Could not create a new Atomikos J2EE Transaction Manager, due to: " + e.toString)
}
)))
// TODO: gracefully shutdown of the TM
//txService.shutdown(false)
}

View file

@ -6,18 +6,10 @@ package se.scalablesolutions.akka.jta
import javax.transaction.{Transaction, Status, TransactionManager}
import se.scalablesolutions.akka.stm.{TransactionService, TransactionContainer}
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.Config._
/**
* JTA Transaction service.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait TransactionService {
def transactionManager: TransactionManager
}
/**
* Base monad for the transaction monad implementations.
*
@ -39,11 +31,6 @@ trait TransactionMonad {
// JTA Transaction definitions
// -----------------------------
/**
* Returns the current Transaction.
*/
def getTransaction: Transaction = TransactionContext.getTransactionManager.getTransaction
/**
* Marks the current transaction as doomed.
*/
@ -108,13 +95,8 @@ trait TransactionMonad {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object TransactionContext extends TransactionProtocol with Logging {
val TRANSACTION_PROVIDER = config.getString("akka.jta.transaction-provider", "atomikos")
private implicit val defaultTransactionService = TRANSACTION_PROVIDER match {
case "atomikos" => AtomikosTransactionService
case _ => throw new IllegalArgumentException("Transaction provider [" + TRANSACTION_PROVIDER + "] is not supported")
}
private[TransactionContext] val stack = new scala.util.DynamicVariable(new TransactionContext)
implicit val tc = TransactionContainer()
private[TransactionContext] val stack = new scala.util.DynamicVariable(new TransactionContext(tc))
object Required extends TransactionMonad {
def map[T](f: TransactionMonad => T): T = withTxRequired { f(this) }
@ -157,9 +139,7 @@ object TransactionContext extends TransactionProtocol with Logging {
private[jta] def isRollbackOnly = current.isRollbackOnly
private[jta] def getTransactionManager: TransactionManager = current.getTransactionManager
private[jta] def getTransaction: Transaction = current.getTransactionManager.getTransaction
private[jta] def getTransactionContainer: TransactionContainer = current.getTransactionContainer
private[this] def current = stack.value
@ -171,14 +151,14 @@ object TransactionContext extends TransactionProtocol with Logging {
*/
private[jta] def withNewContext[T](body: => T): T = {
val suspendedTx: Option[Transaction] =
if (isInExistingTransaction(getTransactionManager)) {
if (getTransactionContainer.isInExistingTransaction) {
log.debug("Suspending TX")
Some(getTransactionManager.suspend)
Some(getTransactionContainer.suspend)
} else None
val result = stack.withValue(new TransactionContext) { body }
val result = stack.withValue(new TransactionContext(tc)) { body }
if (suspendedTx.isDefined) {
log.debug("Resuming TX")
getTransactionManager.resume(suspendedTx.get)
getTransactionContainer.resume(suspendedTx.get)
}
result
}
@ -189,9 +169,8 @@ object TransactionContext extends TransactionProtocol with Logging {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class TransactionContext(private implicit val transactionService: TransactionService) {
val tm: TransactionManager = transactionService.transactionManager
private def setRollbackOnly = tm.setRollbackOnly
private def isRollbackOnly: Boolean = tm.getStatus == Status.STATUS_MARKED_ROLLBACK
private def getTransactionManager: TransactionManager = tm
class TransactionContext(val tc: TransactionContainer) {
private def setRollbackOnly = tc.setRollbackOnly
private def isRollbackOnly: Boolean = tc.getStatus == Status.STATUS_MARKED_ROLLBACK
private def getTransactionContainer: TransactionContainer = tc
}

View file

@ -5,6 +5,7 @@
package se.scalablesolutions.akka.jta
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.stm.TransactionContainer
import javax.naming.{NamingException, Context, InitialContext}
import javax.transaction.{
@ -71,7 +72,7 @@ trait TransactionProtocol extends Logging {
* <pre>
* override def joinTransaction = {
* val em = TransactionContext.getEntityManager
* val tm = TransactionContext.getTransactionManager
* val tm = TransactionContext.getTransactionContainer
* val closeAtTxCompletion: Boolean)
* tm.getTransaction.registerSynchronization(new javax.transaction.Synchronization() {
* def beforeCompletion = {
@ -109,7 +110,7 @@ trait TransactionProtocol extends Logging {
* Here is an example on how to handle JPA exceptions.
*
* <pre>
* def handleException(tm: TransactionManager, e: Exception) = {
* def handleException(tm: TransactionContainer, e: Exception) = {
* if (isInExistingTransaction(tm)) {
* // Do not roll back in case of NoResultException or NonUniqueResultException
* if (!e.isInstanceOf[NoResultException] &&
@ -122,7 +123,7 @@ trait TransactionProtocol extends Logging {
* }
* </pre>
*/
def handleException(tm: TransactionManager, e: Exception) = {
def handleException(tm: TransactionContainer, e: Exception) = {
tm.setRollbackOnly
throw e
}
@ -133,7 +134,7 @@ trait TransactionProtocol extends Logging {
* Creates a new transaction if no transaction is active in scope, else joins the outer transaction.
*/
def withTxRequired[T](body: => T): T = {
val tm = TransactionContext.getTransactionManager
val tm = TransactionContext.getTransactionContainer
if (!isInExistingTransaction(tm)) {
tm.begin
try {
@ -154,7 +155,7 @@ trait TransactionProtocol extends Logging {
* commits or rollbacks new transaction, finally resumes previous transaction.
*/
def withTxRequiresNew[T](body: => T): T = TransactionContext.withNewContext {
val tm = TransactionContext.getTransactionManager
val tm = TransactionContext.getTransactionContainer
tm.begin
try {
joinTransaction
@ -191,7 +192,7 @@ trait TransactionProtocol extends Logging {
* Throws a TransactionRequiredException if there is no transaction active in scope.
*/
def withTxMandatory[T](body: => T): T = {
if (!isInExistingTransaction(TransactionContext.getTransactionManager))
if (!isInExistingTransaction(TransactionContext.getTransactionContainer))
throw new TransactionRequiredException("No active TX at method with TX type set to MANDATORY")
body
}
@ -202,12 +203,12 @@ trait TransactionProtocol extends Logging {
* Throws a SystemException in case of an existing transaction in scope.
*/
def withTxNever[T](body: => T): T = {
if (isInExistingTransaction(TransactionContext.getTransactionManager))
if (isInExistingTransaction(TransactionContext.getTransactionContainer))
throw new SystemException("Detected active TX at method with TX type set to NEVER")
body
}
protected def commitOrRollBack(tm: TransactionManager) = {
protected def commitOrRollBack(tm: TransactionContainer) = {
if (isInExistingTransaction(tm)) {
if (isRollbackOnly(tm)) {
log.debug("Rolling back TX marked as ROLLBACK_ONLY")
@ -229,7 +230,7 @@ trait TransactionProtocol extends Logging {
* @param tm the transaction manager
* @return boolean
*/
protected def isInExistingTransaction(tm: TransactionManager): Boolean =
protected def isInExistingTransaction(tm: TransactionContainer): Boolean =
tm.getStatus != Status.STATUS_NO_TRANSACTION
/**
@ -238,7 +239,7 @@ trait TransactionProtocol extends Logging {
* @param tm the transaction manager
* @return boolean
*/
protected def isRollbackOnly(tm: TransactionManager): Boolean =
protected def isRollbackOnly(tm: TransactionContainer): Boolean =
tm.getStatus == Status.STATUS_MARKED_ROLLBACK
/**

View file

@ -17,7 +17,7 @@
<akka>
version = "0.9"
# FQN to the class doing initial active object/actor
# FQN (Fully Qualified Name) to the class doing initial active object/actor
# supervisor bootstrap, should be defined in default constructor
boot = ["sample.camel.Boot",
"sample.rest.java.Boot",
@ -34,12 +34,13 @@
fair = on # should transactions be fair or non-fair (non fair yield better performance)
max-nr-of-retries = 1000 # max nr of retries of a failing transaction before giving up
timeout = 10000 # transaction timeout; if transaction has not committed within the timeout then it is aborted
jta-aware = off # 'on' means that if there JTA Transaction Manager available then the STM will begin (or join), commit or rollback the JTA transaction. Default is 'off'
jta-aware = off # 'on' means that if there JTA Transaction Manager available then the STM will
# begin (or join), commit or rollback the JTA transaction. Default is 'off'.
</stm>
<jta>
service = off # 'on' means that if there is a running JTA transaction then the STM will participate in it. Default is 'off'
timeout = 60 # timeout in seconds
provider = "from-jndi" # Options: "from-jndi", "atomikos"
timeout = 60000
</jta>
<rest>