diff --git a/akka-jta/src/main/scala/AtomikosTransactionService.scala b/akka-jta/src/main/scala/AtomikosTransactionService.scala new file mode 100644 index 0000000000..d688564686 --- /dev/null +++ b/akka-jta/src/main/scala/AtomikosTransactionService.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.jta + +import javax.transaction.{TransactionManager, SystemException} + +import com.atomikos.icatch.jta.{J2eeTransactionManager, J2eeUserTransaction} +import com.atomikos.icatch.config.{TSInitInfo, UserTransactionService, UserTransactionServiceImp} + +/** + * Atomikos implementation of the transaction service trait. + * + * @author Jonas Bonér + */ +object AtomikosTransactionService extends TransactionService with TransactionProtocol { + + // FIXME: make configurable + val JTA_TRANSACTION_TIMEOUT = 60 + private val txService: UserTransactionService = new UserTransactionServiceImp + private val info: TSInitInfo = txService.createTSInitInfo + + val transactionManager = + try { + txService.init(info) + val tm: TransactionManager = new J2eeTransactionManager + tm.setTransactionTimeout(JTA_TRANSACTION_TIMEOUT) + tm + } catch { + case e => throw new SystemException("Could not create a new Atomikos J2EE Transaction Manager, due to: " + e.toString) + } + + // TODO: gracefully shutdown of the TM + //txService.shutdown(false) +} diff --git a/akka-jta/src/main/scala/TransactionContext.scala b/akka-jta/src/main/scala/TransactionContext.scala new file mode 100644 index 0000000000..3ed562eeb5 --- /dev/null +++ b/akka-jta/src/main/scala/TransactionContext.scala @@ -0,0 +1,181 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.jta + +import javax.transaction.{Transaction, Status, TransactionManager} + +import se.scalablesolutions.akka.util.Logging + +/** + * JTA Transaction service. + * + * @author Jonas Bonér + */ +trait TransactionService { + def transactionManager: TransactionManager +} + +/** + * Base monad for the transaction monad implementations. + * + * @author Jonas Bonér + */ +trait TransactionMonad { + + // ----------------------------- + // Monadic definitions + // ----------------------------- + + def map[T](f: TransactionMonad => T): T + def flatMap[T](f: TransactionMonad => T): T + def foreach(f: TransactionMonad => Unit): Unit + def filter(f: TransactionMonad => Boolean): TransactionMonad = + if (f(this)) this else TransactionContext.NoOpTransactionMonad + + // ----------------------------- + // JTA Transaction definitions + // ----------------------------- + + /** + * Returns the current Transaction. + */ + def getTransaction: Transaction = TransactionContext.getTransactionManager.getTransaction + + /** + * Marks the current transaction as doomed. + */ + def setRollbackOnly = TransactionContext.setRollbackOnly + + /** + * Marks the current transaction as doomed. + */ + def doom = TransactionContext.setRollbackOnly + + /** + * Checks if the current transaction is doomed. + */ + def isRollbackOnly = TransactionContext.isRollbackOnly + + /** + * Checks that the current transaction is NOT doomed. + */ + def isNotDoomed = !TransactionContext.isRollbackOnly +} + +/** + * Manages a thread-local stack of TransactionContexts. + *

+ * Choose TransactionService implementation by implicit definition of the implementation of choice, + * e.g. implicit val txService = TransactionServices.AtomikosTransactionService. + *

+ * Example usage 1: + *

+ * for {
+ *   ctx <- TransactionContext.Required
+ *   entity <- updatedEntities
+ *   if !ctx.isRollbackOnly
+ * } {
+ *   // transactional stuff
+ *   ...
+ * }
+ * 
+ * Example usage 2: + *
+ * val users = for {
+ *   ctx <- TransactionContext.Required
+ *   name <- userNames
+ * } yield {
+ *   // transactional stuff
+ *   ...
+ * }
+ * 
+ * + * @author Jonas Bonér + */ +object TransactionContext extends TransactionProtocol with Logging { + // FIXME: make configurable + private implicit val defaultTransactionService = AtomikosTransactionService + + private[TransactionContext] val stack = new scala.util.DynamicVariable(new TransactionContext) + + object Required extends TransactionMonad { + def map[T](f: TransactionMonad => T): T = withTxRequired { f(this) } + def flatMap[T](f: TransactionMonad => T): T = withTxRequired { f(this) } + def foreach(f: TransactionMonad => Unit): Unit = withTxRequired { f(this) } + } + + object RequiresNew extends TransactionMonad { + def map[T](f: TransactionMonad => T): T = withTxRequiresNew { f(this) } + def flatMap[T](f: TransactionMonad => T): T = withTxRequiresNew { f(this) } + def foreach(f: TransactionMonad => Unit): Unit = withTxRequiresNew { f(this) } + } + + object Supports extends TransactionMonad { + def map[T](f: TransactionMonad => T): T = withTxSupports { f(this) } + def flatMap[T](f: TransactionMonad => T): T = withTxSupports { f(this) } + def foreach(f: TransactionMonad => Unit): Unit = withTxSupports { f(this) } + } + + object Mandatory extends TransactionMonad { + def map[T](f: TransactionMonad => T): T = withTxMandatory { f(this) } + def flatMap[T](f: TransactionMonad => T): T = withTxMandatory { f(this) } + def foreach(f: TransactionMonad => Unit): Unit = withTxMandatory { f(this) } + } + + object Never extends TransactionMonad { + def map[T](f: TransactionMonad => T): T = withTxNever { f(this) } + def flatMap[T](f: TransactionMonad => T): T = withTxNever { f(this) } + def foreach(f: TransactionMonad => Unit): Unit = withTxNever { f(this) } + } + + object NoOpTransactionMonad extends TransactionMonad { + def map[T](f: TransactionMonad => T): T = f(this) + def flatMap[T](f: TransactionMonad => T): T = f(this) + def foreach(f: TransactionMonad => Unit): Unit = f(this) + override def filter(f: TransactionMonad => Boolean): TransactionMonad = this + } + + private[jta] def setRollbackOnly = current.setRollbackOnly + + private[jta] def isRollbackOnly = current.isRollbackOnly + + private[jta] def getTransactionManager: TransactionManager = current.getTransactionManager + + private[jta] def getTransaction: Transaction = current.getTransactionManager.getTransaction + + private[this] def current = stack.value + + /** + * Continues with the invocation defined in 'body' with the brand new context define in 'newCtx', the old + * one is put on the stack and will automatically come back in scope when the method exits. + *

+ * Suspends and resumes the current JTA transaction. + */ + private[jta] def withNewContext[T](body: => T): T = { + val suspendedTx: Option[Transaction] = + if (isInExistingTransaction(getTransactionManager)) { + log.debug("Suspending TX") + Some(getTransactionManager.suspend) + } else None + val result = stack.withValue(new TransactionContext) { body } + if (suspendedTx.isDefined) { + log.debug("Resuming TX") + getTransactionManager.resume(suspendedTx.get) + } + result + } +} + +/** + * Transaction context, holds the EntityManager and the TransactionManager. + * + * @author Jonas Bonér + */ +class TransactionContext(private implicit val transactionService: TransactionService) { + val tm: TransactionManager = transactionService.transactionManager + private def setRollbackOnly = tm.setRollbackOnly + private def isRollbackOnly: Boolean = tm.getStatus == Status.STATUS_MARKED_ROLLBACK + private def getTransactionManager: TransactionManager = tm +} diff --git a/akka-jta/src/main/scala/TransactionProtocol.scala b/akka-jta/src/main/scala/TransactionProtocol.scala new file mode 100644 index 0000000000..4a4c263eb1 --- /dev/null +++ b/akka-jta/src/main/scala/TransactionProtocol.scala @@ -0,0 +1,258 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.jta + +import se.scalablesolutions.akka.util.Logging + +import javax.naming.{NamingException, Context, InitialContext} +import javax.transaction.{ + Transaction, + UserTransaction, + TransactionManager, + Status, + RollbackException, + SystemException, + TransactionRequiredException +} + +/** + *

+ * Trait that implements a JTA transaction service that obeys the transaction semantics defined + * in the transaction attribute types for the transacted methods according to the EJB 3 draft specification. + * The aspect handles UserTransaction, TransactionManager instance variable injection thru @javax.ejb.Inject + * (name subject to change as per EJB 3 spec) and method transaction levels thru @javax.ejb.TransactionAttribute. + *

+ * + *

+ * This trait should be inherited to implement the getTransactionManager() method that should return a concrete + * javax.transaction.TransactionManager implementation (from JNDI lookup etc). + *

+ *

+ *

Transaction attribute semantics

+ * (From http://www.kevinboone.com/ejb-transactions.html) + *

+ *

+ *

Required

+ * 'Required' is probably the best choice (at least initially) for an EJB method that will need to be transactional. In this case, if the method's caller is already part of a transaction, then the EJB method does not create a new transaction, but continues in the same transaction as its caller. If the caller is not in a transaction, then a new transaction is created for the EJB method. If something happens in the EJB that means that a rollback is required, then the extent of the rollback will include everything done in the EJB method, whatever the condition of the caller. If the caller was in a transaction, then everything done by the caller will be rolled back as well. Thus the 'required' attribute ensures that any work done by the EJB will be rolled back if necessary, and if the caller requires a rollback that too will be rolled back. + *

+ *

+ *

RequiresNew

+ * 'RequiresNew' will be appropriate if you want to ensure that the EJB method is rolled back if necessary, but you don't want the rollback to propogate back to the caller. This attribute results in the creation of a new transaction for the method, regardless of the transactional state of the caller. If the caller was operating in a transaction, then its transaction is suspended until the EJB method completes. Because a new transaction is always created, there may be a slight performance penalty if this attribute is over-used. + *

+ *

+ *

Mandatory

+ * With the 'mandatory' attribute, the EJB method will not even start unless its caller is in a transaction. It will throw a TransactionRequiredException instead. If the method does start, then it will become part of the transaction of the caller. So if the EJB method signals a failure, the caller will be rolled back as well as the EJB. + *

+ *

+ *

Supports

+ * With this attribute, the EJB method does not care about the transactional context of its caller. If the caller is part of a transaction, then the EJB method will be part of the same transaction. If the EJB method fails, the transaction will roll back. If the caller is not part of a transaction, then the EJB method will still operate, but a failure will not cause anything to roll back. 'Supports' is probably the attribute that leads to the fastest method call (as there is no transactional overhead), but it can lead to unpredicatable results. If you want a method to be isolated from transactions, that is, to have no effect on the transaction of its caller, then use 'NotSupported' instead. + *

+ *

+ *

NotSupported

+ * With the 'NotSupported' attribute, the EJB method will never take part in a transaction. If the caller is part of a transaction, then the caller's transaction is suspended. If the EJB method fails, there will be no effect on the caller's transaction, and no rollback will occur. Use this method if you want to ensure that the EJB method will not cause a rollback in its caller. This is appropriate if, for example, the method does something non-essential, such as logging a message. It would not be helpful if the failure of this operation caused a transaction rollback. + *

+ *

+ *

Never

+ * The 'NotSupported'' attribute will ensure that the EJB method is never called by a transactional caller. Any attempt to do so will result in a RemoteException being thrown. This attribute is probably less useful than `NotSupported', in that NotSupported will assure that the caller's transaction is never affected by the EJB method (just as `Never' does), but will allow a call from a transactional caller if necessary. + *

+ * + * @author Jonas Bonér + */ +trait TransactionProtocol extends Logging { + + /** + * Join JTA transaction. Can be overriden by concrete transaction service implementation + * to hook into other transaction services. + *

+ * Here is an example on how to integrate with JPA EntityManager. + * + *

+   * override def joinTransaction = {
+   *   val em = TransactionContext.getEntityManager
+   *   val tm = TransactionContext.getTransactionManager
+   *   val closeAtTxCompletion: Boolean) 
+   *   tm.getTransaction.registerSynchronization(new javax.transaction.Synchronization() {
+   *     def beforeCompletion = {
+   *       try {
+   *         val status = tm.getStatus
+   *         if (status != Status.STATUS_ROLLEDBACK &&
+   *             status != Status.STATUS_ROLLING_BACK &&
+   *             status != Status.STATUS_MARKED_ROLLBACK) {
+   *           log.debug("Flushing EntityManager...") 
+   *           em.flush // flush EntityManager on success
+   *         }
+   *       } catch {
+   *         case e: javax.transaction.SystemException => throw new RuntimeException(e)
+   *       }
+   *     }
+   *
+   *     def afterCompletion(status: Int) = {
+   *       val status = tm.getStatus
+   *       if (closeAtTxCompletion) em.close
+   *       if (status == Status.STATUS_ROLLEDBACK ||
+   *           status == Status.STATUS_ROLLING_BACK ||
+   *           status == Status.STATUS_MARKED_ROLLBACK) {
+   *         em.close
+   *       }
+   *     }
+   *   })
+   *   em.joinTransaction // join JTA transaction
+   * }
+   */
+  def joinTransaction: Unit = {}
+
+  /**
+   * Handle exception. Can be overriden by concrete transaction service implementation.
+   * 

+ * Here is an example on how to handle JPA exceptions. + * + *

+   * def handleException(tm: TransactionManager, e: Exception) = {
+   *   if (isInExistingTransaction(tm)) {
+   *     // Do not roll back in case of NoResultException or NonUniqueResultException
+   *     if (!e.isInstanceOf[NoResultException] &&
+   *         !e.isInstanceOf[NonUniqueResultException]) {
+   *       log.debug("Setting TX to ROLLBACK_ONLY, due to: " + e)
+   *       tm.setRollbackOnly
+   *     }
+   *   }
+   *   throw e
+   * }
+   * 
+ */ + def handleException(tm: TransactionManager, e: Exception) = { + tm.setRollbackOnly + throw e + } + + /** + * Wraps body in a transaction with REQUIRED semantics. + *

+ * Creates a new transaction if no transaction is active in scope, else joins the outer transaction. + */ + def withTxRequired[T](body: => T): T = { + val tm = TransactionContext.getTransactionManager + if (!isInExistingTransaction(tm)) { + tm.begin + try { + joinTransaction + body + } catch { + case e: Exception => handleException(tm, e) + } finally { + commitOrRollBack(tm) + } + } else body + } + + /** + * Wraps body in a transaction with REQUIRES_NEW semantics. + *

+ * Suspends existing transaction, starts a new transaction, invokes body, + * commits or rollbacks new transaction, finally resumes previous transaction. + */ + def withTxRequiresNew[T](body: => T): T = TransactionContext.withNewContext { + val tm = TransactionContext.getTransactionManager + tm.begin + try { + joinTransaction + body + } catch { + case e: Exception => handleException(tm, e) + } finally { + commitOrRollBack(tm) + } + } + + /** + * Wraps body in a transaction with NOT_SUPPORTED semantics. + *

+ * Suspends existing transaction, invokes body, resumes transaction. + */ + def withTxNotSupported[T](body: => T): T = TransactionContext.withNewContext { + body + } + + /** + * Wraps body in a transaction with SUPPORTS semantics. + *

+ * Basicalla a No-op. + */ + def withTxSupports[T](body: => T): T = { + // attach to current if exists else skip -> do nothing + body + } + + /** + * Wraps body in a transaction with MANDATORY semantics. + *

+ * Throws a TransactionRequiredException if there is no transaction active in scope. + */ + def withTxMandatory[T](body: => T): T = { + if (!isInExistingTransaction(TransactionContext.getTransactionManager)) + throw new TransactionRequiredException("No active TX at method with TX type set to MANDATORY") + body + } + + /** + * Wraps body in a transaction with NEVER semantics. + *

+ * Throws a SystemException in case of an existing transaction in scope. + */ + def withTxNever[T](body: => T): T = { + if (isInExistingTransaction(TransactionContext.getTransactionManager)) + throw new SystemException("Detected active TX at method with TX type set to NEVER") + body + } + + protected def commitOrRollBack(tm: TransactionManager) = { + if (isInExistingTransaction(tm)) { + if (isRollbackOnly(tm)) { + log.debug("Rolling back TX marked as ROLLBACK_ONLY") + tm.rollback + } else { + log.debug("Committing TX") + tm.commit + } + } + } + + // --------------------------- + // Helper methods + // --------------------------- + + /** + * Checks if a transaction is an existing transaction. + * + * @param tm the transaction manager + * @return boolean + */ + protected def isInExistingTransaction(tm: TransactionManager): Boolean = + tm.getStatus != Status.STATUS_NO_TRANSACTION + + /** + * Checks if current transaction is set to rollback only. + * + * @param tm the transaction manager + * @return boolean + */ + protected def isRollbackOnly(tm: TransactionManager): Boolean = + tm.getStatus == Status.STATUS_MARKED_ROLLBACK + + /** + * A ThreadLocal variable where to store suspended TX and enable pay as you go + * before advice - after advice data sharing in a specific case of requiresNew TX + */ + private val suspendedTx = new ThreadLocal[Transaction] { + override def initialValue = null + } + + private def storeInThreadLocal(tx: Transaction) = suspendedTx.set(tx) + + private def fetchFromThreadLocal: Option[Transaction] = { + if (suspendedTx != null && suspendedTx.get() != null) Some(suspendedTx.get.asInstanceOf[Transaction]) + else None + } +} diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index b7c02135fc..1104535609 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -80,9 +80,10 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { val codehaus_snapshots = "Codehaus Snapshots" at "http://snapshots.repository.codehaus.org" val jboss = "jBoss" at "http://repository.jboss.org/maven2" val guiceyfruit = "GuiceyFruit" at "http://guiceyfruit.googlecode.com/svn/repo/releases/" - val google = "google" at "http://google-maven-repository.googlecode.com/svn/repository" - val m2 = "m2" at "http://download.java.net/maven/2" + val google = "Google" at "http://google-maven-repository.googlecode.com/svn/repository" + val java_net = "java.net" at "http://download.java.net/maven/2" val scala_tools_snapshots = "scala-tools snapshots" at "http://scala-tools.org/repo-snapshots" + val scala_tools_releases = "scala-tools releases" at "http://scala-tools.org/repo-releases" // ------------------------------------------------------------ // project defintions @@ -98,6 +99,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { lazy val akka_persistence = project("akka-persistence", "akka-persistence", new AkkaPersistenceParentProject(_)) lazy val akka_cluster = project("akka-cluster", "akka-cluster", new AkkaClusterParentProject(_)) lazy val akka_spring = project("akka-spring", "akka-spring", new AkkaSpringProject(_), akka_core) + lazy val akka_jta = project("akka-jta", "akka-jta", new AkkaJTAProject(_), akka_core) lazy val akka_servlet = project("akka-servlet", "akka-servlet", new AkkaServletProject(_), akka_core, akka_rest, akka_camel) lazy val akka_kernel = project("akka-kernel", "akka-kernel", new AkkaKernelProject(_), @@ -146,7 +148,8 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { " dist/akka-persistence-cassandra_%s-%s.jar".format(buildScalaVersion, version) + " dist/akka-servlet_%s-%s.jar".format(buildScalaVersion, version) + " dist/akka-kernel_%s-%s.jar".format(buildScalaVersion, version) + - " dist/akka-spring_%s-%s.jar".format(buildScalaVersion, version) + " dist/akka-spring_%s-%s.jar".format(buildScalaVersion, version) + + " dist/akka-jta_%s-%s.jar".format(buildScalaVersion, version) ) // ------------------------------------------------------------ @@ -324,6 +327,14 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { val junit = "junit" % "junit" % "4.5" % "test" } + class AkkaJTAProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { + val atomikos_transactions = "com.atomikos" % "transactions" % "3.2.3" % "compile" + val atomikos_transactions_jta = "com.atomikos" % "transactions-jta" % "3.2.3" % "compile" + val atomikos_transactions_api = "com.atomikos" % "transactions-api" % "3.2.3" % "compile" + //val atomikos_transactions_util = "com.atomikos" % "transactions-util" % "3.2.3" % "compile" + val jta_spec = "org.apache.geronimo.specs" % "geronimo-jta_1.1_spec" % "1.1.1" % "compile" + } + // examples class AkkaFunTestProject(info: ProjectInfo) extends DefaultProject(info) { val jackson_core_asl = "org.codehaus.jackson" % "jackson-core-asl" % "1.2.1" % "compile"