refState = factory.newInMemoryRef();
@transactional
public String getMapState(String key) {
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
index 39b1ed8ece..7067fab0d0 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
@@ -17,7 +17,6 @@ public class InMemoryStateTest extends TestCase {
final private ActiveObjectFactory factory = new ActiveObjectFactory();
protected void setUp() {
-
conf.configureActiveObjects(
new RestartStrategy(new AllForOne(), 3, 5000),
new Component[]{
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java
index a0510ff162..e9492ee9e7 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java
@@ -6,9 +6,9 @@ import se.scalablesolutions.akka.annotation.state;
public class PersistentStateful {
private TransactionalState factory = new TransactionalState();
- private TransactionalMap mapState = factory.newMap(new PersistentMapConfig(new CassandraStorageConfig()));
- private TransactionalVector vectorState = factory.newVector(new PersistentVectorConfig(new CassandraStorageConfig()));;
- private TransactionalRef refState = factory.newRef(new PersistentRefConfig(new CassandraStorageConfig()));
+ private TransactionalMap mapState = factory.newPersistentMap(new CassandraStorageConfig());
+ private TransactionalVector vectorState = factory.newPersistentVector(new CassandraStorageConfig());;
+ private TransactionalRef refState = factory.newPersistentRef(new CassandraStorageConfig());
@transactional
public String getMapState(String key) {
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java
index b5fe68e463..e433a83c09 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java
@@ -21,6 +21,7 @@ public class RemoteInMemoryStateTest extends TestCase {
server.connect();
}
}).start();
+ try { Thread.currentThread().sleep(1000); } catch (Exception e) {}
}
final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
final private se.scalablesolutions.akka.kernel.actor.ActiveObjectFactory factory = new se.scalablesolutions.akka.kernel.actor.ActiveObjectFactory();
diff --git a/kernel/src/main/scala/actor/ActiveObject.scala b/kernel/src/main/scala/actor/ActiveObject.scala
index cc39caed0a..2e74768912 100644
--- a/kernel/src/main/scala/actor/ActiveObject.scala
+++ b/kernel/src/main/scala/actor/ActiveObject.scala
@@ -12,7 +12,8 @@ import kernel.config.ActiveObjectGuiceConfigurator
import kernel.config.ScalaConfig._
import kernel.camel.{MessageDriven, ActiveObjectProducer}
import kernel.nio.{RemoteRequest, NettyClient}
-import kernel.stm.{ChangeSet, Transaction}
+import kernel.stm.{TransactionManagement, TransactionAwareWrapperException, ChangeSet, Transaction}
+
import kernel.util.Helpers.ReadWriteLock
import kernel.util.{HashCode, ResultOrFailure}
import kernel.state.{Transactional, TransactionalMap, TransactionalRef, TransactionalVector}
@@ -27,7 +28,6 @@ import scala.collection.mutable.HashMap
sealed class ActiveObjectException(msg: String) extends RuntimeException(msg)
class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg)
-class TransactionAwareException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause)
object Annotations {
import se.scalablesolutions.akka.annotation._
@@ -84,12 +84,6 @@ class ActiveObjectFactory {
object ActiveObject {
val AKKA_CAMEL_ROUTING_SCHEME = "akka"
- private[kernel] val threadBoundTx: ThreadLocal[Option[Transaction]] = {
- val tl = new ThreadLocal[Option[Transaction]]
- tl.set(None)
- tl
- }
-
def newInstance[T](target: Class[T]): T = newInstance(target, new Dispatcher(target.getName), false)
def newInstance[T](intf: Class[T], target: AnyRef): T = newInstance(intf, target, new Dispatcher(intf.getName), false)
@@ -105,7 +99,7 @@ object ActiveObject {
val proxy = Proxy.newInstance(target, false, true)
// FIXME switch to weaving in the aspect at compile time
proxy.asInstanceOf[Advisable].aw_addAdvice(
- "execution(* *.*(..))", new SequentialTransactionalAroundAdvice(target, proxy, actor, remote))
+ "execution(* *.*(..))", new TransactionalAroundAdvice(target, proxy, actor, remote))
proxy.asInstanceOf[T]
}
@@ -113,7 +107,7 @@ object ActiveObject {
if (remote) NettyClient.connect
val proxy = Proxy.newInstance(Array(intf), Array(target), false, true)
proxy.asInstanceOf[Advisable].aw_addAdvice(
- "execution(* *.*(..))", new SequentialTransactionalAroundAdvice(intf, target, actor, remote))
+ "execution(* *.*(..))", new TransactionalAroundAdvice(intf, target, actor, remote))
proxy.asInstanceOf[T]
}
@@ -130,60 +124,37 @@ object ActiveObject {
/**
* @author Jonas Bonér
*/
+@serializable sealed class TransactionalAroundAdvice(
+ val target: Class[_], val targetInstance: AnyRef, actor: Actor, val isRemote: Boolean)
+ extends AroundAdvice with TransactionManagement {
-// FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts
-@serializable sealed class SequentialTransactionalAroundAdvice(
- target: Class[_], targetInstance: AnyRef, actor: Actor, val isRemote: Boolean) extends AroundAdvice {
- private val changeSet = new ChangeSet(target.getName)
+ val transactionalInstance = targetInstance
- private val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance)
- changeSet.refs = refs
- changeSet.maps = maps
- changeSet.vectors = vectors
-
import kernel.reactor._
private[this] var dispatcher = new ProxyMessageDispatcher
private[this] var mailbox = dispatcher.messageQueue
dispatcher.start
- import ActiveObject.threadBoundTx
- private[kernel] var activeTx: Option[Transaction] = None
+ def invoke(joinpoint: JoinPoint): AnyRef =
+ if (TransactionManagement.isTransactionsEnabled) transactionalDispatch(joinpoint)
+ else
+ dispatch(joinpoint)
- // FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint
- def invoke(joinpoint: JoinPoint): AnyRef = {
- val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti]
- val method = rtti.getMethod
+ private def dispatch(joinpoint: JoinPoint) = {
+ if (isRemote) remoteDispatch(joinpoint)
+ else localDispatch(joinpoint)
+ }
- val isOneWay = rtti.getMethod.getReturnType == java.lang.Void.TYPE
+ private def transactionalDispatch(joinpoint: JoinPoint) = {
// FIXME join TX with same id, do not COMMIT
tryToCommitTransaction
- if (isInExistingTransaction) {
- joinExistingTransaction
- } else {
- if (method.isAnnotationPresent(Annotations.transactional)) startNewTransaction
- }
+ if (isInExistingTransaction) joinExistingTransaction
+ else if (isTransactional(joinpoint)) startNewTransaction
+ incrementTransaction
try {
- incrementTransaction
- if (isRemote) {
- val future = NettyClient.send(
- new RemoteRequest(false, rtti.getParameterValues, rtti.getMethod.getName, target.getName, activeTx, isOneWay, false))
- if (isOneWay) null // for void methods
- else {
- future.await_?
- val result = getResultOrThrowException(future)
- if (result.isDefined) result.get
- else throw new IllegalStateException("No result defined for invocation [" + joinpoint + "]")
- }
- } else {
- if (isOneWay) actor !! Invocation(joinpoint, activeTx) // FIXME investigate why ! causes TX to race
- else {
- val result = actor !! Invocation(joinpoint, activeTx)
- if (result.isDefined) result.get
- else throw new IllegalStateException("No result defined for invocation [" + joinpoint + "]")
- }
- }
+ dispatch(joinpoint)
} catch {
- case e: TransactionAwareException =>
+ case e: TransactionAwareWrapperException =>
rollback(e.tx)
throw e.cause
} finally {
@@ -193,136 +164,41 @@ object ActiveObject {
}
}
- // TODO: create a method setCallee/setCaller to the joinpoint interface and compiler
- private def nullOutTransientFieldsInJoinpoint(joinpoint: JoinPoint) = {
- val clazz = joinpoint.getClass
- val callee = clazz.getDeclaredField("CALLEE")
- callee.setAccessible(true)
- callee.set(joinpoint, null)
- val caller = clazz.getDeclaredField("CALLER")
- caller.setAccessible(true)
- caller.set(joinpoint, null)
- val interceptors = clazz.getDeclaredField("AROUND_INTERCEPTORS")
- interceptors.setAccessible(true)
- interceptors.set(joinpoint, null)
- }
-
- private def startNewTransaction = {
- val newTx = new Transaction
- newTx.begin(changeSet)
- val tx = Some(newTx)
- activeTx = tx
- threadBoundTx.set(tx)
- }
-
- private def joinExistingTransaction = {
- val cflowTx = threadBoundTx.get
- if (!activeTx.isDefined && cflowTx.isDefined) {
- val currentTx = cflowTx.get
- currentTx.join(changeSet)
- activeTx = Some(currentTx)
+ private def localDispatch(joinpoint: JoinPoint): AnyRef = {
+ val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti]
+ if (isOneWay(rtti)) actor !! Invocation(joinpoint, activeTx) // FIXME investigate why ! causes TX to race
+ else {
+ val result = actor !! Invocation(joinpoint, activeTx)
+ if (result.isDefined) result.get
+ else throw new IllegalStateException("No result defined for invocation [" + joinpoint + "]")
}
}
- private def tryToPrecommitTransaction = if (activeTx.isDefined) activeTx.get.precommit(changeSet)
-
- private def tryToCommitTransaction: Boolean = if (activeTx.isDefined) {
- val tx = activeTx.get
- tx.commit(changeSet)
- removeTransactionIfTopLevel
- true
- } else false
-
- private def handleResult(result: ResultOrFailure[AnyRef]): AnyRef = {
- try {
- result()
- } catch {
- case e =>
- rollback(result.tx)
- throw e
+ private def remoteDispatch(joinpoint: JoinPoint): AnyRef = {
+ val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti]
+ val oneWay = isOneWay(rtti)
+ val future = NettyClient.send(
+ new RemoteRequest(false, rtti.getParameterValues, rtti.getMethod.getName, target.getName, activeTx, oneWay, false))
+ if (oneWay) null // for void methods
+ else {
+ future.await
+ val result = getResultOrThrowException(future)
+ if (result.isDefined) result.get
+ else throw new IllegalStateException("No result defined for invocation [" + joinpoint + "]")
}
}
- private def rollback(tx: Option[Transaction]) = tx match {
- case None => {} // no tx; nothing to do
- case Some(tx) =>
- tx.rollback(changeSet)
- }
-
- private def isInExistingTransaction = ActiveObject.threadBoundTx.get.isDefined
-
- private def isTransactionAborted = activeTx.isDefined && activeTx.get.isAborted
-
- private def incrementTransaction = if (activeTx.isDefined) activeTx.get.increment
-
- private def decrementTransaction = if (activeTx.isDefined) activeTx.get.decrement
-
- private def removeTransactionIfTopLevel =
- if (activeTx.isDefined && activeTx.get.topLevel_?) {
- activeTx = None
- threadBoundTx.set(None)
- }
-
- private def reenteringExistingTransaction= if (activeTx.isDefined) {
- val cflowTx = threadBoundTx.get
- if (cflowTx.isDefined && cflowTx.get.id == activeTx.get.id) false
- else true
- } else true
-
- private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = {
- val future = new DefaultCompletableFutureResult(timeout)
- mailbox.append(new MessageHandle(this, message, future))
- future
- }
-
private def getResultOrThrowException[T](future: FutureResult): Option[T] =
if (future.exception.isDefined) {
val (_, cause) = future.exception.get
- throw new TransactionAwareException(cause, activeTx)
- } else {
- if (future.result.isDefined) {
- val (res, tx) = future.result.get.asInstanceOf[Tuple2[AnyRef, Option[Transaction]]]
- Some(res).asInstanceOf[Option[T]]
- } else None
- }
+ if (TransactionManagement.isTransactionsEnabled) throw new TransactionAwareWrapperException(cause, activeTx)
+ else throw cause
+ } else future.result.asInstanceOf[Option[T]]
- /**
- * Search for transactional items for a specific target instance, crawl the class hierarchy recursively up to the top.
- */
- private def getTransactionalItemsFor(targetInstance: AnyRef):
- Tuple3[List[TransactionalMap[_, _]], List[TransactionalVector[_]], List[TransactionalRef[_]]] = {
- require(targetInstance != null)
- var maps: List[TransactionalMap[_, _]] = Nil
- var refs: List[TransactionalRef[_]] = Nil
- var vectors: List[TransactionalVector[_]] = Nil
+ private def isTransactional(joinpoint: JoinPoint) =
+ joinpoint.getRtti.asInstanceOf[MethodRtti].getMethod.isAnnotationPresent(Annotations.transactional)
- def getTransactionalItemsFor(target: Class[_]):
- Tuple3[List[TransactionalMap[_, _]], List[TransactionalVector[_]], List[TransactionalRef[_]]] = {
- target.getDeclaredFields.toArray.toList.asInstanceOf[List[Field]].foreach(println)
- for {
- field <- target.getDeclaredFields.toArray.toList.asInstanceOf[List[Field]]
- fieldType = field.getType
- if (fieldType == classOf[TransactionalMap[_, _]]) ||
- (fieldType == classOf[TransactionalVector[_]]) ||
- (fieldType == classOf[TransactionalRef[_]])
- txItem = {
- field.setAccessible(true)
- field.get(targetInstance)
- }
- if txItem != null
- } {
- if (txItem.isInstanceOf[TransactionalMap[_, _]]) maps ::= txItem.asInstanceOf[TransactionalMap[_, _]]
- else if (txItem.isInstanceOf[TransactionalRef[_]]) refs ::= txItem.asInstanceOf[TransactionalRef[_]]
- else if (txItem.isInstanceOf[TransactionalVector[_]]) vectors ::= txItem.asInstanceOf[TransactionalVector[_]]
- }
- val parent = target.getSuperclass
- if (parent == classOf[Object]) (maps, vectors, refs)
- else getTransactionalItemsFor(parent)
- }
-
- // start the search for transactional items, crawl the class hierarchy up until we reach Object
- getTransactionalItemsFor(targetInstance.getClass)
- }
+ private def isOneWay(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE
}
/**
@@ -357,6 +233,7 @@ object ActiveObject {
* @author Jonas Bonér
*/
private[kernel] class Dispatcher(val targetName: String) extends Actor {
+ //makeTransactional
id = targetName
// FIXME implement the pre/post restart methods and call annotated methods on the POJO
@@ -366,12 +243,12 @@ private[kernel] class Dispatcher(val targetName: String) extends Actor {
override def receive: PartialFunction[Any, Unit] = {
case Invocation(joinpoint: JoinPoint, tx: Option[Transaction]) =>
- ActiveObject.threadBoundTx.set(tx)
+ TransactionManagement.threadBoundTx.set(tx)
try {
reply(joinpoint.proceed)
} catch {
case e =>
- throw new TransactionAwareException(e, tx)
+ throw new TransactionAwareWrapperException(e, tx)
}
case unexpected =>
@@ -379,6 +256,22 @@ private[kernel] class Dispatcher(val targetName: String) extends Actor {
}
}
+// TODO: create a method setCallee/setCaller to the joinpoint interface and compiler
+/*
+private def nullOutTransientFieldsInJoinpoint(joinpoint: JoinPoint) = {
+ val clazz = joinpoint.getClass
+ val callee = clazz.getDeclaredField("CALLEE")
+ callee.setAccessible(true)
+ callee.set(joinpoint, null)
+ val caller = clazz.getDeclaredField("CALLER")
+ caller.setAccessible(true)
+ caller.set(joinpoint, null)
+ val interceptors = clazz.getDeclaredField("AROUND_INTERCEPTORS")
+ interceptors.setAccessible(true)
+ interceptors.set(joinpoint, null)
+}
+*/
+
/*
ublic class CamelInvocationHandler implements InvocationHandler {
private final Endpoint endpoint;
diff --git a/kernel/src/main/scala/actor/Actor.scala b/kernel/src/main/scala/actor/Actor.scala
index 6c290cdbf1..e9d96e57c2 100644
--- a/kernel/src/main/scala/actor/Actor.scala
+++ b/kernel/src/main/scala/actor/Actor.scala
@@ -9,7 +9,7 @@ import java.util.concurrent.{CopyOnWriteArraySet, TimeUnit}
import kernel.reactor._
import kernel.config.ScalaConfig._
import kernel.nio.{NettyClient, RemoteRequest}
-import kernel.stm.Transaction
+import kernel.stm.{TransactionAwareWrapperException, TransactionManagement, Transaction}
import kernel.util.Logging
import kernel.util.Helpers._
@@ -29,13 +29,13 @@ object DispatcherType {
}
class ActorMessageHandler(val actor: Actor) extends MessageHandler {
- def handle(handle: MessageHandle) = actor.handle(handle.message, handle.future)
+ def handle(handle: MessageHandle) = actor.handle(handle)
}
-trait Actor extends Logging {
- var timeout: Long = 5000L
- var isRemote = false
-
+trait Actor extends Logging with TransactionManagement {
+ val transactionalInstance = this
+ @volatile private[this] var isRemote = false
+ @volatile private[this] var isTransactional = false
@volatile private[this] var isRunning: Boolean = false
protected[this] var id: String = super.toString
protected[this] var dispatcher: MessageDispatcher = _
@@ -118,31 +118,62 @@ trait Actor extends Logging {
// ==== API ====
// =============
+ /**
+ * TODO: document
+ */
+ @volatile var timeout: Long = 5000L
+
+ /**
+ * TODO: document
+ */
def !(message: AnyRef) = if (isRunning) {
if (isRemote) NettyClient.send(new RemoteRequest(true, message, null, this.getClass.getName, null, true, false))
- else mailbox.append(new MessageHandle(this, message, new NullFutureResult))
+ else mailbox.append(new MessageHandle(this, message, new NullFutureResult, activeTx))
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
+ /**
+ * TODO: document
+ */
def !: Option[T] = if (isRunning) {
- val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
- future.await_?
- getResultOrThrowException(future)
+ if (TransactionManagement.isTransactionsEnabled) {
+ transactionalDispatch(message, timeout, false)
+ } else {
+ val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
+ future.await
+ getResultOrThrowException(future)
+ }
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
+ /**
+ * TODO: document
+ */
def !: Option[T] = !
+ /**
+ * TODO: document
+ */
def !?[T](message: AnyRef): T = if (isRunning) {
- val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0)
- future.await_!
- getResultOrThrowException(future).get
+ if (TransactionManagement.isTransactionsEnabled) {
+ transactionalDispatch(message, 0, true).get
+ } else {
+ val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0)
+ future.awaitBlocking
+ getResultOrThrowException(future).get
+ }
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
+ /**
+ * TODO: document
+ */
protected[this] def reply(message: AnyRef) = senderFuture match {
case None => throw new IllegalStateException("No sender future in scope, can't reply. Have you used '!' (async, fire-and-forget)? If so, switch to '!!' which will return a future to wait on." )
case Some(future) => future.completeWithResult(message)
}
// FIXME can be deadlock prone if cyclic linking? - HOWTO?
+ /**
+ * TODO: document
+ */
protected[this] def link(actor: Actor) = synchronized { actor.synchronized {
if (isRunning) {
linkedActors.add(actor)
@@ -153,6 +184,9 @@ trait Actor extends Logging {
}}
// FIXME can be deadlock prone if cyclic linking? - HOWTO?
+ /**
+ * TODO: document
+ */
protected[this] def unlink(actor: Actor) = synchronized { actor.synchronized {
if (isRunning) {
if (!linkedActors.contains(actor)) throw new IllegalStateException("Actor [" + actor + "] is not a linked actor, can't unlink")
@@ -162,6 +196,9 @@ trait Actor extends Logging {
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
}}
+ /**
+ * TODO: document
+ */
def start = synchronized {
if (!isRunning) {
dispatcherType match {
@@ -181,6 +218,9 @@ trait Actor extends Logging {
}
}
+ /**
+ * TODO: document
+ */
def stop = synchronized {
if (isRunning) {
dispatcher.unregisterHandler(this)
@@ -205,30 +245,57 @@ trait Actor extends Logging {
link(actor)
}
+ /**
+ * TODO: document
+ */
def spawn(actorClass: Class[_]) = {
// FIXME: should pass in dispatcher etc. - inherit
}
+ /**
+ * TODO: document
+ */
def spawnRemote(actorClass: Class[_]) = {
}
+ /**
+ * TODO: document
+ */
def spawnLink(actorClass: Class[_]) = {
}
+ /**
+ * TODO: document
+ */
def spawnLinkRemote(actorClass: Class[_]) = {
}
+ /**
+ * TODO: document
+ */
def makeRemote = isRemote = true
+ /**
+ * TODO: document
+ */
+ def makeTransactional = synchronized {
+ if (isRunning) throw new IllegalArgumentException("Can not make actor transactional after it has been started")
+ else isTransactional = true
+ }
+
// ================================
// ==== IMPLEMENTATION DETAILS ====
// ================================
- private[kernel] def handle(message: AnyRef, future: CompletableFutureResult) = synchronized {
+ private[kernel] def handle(messageHandle: MessageHandle) = synchronized {
+ val message = messageHandle.message
+ val future = messageHandle.future
try {
+ if (messageHandle.tx.isDefined)
+ TransactionManagement.threadBoundTx.set(messageHandle.tx)
senderFuture = Some(future)
- if (base.isDefinedAt(message)) base(message)
+ if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
else throw new IllegalArgumentException("No handler matching message [" + message + "] in actor [" + this.getClass.getName + "]")
} catch {
case e =>
@@ -237,21 +304,49 @@ trait Actor extends Logging {
}
}
- private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = {
+ private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult =
if (isRemote) NettyClient.send(new RemoteRequest(true, message, null, this.getClass.getName, null, false, false))
else {
val future = new DefaultCompletableFutureResult(timeout)
- mailbox.append(new MessageHandle(this, message, future))
+ mailbox.append(new MessageHandle(this, message, future, TransactionManagement.threadBoundTx.get))
future
}
+
+ private def transactionalDispatch[T](message: AnyRef, timeout: Long, blocking: Boolean): Option[T] = {
+ // FIXME join TX with same id, do not COMMIT
+ println("------ Actor1: " + this)
+ tryToCommitTransaction
+ println("------ Actor2: " + this)
+ if (isInExistingTransaction) joinExistingTransaction
+ else if (isTransactional) startNewTransaction
+ println("------ Actor3: " + this)
+ incrementTransaction
+ try {
+ val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
+ if (blocking) future.awaitBlocking
+ else future.await
+ getResultOrThrowException(future)
+ } catch {
+ case e: TransactionAwareWrapperException =>
+ e.cause.printStackTrace
+ rollback(e.tx)
+ throw e.cause
+ } finally {
+ decrementTransaction
+ if (isTransactionAborted) removeTransactionIfTopLevel
+ else tryToPrecommitTransaction
+ TransactionManagement.threadBoundTx.set(None)
+ }
}
+
private def getResultOrThrowException[T](future: FutureResult): Option[T] =
if (isRemote) getRemoteResultOrThrowException(future)
else {
if (future.exception.isDefined) {
val (_, cause) = future.exception.get
- throw cause
+ if (TransactionManagement.isTransactionsEnabled) throw new TransactionAwareWrapperException(cause, activeTx)
+ else throw cause
} else {
future.result.asInstanceOf[Option[T]]
}
@@ -262,7 +357,8 @@ trait Actor extends Logging {
private def getRemoteResultOrThrowException[T](future: FutureResult): Option[T] =
if (future.exception.isDefined) {
val (_, cause) = future.exception.get
- throw cause // throw new TransactionAwareException(cause, activeTx)
+ if (TransactionManagement.isTransactionsEnabled) throw new TransactionAwareWrapperException(cause, activeTx)
+ else throw cause
} else {
if (future.result.isDefined) {
val (res, tx) = future.result.get.asInstanceOf[Tuple2[Option[T], Option[Transaction]]]
diff --git a/kernel/src/main/scala/actor/Supervisor.scala b/kernel/src/main/scala/actor/Supervisor.scala
index 9086f29e5b..dd3b521150 100644
--- a/kernel/src/main/scala/actor/Supervisor.scala
+++ b/kernel/src/main/scala/actor/Supervisor.scala
@@ -92,6 +92,7 @@ abstract class SupervisorFactory extends Logging {
* @author Jonas Bonér
*/
class Supervisor(handler: FaultHandlingStrategy) extends Actor with Logging {
+ makeTransactional
trapExit = true
faultHandler = Some(handler)
diff --git a/kernel/src/main/scala/nio/NettyClient.scala b/kernel/src/main/scala/nio/NettyClient.scala
index bb76accb02..fe2a17a0e9 100644
--- a/kernel/src/main/scala/nio/NettyClient.scala
+++ b/kernel/src/main/scala/nio/NettyClient.scala
@@ -116,9 +116,10 @@ class ObjectClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu
val result = event.getMessage
if (result.isInstanceOf[RemoteReply]) {
val reply = result.asInstanceOf[RemoteReply]
- val tx = reply.tx
+// val tx = reply.tx
val future = futures.get(reply.id)
- if (reply.successful) future.completeWithResult((reply.message, tx))
+ //if (reply.successful) future.completeWithResult((reply.message, tx))
+ if (reply.successful) future.completeWithResult(reply.message)
else future.completeWithException(null, reply.exception)
} else throw new IllegalArgumentException("Unknown message received in NIO client handler: " + result)
} catch {
diff --git a/kernel/src/main/scala/nio/NettyServer.scala b/kernel/src/main/scala/nio/NettyServer.scala
index b7ff74f459..8baf4cc1ae 100644
--- a/kernel/src/main/scala/nio/NettyServer.scala
+++ b/kernel/src/main/scala/nio/NettyServer.scala
@@ -8,7 +8,8 @@ import java.lang.reflect.InvocationTargetException
import java.net.InetSocketAddress
import java.util.concurrent.{ConcurrentHashMap, Executors}
import kernel.actor._
-import kernel.reactor.{DefaultCompletableFutureResult, CompletableFutureResult}
+import kernel.reactor.{MessageHandle, DefaultCompletableFutureResult, CompletableFutureResult}
+import kernel.stm.TransactionManagement
import kernel.util.Logging
import java.util.ArrayList
import java.util.List
@@ -66,7 +67,7 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
private val actors = new ConcurrentHashMap[String, Actor]
private val MESSAGE_HANDLE = classOf[Actor].getDeclaredMethod(
- "handle", Array[Class[_]](classOf[AnyRef], classOf[CompletableFutureResult]))
+ "handle", Array[Class[_]](classOf[MessageHandle]))
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
@@ -118,7 +119,8 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
val resultOrNone = actor !! request.message
val result = if (resultOrNone.isDefined) resultOrNone else null
log.debug("Returning result from actor invocation [%s]", result)
- channel.write(request.newReplyWithMessage(result, ActiveObject.threadBoundTx.get))
+ //channel.write(request.newReplyWithMessage(result, TransactionManagement.threadBoundTx.get))
+ channel.write(request.newReplyWithMessage(result, null))
} catch {
case e: InvocationTargetException =>
log.error("Could not invoke remote actor [%s] due to: %s", request.target, e.getCause)
@@ -143,7 +145,8 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
else {
val result = messageReceiver.invoke(activeObject, unescapedArgs)
log.debug("Returning result from active object invocation [%s]", result)
- channel.write(request.newReplyWithMessage(result, ActiveObject.threadBoundTx.get))
+ //channel.write(request.newReplyWithMessage(result, TransactionManagement.threadBoundTx.get))
+ channel.write(request.newReplyWithMessage(result, null))
}
} catch {
case e: InvocationTargetException =>
@@ -157,8 +160,8 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
val tx = request.tx
if (tx.isDefined) {
tx.get.reinit
- ActiveObject.threadBoundTx.set(tx)
- } else ActiveObject.threadBoundTx.set(None)
+ TransactionManagement.threadBoundTx.set(tx)
+ } else TransactionManagement.threadBoundTx.set(None)
}
private def unescapeArgs(args: scala.List[AnyRef], argClasses: scala.List[Class[_]]) = {
diff --git a/kernel/src/main/scala/reactor/Future.scala b/kernel/src/main/scala/reactor/Future.scala
index d74beb5dff..25ab4a8fcf 100644
--- a/kernel/src/main/scala/reactor/Future.scala
+++ b/kernel/src/main/scala/reactor/Future.scala
@@ -13,8 +13,8 @@ import java.util.concurrent.TimeUnit
class FutureTimeoutException(message: String) extends RuntimeException(message)
sealed trait FutureResult {
- def await_?
- def await_!
+ def await
+ def awaitBlocking
def isCompleted: Boolean
def isExpired: Boolean
def timeoutInNanos: Long
@@ -39,7 +39,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
private var _result: Option[AnyRef] = None
private var _exception: Option[Tuple2[AnyRef, Throwable]] = None
- def await_? = try {
+ def await = try {
_lock.lock
var wait = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
while (!_completed && wait > 0) {
@@ -56,7 +56,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
_lock.unlock
}
- def await_! = try {
+ def awaitBlocking = try {
_lock.lock
while (!_completed) {
_signal.await
@@ -121,8 +121,8 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
class NullFutureResult extends CompletableFutureResult {
def completeWithResult(result: AnyRef) = {}
def completeWithException(toBlame: AnyRef, exception: Throwable) = {}
- def await_? = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
- def await_! = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
+ def await = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
+ def awaitBlocking = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
def isCompleted: Boolean = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
def isExpired: Boolean = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
def timeoutInNanos: Long = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
diff --git a/kernel/src/main/scala/reactor/Reactor.scala b/kernel/src/main/scala/reactor/Reactor.scala
index 86a9a73981..ab409a941c 100644
--- a/kernel/src/main/scala/reactor/Reactor.scala
+++ b/kernel/src/main/scala/reactor/Reactor.scala
@@ -13,6 +13,7 @@ package se.scalablesolutions.akka.kernel.reactor
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.ThreadFactory
import java.util.{LinkedList, Queue}
+import kernel.stm.Transaction
import kernel.util.{Logging, HashCode}
trait MessageHandler {
def handle(message: MessageHandle)
@@ -33,13 +34,17 @@ trait MessageDemultiplexer {
def wakeUp
}
-class MessageHandle(val sender: AnyRef, val message: AnyRef, val future: CompletableFutureResult) {
+class MessageHandle(val sender: AnyRef,
+ val message: AnyRef,
+ val future: CompletableFutureResult,
+ val tx: Option[Transaction]) {
override def hashCode(): Int = {
var result = HashCode.SEED
result = HashCode.hash(result, sender)
result = HashCode.hash(result, message)
result = HashCode.hash(result, future)
+ result = if (tx.isDefined) HashCode.hash(result, tx.get.id) else result
result
}
@@ -48,7 +53,9 @@ class MessageHandle(val sender: AnyRef, val message: AnyRef, val future: Complet
that.isInstanceOf[MessageHandle] &&
that.asInstanceOf[MessageHandle].sender == sender &&
that.asInstanceOf[MessageHandle].message == message &&
- that.asInstanceOf[MessageHandle].future == future
+ that.asInstanceOf[MessageHandle].future == future &&
+ that.asInstanceOf[MessageHandle].tx.isDefined == tx.isDefined &&
+ that.asInstanceOf[MessageHandle].tx.get.id == tx.get.id
}
class MessageQueue {
diff --git a/kernel/src/main/scala/state/State.scala b/kernel/src/main/scala/state/State.scala
index a49d296de4..0749fbe2bd 100644
--- a/kernel/src/main/scala/state/State.scala
+++ b/kernel/src/main/scala/state/State.scala
@@ -4,6 +4,7 @@
package se.scalablesolutions.akka.kernel.state
+import kernel.stm.TransactionManagement
import org.codehaus.aspectwerkz.proxy.Uuid
import kernel.actor.ActiveObject
@@ -12,71 +13,54 @@ import se.scalablesolutions.akka.collection._
import scala.collection.mutable.HashMap
sealed abstract class TransactionalStateConfig
-abstract class TransactionalMapConfig extends TransactionalStateConfig
-abstract class TransactionalVectorConfig extends TransactionalStateConfig
-abstract class TransactionalRefConfig extends TransactionalStateConfig
-
abstract class PersistentStorageConfig extends TransactionalStateConfig
case class CassandraStorageConfig extends PersistentStorageConfig
case class TerracottaStorageConfig extends PersistentStorageConfig
case class TokyoCabinetStorageConfig extends PersistentStorageConfig
-case class PersistentMapConfig(storage: PersistentStorageConfig) extends TransactionalMapConfig
-case class InMemoryMapConfig extends TransactionalMapConfig
-
-case class PersistentVectorConfig(storage: PersistentStorageConfig) extends TransactionalVectorConfig
-case class InMemoryVectorConfig extends TransactionalVectorConfig
-
-case class PersistentRefConfig(storage: PersistentStorageConfig) extends TransactionalRefConfig
-case class InMemoryRefConfig extends TransactionalRefConfig
-
+/**
+ * Scala API.
+ *
+ * Example Scala usage:
+ *
+ * val myMap = TransactionalState.newPersistentMap(CassandraStorageConfig)
+ *
+ */
object TransactionalState extends TransactionalState
+
+/**
+ * Java API.
+ *
+ * Example Java usage:
+ *
+ * TransactionalState state = new TransactionalState();
+ * TransactionalMap myMap = state.newPersistentMap(new CassandraStorageConfig());
+ *
+ */
class TransactionalState {
-
- /**
- * Usage:
- *
- * val myMap = TransactionalState.newMap(PersistentMapConfig(CassandraStorageConfig))
- *
- */
- def newMap(config: TransactionalMapConfig) = config match {
- case PersistentMapConfig(storage) => storage match {
- case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
- case TerracottaStorageConfig() => throw new UnsupportedOperationException
- case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
- }
- case InMemoryMapConfig() => new InMemoryTransactionalMap
+ def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[String, AnyRef] = config match {
+ case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
+ case TerracottaStorageConfig() => throw new UnsupportedOperationException
+ case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
- /**
- * Usage:
- *
- * val myVector = TransactionalState.newVector(PersistentVectorConfig(CassandraStorageConfig))
- *
- */
- def newVector(config: TransactionalVectorConfig) = config match {
- case PersistentVectorConfig(storage) => storage match {
- case CassandraStorageConfig() => new CassandraPersistentTransactionalVector
- case TerracottaStorageConfig() => throw new UnsupportedOperationException
- case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
- }
- case InMemoryVectorConfig() => new InMemoryTransactionalVector
+ def newPersistentVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match {
+ case CassandraStorageConfig() => new CassandraPersistentTransactionalVector
+ case TerracottaStorageConfig() => throw new UnsupportedOperationException
+ case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
- /**
- * Usage:
- *
- * val myRef = TransactionalState.newRef(PersistentRefConfig(CassandraStorageConfig))
- *
- */
- def newRef(config: TransactionalRefConfig) = config match {
- case PersistentRefConfig(storage) => storage match {
- case CassandraStorageConfig() => new CassandraPersistentTransactionalRef
- case TerracottaStorageConfig() => throw new UnsupportedOperationException
- case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
- }
- case InMemoryRefConfig() => new TransactionalRef
+ def newPersistentRef(config: PersistentStorageConfig): TransactionalRef[AnyRef] = config match {
+ case CassandraStorageConfig() => new CassandraPersistentTransactionalRef
+ case TerracottaStorageConfig() => throw new UnsupportedOperationException
+ case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
+
+ def newInMemoryMap[K, V]: TransactionalMap[K, V] = new InMemoryTransactionalMap[K, V]
+
+ def newInMemoryVector[T]: TransactionalVector[T] = new InMemoryTransactionalVector[T]
+
+ def newInMemoryRef[T]: TransactionalRef[T] = new TransactionalRef[T]
}
/**
@@ -90,15 +74,15 @@ trait Transactional {
private[kernel] def commit
private[kernel] def rollback
- protected def isInTransaction = ActiveObject.threadBoundTx.get.isDefined
+ protected def isInTransaction = TransactionManagement.threadBoundTx.get.isDefined
protected def nonTransactionalCall = throw new IllegalStateException("Can't access transactional map outside the scope of a transaction")
}
/**
* Base trait for all state implementations (persistent or in-memory).
- *
+ *
* TODO: Make this class inherit scala.collection.mutable.Map and/or java.util.Map
- *
+ *
* @author Jonas Bonér
*/
trait TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] {
@@ -118,7 +102,7 @@ trait TransactionalMapGuard[K, V] extends TransactionalMap[K, V] with Transactio
abstract override def remove(key: K) =
if (isInTransaction) super.remove(key)
else nonTransactionalCall
- abstract override def elements: Iterator[(K, V)] =
+ abstract override def elements: Iterator[(K, V)] =
if (isInTransaction) super.elements
else nonTransactionalCall
abstract override def get(key: K): Option[V] =
@@ -149,7 +133,7 @@ class InMemoryTransactionalMap[K, V] extends TransactionalMap[K, V] {
override def begin = snapshot = state
override def commit = snapshot = state
override def rollback = state = snapshot
-
+
// ---- Overriding scala.collection.mutable.Map behavior ----
override def contains(key: K): Boolean = state.contains(key)
override def clear = state = new HashTrie[K, V]
@@ -171,16 +155,16 @@ class InMemoryTransactionalMap[K, V] extends TransactionalMap[K, V] {
/**
* Base class for all persistent transactional map implementations should extend.
* Implements a Unit of Work, records changes into a change set.
- *
+ *
* Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
- *
+ *
* @author Jonas Bonér
*/
abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
// FIXME: need to handle remove in another changeSet
protected[kernel] val changeSet = new HashMap[K, V]
-
+
def getRange(start: Int, count: Int)
// ---- For Transactional ----
@@ -249,9 +233,9 @@ abstract class TransactionalVector[T] extends Transactional with RandomAccessSeq
/**
* Implements an in-memory transactional vector.
- *
+ *
* Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
- *
+ *
* @author Jonas Bonér
*/
class InMemoryTransactionalVector[T] extends TransactionalVector[T] {
@@ -308,20 +292,27 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect
override def length: Int = CassandraNode.getVectorStorageSizeFor(uuid)
override def apply(index: Int): AnyRef = get(index)
override def first: AnyRef = get(0)
- override def last: AnyRef = get(length)
+ override def last: AnyRef = {
+ val l = length
+ if (l == 0) throw new NoSuchElementException("Vector is empty")
+ get(length - 1)
+ }
// ---- For Transactional ----
override def commit = {
// FIXME: should use batch function once the bug is resolved
- for (element <- changeSet) CassandraNode.insertVectorStorageEntryFor(uuid, element)
+ for (element <- changeSet) {
+ CassandraNode.insertVectorStorageEntryFor(uuid, element)
+ println("33333333333 " + CassandraNode.getVectorStorageSizeFor(uuid))
+ }
}
}
/**
* Implements a transactional reference.
- *
+ *
* Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
- *
+ *
* @author Jonas Bonér
*/
class TransactionalRef[T] extends Transactional {
diff --git a/kernel/src/main/scala/stm/TransactionManagement.scala b/kernel/src/main/scala/stm/TransactionManagement.scala
new file mode 100644
index 0000000000..ad412cb9c1
--- /dev/null
+++ b/kernel/src/main/scala/stm/TransactionManagement.scala
@@ -0,0 +1,153 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.stm
+
+import java.lang.reflect.Field
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kernel.state.{TransactionalMap, TransactionalRef, TransactionalVector}
+import kernel.util.Logging
+
+class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
+ override def toString(): String = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
+}
+
+object TransactionManagement {
+ private val txEnabled = new AtomicBoolean(true)
+
+ def isTransactionsEnabled = txEnabled.get
+ def enableTransactions = txEnabled.set(true)
+
+ private[kernel] lazy val threadBoundTx: ThreadLocal[Option[Transaction]] = {
+ val tl = new ThreadLocal[Option[Transaction]]
+ tl.set(None)
+ tl
+ }
+}
+
+// FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts
+trait TransactionManagement extends Logging {
+ val transactionalInstance: AnyRef
+
+ private lazy val changeSet = new ChangeSet(transactionalInstance.getClass.getName)
+
+ import TransactionManagement.threadBoundTx
+ private[kernel] var activeTx: Option[Transaction] = None
+
+ protected def startNewTransaction = {
+ val (maps, vectors, refs) = getTransactionalItemsFor(transactionalInstance)
+ changeSet.maps = maps
+ changeSet.refs = refs
+ changeSet.vectors = vectors
+
+ val newTx = new Transaction
+ newTx.begin(changeSet)
+ val tx = Some(newTx)
+ activeTx = tx
+ threadBoundTx.set(tx)
+ }
+
+ protected def joinExistingTransaction = {
+ val cflowTx = threadBoundTx.get
+ if (activeTx.isDefined && cflowTx.isDefined && activeTx.get.id == cflowTx.get.id) {
+ val currentTx = cflowTx.get
+ currentTx.join(changeSet)
+ activeTx = Some(currentTx)
+ }
+ }
+
+ protected def tryToPrecommitTransaction = if (activeTx.isDefined) activeTx.get.precommit(changeSet)
+
+ protected def tryToCommitTransaction: Boolean = if (activeTx.isDefined) {
+ val tx = activeTx.get
+ tx.commit(changeSet)
+ removeTransactionIfTopLevel
+ true
+ } else false
+
+ protected def rollback(tx: Option[Transaction]) = tx match {
+ case None => {} // no tx; nothing to do
+ case Some(tx) =>
+ tx.rollback(changeSet)
+ }
+
+ protected def isInExistingTransaction = {
+ println(TransactionManagement)
+ println(TransactionManagement.threadBoundTx)
+ println(TransactionManagement.threadBoundTx.get)
+ println(TransactionManagement.threadBoundTx.get.isDefined)
+ TransactionManagement.threadBoundTx.get.isDefined
+ }
+
+ protected def isTransactionAborted = activeTx.isDefined && activeTx.get.isAborted
+
+ protected def incrementTransaction = if (activeTx.isDefined) activeTx.get.increment
+
+ protected def decrementTransaction = if (activeTx.isDefined) activeTx.get.decrement
+
+ protected def removeTransactionIfTopLevel =
+ if (activeTx.isDefined && activeTx.get.topLevel_?) {
+ activeTx = None
+ threadBoundTx.set(None)
+ }
+
+ protected def reenteringExistingTransaction= if (activeTx.isDefined) {
+ val cflowTx = threadBoundTx.get
+ if (cflowTx.isDefined && cflowTx.get.id == activeTx.get.id) false
+ else true
+ } else true
+
+ /**
+ * Search for transactional items for a specific target instance, crawl the class hierarchy recursively up to the top.
+ */
+ protected def getTransactionalItemsFor(targetInstance: AnyRef):
+ Tuple3[List[TransactionalMap[_, _]], List[TransactionalVector[_]], List[TransactionalRef[_]]] = {
+ require(targetInstance != null)
+ var maps: List[TransactionalMap[_, _]] = Nil
+ var refs: List[TransactionalRef[_]] = Nil
+ var vectors: List[TransactionalVector[_]] = Nil
+
+ def getTransactionalItemsFor(target: Class[_]):
+ Tuple3[List[TransactionalMap[_, _]], List[TransactionalVector[_]], List[TransactionalRef[_]]] = {
+ for {
+ field <- target.getDeclaredFields.toArray.toList.asInstanceOf[List[Field]]
+ fieldType = field.getType
+ if (fieldType == classOf[TransactionalMap[_, _]]) ||
+ (fieldType == classOf[TransactionalVector[_]]) ||
+ (fieldType == classOf[TransactionalRef[_]])
+ txItem = {
+ field.setAccessible(true)
+ field.get(targetInstance)
+ }
+ if txItem != null
+ } {
+ log.debug("Managing transactional state [%s]", field)
+ if (txItem.isInstanceOf[TransactionalMap[_, _]]) maps ::= txItem.asInstanceOf[TransactionalMap[_, _]]
+ else if (txItem.isInstanceOf[TransactionalRef[_]]) refs ::= txItem.asInstanceOf[TransactionalRef[_]]
+ else if (txItem.isInstanceOf[TransactionalVector[_]]) vectors ::= txItem.asInstanceOf[TransactionalVector[_]]
+ }
+ val parent = target.getSuperclass
+ if (parent == classOf[Object]) (maps, vectors, refs)
+ else getTransactionalItemsFor(parent)
+ }
+
+ // start the search for transactional items, crawl the class hierarchy up until we reach Object
+ getTransactionalItemsFor(targetInstance.getClass)
+ }
+
+ /*
+ protected def getResultOrThrowException[T](future: FutureResult): Option[T] =
+ if (future.exception.isDefined) {
+ val (_, cause) = future.exception.get
+ throw new TransactionAwareWrapperException(cause, activeTx)
+ } else {
+ if (future.result.isDefined) {
+ val (res, tx) = future.result.get.asInstanceOf[Tuple2[AnyRef, Option[Transaction]]]
+ Some(res).asInstanceOf[Option[T]]
+ } else None
+ }
+ */
+}
+
diff --git a/kernel/src/test/scala/EventBasedDispatcherTest.scala b/kernel/src/test/scala/EventBasedDispatcherTest.scala
index 7e420e4f58..58eda77b50 100644
--- a/kernel/src/test/scala/EventBasedDispatcherTest.scala
+++ b/kernel/src/test/scala/EventBasedDispatcherTest.scala
@@ -60,7 +60,7 @@ class EventBasedDispatcherTest {
dispatcher.registerHandler(key, new TestMessageHandle(handleLatch))
dispatcher.start
for (i <- 0 until 100) {
- dispatcher.messageQueue.append(new MessageHandle(key, new Object, new NullFutureResult))
+ dispatcher.messageQueue.append(new MessageHandle(key, new Object, new NullFutureResult, None))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
@@ -74,8 +74,8 @@ class EventBasedDispatcherTest {
dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch))
dispatcher.registerHandler(key2, new TestMessageHandle(handleLatch))
dispatcher.start
- dispatcher.messageQueue.append(new MessageHandle(key1, new Object, new NullFutureResult))
- dispatcher.messageQueue.append(new MessageHandle(key2, new Object, new NullFutureResult))
+ dispatcher.messageQueue.append(new MessageHandle(key1, new Object, new NullFutureResult, None))
+ dispatcher.messageQueue.append(new MessageHandle(key2, new Object, new NullFutureResult, None))
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
}
@@ -109,8 +109,8 @@ class EventBasedDispatcherTest {
})
dispatcher.start
for (i <- 0 until 100) {
- dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), new NullFutureResult))
- dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), new NullFutureResult))
+ dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), new NullFutureResult, None))
+ dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), new NullFutureResult, None))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
diff --git a/kernel/src/test/scala/InMemoryActorSpec.scala b/kernel/src/test/scala/InMemoryActorSpec.scala
new file mode 100644
index 0000000000..2397f42e13
--- /dev/null
+++ b/kernel/src/test/scala/InMemoryActorSpec.scala
@@ -0,0 +1,136 @@
+package se.scalablesolutions.akka.kernel.actor
+
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.TimeUnit
+
+import kernel.state.TransactionalState
+import kernel.reactor._
+
+import org.junit.{Test, Before}
+import org.junit.Assert._
+
+case class SetMapState(key: String, value: String)
+case class SetVectorState(key: String)
+case class SetRefState(key: String)
+case class GetMapState(key: String)
+case object GetVectorState
+case object GetRefState
+case class Success(key: String, value: String)
+case class Failure(key: String, value: String, failer: Actor)
+
+class InMemStatefulActor extends Actor {
+ makeTransactional
+ private val mapState = TransactionalState.newInMemoryMap[String, String]
+ private val vectorState = TransactionalState.newInMemoryVector[String]
+ private val refState = TransactionalState.newInMemoryRef[String]
+
+ def receive: PartialFunction[Any, Unit] = {
+ case GetMapState(key) =>
+ reply(mapState.get(key).get)
+ case GetVectorState =>
+ reply(vectorState.last)
+ case GetRefState =>
+ reply(refState.get.get)
+ case SetMapState(key, msg) =>
+ mapState.put(key, msg)
+ reply(msg)
+ case SetVectorState(msg) =>
+ vectorState.add(msg)
+ reply(msg)
+ case SetRefState(msg) =>
+ refState.swap(msg)
+ reply(msg)
+ case Success(key, msg) =>
+ mapState.put(key, msg)
+ vectorState.add(msg)
+ refState.swap(msg)
+ reply(msg)
+ case Failure(key, msg, failer) =>
+ mapState.put(key, msg)
+ vectorState.add(msg)
+ refState.swap(msg)
+ failer !! "Failure"
+ reply(msg)
+ }
+}
+
+class InMemFailerActor extends Actor {
+ makeTransactional
+ def receive: PartialFunction[Any, Unit] = {
+ case "Failure" =>
+ throw new RuntimeException("expected")
+ }
+}
+
+class InMemoryActorSpec {
+ @Test
+ def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
+ val stateful = new InMemStatefulActor
+ stateful.start
+ stateful !! SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
+ stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
+ assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
+ }
+
+ @Test
+ def testMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
+ val stateful = new InMemStatefulActor
+ stateful.start
+ stateful !! SetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
+ val failer = new InMemFailerActor
+ failer.start
+ try {
+ stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
+ fail("should have thrown an exception")
+ } catch {case e: RuntimeException => {}}
+ assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
+ }
+
+ @Test
+ def testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
+ val stateful = new InMemStatefulActor
+ stateful.start
+ stateful !! SetVectorState("init") // set init state
+ stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
+ stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // to trigger commit
+ assertEquals("new state", (stateful !! GetVectorState).get)
+ }
+
+ @Test
+ def testVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
+ val stateful = new InMemStatefulActor
+ stateful.start
+ stateful !! SetVectorState("init") // set init state
+ val failer = new InMemFailerActor
+ failer.start
+ try {
+ stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
+ fail("should have thrown an exception")
+ } catch {case e: RuntimeException => {}}
+ assertEquals("init", (stateful !! GetVectorState).get) // check that state is == init state
+ }
+
+ @Test
+ def testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
+ val stateful = new InMemStatefulActor
+ stateful.start
+ stateful !! SetRefState("init") // set init state
+ stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
+ stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // to trigger commit
+ assertEquals("new state", (stateful !! GetRefState).get)
+ }
+
+ @Test
+ def testRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
+ val stateful = new InMemStatefulActor
+ stateful.start
+ stateful !! SetRefState("init") // set init state
+ val failer = new InMemFailerActor
+ failer.start
+ try {
+ stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
+ fail("should have thrown an exception")
+ } catch {case e: RuntimeException => {}}
+ assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state
+ }
+}
\ No newline at end of file
diff --git a/kernel/src/test/scala/PersistentActorSpec.scala b/kernel/src/test/scala/PersistentActorSpec.scala
new file mode 100644
index 0000000000..f12c72e731
--- /dev/null
+++ b/kernel/src/test/scala/PersistentActorSpec.scala
@@ -0,0 +1,135 @@
+package se.scalablesolutions.akka.kernel.actor
+
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.TimeUnit
+
+import kernel.reactor._
+
+import kernel.state.{CassandraStorageConfig, TransactionalState}
+import org.junit.{Test, Before}
+import org.junit.Assert._
+
+class PersistentActor extends Actor {
+ makeTransactional
+ private val mapState = TransactionalState.newPersistentMap(CassandraStorageConfig())
+ private val vectorState = TransactionalState.newPersistentVector(CassandraStorageConfig())
+ private val refState = TransactionalState.newPersistentRef(CassandraStorageConfig())
+
+ def receive: PartialFunction[Any, Unit] = {
+ case GetMapState(key) =>
+ reply(mapState.get(key).get)
+ case GetVectorState =>
+ reply(vectorState.last)
+ case GetRefState =>
+ reply(refState.get.get)
+ case SetMapState(key, msg) =>
+ mapState.put(key, msg)
+ reply(msg)
+ case SetVectorState(msg) =>
+ vectorState.add(msg)
+ reply(msg)
+ case SetRefState(msg) =>
+ refState.swap(msg)
+ reply(msg)
+ case Success(key, msg) =>
+ mapState.put(key, msg)
+ vectorState.add(msg)
+ refState.swap(msg)
+ reply(msg)
+ case Failure(key, msg, failer) =>
+ mapState.put(key, msg)
+ vectorState.add(msg)
+ refState.swap(msg)
+ failer !! "Failure"
+ reply(msg)
+ }
+}
+
+class PersistentFailerActor extends Actor {
+ makeTransactional
+ def receive: PartialFunction[Any, Unit] = {
+ case "Failure" =>
+ throw new RuntimeException("expected")
+ }
+}
+
+object PersistenceManager {
+ @volatile var isRunning = false
+ def init = if (!isRunning) {
+ System.setProperty("storage-config", "config")
+ Kernel.startCassandra
+ isRunning = true
+ }
+}
+class PersistentActorSpec {
+ PersistenceManager.init
+
+ @Test
+ def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
+ val stateful = new PersistentActor
+ stateful.start
+ stateful !! SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
+ stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
+ assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
+ }
+
+ @Test
+ def testMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
+ val stateful = new PersistentActor
+ stateful.start
+ stateful !! SetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
+ val failer = new PersistentFailerActor
+ failer.start
+ try {
+ stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
+ fail("should have thrown an exception")
+ } catch {case e: RuntimeException => {}}
+ assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
+ }
+
+ @Test
+ def testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
+ val stateful = new PersistentActor
+ stateful.start
+ stateful !! SetVectorState("init") // set init state
+ stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
+ assertEquals("new state", (stateful !! GetVectorState).get)
+ }
+
+ @Test
+ def testVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
+ val stateful = new PersistentActor
+ stateful.start
+ stateful !! SetVectorState("init") // set init state
+ val failer = new PersistentFailerActor
+ failer.start
+ try {
+ stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
+ fail("should have thrown an exception")
+ } catch {case e: RuntimeException => {}}
+ assertEquals("init", (stateful !! GetVectorState).get) // check that state is == init state
+ }
+
+ @Test
+ def testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
+ val stateful = new PersistentActor
+ stateful.start
+ stateful !! SetRefState("init") // set init state
+ stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
+ assertEquals("new state", (stateful !! GetRefState).get)
+ }
+
+ @Test
+ def testRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
+ val stateful = new PersistentActor
+ stateful.start
+ stateful !! SetRefState("init") // set init state
+ val failer = new PersistentFailerActor
+ failer.start
+ try {
+ stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
+ fail("should have thrown an exception")
+ } catch {case e: RuntimeException => {}}
+ assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state
+ }
+}
\ No newline at end of file
diff --git a/kernel/src/test/scala/RemoteActorSpec.scala b/kernel/src/test/scala/RemoteActorSpec.scala
index fb3736092c..09f46dd128 100644
--- a/kernel/src/test/scala/RemoteActorSpec.scala
+++ b/kernel/src/test/scala/RemoteActorSpec.scala
@@ -36,6 +36,7 @@ class RemoteActorSpec {
server.connect
}
}).start
+ Thread.sleep(1000)
NettyClient.connect
private val unit = TimeUnit.MILLISECONDS
@@ -74,11 +75,11 @@ class RemoteActorSpec {
actor.stop
}
- @Test
+ @Test
def sendReceiveException = {
implicit val timeout = 5000L
val actor = new RemoteActorSpecActorBidirectional
- actor.isRemote = true
+ actor.makeRemote
actor.start
try {
actor !! "Failure"
diff --git a/kernel/src/test/scala/ThreadBasedDispatcherTest.scala b/kernel/src/test/scala/ThreadBasedDispatcherTest.scala
index 4c7f9c68bc..58020e6511 100644
--- a/kernel/src/test/scala/ThreadBasedDispatcherTest.scala
+++ b/kernel/src/test/scala/ThreadBasedDispatcherTest.scala
@@ -62,7 +62,7 @@ class ThreadBasedDispatcherTest {
dispatcher.registerHandler(key, new TestMessageHandle(handleLatch))
dispatcher.start
for (i <- 0 until 100) {
- dispatcher.messageQueue.append(new MessageHandle(key, new Object, new NullFutureResult))
+ dispatcher.messageQueue.append(new MessageHandle(key, new Object, new NullFutureResult, None))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
@@ -86,8 +86,8 @@ class ThreadBasedDispatcherTest {
}
})
dispatcher.start
- dispatcher.messageQueue.append(new MessageHandle(key1, "Sending Message 1", new NullFutureResult))
- dispatcher.messageQueue.append(new MessageHandle(key2, "Sending Message 2", new NullFutureResult))
+ dispatcher.messageQueue.append(new MessageHandle(key1, "Sending Message 1", new NullFutureResult, None))
+ dispatcher.messageQueue.append(new MessageHandle(key2, "Sending Message 2", new NullFutureResult, None))
handlersBarrier.await(5, TimeUnit.SECONDS)
assertFalse(threadingIssueDetected.get)
//dispatcher.shutdown
@@ -122,8 +122,8 @@ class ThreadBasedDispatcherTest {
})
dispatcher.start
for (i <- 0 until 100) {
- dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), new NullFutureResult))
- dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), new NullFutureResult))
+ dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), new NullFutureResult, None))
+ dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), new NullFutureResult, None))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)