diff --git a/akka.ipr b/akka.ipr
index 8cb30124e6..10621d8129 100644
--- a/akka.ipr
+++ b/akka.ipr
@@ -562,7 +562,7 @@
-
+
diff --git a/akka.iws b/akka.iws
index 7b4e8ac4c6..e3f0a53a0d 100644
--- a/akka.iws
+++ b/akka.iws
@@ -1,20 +1,62 @@
+
+
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
@@ -31,7 +73,7 @@
-
+
@@ -44,7 +86,46 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -120,91 +201,91 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
-
+
-
-
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -223,27 +304,28 @@
+
@@ -302,118 +384,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -424,6 +394,10 @@
+
+
+
+
@@ -456,6 +430,14 @@
+
+
+
+
+
+
+
+
@@ -483,6 +465,66 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -543,6 +585,156 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -618,8 +810,38 @@
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -652,6 +874,44 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -692,6 +952,374 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -712,6 +1340,7 @@
+
@@ -738,7 +1367,7 @@
-
+
@@ -778,24 +1407,26 @@
-
+
-
+
-
-
+
+
-
-
-
+
+
+
-
+
+
+
@@ -832,6 +1463,51 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -843,7 +1519,9 @@
-
+
+
+
@@ -852,52 +1530,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -915,7 +1547,9 @@
-
+
+
+
@@ -930,6 +1564,102 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -947,7 +1677,9 @@
-
+
+
+
@@ -962,38 +1694,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
@@ -1011,7 +1711,9 @@
-
+
+
+
@@ -1037,7 +1739,9 @@
-
+
+
+
@@ -1052,7 +1756,7 @@
-
+
@@ -1069,7 +1773,9 @@
-
+
+
+
@@ -1078,7 +1784,7 @@
-
+
@@ -1095,7 +1801,9 @@
-
+
+
+
@@ -1127,7 +1835,9 @@
-
+
+
+
@@ -1159,117 +1869,9 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
@@ -1295,7 +1897,9 @@
-
+
+
+
@@ -1327,7 +1931,9 @@
-
+
+
+
@@ -1359,7 +1965,9 @@
-
+
+
+
@@ -1374,12 +1982,74 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -1391,7 +2061,9 @@
-
+
+
+
@@ -1400,15 +2072,15 @@
-
-
+
+
-
-
+
+
-
+
@@ -1417,9 +2089,11 @@
-
+
+
+
-
+
@@ -1444,44 +2118,27 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
+
+
+
+
+
-
-
+
+
-
-
-
+
+
+
-
-
-
-
-
-
-
+
+
+
+
localhost
@@ -1563,7 +2220,7 @@
-
+
@@ -1571,8 +2228,8 @@
-
-
+
+
@@ -1625,116 +2282,114 @@
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
+
-
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/bin/start-akka-server.sh b/bin/start-akka-server.sh
index 3cdf818317..362875f95c 100755
--- a/bin/start-akka-server.sh
+++ b/bin/start-akka-server.sh
@@ -1,6 +1,6 @@
#!/bin/bash
-VERSION=0.1
+VERSION=0.5
#if [ $# -gt 1 ];
#then
@@ -29,16 +29,9 @@ CLASSPATH=$CLASSPATH:$BASE_DIR/lib/scala-library-2.7.5.jar
CLASSPATH=$CLASSPATH:$BASE_DIR/lib/configgy-1.3.jar
CLASSPATH=$CLASSPATH:$BASE_DIR/config
-STORAGE_OPTS=" \
- -Dcassandra \
- -Dstorage-config=$BASE_DIR/config/ \
- -Dpidfile=akka.pid"
-
# To have Akka dump the generated classes, add the '-Daspectwerkz.transform.dump=*' option and it will dump classes to $BASE_DIR/_dump
JVM_OPTS=" \
-server \
- -Xdebug \
- -Xrunjdwp:transport=dt_socket,server=y,address=8888,suspend=n \
-Xms128M \
-Xmx1G \
-XX:SurvivorRatio=8 \
@@ -54,6 +47,6 @@ JVM_OPTS=" \
-Dcom.sun.management.jmxremote.authenticate=false"
echo "Starting up with options:
-"$JAVA_HOME/bin/java $JVM_OPTS $STORAGE_OPTS -cp $CLASSPATH se.scalablesolutions.akka.Boot se.scalablesolutions.akka.kernel.Kernel ${1}
+"$JAVA_HOME/bin/java $JVM_OPTS -cp $CLASSPATH se.scalablesolutions.akka.Boot se.scalablesolutions.akka.kernel.Kernel ${1}
-$JAVA_HOME/bin/java $JVM_OPTS $STORAGE_OPTS -cp $CLASSPATH se.scalablesolutions.akka.Boot se.scalablesolutions.akka.kernel.Kernel ${1}
+$JAVA_HOME/bin/java $JVM_OPTS -cp $CLASSPATH se.scalablesolutions.akka.Boot se.scalablesolutions.akka.kernel.Kernel ${1}
diff --git a/config/log4j.properties b/config/log4j.properties
index c03de259b2..fc1b7535f9 100755
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -2,7 +2,7 @@
# and the pattern to %c instead of %l. (%l is slower.)
# output messages into a rolling log file as well as stdout
-log4j.rootLogger=DEBUG,stdout,R
+log4j.rootLogger=INFO,stdout,R
# stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
diff --git a/fun-test-java/akka-fun-test-java.iml b/fun-test-java/akka-fun-test-java.iml
index ff6bbd0ffb..0b286c1ece 100644
--- a/fun-test-java/akka-fun-test-java.iml
+++ b/fun-test-java/akka-fun-test-java.iml
@@ -37,6 +37,13 @@
+
+
+
+
+
+
+
@@ -60,12 +67,6 @@
-
-
-
-
-
-
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
index ed4bc9295d..5612f97d20 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
@@ -24,21 +24,13 @@ import java.util.concurrent.ThreadPoolExecutor;
public class ActiveObjectGuiceConfiguratorTest extends TestCase {
static String messageLog = "";
- static {
- new Thread(new Runnable() {
- public void run() {
- RemoteServer server = new RemoteServer();
- server.start();
- }
- }).start();
- try { Thread.currentThread().sleep(1000); } catch (Exception e) {}
- }
-
- final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
- protected void setUp() {
- EventBasedThreadPoolDispatcher dispatcher = new EventBasedThreadPoolDispatcher();
- dispatcher
+ final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
+
+ protected void setUp() {
+ se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config();
+ EventBasedThreadPoolDispatcher dispatcher = new EventBasedThreadPoolDispatcher();
+ dispatcher
.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(16)
.setMaxPoolSize(128)
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
index d78f2b741c..aef87f9a92 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
@@ -7,7 +7,7 @@ package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.kernel.config.*;
import static se.scalablesolutions.akka.kernel.config.JavaConfig.*;
import se.scalablesolutions.akka.kernel.actor.*;
-
+import se.scalablesolutions.akka.kernel.Kernel;
import junit.framework.TestCase;
public class InMemNestedStateTest extends TestCase {
@@ -26,6 +26,7 @@ public class InMemNestedStateTest extends TestCase {
new Component(InMemFailer.class, new LifeCycle(new Permanent(), 1000), 1000)
//new Component("inmem-clasher", InMemClasher.class, InMemClasherImpl.class, new LifeCycle(new Permanent(), 1000), 100000)
}).inject().supervise();
+ se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config();
}
protected void tearDown() {
@@ -37,7 +38,7 @@ public class InMemNestedStateTest extends TestCase {
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
InMemStatefulNested nested = conf.getActiveObject(InMemStatefulNested.class);
nested.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactionrequired
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
assertEquals("new state", nested.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
@@ -49,7 +50,7 @@ public class InMemNestedStateTest extends TestCase {
nested.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
InMemFailer failer = conf.getActiveObject(InMemFailer.class);
try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
@@ -62,7 +63,7 @@ public class InMemNestedStateTest extends TestCase {
stateful.setVectorState("init"); // set init state
InMemStatefulNested nested = conf.getActiveObject(InMemStatefulNested.class);
nested.setVectorState("init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactionrequired
assertEquals("new state", stateful.getVectorState());
assertEquals("new state", nested.getVectorState());
}
@@ -74,7 +75,7 @@ public class InMemNestedStateTest extends TestCase {
nested.setVectorState("init"); // set init state
InMemFailer failer = conf.getActiveObject(InMemFailer.class);
try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
@@ -87,7 +88,7 @@ public class InMemNestedStateTest extends TestCase {
InMemStatefulNested nested = conf.getActiveObject(InMemStatefulNested.class);
stateful.setRefState("init"); // set init state
nested.setRefState("init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactionrequired
assertEquals("new state", stateful.getRefState());
assertEquals("new state", nested.getRefState());
}
@@ -99,7 +100,7 @@ public class InMemNestedStateTest extends TestCase {
nested.setRefState("init"); // set init state
InMemFailer failer = conf.getActiveObject(InMemFailer.class);
try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
index 092dc629fb..2ee03a893e 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
@@ -1,48 +1,41 @@
package se.scalablesolutions.akka.api;
-import se.scalablesolutions.akka.annotation.state;
-import se.scalablesolutions.akka.annotation.transactional;
+import se.scalablesolutions.akka.annotation.transactionrequired;
import se.scalablesolutions.akka.annotation.prerestart;
import se.scalablesolutions.akka.annotation.postrestart;
import se.scalablesolutions.akka.kernel.state.*;
+@transactionrequired
public class InMemStateful {
private TransactionalState factory = new TransactionalState();
private TransactionalMap mapState = factory.newInMemoryMap();
private TransactionalVector vectorState = factory.newInMemoryVector();
private TransactionalRef refState = factory.newInMemoryRef();
- @transactional
public String getMapState(String key) {
return (String)mapState.get(key).get();
}
- @transactional
public String getVectorState() {
return (String)vectorState.last();
}
- @transactional
public String getRefState() {
return (String)refState.get().get();
}
- @transactional
public void setMapState(String key, String msg) {
mapState.put(key, msg);
}
- @transactional
public void setVectorState(String msg) {
vectorState.add(msg);
}
- @transactional
public void setRefState(String msg) {
refState.swap(msg);
}
- @transactional
public void success(String key, String msg) {
mapState.put(key, msg);
vectorState.add(msg);
@@ -56,7 +49,6 @@ public class InMemStateful {
nested.success(key, msg);
}
- @transactional
public String failure(String key, String msg, InMemFailer failer) {
mapState.put(key, msg);
vectorState.add(msg);
@@ -65,7 +57,6 @@ public class InMemStateful {
return msg;
}
- @transactional
public String failure(String key, String msg, InMemStatefulNested nested, InMemFailer failer) {
mapState.put(key, msg);
vectorState.add(msg);
@@ -74,7 +65,6 @@ public class InMemStateful {
return msg;
}
- @transactional
public void thisMethodHangs(String key, String msg, InMemFailer failer) {
setMapState(key, msg);
}
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java
index 867a20ffb2..0f6921f28b 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStatefulNested.java
@@ -1,52 +1,53 @@
package se.scalablesolutions.akka.api;
-import se.scalablesolutions.akka.annotation.transactional;
+import se.scalablesolutions.akka.annotation.transactionrequired;
import se.scalablesolutions.akka.kernel.state.*;
+@transactionrequired
public class InMemStatefulNested {
private TransactionalState factory = new TransactionalState();
private TransactionalMap mapState = factory.newInMemoryMap();
private TransactionalVector vectorState = factory.newInMemoryVector();
private TransactionalRef refState = factory.newInMemoryRef();
- @transactional
+
public String getMapState(String key) {
return (String)mapState.get(key).get();
}
- @transactional
+
public String getVectorState() {
return (String)vectorState.last();
}
- @transactional
+
public String getRefState() {
return (String)refState.get().get();
}
- @transactional
+
public void setMapState(String key, String msg) {
mapState.put(key, msg);
}
- @transactional
+
public void setVectorState(String msg) {
vectorState.add(msg);
}
- @transactional
+
public void setRefState(String msg) {
refState.swap(msg);
}
- @transactional
+
public void success(String key, String msg) {
mapState.put(key, msg);
vectorState.add(msg);
refState.swap(msg);
}
- @transactional
+
public String failure(String key, String msg, InMemFailer failer) {
mapState.put(key, msg);
vectorState.add(msg);
@@ -55,7 +56,7 @@ public class InMemStatefulNested {
return msg;
}
- @transactional
+
public void thisMethodHangs(String key, String msg, InMemFailer failer) {
setMapState(key, msg);
}
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
index 7067fab0d0..4b4b8e975f 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
@@ -4,19 +4,21 @@
package se.scalablesolutions.akka.api;
+import junit.framework.TestCase;
+
import se.scalablesolutions.akka.kernel.config.*;
import static se.scalablesolutions.akka.kernel.config.JavaConfig.*;
import se.scalablesolutions.akka.kernel.actor.*;
-
-import junit.framework.TestCase;
+import se.scalablesolutions.akka.kernel.Kernel;
public class InMemoryStateTest extends TestCase {
static String messageLog = "";
final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
- final private ActiveObjectFactory factory = new ActiveObjectFactory();
+
protected void setUp() {
+ se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config();
conf.configureActiveObjects(
new RestartStrategy(new AllForOne(), 3, 5000),
new Component[]{
@@ -27,14 +29,14 @@ public class InMemoryStateTest extends TestCase {
}).inject().supervise();
}
- protected void tearDown() {
+ protected void tearDown() {
conf.stop();
}
public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
InMemStateful stateful = conf.getActiveObject(InMemStateful.class);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
@@ -43,7 +45,7 @@ public class InMemoryStateTest extends TestCase {
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
InMemFailer failer = conf.getActiveObject(InMemFailer.class);
try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
@@ -53,7 +55,7 @@ public class InMemoryStateTest extends TestCase {
public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
InMemStateful stateful = conf.getActiveObject(InMemStateful.class);
stateful.setVectorState("init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit
assertEquals("new state", stateful.getVectorState());
}
@@ -63,7 +65,7 @@ public class InMemoryStateTest extends TestCase {
stateful.setVectorState("init"); // set init state
InMemFailer failer = conf.getActiveObject(InMemFailer.class);
try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
@@ -73,7 +75,7 @@ public class InMemoryStateTest extends TestCase {
public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
InMemStateful stateful = conf.getActiveObject(InMemStateful.class);
stateful.setRefState("init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit
assertEquals("new state", stateful.getRefState());
}
@@ -83,7 +85,7 @@ public class InMemoryStateTest extends TestCase {
stateful.setRefState("init"); // set init state
InMemFailer failer = conf.getActiveObject(InMemFailer.class);
try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
@@ -95,7 +97,7 @@ public class InMemoryStateTest extends TestCase {
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
InMemFailer failer = conf.getActiveObject(InMemFailer.class);
try {
- stateful.thisMethodHangs("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
+ stateful.thisMethodHangs("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
index dbdce4cb83..9bc8821a98 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
@@ -18,6 +18,7 @@ public class PersistentNestedStateTest extends TestCase {
final private ActiveObjectFactory factory = new ActiveObjectFactory();
static {
+ se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config();
System.setProperty("storage-config", "config");
Kernel.startCassandra();
}
@@ -43,7 +44,7 @@ public class PersistentNestedStateTest extends TestCase {
PersistentStatefulNested nested = conf.getActiveObject(PersistentStatefulNested.class);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
nested.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactionrequired
assertEquals("new state", nested.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
@@ -55,7 +56,7 @@ public class PersistentNestedStateTest extends TestCase {
nested.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
PersistentFailer failer = conf.getActiveObject(PersistentFailer.class);
try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
@@ -68,7 +69,7 @@ public class PersistentNestedStateTest extends TestCase {
stateful.setVectorState("init"); // set init state
PersistentStatefulNested nested = conf.getActiveObject(PersistentStatefulNested.class);
nested.setVectorState("init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactionrequired
assertEquals("new state", stateful.getVectorState(0));
assertEquals("new state", nested.getVectorState(0));
}
@@ -80,7 +81,7 @@ public class PersistentNestedStateTest extends TestCase {
nested.setVectorState("init"); // set init state
PersistentFailer failer = conf.getActiveObject(PersistentFailer.class);
try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
@@ -93,7 +94,7 @@ public class PersistentNestedStateTest extends TestCase {
PersistentStatefulNested nested = conf.getActiveObject(PersistentStatefulNested.class);
stateful.setRefState("init"); // set init state
nested.setRefState("init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactional
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactionrequired
assertEquals("new state", stateful.getRefState());
assertEquals("new state", nested.getRefState());
}
@@ -105,7 +106,7 @@ public class PersistentNestedStateTest extends TestCase {
nested.setRefState("init"); // set init state
PersistentFailer failer = conf.getActiveObject(PersistentFailer.class);
try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactional method
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java
index 6a83728c01..2668e980b7 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java
@@ -4,12 +4,10 @@
package se.scalablesolutions.akka.api;
-import se.scalablesolutions.akka.annotation.*;
import se.scalablesolutions.akka.kernel.config.*;
import static se.scalablesolutions.akka.kernel.config.JavaConfig.*;
+import se.scalablesolutions.akka.kernel.actor.*;
import se.scalablesolutions.akka.kernel.Kernel;
-import se.scalablesolutions.akka.kernel.state.TransactionalMap;
-import se.scalablesolutions.akka.kernel.state.CassandraPersistentTransactionalMap;
import junit.framework.TestCase;
@@ -17,6 +15,7 @@ public class PersistentStateTest extends TestCase {
static String messageLog = "";
static {
+ se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config();
System.setProperty("storage-config", "config");
Kernel.startCassandra();
}
@@ -39,7 +38,7 @@ public class PersistentStateTest extends TestCase {
public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
@@ -48,7 +47,7 @@ public class PersistentStateTest extends TestCase {
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
PersistentFailer failer = conf.getActiveObject(PersistentFailer.class);
try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
@@ -58,7 +57,7 @@ public class PersistentStateTest extends TestCase {
public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class);
stateful.setVectorState("init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
assertEquals("init", stateful.getVectorState(0));
assertEquals("new state", stateful.getVectorState(1));
}
@@ -68,7 +67,7 @@ public class PersistentStateTest extends TestCase {
stateful.setVectorState("init"); // set init state
PersistentFailer failer = conf.getActiveObject(PersistentFailer.class);
try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
@@ -78,7 +77,7 @@ public class PersistentStateTest extends TestCase {
public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class);
stateful.setRefState("init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
assertEquals("new state", stateful.getRefState());
}
@@ -87,7 +86,7 @@ public class PersistentStateTest extends TestCase {
stateful.setRefState("init"); // set init state
PersistentFailer failer = conf.getActiveObject(PersistentFailer.class);
try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java
index b3729728ef..30341581ba 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java
@@ -1,55 +1,55 @@
package se.scalablesolutions.akka.api;
+import se.scalablesolutions.akka.annotation.transactionrequired;
import se.scalablesolutions.akka.kernel.state.*;
-import se.scalablesolutions.akka.annotation.transactional;
-import se.scalablesolutions.akka.annotation.state;
+@transactionrequired
public class PersistentStateful {
private TransactionalState factory = new TransactionalState();
private TransactionalMap mapState = factory.newPersistentMap(new CassandraStorageConfig());
private TransactionalVector vectorState = factory.newPersistentVector(new CassandraStorageConfig());;
private TransactionalRef refState = factory.newPersistentRef(new CassandraStorageConfig());
- @transactional
+
public String getMapState(String key) {
return (String) mapState.get(key).get();
}
- @transactional
+
public String getVectorState(int index) {
return (String) vectorState.get(index);
}
- @transactional
+
public String getRefState() {
if (refState.isDefined()) {
return (String) refState.get().get();
} else throw new IllegalStateException("No such element");
}
- @transactional
+
public void setMapState(String key, String msg) {
mapState.put(key, msg);
}
- @transactional
+
public void setVectorState(String msg) {
vectorState.add(msg);
}
- @transactional
+
public void setRefState(String msg) {
refState.swap(msg);
}
- @transactional
+
public void success(String key, String msg) {
mapState.put(key, msg);
vectorState.add(msg);
refState.swap(msg);
}
- @transactional
+
public String failure(String key, String msg, PersistentFailer failer) {
mapState.put(key, msg);
vectorState.add(msg);
@@ -65,7 +65,7 @@ public class PersistentStateful {
nested.success(key, msg);
}
- @transactional
+
public String failure(String key, String msg, PersistentStatefulNested nested, PersistentFailer failer) {
mapState.put(key, msg);
vectorState.add(msg);
@@ -75,7 +75,7 @@ public class PersistentStateful {
}
- @transactional
+
public void thisMethodHangs(String key, String msg, PersistentFailer failer) {
setMapState(key, msg);
}
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java
index 6b8ba10863..34eeca21a3 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java
@@ -1,55 +1,55 @@
package se.scalablesolutions.akka.api;
+import se.scalablesolutions.akka.annotation.transactionrequired;
import se.scalablesolutions.akka.kernel.state.*;
-import se.scalablesolutions.akka.annotation.transactional;
-import se.scalablesolutions.akka.annotation.state;
+@transactionrequired
public class PersistentStatefulNested {
private TransactionalState factory = new TransactionalState();
private TransactionalMap mapState = factory.newPersistentMap(new CassandraStorageConfig());
private TransactionalVector vectorState = factory.newPersistentVector(new CassandraStorageConfig());;
private TransactionalRef refState = factory.newPersistentRef(new CassandraStorageConfig());
- @transactional
+
public String getMapState(String key) {
return (String) mapState.get(key).get();
}
- @transactional
+
public String getVectorState(int index) {
return (String) vectorState.get(index);
}
- @transactional
+
public String getRefState() {
if (refState.isDefined()) {
return (String) refState.get().get();
} else throw new IllegalStateException("No such element");
}
- @transactional
+
public void setMapState(String key, String msg) {
mapState.put(key, msg);
}
- @transactional
+
public void setVectorState(String msg) {
vectorState.add(msg);
}
- @transactional
+
public void setRefState(String msg) {
refState.swap(msg);
}
- @transactional
+
public void success(String key, String msg) {
mapState.put(key, msg);
vectorState.add(msg);
refState.swap(msg);
}
- @transactional
+
public String failure(String key, String msg, PersistentFailer failer) {
mapState.put(key, msg);
vectorState.add(msg);
@@ -58,7 +58,7 @@ public class PersistentStatefulNested {
return msg;
}
- @transactional
+
public void thisMethodHangs(String key, String msg, PersistentFailer failer) {
setMapState(key, msg);
}
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java
index 9057bf169b..3975c0366e 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java
@@ -6,7 +6,6 @@ package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.kernel.actor.ActiveObjectFactory;
import se.scalablesolutions.akka.kernel.config.ActiveObjectGuiceConfiguratorForJava;
-import static se.scalablesolutions.akka.kernel.config.JavaConfig.*;
import se.scalablesolutions.akka.kernel.nio.RemoteServer;
import junit.framework.TestCase;
@@ -21,6 +20,7 @@ public class RemoteInMemoryStateTest extends TestCase {
}
}).start();
try { Thread.currentThread().sleep(1000); } catch (Exception e) {}
+ se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config();
}
final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
final private ActiveObjectFactory factory = new ActiveObjectFactory();
@@ -32,7 +32,7 @@ public class RemoteInMemoryStateTest extends TestCase {
public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
@@ -41,7 +41,7 @@ public class RemoteInMemoryStateTest extends TestCase {
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
InMemFailer failer = factory.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getActiveObject(InMemFailer.class);
try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
@@ -51,7 +51,7 @@ public class RemoteInMemoryStateTest extends TestCase {
public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.setVectorState("init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit
assertEquals("new state", stateful.getVectorState());
}
@@ -61,7 +61,7 @@ public class RemoteInMemoryStateTest extends TestCase {
stateful.setVectorState("init"); // set init state
InMemFailer failer = factory.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getActiveObject(InMemFailer.class);
try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
@@ -71,7 +71,7 @@ public class RemoteInMemoryStateTest extends TestCase {
public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.setRefState("init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit
assertEquals("new state", stateful.getRefState());
}
@@ -81,7 +81,7 @@ public class RemoteInMemoryStateTest extends TestCase {
stateful.setRefState("init"); // set init state
InMemFailer failer = factory.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getActiveObject(InMemFailer.class);
try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
@@ -93,7 +93,7 @@ public class RemoteInMemoryStateTest extends TestCase {
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
InMemFailer failer = conf.getActiveObject(InMemFailer.class);
try {
- stateful.thisMethodHangs("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
+ stateful.thisMethodHangs("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java
index 8708fc5b36..489cb7ef43 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java
@@ -4,18 +4,18 @@
package se.scalablesolutions.akka.api;
-import se.scalablesolutions.akka.kernel.actor.ActiveObjectFactory;
-import se.scalablesolutions.akka.kernel.config.ActiveObjectGuiceConfiguratorForJava;
+import se.scalablesolutions.akka.kernel.config.*;
import static se.scalablesolutions.akka.kernel.config.JavaConfig.*;
-import se.scalablesolutions.akka.kernel.nio.RemoteServer;
-
+import se.scalablesolutions.akka.kernel.actor.*;
import se.scalablesolutions.akka.kernel.Kernel;
+
import junit.framework.TestCase;
public class RemotePersistentStateTest extends TestCase {
static String messageLog = "";
static {
+ se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config();
System.setProperty("storage-config", "config");
Kernel.startCassandra();
Kernel.startRemoteService();
@@ -40,7 +40,7 @@ public class RemotePersistentStateTest extends TestCase {
public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
@@ -49,7 +49,7 @@ public class RemotePersistentStateTest extends TestCase {
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
PersistentFailer failer = conf.getActiveObject(PersistentFailer.class);
try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
@@ -59,7 +59,7 @@ public class RemotePersistentStateTest extends TestCase {
public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class);
stateful.setVectorState("init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
assertEquals("init", stateful.getVectorState(0));
assertEquals("new state", stateful.getVectorState(1));
}
@@ -69,7 +69,7 @@ public class RemotePersistentStateTest extends TestCase {
stateful.setVectorState("init"); // set init state
PersistentFailer failer = conf.getActiveObject(PersistentFailer.class);
try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
@@ -79,7 +79,7 @@ public class RemotePersistentStateTest extends TestCase {
public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class);
stateful.setRefState("init"); // set init state
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
+ stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
assertEquals("new state", stateful.getRefState());
}
@@ -88,7 +88,7 @@ public class RemotePersistentStateTest extends TestCase {
stateful.setRefState("init"); // set init state
PersistentFailer failer = conf.getActiveObject(PersistentFailer.class);
try {
- stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
+ stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java
index 65bf6ed5e6..a8a8c18cf2 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java
@@ -6,6 +6,7 @@ package se.scalablesolutions.akka.api;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.container.grizzly.GrizzlyWebContainerFactory;
import com.sun.grizzly.http.SelectorThread;
import com.sun.grizzly.http.servlet.ServletAdapter;
import com.sun.grizzly.tcp.Adapter;
@@ -20,9 +21,11 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.net.URI;
+import java.util.Map;
+import java.util.HashMap;
-import se.scalablesolutions.akka.kernel.config.ActiveObjectGuiceConfiguratorForJava;
-import se.scalablesolutions.akka.kernel.config.JavaConfig;
+import se.scalablesolutions.akka.kernel.config.*;
+import static se.scalablesolutions.akka.kernel.config.JavaConfig.*;
public class RestTest extends TestSuite {
@@ -33,31 +36,21 @@ public class RestTest extends TestSuite {
@BeforeClass
public static void initialize() throws IOException {
+ se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config();
conf.configureActiveObjects(
- new JavaConfig.RestartStrategy(new JavaConfig.AllForOne(), 3, 5000),
- new JavaConfig.Component[] {
- new JavaConfig.Component(
+ new RestartStrategy(new AllForOne(), 3, 5000),
+ new Component[] {
+ new Component(
JerseyFoo.class,
- new JavaConfig.LifeCycle(new JavaConfig.Permanent(), 1000), 10000000)
+ new LifeCycle(new Permanent(), 1000),
+ 10000000)
}).inject().supervise();
selector = startJersey();
}
- @AfterClass
- public static void cleanup() throws IOException {
- conf.stop();
- selector.stopEndpoint();
- System.exit(0);
- }
-
@Test
- public void dummy() {
- assertTrue(true);
- }
-
- //@Test
public void simpleRequest() throws IOException, InstantiationException {
- selector.start();
+ //selector.start();
Client client = Client.create();
WebResource webResource = client.resource(URI);
String responseMsg = webResource.path("/foo").get(String.class);
diff --git a/kernel/akka-kernel.iml b/kernel/akka-kernel.iml
index 2817cfa09d..dc29dd99c4 100644
--- a/kernel/akka-kernel.iml
+++ b/kernel/akka-kernel.iml
@@ -25,6 +25,7 @@
+
@@ -52,6 +53,13 @@
+
+
+
+
+
+
+
@@ -75,12 +83,6 @@
-
-
-
-
-
-
diff --git a/kernel/pom.xml b/kernel/pom.xml
index 17c49efd7f..78a63ffd3c 100644
--- a/kernel/pom.xml
+++ b/kernel/pom.xml
@@ -1,286 +1,288 @@
- 4.0.0
+ 4.0.0
- akka-kernel
- Akka Kernel Module
+ akka-kernel
+ Akka Kernel Module
- jar
+ jar
-
- akka
- ${akka.groupId}
- ${akka.version}
-
+
+ akka
+ ${akka.groupId}
+ ${akka.version}
+
-
-
- ${akka.groupId}
- akka-util-java
- ${akka.version}
-
-
- org.scala-lang
- scala-library
- ${scala.version}
-
-
- org.codehaus.aspectwerkz
- aspectwerkz-nodeps-jdk5
- 2.1
-
-
- net.lag
- configgy
- 1.3
-
-
- org.guiceyfruit
- guiceyfruit-core
- 2.0
-
-
- org.guiceyfruit
- guice-core
- 2.0-beta-4
-
-
- org.guiceyfruit
- guice-jsr250
- 2.0-beta-4
-
-
- org.apache.camel
- camel-core
- 2.0-SNAPSHOT
-
-
- org.jboss.netty
- netty
- 3.1.0.CR1
-
+
+
+
+ ${akka.groupId}
+ akka-util-java
+ ${akka.version}
+
+
+ org.scala-lang
+ scala-library
+ ${scala.version}
+
+
+ org.codehaus.aspectwerkz
+ aspectwerkz-nodeps-jdk5
+ 2.1
+
+
+ net.lag
+ configgy
+ 1.3
+
+
+ org.guiceyfruit
+ guiceyfruit-core
+ 2.0
+
+
+ org.guiceyfruit
+ guice-core
+ 2.0-beta-4
+
+
+ org.guiceyfruit
+ guice-jsr250
+ 2.0-beta-4
+
+
+ org.apache.camel
+ camel-core
+ 2.0-SNAPSHOT
+
+
+ org.jboss.netty
+ netty
+ 3.1.0.CR1
+
+
+ org.apache
+ zookeeper
+ 3.1.0
+
-
-
- com.sun.grizzly
- grizzly-servlet-webserver
- 1.9.9
-
-
- com.sun.jersey
- jersey-server
- 1.0.3
-
-
- com.sun.jersey
- jersey-json
- 1.0.3
-
-
- com.sun.jersey
- jersey-client
- 1.1.0-ea
-
-
- com.sun.jersey
- jersey-atom
- 1.0.3
-
-
- com.sun.jersey.contribs
- jersey-multipart
- 1.0.3
-
-
- com.sun.jersey.contribs
- jersey-guice
- 1.0.3
-
-
- javax.ws.rs
- jsr311-api
- 1.0
-
-
-
-
- org.apache.cassandra
- cassandra
- 0.3.0-dev
-
-
- com.facebook
- thrift
- 1.0
-
-
- com.facebook
- fb303
- 1.0
-
-
- org.apache.commons
- commons-collections
- 3.2.1
-
-
- high-scale-lib
- high-scale-lib
- 1.0
-
-
- org.apache.commons
- commons-lang
- 2.4
-
+
+
+ org.apache.cassandra
+ cassandra
+ 0.4.0-dev
+
+
+ com.facebook
+ thrift
+ 1.0
+
+
+ com.facebook
+ fb303
+ 1.0
+
+
+ org.apache.commons
+ commons-collections
+ 3.2.1
+
+
+ high-scale-lib
+ high-scale-lib
+ 1.0
+
+
+ org.apache.commons
+ commons-lang
+ 2.4
+
-
-
+
+
+ com.sun.grizzly
+ grizzly-servlet-webserver
+ 1.9.9
+
+
+ com.sun.jersey
+ jersey-server
+ 1.0.3
+
+
+ com.sun.jersey
+ jersey-json
+ 1.0.3
+
+
+ com.sun.jersey
+ jersey-client
+ 1.1.0-ea
+
+
+ com.sun.jersey
+ jersey-atom
+ 1.0.3
+
+
+ com.sun.jersey.contribs
+ jersey-multipart
+ 1.0.3
+
+
+ com.sun.jersey.contribs
+ jersey-guice
+ 1.0.3
+
+
+ javax.ws.rs
+ jsr311-api
+ 1.0
+
+
-
-
- org.slf4j
- slf4j-log4j12
- 1.4.3
-
-
- org.slf4j
- slf4j-api
- 1.4.3
-
-
- log4j
- log4j
- 1.2.13
-
-
-
-
- org.scala-tools.testing
- scalatest
- 0.9.5
- test
-
-
- com.jteigen.scalatest
- junit4runner
- 1.0
- test
-
-
- junit
- junit
- 4.5
- test
-
-
+
+
+ org.slf4j
+ slf4j-log4j12
+ 1.4.3
+
+
+ org.slf4j
+ slf4j-api
+ 1.4.3
+
+
+ log4j
+ log4j
+ 1.2.13
+
-
- src/main/scala
- src/test/scala
-
-
- org.scala-tools
- maven-scala-plugin
-
-
-
- compile
- testCompile
-
-
-
-
-
- -target:jvm-1.5
- -unchecked
-
- ${scala.version}
- 1.1
-
-
-
- org.apache.maven.plugins
- maven-eclipse-plugin
-
- true
-
-
- ch.epfl.lamp.sdt.core.scalabuilder
-
-
-
-
- ch.epfl.lamp.sdt.core.scalanature
-
-
-
-
- org.eclipse.jdt.launching.JRE_CONTAINER
-
-
- ch.epfl.lamp.sdt.launching.SCALA_CONTAINER
-
-
-
-
-
-
-
-
- false
- src/main/resources
-
-
- false
- src/main/scala
-
- **
-
-
- **/*.scala
-
-
-
-
-
-
-
- org.scala-tools
- maven-scala-plugin
-
- 1.1
- ${scala.version}
-
-
-
-
+
+
+
+
+ org.scala-tools.testing
+ scalatest
+ 0.9.5
+ test
+
+
+ com.jteigen.scalatest
+ junit4runner
+ 1.0
+ test
+
+
+ junit
+ junit
+ 4.5
+ test
+
+
+
+
+ src/main/scala
+ src/test/scala
+
+
+ org.scala-tools
+ maven-scala-plugin
+
+
+
+ compile
+ testCompile
+
+
+
+
+
+ -target:jvm-1.5
+ -unchecked
+
+ ${scala.version}
+ 1.1
+
+
+
+ org.apache.maven.plugins
+ maven-eclipse-plugin
+
+ true
+
+
+ ch.epfl.lamp.sdt.core.scalabuilder
+
+
+
+
+ ch.epfl.lamp.sdt.core.scalanature
+
+
+
+
+ org.eclipse.jdt.launching.JRE_CONTAINER
+
+
+ ch.epfl.lamp.sdt.launching.SCALA_CONTAINER
+
+
+
+
+
+
+
+
+ false
+ src/main/resources
+
+
+ false
+ src/main/scala
+
+ **
+
+
+ **/*.scala
+
+
+
+
+
+
+
+ org.scala-tools
+ maven-scala-plugin
+
+ 1.1
+ ${scala.version}
+
+
+
+
diff --git a/kernel/src/main/scala/Boot.scala b/kernel/src/main/scala/Boot.scala
index 4ea23287a0..e4cbc984a0 100644
--- a/kernel/src/main/scala/Boot.scala
+++ b/kernel/src/main/scala/Boot.scala
@@ -5,8 +5,7 @@
package se.scalablesolutions.akka
import java.io.File
-import java.lang.reflect.Method
-import java.net.{URL, URLClassLoader}
+import java.net.URLClassLoader
import kernel.util.Logging
/**
diff --git a/kernel/src/main/scala/Kernel.scala b/kernel/src/main/scala/Kernel.scala
index 53cd2b4eff..3920d8c65a 100644
--- a/kernel/src/main/scala/Kernel.scala
+++ b/kernel/src/main/scala/Kernel.scala
@@ -4,127 +4,160 @@
package se.scalablesolutions.akka.kernel
-//import org.apache.zookeeper.jmx.ManagedUtil
-//import org.apache.zookeeper.server.persistence.FileTxnSnapLog
-//import org.apache.zookeeper.server.ServerConfig
-//import org.apache.zookeeper.server.NIOServerCnxn
-
-//import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory}
-//import voldemort.server.{VoldemortConfig, VoldemortServer}
-//import voldemort.versioning.Versioned
-
import com.sun.grizzly.http.SelectorThread
-import com.sun.jersey.api.container.grizzly.GrizzlyWebContainerFactory
-
-import java.io.IOException
-import java.net.URI
-import java.util.{Map, HashMap}
-import java.io.{File, IOException}
+import com.sun.grizzly.http.servlet.ServletAdapter
+import com.sun.grizzly.standalone.StaticStreamAlgorithm
import javax.ws.rs.core.UriBuilder
-import javax.management.JMException
-import kernel.nio.{RemoteClient, RemoteServer}
-import kernel.state.CassandraNode
+
+import net.lag.configgy.{Config, Configgy, RuntimeEnvironment}
+
+import kernel.jersey.AkkaServlet
+import kernel.nio.RemoteServer
+import kernel.state.CassandraStorage
import kernel.util.Logging
/**
* @author Jonas Bonér
*/
object Kernel extends Logging {
+ val config = setupConfig
+
+ val RUN_REST_SERVICE = config.getBool("akka.rest.service", true)
+ val RUN_REMOTE_SERVICE = config.getBool("akka.remote.service", true)
+ val STORAGE_SYSTEM = config.getString("akka.storage.system", "cassandra")
- val SERVER_URL = "localhost"
- /*
- private[this] var storageFactory: StoreClientFactory = _
- private[this] var storageServer: VoldemortServer = _
- */
-
- private[this] var remoteServer: RemoteServer = _
+ // FIXME add API to shut server down gracefully
+ private var remoteServer: RemoteServer = _
+ private var jerseySelectorThread: SelectorThread = _
+ private val startTime = System.currentTimeMillis
def main(args: Array[String]): Unit = {
+ printBanner
log.info("Starting Akka kernel...")
- startRemoteService
- startCassandra
- //cassandraBenchmark
-
- //startJersey
+ if (RUN_REMOTE_SERVICE) startRemoteService
+ STORAGE_SYSTEM match {
+ case "cassandra" => startCassandra
+ case "terracotta" => throw new UnsupportedOperationException("terracotta storage backend is not yet supported")
+ case "redis" => throw new UnsupportedOperationException("redis storage backend is not yet supported")
+ case "voldemort" => throw new UnsupportedOperationException("voldemort storage backend is not yet supported")
+ case "tokyo-cabinet" => throw new UnsupportedOperationException("tokyo-cabinet storage backend is not yet supported")
+ case "tokyo-tyrant" => throw new UnsupportedOperationException("tokyo-tyrart storage backend is not yet supported")
+ case "hazelcast" => throw new UnsupportedOperationException("hazelcast storage backend is not yet supported")
+ }
+ if (RUN_REST_SERVICE) startJersey
+
//startZooKeeper
//startVoldemort
+ //cassandraBenchmark
log.info("Akka kernel started successfully")
}
+ def uptime = (System.currentTimeMillis - startTime) / 1000
+
+ def setupConfig: Config = {
+ try {
+ Configgy.configure(akka.Boot.CONFIG + "/akka.conf")
+ val runtime = new RuntimeEnvironment(getClass)
+ //runtime.load(args)
+ val config = Configgy.config
+ config.registerWithJmx("com.scalablesolutions.akka.config")
+ // config.subscribe { c => configure(c.getOrElse(new Config)) }
+ config
+ } catch {
+ case e: net.lag.configgy.ParseException => throw new Error("Could not retreive the akka.conf config file. Make sure you have set the AKKA_HOME environment variable to the root of the distribution.")
+ }
+ }
private[akka] def startRemoteService = {
// FIXME manage remote serve thread for graceful shutdown
val remoteServerThread = new Thread(new Runnable() {
- def run = {
- RemoteServer.start
- }
- })
+ def run = RemoteServer.start
+ }, "akka remote service")
remoteServerThread.start
- Thread.sleep(1000) // wait for server to start up
}
- private[akka] def startCassandra = {
- CassandraNode.start
+ private[akka] def startCassandra = if (kernel.Kernel.config.getBool("akka.storage.cassandra.service", true)) {
+ System.setProperty("cassandra", "")
+ System.setProperty("storage-config", akka.Boot.CONFIG + "/")
+ CassandraStorage.start
}
private[akka] def startJersey = {
- val JERSEY_SERVER_URL = "http://" + SERVER_URL + "/"
- val JERSEY_SERVER_PORT = 9998
- val JERSEY_REST_CLASSES_ROOT_PACKAGE = "se.scalablesolutions.akka.kernel"
- val JERSEY_BASE_URI = UriBuilder.fromUri(JERSEY_SERVER_URL).port(getPort(JERSEY_SERVER_PORT)).build()
- val initParams = new java.util.HashMap[String, String]
- initParams.put("com.sun.jersey.config.property.packages", JERSEY_REST_CLASSES_ROOT_PACKAGE)
- val threadSelector = GrizzlyWebContainerFactory.create(JERSEY_BASE_URI, initParams)
- // TODO: handle shutdown of Jersey in separate thread
- // TODO: spawn main in new thread an communicate using socket
- System.in.read
- threadSelector.stopEndpoint
+ val JERSEY_HOSTNAME = kernel.Kernel.config.getString("akka.rest.hostname", "localhost")
+ val JERSEY_URL = "http://" + JERSEY_HOSTNAME + "/"
+ val JERSEY_PORT = kernel.Kernel.config.getInt("akka.rest.port", 9998)
+
+ val uri = UriBuilder.fromUri(JERSEY_URL).port(JERSEY_PORT).build()
+ val adapter = new ServletAdapter
+ val servlet = classOf[AkkaServlet].newInstance
+ adapter.setServletInstance(servlet)
+ adapter.setContextPath(uri.getPath)
+
+ val scheme = uri.getScheme
+ if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException("The URI scheme, of the URI " + JERSEY_URL + ", must be equal (ignoring case) to 'http'")
+
+ jerseySelectorThread = new SelectorThread
+ jerseySelectorThread.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName)
+ jerseySelectorThread.setPort(JERSEY_PORT)
+ jerseySelectorThread.setAdapter(adapter)
+ jerseySelectorThread.listen
+ log.info("REST service started successfully. Listening to port [" + JERSEY_PORT + "]")
}
+ private def printBanner = {
+ log.info(
+"""==============================
+ __ __
+ _____ | | _| | _______
+ \__ \ | |/ / |/ /\__ \
+ / __ \| <| < / __ \_
+ (____ /__|_ \__|_ \(____ /
+ \/ \/ \/ \/
+""")
+ log.info(" Running version " + kernel.Kernel.config.getString("akka.version", "awesome"))
+ log.info("==============================")
+ }
+
private def cassandraBenchmark = {
val NR_ENTRIES = 100000
-
+
println("=================================================")
var start = System.currentTimeMillis
- for (i <- 1 to NR_ENTRIES) CassandraNode.insertMapStorageEntryFor("test", i.toString, "data")
+ for (i <- 1 to NR_ENTRIES) CassandraStorage.insertMapStorageEntryFor("test", i.toString, "data")
var end = System.currentTimeMillis
println("Writes per second: " + NR_ENTRIES / ((end - start).toDouble / 1000))
- /*
-FIXME: batch_insert fails with the following exception:
-
-ERROR - Exception was generated at : 04/27/2009 15:26:35 on thread main
-[B cannot be cast to org.apache.cassandra.db.WriteResponse
-java.lang.ClassCastException: [B cannot be cast to org.apache.cassandra.db.WriteResponse
- at org.apache.cassandra.service.WriteResponseResolver.resolve(WriteResponseResolver.java:50)
- at org.apache.cassandra.service.WriteResponseResolver.resolve(WriteResponseResolver.java:31)
- at org.apache.cassandra.service.QuorumResponseHandler.get(QuorumResponseHandler.java:101)
- at org.apache.cassandra.service.StorageProxy.insertBlocking(StorageProxy.java:135)
- at org.apache.cassandra.service.CassandraServer.batch_insert_blocking(CassandraServer.java:489)
- at se.scalablesolutions.akka.kernel.CassandraNode$.insertHashEntries(CassandraNode.scala:59)
- at se.scalablesolutions.akka.kernel.Kernel$.cassandraBenchmark(Kernel.scala:91)
- at se.scalablesolutions.akka.kernel.Kernel$.main(Kernel.scala:52)
- at se.scalablesolutions.akka.kernel.Kernel.main(Kernel.scala)
-
- println("=================================================")
- var start = System.currentTimeMillis
- println(start)
- val entries = new scala.collection.mutable.ArrayBuffer[Tuple2[String, String]]
- for (i <- 1 to NR_ENTRIES) entries += (i.toString, "data")
- CassandraNode.insertHashEntries("test", entries.toList)
- var end = System.currentTimeMillis
- println("Writes per second - batch: " + NR_ENTRIES / ((end - start).toDouble / 1000))
- */
println("=================================================")
start = System.currentTimeMillis
- for (i <- 1 to NR_ENTRIES) CassandraNode.getMapStorageEntryFor("test", i.toString)
+ val entries = new scala.collection.mutable.ArrayBuffer[Tuple2[String, String]]
+ for (i <- 1 to NR_ENTRIES) entries += (i.toString, "data")
+ CassandraStorage.insertMapStorageEntriesFor("test", entries.toList)
+ end = System.currentTimeMillis
+ println("Writes per second - batch: " + NR_ENTRIES / ((end - start).toDouble / 1000))
+
+ println("=================================================")
+ start = System.currentTimeMillis
+ for (i <- 1 to NR_ENTRIES) CassandraStorage.getMapStorageEntryFor("test", i.toString)
end = System.currentTimeMillis
println("Reads per second: " + NR_ENTRIES / ((end - start).toDouble / 1000))
System.exit(0)
}
+}
+
+
+
+/*
+ //import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory}
+ //import voldemort.server.{VoldemortConfig, VoldemortServer}
+ //import voldemort.versioning.Versioned
+
+ private[this] var storageFactory: StoreClientFactory = _
+ private[this] var storageServer: VoldemortServer = _
+ */
+
// private[akka] def startVoldemort = {
// val VOLDEMORT_SERVER_URL = "tcp://" + SERVER_URL
// val VOLDEMORT_SERVER_PORT = 6666
@@ -159,7 +192,11 @@ java.lang.ClassCastException: [B cannot be cast to org.apache.cassandra.db.Write
// private[akka] def getStorageFor(storageName: String): StoreClient[String, String] =
// storageFactory.getStoreClient(storageName)
- // private[akka] def startZooKeeper = {
+ // private[akka] def startZooKeeper = {
+ //import org.apache.zookeeper.jmx.ManagedUtil
+ //import org.apache.zookeeper.server.persistence.FileTxnSnapLog
+ //import org.apache.zookeeper.server.ServerConfig
+ //import org.apache.zookeeper.server.NIOServerCnxn
// val ZOO_KEEPER_SERVER_URL = SERVER_URL
// val ZOO_KEEPER_SERVER_PORT = 9898
// try {
@@ -191,22 +228,3 @@ java.lang.ClassCastException: [B cannot be cast to org.apache.cassandra.db.Write
// // if (zooKeeper.isRunning) zooKeeper.shutdown
// } catch { case e => log.fatal("Unexpected exception: s%",e) }
// }
-
- private def getPort(defaultPort: Int) = {
- val port = System.getenv("JERSEY_HTTP_PORT")
- if (null != port) Integer.parseInt(port)
- else defaultPort;
- }
-}
-
-//import javax.ws.rs.{Produces, Path, GET}
-// @GET
-// @Produces("application/json")
-// @Path("/network/{id: [0-9]+}/{nid}")
-// def getUserByNetworkId(@PathParam {val value = "id"} id: Int, @PathParam {val value = "nid"} networkId: String): User = {
-// val q = em.createQuery("SELECT u FROM User u WHERE u.networkId = :id AND u.networkUserId = :nid")
-// q.setParameter("id", id)
-// q.setParameter("nid", networkId)
-// q.getSingleResult.asInstanceOf[User]
-// }
-
diff --git a/kernel/src/main/scala/actor/ActiveObject.scala b/kernel/src/main/scala/actor/ActiveObject.scala
index 05b993dfe4..ad833877d9 100644
--- a/kernel/src/main/scala/actor/ActiveObject.scala
+++ b/kernel/src/main/scala/actor/ActiveObject.scala
@@ -20,11 +20,11 @@ class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectEx
object Annotations {
import se.scalablesolutions.akka.annotation._
- val transactional = classOf[transactional]
- val oneway = classOf[oneway]
- val immutable = classOf[immutable]
- val prerestart = classOf[prerestart]
- val postrestart = classOf[postrestart]
+ val oneway = classOf[oneway]
+ val transactionrequired = classOf[transactionrequired]
+ val prerestart = classOf[prerestart]
+ val postrestart = classOf[postrestart]
+ val immutable = classOf[immutable]
}
/**
@@ -259,14 +259,14 @@ sealed class ActorAroundAdvice(val target: Class[_],
*/
private[kernel] class Dispatcher extends Actor {
private val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]()
-
- makeTransactional
private[actor] var target: Option[AnyRef] = None
private var preRestart: Option[Method] = None
private var postRestart: Option[Method] = None
private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef) = {
+ if (targetClass.isAnnotationPresent(Annotations.transactionrequired)) makeTransactional
+
id = targetClass.getName
target = Some(targetInstance)
val methods = targetInstance.getClass.getDeclaredMethods.toList
diff --git a/kernel/src/main/scala/actor/Actor.scala b/kernel/src/main/scala/actor/Actor.scala
index 7c1acd8129..b96dc2e3a9 100644
--- a/kernel/src/main/scala/actor/Actor.scala
+++ b/kernel/src/main/scala/actor/Actor.scala
@@ -34,6 +34,10 @@ class ActorMessageHandler(val actor: Actor) extends MessageHandler {
def handle(handle: MessageHandle) = actor.handle(handle)
}
+object Actor {
+ val timeout = kernel.Kernel.config.getInt("akka.actor.timeout", 5000)
+}
+
trait Actor extends Logging with TransactionManagement {
@volatile private[this] var isRunning: Boolean = false
private[this] val remoteFlagLock = new ReadWriteLock
@@ -59,7 +63,7 @@ trait Actor extends Logging with TransactionManagement {
*
* Defines the default timeout for '!!' invocations, e.g. the timeout for the future returned by the call to '!!'.
*/
- @volatile var timeout: Long = 5000L
+ @volatile var timeout: Long = Actor.timeout
/**
* User overridable callback/setting.
@@ -397,8 +401,7 @@ trait Actor extends Logging with TransactionManagement {
val future = RemoteClient.clientFor(remoteAddress.get).send(new RemoteRequest(true, message, null, this.getClass.getName, timeout, null, false, false, supervisorUuid))
if (future.isDefined) future.get
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
- }
- else {
+ } else {
val future = new DefaultCompletableFutureResult(timeout)
mailbox.append(new MessageHandle(this, message, Some(future), TransactionManagement.threadBoundTx.get))
future
@@ -526,5 +529,12 @@ trait Actor extends Logging with TransactionManagement {
} else None
}
+
+ private[kernel] def swapDispatcher(disp: MessageDispatcher) = {
+ dispatcher = disp
+ mailbox = dispatcher.messageQueue
+ dispatcher.registerHandler(this, new ActorMessageHandler(this))
+ }
+
override def toString(): String = "Actor[" + id + "]"
}
diff --git a/kernel/src/main/scala/config/ActiveObjectConfigurator.scala b/kernel/src/main/scala/config/ActiveObjectConfigurator.scala
index 30e0a4ad44..6f5203574c 100644
--- a/kernel/src/main/scala/config/ActiveObjectConfigurator.scala
+++ b/kernel/src/main/scala/config/ActiveObjectConfigurator.scala
@@ -4,25 +4,13 @@
package se.scalablesolutions.akka.kernel.config
-import ScalaConfig.{RestartStrategy, Component}
import javax.servlet.ServletContext
+
+import scala.collection.mutable.HashSet
+
+import ScalaConfig.{RestartStrategy, Component}
import kernel.util.Logging
-object ActiveObjectConfigurator extends Logging {
-
- private var configuration: ActiveObjectConfigurator = _
-
- // FIXME: cheating with only having one single, scope per ServletContext
- def registerConfigurator(conf: ActiveObjectConfigurator) = {
- configuration = conf
- }
-
- def getConfiguratorFor(ctx: ServletContext): ActiveObjectConfigurator = {
- configuration
- //configurations.getOrElse(ctx, throw new IllegalArgumentException("No configuration for servlet context [" + ctx + "]"))
- }
-}
-
trait ActiveObjectConfigurator {
/**
* Returns the active abject that has been put under supervision for the class specified.
@@ -32,6 +20,8 @@ trait ActiveObjectConfigurator {
*/
def getActiveObject[T](clazz: Class[T]): T
+ def isActiveObjectDefined[T](clazz: Class[T]): Boolean
+
def getExternalDependency[T](clazz: Class[T]): T
def getComponentInterfaces: List[Class[_]]
@@ -46,3 +36,29 @@ trait ActiveObjectConfigurator {
def stop
}
+
+object ActiveObjectConfigurator extends Logging {
+
+ private val configuration = new HashSet[ActiveObjectConfigurator]
+
+ // FIXME: cheating with only having one single, scope per ServletContext
+ def registerConfigurator(conf: ActiveObjectConfigurator) = synchronized {
+ configuration + conf
+ }
+
+ def getConfiguratorsFor(ctx: ServletContext): List[ActiveObjectConfigurator] = synchronized {
+ configuration.toList
+ //configurations.getOrElse(ctx, throw new IllegalArgumentException("No configuration for servlet context [" + ctx + "]"))
+ }
+}
+
+class ActiveObjectConfiguratorRepository extends Logging {
+ def registerConfigurator(conf: ActiveObjectConfigurator) = {
+ ActiveObjectConfigurator.registerConfigurator(conf)
+ }
+
+ def getConfiguratorsFor(ctx: ServletContext): List[ActiveObjectConfigurator] = {
+ ActiveObjectConfigurator.getConfiguratorsFor(ctx)
+ }
+}
+
diff --git a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
index 209a3a529a..d8068975ab 100644
--- a/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
+++ b/kernel/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
@@ -52,6 +52,10 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
proxy.asInstanceOf[T]
}
+ override def isActiveObjectDefined[T](clazz: Class[T]): Boolean = synchronized {
+ activeObjectRegistry.get(clazz).isDefined
+ }
+
override def getExternalDependency[T](clazz: Class[T]): T = synchronized {
injector.getInstance(clazz).asInstanceOf[T]
}
@@ -95,8 +99,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
private def newSubclassingProxy(component: Component): DependencyBinding = {
val targetClass = component.target
val actor = new Dispatcher
- actor.start
- if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
+ if (component.dispatcher.isDefined) actor.swapDispatcher(component.dispatcher.get)
val remoteAddress =
if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
@@ -111,8 +114,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
component.target.getConstructor(Array[Class[_]]()).setAccessible(true)
val actor = new Dispatcher
- actor.start
- if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
+ if (component.dispatcher.isDefined) actor.swapDispatcher(component.dispatcher.get)
val remoteAddress =
if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
diff --git a/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala b/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala
index 382c54da7a..4d7b79e380 100644
--- a/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala
+++ b/kernel/src/main/scala/config/ActiveObjectGuiceConfiguratorForJava.scala
@@ -17,7 +17,9 @@ import org.apache.camel.{Endpoint, Routes}
* @author Jonas Bonér
*/
class ActiveObjectGuiceConfiguratorForJava {
- val INSTANCE = new ActiveObjectGuiceConfigurator
+ private val INSTANCE = new ActiveObjectGuiceConfigurator
+
+ def getInstance = INSTANCE
/**
* Returns the active abject that has been put under supervision for the class specified.
diff --git a/kernel/src/main/scala/jersey/ActiveObjectComponentProvider.scala b/kernel/src/main/scala/jersey/ActiveObjectComponentProvider.scala
index 617f0b10af..acf0ec06c8 100644
--- a/kernel/src/main/scala/jersey/ActiveObjectComponentProvider.scala
+++ b/kernel/src/main/scala/jersey/ActiveObjectComponentProvider.scala
@@ -8,10 +8,19 @@ import com.sun.jersey.core.spi.component.ioc.IoCFullyManagedComponentProvider
import kernel.config.ActiveObjectConfigurator
import kernel.util.Logging
-import java.lang.reflect.{Constructor, InvocationTargetException}
-class ActiveObjectComponentProvider(val clazz: Class[_], val configurator: ActiveObjectConfigurator)
+class ActiveObjectComponentProvider(val clazz: Class[_], val configurators: List[ActiveObjectConfigurator])
extends IoCFullyManagedComponentProvider with Logging {
- override def getInstance: AnyRef = configurator.getActiveObject(clazz).asInstanceOf[AnyRef]
+ override def getInstance: AnyRef = {
+ val instances = for {
+ conf <- configurators
+ if conf.isActiveObjectDefined(clazz)
+ } yield conf.getActiveObject(clazz).asInstanceOf[AnyRef]
+ instances match {
+ case instance :: Nil => instance
+ case Nil => throw new IllegalArgumentException("No Active Object for class [" + clazz + "] could be found. Make sure you have defined and configured the class as an Active Object in a ActiveObjectConfigurator")
+ case _ => throw new IllegalArgumentException("Active Object for class [" + clazz + "] is defined in more than one ActiveObjectConfigurator. Eliminate the redundancy.")
+ }
+ }
}
\ No newline at end of file
diff --git a/kernel/src/main/scala/jersey/ActiveObjectComponentProviderFactory.scala b/kernel/src/main/scala/jersey/ActiveObjectComponentProviderFactory.scala
index a97f42a123..332902cbf0 100644
--- a/kernel/src/main/scala/jersey/ActiveObjectComponentProviderFactory.scala
+++ b/kernel/src/main/scala/jersey/ActiveObjectComponentProviderFactory.scala
@@ -4,16 +4,16 @@
package se.scalablesolutions.akka.kernel.jersey
-import com.sun.jersey.core.spi.component.ioc.{IoCComponentProvider, IoCComponentProviderFactory}
-import com.sun.jersey.core.spi.component.{ComponentContext, ComponentProviderFactory}
+import com.sun.jersey.core.spi.component.ioc.IoCComponentProviderFactory
+import com.sun.jersey.core.spi.component.ComponentContext
import config.ActiveObjectConfigurator
-class ActiveObjectComponentProviderFactory(val configurator: ActiveObjectConfigurator)
+class ActiveObjectComponentProviderFactory(val configurators: List[ActiveObjectConfigurator])
extends IoCComponentProviderFactory {
override def getComponentProvider(clazz: Class[_]): ActiveObjectComponentProvider = getComponentProvider(null, clazz)
override def getComponentProvider(context: ComponentContext, clazz: Class[_]): ActiveObjectComponentProvider = {
- new ActiveObjectComponentProvider(clazz, configurator)
+ new ActiveObjectComponentProvider(clazz, configurators)
}
}
\ No newline at end of file
diff --git a/kernel/src/main/scala/jersey/AkkaServlet.scala b/kernel/src/main/scala/jersey/AkkaServlet.scala
index efc3a5f6a5..112c39b30f 100644
--- a/kernel/src/main/scala/jersey/AkkaServlet.scala
+++ b/kernel/src/main/scala/jersey/AkkaServlet.scala
@@ -4,23 +4,26 @@
package se.scalablesolutions.akka.kernel.jersey
+import config.ActiveObjectConfigurator
+
import com.sun.jersey.api.core.{DefaultResourceConfig, ResourceConfig}
import com.sun.jersey.spi.container.servlet.ServletContainer
import com.sun.jersey.spi.container.WebApplication
-import config.ActiveObjectConfigurator
-import java.util.{HashSet, ArrayList}
+
+import java.util.HashSet
+
class AkkaServlet extends ServletContainer {
override def initiate(rc: ResourceConfig, wa: WebApplication) = {
- val configurator = ActiveObjectConfigurator.getConfiguratorFor(getServletContext);
+ val configurators = ActiveObjectConfigurator.getConfiguratorsFor(getServletContext);
val set = new HashSet[Class[_]]
- for (c <- configurator.getComponentInterfaces) {
- println("========== " + c)
- set.add(c)
- }
+ for {
+ conf <- configurators
+ clazz <- conf.getComponentInterfaces
+ } set.add(clazz)
wa.initiate(
new DefaultResourceConfig(set),
- new ActiveObjectComponentProviderFactory(configurator));
+ new ActiveObjectComponentProviderFactory(configurators));
}
}
\ No newline at end of file
diff --git a/kernel/src/main/scala/nio/RemoteClient.scala b/kernel/src/main/scala/nio/RemoteClient.scala
index 62602bec47..141d212c8a 100644
--- a/kernel/src/main/scala/nio/RemoteClient.scala
+++ b/kernel/src/main/scala/nio/RemoteClient.scala
@@ -28,7 +28,7 @@ object RemoteClient extends Logging {
else {
val client = new RemoteClient(hostname, port)
client.connect
- clients + hash -> client
+ clients += hash -> client
client
}
}
diff --git a/kernel/src/main/scala/nio/RemoteServer.scala b/kernel/src/main/scala/nio/RemoteServer.scala
index fa7ff0e7ef..5a23f3ea50 100644
--- a/kernel/src/main/scala/nio/RemoteServer.scala
+++ b/kernel/src/main/scala/nio/RemoteServer.scala
@@ -23,10 +23,9 @@ class RemoteServer extends Logging {
}
object RemoteServer extends Logging {
- // FIXME make all remote server option configurable
- val HOSTNAME = "localhost"
- val PORT = 9999
- val CONNECTION_TIMEOUT_MILLIS = 100
+ val HOSTNAME = kernel.Kernel.config.getString("akka.remote.hostname", "localhost")
+ val PORT = kernel.Kernel.config.getInt("akka.remote.port", 9999)
+ val CONNECTION_TIMEOUT_MILLIS = kernel.Kernel.config.getInt("akka.remote.connection-timeout", 1000)
@volatile private var isRunning = false
@@ -77,7 +76,7 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
//e.getChannel.write(firstMessage)
}
- override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) ={
+ override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
val message = event.getMessage
if (message == null) throw new IllegalStateException("Message in remote MessageEvent is null: " + event)
if (message.isInstanceOf[RemoteRequest]) handleRemoteRequest(message.asInstanceOf[RemoteRequest], event.getChannel)
diff --git a/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala b/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala
index 4e95c8b6dc..1efbe85fb3 100644
--- a/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala
+++ b/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala
@@ -60,7 +60,6 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
private val NR_START_THREADS = 16
private val NR_MAX_THREADS = 128
private val KEEP_ALIVE_TIME = 60000L // default is one minute
- private val MILLISECONDS = TimeUnit.MILLISECONDS
private var inProcessOfBuilding = false
private var executor: ExecutorService = _
@@ -114,14 +113,15 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
override protected def doShutdown = executor.shutdownNow
private def getIfNotBusy(key: AnyRef): Option[MessageHandler] = guard.synchronized {
- if (!busyHandlers.contains(key) && messageHandlers.containsKey(key)) {
+ if (CONCURRENT_MODE && messageHandlers.containsKey(key)) Some(messageHandlers.get(key))
+ else if (!busyHandlers.contains(key) && messageHandlers.containsKey(key)) {
busyHandlers.add(key)
Some(messageHandlers.get(key))
} else None
}
private def free(key: AnyRef) = guard.synchronized {
- busyHandlers.remove(key)
+ if (!CONCURRENT_MODE) busyHandlers.remove(key)
}
@@ -297,16 +297,16 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend
def submit[T](callable: Callable[T]) = executor.submit(callable)
def submit[T](runnable: Runnable, t: T) = executor.submit(runnable, t)
def submit(runnable: Runnable) = executor.submit(runnable)
- /*
def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables)
def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables)
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
- */
+/*
def invokeAll[T](callables: Collection[Callable[T]]) = executor.invokeAll(callables)
def invokeAll[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
def invokeAny[T](callables: Collection[Callable[T]]) = executor.invokeAny(callables)
def invokeAny[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
+ */
}
/**
diff --git a/kernel/src/main/scala/reactor/MessageDispatcherBase.scala b/kernel/src/main/scala/reactor/MessageDispatcherBase.scala
index b456d4da94..b85ccb13a2 100644
--- a/kernel/src/main/scala/reactor/MessageDispatcherBase.scala
+++ b/kernel/src/main/scala/reactor/MessageDispatcherBase.scala
@@ -4,9 +4,13 @@
package se.scalablesolutions.akka.kernel.reactor
+import java.util.concurrent.TimeUnit
import java.util.HashMap
trait MessageDispatcherBase extends MessageDispatcher {
+ val CONCURRENT_MODE = kernel.Kernel.config.getBool("akka.actor.concurrent-mode", false)
+ val MILLISECONDS = TimeUnit.MILLISECONDS
+
val messageQueue = new MessageQueue
@volatile protected var active: Boolean = false
diff --git a/kernel/src/main/scala/state/CassandraNode.scala b/kernel/src/main/scala/state/CassandraStorage.scala
similarity index 70%
rename from kernel/src/main/scala/state/CassandraNode.scala
rename to kernel/src/main/scala/state/CassandraStorage.scala
index 1440ae1c97..532a24e228 100644
--- a/kernel/src/main/scala/state/CassandraNode.scala
+++ b/kernel/src/main/scala/state/CassandraStorage.scala
@@ -5,12 +5,17 @@
package se.scalablesolutions.akka.kernel.state
import java.io.File
-import java.lang.reflect.Constructor
import kernel.util.{Serializer, JavaSerializationSerializer, Logging}
import org.apache.cassandra.config.DatabaseDescriptor
import org.apache.cassandra.service._
+import org.apache.thrift.server.TThreadPoolServer
+import org.apache.thrift.protocol.TBinaryProtocol
+import org.apache.thrift.transport.TServerSocket
+import org.apache.thrift.transport.TTransportFactory
+import org.apache.thrift.TProcessorFactory
+
/**
* NOTE: requires command line options:
*
@@ -18,18 +23,30 @@ import org.apache.cassandra.service._
*
* @author Jonas Bonér
*/
-final object CassandraNode extends Logging {
-
+final object CassandraStorage extends Logging {
val TABLE_NAME = "akka"
val MAP_COLUMN_FAMILY = "map"
val VECTOR_COLUMN_FAMILY = "vector"
val REF_COLUMN_FAMILY = "ref:item"
+ val IS_ASCENDING = true
- // TODO: make pluggable (Avro, JSON, Thrift, Protobuf etc.)
- private[this] var serializer: Serializer = new JavaSerializationSerializer
+ val RUN_THRIFT_SERVICE = kernel.Kernel.config.getBool("akka.storage.cassandra.thrift-server.service", false)
+ val BLOCKING_CALL = kernel.Kernel.config.getInt("akka.storage.cassandra.blocking", 0)
+
+ private[this] val serializer: Serializer = {
+ kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "serialization") match {
+ case "serialization" => new JavaSerializationSerializer
+ case "json" => throw new UnsupportedOperationException("json storage protocol is not yet supported")
+ case "avro" => throw new UnsupportedOperationException("avro storage protocol is not yet supported")
+ case "thrift" => throw new UnsupportedOperationException("thrift storage protocol is not yet supported")
+ case "protobuf" => throw new UnsupportedOperationException("protobuf storage protocol is not yet supported")
+ }
+ }
// TODO: is this server thread-safe or needed to be wrapped up in an actor?
private[this] val server = classOf[CassandraServer].newInstance.asInstanceOf[CassandraServer]
+
+ private[this] var thriftServer: CassandraThriftServer = _
def start = {
try {
@@ -40,9 +57,16 @@ final object CassandraNode extends Logging {
log.error("Could not start up persistent storage")
throw e
}
+ if (RUN_THRIFT_SERVICE) {
+ thriftServer = new CassandraThriftServer(server)
+ thriftServer.start
+ }
}
- def stop = {}
+ def stop = {
+ //server.storageService.shutdown
+ if (RUN_THRIFT_SERVICE) thriftServer.stop
+ }
// ===============================================================
// For Ref
@@ -55,7 +79,7 @@ final object CassandraNode extends Logging {
REF_COLUMN_FAMILY,
serializer.out(element),
System.currentTimeMillis,
- false) // FIXME: what is this flag for?
+ BLOCKING_CALL)
}
def getRefStorageFor(name: String): Option[AnyRef] = {
@@ -80,7 +104,7 @@ final object CassandraNode extends Logging {
VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name),
serializer.out(element),
System.currentTimeMillis,
- false) // FIXME: what is this flag for?
+ BLOCKING_CALL)
}
def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
@@ -95,7 +119,7 @@ final object CassandraNode extends Logging {
}
def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[AnyRef] =
- server.get_slice(TABLE_NAME, name, VECTOR_COLUMN_FAMILY, start, count)
+ server.get_slice(TABLE_NAME, name, VECTOR_COLUMN_FAMILY, IS_ASCENDING, count)
.toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]].map(tuple => tuple._2)
def getVectorStorageSizeFor(name: String): Int =
@@ -112,7 +136,7 @@ final object CassandraNode extends Logging {
MAP_COLUMN_FAMILY + ":" + key,
serializer.out(value),
System.currentTimeMillis,
- false) // FIXME: what is this flag for?
+ BLOCKING_CALL)
}
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[String, AnyRef]]) = {
@@ -127,7 +151,7 @@ final object CassandraNode extends Logging {
TABLE_NAME,
name,
columns),
- false) // non-blocking
+ BLOCKING_CALL)
}
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
@@ -154,28 +178,20 @@ final object CassandraNode extends Logging {
server.get_column_count(TABLE_NAME, name, MAP_COLUMN_FAMILY)
def removeMapStorageFor(name: String) =
- server.remove(TABLE_NAME, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, false)
+ server.remove(TABLE_NAME, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, BLOCKING_CALL)
- def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, AnyRef]] =
- server.get_slice(TABLE_NAME, name, MAP_COLUMN_FAMILY, start, count)
- .toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]]
+ def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, AnyRef]] = {
+ server.get_slice(TABLE_NAME, name, MAP_COLUMN_FAMILY, IS_ASCENDING, count)
+ .toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]]
+ }
}
-/*
- * This code is only for starting up the Cassandra Thrift server, perhaps later
-
-import scala.actors.Actor._
-
-import com.facebook.thrift.protocol.TBinaryProtocol
-import com.facebook.thrift.protocol.TProtocolFactory
-import com.facebook.thrift.server.TThreadPoolServer
-import com.facebook.thrift.transport.TServerSocket
-import com.facebook.thrift.transport.TTransportException
-import com.facebook.thrift.transport.TTransportFactory
-import com.facebook.thrift.TProcessorFactory
+class CassandraThriftServer(server: CassandraServer) extends Logging {
+ case object Start
+ case object Stop
private[this] val serverEngine: TThreadPoolServer = try {
- val pidFile = System.getProperty("pidfile")
+ val pidFile = kernel.Kernel.config.getString("akka.storage.cassandra.thrift-server.pidfile", "akka.pid")
if (pidFile != null) new File(pidFile).deleteOnExit();
val listenPort = DatabaseDescriptor.getThriftPort
@@ -197,17 +213,19 @@ import com.facebook.thrift.TProcessorFactory
log.error("Could not start up persistent storage node.")
throw e
}
+
+ import scala.actors.Actor._
private[this] val serverDaemon = actor {
receive {
- case Start =>
- log.info("Persistent storage node starting up...")
+ case Start =>
+ log.info("Cassandra thrift service is starting up...")
serverEngine.serve
- case Stop =>
- log.info("Persistent storage node shutting down...")
+ case Stop =>
+ log.info("Cassandra thrift service is shutting down...")
serverEngine.stop
- //case Insert(..) =>
- // server.
}
}
-*/
+ def start = serverDaemon ! Start
+ def stop = serverDaemon ! Stop
+}
diff --git a/kernel/src/main/scala/state/State.scala b/kernel/src/main/scala/state/State.scala
index 7bb8e40855..6718949f91 100644
--- a/kernel/src/main/scala/state/State.scala
+++ b/kernel/src/main/scala/state/State.scala
@@ -186,7 +186,7 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
}
/**
- * Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
+ * Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
*
* @author Jonas Bonér
*/
@@ -194,43 +194,44 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str
override def getRange(start: Int, count: Int) = {
verifyTransaction
- CassandraNode.getMapStorageRangeFor(uuid, start, count)
+ CassandraStorage.getMapStorageRangeFor(uuid, start, count)
}
// ---- For Transactional ----
override def commit = {
+ CassandraStorage.insertMapStorageEntriesFor(uuid, changeSet.toList)
// FIXME: should use batch function once the bug is resolved
- for (entry <- changeSet) {
- val (key, value) = entry
- CassandraNode.insertMapStorageEntryFor(uuid, key, value)
- }
+// for (entry <- changeSet) {
+// val (key, value) = entry
+// CassandraStorage.insertMapStorageEntryFor(uuid, key, value)
+// }
}
// ---- Overriding scala.collection.mutable.Map behavior ----
override def clear = {
verifyTransaction
- CassandraNode.removeMapStorageFor(uuid)
+ CassandraStorage.removeMapStorageFor(uuid)
}
override def contains(key: String): Boolean = {
verifyTransaction
- CassandraNode.getMapStorageEntryFor(uuid, key).isDefined
+ CassandraStorage.getMapStorageEntryFor(uuid, key).isDefined
}
override def size: Int = {
verifyTransaction
- CassandraNode.getMapStorageSizeFor(uuid)
+ CassandraStorage.getMapStorageSizeFor(uuid)
}
// ---- For scala.collection.mutable.Map ----
override def get(key: String): Option[AnyRef] = {
verifyTransaction
- val result = CassandraNode.getMapStorageEntryFor(uuid, key)
+ val result = CassandraStorage.getMapStorageEntryFor(uuid, key)
result
}
override def elements: Iterator[Tuple2[String, AnyRef]] = {
//verifyTransaction
new Iterator[Tuple2[String, AnyRef]] {
- private val originalList: List[Tuple2[String, AnyRef]] = CassandraNode.getMapStorageFor(uuid)
+ private val originalList: List[Tuple2[String, AnyRef]] = CassandraStorage.getMapStorageFor(uuid)
private var elements = originalList.reverse
override def next: Tuple2[String, AnyRef]= synchronized {
val element = elements.head
@@ -335,15 +336,15 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect
// ---- For TransactionalVector ----
override def get(index: Int): AnyRef = {
verifyTransaction
- CassandraNode.getVectorStorageEntryFor(uuid, index)
+ CassandraStorage.getVectorStorageEntryFor(uuid, index)
}
override def getRange(start: Int, count: Int): List[AnyRef] = {
verifyTransaction
- CassandraNode.getVectorStorageRangeFor(uuid, start, count)
+ CassandraStorage.getVectorStorageRangeFor(uuid, start, count)
}
override def length: Int = {
verifyTransaction
- CassandraNode.getVectorStorageSizeFor(uuid)
+ CassandraStorage.getVectorStorageSizeFor(uuid)
}
override def apply(index: Int): AnyRef = get(index)
override def first: AnyRef = get(0)
@@ -358,7 +359,7 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect
override def commit = {
// FIXME: should use batch function once the bug is resolved
for (element <- changeSet) {
- CassandraNode.insertVectorStorageEntryFor(uuid, element)
+ CassandraStorage.insertVectorStorageEntryFor(uuid, element)
}
}
}
@@ -397,11 +398,11 @@ class TransactionalRef[T] extends Transactional {
}
class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] {
- override def commit = if (ref.isDefined) CassandraNode.insertRefStorageFor(uuid, ref.get)
+ override def commit = if (ref.isDefined) CassandraStorage.insertRefStorageFor(uuid, ref.get)
override def get: Option[AnyRef] = {
verifyTransaction
- CassandraNode.getRefStorageFor(uuid)
+ CassandraStorage.getRefStorageFor(uuid)
}
override def isDefined: Boolean = get.isDefined
override def getOrElse(default: => AnyRef): AnyRef = {
diff --git a/kernel/src/main/scala/stm/TransactionManagement.scala b/kernel/src/main/scala/stm/TransactionManagement.scala
index 678e4fe33c..5a532b5591 100644
--- a/kernel/src/main/scala/stm/TransactionManagement.scala
+++ b/kernel/src/main/scala/stm/TransactionManagement.scala
@@ -14,7 +14,7 @@ class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Tran
}
object TransactionManagement {
- private val txEnabled = new AtomicBoolean(true)
+ private val txEnabled = new AtomicBoolean(kernel.Kernel.config.getBool("akka.stm.service", true))
def isTransactionalityEnabled = txEnabled.get
def disableTransactions = txEnabled.set(false)
diff --git a/kernel/src/main/scala/stm/TransactionWatcher.scala b/kernel/src/main/scala/stm/TransactionWatcher.scala
new file mode 100644
index 0000000000..755a75f54d
--- /dev/null
+++ b/kernel/src/main/scala/stm/TransactionWatcher.scala
@@ -0,0 +1,105 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.stm
+
+/*
+import kernel.util.Logging
+import org.apache.zookeeper.jmx.ManagedUtil
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog
+import org.apache.zookeeper.server.{ServerConfig, NIOServerCnxn}
+import org.apache.zookeeper.{KeeperException, WatchedEvent, Watcher, ZooKeeper, DataMonitor}
+*/
+/**
+ * @author Jonas Bonér
+ *
+class TransactionWatcher extends Logging with Watcher {
+
+ val SERVER_URL = "localhost"
+
+ val ZOO_KEEPER_URL = SERVER_URL
+ val ZOO_KEEPER_PORT = 2181
+ val znode = "master"
+
+ private[this] val db = new scala.collection.mutable.HashMap[String, String]
+
+ private[this] val zk = new ZooKeeper(ZOO_KEEPER_URL + ":" + ZOO_KEEPER_PORT, 3000, this)
+ private[this] val dm = new DataMonitor(zk, znode, null, this)
+
+ override def process(event: WatchedEvent) = {
+ log.debug("New ZooKeeper event: %s", event)
+ val path = event.getPath();
+ if (event.getType == Event.EventType.None) {
+ // We are are being told that the state of the connection has changed
+ event.getState match {
+ case SyncConnected =>
+ // In this particular example we don't need to do anything
+ // here - watches are automatically re-registered with
+ // server and any watches triggered while the client was
+ // disconnected will be delivered (in order of course)
+ case Expired =>
+ dead = true
+ listener.closing(KeeperException.Code.SessionExpired)
+ }
+ } else {
+ if (path != null && path.equals(znode)) {
+ // Something has changed on the node, let's find out
+ zk.exists(znode, true, this, null)
+ }
+ }
+ if (chainedWatcher != null) chainedWatcher.process(event);
+ }
+
+
+
+ def run: Unit = synchronized {
+ try {
+ while (!dm.dead) wait
+ } catch {
+ case e: InterruptedException => Thread.currentThread.interrupt
+ }
+ }
+
+ def closing(rc: Int): Unit = synchronized { notifyAll() }
+}
+
+ */
+object TransactionWatcher {
+ def main(args: Array[String]): Unit = {
+ println("Connecting to ZooKeeper...")
+ //new TransactionWatcher
+ }
+}
+
+ // private[akka] def startZooKeeper = {
+ // try {
+ // ManagedUtil.registerLog4jMBeans
+ // ServerConfig.parse(args)
+ // } catch {
+ // case e: JMException => log.warning("Unable to register log4j JMX control: s%", e)
+ // case e => log.fatal("Error in ZooKeeper config: s%", e)
+ // }
+ // val factory = new ZooKeeperServer.Factory() {
+ // override def createConnectionFactory = new NIOServerCnxn.Factory(ServerConfig.getClientPort)
+ // override def createServer = {
+ // val server = new ZooKeeperServer
+ // val txLog = new FileTxnSnapLog(
+ // new File(ServerConfig.getDataLogDir),
+ // new File(ServerConfig.getDataDir))
+ // server.setTxnLogFactory(txLog)
+ // server
+ // }
+ // }
+ // try {
+ // val zooKeeper = factory.createServer
+ // zooKeeper.startup
+ // log.info("ZooKeeper started")
+ // // TODO: handle clean shutdown as below in separate thread
+ // // val cnxnFactory = serverFactory.createConnectionFactory
+ // // cnxnFactory.setZooKeeperServer(zooKeeper)
+ // // cnxnFactory.join
+ // // if (zooKeeper.isRunning) zooKeeper.shutdown
+ // } catch { case e => log.fatal("Unexpected exception: s%",e) }
+ // }
+
diff --git a/kernel/src/main/scala/util/Logging.scala b/kernel/src/main/scala/util/Logging.scala
old mode 100755
new mode 100644
index eebb667882..655dfce5db
--- a/kernel/src/main/scala/util/Logging.scala
+++ b/kernel/src/main/scala/util/Logging.scala
@@ -22,7 +22,7 @@ import java.net.UnknownHostException;
trait Logging {
@transient var log = {
val log = Logger.get(this.getClass.getName)
- log.setLevel(Level.ALL)
+ //0log.setLevel(Level.ALL)
log
}
}
diff --git a/kernel/src/test/scala/InMemoryActorSpec.scala b/kernel/src/test/scala/InMemoryActorSpec.scala
index abf7781b49..a16898f3b1 100644
--- a/kernel/src/test/scala/InMemoryActorSpec.scala
+++ b/kernel/src/test/scala/InMemoryActorSpec.scala
@@ -90,7 +90,7 @@ class InMemoryActorSpec extends TestCase {
stateful.start
stateful ! SetMapStateOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
Thread.sleep(100)
- stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
+ stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
Thread.sleep(100)
assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
}
@@ -100,7 +100,7 @@ class InMemoryActorSpec extends TestCase {
val stateful = new InMemStatefulActor
stateful.start
stateful !! SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
- stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
+ stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
}
@@ -112,7 +112,7 @@ class InMemoryActorSpec extends TestCase {
failer.start
stateful ! SetMapStateOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
Thread.sleep(100)
- stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
+ stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
Thread.sleep(100)
assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
}
@@ -125,7 +125,7 @@ class InMemoryActorSpec extends TestCase {
val failer = new InMemFailerActor
failer.start
try {
- stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
+ stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
@@ -137,7 +137,7 @@ class InMemoryActorSpec extends TestCase {
stateful.start
stateful ! SetVectorStateOneWay("init") // set init state
Thread.sleep(100)
- stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
+ stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
Thread.sleep(100)
assertEquals("new state", (stateful !! GetVectorState).get)
}
@@ -147,7 +147,7 @@ class InMemoryActorSpec extends TestCase {
val stateful = new InMemStatefulActor
stateful.start
stateful !! SetVectorState("init") // set init state
- stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
+ stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assertEquals("new state", (stateful !! GetVectorState).get)
}
@@ -159,7 +159,7 @@ class InMemoryActorSpec extends TestCase {
Thread.sleep(100)
val failer = new InMemFailerActor
failer.start
- stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
+ stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
Thread.sleep(100)
assertEquals("init", (stateful !! GetVectorState).get) // check that state is == init state
}
@@ -172,7 +172,7 @@ class InMemoryActorSpec extends TestCase {
val failer = new InMemFailerActor
failer.start
try {
- stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
+ stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetVectorState).get) // check that state is == init state
@@ -184,7 +184,7 @@ class InMemoryActorSpec extends TestCase {
stateful.start
stateful ! SetRefStateOneWay("init") // set init state
Thread.sleep(100)
- stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
+ stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
Thread.sleep(100)
assertEquals("new state", (stateful !! GetRefState).get)
}
@@ -194,7 +194,7 @@ class InMemoryActorSpec extends TestCase {
val stateful = new InMemStatefulActor
stateful.start
stateful !! SetRefState("init") // set init state
- stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
+ stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assertEquals("new state", (stateful !! GetRefState).get)
}
@@ -206,7 +206,7 @@ class InMemoryActorSpec extends TestCase {
Thread.sleep(100)
val failer = new InMemFailerActor
failer.start
- stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
+ stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
Thread.sleep(100)
assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state
}
@@ -219,7 +219,7 @@ class InMemoryActorSpec extends TestCase {
val failer = new InMemFailerActor
failer.start
try {
- stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
+ stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state
diff --git a/kernel/src/test/scala/PersistentActorSpec.scala b/kernel/src/test/scala/PersistentActorSpec.scala
index 204a96edaa..7faa35de40 100644
--- a/kernel/src/test/scala/PersistentActorSpec.scala
+++ b/kernel/src/test/scala/PersistentActorSpec.scala
@@ -72,7 +72,7 @@ class PersistentActorSpec extends TestCase {
val stateful = new PersistentActor
stateful.start
stateful !! SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
- stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
+ stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
}
@@ -84,7 +84,7 @@ class PersistentActorSpec extends TestCase {
val failer = new PersistentFailerActor
failer.start
try {
- stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
+ stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
@@ -95,7 +95,7 @@ class PersistentActorSpec extends TestCase {
val stateful = new PersistentActor
stateful.start
stateful !! SetVectorState("init") // set init state
- stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
+ stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assertEquals("new state", (stateful !! GetVectorState).get)
}
@@ -107,7 +107,7 @@ class PersistentActorSpec extends TestCase {
val failer = new PersistentFailerActor
failer.start
try {
- stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
+ stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetVectorState).get) // check that state is == init state
@@ -118,7 +118,7 @@ class PersistentActorSpec extends TestCase {
val stateful = new PersistentActor
stateful.start
stateful !! SetRefState("init") // set init state
- stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
+ stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assertEquals("new state", (stateful !! GetRefState).get)
}
@@ -130,7 +130,7 @@ class PersistentActorSpec extends TestCase {
val failer = new PersistentFailerActor
failer.start
try {
- stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
+ stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state
diff --git a/kernel/src/test/scala/RemoteActorSpec.scala b/kernel/src/test/scala/RemoteActorSpec.scala
index 1d8a83b61f..3e5a21f813 100644
--- a/kernel/src/test/scala/RemoteActorSpec.scala
+++ b/kernel/src/test/scala/RemoteActorSpec.scala
@@ -27,6 +27,7 @@ class RemoteActorSpecActorBidirectional extends Actor {
class RemoteActorSpec extends TestCase {
+ kernel.Kernel.config
new Thread(new Runnable() {
def run = {
val server = new RemoteServer
diff --git a/kernel/src/test/scala/RemoteSupervisorSpec.scala b/kernel/src/test/scala/RemoteSupervisorSpec.scala
index 95d4ccd1e2..2b396a21a1 100644
--- a/kernel/src/test/scala/RemoteSupervisorSpec.scala
+++ b/kernel/src/test/scala/RemoteSupervisorSpec.scala
@@ -22,6 +22,7 @@ object Log {
@RunWith(classOf[JUnit4Runner])
class RemoteSupervisorSpec extends Suite {
+ Kernel.config
new Thread(new Runnable() {
def run = {
val server = new RemoteServer
@@ -244,6 +245,7 @@ class RemoteSupervisorSpec extends Suite {
}
}
+ /*
def testOneWayKillSingleActorOneForOne = {
Log.messageLog = ""
val sup = getSingleActorOneForOneSupervisor
@@ -277,7 +279,8 @@ class RemoteSupervisorSpec extends Suite {
Log.oneWayLog
}
}
-
+*/
+
/*
def testOneWayKillSingleActorAllForOne = {
Log.messageLog = ""
diff --git a/lib/cassandra-0.3.0-dev.jar b/lib/cassandra-0.3.0-dev.jar
deleted file mode 100644
index 631f082ef6..0000000000
Binary files a/lib/cassandra-0.3.0-dev.jar and /dev/null differ
diff --git a/pom.xml b/pom.xml
index 84ea905ad5..7a58e98a6b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,7 +12,7 @@
pom
- 0.1
+ 0.5
se.scalablesolutions.akka
2.7.5
diff --git a/util-java/src/main/java/se/scalablesolutions/akka/annotation/transactional.java b/util-java/src/main/java/se/scalablesolutions/akka/annotation/transactionrequired.java
similarity index 71%
rename from util-java/src/main/java/se/scalablesolutions/akka/annotation/transactional.java
rename to util-java/src/main/java/se/scalablesolutions/akka/annotation/transactionrequired.java
index e8180757b3..c67bcf4862 100644
--- a/util-java/src/main/java/se/scalablesolutions/akka/annotation/transactional.java
+++ b/util-java/src/main/java/se/scalablesolutions/akka/annotation/transactionrequired.java
@@ -7,5 +7,5 @@ package se.scalablesolutions.akka.annotation;
import java.lang.annotation.*;
@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.METHOD)
-public @interface transactional {}
+@Target(ElementType.TYPE)
+public @interface transactionrequired {}