diff --git a/akka.ipr b/akka.ipr
index 0830068922..1cf265f7a2 100644
--- a/akka.ipr
+++ b/akka.ipr
@@ -140,7 +140,7 @@
-
+
diff --git a/akka.iws b/akka.iws
index 1fd0f0b9ea..d549183eee 100644
--- a/akka.iws
+++ b/akka.iws
@@ -1,55 +1,22 @@
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
@@ -68,6 +35,34 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -126,75 +121,10 @@
-
-
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
@@ -203,16 +133,81 @@
-
+
-
-
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -231,22 +226,22 @@
@@ -290,80 +285,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -393,7 +314,11 @@
-
+
+
+
+
+
@@ -411,11 +336,89 @@
-
+
+
+
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -450,14 +453,6 @@
-
-
-
-
-
-
-
-
@@ -485,36 +480,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -605,36 +570,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -807,9 +742,33 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -817,13 +776,33 @@
-
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -831,32 +810,32 @@
-
+
-
+
-
+
-
+
-
+
-
+
-
-
+
+
@@ -865,36 +844,32 @@
-
+
-
+
-
+
-
+
-
+
-
+
-
-
-
-
-
-
+
+
@@ -903,21 +878,33 @@
-
+
-
+
-
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -938,19 +925,19 @@
-
+
-
+
-
-
+
+
@@ -962,13 +949,13 @@
-
+
-
+
@@ -980,7 +967,7 @@
-
+
@@ -1417,16 +1404,27 @@
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -1449,7 +1447,7 @@
-
+
@@ -1457,16 +1455,16 @@
-
+
-
+
-
-
+
+
@@ -1524,119 +1522,117 @@
-
+
-
+
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
+
-
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
-
+
-
-
+
+
+
+
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
index 42452a1380..b02ed15169 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
@@ -9,26 +9,32 @@ public class InMemStateful {
@state private TransactionalVector vectorState = new InMemoryTransactionalVector();
@state private TransactionalRef refState = new TransactionalRef();
+ @transactional
public String getMapState(String key) {
return (String)mapState.get(key).get();
}
+ @transactional
public String getVectorState() {
return (String)vectorState.last();
}
+ @transactional
public String getRefState() {
return (String)refState.get().get();
}
+ @transactional
public void setMapState(String key, String msg) {
mapState.put(key, msg);
}
+ @transactional
public void setVectorState(String msg) {
vectorState.add(msg);
}
+ @transactional
public void setRefState(String msg) {
refState.swap(msg);
}
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
old mode 100755
new mode 100644
index 9227583101..0a0a921e21
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
@@ -33,7 +33,6 @@ public class InMemoryStateTest extends TestCase {
InMemStateful stateful = conf.getActiveObject(InMemStateful.class);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
diff --git a/kernel/src/main/scala/actor/ActiveObject.scala b/kernel/src/main/scala/actor/ActiveObject.scala
index 2b85e46e2d..f07d9fc61d 100644
--- a/kernel/src/main/scala/actor/ActiveObject.scala
+++ b/kernel/src/main/scala/actor/ActiveObject.scala
@@ -41,12 +41,12 @@ object Annotations {
* @author Jonas Bonér
*/
class ActiveObjectFactory {
- def newInstance[T](target: Class[T]): T = {
- ActiveObject.newInstance(target)
+ def newInstance[T](target: Class[T], actor: Actor): T = {
+ ActiveObject.newInstance(target, actor)
}
- def newInstance[T](intf: Class[T], target: AnyRef): T = {
- ActiveObject.newInstance(intf, target)
+ def newInstance[T](intf: Class[T], target: AnyRef, actor: Actor): T = {
+ ActiveObject.newInstance(intf, target, actor)
}
def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor =
@@ -67,16 +67,16 @@ object ActiveObject {
tl
}
- def newInstance[T](target: Class[T]): T = {
+ def newInstance[T](target: Class[T], actor: Actor): T = {
val proxy = Proxy.newInstance(target, false, true)
// FIXME switch to weaving in the aspect at compile time
- proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new SequentialTransactionalAroundAdvice(target, proxy))
+ proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new SequentialTransactionalAroundAdvice(target, proxy, actor))
proxy.asInstanceOf[T]
}
- def newInstance[T](intf: Class[T], target: AnyRef): T = {
+ def newInstance[T](intf: Class[T], target: AnyRef, actor: Actor): T = {
val proxy = Proxy.newInstance(Array(intf), Array(target), false, true)
- proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new SequentialTransactionalAroundAdvice(intf, target))
+ proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new SequentialTransactionalAroundAdvice(intf, target, actor))
proxy.asInstanceOf[T]
}
@@ -95,7 +95,7 @@ object ActiveObject {
*/
// FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts
-sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstance: AnyRef) extends AroundAdvice {
+sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstance: AnyRef, actor: Actor) extends AroundAdvice {
private val changeSet = new ChangeSet(target.getName)
private val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance)
@@ -115,15 +115,24 @@ sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstanc
def invoke(joinpoint: JoinPoint): AnyRef = {
val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti]
val method = rtti.getMethod
- if (reenteringExistingTransaction) {
- tryToCommitTransaction
- if (method.isAnnotationPresent(Annotations.transactional)) startNewTransaction
+
+ tryToCommitTransaction
+
+ if (isInExistingTransaction) {
joinExistingTransaction
+ } else {
+ if (method.isAnnotationPresent(Annotations.transactional)) startNewTransaction
}
+
val result: AnyRef = try {
incrementTransaction
- if (rtti.getMethod.isAnnotationPresent(Annotations.oneway)) sendOneWay(joinpoint) // FIXME put in 2 different aspects
- else handleResult(sendAndReceiveEventually(joinpoint))
+// if (rtti.getMethod.isAnnotationPresent(Annotations.oneway)) sendOneWay(joinpoint) // FIXME put in 2 different aspects
+// else handleResult(sendAndReceiveEventually(joinpoint))
+ val result = actor !! Invocation(joinpoint, activeTx)
+ val resultOrFailure =
+ if (result.isDefined) result.get.asInstanceOf[ResultOrFailure[AnyRef]]
+ else throw new ActiveObjectInvocationTimeoutException("TIMED OUT")
+ handleResult(resultOrFailure)
} finally {
decrementTransaction
if (isTransactionAborted) removeTransactionIfTopLevel
@@ -132,19 +141,12 @@ sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstanc
result
}
- private def isTransactionAborted = activeTx.isDefined && activeTx.get.isAborted
- private def incrementTransaction = if (activeTx.isDefined) activeTx.get.increment
- private def decrementTransaction = if (activeTx.isDefined) activeTx.get.decrement
- private def removeTransactionIfTopLevel =
- if (activeTx.isDefined && activeTx.get.topLevel_?) {
- activeTx = None
- threadBoundTx.set(None)
- }
-
private def startNewTransaction = {
val newTx = new Transaction
newTx.begin(changeSet)
- threadBoundTx.set(Some(newTx))
+ val tx = Some(newTx)
+ activeTx = tx
+ threadBoundTx.set(tx)
}
private def joinExistingTransaction = {
@@ -158,17 +160,12 @@ sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstanc
private def tryToPrecommitTransaction = if (activeTx.isDefined) activeTx.get.precommit(changeSet)
- private def reenteringExistingTransaction= if (activeTx.isDefined) {
- val cflowTx = threadBoundTx.get
- if (cflowTx.isDefined && cflowTx.get.id == activeTx.get.id) false
- else true
- } else true
-
- private def tryToCommitTransaction = if (activeTx.isDefined) {
+ private def tryToCommitTransaction: Boolean = if (activeTx.isDefined) {
val tx = activeTx.get
tx.commit(changeSet)
removeTransactionIfTopLevel
- }
+ true
+ } else false
private def handleResult(result: ResultOrFailure[AnyRef]): AnyRef = {
try {
@@ -186,6 +183,26 @@ sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstanc
tx.rollback(changeSet)
}
+ private def isInExistingTransaction = ActiveObject.threadBoundTx.get.isDefined
+
+ private def isTransactionAborted = activeTx.isDefined && activeTx.get.isAborted
+
+ private def incrementTransaction = if (activeTx.isDefined) activeTx.get.increment
+
+ private def decrementTransaction = if (activeTx.isDefined) activeTx.get.decrement
+
+ private def removeTransactionIfTopLevel =
+ if (activeTx.isDefined && activeTx.get.topLevel_?) {
+ activeTx = None
+ threadBoundTx.set(None)
+ }
+
+ private def reenteringExistingTransaction= if (activeTx.isDefined) {
+ val cflowTx = threadBoundTx.get
+ if (cflowTx.isDefined && cflowTx.get.id == activeTx.get.id) false
+ else true
+ } else true
+
private def sendOneWay(joinpoint: JoinPoint) =
mailbox.append(new MessageHandle(this, Invocation(joinpoint, activeTx), new NullFutureResult))
@@ -195,8 +212,7 @@ sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstanc
getResultOrThrowException(future)
}
- private def postMessageToMailboxAndCreateFutureResultWithTimeout(
- message: AnyRef, timeout: Long): CompletableFutureResult = {
+ private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = {
val future = new DefaultCompletableFutureResult(timeout)
mailbox.append(new MessageHandle(this, message, future))
future
@@ -310,6 +326,30 @@ class ChangeSet(val id: String) {
}
}
+/**
+ * Generic Actor managing Invocation dispatch, transaction and error management.
+ *
+ * @author Jonas Bonér
+ */
+private[kernel] class Dispatcher(val targetName: String) extends Actor {
+ id = targetName
+ override def receive: PartialFunction[Any, Unit] = {
+
+ case Invocation(joinpoint: JoinPoint, tx: Option[Transaction]) =>
+ ActiveObject.threadBoundTx.set(tx)
+ try {
+ reply(ResultOrFailure(joinpoint.proceed, tx))
+ } catch {
+ case e =>
+ val resultOrFailure = ResultOrFailure(tx)
+ resultOrFailure() = throw e
+ reply(resultOrFailure)
+ }
+
+ case unexpected =>
+ throw new ActiveObjectException("Unexpected message [" + unexpected + "] sent to [" + this + "]")
+ }
+}
/*
ublic class CamelInvocationHandler implements InvocationHandler {
@@ -365,32 +405,3 @@ ublic class CamelInvocationHandler implements InvocationHandler {
} else
*/
-
-/**
- * Generic GenericServer managing Invocation dispatch, transaction and error management.
- *
- * @author Jonas Bonér
- */
-private[kernel] class Dispatcher(val targetName: String) extends Actor {
- override def receive: PartialFunction[Any, Unit] = {
-
- case Invocation(joinpoint: JoinPoint, tx: Option[Transaction]) =>
- ActiveObject.threadBoundTx.set(tx)
- try {
- reply(ResultOrFailure(joinpoint.proceed, tx))
- } catch {
- case e =>
- val resultOrFailure = ResultOrFailure(tx)
- resultOrFailure() = throw e
- reply(resultOrFailure)
- }
-
- case 'exit =>
- exit
-
- case unexpected =>
- throw new ActiveObjectException("Unexpected message [" + unexpected + "] sent to [" + this + "]")
- }
-
- override def toString(): String = "Actor[" + targetName + "]"
-}
\ No newline at end of file
diff --git a/kernel/src/main/scala/actor/Actor.scala b/kernel/src/main/scala/actor/Actor.scala
index ddf9b70ac8..d997358609 100644
--- a/kernel/src/main/scala/actor/Actor.scala
+++ b/kernel/src/main/scala/actor/Actor.scala
@@ -167,6 +167,8 @@ trait Actor extends Logging {
dispatcher = new EventBasedThreadPoolDispatcher
case DispatcherType.ThreadBasedDispatcher =>
dispatcher = new ThreadBasedDispatcher
+ case DispatcherType.EventBasedThreadPooledProxyInvokingDispatcher =>
+ dispatcher = new ProxyMessageDispatcher
}
mailbox = dispatcher.messageQueue
dispatcher.registerHandler(this, new ActorMessageHandler(this))
diff --git a/kernel/src/main/scala/actor/Supervisor.scala b/kernel/src/main/scala/actor/Supervisor.scala
index 93fa910d22..9086f29e5b 100644
--- a/kernel/src/main/scala/actor/Supervisor.scala
+++ b/kernel/src/main/scala/actor/Supervisor.scala
@@ -87,6 +87,7 @@ abstract class SupervisorFactory extends Logging {
}
}
+// FIXME remove Supervisor - all Actors can be supervisors - move SupervisorFactory config into actor
/**
* @author Jonas Bonér
*/
diff --git a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
index d3220e02c7..c96f18cbae 100644
--- a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
+++ b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
@@ -8,10 +8,12 @@ import com.google.inject._
import com.google.inject.jsr250.ResourceProviderFactory
import ScalaConfig._
-import kernel.actor.{Supervisor, ActiveObjectFactory}
+import kernel.actor.{Actor, Supervisor, ActiveObjectFactory, Dispatcher}
import kernel.camel.ActiveObjectComponent
import kernel.util.Logging
+import org.codehaus.aspectwerkz.intercept.Advisable
+
import org.apache.camel.impl.{JndiRegistry, DefaultCamelContext}
import org.apache.camel.{CamelContext, Endpoint, Routes}
@@ -106,10 +108,9 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
private def newSubclassingProxy(component: Component): DependencyBinding = {
val targetClass = component.target
-
- // FIXME add wrapping Actor and pass into Worker
- workers ::= Worker(null, component.lifeCycle)
- val proxy = activeObjectFactory.newInstance(targetClass).asInstanceOf[AnyRef]
+ val actor = new Dispatcher(targetClass.getName)
+ val proxy = activeObjectFactory.newInstance(targetClass, actor).asInstanceOf[AnyRef]
+ workers ::= Worker(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, proxy, component))
new DependencyBinding(targetClass, proxy)
}
@@ -117,9 +118,10 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
private def newDelegatingProxy(component: Component): DependencyBinding = {
val targetClass = component.intf.get
val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
- workers ::= Worker(null, component.lifeCycle) // TODO null is not an Actor
component.target.getConstructor(Array[Class[_]]()).setAccessible(true)
- val proxy = activeObjectFactory.newInstance(targetClass, targetInstance).asInstanceOf[AnyRef]
+ val actor = new Dispatcher(targetClass.getName)
+ val proxy = activeObjectFactory.newInstance(targetClass, targetInstance, actor).asInstanceOf[AnyRef]
+ workers ::= Worker(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, targetInstance, component))
new DependencyBinding(targetClass, proxy)
}
diff --git a/kernel/src/main/scala/state/State.scala b/kernel/src/main/scala/state/State.scala
old mode 100755
new mode 100644
index 60aaa9e0d5..6f91de0174
--- a/kernel/src/main/scala/state/State.scala
+++ b/kernel/src/main/scala/state/State.scala
@@ -5,7 +5,10 @@
package se.scalablesolutions.akka.kernel.state
import org.codehaus.aspectwerkz.proxy.Uuid
+
+import kernel.actor.ActiveObject
import se.scalablesolutions.akka.collection._
+
import scala.collection.mutable.HashMap
/**
@@ -13,9 +16,13 @@ import scala.collection.mutable.HashMap
*/
trait Transactional {
val uuid = Uuid.newUuid.toString
+
private[kernel] def begin
private[kernel] def commit
private[kernel] def rollback
+
+ protected def isInTransaction = ActiveObject.threadBoundTx.get.isDefined
+ protected def nonTransactionalCall = throw new IllegalStateException("Can't access transactional map outside the scope of a transaction")
}
/**
@@ -26,7 +33,37 @@ trait Transactional {
* @author Jonas Bonér
*/
trait TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] {
- def remove(key: K)
+ def remove(key: K)
+}
+
+trait TransactionalMapGuard[K, V] extends TransactionalMap[K, V] with Transactional {
+ abstract override def contains(key: K): Boolean =
+ if (isInTransaction) super.contains(key)
+ else nonTransactionalCall
+ abstract override def clear =
+ if (isInTransaction) super.clear
+ else nonTransactionalCall
+ abstract override def size: Int =
+ if (isInTransaction) super.size
+ else nonTransactionalCall
+ abstract override def remove(key: K) =
+ if (isInTransaction) super.remove(key)
+ else nonTransactionalCall
+ abstract override def elements: Iterator[(K, V)] =
+ if (isInTransaction) super.elements
+ else nonTransactionalCall
+ abstract override def get(key: K): Option[V] =
+ if (isInTransaction) super.get(key)
+ else nonTransactionalCall
+ abstract override def put(key: K, value: V): Option[V] =
+ if (isInTransaction) super.put(key, value)
+ else nonTransactionalCall
+ abstract override def -=(key: K) =
+ if (isInTransaction) super.-=(key)
+ else nonTransactionalCall
+ abstract override def update(key: K, value: V) =
+ if (isInTransaction) super.update(key, value)
+ else nonTransactionalCall
}
/**
diff --git a/kernel/src/main/scala/stm/Transaction.scala b/kernel/src/main/scala/stm/Transaction.scala
index 28c205aa1c..ef7d7290b5 100644
--- a/kernel/src/main/scala/stm/Transaction.scala
+++ b/kernel/src/main/scala/stm/Transaction.scala
@@ -89,6 +89,8 @@ object TransactionIdFactory {
log.debug("Server [%s] has initiated transaction rollback for [%s], rolling back [%s]", changeSet.id, this, participants)
participants.foreach(_.full.foreach(_.rollback))
status = TransactionStatus.Aborted
+ participants.clear
+ precommitted.clear
}
def join(changeSet: ChangeSet) = synchronized {