From b98cfd5c1f102fde062b3cee930a9a4e837ef077 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 13 Jul 2010 12:37:11 +0200 Subject: [PATCH] Fixed 3 bugs in Active Objects and Actor supervision + changed to use Multiverse tryJoinCommit + improved logging + added more tracing + various misc fixes --- .../src/main/scala/actor/ActiveObject.scala | 72 ++--- akka-core/src/main/scala/actor/ActorRef.scala | 276 +++++++++--------- .../main/scala/dispatch/MessageHandling.scala | 2 +- akka-core/src/main/scala/stm/JTA.scala | 39 ++- .../src/main/scala/stm/Transaction.scala | 7 +- .../main/scala/stm/TransactionFactory.scala | 6 +- .../scala/stm/TransactionManagement.scala | 32 +- .../akka/actor/TransactionalActiveObject.java | 3 + .../scala/TransactionalActiveObjectSpec.scala | 17 +- 9 files changed, 247 insertions(+), 207 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index a545f9f633..c794da9f47 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -474,9 +474,9 @@ object ActiveObject extends Logging { val parent = clazz.getSuperclass if (parent != null) injectActiveObjectContext0(activeObject, parent) else { - log.trace( - "Can't set 'ActiveObjectContext' for ActiveObject [%s] since no field of this type could be found.", - activeObject.getClass.getName) + log.ifTrace("Can't set 'ActiveObjectContext' for ActiveObject [" + + activeObject.getClass.getName + + "] since no field of this type could be found.") None } } @@ -486,7 +486,6 @@ object ActiveObject extends Logging { private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = Supervisor(SupervisorConfig(restartStrategy, components)) - } private[akka] object AspectInitRegistry extends ListenerManagement { @@ -634,11 +633,12 @@ private[akka] sealed class ActiveObjectAspect { joinPoint: JoinPoint, isOneWay: Boolean, isVoid: Boolean, sender: AnyRef, senderFuture: CompletableFuture[Any]) { override def toString: String = synchronized { - "Invocation [joinPoint: " + joinPoint.toString + - ", isOneWay: " + isOneWay + - ", isVoid: " + isVoid + - ", sender: " + sender + - ", senderFuture: " + senderFuture + + "Invocation [" + + "\n\t\tmethod = " + joinPoint.getRtti.asInstanceOf[MethodRtti].getMethod.getName + " @ " + joinPoint.getTarget.getClass.getName + + "\n\t\tisOneWay = " + isOneWay + + "\n\t\tisVoid = " + isVoid + + "\n\t\tsender = " + sender + + "\n\t\tsenderFuture = " + senderFuture + "]" } @@ -758,14 +758,14 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, } def receive = { - case Invocation(joinPoint, isOneWay, _, sender, senderFuture) => + case invocation @ Invocation(joinPoint, isOneWay, _, sender, senderFuture) => + ActiveObject.log.ifTrace("Invoking active object with message:\n" + invocation) context.foreach { ctx => if (sender ne null) ctx._sender = sender if (senderFuture ne null) ctx._senderFuture = senderFuture } ActiveObjectContext.sender.value = joinPoint.getThis // set next sender self.senderFuture.foreach(ActiveObjectContext.senderFuture.value = _) - if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint) if (isOneWay) joinPoint.proceed else self.reply(joinPoint.proceed) @@ -773,61 +773,53 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, // Jan Kronquist: started work on issue 121 case Link(target) => self.link(target) case Unlink(target) => self.unlink(target) - case unexpected => - throw new IllegalActorStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]") + case unexpected => throw new IllegalActorStateException( + "Unexpected message [" + unexpected + "] sent to [" + this + "]") } override def preRestart(reason: Throwable) { try { - // Since preRestart is called we know that this dispatcher - // is about to be restarted. Put the instance in a thread - // local so the new dispatcher can be initialized with the contents of the - // old. - //FIXME - This should be considered as a workaround. - crashedActorTl.set(this) - if (preRestart.isDefined) preRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) + // Since preRestart is called we know that this dispatcher + // is about to be restarted. Put the instance in a thread + // local so the new dispatcher can be initialized with the + // contents of the old. + //FIXME - This should be considered as a workaround. + crashedActorTl.set(this) + preRestart.foreach(_.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)) } catch { case e: InvocationTargetException => throw e.getCause } } override def postRestart(reason: Throwable) { try { - - if (postRestart.isDefined) { - postRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) - } + postRestart.foreach(_.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)) } catch { case e: InvocationTargetException => throw e.getCause } } override def init = { - // Get the crashed dispatcher from thread local and intitialize this actor with the - // contents of the old dispatcher - val oldActor = crashedActorTl.get(); - if(oldActor != null) { - initialize(oldActor.targetClass,oldActor.target.get,oldActor.context) - crashedActorTl.set(null) - } + // Get the crashed dispatcher from thread local and intitialize this actor with the + // contents of the old dispatcher + val oldActor = crashedActorTl.get(); + if (oldActor != null) { + initialize(oldActor.targetClass, oldActor.target.get, oldActor.context) + crashedActorTl.set(null) + } } override def shutdown = { try { - if (zhutdown.isDefined) { - zhutdown.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) - } - } catch { - case e: InvocationTargetException => throw e.getCause - } finally { + zhutdown.foreach(_.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)) + } catch { case e: InvocationTargetException => throw e.getCause + } finally { AspectInitRegistry.unregister(target.get); } } override def initTransactionalState = { - try { + try { if (initTxState.isDefined && target.isDefined) initTxState.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) } catch { case e: InvocationTargetException => throw e.getCause } } - - private def serializeArguments(joinPoint: JoinPoint) = { val args = joinPoint.getRtti.asInstanceOf[MethodRtti].getParameterValues var unserializable = false diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 7543817343..5d39a46d82 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -10,26 +10,27 @@ import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, F import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.stm.global._ import se.scalablesolutions.akka.stm.TransactionManagement._ -import se.scalablesolutions.akka.stm.TransactionManagement +import se.scalablesolutions.akka.stm.{TransactionManagement, TransactionSetAbortedException} import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, MessageSerializer, RemoteRequestProtocolIdFactory} import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util.{HashCode, Logging, UUID, ReentrantGuard} +import RemoteActorSerialization._ import org.multiverse.api.ThreadLocalTransaction._ import org.multiverse.commitbarriers.CountDownCommitBarrier - -import jsr166x.{Deque, ConcurrentLinkedDeque} +import org.multiverse.api.exceptions.DeadTransactionException import java.net.InetSocketAddress import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.{Map => JMap} import java.lang.reflect.Field -import RemoteActorSerialization._ + +import jsr166x.{Deque, ConcurrentLinkedDeque} import com.google.protobuf.ByteString -import java.util.concurrent.{ConcurrentHashMap, TimeUnit} /** * ActorRef is an immutable and serializable handle to an Actor. @@ -582,8 +583,22 @@ sealed class LocalActorRef private[akka]( private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None)) extends ActorRef { - private var isDeserialized = false - private var loader: Option[ClassLoader] = None + @volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None // only mutable to maintain identity across nodes + @volatile private[akka] var _linkedActors: Option[ConcurrentHashMap[String, ActorRef]] = None + @volatile private[akka] var _supervisor: Option[ActorRef] = None + @volatile private var isInInitialization = false + @volatile private var runActorInitialization = false + @volatile private var isDeserialized = false + @volatile private var loader: Option[ClassLoader] = None + + protected[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation] + protected[this] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) } + + // Needed to be able to null out the 'val self: ActorRef' member variables to make the Actor + // instance elegible for garbage collection + private val actorSelfFields = findActorSelfField(actor.getClass) + + if (runActorInitialization && !isDeserialized) initializeActorInstance private[akka] def this(clazz: Class[_ <: Actor]) = this(Left(Some(clazz))) private[akka] def this(factory: () => Actor) = this(Right(Some(factory))) @@ -629,22 +644,7 @@ sealed class LocalActorRef private[akka]( ActorRegistry.register(this) } - // Only mutable for RemoteServer in order to maintain identity across nodes - @volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None - @volatile private[akka] var _linkedActors: Option[ConcurrentHashMap[String, ActorRef]] = None - @volatile private[akka] var _supervisor: Option[ActorRef] = None - - protected[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation] - protected[this] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) } - - @volatile private var isInInitialization = false - @volatile private var runActorInitialization = false - - // Needed to be able to null out the 'val self: ActorRef' member variables to make the Actor - // instance elegible for garbage collection - private val actorSelfFields = findActorSelfField(actor.getClass) - - if (runActorInitialization && !isDeserialized) initializeActorInstance + // ========= PUBLIC FUNCTIONS ========= /** * Returns the mailbox. @@ -903,40 +903,10 @@ sealed class LocalActorRef private[akka]( */ def supervisor: Option[ActorRef] = guard.withGuard { _supervisor } + // ========= AKKA PROTECTED FUNCTIONS ========= + protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = guard.withGuard { _supervisor = sup } - private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = guard.withGuard { - val actorRef = Actor.actorOf(manifest[T].erasure.asInstanceOf[Class[T]].newInstance) - if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) actorRef.dispatcher = dispatcher - actorRef - } - - private[this] def newActor: Actor = { - isInInitialization = true - Actor.actorRefInCreation.value = Some(this) - val actor = actorFactory match { - case Left(Some(clazz)) => - try { - clazz.newInstance - } catch { - case e: InstantiationException => throw new ActorInitializationException( - "Could not instantiate Actor due to:\n" + e + - "\nMake sure Actor is NOT defined inside a class/trait," + - "\nif so put it outside the class/trait, f.e. in a companion object," + - "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.") - } - case Right(Some(factory)) => - factory() - case _ => - throw new ActorInitializationException( - "Can't create Actor, no Actor class or factory function in scope") - } - if (actor eq null) throw new ActorInitializationException( - "Actor instance passed to ActorRef can not be 'null'") - isInInitialization = false - actor - } - protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { joinTransaction(message) @@ -975,15 +945,6 @@ sealed class LocalActorRef private[akka]( } } - private def joinTransaction(message: Any) = if (isTransactionSetInScope) { - import org.multiverse.api.ThreadLocalTransaction - val txSet = getTransactionSetInScope - Actor.log.trace("Joining transaction set [%s];\n\tactor %s\n\twith message [%s]", txSet, toString, message) // FIXME test to run bench without this trace call - val mtx = ThreadLocalTransaction.getThreadLocalTransaction - if ((mtx eq null) || mtx.getStatus.isDead) txSet.incParties - else txSet.incParties(mtx, 1) - } - /** * Callback for the dispatcher. This is the ingle entry point to the user Actor implementation. */ @@ -1003,58 +964,6 @@ sealed class LocalActorRef private[akka]( } } - private def dispatch[T](messageHandle: MessageInvocation) = { - val message = messageHandle.message //serializeMessage(messageHandle.message) - var topLevelTransaction = false - val txSet: Option[CountDownCommitBarrier] = - if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet - else { - topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx - if (isTransactor) { - Actor.log.trace( - "Creating a new transaction set (top-level transaction)\n\tfor actor %s\n\twith message %s", - toString, messageHandle) - Some(createNewTransactionSet) - } else None - } - setTransactionSet(txSet) - - try { - cancelReceiveTimeout // FIXME: leave this here? - if (isTransactor) { - val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory) - atomic(txFactory) { - actor.base(message) - setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit - } - } else { - actor.base(message) - setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit - } - } catch { - case e => - _isBeingRestarted = true - // abort transaction set - if (isTransactionSetInScope) { - val txSet = getTransactionSetInScope - Actor.log.debug("Aborting transaction set [%s]", txSet) - txSet.abort - } - Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) - - senderFuture.foreach(_.completeWithException(this, e)) - - clearTransaction - if (topLevelTransaction) clearTransactionSet - - // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client - if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) - } finally { - clearTransaction - if (topLevelTransaction) clearTransactionSet - } - } - protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = { if (trapExit.exists(_.isAssignableFrom(reason.getClass))) { if (faultHandler.isDefined) { @@ -1075,6 +984,7 @@ sealed class LocalActorRef private[akka]( } protected[akka] def restart(reason: Throwable): Unit = { + _isBeingRestarted = true val failedActor = actorInstance.get failedActor.synchronized { lifeCycle.get match { @@ -1117,20 +1027,6 @@ sealed class LocalActorRef private[akka]( } } - private def shutDownTemporaryActor(temporaryActor: ActorRef) = { - Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", temporaryActor.id) - temporaryActor.stop - linkedActors.remove(temporaryActor.uuid) // remove the temporary actor - // if last temporary actor is gone, then unlink me from supervisor - if (linkedActors.isEmpty) { - Actor.log.info( - "All linked actors have died permanently (they were all configured as TEMPORARY)" + - "\n\tshutting down and unlinking supervisor actor as well [%s].", - temporaryActor.id) - _supervisor.foreach(_ ! UnlinkAndStop(this)) - } - } - protected[akka] def registerSupervisorAsRemoteActor: Option[String] = guard.withGuard { if (_supervisor.isDefined) { RemoteClient.clientFor(remoteAddress.get).registerSupervisorForActor(this) @@ -1149,6 +1045,126 @@ sealed class LocalActorRef private[akka]( protected[akka] def linkedActorsAsList: List[ActorRef] = linkedActors.values.toArray.toList.asInstanceOf[List[ActorRef]] + // ========= PRIVATE FUNCTIONS ========= + + private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = guard.withGuard { + val actorRef = Actor.actorOf(manifest[T].erasure.asInstanceOf[Class[T]].newInstance) + if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) actorRef.dispatcher = dispatcher + actorRef + } + + private[this] def newActor: Actor = { + isInInitialization = true + Actor.actorRefInCreation.value = Some(this) + val actor = actorFactory match { + case Left(Some(clazz)) => + try { + clazz.newInstance + } catch { + case e: InstantiationException => throw new ActorInitializationException( + "Could not instantiate Actor due to:\n" + e + + "\nMake sure Actor is NOT defined inside a class/trait," + + "\nif so put it outside the class/trait, f.e. in a companion object," + + "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.") + } + case Right(Some(factory)) => + factory() + case _ => + throw new ActorInitializationException( + "Can't create Actor, no Actor class or factory function in scope") + } + if (actor eq null) throw new ActorInitializationException( + "Actor instance passed to ActorRef can not be 'null'") + isInInitialization = false + actor + } + + private def joinTransaction(message: Any) = if (isTransactionSetInScope) { + import org.multiverse.api.ThreadLocalTransaction + val oldTxSet = getTransactionSetInScope + val currentTxSet = if (oldTxSet.isAborted || oldTxSet.isCommitted) { + clearTransactionSet + createNewTransactionSet + } else oldTxSet + Actor.log.ifTrace("Joining transaction set [" + currentTxSet + "];\n\tactor " + toString + "\n\twith message [" + message + "]") + val mtx = ThreadLocalTransaction.getThreadLocalTransaction + if ((mtx eq null) || mtx.getStatus.isDead) currentTxSet.incParties + else currentTxSet.incParties(mtx, 1) + } + + private def dispatch[T](messageHandle: MessageInvocation) = { + Actor.log.ifTrace("Invoking actor with message:\n" + messageHandle) + val message = messageHandle.message //serializeMessage(messageHandle.message) + var topLevelTransaction = false + val txSet: Option[CountDownCommitBarrier] = + if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet + else { + topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx + if (isTransactor) { + Actor.log.ifTrace("Creating a new transaction set (top-level transaction)\n\tfor actor " + toString + "\n\twith message " + messageHandle) + Some(createNewTransactionSet) + } else None + } + setTransactionSet(txSet) + + try { + cancelReceiveTimeout // FIXME: leave this here? + if (isTransactor) { + val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory) + atomic(txFactory) { + actor.base(message) + setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit + } + } else { + actor.base(message) + setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit + } + } catch { + case e: DeadTransactionException => + handleExceptionInDispatch( + new TransactionSetAbortedException("Transaction set has been aborted by another participant"), + message, topLevelTransaction) + case e => + handleExceptionInDispatch(e, message, topLevelTransaction) + } finally { + clearTransaction + if (topLevelTransaction) clearTransactionSet + } + } + + private def shutDownTemporaryActor(temporaryActor: ActorRef) = { + Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", temporaryActor.id) + temporaryActor.stop + linkedActors.remove(temporaryActor.uuid) // remove the temporary actor + // if last temporary actor is gone, then unlink me from supervisor + if (linkedActors.isEmpty) { + Actor.log.info( + "All linked actors have died permanently (they were all configured as TEMPORARY)" + + "\n\tshutting down and unlinking supervisor actor as well [%s].", + temporaryActor.id) + _supervisor.foreach(_ ! UnlinkAndStop(this)) + } + } + + private def handleExceptionInDispatch(e: Throwable, message: Any, topLevelTransaction: Boolean) = { + _isBeingRestarted = true + // abort transaction set + if (isTransactionSetInScope) { + val txSet = getTransactionSetInScope + Actor.log.debug("Aborting transaction set [%s]", txSet) + txSet.abort + } + Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) + + senderFuture.foreach(_.completeWithException(this, e)) + + clearTransaction + if (topLevelTransaction) clearTransactionSet + + // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client + if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) + } + private def nullOutActorRefReferencesFor(actor: Actor) = { actorSelfFields._1.set(actor, null) actorSelfFields._2.set(actor, null) diff --git a/akka-core/src/main/scala/dispatch/MessageHandling.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala index c2e74ceb1d..a73f2b691b 100644 --- a/akka-core/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-core/src/main/scala/dispatch/MessageHandling.scala @@ -53,7 +53,7 @@ final class MessageInvocation(val receiver: ActorRef, "\n\tsender = " + sender + "\n\tsenderFuture = " + senderFuture + "\n\ttransactionSet = " + transactionSet + - "\n]" + "]" } } diff --git a/akka-core/src/main/scala/stm/JTA.scala b/akka-core/src/main/scala/stm/JTA.scala index bb61973c91..24b5a49086 100644 --- a/akka-core/src/main/scala/stm/JTA.scala +++ b/akka-core/src/main/scala/stm/JTA.scala @@ -4,7 +4,9 @@ package se.scalablesolutions.akka.stm -import javax.transaction.{TransactionManager, UserTransaction, Transaction => JtaTransaction, SystemException, Status, Synchronization, TransactionSynchronizationRegistry} +import javax.transaction.{TransactionManager, UserTransaction, + Transaction => JtaTransaction, SystemException, + Status, Synchronization, TransactionSynchronizationRegistry} import javax.naming.{InitialContext, Context, NamingException} import se.scalablesolutions.akka.config.Config._ @@ -16,7 +18,7 @@ import se.scalablesolutions.akka.util.Logging * @author Jonas Bonér */ object TransactionContainer extends Logging { - val AKKA_JTA_TRANSACTION_SERVICE_CLASS = "se.scalablesolutions.akka.jta.AtomikosTransactionService" + val AKKA_JTA_TRANSACTION_SERVICE_CLASS = "se.scalablesolutions.akka.jta.AtomikosTransactionService" val DEFAULT_USER_TRANSACTION_NAME = "java:comp/UserTransaction" val FALLBACK_TRANSACTION_MANAGER_NAMES = "java:comp/TransactionManager" :: "java:appserver/TransactionManager" :: @@ -119,22 +121,31 @@ class TransactionContainer private (val tm: Either[Option[UserTransaction], Opti } } - def begin = tm match { - case Left(Some(userTx)) => userTx.begin - case Right(Some(txMan)) => txMan.begin - case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope") + def begin = { + TransactionContainer.log.ifTrace("Starting JTA transaction") + tm match { + case Left(Some(userTx)) => userTx.begin + case Right(Some(txMan)) => txMan.begin + case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope") + } } - def commit = tm match { - case Left(Some(userTx)) => userTx.commit - case Right(Some(txMan)) => txMan.commit - case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope") + def commit = { + TransactionContainer.log.ifTrace("Committing JTA transaction") + tm match { + case Left(Some(userTx)) => userTx.commit + case Right(Some(txMan)) => txMan.commit + case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope") + } } - def rollback = tm match { - case Left(Some(userTx)) => userTx.rollback - case Right(Some(txMan)) => txMan.rollback - case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope") + def rollback = { + TransactionContainer.log.ifTrace("Aborting JTA transaction") + tm match { + case Left(Some(userTx)) => userTx.rollback + case Right(Some(txMan)) => txMan.rollback + case _ => throw new StmConfigurationException("Does not have a UserTransaction or TransactionManager in scope") + } } def getStatus = tm match { diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala index 54f20a3504..0951cbc5c5 100644 --- a/akka-core/src/main/scala/stm/Transaction.scala +++ b/akka-core/src/main/scala/stm/Transaction.scala @@ -83,11 +83,12 @@ object Transaction { if (JTA_AWARE) Some(TransactionContainer()) else None - log.trace("Creating %s", toString) + log.ifTrace("Creating transaction " + toString) // --- public methods --------- def begin = synchronized { + log.ifTrace("Starting transaction " + toString) jta.foreach { txContainer => txContainer.begin txContainer.registerSynchronization(new StmSynchronization(txContainer, this)) @@ -95,14 +96,14 @@ object Transaction { } def commit = synchronized { - log.trace("Committing transaction %s", toString) + log.ifTrace("Committing transaction " + toString) persistentStateMap.valuesIterator.foreach(_.commit) status = TransactionStatus.Completed jta.foreach(_.commit) } def abort = synchronized { - log.trace("Aborting transaction %s", toString) + log.ifTrace("Aborting transaction " + toString) jta.foreach(_.rollback) persistentStateMap.valuesIterator.foreach(_.abort) persistentStateMap.clear diff --git a/akka-core/src/main/scala/stm/TransactionFactory.scala b/akka-core/src/main/scala/stm/TransactionFactory.scala index 56982bb759..d4e61ee04f 100644 --- a/akka-core/src/main/scala/stm/TransactionFactory.scala +++ b/akka-core/src/main/scala/stm/TransactionFactory.scala @@ -37,8 +37,8 @@ object TransactionConfig { def traceLevel(level: String) = level.toLowerCase match { case "coarse" | "course" => Transaction.TraceLevel.Coarse - case "fine" => Transaction.TraceLevel.Fine - case _ => Transaction.TraceLevel.None + case "fine" => Transaction.TraceLevel.Fine + case _ => Transaction.TraceLevel.None } /** @@ -126,7 +126,7 @@ object TransactionFactory { traceLevel: TraceLevel = TransactionConfig.TRACE_LEVEL, hooks: Boolean = TransactionConfig.HOOKS) = { val config = new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew, - explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks) + explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks) new TransactionFactory(config) } } diff --git a/akka-core/src/main/scala/stm/TransactionManagement.scala b/akka-core/src/main/scala/stm/TransactionManagement.scala index 0c6a244f42..398e1a1200 100644 --- a/akka-core/src/main/scala/stm/TransactionManagement.scala +++ b/akka-core/src/main/scala/stm/TransactionManagement.scala @@ -7,6 +7,7 @@ package se.scalablesolutions.akka.stm import se.scalablesolutions.akka.util.Logging import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.TimeUnit import org.multiverse.api.{StmUtils => MultiverseStmUtils} import org.multiverse.api.ThreadLocalTransaction._ @@ -14,16 +15,20 @@ import org.multiverse.api.{Transaction => MultiverseTransaction} import org.multiverse.commitbarriers.CountDownCommitBarrier import org.multiverse.templates.{TransactionalCallable, OrElseTemplate} -class StmException(msg: String) extends RuntimeException(msg) +class TransactionSetAbortedException(msg: String) extends RuntimeException(msg) +// TODO Should we remove TransactionAwareWrapperException? Not used anywhere yet. class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) { override def toString = "TransactionAwareWrapperException[" + cause + ", " + tx + "]" } +/** + * Internal helper methods and properties for transaction management. + */ object TransactionManagement extends TransactionManagement { import se.scalablesolutions.akka.config.Config._ - // move to stm.global.fair? + // FIXME move to stm.global.fair? val FAIR_TRANSACTIONS = config.getBool("akka.stm.fair", true) private[akka] val transactionSet = new ThreadLocal[Option[CountDownCommitBarrier]]() { @@ -47,6 +52,9 @@ object TransactionManagement extends TransactionManagement { } } +/** + * Internal helper methods for transaction management. + */ trait TransactionManagement { private[akka] def createNewTransactionSet: CountDownCommitBarrier = { @@ -111,7 +119,9 @@ class LocalStm extends TransactionManagement with Logging { factory.boilerplate.execute(new TransactionalCallable[T]() { def call(mtx: MultiverseTransaction): T = { factory.addHooks - body + val result = body + log.ifTrace("Committing local transaction [" + mtx + "]") + result } }) } @@ -145,9 +155,10 @@ class GlobalStm extends TransactionManagement with Logging { factory.addHooks val result = body val txSet = getTransactionSetInScope - log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet) - // FIXME ? txSet.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS) - try { txSet.joinCommit(mtx) } catch { case e: IllegalStateException => {} } + log.ifTrace("Committing global transaction [" + mtx + "]\n\tand joining transaction set [" + txSet + "]") + mtx.commit + // Need to catch IllegalStateException until we have fix in Multiverse, since it throws it by mistake + try { txSet.tryJoinCommit(mtx, TransactionConfig.TIMEOUT, TimeUnit.MILLISECONDS) } catch { case e: IllegalStateException => {} } clearTransaction result } @@ -156,6 +167,7 @@ class GlobalStm extends TransactionManagement with Logging { } trait StmUtil { + /** * Schedule a deferred task on the thread local transaction (use within an atomic). * This is executed when the transaction commits. @@ -178,6 +190,14 @@ trait StmUtil { /** * Use either-orElse to combine two blocking transactions. + * Usage: + *
+   * either {
+   *   ...
+   * } orElse {
+   *   ...
+   * }
+   * 
*/ def either[T](firstBody: => T) = new { def orElse(secondBody: => T) = new OrElseTemplate[T] { diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/TransactionalActiveObject.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/TransactionalActiveObject.java index 515f4fafee..825e7ca489 100644 --- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/TransactionalActiveObject.java +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/TransactionalActiveObject.java @@ -21,6 +21,7 @@ public class TransactionalActiveObject { refState = new Ref(); isInitialized = true; } + System.out.println("==========> init"); } public String getMapState(String key) { @@ -37,6 +38,7 @@ public class TransactionalActiveObject { public void setMapState(String key, String msg) { mapState.put(key, msg); + System.out.println("==========> setMapState"); } public void setVectorState(String msg) { @@ -72,6 +74,7 @@ public class TransactionalActiveObject { mapState.put(key, msg); vectorState.add(msg); refState.swap(msg); + System.out.println("==========> failure"); nested.failure(key, msg, failer); return msg; } diff --git a/akka-core/src/test/scala/TransactionalActiveObjectSpec.scala b/akka-core/src/test/scala/TransactionalActiveObjectSpec.scala index b42c137e33..8c4c23a530 100644 --- a/akka-core/src/test/scala/TransactionalActiveObjectSpec.scala +++ b/akka-core/src/test/scala/TransactionalActiveObjectSpec.scala @@ -16,18 +16,12 @@ import se.scalablesolutions.akka.config._ import se.scalablesolutions.akka.config.ActiveObjectConfigurator import se.scalablesolutions.akka.config.JavaConfig._ import se.scalablesolutions.akka.actor._ -/* + @RunWith(classOf[JUnitRunner]) class TransactionalActiveObjectSpec extends -<<<<<<< HEAD:akka-core/src/test/scala/TransactionalActiveObjectSpec.scala Spec with ShouldMatchers with BeforeAndAfterAll { -======= - Spec with - ShouldMatchers with - BeforeAndAfterAll { ->>>>>>> 38e8bea3fe6a7e9fcc9c5f353124144739bdc234:akka-core/src/test/scala/TransactionalActiveObjectSpec.scala private val conf = new ActiveObjectConfigurator private var messageLog = "" @@ -52,7 +46,7 @@ class TransactionalActiveObjectSpec extends } describe("Transactional in-memory Active Object ") { - +/* it("map should not rollback state for stateful server in case of success") { val stateful = conf.getInstance(classOf[TransactionalActiveObject]) stateful.init @@ -60,11 +54,13 @@ class TransactionalActiveObjectSpec extends stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess") should equal("new state") } - +*/ it("map should rollback state for stateful server in case of failure") { val stateful = conf.getInstance(classOf[TransactionalActiveObject]) stateful.init + Thread.sleep(500) stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") + Thread.sleep(500) val failer = conf.getInstance(classOf[ActiveObjectFailer]) try { stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) @@ -73,6 +69,7 @@ class TransactionalActiveObjectSpec extends stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure") should equal("init") } + /* it("vector should rollback state for stateful server in case of failure") { val stateful = conf.getInstance(classOf[TransactionalActiveObject]) stateful.init @@ -112,6 +109,6 @@ class TransactionalActiveObjectSpec extends stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") stateful.getRefState should equal("new state") } + */ } } -*/