diff --git a/akka.iws b/akka.iws
index b7833c846a..7bfb13ae87 100644
--- a/akka.iws
+++ b/akka.iws
@@ -8,30 +8,22 @@
-
-
-
-
-
-
+
-
-
-
+
-
-
-
-
+
+
-
+
+
-
+
@@ -100,7 +92,46 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -176,46 +207,19 @@
-
-
+
+
-
+
-
-
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
@@ -224,7 +228,34 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -233,7 +264,25 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -242,25 +291,7 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
@@ -1360,6 +1391,36 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -1636,6 +1697,40 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -1704,6 +1799,74 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -1772,6 +1935,40 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -1978,19 +2175,19 @@
-
+
-
+
-
-
+
+
@@ -2002,13 +2199,13 @@
-
+
-
+
@@ -2021,7 +2218,7 @@
-
+
@@ -2682,6 +2879,64 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -2694,7 +2949,7 @@
-
+
@@ -2714,7 +2969,9 @@
-
+
+
+
localhost
@@ -2796,7 +3053,7 @@
-
+
@@ -2804,8 +3061,8 @@
-
-
+
+
@@ -2860,114 +3117,118 @@
-
+
-
+
-
+
-
+
-
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
+
+
+
+
+
+
+
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
new file mode 100644
index 0000000000..d78f2b741c
--- /dev/null
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
@@ -0,0 +1,109 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.api;
+
+import se.scalablesolutions.akka.kernel.config.*;
+import static se.scalablesolutions.akka.kernel.config.JavaConfig.*;
+import se.scalablesolutions.akka.kernel.actor.*;
+
+import junit.framework.TestCase;
+
+public class InMemNestedStateTest extends TestCase {
+ static String messageLog = "";
+
+ final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
+ final private ActiveObjectFactory factory = new ActiveObjectFactory();
+
+ protected void setUp() {
+ conf.configureActiveObjects(
+ new RestartStrategy(new AllForOne(), 3, 5000),
+ new Component[]{
+ // FIXME: remove string-name, add ctor to only accept target class
+ new Component(InMemStateful.class, new LifeCycle(new Permanent(), 1000), 10000000),
+ new Component(InMemStatefulNested.class, new LifeCycle(new Permanent(), 1000), 10000000),
+ new Component(InMemFailer.class, new LifeCycle(new Permanent(), 1000), 1000)
+ //new Component("inmem-clasher", InMemClasher.class, InMemClasherImpl.class, new LifeCycle(new Permanent(), 1000), 100000)
+ }).inject().supervise();
+ }
+
+ protected void tearDown() {
+ conf.stop();
+ }
+
+ public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
+ InMemStateful stateful = conf.getActiveObject(InMemStateful.class);
+ stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
+ InMemStatefulNested nested = conf.getActiveObject(InMemStatefulNested.class);
+ nested.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
+ assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
+ assertEquals("new state", nested.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
+ }
+
+ public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
+ InMemStateful stateful = conf.getActiveObject(InMemStateful.class);
+ stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
+ InMemStatefulNested nested = conf.getActiveObject(InMemStatefulNested.class);
+ nested.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
+ InMemFailer failer = conf.getActiveObject(InMemFailer.class);
+ try {
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
+ fail("should have thrown an exception");
+ } catch (RuntimeException e) {
+ } // expected
+ assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
+ assertEquals("init", nested.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
+ }
+
+ public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
+ InMemStateful stateful = conf.getActiveObject(InMemStateful.class);
+ stateful.setVectorState("init"); // set init state
+ InMemStatefulNested nested = conf.getActiveObject(InMemStatefulNested.class);
+ nested.setVectorState("init"); // set init state
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
+ assertEquals("new state", stateful.getVectorState());
+ assertEquals("new state", nested.getVectorState());
+ }
+
+ public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() {
+ InMemStateful stateful = conf.getActiveObject(InMemStateful.class);
+ stateful.setVectorState("init"); // set init state
+ InMemStatefulNested nested = conf.getActiveObject(InMemStatefulNested.class);
+ nested.setVectorState("init"); // set init state
+ InMemFailer failer = conf.getActiveObject(InMemFailer.class);
+ try {
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
+ fail("should have thrown an exception");
+ } catch (RuntimeException e) {
+ } // expected
+ assertEquals("init", stateful.getVectorState()); // check that state is == init state
+ assertEquals("init", nested.getVectorState()); // check that state is == init state
+ }
+
+ public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
+ InMemStateful stateful = conf.getActiveObject(InMemStateful.class);
+ InMemStatefulNested nested = conf.getActiveObject(InMemStatefulNested.class);
+ stateful.setRefState("init"); // set init state
+ nested.setRefState("init"); // set init state
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
+ assertEquals("new state", stateful.getRefState());
+ assertEquals("new state", nested.getRefState());
+ }
+
+ public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() {
+ InMemStateful stateful = conf.getActiveObject(InMemStateful.class);
+ InMemStatefulNested nested = conf.getActiveObject(InMemStatefulNested.class);
+ stateful.setRefState("init"); // set init state
+ nested.setRefState("init"); // set init state
+ InMemFailer failer = conf.getActiveObject(InMemFailer.class);
+ try {
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
+ fail("should have thrown an exception");
+ } catch (RuntimeException e) {
+ } // expected
+ assertEquals("init", stateful.getRefState()); // check that state is == init state
+ assertEquals("init", nested.getRefState()); // check that state is == init state
+ }
+}
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
index 313e5a187e..f46a7e621b 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
@@ -47,6 +47,13 @@ public class InMemStateful {
refState.swap(msg);
}
+ public void success(String key, String msg, InMemStatefulNested nested) {
+ mapState.put(key, msg);
+ vectorState.add(msg);
+ refState.swap(msg);
+ nested.success(key, msg);
+ }
+
@transactional
public String failure(String key, String msg, InMemFailer failer) {
mapState.put(key, msg);
@@ -56,6 +63,15 @@ public class InMemStateful {
return msg;
}
+ @transactional
+ public String failure(String key, String msg, InMemStatefulNested nested, InMemFailer failer) {
+ mapState.put(key, msg);
+ vectorState.add(msg);
+ refState.swap(msg);
+ nested.failure(key, msg, failer);
+ return msg;
+ }
+
@transactional
public void thisMethodHangs(String key, String msg, InMemFailer failer) {
setMapState(key, msg);
@@ -73,4 +89,4 @@ public class InMemStateful {
this.success("clash", "clash");
}
*/
-}
\ No newline at end of file
+}
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java
new file mode 100644
index 0000000000..867a20ffb2
--- /dev/null
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java
@@ -0,0 +1,75 @@
+package se.scalablesolutions.akka.api;
+
+import se.scalablesolutions.akka.annotation.transactional;
+import se.scalablesolutions.akka.kernel.state.*;
+
+public class InMemStatefulNested {
+ private TransactionalState factory = new TransactionalState();
+ private TransactionalMap mapState = factory.newInMemoryMap();
+ private TransactionalVector vectorState = factory.newInMemoryVector();
+ private TransactionalRef refState = factory.newInMemoryRef();
+
+ @transactional
+ public String getMapState(String key) {
+ return (String)mapState.get(key).get();
+ }
+
+ @transactional
+ public String getVectorState() {
+ return (String)vectorState.last();
+ }
+
+ @transactional
+ public String getRefState() {
+ return (String)refState.get().get();
+ }
+
+ @transactional
+ public void setMapState(String key, String msg) {
+ mapState.put(key, msg);
+ }
+
+ @transactional
+ public void setVectorState(String msg) {
+ vectorState.add(msg);
+ }
+
+ @transactional
+ public void setRefState(String msg) {
+ refState.swap(msg);
+ }
+
+ @transactional
+ public void success(String key, String msg) {
+ mapState.put(key, msg);
+ vectorState.add(msg);
+ refState.swap(msg);
+ }
+
+ @transactional
+ public String failure(String key, String msg, InMemFailer failer) {
+ mapState.put(key, msg);
+ vectorState.add(msg);
+ refState.swap(msg);
+ failer.fail();
+ return msg;
+ }
+
+ @transactional
+ public void thisMethodHangs(String key, String msg, InMemFailer failer) {
+ setMapState(key, msg);
+ }
+
+ /*
+ public void clashOk(String key, String msg, InMemClasher clasher) {
+ mapState.put(key, msg);
+ clasher.clash();
+ }
+
+ public void clashNotOk(String key, String msg, InMemClasher clasher) {
+ mapState.put(key, msg);
+ clasher.clash();
+ this.success("clash", "clash");
+ }
+ */
+}
\ No newline at end of file
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
new file mode 100644
index 0000000000..dbdce4cb83
--- /dev/null
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
@@ -0,0 +1,115 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.api;
+
+import se.scalablesolutions.akka.kernel.config.*;
+import static se.scalablesolutions.akka.kernel.config.JavaConfig.*;
+import se.scalablesolutions.akka.kernel.actor.*;
+import se.scalablesolutions.akka.kernel.Kernel;
+
+import junit.framework.TestCase;
+
+public class PersistentNestedStateTest extends TestCase {
+ static String messageLog = "";
+
+ final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
+ final private ActiveObjectFactory factory = new ActiveObjectFactory();
+
+ static {
+ System.setProperty("storage-config", "config");
+ Kernel.startCassandra();
+ }
+
+ protected void setUp() {
+ conf.configureActiveObjects(
+ new RestartStrategy(new AllForOne(), 3, 5000),
+ new Component[]{
+ // FIXME: remove string-name, add ctor to only accept target class
+ new Component(PersistentStateful.class, new LifeCycle(new Permanent(), 1000), 10000000),
+ new Component(PersistentStatefulNested.class, new LifeCycle(new Permanent(), 1000), 10000000),
+ new Component(PersistentFailer.class, new LifeCycle(new Permanent(), 1000), 1000)
+ //new Component("inmem-clasher", InMemClasher.class, InMemClasherImpl.class, new LifeCycle(new Permanent(), 1000), 100000)
+ }).inject().supervise();
+ }
+
+ protected void tearDown() {
+ conf.stop();
+ }
+
+ public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
+ PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class);
+ PersistentStatefulNested nested = conf.getActiveObject(PersistentStatefulNested.class);
+ stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
+ nested.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
+ assertEquals("new state", nested.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
+ assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
+ }
+
+ public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
+ PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class);
+ stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
+ PersistentStatefulNested nested = conf.getActiveObject(PersistentStatefulNested.class);
+ nested.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
+ PersistentFailer failer = conf.getActiveObject(PersistentFailer.class);
+ try {
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
+ fail("should have thrown an exception");
+ } catch (RuntimeException e) {
+ } // expected
+ assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
+ assertEquals("init", nested.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
+ }
+
+ public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
+ PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class);
+ stateful.setVectorState("init"); // set init state
+ PersistentStatefulNested nested = conf.getActiveObject(PersistentStatefulNested.class);
+ nested.setVectorState("init"); // set init state
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
+ assertEquals("new state", stateful.getVectorState(0));
+ assertEquals("new state", nested.getVectorState(0));
+ }
+
+ public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() {
+ PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class);
+ stateful.setVectorState("init"); // set init state
+ PersistentStatefulNested nested = conf.getActiveObject(PersistentStatefulNested.class);
+ nested.setVectorState("init"); // set init state
+ PersistentFailer failer = conf.getActiveObject(PersistentFailer.class);
+ try {
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
+ fail("should have thrown an exception");
+ } catch (RuntimeException e) {
+ } // expected
+ assertEquals("init", stateful.getVectorState(0)); // check that state is == init state
+ assertEquals("init", nested.getVectorState(0)); // check that state is == init state
+ }
+
+ public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
+ PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class);
+ PersistentStatefulNested nested = conf.getActiveObject(PersistentStatefulNested.class);
+ stateful.setRefState("init"); // set init state
+ nested.setRefState("init"); // set init state
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
+ assertEquals("new state", stateful.getRefState());
+ assertEquals("new state", nested.getRefState());
+ }
+
+ public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() {
+ PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class);
+ PersistentStatefulNested nested = conf.getActiveObject(PersistentStatefulNested.class);
+ stateful.setRefState("init"); // set init state
+ nested.setRefState("init"); // set init state
+ PersistentFailer failer = conf.getActiveObject(PersistentFailer.class);
+ try {
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
+ fail("should have thrown an exception");
+ } catch (RuntimeException e) {
+ } // expected
+ assertEquals("init", stateful.getRefState()); // check that state is == init state
+ assertEquals("init", nested.getRefState()); // check that state is == init state
+ }
+}
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java
index e9492ee9e7..b3729728ef 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java
@@ -58,6 +58,23 @@ public class PersistentStateful {
return msg;
}
+ public void success(String key, String msg, PersistentStatefulNested nested) {
+ mapState.put(key, msg);
+ vectorState.add(msg);
+ refState.swap(msg);
+ nested.success(key, msg);
+ }
+
+ @transactional
+ public String failure(String key, String msg, PersistentStatefulNested nested, PersistentFailer failer) {
+ mapState.put(key, msg);
+ vectorState.add(msg);
+ refState.swap(msg);
+ nested.failure(key, msg, failer);
+ return msg;
+ }
+
+
@transactional
public void thisMethodHangs(String key, String msg, PersistentFailer failer) {
setMapState(key, msg);
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java
new file mode 100644
index 0000000000..6b8ba10863
--- /dev/null
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java
@@ -0,0 +1,66 @@
+package se.scalablesolutions.akka.api;
+
+import se.scalablesolutions.akka.kernel.state.*;
+import se.scalablesolutions.akka.annotation.transactional;
+import se.scalablesolutions.akka.annotation.state;
+
+public class PersistentStatefulNested {
+ private TransactionalState factory = new TransactionalState();
+ private TransactionalMap mapState = factory.newPersistentMap(new CassandraStorageConfig());
+ private TransactionalVector vectorState = factory.newPersistentVector(new CassandraStorageConfig());;
+ private TransactionalRef refState = factory.newPersistentRef(new CassandraStorageConfig());
+
+ @transactional
+ public String getMapState(String key) {
+ return (String) mapState.get(key).get();
+ }
+
+ @transactional
+ public String getVectorState(int index) {
+ return (String) vectorState.get(index);
+ }
+
+ @transactional
+ public String getRefState() {
+ if (refState.isDefined()) {
+ return (String) refState.get().get();
+ } else throw new IllegalStateException("No such element");
+ }
+
+ @transactional
+ public void setMapState(String key, String msg) {
+ mapState.put(key, msg);
+ }
+
+ @transactional
+ public void setVectorState(String msg) {
+ vectorState.add(msg);
+ }
+
+ @transactional
+ public void setRefState(String msg) {
+ refState.swap(msg);
+ }
+
+ @transactional
+ public void success(String key, String msg) {
+ mapState.put(key, msg);
+ vectorState.add(msg);
+ refState.swap(msg);
+ }
+
+ @transactional
+ public String failure(String key, String msg, PersistentFailer failer) {
+ mapState.put(key, msg);
+ vectorState.add(msg);
+ refState.swap(msg);
+ failer.fail();
+ return msg;
+ }
+
+ @transactional
+ public void thisMethodHangs(String key, String msg, PersistentFailer failer) {
+ setMapState(key, msg);
+ }
+}
+
diff --git a/kernel/src/main/scala/actor/ActiveObject.scala b/kernel/src/main/scala/actor/ActiveObject.scala
index 2e74768912..87bc55727c 100644
--- a/kernel/src/main/scala/actor/ActiveObject.scala
+++ b/kernel/src/main/scala/actor/ActiveObject.scala
@@ -161,6 +161,7 @@ object ActiveObject {
decrementTransaction
if (isTransactionAborted) removeTransactionIfTopLevel
else tryToPrecommitTransaction
+ TransactionManagement.threadBoundTx.set(None)
}
}
diff --git a/kernel/src/main/scala/actor/Actor.scala b/kernel/src/main/scala/actor/Actor.scala
index e9d96e57c2..12923cd849 100644
--- a/kernel/src/main/scala/actor/Actor.scala
+++ b/kernel/src/main/scala/actor/Actor.scala
@@ -314,12 +314,9 @@ trait Actor extends Logging with TransactionManagement {
private def transactionalDispatch[T](message: AnyRef, timeout: Long, blocking: Boolean): Option[T] = {
// FIXME join TX with same id, do not COMMIT
- println("------ Actor1: " + this)
tryToCommitTransaction
- println("------ Actor2: " + this)
if (isInExistingTransaction) joinExistingTransaction
else if (isTransactional) startNewTransaction
- println("------ Actor3: " + this)
incrementTransaction
try {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
diff --git a/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala b/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala
deleted file mode 100644
index 6584e1e867..0000000000
--- a/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Copyright (C) 2009 Scalable Solutions.
- */
-
-/**
- * Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
- * See also this article: [http://today.java.net/cs/user/print/a/350].
- *
- * Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
- */
-package se.scalablesolutions.akka.kernel.reactor
-
-class ThreadBasedDispatcher extends MessageDispatcherBase {
- def start = if (!active) {
- active = true
- val messageDemultiplexer = new EventBasedSingleThreadDemultiplexer(messageQueue)
- selectorThread = new Thread {
- override def run = {
- while (active) {
- guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf]
- try {
- messageDemultiplexer.select
- } catch { case e: InterruptedException => active = false }
- val queue = messageDemultiplexer.acquireSelectedQueue
- for (index <- 0 until queue.size) {
- val handle = queue.remove
- val handler = messageHandlers.get(handle.sender)
- if (handler != null) handler.handle(handle)
- }
- }
- }
- }
- selectorThread.start
- }
-}
-
-class ThreadBasedDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer {
- import java.util.{LinkedList, Queue}
-
- private val selectedQueue: Queue[MessageHandle] = new LinkedList[MessageHandle]
-
- def select = messageQueue.read(selectedQueue)
-
- def acquireSelectedQueue: Queue[MessageHandle] = selectedQueue
-
- def releaseSelectedQueue = throw new UnsupportedOperationException("ThreadBasedDemultiplexer can't release its queue")
-
- def wakeUp = throw new UnsupportedOperationException("ThreadBasedDemultiplexer can't be woken up")
-}
diff --git a/kernel/src/main/scala/state/State.scala b/kernel/src/main/scala/state/State.scala
index 0749fbe2bd..43fa83a629 100644
--- a/kernel/src/main/scala/state/State.scala
+++ b/kernel/src/main/scala/state/State.scala
@@ -173,6 +173,8 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
// ---- For scala.collection.mutable.Map ----
override def put(key: K, value: V): Option[V] = {
+ println("--------- MAP.PUT " + uuid + " " + key + " " + value)
+
changeSet += key -> value
None // always return None to speed up writes (else need to go to DB to get
}
@@ -195,6 +197,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
// FIXME: should use batch function once the bug is resolved
for (entry <- changeSet) {
val (key, value) = entry
+ println("--------- COMMIT " + uuid + " " + key + " " + value)
CassandraNode.insertMapStorageEntryFor(uuid, key, value)
}
}
@@ -205,7 +208,12 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
override def size: Int = CassandraNode.getMapStorageSizeFor(uuid)
// ---- For scala.collection.mutable.Map ----
- override def get(key: String): Option[AnyRef] = CassandraNode.getMapStorageEntryFor(uuid, key)
+ override def get(key: String): Option[AnyRef] = {
+ val result = CassandraNode.getMapStorageEntryFor(uuid, key)
+ println("--------- MAP.GET " + result + " " + uuid + " " + key)
+ result
+ }
+
override def elements: Iterator[Tuple2[String, AnyRef]] = {
new Iterator[Tuple2[String, AnyRef]] {
private val originalList: List[Tuple2[String, AnyRef]] = CassandraNode.getMapStorageFor(uuid)
diff --git a/kernel/src/main/scala/stm/TransactionManagement.scala b/kernel/src/main/scala/stm/TransactionManagement.scala
index ad412cb9c1..5a02ffbfe6 100644
--- a/kernel/src/main/scala/stm/TransactionManagement.scala
+++ b/kernel/src/main/scala/stm/TransactionManagement.scala
@@ -14,13 +14,13 @@ class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Tran
override def toString(): String = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
}
-object TransactionManagement {
+object TransactionManagement {
private val txEnabled = new AtomicBoolean(true)
-
+
def isTransactionsEnabled = txEnabled.get
def enableTransactions = txEnabled.set(true)
- private[kernel] lazy val threadBoundTx: ThreadLocal[Option[Transaction]] = {
+ private[kernel] val threadBoundTx: ThreadLocal[Option[Transaction]] = {
val tl = new ThreadLocal[Option[Transaction]]
tl.set(None)
tl
@@ -30,18 +30,14 @@ object TransactionManagement {
// FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts
trait TransactionManagement extends Logging {
val transactionalInstance: AnyRef
-
+
private lazy val changeSet = new ChangeSet(transactionalInstance.getClass.getName)
import TransactionManagement.threadBoundTx
private[kernel] var activeTx: Option[Transaction] = None
protected def startNewTransaction = {
- val (maps, vectors, refs) = getTransactionalItemsFor(transactionalInstance)
- changeSet.maps = maps
- changeSet.refs = refs
- changeSet.vectors = vectors
-
+ storeTransactionalItemsFor(transactionalInstance)
val newTx = new Transaction
newTx.begin(changeSet)
val tx = Some(newTx)
@@ -52,6 +48,7 @@ trait TransactionManagement extends Logging {
protected def joinExistingTransaction = {
val cflowTx = threadBoundTx.get
if (activeTx.isDefined && cflowTx.isDefined && activeTx.get.id == cflowTx.get.id) {
+ storeTransactionalItemsFor(transactionalInstance)
val currentTx = cflowTx.get
currentTx.join(changeSet)
activeTx = Some(currentTx)
@@ -102,8 +99,7 @@ trait TransactionManagement extends Logging {
/**
* Search for transactional items for a specific target instance, crawl the class hierarchy recursively up to the top.
*/
- protected def getTransactionalItemsFor(targetInstance: AnyRef):
- Tuple3[List[TransactionalMap[_, _]], List[TransactionalVector[_]], List[TransactionalRef[_]]] = {
+ protected def storeTransactionalItemsFor(targetInstance: AnyRef) = {
require(targetInstance != null)
var maps: List[TransactionalMap[_, _]] = Nil
var refs: List[TransactionalRef[_]] = Nil
@@ -134,7 +130,10 @@ trait TransactionManagement extends Logging {
}
// start the search for transactional items, crawl the class hierarchy up until we reach Object
- getTransactionalItemsFor(targetInstance.getClass)
+ val (m, v, r) = getTransactionalItemsFor(targetInstance.getClass)
+ changeSet.maps = m
+ changeSet.vectors = v
+ changeSet.refs = r
}
/*