From 8bc40775b3d671d2cfd0f98badc157de2f610d2c Mon Sep 17 00:00:00 2001 From: Jonas Boner Date: Sun, 24 May 2009 07:41:47 +0200 Subject: [PATCH] cleanup and refactoring of active object code --- akka.iws | 341 +++++++++++++----- .../akka/api/InMemStateful.java | 8 +- kernel/src/main/scala/ActiveObject.scala | 86 +++-- 3 files changed, 313 insertions(+), 122 deletions(-) diff --git a/akka.iws b/akka.iws index 56432c0e34..a9353f6a59 100644 --- a/akka.iws +++ b/akka.iws @@ -49,6 +49,21 @@ + + + + - + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -118,7 +187,7 @@ - + @@ -133,28 +202,10 @@ - + - - - - - - - - - - - - - - - - - - - + @@ -318,6 +369,132 @@ - + @@ -1492,7 +1669,7 @@ - + @@ -1503,7 +1680,7 @@ - + @@ -1554,50 +1731,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -1647,23 +1780,65 @@ - + - + - + - + + + + + + + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java index 69d1012f59..8fa95abb0b 100644 --- a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java @@ -36,15 +36,15 @@ public class InMemStateful { @transactional public void success(String key, String msg) { mapState.put(key, msg); - vectorState.add(msg); - refState.swap(msg); + //vectorState.add(msg); + //refState.swap(msg); } @transactional public void failure(String key, String msg, InMemFailer failer) { mapState.put(key, msg); - vectorState.add(msg); - refState.swap(msg); + //vectorState.add(msg); + //refState.swap(msg); failer.fail(); } diff --git a/kernel/src/main/scala/ActiveObject.scala b/kernel/src/main/scala/ActiveObject.scala index f094d5b672..aaac2dc695 100755 --- a/kernel/src/main/scala/ActiveObject.scala +++ b/kernel/src/main/scala/ActiveObject.scala @@ -90,7 +90,7 @@ object ActiveObject { sealed class TransactionalAroundAdvice(target: Class[_], targetInstance: AnyRef, server: GenericServerContainer) extends AroundAdvice { - val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance) + private val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance) server.transactionalRefs = refs server.transactionalMaps = maps server.transactionalVectors = vectors @@ -98,53 +98,59 @@ sealed class TransactionalAroundAdvice(target: Class[_], import ActiveObject.threadBoundTx private[this] var activeTx: Option[Transaction] = None + // FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint def invoke(joinpoint: JoinPoint): AnyRef = { - // FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti] val method = rtti.getMethod - if (method.isAnnotationPresent(Annotations.transactional)) { - if (activeTx.isDefined) { - val tx = activeTx.get - tx.commit(server) - threadBoundTx.set(None) - activeTx = None - } - // FIXME: check if we are already in a transaction if so NEST (set parent) - val newTx = new Transaction - newTx.begin(server) - threadBoundTx.set(Some(newTx)) - } + if (method.isAnnotationPresent(Annotations.transactional)) { + tryToCommitTransaction + startNewTransaction + } + joinExistingTransaction + + val result: AnyRef = if (rtti.getMethod.isAnnotationPresent(Annotations.oneway)) sendOneWay(joinpoint) + else handleResult(sendAndReceiveEventually(joinpoint)) + tryToPrecommitTransaction + result + } + + private def startNewTransaction = { + val newTx = new Transaction + newTx.begin(server) + threadBoundTx.set(Some(newTx)) + } + + private def joinExistingTransaction = { val cflowTx = threadBoundTx.get - if (!activeTx.isDefined && cflowTx.isDefined) { + if (!activeTx.isDefined && cflowTx.isDefined) { val currentTx = cflowTx.get currentTx.join(server) activeTx = Some(currentTx) } activeTx = threadBoundTx.get + } - // FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint - val result: AnyRef = - if (rtti.getMethod.isAnnotationPresent(Annotations.oneway)) { - server ! (activeTx, joinpoint) - } else { - val result: ErrRef[AnyRef] = - server !!! ((activeTx, joinpoint), { - var ref = ErrRef(activeTx) - ref() = throw new ActiveObjectInvocationTimeoutException("Invocation to active object [" + targetInstance.getClass.getName + "] timed out after " + server.timeout + " milliseconds") - ref - }) - try { - result() - } catch { - case e => - rollback(result.tx) - throw e - } - } + private def tryToPrecommitTransaction = { // FIXME: clear threadBoundTx on successful commit if (activeTx.isDefined) activeTx.get.precommit(server) - result + } + + private def tryToCommitTransaction = if (activeTx.isDefined) { + val tx = activeTx.get + tx.commit(server) + threadBoundTx.set(None) + activeTx = None + } + + private def handleResult(result: ErrRef[AnyRef]): AnyRef = { + try { + result() + } catch { + case e => + rollback(result.tx) + throw e + } } private def rollback(tx: Option[Transaction]) = tx match { @@ -154,6 +160,16 @@ sealed class TransactionalAroundAdvice(target: Class[_], threadBoundTx.set(Some(tx)) } + private def sendOneWay(joinpoint: JoinPoint) = server ! (activeTx, joinpoint) + + private def sendAndReceiveEventually(joinpoint: JoinPoint): ErrRef[AnyRef] = { + server !!! ((activeTx, joinpoint), { + var ref = ErrRef(activeTx) + ref() = throw new ActiveObjectInvocationTimeoutException("Invocation to active object [" + targetInstance.getClass.getName + "] timed out after " + server.timeout + " milliseconds") + ref + }) + } + /** * Search for transactional items for a specific target instance, crawl the class hierarchy recursively up to the top. */