diff --git a/akka.iws b/akka.iws
index 56432c0e34..a9353f6a59 100644
--- a/akka.iws
+++ b/akka.iws
@@ -49,6 +49,21 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -106,10 +121,64 @@
-
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -118,7 +187,7 @@
-
+
@@ -133,28 +202,10 @@
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
@@ -318,6 +369,132 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -1430,14 +1607,14 @@
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
localhost
@@ -1484,7 +1661,7 @@
-
+
@@ -1492,7 +1669,7 @@
-
+
@@ -1503,7 +1680,7 @@
-
+
@@ -1554,50 +1731,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -1647,23 +1780,65 @@
-
+
-
+
-
+
-
+
+
+
+
+
+
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
index 69d1012f59..8fa95abb0b 100644
--- a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
+++ b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
@@ -36,15 +36,15 @@ public class InMemStateful {
@transactional
public void success(String key, String msg) {
mapState.put(key, msg);
- vectorState.add(msg);
- refState.swap(msg);
+ //vectorState.add(msg);
+ //refState.swap(msg);
}
@transactional
public void failure(String key, String msg, InMemFailer failer) {
mapState.put(key, msg);
- vectorState.add(msg);
- refState.swap(msg);
+ //vectorState.add(msg);
+ //refState.swap(msg);
failer.fail();
}
diff --git a/kernel/src/main/scala/ActiveObject.scala b/kernel/src/main/scala/ActiveObject.scala
index f094d5b672..aaac2dc695 100755
--- a/kernel/src/main/scala/ActiveObject.scala
+++ b/kernel/src/main/scala/ActiveObject.scala
@@ -90,7 +90,7 @@ object ActiveObject {
sealed class TransactionalAroundAdvice(target: Class[_],
targetInstance: AnyRef,
server: GenericServerContainer) extends AroundAdvice {
- val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance)
+ private val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance)
server.transactionalRefs = refs
server.transactionalMaps = maps
server.transactionalVectors = vectors
@@ -98,53 +98,59 @@ sealed class TransactionalAroundAdvice(target: Class[_],
import ActiveObject.threadBoundTx
private[this] var activeTx: Option[Transaction] = None
+ // FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint
def invoke(joinpoint: JoinPoint): AnyRef = {
- // FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint
val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti]
val method = rtti.getMethod
- if (method.isAnnotationPresent(Annotations.transactional)) {
- if (activeTx.isDefined) {
- val tx = activeTx.get
- tx.commit(server)
- threadBoundTx.set(None)
- activeTx = None
- }
- // FIXME: check if we are already in a transaction if so NEST (set parent)
- val newTx = new Transaction
- newTx.begin(server)
- threadBoundTx.set(Some(newTx))
- }
+ if (method.isAnnotationPresent(Annotations.transactional)) {
+ tryToCommitTransaction
+ startNewTransaction
+ }
+ joinExistingTransaction
+
+ val result: AnyRef = if (rtti.getMethod.isAnnotationPresent(Annotations.oneway)) sendOneWay(joinpoint)
+ else handleResult(sendAndReceiveEventually(joinpoint))
+ tryToPrecommitTransaction
+ result
+ }
+
+ private def startNewTransaction = {
+ val newTx = new Transaction
+ newTx.begin(server)
+ threadBoundTx.set(Some(newTx))
+ }
+
+ private def joinExistingTransaction = {
val cflowTx = threadBoundTx.get
- if (!activeTx.isDefined && cflowTx.isDefined) {
+ if (!activeTx.isDefined && cflowTx.isDefined) {
val currentTx = cflowTx.get
currentTx.join(server)
activeTx = Some(currentTx)
}
activeTx = threadBoundTx.get
+ }
- // FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint
- val result: AnyRef =
- if (rtti.getMethod.isAnnotationPresent(Annotations.oneway)) {
- server ! (activeTx, joinpoint)
- } else {
- val result: ErrRef[AnyRef] =
- server !!! ((activeTx, joinpoint), {
- var ref = ErrRef(activeTx)
- ref() = throw new ActiveObjectInvocationTimeoutException("Invocation to active object [" + targetInstance.getClass.getName + "] timed out after " + server.timeout + " milliseconds")
- ref
- })
- try {
- result()
- } catch {
- case e =>
- rollback(result.tx)
- throw e
- }
- }
+ private def tryToPrecommitTransaction = {
// FIXME: clear threadBoundTx on successful commit
if (activeTx.isDefined) activeTx.get.precommit(server)
- result
+ }
+
+ private def tryToCommitTransaction = if (activeTx.isDefined) {
+ val tx = activeTx.get
+ tx.commit(server)
+ threadBoundTx.set(None)
+ activeTx = None
+ }
+
+ private def handleResult(result: ErrRef[AnyRef]): AnyRef = {
+ try {
+ result()
+ } catch {
+ case e =>
+ rollback(result.tx)
+ throw e
+ }
}
private def rollback(tx: Option[Transaction]) = tx match {
@@ -154,6 +160,16 @@ sealed class TransactionalAroundAdvice(target: Class[_],
threadBoundTx.set(Some(tx))
}
+ private def sendOneWay(joinpoint: JoinPoint) = server ! (activeTx, joinpoint)
+
+ private def sendAndReceiveEventually(joinpoint: JoinPoint): ErrRef[AnyRef] = {
+ server !!! ((activeTx, joinpoint), {
+ var ref = ErrRef(activeTx)
+ ref() = throw new ActiveObjectInvocationTimeoutException("Invocation to active object [" + targetInstance.getClass.getName + "] timed out after " + server.timeout + " milliseconds")
+ ref
+ })
+ }
+
/**
* Search for transactional items for a specific target instance, crawl the class hierarchy recursively up to the top.
*/