diff --git a/akka.iws b/akka.iws index 3518026c55..3cb4137447 100644 --- a/akka.iws +++ b/akka.iws @@ -2,20 +2,26 @@ - - + - - - - + + + + + + + + + + + @@ -36,71 +42,6 @@ - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -177,10 +300,10 @@ - - + + - + @@ -189,79 +312,34 @@ - + - - + + - + - - + + - + - - + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + @@ -280,22 +358,22 @@ @@ -374,6 +452,72 @@ @@ -1515,7 +1745,39 @@ - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + - + - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + localhost @@ -2364,19 +2691,19 @@ - + - + - + @@ -2430,100 +2757,44 @@ - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - + - + - + - + - + - + - + + + + + + + + @@ -2535,9 +2806,67 @@ + + + + + + + + + + + + + + + + + + + + + + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/bin/start-akka-server.sh b/bin/start-akka-server.sh index 362875f95c..65f0c773d5 100755 --- a/bin/start-akka-server.sh +++ b/bin/start-akka-server.sh @@ -2,14 +2,6 @@ VERSION=0.5 -#if [ $# -gt 1 ]; -#then -# echo 'USAGE: bin/start-akka-server.sh' -# exit 1 -#fi - -JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Home - BASE_DIR=$(dirname $0)/.. echo 'Starting Akka Kernel from directory' $BASE_DIR diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/AllTests.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/AllTests.java new file mode 100644 index 0000000000..866e4f53c0 --- /dev/null +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/AllTests.java @@ -0,0 +1,38 @@ +package se.scalablesolutions.akka.api; + +import junit.framework.TestCase; +import junit.framework.Test; +import junit.framework.TestSuite; + +public class AllTests extends TestCase { + public static Test suite() { + + TestSuite suite = new TestSuite("All tests"); + // Java tests + suite.addTestSuite(InMemoryStateTest.class); + suite.addTestSuite(InMemNestedStateTest.class); + suite.addTestSuite(PersistentStateTest.class); + suite.addTestSuite(PersistentNestedStateTest.class); + suite.addTestSuite(RemoteInMemoryStateTest.class); + suite.addTestSuite(RemotePersistentStateTest.class); + suite.addTestSuite(ActiveObjectGuiceConfiguratorTest.class); + suite.addTestSuite(RestTest.class); + + // Scala tests + //suite.addTestSuite(se.scalablesolutions.akka.kernel.SupervisorSpec.class); + /* + suite.addTestSuite(se.scalablesolutions.akka.kernel.RemoteSupervisorSpec.class); + suite.addTestSuite(se.scalablesolutions.akka.kernel.reactor.EventBasedDispatcherTest.class); + suite.addTestSuite(se.scalablesolutions.akka.kernel.reactor.ThreadBasedDispatcherTest.class); + suite.addTestSuite(se.scalablesolutions.akka.kernel.actor.ActorSpec.class); + suite.addTestSuite(se.scalablesolutions.akka.kernel.actor.RemoteActorSpec.class); + suite.addTestSuite(se.scalablesolutions.akka.kernel.actor.InMemStatefulActor.class); + suite.addTestSuite(se.scalablesolutions.akka.kernel.actor.PersistentActor.class); + */ + return suite; + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } +} \ No newline at end of file diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java index 9f9a227286..482478aa7c 100644 --- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java @@ -70,8 +70,8 @@ public class PersistentNestedStateTest extends TestCase { PersistentStatefulNested nested = conf.getActiveObject(PersistentStatefulNested.class); nested.setVectorState("init"); // set init state stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactionrequired - assertEquals(3, stateful.getVectorLength()); // BAD: keeps one element since last test - assertEquals(3, nested.getVectorLength()); + assertEquals(2, stateful.getVectorLength()); // BAD: keeps one element since last test + assertEquals(2, nested.getVectorLength()); } public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() { diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java index 489cb7ef43..e688ff0dd8 100644 --- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java @@ -15,8 +15,6 @@ public class RemotePersistentStateTest extends TestCase { static String messageLog = ""; static { - se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config(); - System.setProperty("storage-config", "config"); Kernel.startCassandra(); Kernel.startRemoteService(); } @@ -28,8 +26,8 @@ public class RemotePersistentStateTest extends TestCase { conf.configureActiveObjects( new RestartStrategy(new AllForOne(), 3, 5000), new Component[] { - new Component(PersistentStateful.class, new LifeCycle(new Permanent(), 1000), 1000, new RemoteAddress("localhost", 9999)), - new Component(PersistentFailer.class, new LifeCycle(new Permanent(), 1000), 1000, new RemoteAddress("localhost", 9999)) + new Component(PersistentStateful.class, new LifeCycle(new Permanent(), 1000), 1000000, new RemoteAddress("localhost", 9999)), + new Component(PersistentFailer.class, new LifeCycle(new Permanent(), 1000), 1000000, new RemoteAddress("localhost", 9999)) }).supervise(); } @@ -43,13 +41,13 @@ public class RemotePersistentStateTest extends TestCase { stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); } - + public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() { PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class); stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state PersistentFailer failer = conf.getActiveObject(PersistentFailer.class); try { - stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "MapShouldRollBack", failer); // call failing transactionrequired method fail("should have thrown an exception"); } catch (RuntimeException e) { } // expected @@ -58,22 +56,21 @@ public class RemotePersistentStateTest extends TestCase { public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class); - stateful.setVectorState("init"); // set init state - stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired - assertEquals("init", stateful.getVectorState(0)); - assertEquals("new state", stateful.getVectorState(1)); + int init = stateful.getVectorLength(); + stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "VectorShouldNotRollback"); // transactionrequired + assertEquals(init + 1, stateful.getVectorLength()); } public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() { PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class); - stateful.setVectorState("init"); // set init state + int init = stateful.getVectorLength(); PersistentFailer failer = conf.getActiveObject(PersistentFailer.class); try { stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method fail("should have thrown an exception"); } catch (RuntimeException e) { } // expected - assertEquals("init", stateful.getVectorState(0)); // check that state is == init state + assertEquals(init, stateful.getVectorLength()); } public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { @@ -94,4 +91,5 @@ public class RemotePersistentStateTest extends TestCase { } // expected assertEquals("init", stateful.getRefState()); // check that state is == init state } + } \ No newline at end of file diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java index a8a8c18cf2..5b7de680a4 100644 --- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java @@ -16,6 +16,7 @@ import javax.ws.rs.core.UriBuilder; import javax.servlet.Servlet; import junit.framework.TestSuite; +import junit.framework.TestCase; import org.junit.*; import static org.junit.Assert.*; @@ -27,7 +28,7 @@ import java.util.HashMap; import se.scalablesolutions.akka.kernel.config.*; import static se.scalablesolutions.akka.kernel.config.JavaConfig.*; -public class RestTest extends TestSuite { +public class RestTest extends TestCase { private static int PORT = 9998; private static URI URI = UriBuilder.fromUri("http://localhost/").port(PORT).build(); diff --git a/kernel/src/main/scala/actor/ActiveObject.scala b/kernel/src/main/scala/actor/ActiveObject.scala index ad833877d9..858d8cec0e 100644 --- a/kernel/src/main/scala/actor/ActiveObject.scala +++ b/kernel/src/main/scala/actor/ActiveObject.scala @@ -192,9 +192,9 @@ sealed class ActorAroundAdvice(val target: Class[_], private def localDispatch(joinpoint: JoinPoint): AnyRef = { val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti] - if (isOneWay(rtti)) actor ! Invocation(joinpoint) + if (isOneWay(rtti)) actor ! Invocation(joinpoint, true) else { - val result = actor !! Invocation(joinpoint) + val result = actor !! Invocation(joinpoint, false) if (result.isDefined) result.get else throw new IllegalStateException("No result defined for invocation [" + joinpoint + "]") } @@ -233,22 +233,24 @@ sealed class ActorAroundAdvice(val target: Class[_], * * @author Jonas Bonér */ -@serializable private[kernel] case class Invocation(val joinpoint: JoinPoint) { +@serializable private[kernel] case class Invocation(val joinpoint: JoinPoint, val isOneWay: Boolean) { override def toString: String = synchronized { - "Invocation [joinpoint: " + joinpoint.toString + "]" + "Invocation [joinpoint: " + joinpoint.toString + ", isOneWay: " + isOneWay + "]" } override def hashCode(): Int = synchronized { var result = HashCode.SEED result = HashCode.hash(result, joinpoint) + result = HashCode.hash(result, isOneWay) result } override def equals(that: Any): Boolean = synchronized { that != null && that.isInstanceOf[Invocation] && - that.asInstanceOf[Invocation].joinpoint == joinpoint + that.asInstanceOf[Invocation].joinpoint == joinpoint && + that.asInstanceOf[Invocation].isOneWay == isOneWay } } @@ -283,8 +285,9 @@ private[kernel] class Dispatcher extends Actor { } override def receive: PartialFunction[Any, Unit] = { - case Invocation(joinpoint: JoinPoint) => - reply(joinpoint.proceed) + case Invocation(joinpoint, oneWay) => + if (oneWay) joinpoint.proceed + else reply(joinpoint.proceed) case unexpected => throw new ActiveObjectException("Unexpected message [" + unexpected + "] sent to [" + this + "]") } diff --git a/kernel/src/main/scala/state/CassandraStorage.scala b/kernel/src/main/scala/state/CassandraStorage.scala index 532a24e228..b40f37a4b8 100644 --- a/kernel/src/main/scala/state/CassandraStorage.scala +++ b/kernel/src/main/scala/state/CassandraStorage.scala @@ -32,7 +32,8 @@ final object CassandraStorage extends Logging { val RUN_THRIFT_SERVICE = kernel.Kernel.config.getBool("akka.storage.cassandra.thrift-server.service", false) val BLOCKING_CALL = kernel.Kernel.config.getInt("akka.storage.cassandra.blocking", 0) - + + @volatile private[this] var isRunning = false private[this] val serializer: Serializer = { kernel.Kernel.config.getString("akka.storage.cassandra.storage-format", "serialization") match { case "serialization" => new JavaSerializationSerializer @@ -48,22 +49,25 @@ final object CassandraStorage extends Logging { private[this] var thriftServer: CassandraThriftServer = _ - def start = { - try { - server.start - log.info("Persistent storage has started up successfully"); - } catch { - case e => - log.error("Could not start up persistent storage") - throw e - } - if (RUN_THRIFT_SERVICE) { - thriftServer = new CassandraThriftServer(server) - thriftServer.start + def start = synchronized { + if (!isRunning) { + try { + server.start + log.info("Persistent storage has started up successfully"); + } catch { + case e => + log.error("Could not start up persistent storage") + throw e + } + if (RUN_THRIFT_SERVICE) { + thriftServer = new CassandraThriftServer(server) + thriftServer.start + } + isRunning } } - def stop = { + def stop = if (isRunning) { //server.storageService.shutdown if (RUN_THRIFT_SERVICE) thriftServer.stop } diff --git a/kernel/src/main/scala/state/State.scala b/kernel/src/main/scala/state/State.scala index 6718949f91..489f5ce5ba 100644 --- a/kernel/src/main/scala/state/State.scala +++ b/kernel/src/main/scala/state/State.scala @@ -87,7 +87,7 @@ trait Transactional { /** * Base trait for all state implementations (persistent or in-memory). * - * TODO: Make this class inherit scala.collection.mutable.Map and/or java.util.Map + * FIXME: Create Java versions using pcollections * * @author Jonas Bonér */ @@ -168,9 +168,9 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] { def getRange(start: Int, count: Int) // ---- For Transactional ---- - override def begin = changeSet.clear - override def rollback = {} - + override def begin = {} + override def rollback = changeSet.clear + // ---- For scala.collection.mutable.Map ---- override def put(key: K, value: V): Option[V] = { verifyTransaction @@ -200,11 +200,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Str // ---- For Transactional ---- override def commit = { CassandraStorage.insertMapStorageEntriesFor(uuid, changeSet.toList) - // FIXME: should use batch function once the bug is resolved -// for (entry <- changeSet) { -// val (key, value) = entry -// CassandraStorage.insertMapStorageEntryFor(uuid, key, value) -// } + changeSet.clear } // ---- Overriding scala.collection.mutable.Map behavior ---- @@ -316,8 +312,8 @@ abstract class PersistentTransactionalVector[T] extends TransactionalVector[T] { protected[kernel] var changeSet: List[T] = Nil // ---- For Transactional ---- - override def begin = changeSet = Nil - override def rollback = {} + override def begin = {} + override def rollback = changeSet = Nil // ---- For TransactionalVector ---- override def add(value: T) = { @@ -358,9 +354,8 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect // ---- For Transactional ---- override def commit = { // FIXME: should use batch function once the bug is resolved - for (element <- changeSet) { - CassandraStorage.insertVectorStorageEntryFor(uuid, element) - } + for (element <- changeSet) CassandraStorage.insertVectorStorageEntryFor(uuid, element) + changeSet = Nil } } @@ -398,8 +393,11 @@ class TransactionalRef[T] extends Transactional { } class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] { - override def commit = if (ref.isDefined) CassandraStorage.insertRefStorageFor(uuid, ref.get) - + override def commit = if (ref.isDefined) { + CassandraStorage.insertRefStorageFor(uuid, ref.get) + ref = None + } + override def rollback = ref = None override def get: Option[AnyRef] = { verifyTransaction CassandraStorage.getRefStorageFor(uuid) diff --git a/kernel/src/test/scala/ActorTest.scala b/kernel/src/test/scala/ActorSpec.scala similarity index 83% rename from kernel/src/test/scala/ActorTest.scala rename to kernel/src/test/scala/ActorSpec.scala index 2db753b123..5977301e7a 100644 --- a/kernel/src/test/scala/ActorTest.scala +++ b/kernel/src/test/scala/ActorSpec.scala @@ -1,14 +1,10 @@ package se.scalablesolutions.akka.kernel.actor -import concurrent.Lock -import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.TimeUnit -import reactor._ -import org.junit.{Test, Before} import org.junit.Assert._ -class ActorTest { +class ActorSpec extends junit.framework.TestCase { private val unit = TimeUnit.MILLISECONDS class TestActor extends Actor { @@ -20,8 +16,7 @@ class ActorTest { } } - @Test - def sendOneWay = { + def testSendOneWay = { implicit val timeout = 5000L var oneWay = "nada" val actor = new Actor { @@ -36,8 +31,7 @@ class ActorTest { actor.stop } - @Test - def sendReplySync = { + def testSendReplySync = { implicit val timeout = 5000L val actor = new TestActor actor.start @@ -46,8 +40,7 @@ class ActorTest { actor.stop } - @Test - def sendReplyAsync = { + def testSendReplyAsync = { implicit val timeout = 5000L val actor = new TestActor actor.start @@ -56,8 +49,7 @@ class ActorTest { actor.stop } - @Test - def sendReceiveException = { + def testSendReceiveException = { implicit val timeout = 5000L val actor = new TestActor actor.start diff --git a/kernel/src/test/scala/AllTests.scala b/kernel/src/test/scala/AllTests.scala new file mode 100644 index 0000000000..464b2e3922 --- /dev/null +++ b/kernel/src/test/scala/AllTests.scala @@ -0,0 +1,22 @@ +package se.scalablesolutions.akka.kernel + +import junit.framework.Test +import junit.framework.TestCase +import junit.framework.TestSuite + +object AllTests extends TestCase { + def suite(): Test = { + val suite = new TestSuite("All tests") + suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.SupervisorSpec]) + suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.RemoteSupervisorSpec]) + suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.reactor.EventBasedDispatcherTest]) + suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.reactor.ThreadBasedDispatcherTest]) + suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.actor.ActorSpec]) + suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.actor.RemoteActorSpec]) + suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.actor.PersistentActorSpec]) + suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.actor.InMemoryActorSpec]) + suite + } + + def main(args: Array[String]) = junit.textui.TestRunner.run(suite) +} \ No newline at end of file diff --git a/kernel/src/test/scala/PersistentActorSpec.scala b/kernel/src/test/scala/PersistentActorSpec.scala index 60fcbb2d17..1ffbfe0ac4 100644 --- a/kernel/src/test/scala/PersistentActorSpec.scala +++ b/kernel/src/test/scala/PersistentActorSpec.scala @@ -96,7 +96,7 @@ class PersistentActorSpec extends TestCase { stateful.start stateful !! SetVectorState("init") // set init state stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired - assertEquals(3, (stateful !! GetVectorSize).get) + assertEquals(2, (stateful !! GetVectorSize).get) } @Test diff --git a/kernel/src/test/scala/RemoteSupervisorSpec.scala b/kernel/src/test/scala/RemoteSupervisorSpec.scala index 2b396a21a1..e1c199234a 100644 --- a/kernel/src/test/scala/RemoteSupervisorSpec.scala +++ b/kernel/src/test/scala/RemoteSupervisorSpec.scala @@ -10,7 +10,7 @@ import kernel.config.ScalaConfig._ import com.jteigen.scalatest.JUnit4Runner import org.junit.runner.RunWith -import org.scalatest._ +import org.scalatest.Suite object Log { var messageLog: String = "" @@ -20,7 +20,7 @@ object Log { * @author Jonas Bonér */ @RunWith(classOf[JUnit4Runner]) -class RemoteSupervisorSpec extends Suite { +class RemoteSupervisorSpec extends junit.framework.TestCase with Suite { Kernel.config new Thread(new Runnable() { diff --git a/kernel/src/test/scala/SupervisorSpec.scala b/kernel/src/test/scala/SupervisorSpec.scala index 52a2e9c6ad..6517d6530a 100644 --- a/kernel/src/test/scala/SupervisorSpec.scala +++ b/kernel/src/test/scala/SupervisorSpec.scala @@ -9,13 +9,13 @@ import kernel.config.ScalaConfig._ import com.jteigen.scalatest.JUnit4Runner import org.junit.runner.RunWith -import org.scalatest._ +import org.scalatest.Suite /** * @author Jonas Bonér */ @RunWith(classOf[JUnit4Runner]) -class SupervisorSpec extends Suite { +class SupervisorSpec extends junit.framework.TestCase with Suite { var messageLog: String = "" var oneWayLog: String = ""