diff --git a/akka.ipr b/akka.ipr index 4102337677..f7df527504 100644 --- a/akka.ipr +++ b/akka.ipr @@ -792,11 +792,9 @@ - - - + - + @@ -949,6 +947,83 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/akka.iws b/akka.iws index 1637eb8aeb..fa115068be 100644 --- a/akka.iws +++ b/akka.iws @@ -2,18 +2,22 @@ + + - - + + + + + + - - - + - - - - + + + + @@ -46,32 +50,6 @@ @@ -607,19 +790,19 @@ - + - + - - + + @@ -631,12 +814,12 @@ - + - + @@ -645,108 +828,7 @@ - - - - - - - - - - - - - - - - - - - - - - - - - + @@ -849,7 +931,7 @@ - + - + - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - + - + - + - + - + - + + + + + + + + + + + + + + + + + + + + + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/api-java/.classpath b/api-java/.classpath index 83bcaecb04..53228f60a3 100644 --- a/api-java/.classpath +++ b/api-java/.classpath @@ -51,7 +51,10 @@ - + + + + diff --git a/api-java/.settings/org.eclipse.jdt.core.prefs b/api-java/.settings/org.eclipse.jdt.core.prefs new file mode 100644 index 0000000000..e5a6e43dde --- /dev/null +++ b/api-java/.settings/org.eclipse.jdt.core.prefs @@ -0,0 +1,5 @@ +#Wed May 13 10:49:41 CEST 2009 +eclipse.preferences.version=1 +org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.5 +org.eclipse.jdt.core.compiler.compliance=1.5 +org.eclipse.jdt.core.compiler.source=1.5 diff --git a/api-java/akka-api-java.iml b/api-java/akka-api-java.iml index b5f81fd5a3..8ca1ea027a 100644 --- a/api-java/akka-api-java.iml +++ b/api-java/akka-api-java.iml @@ -57,6 +57,16 @@ + + + + + + + + + + diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java b/api-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java index 478b76d28f..9918cc15ae 100755 --- a/api-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java @@ -66,7 +66,7 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase { String str = conf.getActiveObject("string"); fail("exception should have been thrown"); } catch (Exception e) { - assertEquals("Class java.lang.String has not been put under supervision (by passing in the config to the supervise() method", e.getMessage()); + assertEquals("Class string has not been put under supervision (by passing in the config to the supervise() method", e.getMessage()); } } diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java index 3dac546ab9..57db86c34d 100755 --- a/api-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java @@ -107,7 +107,7 @@ class InMemStatefulImpl implements InMemStateful { private TransactionalMap state = new InMemoryTransactionalMap(); public String getState(String key) { - return state.get(key); + return state.get(key).get(); } public void setState(String key, String msg) { @@ -158,7 +158,7 @@ class InMemClasherImpl implements InMemClasher { private TransactionalMap state = new InMemoryTransactionalMap(); public String getState(String key) { - return (String) state.get(key); + return (String) state.get(key).get(); } public void setState(String key, String msg) { diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java b/api-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java index 2a28304202..8e80d54f34 100755 --- a/api-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java @@ -6,7 +6,13 @@ package se.scalablesolutions.akka.api; import se.scalablesolutions.akka.annotation.*; import se.scalablesolutions.akka.kernel.config.*; +import se.scalablesolutions.akka.kernel.config.JavaConfig.AllForOne; +import se.scalablesolutions.akka.kernel.config.JavaConfig.Component; +import se.scalablesolutions.akka.kernel.config.JavaConfig.LifeCycle; +import se.scalablesolutions.akka.kernel.config.JavaConfig.Permanent; +import se.scalablesolutions.akka.kernel.config.JavaConfig.RestartStrategy; import static se.scalablesolutions.akka.kernel.config.JavaConfig.*; +import se.scalablesolutions.akka.kernel.Kernel; import se.scalablesolutions.akka.kernel.TransactionalMap; import se.scalablesolutions.akka.kernel.CassandraPersistentTransactionalMap; @@ -15,26 +21,43 @@ import junit.framework.TestCase; public class PersistentStateTest extends TestCase { static String messageLog = ""; + static { + Kernel.startCassandra(); + } final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava(); - + protected void setUp() { conf.configureActiveObjects( new JavaConfig.RestartStrategy(new JavaConfig.AllForOne(), 3, 5000), - new Component[] { + new Component[] { new Component("persistent-stateful", PersistentStateful.class, PersistentStatefulImpl.class, new LifeCycle(new Permanent(), 1000), 10000000), new Component("persistent-failer", PersistentFailer.class, PersistentFailerImpl.class, new LifeCycle(new Permanent(), 1000), 1000), new Component("persistent-clasher", PersistentClasher.class, PersistentClasherImpl.class, new LifeCycle(new Permanent(), 1000), 100000) - }).inject().supervise(); + }).supervise(); } + protected void tearDown() { + conf.stop(); + } + public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { - /* - PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class); - stateful.setState("stateful", "init"); // set init state - stateful.success("stateful", "new state"); // transactional - assertEquals("new state", stateful.getState("stateful")); - */ - assertTrue(true); + PersistentStateful stateful = conf.getActiveObject("persistent-stateful"); + stateful.setState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state + stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional + stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit + assertEquals("new state", stateful.getState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); + } + + public void testShouldRollbackStateForStatefulServerInCaseOfFailure() { + PersistentStateful stateful = conf.getActiveObject("persistent-stateful"); + stateful.setState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state + PersistentFailer failer = conf.getActiveObject("persistent-failer"); + try { + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method + fail("should have thrown an exception"); + } catch (RuntimeException e) { + } // expected + assertEquals("init", stateful.getState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state } } @@ -60,7 +83,7 @@ class PersistentStatefulImpl implements PersistentStateful { private TransactionalMap state = new CassandraPersistentTransactionalMap(this); public String getState(String key) { - return (String)state.get(key); + return (String)state.get(key).get(); } public void setState(String key, String msg) { @@ -110,7 +133,7 @@ class PersistentClasherImpl implements PersistentClasher { private TransactionalMap state = new CassandraPersistentTransactionalMap(this); public String getState(String key) { - return (String)state.get(key); + return (String)state.get(key).get(); } public void setState(String key, String msg) { diff --git a/config/storage-conf.xml b/config/storage-conf.xml index 5a0c2d89d8..cad425b8da 100755 --- a/config/storage-conf.xml +++ b/config/storage-conf.xml @@ -33,8 +33,8 @@ /Users/jboner/src/scala/akka/storage/staging false - - +
+
diff --git a/kernel/akka-kernel.iml b/kernel/akka-kernel.iml index 7a863e61b1..80c27d51c9 100644 --- a/kernel/akka-kernel.iml +++ b/kernel/akka-kernel.iml @@ -90,6 +90,7 @@ +
diff --git a/kernel/pom.xml b/kernel/pom.xml index c1a6025f3e..b9f8badec9 100755 --- a/kernel/pom.xml +++ b/kernel/pom.xml @@ -40,6 +40,18 @@ guice-jsr250 2.0-SNAPSHOT + + org.apache.camel + camel-core + 2.0-SNAPSHOT + + + org.jboss.netty + netty + 3.1.0.BETA2 + + + com.sun.grizzly grizzly-servlet-webserver @@ -60,11 +72,8 @@ jersey-atom 1.0.1 - - org.jboss.netty - netty - 3.1.0.BETA2 - + + org.apache.cassandra cassandra @@ -81,10 +90,11 @@ 1.0 - org.apache.camel - camel-core - 2.0-SNAPSHOT + org.apache.commons + commons-collections + 3.2.1 + + + org.slf4j slf4j-log4j12 @@ -115,7 +127,9 @@ log4j 1.2.13 - + + + org.scala-tools.testing scalatest 0.9.5 diff --git a/kernel/src/main/scala/CassandraNode.scala b/kernel/src/main/scala/CassandraNode.scala index b183264eb3..42ba79ef42 100755 --- a/kernel/src/main/scala/CassandraNode.scala +++ b/kernel/src/main/scala/CassandraNode.scala @@ -15,6 +15,8 @@ import org.apache.cassandra.service._ */ final object CassandraNode extends Logging { + // NOTE: requires command line options: + // -Dcassandra -Dstorage-config=config/ -Dpidfile=akka.pid val TABLE_NAME = "akka" val ACTOR_KEY_PREFIX = "actor" val ACTOR_MAP_COLUMN_FAMILY = "map" @@ -23,11 +25,7 @@ final object CassandraNode extends Logging { private[this] var serializer: Serializer = new JavaSerializationSerializer // TODO: is this server thread-safe or needed to be wrapped up in an actor? - private[this] val server = { - val ctor = classOf[CassandraServer].getConstructor(Array[Class[_]]():_*) - ctor.setAccessible(true) - ctor.newInstance(Array[AnyRef]():_*).asInstanceOf[CassandraServer] - } + private[this] val server = classOf[CassandraServer].newInstance.asInstanceOf[CassandraServer] def start = { try { @@ -48,7 +46,8 @@ final object CassandraNode extends Logging { ACTOR_KEY_PREFIX + ":" + actorName, ACTOR_MAP_COLUMN_FAMILY + ":" + entry, serializer.out(content), - System.currentTimeMillis) + System.currentTimeMillis, + false) // FIXME: what is this flag for? } def insertActorStorageEntries(actorName: String, entries: List[Tuple2[String, AnyRef]]) = { @@ -59,10 +58,11 @@ final object CassandraNode extends Logging { cls.add(new column_t(entry._1, serializer.out(entry._2), System.currentTimeMillis)) columns.put(ACTOR_MAP_COLUMN_FAMILY, cls) } - server.batch_insert_blocking(new batch_mutation_t( + server.batch_insert(new batch_mutation_t( TABLE_NAME, ACTOR_KEY_PREFIX + ":" + actorName, - columns)) + columns), + false) // non-blocking } def getActorStorageEntryFor(actorName: String, entry: AnyRef): Option[AnyRef] = { diff --git a/kernel/src/main/scala/Kernel.scala b/kernel/src/main/scala/Kernel.scala index e3e0541e89..31b3cd693c 100755 --- a/kernel/src/main/scala/Kernel.scala +++ b/kernel/src/main/scala/Kernel.scala @@ -74,7 +74,7 @@ object Kernel extends Logging { } private def cassandraBenchmark = { - val NR_ENTRIES = 1000000 + val NR_ENTRIES = 100000 println("=================================================") var start = System.currentTimeMillis diff --git a/kernel/src/main/scala/State.scala b/kernel/src/main/scala/State.scala index 35ec0bb2ac..79ef554684 100755 --- a/kernel/src/main/scala/State.scala +++ b/kernel/src/main/scala/State.scala @@ -7,6 +7,9 @@ package se.scalablesolutions.akka.kernel import se.scalablesolutions.akka.collection._ import scala.collection.mutable.HashMap +/** + * @author Jonas Bonér + */ trait Transactional { private[kernel] def begin private[kernel] def commit @@ -20,14 +23,40 @@ trait Transactional { * * @author Jonas Bonér */ -trait TransactionalMap[K, V] extends Transactional { - def put(key: K, value: V) - def remove(key: K) - def get(key: K): V - def contains(key: K): Boolean - def elements: Iterator[(K, V)] - def size: Int - def clear +trait TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] { + def remove(key: K) +} + +/** + * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time. + * + * @author Jonas Bonér + */ +class InMemoryTransactionalMap[K, V] extends TransactionalMap[K, V] { + protected[kernel] var state = new HashTrie[K, V] + protected[kernel] var snapshot = state + + // ---- For Transactional ---- + override def begin = snapshot = state + override def commit = snapshot = state + override def rollback = state = snapshot + + // ---- Overriding scala.collection.mutable.Map behavior ---- + override def contains(key: K): Boolean = state.contains(key) + override def clear = state = new HashTrie[K, V] + override def size: Int = state.size + + // ---- For scala.collection.mutable.Map ---- + override def remove(key: K) = state = state - key + override def elements: Iterator[(K, V)] = state.elements + override def get(key: K): Option[V] = state.get(key) + override def put(key: K, value: V): Option[V] = { + val oldValue = state.get(key) + state = state.update(key, value) + oldValue + } + override def -=(key: K) = remove(key) + override def update(key: K, value: V) = put(key, value) } /** @@ -40,42 +69,21 @@ trait TransactionalMap[K, V] extends Transactional { */ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] { protected[kernel] val changeSet = new HashMap[K, V] - - override def begin = { - changeSet.clear - } - - override def put(key: K, value: V) = { - changeSet += key -> value - } - - override def remove(key: K) = { - changeSet -= key - } - - def getRange(start: Int, count: Int) -} - -/** - * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time. - * - * @author Jonas Bonér - */ -class InMemoryTransactionalMap[K, V] extends TransactionalMap[K, V] { - protected[kernel] var state = new HashTrie[K, V] - protected[kernel] var snapshot = state - - override def begin = snapshot = state - override def commit = snapshot = state - override def rollback = state = snapshot - override def put(key: K, value: V) = state = state.update(key, value) - override def get(key: K): V = state.get(key).getOrElse(throw new NoSuchElementException("No value for key [" + key + "]")) - override def remove(key: K) = state = state - key - override def contains(key: K): Boolean = state.contains(key) - override def elements: Iterator[(K, V)] = state.elements - override def size: Int = state.size - override def clear = state = new HashTrie[K, V] + def getRange(start: Int, count: Int) + + // ---- For Transactional ---- + override def begin = changeSet.clear + override def rollback = {} + + // ---- For scala.collection.mutable.Map ---- + override def put(key: K, value: V): Option[V] = { + changeSet += key -> value + None // always return None to speed up writes (else need to go to DB to get + } + override def remove(key: K) = changeSet -= key + override def -=(key: K) = remove(key) + override def update(key: K, value: V) = put(key, value) } /** @@ -83,11 +91,14 @@ class InMemoryTransactionalMap[K, V] extends TransactionalMap[K, V] { * * @author Jonas Bonér */ -class CassandraPersistentTransactionalMap(actorNameInstance: AnyRef) extends PersistentTransactionalMap[String, AnyRef] { - val actorName = actorNameInstance.getClass.getName - override def begin = {} - override def rollback = {} +class CassandraPersistentTransactionalMap(actorNameInstance: AnyRef) + extends PersistentTransactionalMap[String, AnyRef] { + val actorName = actorNameInstance.getClass.getName + + override def getRange(start: Int, count: Int) = CassandraNode.getActorStorageRange(actorName, start, count) + + // ---- For Transactional ---- override def commit = { // FIXME: should use batch function once the bug is resolved for (entry <- changeSet) { @@ -96,28 +107,22 @@ class CassandraPersistentTransactionalMap(actorNameInstance: AnyRef) extends Per } } - override def get(key: String): AnyRef = CassandraNode.getActorStorageEntryFor(actorName, key) - .getOrElse(throw new NoSuchElementException("Could not find element for key [" + key + "]")) - - override def contains(key: String): Boolean = CassandraNode.getActorStorageEntryFor(actorName, key).isDefined - - override def size: Int = CassandraNode.getActorStorageSizeFor(actorName) - + // ---- Overriding scala.collection.mutable.Map behavior ---- override def clear = CassandraNode.removeActorStorageFor(actorName) - - override def getRange(start: Int, count: Int) = CassandraNode.getActorStorageRange(actorName, start, count) - - override def elements: Iterator[Tuple2[String, AnyRef]] = { + override def contains(key: String): Boolean = CassandraNode.getActorStorageEntryFor(actorName, key).isDefined + override def size: Int = CassandraNode.getActorStorageSizeFor(actorName) + + // ---- For scala.collection.mutable.Map ---- + override def get(key: String): Option[AnyRef] = CassandraNode.getActorStorageEntryFor(actorName, key) + override def elements: Iterator[Tuple2[String, AnyRef]] = { new Iterator[Tuple2[String, AnyRef]] { private val originalList: List[Tuple2[String, AnyRef]] = CassandraNode.getActorStorageFor(actorName) private var elements = originalList.reverse - override def next: Tuple2[String, AnyRef]= synchronized { val element = elements.head elements = elements.tail element - } - + } override def hasNext: Boolean = synchronized { !elements.isEmpty } } } @@ -129,10 +134,9 @@ class CassandraPersistentTransactionalMap(actorNameInstance: AnyRef) extends Per * * @author Jonas Bonér */ -abstract class TransactionalVector[T] extends Transactional { +abstract class TransactionalVector[T] extends Transactional with RandomAccessSeq[T] { def add(elem: T) def get(index: Int): T - def size: Int } /** @@ -146,13 +150,19 @@ class InMemoryTransactionalVector[T] extends TransactionalVector[T] { private[kernel] var state: Vector[T] = EmptyVector private[kernel] var snapshot = state + override def add(elem: T) = state = state + elem + override def get(index: Int): T = state(index) + + // ---- For Transactional ---- override def begin = snapshot = state override def commit = snapshot = state override def rollback = state = snapshot - override def add(elem: T) = state = state + elem - override def get(index: Int): T = state(index) - override def size: Int = state.size + // ---- For Seq ---- + def length: Int = state.length + def apply(index: Int): T = state(index) + override def elements: Iterator[T] = state.elements + override def toList: List[T] = state.toList } /** diff --git a/kernel/src/main/scala/Supervisor.scala b/kernel/src/main/scala/Supervisor.scala index 224bba3bab..b7256a12c5 100644 --- a/kernel/src/main/scala/Supervisor.scala +++ b/kernel/src/main/scala/Supervisor.scala @@ -12,8 +12,6 @@ import se.scalablesolutions.akka.kernel.Helpers._ import se.scalablesolutions.akka.kernel.config.ScalaConfig._ -//==================================================== - /** * Messages that the supervisor responds to and returns. * @@ -178,7 +176,6 @@ class Supervisor(faultHandler: FaultHandlingStrategy) extends Actor with Logging } } -//==================================================== /** * TODO: document * @@ -203,7 +200,6 @@ abstract class FaultHandlingStrategy(val maxNrOfRetries: Int, val withinTimeRang doHandleFailure(state, failedServer, reason) } - private[kernel] def restart(serverContainer: GenericServerContainer, reason: AnyRef, state: SupervisorState) = { preRestart(serverContainer) serverContainer.lock.withWriteLock { diff --git a/kernel/src/main/scala/Transaction.scala b/kernel/src/main/scala/Transaction.scala index f6dc0f3322..953289995a 100644 --- a/kernel/src/main/scala/Transaction.scala +++ b/kernel/src/main/scala/Transaction.scala @@ -67,7 +67,10 @@ class Transaction extends Logging { else false }}.exists(_ == true) } else false - if (haveAllPreCommitted) status = TransactionStatus.Completed + if (haveAllPreCommitted) { + participants.foreach(_.transactionalItems.foreach(_.commit)) + status = TransactionStatus.Completed + } else rollback(server) } participants.clear diff --git a/lib/cassandra-0.3.0-dev.jar b/lib/cassandra-0.3.0-dev.jar index 584131f521..631f082ef6 100644 Binary files a/lib/cassandra-0.3.0-dev.jar and b/lib/cassandra-0.3.0-dev.jar differ diff --git a/util-java/.settings/org.eclipse.jdt.core.prefs b/util-java/.settings/org.eclipse.jdt.core.prefs new file mode 100644 index 0000000000..592638c172 --- /dev/null +++ b/util-java/.settings/org.eclipse.jdt.core.prefs @@ -0,0 +1,5 @@ +#Wed May 13 10:43:31 CEST 2009 +org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.5 +eclipse.preferences.version=1 +org.eclipse.jdt.core.compiler.source=1.5 +org.eclipse.jdt.core.compiler.compliance=1.5