diff --git a/akka.ipr b/akka.ipr
index 10621d8129..46fa8b50b5 100644
--- a/akka.ipr
+++ b/akka.ipr
@@ -9,19 +9,19 @@
diff --git a/akka.iws b/akka.iws
index 688a8ac498..6af29de35c 100644
--- a/akka.iws
+++ b/akka.iws
@@ -1,17 +1,51 @@
-
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -33,7 +67,7 @@
-
+
@@ -46,150 +80,7 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
@@ -265,82 +156,66 @@
-
-
+
+
-
+
-
-
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
@@ -359,28 +234,40 @@
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
@@ -439,24 +326,10 @@
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
@@ -464,12 +337,12 @@
-
+
-
-
+
+
@@ -478,19 +351,57 @@
-
+
-
-
+
+
-
-
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -503,226 +414,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -790,96 +481,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -910,66 +511,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -1000,36 +541,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -1081,6 +592,40 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -1135,10 +680,6 @@
-
-
-
-
@@ -1174,108 +715,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -1340,7 +779,7 @@
-
+
@@ -1349,566 +788,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -1943,7 +822,7 @@
-
+
@@ -1956,7 +835,7 @@
-
+
@@ -1993,6 +872,76 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -2032,6 +981,17 @@
+
+
+
+
+
+
+
+
+
+
+
@@ -2155,12 +1115,12 @@
-
+
-
+
@@ -2188,7 +1148,7 @@
-
+
@@ -2448,7 +1408,7 @@
-
+
@@ -2476,7 +1436,7 @@
-
+
@@ -2505,6 +1465,34 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -2705,7 +1693,7 @@
-
+
@@ -2839,31 +1827,34 @@
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
localhost
@@ -2940,21 +1931,21 @@
-
+
-
+
-
+
-
+
-
-
+
+
@@ -3007,114 +1998,109 @@
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
+
+
+
+
+
+
+
+
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/AllTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/AllTest.java
new file mode 100644
index 0000000000..ede7cd0f02
--- /dev/null
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/AllTest.java
@@ -0,0 +1,24 @@
+package se.scalablesolutions.akka.api;
+
+import junit.framework.TestCase;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+public class AllTest extends TestCase {
+ public static Test suite() {
+ TestSuite suite = new TestSuite("All Java tests");
+ suite.addTestSuite(InMemoryStateTest.class);
+ suite.addTestSuite(InMemNestedStateTest.class);
+ suite.addTestSuite(PersistentStateTest.class);
+ suite.addTestSuite(PersistentNestedStateTest.class);
+ suite.addTestSuite(RemoteInMemoryStateTest.class);
+ suite.addTestSuite(RemotePersistentStateTest.class);
+ suite.addTestSuite(ActiveObjectGuiceConfiguratorTest.class);
+ //suite.addTestSuite(RestTest.class);
+ return suite;
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+}
\ No newline at end of file
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/AllTests.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/AllTests.java
deleted file mode 100644
index 866e4f53c0..0000000000
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/AllTests.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package se.scalablesolutions.akka.api;
-
-import junit.framework.TestCase;
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
-public class AllTests extends TestCase {
- public static Test suite() {
-
- TestSuite suite = new TestSuite("All tests");
- // Java tests
- suite.addTestSuite(InMemoryStateTest.class);
- suite.addTestSuite(InMemNestedStateTest.class);
- suite.addTestSuite(PersistentStateTest.class);
- suite.addTestSuite(PersistentNestedStateTest.class);
- suite.addTestSuite(RemoteInMemoryStateTest.class);
- suite.addTestSuite(RemotePersistentStateTest.class);
- suite.addTestSuite(ActiveObjectGuiceConfiguratorTest.class);
- suite.addTestSuite(RestTest.class);
-
- // Scala tests
- //suite.addTestSuite(se.scalablesolutions.akka.kernel.SupervisorSpec.class);
- /*
- suite.addTestSuite(se.scalablesolutions.akka.kernel.RemoteSupervisorSpec.class);
- suite.addTestSuite(se.scalablesolutions.akka.kernel.reactor.EventBasedDispatcherTest.class);
- suite.addTestSuite(se.scalablesolutions.akka.kernel.reactor.ThreadBasedDispatcherTest.class);
- suite.addTestSuite(se.scalablesolutions.akka.kernel.actor.ActorSpec.class);
- suite.addTestSuite(se.scalablesolutions.akka.kernel.actor.RemoteActorSpec.class);
- suite.addTestSuite(se.scalablesolutions.akka.kernel.actor.InMemStatefulActor.class);
- suite.addTestSuite(se.scalablesolutions.akka.kernel.actor.PersistentActor.class);
- */
- return suite;
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-}
\ No newline at end of file
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java
new file mode 100644
index 0000000000..8e498368d5
--- /dev/null
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java
@@ -0,0 +1,13 @@
+package se.scalablesolutions.akka.api;
+
+public class PersistenceManager {
+ private static volatile boolean isRunning = false;
+ public static void init() {
+ if (!isRunning) {
+ se.scalablesolutions.akka.kernel.Kernel.config();
+ se.scalablesolutions.akka.kernel.Kernel.startCassandra();
+ se.scalablesolutions.akka.kernel.Kernel.startRemoteService();
+ isRunning = true;
+ }
+ }
+}
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
index 482478aa7c..9afaf8b625 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
@@ -17,13 +17,8 @@ public class PersistentNestedStateTest extends TestCase {
final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
final private ActiveObjectFactory factory = new ActiveObjectFactory();
- static {
- se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config();
- System.setProperty("storage-config", "config");
- Kernel.startCassandra();
- }
-
protected void setUp() {
+ PersistenceManager.init();
conf.configureActiveObjects(
new RestartStrategy(new AllForOne(), 3, 5000),
new Component[]{
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java
index 2668e980b7..ee1051ee8f 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java
@@ -14,14 +14,10 @@ import junit.framework.TestCase;
public class PersistentStateTest extends TestCase {
static String messageLog = "";
- static {
- se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config();
- System.setProperty("storage-config", "config");
- Kernel.startCassandra();
- }
final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
protected void setUp() {
+ PersistenceManager.init();
conf.configureActiveObjects(
new RestartStrategy(new AllForOne(), 3, 5000),
new Component[] {
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java
index 3975c0366e..eb8fefb89e 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java
@@ -52,7 +52,6 @@ public class RemoteInMemoryStateTest extends TestCase {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.setVectorState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit
assertEquals("new state", stateful.getVectorState());
}
@@ -72,7 +71,6 @@ public class RemoteInMemoryStateTest extends TestCase {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.setRefState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit
assertEquals("new state", stateful.getRefState());
}
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java
index e688ff0dd8..a0dda0acf3 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java
@@ -14,15 +14,10 @@ import junit.framework.TestCase;
public class RemotePersistentStateTest extends TestCase {
static String messageLog = "";
- static {
- Kernel.startCassandra();
- Kernel.startRemoteService();
- }
final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
- final private ActiveObjectFactory factory = new ActiveObjectFactory();
-
protected void setUp() {
+ PersistenceManager.init();
conf.configureActiveObjects(
new RestartStrategy(new AllForOne(), 3, 5000),
new Component[] {
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java
index 5b7de680a4..230b95899b 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import se.scalablesolutions.akka.kernel.config.*;
import static se.scalablesolutions.akka.kernel.config.JavaConfig.*;
+/*
public class RestTest extends TestCase {
private static int PORT = 9998;
@@ -36,8 +37,8 @@ public class RestTest extends TestCase {
private static ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
@BeforeClass
- public static void initialize() throws IOException {
- se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config();
+ protected void setUp() {
+ se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config();
conf.configureActiveObjects(
new RestartStrategy(new AllForOne(), 3, 5000),
new Component[] {
@@ -50,8 +51,8 @@ public class RestTest extends TestCase {
}
@Test
- public void simpleRequest() throws IOException, InstantiationException {
- //selector.start();
+ public void testSimpleRequest() throws IOException, InstantiationException {
+ selector.listen();
Client client = Client.create();
WebResource webResource = client.resource(URI);
String responseMsg = webResource.path("/foo").get(String.class);
@@ -79,7 +80,7 @@ public class RestTest extends TestCase {
selectorThread.setAlgorithmClassName(StaticStreamAlgorithm.class.getName());
selectorThread.setPort(port);
selectorThread.setAdapter(adapter);
- selectorThread.listen();
return selectorThread;
}
}
+*/
diff --git a/kernel/pom.xml b/kernel/pom.xml
index 78a63ffd3c..52630c8907 100644
--- a/kernel/pom.xml
+++ b/kernel/pom.xml
@@ -245,16 +245,6 @@
-
diff --git a/kernel/src/main/resources/aop.xml b/kernel/src/main/resources/aop.xml
new file mode 100644
index 0000000000..fe844fa481
--- /dev/null
+++ b/kernel/src/main/resources/aop.xml
@@ -0,0 +1,9 @@
+
+
+
+
+
+
+
diff --git a/kernel/src/main/scala/actor/ActiveObject.scala b/kernel/src/main/scala/actor/ActiveObject.scala
index 858d8cec0e..fbadeb6a38 100644
--- a/kernel/src/main/scala/actor/ActiveObject.scala
+++ b/kernel/src/main/scala/actor/ActiveObject.scala
@@ -80,7 +80,7 @@ class ActiveObjectFactory {
ActiveObject.newInstance(intf, target, actor, remoteAddress, timeout)
}
- private[kernel] def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor =
+ private[kernel] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor =
ActiveObject.supervise(restartStrategy, components)
/*
@@ -160,7 +160,7 @@ object ActiveObject {
proxy.asInstanceOf[T]
}
- private[kernel] def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = {
+ private[kernel] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = {
object factory extends SupervisorFactory {
override def getSupervisorConfig = SupervisorConfig(restartStrategy, components)
}
@@ -176,7 +176,7 @@ object ActiveObject {
@serializable
sealed class ActorAroundAdvice(val target: Class[_],
val targetInstance: AnyRef,
- val actor: Dispatcher,
+ val actor: Dispatcher,
val remoteAddress: Option[InetSocketAddress],
val timeout: Long) extends AroundAdvice {
val id = target.getName
@@ -293,11 +293,11 @@ private[kernel] class Dispatcher extends Actor {
}
override protected def preRestart(reason: AnyRef, config: Option[AnyRef]) {
- if (preRestart.isDefined) preRestart.get.invoke(target.get, ZERO_ITEM_CLASS_ARRAY)
+ if (preRestart.isDefined) preRestart.get.invoke(target.get, ZERO_ITEM_CLASS_ARRAY: _*)
}
override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) {
- if (postRestart.isDefined) postRestart.get.invoke(target.get, ZERO_ITEM_CLASS_ARRAY)
+ if (postRestart.isDefined) postRestart.get.invoke(target.get, ZERO_ITEM_CLASS_ARRAY: _*)
}
}
diff --git a/kernel/src/main/scala/actor/Actor.scala b/kernel/src/main/scala/actor/Actor.scala
index fb05b02843..c245d6af7d 100644
--- a/kernel/src/main/scala/actor/Actor.scala
+++ b/kernel/src/main/scala/actor/Actor.scala
@@ -30,8 +30,8 @@ object DispatcherType {
case object ThreadBasedDispatcher extends DispatcherType
}
-class ActorMessageHandler(val actor: Actor) extends MessageHandler {
- def handle(handle: MessageHandle) = actor.handle(handle)
+class ActorMessageInvoker(val actor: Actor) extends MessageInvoker {
+ def invoke(handle: MessageInvocation) = actor.invoke(handle)
}
object Actor {
@@ -53,9 +53,6 @@ trait Actor extends Logging with TransactionManagement {
protected[this] val linkedActors = new CopyOnWriteArraySet[Actor]
protected[actor] var lifeCycleConfig: Option[LifeCycle] = None
- protected[this] var latestMessage: Option[MessageHandle] = None
- protected[this] var messageToReschedule: Option[MessageHandle] = None
-
// ====================================
// ==== USER CALLBACKS TO OVERRIDE ====
// ====================================
@@ -89,7 +86,7 @@ trait Actor extends Logging with TransactionManagement {
protected[kernel] var dispatcher: MessageDispatcher = {
val dispatcher = new EventBasedThreadPoolDispatcher
mailbox = dispatcher.messageQueue
- dispatcher.registerHandler(this, new ActorMessageHandler(this))
+ dispatcher.registerHandler(this, new ActorMessageInvoker(this))
dispatcher
}
@@ -202,10 +199,9 @@ trait Actor extends Logging with TransactionManagement {
/**
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
*/
- def !(message: AnyRef): Unit = if (isRunning) {
- if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(message, timeout, false, true)
- else postMessageToMailbox(message)
- } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
+ def !(message: AnyRef): Unit =
+ if (isRunning) postMessageToMailbox(message)
+ else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
/**
* Sends a message asynchronously and waits on a future for a reply message.
@@ -217,13 +213,9 @@ trait Actor extends Logging with TransactionManagement {
* If not then the sender will unessecary block until the timeout expires.
*/
def !: Option[T] = if (isRunning) {
- if (TransactionManagement.isTransactionalityEnabled) {
- transactionalDispatch(message, timeout, false, false)
- } else {
- val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
- future.await
- getResultOrThrowException(future)
- }
+ val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
+ future.await
+ getResultOrThrowException(future)
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
/**
@@ -242,13 +234,9 @@ trait Actor extends Logging with TransactionManagement {
* E.g. send-and-receive-eventually semantics.
*/
def !?[T](message: AnyRef): T = if (isRunning) {
- if (TransactionManagement.isTransactionalityEnabled) {
- transactionalDispatch(message, 0, true, false).get
- } else {
- val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0)
- future.awaitBlocking
- getResultOrThrowException(future).get
- }
+ val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0)
+ future.awaitBlocking
+ getResultOrThrowException(future).get
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
/**
@@ -395,7 +383,7 @@ trait Actor extends Logging with TransactionManagement {
val supervisorUuid = registerSupervisorAsRemoteActor
RemoteClient.clientFor(remoteAddress.get).send(new RemoteRequest(true, message, null, this.getClass.getName, timeout, null, true, false, supervisorUuid))
} else {
- val handle = new MessageHandle(this, message, None, activeTx)
+ val handle = new MessageInvocation(this, message, None, TransactionManagement.threadBoundTx.get)
mailbox.append(handle)
latestMessage = Some(handle)
}
@@ -409,87 +397,32 @@ trait Actor extends Logging with TransactionManagement {
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
} else {
val future = new DefaultCompletableFutureResult(timeout)
- val handle = new MessageHandle(this, message, Some(future), TransactionManagement.threadBoundTx.get)
+ val handle = new MessageInvocation(this, message, Some(future), TransactionManagement.threadBoundTx.get)
mailbox.append(handle)
latestMessage = Some(handle)
future
}
}
- private def transactionalDispatch[T](message: AnyRef, timeout: Long, blocking: Boolean, oneWay: Boolean): Option[T] = {
- import TransactionManagement._
- if (!tryToCommitTransaction) {
- var nrRetries = 0 // FIXME only if top-level
- var failed = true
- do {
- Thread.sleep(TIME_WAITING_FOR_COMPLETION)
- nrRetries += 1
- log.debug("Pending transaction [%s] not completed, waiting %s milliseconds. Attempt %s", activeTx.get, TIME_WAITING_FOR_COMPLETION, nrRetries)
- failed = !tryToCommitTransaction
- } while(nrRetries < NR_OF_TIMES_WAITING_FOR_COMPLETION && failed)
- if (failed) {
- log.debug("Pending transaction [%s] still not completed, aborting and rescheduling message [%s]", activeTx.get, latestMessage)
- rollback(activeTx)
- if (RESTART_TRANSACTION_ON_COLLISION) messageToReschedule = Some(latestMessage.get)
- else throw new TransactionRollbackException("Conflicting transactions, rolling back transaction for message [" + latestMessage + "]")
- }
- }
- if (isInExistingTransaction) joinExistingTransaction
- else if (isTransactional) startNewTransaction
- incrementTransaction
- try {
- if (oneWay) {
- postMessageToMailbox(message)
- None
- } else {
- val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
- if (blocking) future.awaitBlocking
- else future.await
- getResultOrThrowException(future)
- }
- } catch {
- case e: TransactionAwareWrapperException =>
- e.cause.printStackTrace
- rollback(e.tx)
- throw e.cause
- } finally {
- decrementTransaction
- if (isTransactionAborted) removeTransactionIfTopLevel
- else tryToPrecommitTransaction
- TransactionManagement.threadBoundTx.set(None)
- if (messageToReschedule.isDefined) {
- val handle = messageToReschedule.get
- val newTx = startNewTransaction
- val clone = new MessageHandle(handle.sender, handle.message, handle.future, newTx)
- log.debug("Rescheduling message %s", clone)
- mailbox.append(clone) // FIXME append or prepend rescheduled messages?
- }
- }
- }
-
- private def getResultOrThrowException[T](future: FutureResult): Option[T] =
- if (future.exception.isDefined) {
- val (_, cause) = future.exception.get
- if (TransactionManagement.isTransactionalityEnabled) throw new TransactionAwareWrapperException(cause, activeTx)
- else throw cause
- } else {
- future.result.asInstanceOf[Option[T]]
- }
-
/**
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods
*/
- private[kernel] def handle(messageHandle: MessageHandle) = synchronized {
+ private[kernel] def invoke(messageHandle: MessageInvocation) = synchronized {
+ if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
+ else dispatch(messageHandle)
+ }
+
+ private def dispatch[T](messageHandle: MessageInvocation) = {
+ if (messageHandle.tx.isDefined) TransactionManagement.threadBoundTx.set(messageHandle.tx)
val message = messageHandle.message
val future = messageHandle.future
try {
- if (messageHandle.tx.isDefined) TransactionManagement.threadBoundTx.set(messageHandle.tx)
senderFuture = future
if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString)
} catch {
case e =>
- // FIXME to fix supervisor restart of actor for oneway calls, inject a supervisor proxy that can send notification back to client
+ // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (supervisor.isDefined) supervisor.get ! Exit(this, e)
if (future.isDefined) future.get.completeWithException(this, e)
else e.printStackTrace
@@ -498,6 +431,51 @@ trait Actor extends Logging with TransactionManagement {
}
}
+ private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
+ if (messageHandle.tx.isDefined) TransactionManagement.threadBoundTx.set(messageHandle.tx)
+ val message = messageHandle.message
+ val future = messageHandle.future
+ try {
+ if (!tryToCommitTransaction && isTransactionTopLevel) handleCollision
+
+ if (isInExistingTransaction) joinExistingTransaction
+ else if (isTransactional) startNewTransaction
+
+ incrementTransaction
+ senderFuture = future
+ if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
+ else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString)
+ } catch {
+ case e =>
+ rollback(activeTx)
+ TransactionManagement.threadBoundTx.set(None) // need to clear threadBoundTx before call to supervisor
+ // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
+ if (supervisor.isDefined) supervisor.get ! Exit(this, e)
+ if (future.isDefined) future.get.completeWithException(this, e)
+ else e.printStackTrace
+ } finally {
+ decrementTransaction
+ if (isTransactionAborted) removeTransactionIfTopLevel
+ else tryToPrecommitTransaction
+ rescheduleClashedMessages
+ TransactionManagement.threadBoundTx.set(None)
+ }
+ }
+
+ private def getResultOrThrowException[T](future: FutureResult): Option[T] =
+ if (future.exception.isDefined) {
+ val (_, cause) = future.exception.get
+ throw cause
+ } else future.result.asInstanceOf[Option[T]]
+
+ private def rescheduleClashedMessages = if (messageToReschedule.isDefined) {
+ val handle = messageToReschedule.get
+ val newTx = startNewTransaction
+ val clone = new MessageInvocation(handle.sender, handle.message, handle.future, newTx)
+ log.debug("Rescheduling message %s", clone)
+ mailbox.append(clone) // FIXME append or prepend rescheduled messages?
+ }
+
private def base: PartialFunction[Any, Unit] = lifeCycle orElse (hotswap getOrElse receive)
private val lifeCycle: PartialFunction[Any, Unit] = {
@@ -564,7 +542,7 @@ trait Actor extends Logging with TransactionManagement {
private[kernel] def swapDispatcher(disp: MessageDispatcher) = {
dispatcher = disp
mailbox = dispatcher.messageQueue
- dispatcher.registerHandler(this, new ActorMessageHandler(this))
+ dispatcher.registerHandler(this, new ActorMessageInvoker(this))
}
override def toString(): String = "Actor[" + uuid + ":" + id + "]"
diff --git a/kernel/src/main/scala/actor/Supervisor.scala b/kernel/src/main/scala/actor/Supervisor.scala
index a20e4be093..38b7e78e4c 100644
--- a/kernel/src/main/scala/actor/Supervisor.scala
+++ b/kernel/src/main/scala/actor/Supervisor.scala
@@ -34,11 +34,11 @@ case class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends
* override protected def getSupervisorConfig: SupervisorConfig = {
* SupervisorConfig(
* RestartStrategy(OneForOne, 3, 10),
- * Worker(
+ * Supervise(
* myFirstActor,
* LifeCycle(Permanent, 1000))
* ::
- * Worker(
+ * Supervise(
* mySecondActor,
* LifeCycle(Permanent, 1000))
* :: Nil)
@@ -123,7 +123,7 @@ class Supervisor(handler: FaultHandlingStrategy) extends Actor with Logging {
case SupervisorConfig(_, servers) =>
servers.map(server =>
server match {
- case Worker(actor, lifecycle) =>
+ case Supervise(actor, lifecycle) =>
actor.lifeCycleConfig = Some(lifecycle)
startLink(actor)
diff --git a/kernel/src/main/scala/collection/Vector.scala b/kernel/src/main/scala/collection/Vector.scala
old mode 100755
new mode 100644
index 437f263551..9ead78e3b5
--- a/kernel/src/main/scala/collection/Vector.scala
+++ b/kernel/src/main/scala/collection/Vector.scala
@@ -300,18 +300,18 @@ class Vector[+T] private (val length: Int, shift: Int, root: Array[AnyRef], tail
case vec: Vector[T] => {
var back = length == vec.length
var i = 0
-
+
while (i < length) {
back &&= apply(i) == vec.apply(i)
i += 1
}
-
+
back
}
-
+
case _ => false
}
-
+
override def hashCode = foldLeft(0) { _ ^ _.hashCode }
}
@@ -326,7 +326,7 @@ object Vector {
private[collection] def array(elems: AnyRef*) = {
val back = new Array[AnyRef](elems.length)
Array.copy(elems, 0, back, 0, back.length)
-
+
back
}
}
diff --git a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
index d8068975ab..fb5aa637f0 100644
--- a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
+++ b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
@@ -28,7 +28,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
private var supervisor: Option[Supervisor] = None
private var restartStrategy: RestartStrategy = _
private var components: List[Component] = _
- private var workers: List[Worker] = Nil
+ private var supervised: List[Supervise] = Nil
private var bindings: List[DependencyBinding] = Nil
private var configRegistry = new HashMap[Class[_], Component] // TODO is configRegistry needed?
private var activeObjectRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, Component]]
@@ -104,7 +104,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
val proxy = activeObjectFactory.newInstance(targetClass, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
- workers ::= Worker(actor, component.lifeCycle)
+ supervised ::= Supervise(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, proxy, component))
new DependencyBinding(targetClass, proxy)
}
@@ -112,14 +112,14 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
private def newDelegatingProxy(component: Component): DependencyBinding = {
val targetClass = component.intf.get
val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
- component.target.getConstructor(Array[Class[_]]()).setAccessible(true)
+ component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true)
val actor = new Dispatcher
if (component.dispatcher.isDefined) actor.swapDispatcher(component.dispatcher.get)
val remoteAddress =
if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
val proxy = activeObjectFactory.newInstance(targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
- workers ::= Worker(actor, component.lifeCycle)
+ supervised ::= Supervise(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, targetInstance, component))
new DependencyBinding(targetClass, proxy)
}
@@ -132,7 +132,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
override def supervise: ActiveObjectConfigurator = synchronized {
if (injector == null) inject
- supervisor = Some(activeObjectFactory.supervise(restartStrategy, workers))
+ supervisor = Some(activeObjectFactory.supervise(restartStrategy, supervised))
//camelContext.addComponent(AKKA_CAMEL_ROUTING_SCHEME, new ActiveObjectComponent(this))
//camelContext.start
supervisor.get.startSupervisor
diff --git a/kernel/src/main/scala/config/Config.scala b/kernel/src/main/scala/config/Config.scala
index 0fd2fb9c02..23e29b8e44 100644
--- a/kernel/src/main/scala/config/Config.scala
+++ b/kernel/src/main/scala/config/Config.scala
@@ -22,14 +22,16 @@ object ScalaConfig {
abstract class Scope extends ConfigElement
case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server
- case class Worker(actor: Actor, lifeCycle: LifeCycle) extends Server
+ case class Supervise(actor: Actor, lifeCycle: LifeCycle) extends Server
case class RestartStrategy(scheme: FailOverScheme, maxNrOfRetries: Int, withinTimeRange: Int) extends ConfigElement
case object AllForOne extends FailOverScheme
case object OneForOne extends FailOverScheme
- case class LifeCycle(scope: Scope, shutdownTime: Int) extends ConfigElement
+ case class LifeCycle(scope: Scope, shutdownTime: Int) extends ConfigElement {
+ def this(scope: Scope) = this(scope, 0)
+ }
case object Permanent extends Scope
case object Transient extends Scope
case object Temporary extends Scope
@@ -87,10 +89,13 @@ object JavaConfig {
def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.RestartStrategy(
scheme.transform, maxNrOfRetries, withinTimeRange)
}
+// class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int, val callbacks: RestartCallbacks) extends ConfigElement {
class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int) extends ConfigElement {
def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.LifeCycle(scope.transform, shutdownTime)
}
+ class RestartCallbacks(val preRestart: String, val postRestart: String)
+
abstract class Scope extends ConfigElement {
def transform: se.scalablesolutions.akka.kernel.config.ScalaConfig.Scope
}
@@ -115,7 +120,7 @@ object JavaConfig {
}
class RemoteAddress(@BeanProperty val hostname: String, @BeanProperty val port: Int)
-
+
abstract class Server extends ConfigElement
class Component(@BeanProperty val intf: Class[_],
@BeanProperty val target: Class[_],
@@ -150,8 +155,8 @@ object JavaConfig {
se.scalablesolutions.akka.kernel.config.ScalaConfig.Component(intf, target, lifeCycle.transform, timeout, dispatcher,
if (remoteAddress != null) se.scalablesolutions.akka.kernel.config.ScalaConfig.RemoteAddress(remoteAddress.hostname, remoteAddress.port) else null)
- def newWorker(actor: Actor) =
- se.scalablesolutions.akka.kernel.config.ScalaConfig.Worker(actor, lifeCycle.transform)
+ def newSupervised(actor: Actor) =
+ se.scalablesolutions.akka.kernel.config.ScalaConfig.Supervise(actor, lifeCycle.transform)
}
}
\ No newline at end of file
diff --git a/kernel/src/main/scala/config/Configuration.scala b/kernel/src/main/scala/config/Configuration.scala
old mode 100755
new mode 100644
index 906f33cf01..d4e63123dc
--- a/kernel/src/main/scala/config/Configuration.scala
+++ b/kernel/src/main/scala/config/Configuration.scala
@@ -55,6 +55,6 @@ class Component(@BeanProperty val intf: Class[_],
@BeanProperty val target: Class[_],
@BeanProperty val lifeCycle: LifeCycle,
@BeanProperty val timeout: Int) extends Server {
- def newWorker(proxy: ActiveObjectProxy) = se.scalablesolutions.akka.kernel.Worker(proxy.server, lifeCycle.transform)
+ def newWorker(proxy: ActiveObjectProxy) = se.scalablesolutions.akka.kernel.Supervise(proxy.server, lifeCycle.transform)
}
*/
\ No newline at end of file
diff --git a/kernel/src/main/scala/nio/RemoteServer.scala b/kernel/src/main/scala/nio/RemoteServer.scala
index 5a23f3ea50..8ee7e57dda 100644
--- a/kernel/src/main/scala/nio/RemoteServer.scala
+++ b/kernel/src/main/scala/nio/RemoteServer.scala
@@ -123,12 +123,12 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
val argClasses = args.map(_.getClass)
val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClasses, request.timeout)
- continueTransaction(request)
+ //continueTransaction(request)
try {
- val messageReceiver = activeObject.getClass.getDeclaredMethod(request.method, unescapedArgClasses)
- if (request.isOneWay) messageReceiver.invoke(activeObject, unescapedArgs)
+ val messageReceiver = activeObject.getClass.getDeclaredMethod(request.method, unescapedArgClasses: _*)
+ if (request.isOneWay) messageReceiver.invoke(activeObject, unescapedArgs: _*)
else {
- val result = messageReceiver.invoke(activeObject, unescapedArgs)
+ val result = messageReceiver.invoke(activeObject, unescapedArgs: _*)
log.debug("Returning result from remote active object invocation [%s]", result)
//channel.write(request.newReplyWithMessage(result, TransactionManagement.threadBoundTx.get))
channel.write(request.newReplyWithMessage(result, null))
diff --git a/kernel/src/main/scala/reactor/Dispatchers.scala b/kernel/src/main/scala/reactor/Dispatchers.scala
new file mode 100644
index 0000000000..15cf9a8501
--- /dev/null
+++ b/kernel/src/main/scala/reactor/Dispatchers.scala
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.reactor
+
+import kernel.actor.Actor
+
+/**
+ * Dispatcher factory.
+ *
+ * Example usage:
+ *
+ * val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
+ * .withNewThreadPoolWithBoundedBlockingQueue(100)
+ * .setCorePoolSize(16)
+ * .setMaxPoolSize(128)
+ * .setKeepAliveTimeInMillis(60000)
+ * .setRejectionPolicy(new CallerRunsPolicy)
+ * .buildThreadPool
+ *
+ *
+ *
+ * @author Jonas Bonér
+ */
+object Dispatchers {
+
+ /**
+ * Creates an event based dispatcher serving multiple (millions) of actors through a thread pool.
+ * Has a fluent builder interface for configuring its semantics.
+ */
+ def newEventBasedThreadPoolDispatcher = new EventBasedThreadPoolDispatcher
+
+ /**
+ * Creates an event based dispatcher serving multiple (millions) of actors through a single thread.
+ */
+ def newEventBasedSingleThreadDispatcher = new EventBasedSingleThreadDispatcher
+
+ /**
+ * Creates an thread based dispatcher serving a single actor through the same single thread.
+ * E.g. each actor consumes its own thread.
+ */
+ def newThreadBasedDispatcher(actor: Actor) = new ThreadBasedDispatcher(actor)
+}
\ No newline at end of file
diff --git a/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala b/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala
index 303816d077..1884b10e24 100644
--- a/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala
+++ b/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala
@@ -13,18 +13,18 @@ package se.scalablesolutions.akka.kernel.reactor
class EventBasedSingleThreadDispatcher extends MessageDispatcherBase {
def start = if (!active) {
active = true
- val messageDemultiplexer = new EventBasedSingleThreadDemultiplexer(messageQueue)
+ val messageDemultiplexer = new EventBasedSingleThreadDemultiplexer(queue)
selectorThread = new Thread {
override def run = {
while (active) {
try {
messageDemultiplexer.select
} catch { case e: InterruptedException => active = false }
- val queue = messageDemultiplexer.acquireSelectedQueue
- for (index <- 0 until queue.size) {
- val handle = queue.remove
+ val selectedQueue = messageDemultiplexer.acquireSelectedQueue
+ for (index <- 0 until selectedQueue.size) {
+ val handle = selectedQueue.remove
val handler = messageHandlers.get(handle.sender)
- if (handler != null) handler.handle(handle)
+ if (handler != null) handler.invoke(handle)
}
}
}
@@ -33,14 +33,14 @@ class EventBasedSingleThreadDispatcher extends MessageDispatcherBase {
}
}
-class EventBasedSingleThreadDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer {
+class EventBasedSingleThreadDemultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
import java.util.{LinkedList, Queue}
- private val selectedQueue: Queue[MessageHandle] = new LinkedList[MessageHandle]
+ private val selectedQueue: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
def select = messageQueue.read(selectedQueue)
- def acquireSelectedQueue: Queue[MessageHandle] = selectedQueue
+ def acquireSelectedQueue: Queue[MessageInvocation] = selectedQueue
def releaseSelectedQueue = throw new UnsupportedOperationException("EventBasedSingleThreadDemultiplexer can't release its queue")
diff --git a/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala b/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala
index 1efbe85fb3..4950cda4bf 100644
--- a/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala
+++ b/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala
@@ -77,7 +77,7 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
/**
* This dispatcher code is based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
*/
- val messageDemultiplexer = new EventBasedThreadPoolDemultiplexer(messageQueue)
+ val messageDemultiplexer = new EventBasedThreadPoolDemultiplexer(queue)
selectorThread = new Thread {
override def run = {
while (active) {
@@ -86,19 +86,19 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf]
messageDemultiplexer.select
} catch {case e: InterruptedException => active = false}
- val queue = messageDemultiplexer.acquireSelectedQueue
- for (index <- 0 until queue.size) {
- val message = queue.peek
+ val selectedQueue = messageDemultiplexer.acquireSelectedQueue
+ for (index <- 0 until selectedQueue.size) {
+ val message = selectedQueue.peek
val messageHandler = getIfNotBusy(message.sender)
if (messageHandler.isDefined) {
executor.execute(new Runnable {
override def run = {
- messageHandler.get.handle(message)
+ messageHandler.get.invoke(message)
free(message.sender)
messageDemultiplexer.wakeUp
}
})
- queue.remove
+ selectedQueue.remove
}
}
} finally {
@@ -112,7 +112,7 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
override protected def doShutdown = executor.shutdownNow
- private def getIfNotBusy(key: AnyRef): Option[MessageHandler] = guard.synchronized {
+ private def getIfNotBusy(key: AnyRef): Option[MessageInvoker] = guard.synchronized {
if (CONCURRENT_MODE && messageHandlers.containsKey(key)) Some(messageHandlers.get(key))
else if (!busyHandlers.contains(key) && messageHandlers.containsKey(key)) {
busyHandlers.add(key)
@@ -240,8 +240,8 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
private def ensureNotActive = if (active) throw new IllegalStateException("Can't build a new thread pool for a dispatcher that is already up and running")
}
-class EventBasedThreadPoolDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer {
- private val selectedQueue: Queue[MessageHandle] = new LinkedList[MessageHandle]
+class EventBasedThreadPoolDemultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
+ private val selectedQueue: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
private val selectedQueueLock = new ReentrantLock
def select = try {
@@ -251,7 +251,7 @@ class EventBasedThreadPoolDemultiplexer(private val messageQueue: MessageQueue)
selectedQueueLock.unlock
}
- def acquireSelectedQueue: Queue[MessageHandle] = {
+ def acquireSelectedQueue: Queue[MessageInvocation] = {
selectedQueueLock.lock
selectedQueue
}
diff --git a/kernel/src/main/scala/reactor/MessageDispatcherBase.scala b/kernel/src/main/scala/reactor/MessageDispatcherBase.scala
index b85ccb13a2..8fb35d84b1 100644
--- a/kernel/src/main/scala/reactor/MessageDispatcherBase.scala
+++ b/kernel/src/main/scala/reactor/MessageDispatcherBase.scala
@@ -4,6 +4,7 @@
package se.scalablesolutions.akka.kernel.reactor
+import java.util.{LinkedList, Queue}
import java.util.concurrent.TimeUnit
import java.util.HashMap
@@ -11,14 +12,16 @@ trait MessageDispatcherBase extends MessageDispatcher {
val CONCURRENT_MODE = kernel.Kernel.config.getBool("akka.actor.concurrent-mode", false)
val MILLISECONDS = TimeUnit.MILLISECONDS
- val messageQueue = new MessageQueue
+ val queue = new ReactiveMessageQueue
@volatile protected var active: Boolean = false
- protected val messageHandlers = new HashMap[AnyRef, MessageHandler]
+ protected val messageHandlers = new HashMap[AnyRef, MessageInvoker]
protected var selectorThread: Thread = _
protected val guard = new Object
- def registerHandler(key: AnyRef, handler: MessageHandler) = guard.synchronized {
+ def messageQueue = queue
+
+ def registerHandler(key: AnyRef, handler: MessageInvoker) = guard.synchronized {
messageHandlers.put(key, handler)
}
@@ -37,3 +40,29 @@ trait MessageDispatcherBase extends MessageDispatcher {
*/
protected def doShutdown = {}
}
+
+class ReactiveMessageQueue extends MessageQueue {
+ private[kernel] val queue: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
+ @volatile private var interrupted = false
+
+ def append(handle: MessageInvocation) = queue.synchronized {
+ queue.offer(handle)
+ queue.notifyAll
+ }
+
+ def prepend(handle: MessageInvocation) = queue.synchronized {
+ queue.add(handle)
+ queue.notifyAll
+ }
+
+ def read(destination: Queue[MessageInvocation]) = queue.synchronized {
+ while (queue.isEmpty && !interrupted) queue.wait
+ if (!interrupted) while (!queue.isEmpty) destination.offer(queue.remove)
+ else interrupted = false
+ }
+
+ def interrupt = queue.synchronized {
+ interrupted = true
+ queue.notifyAll
+ }
+}
\ No newline at end of file
diff --git a/kernel/src/main/scala/reactor/Reactor.scala b/kernel/src/main/scala/reactor/Reactor.scala
index 98079d9952..cfc88b56ba 100644
--- a/kernel/src/main/scala/reactor/Reactor.scala
+++ b/kernel/src/main/scala/reactor/Reactor.scala
@@ -2,26 +2,24 @@
* Copyright (C) 2009 Scalable Solutions.
*/
-/**
- * Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
- * See also this article: [http://today.java.net/cs/user/print/a/350].
- *
- * Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
- */
package se.scalablesolutions.akka.kernel.reactor
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.ThreadFactory
-import java.util.{LinkedList, Queue}
+import java.util.Queue
import kernel.stm.Transaction
-import kernel.util.{Logging, HashCode}
-trait MessageHandler {
- def handle(message: MessageHandle)
+import kernel.util.HashCode
+
+trait MessageQueue {
+ def append(handle: MessageInvocation)
+ def prepend(handle: MessageInvocation)
+}
+
+trait MessageInvoker {
+ def invoke(message: MessageInvocation)
}
trait MessageDispatcher {
def messageQueue: MessageQueue
- def registerHandler(key: AnyRef, handler: MessageHandler)
+ def registerHandler(key: AnyRef, handler: MessageInvoker)
def unregisterHandler(key: AnyRef)
def start
def shutdown
@@ -29,15 +27,15 @@ trait MessageDispatcher {
trait MessageDemultiplexer {
def select
- def acquireSelectedQueue: Queue[MessageHandle]
+ def acquireSelectedQueue: Queue[MessageInvocation]
def releaseSelectedQueue
def wakeUp
}
-class MessageHandle(val sender: AnyRef,
- val message: AnyRef,
- val future: Option[CompletableFutureResult],
- val tx: Option[Transaction]) {
+class MessageInvocation(val sender: AnyRef,
+ val message: AnyRef,
+ val future: Option[CompletableFutureResult],
+ val tx: Option[Transaction]) {
override def hashCode(): Int = {
var result = HashCode.SEED
@@ -50,39 +48,13 @@ class MessageHandle(val sender: AnyRef,
override def equals(that: Any): Boolean =
that != null &&
- that.isInstanceOf[MessageHandle] &&
- that.asInstanceOf[MessageHandle].sender == sender &&
- that.asInstanceOf[MessageHandle].message == message &&
- that.asInstanceOf[MessageHandle].future.isDefined == future.isDefined &&
- that.asInstanceOf[MessageHandle].future.get == future.get &&
- that.asInstanceOf[MessageHandle].tx.isDefined == tx.isDefined &&
- that.asInstanceOf[MessageHandle].tx.get.id == tx.get.id
+ that.isInstanceOf[MessageInvocation] &&
+ that.asInstanceOf[MessageInvocation].sender == sender &&
+ that.asInstanceOf[MessageInvocation].message == message &&
+ that.asInstanceOf[MessageInvocation].future.isDefined == future.isDefined &&
+ that.asInstanceOf[MessageInvocation].future.get == future.get &&
+ that.asInstanceOf[MessageInvocation].tx.isDefined == tx.isDefined &&
+ that.asInstanceOf[MessageInvocation].tx.get.id == tx.get.id
- override def toString(): String = "MessageHandle[message = " + message + ", sender = " + sender + ", future = " + future + ", tx = " + tx + "]"
-}
-
-class MessageQueue {
- private[kernel] val queue: Queue[MessageHandle] = new LinkedList[MessageHandle]
- @volatile private var interrupted = false
-
- def append(handle: MessageHandle) = queue.synchronized {
- queue.offer(handle)
- queue.notifyAll
- }
-
- def prepend(handle: MessageHandle) = queue.synchronized {
- queue.add(handle)
- queue.notifyAll
- }
-
- def read(destination: Queue[MessageHandle]) = queue.synchronized {
- while (queue.isEmpty && !interrupted) queue.wait
- if (!interrupted) while (!queue.isEmpty) destination.offer(queue.remove)
- else interrupted = false
- }
-
- def interrupt = queue.synchronized {
- interrupted = true
- queue.notifyAll
- }
+ override def toString(): String = "MessageInvocation[message = " + message + ", sender = " + sender + ", future = " + future + ", tx = " + tx + "]"
}
diff --git a/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala b/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala
new file mode 100644
index 0000000000..b01f373dc9
--- /dev/null
+++ b/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala
@@ -0,0 +1,55 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.reactor
+
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.Queue
+import kernel.actor.{Actor, ActorMessageInvoker}
+
+/**
+ * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
+ * @author Jonas Bonér
+ */
+class ThreadBasedDispatcher private[kernel] (val messageHandler: MessageInvoker) extends MessageDispatcher {
+ def this(actor: Actor) = this(new ActorMessageInvoker(actor))
+
+ private val queue = new BlockingMessageQueue
+ private var selectorThread: Thread = _
+ @volatile private var active: Boolean = false
+
+ def messageQueue = queue
+
+ def start = if (!active) {
+ active = true
+ selectorThread = new Thread {
+ override def run = {
+ while (active) {
+ try {
+ messageHandler.invoke(queue.take)
+ } catch { case e: InterruptedException => active = false }
+ }
+ }
+ }
+ selectorThread.start
+ }
+
+ def shutdown = if (active) {
+ active = false
+ selectorThread.interrupt
+ }
+
+ def registerHandler(key: AnyRef, handler: MessageInvoker) = throw new UnsupportedOperationException
+ def unregisterHandler(key: AnyRef) = throw new UnsupportedOperationException
+}
+
+class BlockingMessageQueue extends MessageQueue {
+ // FIXME: configure the LBQ
+ private val queue = new LinkedBlockingQueue[MessageInvocation]
+ def append(handle: MessageInvocation) = queue.put(handle)
+ def prepend(handle: MessageInvocation) = queue.add(handle) // FIXME is add prepend???
+ def take: MessageInvocation = queue.take
+ def read(destination: Queue[MessageInvocation]) = throw new UnsupportedOperationException
+ def interrupt = throw new UnsupportedOperationException
+}
\ No newline at end of file
diff --git a/kernel/src/main/scala/state/CassandraStorage.scala b/kernel/src/main/scala/state/CassandraStorage.scala
index b40f37a4b8..496e491686 100644
--- a/kernel/src/main/scala/state/CassandraStorage.scala
+++ b/kernel/src/main/scala/state/CassandraStorage.scala
@@ -53,10 +53,10 @@ final object CassandraStorage extends Logging {
if (!isRunning) {
try {
server.start
- log.info("Persistent storage has started up successfully");
+ log.info("Cassandra persistent storage has started up successfully");
} catch {
case e =>
- log.error("Could not start up persistent storage")
+ log.error("Could not start up Cassandra persistent storage")
throw e
}
if (RUN_THRIFT_SERVICE) {
@@ -214,7 +214,7 @@ class CassandraThriftServer(server: CassandraServer) extends Logging {
options)
} catch {
case e =>
- log.error("Could not start up persistent storage node.")
+ log.error("Could not start up Cassandra thrift service")
throw e
}
@@ -222,9 +222,9 @@ class CassandraThriftServer(server: CassandraServer) extends Logging {
private[this] val serverDaemon = actor {
receive {
case Start =>
- log.info("Cassandra thrift service is starting up...")
serverEngine.serve
- case Stop =>
+ log.info("Cassandra thrift service has starting up successfully")
+ case Stop =>
log.info("Cassandra thrift service is shutting down...")
serverEngine.stop
}
diff --git a/kernel/src/main/scala/state/DataFlowVariable.scala b/kernel/src/main/scala/state/DataFlowVariable.scala
index 93d5865e8f..5e8c8ac77c 100644
--- a/kernel/src/main/scala/state/DataFlowVariable.scala
+++ b/kernel/src/main/scala/state/DataFlowVariable.scala
@@ -39,8 +39,8 @@ object DataFlow {
private class ReactiveEventBasedThread[MessageType, ReturnType](body: MessageType => ReturnType) extends Actor {
def act = loop {
react {
- case message: MessageType => sender ! body(message)
- case 'exit => exit()
+ case 'exit => exit()
+ case message => sender ! body(message.asInstanceOf[MessageType])
}
}
}
diff --git a/kernel/src/main/scala/stm/Transaction.scala b/kernel/src/main/scala/stm/Transaction.scala
index 501c0f84a6..b72eb7c2b2 100644
--- a/kernel/src/main/scala/stm/Transaction.scala
+++ b/kernel/src/main/scala/stm/Transaction.scala
@@ -8,8 +8,6 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import kernel.state.Transactional
import kernel.util.Logging
-class TransactionRollbackException(msg: String) extends RuntimeException(msg)
-
@serializable sealed abstract class TransactionStatus
object TransactionStatus {
case object New extends TransactionStatus
@@ -39,22 +37,22 @@ object TransactionIdFactory {
log.debug("Creating a new transaction with id [%s]", id)
- private[this] val transactionals = new ChangeSet
+ @volatile private[this] var status: TransactionStatus = TransactionStatus.New
+
+ private[this] val transactionalItems = new ChangeSet
private[this] var participants: List[String] = Nil
private[this] var precommitted: List[String] = Nil
-
+
private[this] val depth = new AtomicInteger(0)
- @volatile private[this] var status: TransactionStatus = TransactionStatus.New
-
def increment = synchronized { depth.incrementAndGet }
def decrement = synchronized { depth.decrementAndGet }
- def topLevel_? = synchronized { depth.get == 0 }
+ def isTopLevel = synchronized { depth.get == 0 }
def register(transactional: Transactional) = synchronized {
ensureIsActiveOrNew
- transactionals + transactional
+ transactionalItems + transactional
}
def begin(participant: String) = synchronized {
@@ -83,7 +81,7 @@ object TransactionIdFactory {
}}.exists(_ == true)
} else false
if (haveAllPreCommitted) {
- transactionals.items.foreach(_.commit)
+ transactionalItems.items.foreach(_.commit)
status = TransactionStatus.Completed
reset
true
@@ -97,7 +95,7 @@ object TransactionIdFactory {
def rollback(participant: String) = synchronized {
ensureIsActiveOrAborted
log.debug("TX ROLLBACK - Server with UUID [%s] has initiated transaction rollback for [%s]", participant, toString)
- transactionals.items.foreach(_.rollback)
+ transactionalItems.items.foreach(_.rollback)
status = TransactionStatus.Aborted
reset
}
@@ -105,7 +103,7 @@ object TransactionIdFactory {
def rollbackForRescheduling(participant: String) = synchronized {
ensureIsActiveOrAborted
log.debug("TX ROLLBACK for recheduling - Server with UUID [%s] has initiated transaction rollback for [%s]", participant, toString)
- transactionals.items.foreach(_.rollback)
+ transactionalItems.items.foreach(_.rollback)
reset
}
@@ -121,7 +119,7 @@ object TransactionIdFactory {
def isAborted = status == TransactionStatus.Aborted
private def reset = {
- transactionals.clear
+ transactionalItems.clear
participants = Nil
precommitted = Nil
}
@@ -136,7 +134,7 @@ object TransactionIdFactory {
throw new IllegalStateException("Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
// For reinitialize transaction after sending it over the wire
- private[kernel] def reinit = {
+ private[kernel] def reinit = synchronized {
import net.lag.logging.{Logger, Level}
if (log == null) {
log = Logger.get(this.getClass.getName)
diff --git a/kernel/src/main/scala/stm/TransactionManagement.scala b/kernel/src/main/scala/stm/TransactionManagement.scala
index a2300de4a5..ae06fed357 100644
--- a/kernel/src/main/scala/stm/TransactionManagement.scala
+++ b/kernel/src/main/scala/stm/TransactionManagement.scala
@@ -6,8 +6,11 @@ package se.scalablesolutions.akka.kernel.stm
import java.util.concurrent.atomic.AtomicBoolean
+import kernel.reactor.MessageInvocation
import kernel.util.Logging
-import org.codehaus.aspectwerkz.proxy.Uuid
+import org.codehaus.aspectwerkz.proxy.Uuid // FIXME is java.util.UUID better?
+
+class TransactionRollbackException(msg: String) extends RuntimeException(msg)
class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
override def toString(): String = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
@@ -17,7 +20,8 @@ object TransactionManagement {
val TIME_WAITING_FOR_COMPLETION = kernel.Kernel.config.getInt("akka.stm.wait-for-completion", 100)
val NR_OF_TIMES_WAITING_FOR_COMPLETION = kernel.Kernel.config.getInt("akka.stm.wait-nr-of-times", 3)
val TRANSACTION_ENABLED = new AtomicBoolean(kernel.Kernel.config.getBool("akka.stm.service", true))
- val RESTART_TRANSACTION_ON_COLLISION = kernel.Kernel.config.getBool("akka.stm.restart-transaction", true)
+ // FIXME reenable 'akka.stm.restart-on-collision' when new STM is in place
+ val RESTART_TRANSACTION_ON_COLLISION = false //kernel.Kernel.config.getBool("akka.stm.restart-on-collision", true)
def isTransactionalityEnabled = TRANSACTION_ENABLED.get
def disableTransactions = TRANSACTION_ENABLED.set(false)
@@ -27,9 +31,11 @@ object TransactionManagement {
}
}
-// FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts
trait TransactionManagement extends Logging {
val uuid = Uuid.newUuid.toString
+
+ protected[this] var latestMessage: Option[MessageInvocation] = None
+ protected[this] var messageToReschedule: Option[MessageInvocation] = None
import TransactionManagement.threadBoundTx
private[kernel] var activeTx: Option[Transaction] = None
@@ -74,6 +80,25 @@ trait TransactionManagement extends Logging {
tx.rollbackForRescheduling(uuid)
}
+ protected def handleCollision = {
+ var nrRetries = 0
+ var failed = true
+ do {
+ Thread.sleep(TransactionManagement.TIME_WAITING_FOR_COMPLETION)
+ nrRetries += 1
+ log.debug("Pending transaction [%s] not completed, waiting %s milliseconds. Attempt %s", activeTx.get, TransactionManagement.TIME_WAITING_FOR_COMPLETION, nrRetries)
+ failed = !tryToCommitTransaction
+ } while(nrRetries < TransactionManagement.NR_OF_TIMES_WAITING_FOR_COMPLETION && failed)
+ if (failed) {
+ log.debug("Pending transaction [%s] still not completed, aborting and rescheduling message [%s]", activeTx.get, latestMessage)
+ rollback(activeTx)
+ if (TransactionManagement.RESTART_TRANSACTION_ON_COLLISION) messageToReschedule = Some(latestMessage.get)
+ else throw new TransactionRollbackException("Conflicting transactions, rolling back transaction for message [" + latestMessage + "]")
+ }
+ }
+
+ protected def isTransactionTopLevel = activeTx.isDefined && activeTx.get.isTopLevel
+
protected def isInExistingTransaction = TransactionManagement.threadBoundTx.get.isDefined
protected def isTransactionAborted = activeTx.isDefined && activeTx.get.isAborted
@@ -82,11 +107,10 @@ trait TransactionManagement extends Logging {
protected def decrementTransaction = if (activeTx.isDefined) activeTx.get.decrement
- protected def removeTransactionIfTopLevel =
- if (activeTx.isDefined && activeTx.get.topLevel_?) {
- activeTx = None
- threadBoundTx.set(None)
- }
+ protected def removeTransactionIfTopLevel = if (isTransactionTopLevel) {
+ activeTx = None
+ threadBoundTx.set(None)
+ }
protected def reenteringExistingTransaction= if (activeTx.isDefined) {
val cflowTx = threadBoundTx.get
diff --git a/kernel/src/test/scala/AllTest.scala b/kernel/src/test/scala/AllTest.scala
new file mode 100644
index 0000000000..b0ef909aae
--- /dev/null
+++ b/kernel/src/test/scala/AllTest.scala
@@ -0,0 +1,26 @@
+package se.scalablesolutions.akka.kernel
+
+import junit.framework.Test
+import junit.framework.TestCase
+import junit.framework.TestSuite
+
+import kernel.actor.{ActorSpec, RemoteActorSpec, PersistentActorSpec, InMemoryActorSpec}
+import kernel.reactor.{EventBasedSingleThreadDispatcherTest, EventBasedThreadPoolDispatcherTest}
+
+object AllTest extends TestCase {
+ def suite(): Test = {
+ val suite = new TestSuite("All Scala tests")
+ suite.addTestSuite(classOf[SupervisorSpec])
+ suite.addTestSuite(classOf[RemoteSupervisorSpec])
+ suite.addTestSuite(classOf[EventBasedSingleThreadDispatcherTest])
+ suite.addTestSuite(classOf[EventBasedThreadPoolDispatcherTest])
+ suite.addTestSuite(classOf[ActorSpec])
+ suite.addTestSuite(classOf[RemoteActorSpec])
+ suite.addTestSuite(classOf[PersistentActorSpec])
+ suite.addTestSuite(classOf[InMemoryActorSpec])
+ //suite.addTestSuite(classOf[TransactionClasherSpec])
+ suite
+ }
+
+ def main(args: Array[String]) = junit.textui.TestRunner.run(suite)
+}
\ No newline at end of file
diff --git a/kernel/src/test/scala/AllTests.scala b/kernel/src/test/scala/AllTests.scala
deleted file mode 100644
index 464b2e3922..0000000000
--- a/kernel/src/test/scala/AllTests.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-package se.scalablesolutions.akka.kernel
-
-import junit.framework.Test
-import junit.framework.TestCase
-import junit.framework.TestSuite
-
-object AllTests extends TestCase {
- def suite(): Test = {
- val suite = new TestSuite("All tests")
- suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.SupervisorSpec])
- suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.RemoteSupervisorSpec])
- suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.reactor.EventBasedDispatcherTest])
- suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.reactor.ThreadBasedDispatcherTest])
- suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.actor.ActorSpec])
- suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.actor.RemoteActorSpec])
- suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.actor.PersistentActorSpec])
- suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.actor.InMemoryActorSpec])
- suite
- }
-
- def main(args: Array[String]) = junit.textui.TestRunner.run(suite)
-}
\ No newline at end of file
diff --git a/kernel/src/test/scala/EventBasedDispatcherTest.scala b/kernel/src/test/scala/EventBasedSingleThreadDispatcherTest.scala
similarity index 81%
rename from kernel/src/test/scala/EventBasedDispatcherTest.scala
rename to kernel/src/test/scala/EventBasedSingleThreadDispatcherTest.scala
index 8f346f69d9..dc272893f5 100644
--- a/kernel/src/test/scala/EventBasedDispatcherTest.scala
+++ b/kernel/src/test/scala/EventBasedSingleThreadDispatcherTest.scala
@@ -9,13 +9,13 @@ import org.junit.{Test, Before}
import org.junit.Assert._
import junit.framework.TestCase
-class EventBasedDispatcherTest extends TestCase {
+class EventBasedSingleThreadDispatcherTest extends TestCase {
private var threadingIssueDetected: AtomicBoolean = null
- class TestMessageHandle(handleLatch: CountDownLatch) extends MessageHandler {
+ class TestMessageHandle(handleLatch: CountDownLatch) extends MessageInvoker {
val guardLock: Lock = new ReentrantLock
- def handle(message: MessageHandle) {
+ def invoke(message: MessageInvocation) {
try {
if (threadingIssueDetected.get) return
if (guardLock.tryLock) {
@@ -59,7 +59,7 @@ class EventBasedDispatcherTest extends TestCase {
dispatcher.registerHandler(key, new TestMessageHandle(handleLatch))
dispatcher.start
for (i <- 0 until 100) {
- dispatcher.messageQueue.append(new MessageHandle(key, new Object, None, None))
+ dispatcher.messageQueue.append(new MessageInvocation(key, new Object, None, None))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
@@ -73,8 +73,8 @@ class EventBasedDispatcherTest extends TestCase {
dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch))
dispatcher.registerHandler(key2, new TestMessageHandle(handleLatch))
dispatcher.start
- dispatcher.messageQueue.append(new MessageHandle(key1, new Object, None, None))
- dispatcher.messageQueue.append(new MessageHandle(key2, new Object, None, None))
+ dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None))
+ dispatcher.messageQueue.append(new MessageInvocation(key2, new Object, None, None))
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
}
@@ -84,9 +84,9 @@ class EventBasedDispatcherTest extends TestCase {
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedSingleThreadDispatcher
- dispatcher.registerHandler(key1, new MessageHandler {
+ dispatcher.registerHandler(key1, new MessageInvoker {
var currentValue = -1;
- def handle(message: MessageHandle) {
+ def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
@@ -95,9 +95,9 @@ class EventBasedDispatcherTest extends TestCase {
} else threadingIssueDetected.set(true)
}
})
- dispatcher.registerHandler(key2, new MessageHandler {
+ dispatcher.registerHandler(key2, new MessageInvoker {
var currentValue = -1;
- def handle(message: MessageHandle) {
+ def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
@@ -108,8 +108,8 @@ class EventBasedDispatcherTest extends TestCase {
})
dispatcher.start
for (i <- 0 until 100) {
- dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), None, None))
- dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), None, None))
+ dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None))
+ dispatcher.messageQueue.append(new MessageInvocation(key2, new Integer(i), None, None))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
diff --git a/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala b/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala
new file mode 100644
index 0000000000..ea3b83729e
--- /dev/null
+++ b/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala
@@ -0,0 +1,143 @@
+package se.scalablesolutions.akka.kernel.reactor
+
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.Lock
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.{Executors, CountDownLatch, CyclicBarrier, TimeUnit}
+import org.junit.Before
+import org.junit.Test
+import org.junit.Assert._
+import junit.framework.TestCase
+
+class EventBasedThreadPoolDispatcherTest extends TestCase {
+ private var threadingIssueDetected: AtomicBoolean = null
+
+ @Before
+ override def setUp = {
+ threadingIssueDetected = new AtomicBoolean(false)
+ }
+
+ @Test
+ def testMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
+ internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
+ }
+
+ @Test
+ def testMessagesDispatchedToDifferentHandlersAreExecutedConcurrently = {
+ internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently
+ }
+
+ @Test
+ def testMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
+ internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
+ }
+
+ private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
+ val guardLock = new ReentrantLock
+ val handleLatch = new CountDownLatch(10)
+ val key = "key"
+ val dispatcher = new EventBasedThreadPoolDispatcher
+ dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
+ .setCorePoolSize(2)
+ .setMaxPoolSize(4)
+ .setKeepAliveTimeInMillis(60000)
+ .setRejectionPolicy(new CallerRunsPolicy)
+ .buildThreadPool
+ dispatcher.registerHandler(key, new MessageInvoker {
+ def invoke(message: MessageInvocation) {
+ try {
+ if (threadingIssueDetected.get) return
+ if (guardLock.tryLock) {
+ Thread.sleep(100)
+ handleLatch.countDown
+ } else {
+ threadingIssueDetected.set(true)
+ }
+ } catch {
+ case e: Exception => threadingIssueDetected.set(true); e.printStackTrace
+ } finally {
+ guardLock.unlock
+ }
+ }
+ })
+ dispatcher.start
+ for (i <- 0 until 10) {
+ dispatcher.messageQueue.append(new MessageInvocation(key, new Object, None, None))
+ }
+ assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
+ assertFalse(threadingIssueDetected.get)
+ }
+
+ private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently: Unit = {
+ val handlersBarrier = new CyclicBarrier(3)
+ val key1 = "key1"
+ val key2 = "key2"
+ val dispatcher = new EventBasedThreadPoolDispatcher
+ dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
+ .setCorePoolSize(2)
+ .setMaxPoolSize(4)
+ .setKeepAliveTimeInMillis(60000)
+ .setRejectionPolicy(new CallerRunsPolicy)
+ .buildThreadPool
+ dispatcher.registerHandler(key1, new MessageInvoker {
+ def invoke(message: MessageInvocation) = synchronized {
+ try {handlersBarrier.await(1, TimeUnit.SECONDS)}
+ catch {case e: Exception => threadingIssueDetected.set(true)}
+ }
+ })
+ dispatcher.registerHandler(key2, new MessageInvoker {
+ def invoke(message: MessageInvocation) = synchronized {
+ try {handlersBarrier.await(1, TimeUnit.SECONDS)}
+ catch {case e: Exception => threadingIssueDetected.set(true)}
+ }
+ })
+ dispatcher.start
+ dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1", None, None))
+ dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2", None, None))
+ handlersBarrier.await(5, TimeUnit.SECONDS)
+ assertFalse(threadingIssueDetected.get)
+ }
+
+ private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
+ val handleLatch = new CountDownLatch(200)
+ val key1 = "key1"
+ val key2 = "key2"
+ val dispatcher = new EventBasedThreadPoolDispatcher
+ dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
+ .setCorePoolSize(2)
+ .setMaxPoolSize(4)
+ .setKeepAliveTimeInMillis(60000)
+ .setRejectionPolicy(new CallerRunsPolicy)
+ .buildThreadPool
+ dispatcher.registerHandler(key1, new MessageInvoker {
+ var currentValue = -1;
+ def invoke(message: MessageInvocation) {
+ if (threadingIssueDetected.get) return
+ val messageValue = message.message.asInstanceOf[Int]
+ if (messageValue.intValue == currentValue + 1) {
+ currentValue = messageValue.intValue
+ handleLatch.countDown
+ } else threadingIssueDetected.set(true)
+ }
+ })
+ dispatcher.registerHandler(key2, new MessageInvoker {
+ var currentValue = -1;
+ def invoke(message: MessageInvocation) {
+ if (threadingIssueDetected.get) return
+ val messageValue = message.message.asInstanceOf[Int]
+ if (messageValue.intValue == currentValue + 1) {
+ currentValue = messageValue.intValue
+ handleLatch.countDown
+ } else threadingIssueDetected.set(true)
+ }
+ })
+ dispatcher.start
+ for (i <- 0 until 100) {
+ dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None))
+ dispatcher.messageQueue.append(new MessageInvocation(key2, new Integer(i), None, None))
+ }
+ assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
+ assertFalse(threadingIssueDetected.get)
+ }
+}
diff --git a/kernel/src/test/scala/InMemoryActorSpec.scala b/kernel/src/test/scala/InMemoryActorSpec.scala
index 7e541d5267..9f5e2eb797 100644
--- a/kernel/src/test/scala/InMemoryActorSpec.scala
+++ b/kernel/src/test/scala/InMemoryActorSpec.scala
@@ -140,7 +140,7 @@ class InMemoryActorSpec extends TestCase {
Thread.sleep(100)
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
Thread.sleep(100)
- assertEquals("new state", (stateful !! GetVectorSize).get)
+ assertEquals(2, (stateful !! GetVectorSize).get)
}
@Test
@@ -225,4 +225,5 @@ class InMemoryActorSpec extends TestCase {
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state
}
+
}
\ No newline at end of file
diff --git a/kernel/src/test/scala/RemoteSupervisorSpec.scala b/kernel/src/test/scala/RemoteSupervisorSpec.scala
index e1c199234a..86805f2d84 100644
--- a/kernel/src/test/scala/RemoteSupervisorSpec.scala
+++ b/kernel/src/test/scala/RemoteSupervisorSpec.scala
@@ -474,7 +474,7 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
- Worker(
+ Supervise(
pingpong1,
LifeCycle(Permanent, 100))
:: Nil)
@@ -491,7 +491,7 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100),
- Worker(
+ Supervise(
pingpong1,
LifeCycle(Permanent, 100))
:: Nil)
@@ -512,15 +512,15 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
- Worker(
+ Supervise(
pingpong1,
LifeCycle(Permanent, 100))
::
- Worker(
+ Supervise(
pingpong2,
LifeCycle(Permanent, 100))
::
- Worker(
+ Supervise(
pingpong3,
LifeCycle(Permanent, 100))
:: Nil)
@@ -541,15 +541,15 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100),
- Worker(
+ Supervise(
pingpong1,
LifeCycle(Permanent, 100))
::
- Worker(
+ Supervise(
pingpong2,
LifeCycle(Permanent, 100))
::
- Worker(
+ Supervise(
pingpong3,
LifeCycle(Permanent, 100))
:: Nil)
@@ -570,17 +570,17 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
- Worker(
+ Supervise(
pingpong1,
LifeCycle(Permanent, 100))
::
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
- Worker(
+ Supervise(
pingpong2,
LifeCycle(Permanent, 100))
::
- Worker(
+ Supervise(
pingpong3,
LifeCycle(Permanent, 100))
:: Nil)
diff --git a/kernel/src/test/scala/SupervisorSpec.scala b/kernel/src/test/scala/SupervisorSpec.scala
index 6517d6530a..9c49da7d54 100644
--- a/kernel/src/test/scala/SupervisorSpec.scala
+++ b/kernel/src/test/scala/SupervisorSpec.scala
@@ -460,7 +460,7 @@ class SupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
- Worker(
+ Supervise(
pingpong1,
LifeCycle(Permanent, 100))
:: Nil)
@@ -476,7 +476,7 @@ class SupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100),
- Worker(
+ Supervise(
pingpong1,
LifeCycle(Permanent, 100))
:: Nil)
@@ -494,15 +494,15 @@ class SupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
- Worker(
+ Supervise(
pingpong1,
LifeCycle(Permanent, 100))
::
- Worker(
+ Supervise(
pingpong2,
LifeCycle(Permanent, 100))
::
- Worker(
+ Supervise(
pingpong3,
LifeCycle(Permanent, 100))
:: Nil)
@@ -520,15 +520,15 @@ class SupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100),
- Worker(
+ Supervise(
pingpong1,
LifeCycle(Permanent, 100))
::
- Worker(
+ Supervise(
pingpong2,
LifeCycle(Permanent, 100))
::
- Worker(
+ Supervise(
pingpong3,
LifeCycle(Permanent, 100))
:: Nil)
@@ -546,17 +546,17 @@ class SupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
- Worker(
+ Supervise(
pingpong1,
LifeCycle(Permanent, 100))
::
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100),
- Worker(
+ Supervise(
pingpong2,
LifeCycle(Permanent, 100))
::
- Worker(
+ Supervise(
pingpong3,
LifeCycle(Permanent, 100))
:: Nil)
diff --git a/kernel/src/test/scala/ThreadBasedDispatcherTest.scala b/kernel/src/test/scala/ThreadBasedDispatcherTest.scala
index dde09f75ec..621e3dccfd 100644
--- a/kernel/src/test/scala/ThreadBasedDispatcherTest.scala
+++ b/kernel/src/test/scala/ThreadBasedDispatcherTest.scala
@@ -1,18 +1,36 @@
package se.scalablesolutions.akka.kernel.reactor
-import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.{Executors, CountDownLatch, CyclicBarrier, TimeUnit}
-import org.junit.Before
-import org.junit.Test
+import org.junit.{Test, Before}
import org.junit.Assert._
import junit.framework.TestCase
class ThreadBasedDispatcherTest extends TestCase {
private var threadingIssueDetected: AtomicBoolean = null
+ class TestMessageHandle(handleLatch: CountDownLatch) extends MessageInvoker {
+ val guardLock: Lock = new ReentrantLock
+
+ def invoke(message: MessageInvocation) {
+ try {
+ if (threadingIssueDetected.get) return
+ if (guardLock.tryLock) {
+ handleLatch.countDown
+ } else {
+ threadingIssueDetected.set(true)
+ }
+ } catch {
+ case e: Exception => threadingIssueDetected.set(true)
+ } finally {
+ guardLock.unlock
+ }
+ }
+ }
+
@Before
override def setUp = {
threadingIssueDetected = new AtomicBoolean(false)
@@ -23,11 +41,6 @@ class ThreadBasedDispatcherTest extends TestCase {
internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
}
- @Test
- def testMessagesDispatchedToDifferentHandlersAreExecutedConcurrently = {
- internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently
- }
-
@Test
def testMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
@@ -35,95 +48,21 @@ class ThreadBasedDispatcherTest extends TestCase {
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
val guardLock = new ReentrantLock
- val handleLatch = new CountDownLatch(10)
- val key = "key"
- val dispatcher = new EventBasedThreadPoolDispatcher
- dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
- .setCorePoolSize(2)
- .setMaxPoolSize(4)
- .setKeepAliveTimeInMillis(60000)
- .setRejectionPolicy(new CallerRunsPolicy)
- .buildThreadPool
- dispatcher.registerHandler(key, new MessageHandler {
- def handle(message: MessageHandle) {
- try {
- if (threadingIssueDetected.get) return
- if (guardLock.tryLock) {
- Thread.sleep(100)
- handleLatch.countDown
- } else {
- threadingIssueDetected.set(true)
- }
- } catch {
- case e: Exception => threadingIssueDetected.set(true); e.printStackTrace
- } finally {
- guardLock.unlock
- }
- }
- })
+ val handleLatch = new CountDownLatch(100)
+ val dispatcher = new ThreadBasedDispatcher(new TestMessageHandle(handleLatch))
dispatcher.start
- for (i <- 0 until 10) {
- dispatcher.messageQueue.append(new MessageHandle(key, new Object, None, None))
+ for (i <- 0 until 100) {
+ dispatcher.messageQueue.append(new MessageInvocation("id", new Object, None, None))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
}
- private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently: Unit = {
- val handlersBarrier = new CyclicBarrier(3)
- val key1 = "key1"
- val key2 = "key2"
- val dispatcher = new EventBasedThreadPoolDispatcher
- dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
- .setCorePoolSize(2)
- .setMaxPoolSize(4)
- .setKeepAliveTimeInMillis(60000)
- .setRejectionPolicy(new CallerRunsPolicy)
- .buildThreadPool
- dispatcher.registerHandler(key1, new MessageHandler {
- def handle(message: MessageHandle) = synchronized {
- try {handlersBarrier.await(1, TimeUnit.SECONDS)}
- catch {case e: Exception => threadingIssueDetected.set(true)}
- }
- })
- dispatcher.registerHandler(key2, new MessageHandler {
- def handle(message: MessageHandle) = synchronized {
- try {handlersBarrier.await(1, TimeUnit.SECONDS)}
- catch {case e: Exception => threadingIssueDetected.set(true)}
- }
- })
- dispatcher.start
- dispatcher.messageQueue.append(new MessageHandle(key1, "Sending Message 1", None, None))
- dispatcher.messageQueue.append(new MessageHandle(key2, "Sending Message 2", None, None))
- handlersBarrier.await(5, TimeUnit.SECONDS)
- assertFalse(threadingIssueDetected.get)
- }
-
private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
- val handleLatch = new CountDownLatch(200)
- val key1 = "key1"
- val key2 = "key2"
- val dispatcher = new EventBasedThreadPoolDispatcher
- dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
- .setCorePoolSize(2)
- .setMaxPoolSize(4)
- .setKeepAliveTimeInMillis(60000)
- .setRejectionPolicy(new CallerRunsPolicy)
- .buildThreadPool
- dispatcher.registerHandler(key1, new MessageHandler {
+ val handleLatch = new CountDownLatch(100)
+ val dispatcher = new ThreadBasedDispatcher(new MessageInvoker {
var currentValue = -1;
- def handle(message: MessageHandle) {
- if (threadingIssueDetected.get) return
- val messageValue = message.message.asInstanceOf[Int]
- if (messageValue.intValue == currentValue + 1) {
- currentValue = messageValue.intValue
- handleLatch.countDown
- } else threadingIssueDetected.set(true)
- }
- })
- dispatcher.registerHandler(key2, new MessageHandler {
- var currentValue = -1;
- def handle(message: MessageHandle) {
+ def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
@@ -134,10 +73,10 @@ class ThreadBasedDispatcherTest extends TestCase {
})
dispatcher.start
for (i <- 0 until 100) {
- dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), None, None))
- dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), None, None))
+ dispatcher.messageQueue.append(new MessageInvocation("id", new Integer(i), None, None))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
+ dispatcher.shutdown
}
}
diff --git a/kernel/src/test/scala/TransactionClasherSpec.scala b/kernel/src/test/scala/TransactionClasherSpec.scala
index 3000b34931..4177036c90 100644
--- a/kernel/src/test/scala/TransactionClasherSpec.scala
+++ b/kernel/src/test/scala/TransactionClasherSpec.scala
@@ -48,9 +48,42 @@ class TxClasherActor extends Actor {
}
}
+class TxActorOneWay(clasher: Actor) extends Actor {
+ timeout = 1000000
+ makeTransactional
+
+ def receive: PartialFunction[Any, Unit] = {
+ case msg: AnyRef =>
+ clasher ! msg
+ }
+}
+
+class TxClasherActorOneWay extends Actor {
+ val vector = TransactionalState.newInMemoryVector[String]
+ timeout = 1000000
+ makeTransactional
+ var count = 0
+ def receive: PartialFunction[Any, Unit] = {
+ case "First" =>
+ if (count == 0) Thread.sleep(5000)
+ count += 1
+ println("FIRST")
+ vector.add("First")
+ println("--- VECTOR: " + vector)
+ case "Second" =>
+ println("SECOND")
+ vector.add("Second")
+ println("--- VECTOR: " + vector)
+ case "Index0" =>
+ reply(vector(0))
+ case "Index1" =>
+ reply(vector(1))
+ }
+}
+
class TransactionClasherSpec extends TestCase {
@Test
- def testX = {
+ def testBangBangClash = {
val clasher = new TxClasherActor
clasher.start
val txActor1 = new TxActor(clasher)
@@ -70,6 +103,27 @@ class TransactionClasherSpec extends TestCase {
} catch { case e: TransactionRollbackException => {} }
}
+ @Test
+ def testBangClash = {
+ val clasher = new TxClasherActorOneWay
+ clasher.start
+ val txActor1 = new TxActorOneWay(clasher)
+ txActor1.start
+ val txActor2 = new TxActorOneWay(clasher)
+ txActor2.start
+
+ val t1 = new Thread(new Runnable() {
+ def run = {
+ txActor1 ! "First"
+ }
+ }).start
+ Thread.sleep(1000)
+ try {
+ txActor2 ! "Second"
+ fail("Expected TransactionRollbackException")
+ } catch { case e: TransactionRollbackException => {} }
+ }
+
/*
@Test
def testX = {
diff --git a/util-java/src/main/java/se/scalablesolutions/akka/annotation/configuration.java b/util-java/src/main/java/se/scalablesolutions/akka/annotation/configuration.java
new file mode 100644
index 0000000000..4c662945b6
--- /dev/null
+++ b/util-java/src/main/java/se/scalablesolutions/akka/annotation/configuration.java
@@ -0,0 +1,11 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.annotation;
+
+import java.lang.annotation.*;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface configuration {}