diff --git a/akka.iws b/akka.iws index 8385d535a7..89a01a72d6 100644 --- a/akka.iws +++ b/akka.iws @@ -1,7 +1,12 @@ - + + + + + + @@ -19,6 +24,47 @@ + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - + - - + + - - - - + + + + - + - + - + + - - + @@ -378,6 +264,107 @@ + + + + + + + + + + + + + + + + + + + + + + + + @@ -531,7 +518,13 @@ + + + - + - + - - - - + + + + - + - + - + - + + @@ -657,69 +651,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -762,9 +693,82 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + @@ -786,8 +790,15 @@ + @@ -839,8 +850,15 @@ + diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java index 57c3fc6eb1..279d9eaf54 100755 --- a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java @@ -31,18 +31,17 @@ public class InMemoryStateTest extends TestCase { conf.stop(); } -/* public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { InMemStateful stateful = conf.getActiveObject("inmem-stateful"); stateful.setState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional + stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional assertEquals("new state", stateful.getState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); } -*/ - public void testShouldRollbackStateForStatefulServerInCaseOfFailure() { + + public void testShouldRollbackStateForStatefulServerInCaseOfFailure() { InMemStateful stateful = conf.getActiveObject("inmem-stateful"); stateful.setState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state - InMemFailer failer = conf.getActiveObject("inmem-failer"); try { stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method @@ -51,7 +50,8 @@ public class InMemoryStateTest extends TestCase { } // expected assertEquals("init", stateful.getState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state } - + /* + */ // public void testShouldRollbackStateForStatefulServerInCaseOfMessageClash() // { // InMemStateful stateful = conf.getActiveObject(InMemStateful.class); diff --git a/kernel/src/main/scala/ActiveObject.scala b/kernel/src/main/scala/ActiveObject.scala index b6f7bd6dd8..8d651fc141 100755 --- a/kernel/src/main/scala/ActiveObject.scala +++ b/kernel/src/main/scala/ActiveObject.scala @@ -4,17 +4,17 @@ package se.scalablesolutions.akka.kernel +import kernel.camel.{MessageDriven, ActiveObjectProducer} import config.ActiveObjectGuiceConfigurator import config.ScalaConfig._ import java.util.{List => JList, ArrayList} import java.lang.reflect.{Method, Field, InvocationHandler, Proxy, InvocationTargetException} import java.lang.annotation.Annotation -import kernel.camel.{MessageDriven, ActiveObjectProducer} + import org.apache.camel.{Processor, Exchange} + import scala.collection.mutable.HashMap -//import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory} -//import voldemort.versioning.Versioned sealed class ActiveObjectException(msg: String) extends RuntimeException(msg) class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg) @@ -104,26 +104,27 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = { if (m.isAnnotationPresent(Annotations.transactional)) { + if (activeTx.isDefined) { + val tx = activeTx.get + //val cflowTx = threadBoundTx.get + // if (cflowTx.isDefined && cflowTx.get != tx) { + // new tx in scope; try to commit + tx.commit(server) + threadBoundTx.set(None) + activeTx = None + // } + } // FIXME: check if we are already in a transaction if so NEST (set parent) val newTx = new Transaction newTx.begin(server) threadBoundTx.set(Some(newTx)) } + val cflowTx = threadBoundTx.get - activeTx match { - case Some(tx) => - if (cflowTx.isDefined && cflowTx.get != tx) { - // new tx in scope; try to commit - tx.commit(server) - threadBoundTx.set(None) - activeTx = None - } - case None => - if (cflowTx.isDefined) { - val currentTx = cflowTx.get - currentTx.join(server) - activeTx = Some(currentTx) - } + if (!activeTx.isDefined && cflowTx.isDefined) { + val currentTx = cflowTx.get + currentTx.join(server) + activeTx = Some(currentTx) } activeTx = threadBoundTx.get invoke(Invocation(m, args, targetInstance, activeTx)) diff --git a/kernel/src/main/scala/Transaction.scala b/kernel/src/main/scala/Transaction.scala index 11aa8cb189..f6dc0f3322 100644 --- a/kernel/src/main/scala/Transaction.scala +++ b/kernel/src/main/scala/Transaction.scala @@ -5,8 +5,7 @@ package se.scalablesolutions.akka.kernel import java.util.concurrent.atomic.AtomicLong -import scala.collection.mutable.HashMap - +import scala.collection.mutable.{HashSet, HashMap} sealed abstract class TransactionStatus object TransactionStatus { case object New extends TransactionStatus @@ -34,60 +33,59 @@ object TransactionIdFactory { class Transaction extends Logging { val id = TransactionIdFactory.newId - log.debug("Creating a new transaction [%s]", id) + log.debug("Creating a new transaction with id [%s]", id) + + // FIXME: add support for nested transactions private[this] var parent: Option[Transaction] = None - private[this] var participants: List[GenericServerContainer] = Nil - private[this] var precommitted: List[GenericServerContainer] = Nil + private[this] val participants = new HashSet[GenericServerContainer] + private[this] val precommitted = new HashSet[GenericServerContainer] @volatile private[this] var status: TransactionStatus = TransactionStatus.New def begin(server: GenericServerContainer) = synchronized { - println("===== begin 1 " + server) - if (status == TransactionStatus.Aborted) throw new IllegalStateException("Can't begin ABORTED transaction") - if (status == TransactionStatus.Completed) throw new IllegalStateException("Can't begin COMPLETED transaction") - if (status == TransactionStatus.New) log.debug("Actor [%s] is starting NEW transaction", server) - else log.debug("Actor [%s] is participating in transaction", server) - println("===== begin 2 " + server) + ensureIsActiveOrNew + if (status == TransactionStatus.New) log.info("Server [%s] is starting NEW transaction [%s]", server.id, this) + else log.info("Server [%s] is participating in transaction", server) server.transactionalItems.foreach(_.begin) - participants ::= server + participants + server status = TransactionStatus.Active } def precommit(server: GenericServerContainer) = synchronized { if (status == TransactionStatus.Active) { - println("===== precommit " + server) - log.debug("Pre-committing transaction for actor [%s]", server) - precommitted ::= server + log.info("Pre-committing transaction [%s] for server [%s]", this, server.id) + precommitted + server } } def commit(server: GenericServerContainer) = synchronized { if (status == TransactionStatus.Active) { - println("===== commit " + server) - log.debug("Committing transaction for actor [%s]", server) + log.info("Committing transaction [%s] for server [%s]", this, server.id) val haveAllPreCommitted = if (participants.size == precommitted.size) {{ for (server <- participants) yield { if (precommitted.exists(_.id == server.id)) true else false - }}.exists(_ == false) + }}.exists(_ == true) } else false if (haveAllPreCommitted) status = TransactionStatus.Completed else rollback(server) } + participants.clear + precommitted.clear } def rollback(server: GenericServerContainer) = synchronized { ensureIsActiveOrAborted - println("===== rollback " + server) - log.debug("Actor [%s] has initiated transaction rollback, rolling back [%s]" , server, participants) + log.info("Server [%s] has initiated transaction rollback for [%s], rolling back [%s]", server.id, this, participants) participants.foreach(_.transactionalItems.foreach(_.rollback)) status = TransactionStatus.Aborted } def join(server: GenericServerContainer) = synchronized { - println("===== joining " + server) + ensureIsActive + log.info("Server [%s] is joining transaction [%s]" , server.id, this) server.transactionalItems.foreach(_.begin) - participants ::= server + participants + server } private def ensureIsActive = if (status != TransactionStatus.Active) @@ -96,6 +94,9 @@ class Transaction extends Logging { private def ensureIsActiveOrAborted = if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted)) throw new IllegalStateException("Expected ACTIVE or ABORTED transaction - current status [" + status + "]") + private def ensureIsActiveOrNew = if (!(status == TransactionStatus.Active || status == TransactionStatus.New)) + throw new IllegalStateException("Expected ACTIVE or NEW transaction - current status [" + status + "]") + override def equals(that: Any): Boolean = synchronized { that != null && that.isInstanceOf[Transaction] &&