diff --git a/akka-jta/src/main/scala/TransactionManagerDetector.scala b/akka-core/src/main/scala/stm/JtaTransactionManagerDetector.scala similarity index 95% rename from akka-jta/src/main/scala/TransactionManagerDetector.scala rename to akka-core/src/main/scala/stm/JtaTransactionManagerDetector.scala index f86d513ab3..30e9d3a33c 100644 --- a/akka-jta/src/main/scala/TransactionManagerDetector.scala +++ b/akka-core/src/main/scala/stm/JtaTransactionManagerDetector.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.jta +package se.scalablesolutions.akka.stm import javax.transaction.{TransactionManager, UserTransaction, SystemException} import javax.naming.{InitialContext, Context, NamingException} @@ -12,7 +12,7 @@ import se.scalablesolutions.akka.config.Config._ /** * @author Jonas Bonér */ -object TransactionManagerDetector { +object JtaTransactionManagerDetector { val DEFAULT_USER_TRANSACTION_NAME = "java:comp/UserTransaction" val FALLBACK_TRANSACTION_MANAGER_NAMES = List("java:comp/TransactionManager", "java:appserver/TransactionManager", diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala index 209c131781..44bb545a69 100644 --- a/akka-core/src/main/scala/stm/Transaction.scala +++ b/akka-core/src/main/scala/stm/Transaction.scala @@ -8,9 +8,12 @@ import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.TimeUnit +import javax.transaction.{TransactionManager, UserTransaction, Status} + import scala.collection.mutable.HashMap import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.config.Config._ import org.multiverse.api.{Transaction => MultiverseTransaction, TransactionLifecycleListener, TransactionLifecycleEvent} import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance @@ -272,9 +275,9 @@ object Transaction { createNewTransactionSet } else getTransactionSetInScope val tx = new Transaction + tx.begin tx.transaction = Some(mtx) setTransaction(Some(tx)) - txSet.registerOnCommitTask(new Runnable() { def run = tx.commit }) @@ -288,11 +291,20 @@ object Transaction { } /** - * The Akka specific Transaction class, keeping track of persistent data structures (as in on-disc). + * The Akka specific Transaction class, keeping track of persistent data structures (as in on-disc) + * and JTA support. * * @author Jonas Bonér */ @serializable class Transaction extends Logging { + val JTA_AWARE = config.getBool("akka.stm.jta-aware", false) + val jta: Either[Option[UserTransaction], Option[TransactionManager]] = if (JTA_AWARE) { + JtaTransactionManagerDetector.findUserTransaction match { + case None => Right(JtaTransactionManagerDetector.findTransactionManager) + case tm => Left(tm) + } + } else Left(None) + val id = Transaction.idFactory.incrementAndGet @volatile private[this] var status: TransactionStatus = TransactionStatus.New private[akka] var transaction: Option[MultiverseTransaction] = None @@ -303,16 +315,34 @@ object Transaction { // --- public methods --------- + def begin = synchronized { + jta match { + case Left(Some(userTx)) => if (!isJtaTxActive(userTx.getStatus)) userTx.begin + case Right(Some(txMan)) => if (!isJtaTxActive(txMan.getStatus)) txMan.begin + case _ => {} // do nothing + } + } + def commit = synchronized { log.trace("Committing transaction %s", toString) Transaction.atomic0 { persistentStateMap.valuesIterator.foreach(_.commit) } status = TransactionStatus.Completed + jta match { + case Left(Some(userTx)) => if (isJtaTxActive(userTx.getStatus)) userTx.commit + case Right(Some(txMan)) => if (isJtaTxActive(txMan.getStatus)) txMan.commit + case _ => {} // do nothing + } } def abort = synchronized { log.trace("Aborting transaction %s", toString) + jta match { + case Left(Some(userTx)) => if (isJtaTxActive(userTx.getStatus)) userTx.rollback + case Right(Some(txMan)) => if (isJtaTxActive(txMan.getStatus)) txMan.rollback + case _ => {} // do nothing + } } def isNew = synchronized { status == TransactionStatus.New } @@ -325,6 +355,8 @@ object Transaction { // --- internal methods --------- + private def isJtaTxActive(status: Int) = status == Status.STATUS_ACTIVE + private[akka] def status_? = status private[akka] def increment = depth.incrementAndGet diff --git a/config/akka-reference.conf b/config/akka-reference.conf index a8184d2662..8e4be0bab4 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -34,6 +34,7 @@ fair = on # should transactions be fair or non-fair (non fair yield better performance) max-nr-of-retries = 1000 # max nr of retries of a failing transaction before giving up timeout = 10000 # transaction timeout; if transaction has not committed within the timeout then it is aborted + jta-aware = off # 'on' means that if there JTA Transaction Manager available then the STM will begin (or join), commit or rollback the JTA transaction. Default is 'off' diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index a998d07b90..cf1e9c3e68 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -151,13 +151,14 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { val netty = "org.jboss.netty" % "netty" % "3.2.0.BETA1" % "compile" val commons_io = "commons-io" % "commons-io" % "1.4" % "compile" val dispatch_json = "net.databinder" % "dispatch-json_2.8.0.Beta1" % "0.6.6" % "compile" - val dispatch_htdisttp = "net.databinder" % "dispatch-http_2.8.0.Beta1" % "0.6.6" % "compile" + val dispatch_http = "net.databinder" % "dispatch-http_2.8.0.Beta1" % "0.6.6" % "compile" val sjson = "sjson.json" % "sjson" % "0.5-SNAPSHOT-2.8.Beta1" % "compile" val sbinary = "sbinary" % "sbinary" % "2.8.0.Beta1-2.8.0.Beta1-0.3.1-SNAPSHOT" % "compile" val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.2.1" % "compile" val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % "1.2.1" % "compile" - val voldemort = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" + val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" val jsr166x = "jsr166x" % "jsr166x" % "1.0" % "compile" + val jta_1_1 = "org.apache.geronimo.specs" % "geronimo-jta_1.1_spec" % "1.1.1" % "compile" // testing val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test" val junit = "junit" % "junit" % "4.5" % "test"