diff --git a/akka.iws b/akka.iws index d3b44298eb..149ba860bc 100644 --- a/akka.iws +++ b/akka.iws @@ -2,15 +2,14 @@ - - - - - + + + + - + @@ -30,7 +29,46 @@ - + + + + + + + - + - + - + - + - + - + - + - + - + - + - + - - - - - - - - - - - - - - - - - - - - - - + - + - + - + - + - + - + - + - + - + - + - + + + + + + + + + + + + + + + + + + + + + + - + diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java index 422e4f9339..e32e0ee321 100644 --- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java @@ -4,7 +4,7 @@ import se.scalablesolutions.akka.kernel.TransactionalMap; import se.scalablesolutions.akka.kernel.CassandraPersistentTransactionalMap; public class PersistentClasher { - private TransactionalMap state = new CassandraPersistentTransactionalMap(this); + private TransactionalMap state = new CassandraPersistentTransactionalMap(); public String getState(String key) { return (String)state.get(key).get(); diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java index 0a4ce9aa2e..02644d79a3 100755 --- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java @@ -35,12 +35,11 @@ public class PersistentStateTest extends TestCase { protected void tearDown() { conf.stop(); } - + public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class); stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional - stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); } @@ -60,8 +59,8 @@ public class PersistentStateTest extends TestCase { PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class); stateful.setVectorState("init"); // set init state stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional - stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit - assertEquals("new state", stateful.getVectorState()); + assertEquals("init", stateful.getVectorState(0)); + assertEquals("new state", stateful.getVectorState(1)); } public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() { @@ -73,14 +72,13 @@ public class PersistentStateTest extends TestCase { fail("should have thrown an exception"); } catch (RuntimeException e) { } // expected - assertEquals("init", stateful.getVectorState()); // check that state is == init state + assertEquals("init", stateful.getVectorState(0)); // check that state is == init state } -/* + public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class); stateful.setRefState("init"); // set init state stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional - stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit assertEquals("new state", stateful.getRefState()); } @@ -95,5 +93,4 @@ public class PersistentStateTest extends TestCase { } // expected assertEquals("init", stateful.getRefState()); // check that state is == init state } - */ } diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java index 133d1f4a6b..2bb2cf98c4 100644 --- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java @@ -5,46 +5,54 @@ import se.scalablesolutions.akka.annotation.transactional; import se.scalablesolutions.akka.annotation.state; public class PersistentStateful { - private TransactionalMap mapState = new CassandraPersistentTransactionalMap(this); - private TransactionalVector vectorState = new CassandraPersistentTransactionalVector(this); - //private TransactionalRef refState = new CassandraPersistentTransactionalRef(this); + private TransactionalMap mapState = new CassandraPersistentTransactionalMap(); + private TransactionalVector vectorState = new CassandraPersistentTransactionalVector(); + private TransactionalRef refState = new CassandraPersistentTransactionalRef(); + @transactional public String getMapState(String key) { return (String) mapState.get(key).get(); } - public String getVectorState() { - return (String) vectorState.first(); + @transactional + public String getVectorState(int index) { + return (String) vectorState.get(index); } -// public String getRefState() { -// return (String) refState.get().get(); -// } + @transactional + public String getRefState() { + if (refState.isDefined()) { + return (String) refState.get().get(); + } else throw new IllegalStateException("No such element"); + } + @transactional public void setMapState(String key, String msg) { mapState.put(key, msg); } + @transactional public void setVectorState(String msg) { vectorState.add(msg); } -// public void setRefState(String msg) { -// refState.swap(msg); -// } + @transactional + public void setRefState(String msg) { + refState.swap(msg); + } @transactional public void success(String key, String msg) { mapState.put(key, msg); vectorState.add(msg); -// refState.swap(msg); + refState.swap(msg); } @transactional public void failure(String key, String msg, PersistentFailer failer) { mapState.put(key, msg); vectorState.add(msg); -// refState.swap(msg); + refState.swap(msg); failer.fail(); } diff --git a/kernel/src/main/scala/ActiveObject.scala b/kernel/src/main/scala/ActiveObject.scala index b1ecd01ca7..872f1be997 100755 --- a/kernel/src/main/scala/ActiveObject.scala +++ b/kernel/src/main/scala/ActiveObject.scala @@ -208,12 +208,13 @@ sealed class TransactionalAroundAdvice(target: Class[_], def getTransactionalItemsFor(target: Class[_]): Tuple3[List[TransactionalMap[_, _]], List[TransactionalVector[_]], List[TransactionalRef[_]]] = { + target.getDeclaredFields.toArray.toList.asInstanceOf[List[Field]].foreach(println) for { field <- target.getDeclaredFields.toArray.toList.asInstanceOf[List[Field]] fieldType = field.getType - if fieldType == classOf[TransactionalMap[_, _]] || - fieldType == classOf[TransactionalVector[_]] || - fieldType == classOf[TransactionalRef[_]] + if (fieldType == classOf[TransactionalMap[_, _]]) || + (fieldType == classOf[TransactionalVector[_]]) || + (fieldType == classOf[TransactionalRef[_]]) txItem = { field.setAccessible(true) field.get(targetInstance) @@ -225,7 +226,7 @@ sealed class TransactionalAroundAdvice(target: Class[_], else if (txItem.isInstanceOf[TransactionalVector[_]]) vectors ::= txItem.asInstanceOf[TransactionalVector[_]] } val parent = target.getSuperclass - if (parent == null) (maps, vectors, refs) + if (parent == classOf[Object]) (maps, vectors, refs) else getTransactionalItemsFor(parent) } diff --git a/kernel/src/main/scala/CassandraNode.scala b/kernel/src/main/scala/CassandraNode.scala index 329c8ce97e..4af446352e 100755 --- a/kernel/src/main/scala/CassandraNode.scala +++ b/kernel/src/main/scala/CassandraNode.scala @@ -11,17 +11,18 @@ import org.apache.cassandra.config.DatabaseDescriptor import org.apache.cassandra.service._ /** + * NOTE: requires command line options: + *
+ * -Dcassandra -Dstorage-config=config/ -Dpidfile=akka.pid + *

