diff --git a/akka-jta/src/main/scala/AtomikosTransactionService.scala b/akka-jta/src/main/scala/AtomikosTransactionService.scala
new file mode 100644
index 0000000000..d688564686
--- /dev/null
+++ b/akka-jta/src/main/scala/AtomikosTransactionService.scala
@@ -0,0 +1,36 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ * Trait that implements a JTA transaction service that obeys the transaction semantics defined
+ * in the transaction attribute types for the transacted methods according to the EJB 3 draft specification.
+ * The aspect handles UserTransaction, TransactionManager instance variable injection thru @javax.ejb.Inject
+ * (name subject to change as per EJB 3 spec) and method transaction levels thru @javax.ejb.TransactionAttribute.
+ *
+ * This trait should be inherited to implement the getTransactionManager() method that should return a concrete
+ * javax.transaction.TransactionManager implementation (from JNDI lookup etc).
+ *
+ * implicit val txService = TransactionServices.AtomikosTransactionService.
+ *
+ * Example usage 1:
+ *
+ * for {
+ * ctx <- TransactionContext.Required
+ * entity <- updatedEntities
+ * if !ctx.isRollbackOnly
+ * } {
+ * // transactional stuff
+ * ...
+ * }
+ *
+ * Example usage 2:
+ *
+ * val users = for {
+ * ctx <- TransactionContext.Required
+ * name <- userNames
+ * } yield {
+ * // transactional stuff
+ * ...
+ * }
+ *
+ *
+ * @author Jonas Bonér
+ */
+object TransactionContext extends TransactionProtocol with Logging {
+ // FIXME: make configurable
+ private implicit val defaultTransactionService = AtomikosTransactionService
+
+ private[TransactionContext] val stack = new scala.util.DynamicVariable(new TransactionContext)
+
+ object Required extends TransactionMonad {
+ def map[T](f: TransactionMonad => T): T = withTxRequired { f(this) }
+ def flatMap[T](f: TransactionMonad => T): T = withTxRequired { f(this) }
+ def foreach(f: TransactionMonad => Unit): Unit = withTxRequired { f(this) }
+ }
+
+ object RequiresNew extends TransactionMonad {
+ def map[T](f: TransactionMonad => T): T = withTxRequiresNew { f(this) }
+ def flatMap[T](f: TransactionMonad => T): T = withTxRequiresNew { f(this) }
+ def foreach(f: TransactionMonad => Unit): Unit = withTxRequiresNew { f(this) }
+ }
+
+ object Supports extends TransactionMonad {
+ def map[T](f: TransactionMonad => T): T = withTxSupports { f(this) }
+ def flatMap[T](f: TransactionMonad => T): T = withTxSupports { f(this) }
+ def foreach(f: TransactionMonad => Unit): Unit = withTxSupports { f(this) }
+ }
+
+ object Mandatory extends TransactionMonad {
+ def map[T](f: TransactionMonad => T): T = withTxMandatory { f(this) }
+ def flatMap[T](f: TransactionMonad => T): T = withTxMandatory { f(this) }
+ def foreach(f: TransactionMonad => Unit): Unit = withTxMandatory { f(this) }
+ }
+
+ object Never extends TransactionMonad {
+ def map[T](f: TransactionMonad => T): T = withTxNever { f(this) }
+ def flatMap[T](f: TransactionMonad => T): T = withTxNever { f(this) }
+ def foreach(f: TransactionMonad => Unit): Unit = withTxNever { f(this) }
+ }
+
+ object NoOpTransactionMonad extends TransactionMonad {
+ def map[T](f: TransactionMonad => T): T = f(this)
+ def flatMap[T](f: TransactionMonad => T): T = f(this)
+ def foreach(f: TransactionMonad => Unit): Unit = f(this)
+ override def filter(f: TransactionMonad => Boolean): TransactionMonad = this
+ }
+
+ private[jta] def setRollbackOnly = current.setRollbackOnly
+
+ private[jta] def isRollbackOnly = current.isRollbackOnly
+
+ private[jta] def getTransactionManager: TransactionManager = current.getTransactionManager
+
+ private[jta] def getTransaction: Transaction = current.getTransactionManager.getTransaction
+
+ private[this] def current = stack.value
+
+ /**
+ * Continues with the invocation defined in 'body' with the brand new context define in 'newCtx', the old
+ * one is put on the stack and will automatically come back in scope when the method exits.
+ *
+ * Suspends and resumes the current JTA transaction.
+ */
+ private[jta] def withNewContext[T](body: => T): T = {
+ val suspendedTx: Option[Transaction] =
+ if (isInExistingTransaction(getTransactionManager)) {
+ log.debug("Suspending TX")
+ Some(getTransactionManager.suspend)
+ } else None
+ val result = stack.withValue(new TransactionContext) { body }
+ if (suspendedTx.isDefined) {
+ log.debug("Resuming TX")
+ getTransactionManager.resume(suspendedTx.get)
+ }
+ result
+ }
+}
+
+/**
+ * Transaction context, holds the EntityManager and the TransactionManager.
+ *
+ * @author Jonas Bonér
+ */
+class TransactionContext(private implicit val transactionService: TransactionService) {
+ val tm: TransactionManager = transactionService.transactionManager
+ private def setRollbackOnly = tm.setRollbackOnly
+ private def isRollbackOnly: Boolean = tm.getStatus == Status.STATUS_MARKED_ROLLBACK
+ private def getTransactionManager: TransactionManager = tm
+}
diff --git a/akka-jta/src/main/scala/TransactionProtocol.scala b/akka-jta/src/main/scala/TransactionProtocol.scala
new file mode 100644
index 0000000000..4a4c263eb1
--- /dev/null
+++ b/akka-jta/src/main/scala/TransactionProtocol.scala
@@ -0,0 +1,258 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB Transaction attribute semantics
+ * (From http://www.kevinboone.com/ejb-transactions.html)
+ *
+ *
+ *
+ *
TransactionRequiredException instead. If the method does start, then it will become part of the transaction of the caller. So if the EJB method signals a failure, the caller will be rolled back as well as the EJB.
+ *
+ * + *
+ *
+ *
RemoteException being thrown. This attribute is probably less useful than `NotSupported', in that NotSupported will assure that the caller's transaction is never affected by the EJB method (just as `Never' does), but will allow a call from a transactional caller if necessary.
+ *
+ *
+ * @author Jonas Bonér
+ */
+trait TransactionProtocol extends Logging {
+
+ /**
+ * Join JTA transaction. Can be overriden by concrete transaction service implementation
+ * to hook into other transaction services.
+ *
+ * Here is an example on how to integrate with JPA EntityManager.
+ *
+ *
+ * override def joinTransaction = {
+ * val em = TransactionContext.getEntityManager
+ * val tm = TransactionContext.getTransactionManager
+ * val closeAtTxCompletion: Boolean)
+ * tm.getTransaction.registerSynchronization(new javax.transaction.Synchronization() {
+ * def beforeCompletion = {
+ * try {
+ * val status = tm.getStatus
+ * if (status != Status.STATUS_ROLLEDBACK &&
+ * status != Status.STATUS_ROLLING_BACK &&
+ * status != Status.STATUS_MARKED_ROLLBACK) {
+ * log.debug("Flushing EntityManager...")
+ * em.flush // flush EntityManager on success
+ * }
+ * } catch {
+ * case e: javax.transaction.SystemException => throw new RuntimeException(e)
+ * }
+ * }
+ *
+ * def afterCompletion(status: Int) = {
+ * val status = tm.getStatus
+ * if (closeAtTxCompletion) em.close
+ * if (status == Status.STATUS_ROLLEDBACK ||
+ * status == Status.STATUS_ROLLING_BACK ||
+ * status == Status.STATUS_MARKED_ROLLBACK) {
+ * em.close
+ * }
+ * }
+ * })
+ * em.joinTransaction // join JTA transaction
+ * }
+ */
+ def joinTransaction: Unit = {}
+
+ /**
+ * Handle exception. Can be overriden by concrete transaction service implementation.
+ *
+ * Here is an example on how to handle JPA exceptions.
+ *
+ *
+ * def handleException(tm: TransactionManager, e: Exception) = {
+ * if (isInExistingTransaction(tm)) {
+ * // Do not roll back in case of NoResultException or NonUniqueResultException
+ * if (!e.isInstanceOf[NoResultException] &&
+ * !e.isInstanceOf[NonUniqueResultException]) {
+ * log.debug("Setting TX to ROLLBACK_ONLY, due to: " + e)
+ * tm.setRollbackOnly
+ * }
+ * }
+ * throw e
+ * }
+ *
+ */
+ def handleException(tm: TransactionManager, e: Exception) = {
+ tm.setRollbackOnly
+ throw e
+ }
+
+ /**
+ * Wraps body in a transaction with REQUIRED semantics.
+ *
+ * Creates a new transaction if no transaction is active in scope, else joins the outer transaction.
+ */
+ def withTxRequired[T](body: => T): T = {
+ val tm = TransactionContext.getTransactionManager
+ if (!isInExistingTransaction(tm)) {
+ tm.begin
+ try {
+ joinTransaction
+ body
+ } catch {
+ case e: Exception => handleException(tm, e)
+ } finally {
+ commitOrRollBack(tm)
+ }
+ } else body
+ }
+
+ /**
+ * Wraps body in a transaction with REQUIRES_NEW semantics.
+ *
+ * Suspends existing transaction, starts a new transaction, invokes body,
+ * commits or rollbacks new transaction, finally resumes previous transaction.
+ */
+ def withTxRequiresNew[T](body: => T): T = TransactionContext.withNewContext {
+ val tm = TransactionContext.getTransactionManager
+ tm.begin
+ try {
+ joinTransaction
+ body
+ } catch {
+ case e: Exception => handleException(tm, e)
+ } finally {
+ commitOrRollBack(tm)
+ }
+ }
+
+ /**
+ * Wraps body in a transaction with NOT_SUPPORTED semantics.
+ *
+ * Suspends existing transaction, invokes body, resumes transaction.
+ */
+ def withTxNotSupported[T](body: => T): T = TransactionContext.withNewContext {
+ body
+ }
+
+ /**
+ * Wraps body in a transaction with SUPPORTS semantics.
+ *
+ * Basicalla a No-op.
+ */
+ def withTxSupports[T](body: => T): T = {
+ // attach to current if exists else skip -> do nothing
+ body
+ }
+
+ /**
+ * Wraps body in a transaction with MANDATORY semantics.
+ *
+ * Throws a TransactionRequiredException if there is no transaction active in scope.
+ */
+ def withTxMandatory[T](body: => T): T = {
+ if (!isInExistingTransaction(TransactionContext.getTransactionManager))
+ throw new TransactionRequiredException("No active TX at method with TX type set to MANDATORY")
+ body
+ }
+
+ /**
+ * Wraps body in a transaction with NEVER semantics.
+ *
+ * Throws a SystemException in case of an existing transaction in scope.
+ */
+ def withTxNever[T](body: => T): T = {
+ if (isInExistingTransaction(TransactionContext.getTransactionManager))
+ throw new SystemException("Detected active TX at method with TX type set to NEVER")
+ body
+ }
+
+ protected def commitOrRollBack(tm: TransactionManager) = {
+ if (isInExistingTransaction(tm)) {
+ if (isRollbackOnly(tm)) {
+ log.debug("Rolling back TX marked as ROLLBACK_ONLY")
+ tm.rollback
+ } else {
+ log.debug("Committing TX")
+ tm.commit
+ }
+ }
+ }
+
+ // ---------------------------
+ // Helper methods
+ // ---------------------------
+
+ /**
+ * Checks if a transaction is an existing transaction.
+ *
+ * @param tm the transaction manager
+ * @return boolean
+ */
+ protected def isInExistingTransaction(tm: TransactionManager): Boolean =
+ tm.getStatus != Status.STATUS_NO_TRANSACTION
+
+ /**
+ * Checks if current transaction is set to rollback only.
+ *
+ * @param tm the transaction manager
+ * @return boolean
+ */
+ protected def isRollbackOnly(tm: TransactionManager): Boolean =
+ tm.getStatus == Status.STATUS_MARKED_ROLLBACK
+
+ /**
+ * A ThreadLocal variable where to store suspended TX and enable pay as you go
+ * before advice - after advice data sharing in a specific case of requiresNew TX
+ */
+ private val suspendedTx = new ThreadLocal[Transaction] {
+ override def initialValue = null
+ }
+
+ private def storeInThreadLocal(tx: Transaction) = suspendedTx.set(tx)
+
+ private def fetchFromThreadLocal: Option[Transaction] = {
+ if (suspendedTx != null && suspendedTx.get() != null) Some(suspendedTx.get.asInstanceOf[Transaction])
+ else None
+ }
+}
diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala
index b7c02135fc..1104535609 100644
--- a/project/build/AkkaProject.scala
+++ b/project/build/AkkaProject.scala
@@ -80,9 +80,10 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val codehaus_snapshots = "Codehaus Snapshots" at "http://snapshots.repository.codehaus.org"
val jboss = "jBoss" at "http://repository.jboss.org/maven2"
val guiceyfruit = "GuiceyFruit" at "http://guiceyfruit.googlecode.com/svn/repo/releases/"
- val google = "google" at "http://google-maven-repository.googlecode.com/svn/repository"
- val m2 = "m2" at "http://download.java.net/maven/2"
+ val google = "Google" at "http://google-maven-repository.googlecode.com/svn/repository"
+ val java_net = "java.net" at "http://download.java.net/maven/2"
val scala_tools_snapshots = "scala-tools snapshots" at "http://scala-tools.org/repo-snapshots"
+ val scala_tools_releases = "scala-tools releases" at "http://scala-tools.org/repo-releases"
// ------------------------------------------------------------
// project defintions
@@ -98,6 +99,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
lazy val akka_persistence = project("akka-persistence", "akka-persistence", new AkkaPersistenceParentProject(_))
lazy val akka_cluster = project("akka-cluster", "akka-cluster", new AkkaClusterParentProject(_))
lazy val akka_spring = project("akka-spring", "akka-spring", new AkkaSpringProject(_), akka_core)
+ lazy val akka_jta = project("akka-jta", "akka-jta", new AkkaJTAProject(_), akka_core)
lazy val akka_servlet = project("akka-servlet", "akka-servlet", new AkkaServletProject(_),
akka_core, akka_rest, akka_camel)
lazy val akka_kernel = project("akka-kernel", "akka-kernel", new AkkaKernelProject(_),
@@ -146,7 +148,8 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
" dist/akka-persistence-cassandra_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-servlet_%s-%s.jar".format(buildScalaVersion, version) +
" dist/akka-kernel_%s-%s.jar".format(buildScalaVersion, version) +
- " dist/akka-spring_%s-%s.jar".format(buildScalaVersion, version)
+ " dist/akka-spring_%s-%s.jar".format(buildScalaVersion, version) +
+ " dist/akka-jta_%s-%s.jar".format(buildScalaVersion, version)
)
// ------------------------------------------------------------
@@ -324,6 +327,14 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val junit = "junit" % "junit" % "4.5" % "test"
}
+ class AkkaJTAProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
+ val atomikos_transactions = "com.atomikos" % "transactions" % "3.2.3" % "compile"
+ val atomikos_transactions_jta = "com.atomikos" % "transactions-jta" % "3.2.3" % "compile"
+ val atomikos_transactions_api = "com.atomikos" % "transactions-api" % "3.2.3" % "compile"
+ //val atomikos_transactions_util = "com.atomikos" % "transactions-util" % "3.2.3" % "compile"
+ val jta_spec = "org.apache.geronimo.specs" % "geronimo-jta_1.1_spec" % "1.1.1" % "compile"
+ }
+
// examples
class AkkaFunTestProject(info: ProjectInfo) extends DefaultProject(info) {
val jackson_core_asl = "org.codehaus.jackson" % "jackson-core-asl" % "1.2.1" % "compile"