* @author Jonas Bonér */ final object CassandraNode extends Logging { - // NOTE: requires command line options: - // -Dcassandra -Dstorage-config=config/ -Dpidfile=akka.pid val TABLE_NAME = "akka" - val ACTOR_KEY_PREFIX = "actor" - val ACTOR_MAP_COLUMN_FAMILY = "map" - val ACTOR_VECTOR_COLUMN_FAMILY = "vector" - val ACTOR_REF_COLUMN_FAMILY = "ref" + val MAP_COLUMN_FAMILY = "map" + val VECTOR_COLUMN_FAMILY = "vector" + val REF_COLUMN_FAMILY = "ref:item" // TODO: make pluggable (JSON, Thrift, Protobuf etc.) private[this] var serializer: Serializer = new JavaSerializationSerializer @@ -46,6 +47,27 @@ final object CassandraNode extends Logging { // For Ref // =============================================================== + def insertRefStorageFor(name: String, element: AnyRef) = { + server.insert( + TABLE_NAME, + name, + REF_COLUMN_FAMILY, + serializer.out(element), + System.currentTimeMillis, + false) // FIXME: what is this flag for? + } + + def getRefStorageFor(name: String): Option[AnyRef] = { + try { + val column = server.get_column(TABLE_NAME, name, REF_COLUMN_FAMILY) + Some(serializer.in(column.value)) + } catch { + case e => + e.printStackTrace + None //throw new Predef.NoSuchElementException(e.getMessage) + } + } + // =============================================================== // For Vector // =============================================================== @@ -53,8 +75,8 @@ final object CassandraNode extends Logging { def insertVectorStorageEntryFor(name: String, element: AnyRef) = { server.insert( TABLE_NAME, - ACTOR_KEY_PREFIX + ":" + name, - ACTOR_VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name), + name, + VECTOR_COLUMN_FAMILY + ":" + getVectorStorageSizeFor(name), serializer.out(element), System.currentTimeMillis, false) // FIXME: what is this flag for? @@ -62,7 +84,7 @@ final object CassandraNode extends Logging { def getVectorStorageEntryFor(name: String, index: Int): AnyRef = { try { - val column = server.get_column(TABLE_NAME, ACTOR_KEY_PREFIX + ":" + name, ACTOR_VECTOR_COLUMN_FAMILY + ":" + index) + val column = server.get_column(TABLE_NAME, name, VECTOR_COLUMN_FAMILY + ":" + index) serializer.in(column.value) } catch { case e => throw new Predef.NoSuchElementException(e.getMessage) @@ -70,22 +92,21 @@ final object CassandraNode extends Logging { } def getVectorStorageRangeFor(name: String, start: Int, count: Int): List[AnyRef] = - server.get_slice(TABLE_NAME, ACTOR_KEY_PREFIX + ":" + name, ACTOR_VECTOR_COLUMN_FAMILY, start, count) + server.get_slice(TABLE_NAME, name, VECTOR_COLUMN_FAMILY, start, count) .toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]].map(tuple => tuple._2) def getVectorStorageSizeFor(name: String): Int = - server.get_column_count(TABLE_NAME, ACTOR_KEY_PREFIX + ":" + name, ACTOR_VECTOR_COLUMN_FAMILY) + server.get_column_count(TABLE_NAME, name, VECTOR_COLUMN_FAMILY) // =============================================================== // For Map // =============================================================== def insertMapStorageEntryFor(name: String, key: String, value: AnyRef) = { - println("PUT: " + name + " " + key + " " + value) server.insert( TABLE_NAME, - ACTOR_KEY_PREFIX + ":" + name, - ACTOR_MAP_COLUMN_FAMILY + ":" + key, + name, + MAP_COLUMN_FAMILY + ":" + key, serializer.out(value), System.currentTimeMillis, false) // FIXME: what is this flag for? @@ -97,25 +118,24 @@ final object CassandraNode extends Logging { for (entry <- entries) { val cls: List[column_t] = new ArrayList cls.add(new column_t(entry._1, serializer.out(entry._2), System.currentTimeMillis)) - columns.put(ACTOR_MAP_COLUMN_FAMILY, cls) + columns.put(MAP_COLUMN_FAMILY, cls) } server.batch_insert(new batch_mutation_t( TABLE_NAME, - ACTOR_KEY_PREFIX + ":" + name, + name, columns), false) // non-blocking } def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = { - println("GET: " + name + " " + key) try { - val column = server.get_column(TABLE_NAME, ACTOR_KEY_PREFIX + ":" + name, ACTOR_MAP_COLUMN_FAMILY + ":" + key) + val column = server.get_column(TABLE_NAME, name, MAP_COLUMN_FAMILY + ":" + key) Some(serializer.in(column.value)) } catch { case e => None } } def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]] = { - val columns = server.get_columns_since(TABLE_NAME, ACTOR_KEY_PREFIX, ACTOR_MAP_COLUMN_FAMILY, -1) + val columns = server.get_columns_since(TABLE_NAME, name, MAP_COLUMN_FAMILY, -1) .toArray.toList.asInstanceOf[List[org.apache.cassandra.service.column_t]] for { column <- columns @@ -124,13 +144,13 @@ final object CassandraNode extends Logging { } def getMapStorageSizeFor(name: String): Int = - server.get_column_count(TABLE_NAME, ACTOR_KEY_PREFIX + ":" + name, ACTOR_MAP_COLUMN_FAMILY) + server.get_column_count(TABLE_NAME, name, MAP_COLUMN_FAMILY) def removeMapStorageFor(name: String) = - server.remove(TABLE_NAME, ACTOR_KEY_PREFIX + ":" + name, ACTOR_MAP_COLUMN_FAMILY, System.currentTimeMillis, false) + server.remove(TABLE_NAME, name, MAP_COLUMN_FAMILY, System.currentTimeMillis, false) def getMapStorageRangeFor(name: String, start: Int, count: Int): List[Tuple2[String, AnyRef]] = - server.get_slice(TABLE_NAME, ACTOR_KEY_PREFIX + ":" + name, ACTOR_MAP_COLUMN_FAMILY, start, count) + server.get_slice(TABLE_NAME, name, MAP_COLUMN_FAMILY, start, count) .toArray.toList.asInstanceOf[List[Tuple2[String, AnyRef]]] } diff --git a/kernel/src/main/scala/State.scala b/kernel/src/main/scala/State.scala index 97ee8f023c..9dbda3af45 100755 --- a/kernel/src/main/scala/State.scala +++ b/kernel/src/main/scala/State.scala @@ -95,10 +95,7 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] { * * @author Jonas Bonér */ -class CassandraPersistentTransactionalMap(actorNameInstance: AnyRef) - extends PersistentTransactionalMap[String, AnyRef] { - - val actorName = actorNameInstance.getClass.getName +class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[String, AnyRef] { override def getRange(start: Int, count: Int) = CassandraNode.getMapStorageRangeFor(uuid, start, count) @@ -196,10 +193,7 @@ abstract class PersistentTransactionalVector[T] extends TransactionalVector[T] { * * @author Jonas Bonér */ -class CassandraPersistentTransactionalVector(actorNameInstance: AnyRef) - extends PersistentTransactionalVector[AnyRef] { - - val actorName = actorNameInstance.getClass.getName +class CassandraPersistentTransactionalVector extends PersistentTransactionalVector[AnyRef] { // ---- For TransactionalVector ---- override def get(index: Int): AnyRef = CassandraNode.getVectorStorageEntryFor(uuid, index) @@ -234,5 +228,16 @@ class TransactionalRef[T] extends Transactional { def swap(elem: T) = ref = Some(elem) def get: Option[T] = ref def getOrElse(default: => T): T = ref.getOrElse(default) - def isDefined: Boolean= ref.isDefined + def isDefined: Boolean = ref.isDefined } + +class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] { + override def commit = if (ref.isDefined) CassandraNode.insertRefStorageFor(uuid, ref.get) + override def get: Option[AnyRef] = CassandraNode.getRefStorageFor(uuid) + override def isDefined: Boolean = get.isDefined + override def getOrElse(default: => AnyRef): AnyRef = { + val ref = get + if (ref.isDefined) ref + else default + } +} \ No newline at end of file diff --git a/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala b/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala index b2ce83a63d..c11509a2eb 100644 --- a/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala +++ b/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala @@ -34,15 +34,11 @@ class ThreadBasedDispatcher extends MessageDispatcherBase { messageDemultiplexer.select } catch {case e: InterruptedException => active = false} val queue = messageDemultiplexer.acquireSelectedQueue - println("--- QUEUE " + queue.size) // while (!queue.isEmpty) { for (index <- 0 until queue.size) { val message = queue.peek - println("------ MESSAGE: " + message) val messageHandler = getIfNotBusy(message.sender) - println("------ MESSAGEHANDLER: " + messageHandler) if (messageHandler.isDefined) { - println("-------- SCHEDULING MESSAGE") handlerExecutor.execute(new Runnable { override def run = { messageHandler.get.handle(message)