diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index e4810297b9..cd887a4f78 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -116,7 +116,7 @@ object Actor extends Logging { * */ def actor[A](body: => Unit) = { - def handler[A](body: Unit) = new { + def handler[A](body: => Unit) = new { def receive(handler: PartialFunction[Any, Unit]) = new Actor() { start body diff --git a/akka-actors/src/main/scala/stm/DataFlowVariable.scala b/akka-actors/src/main/scala/stm/DataFlowVariable.scala index 44a40f50af..83ab9d141b 100644 --- a/akka-actors/src/main/scala/stm/DataFlowVariable.scala +++ b/akka-actors/src/main/scala/stm/DataFlowVariable.scala @@ -379,4 +379,4 @@ object Test5 extends Application { setV ! 'exit //System.gc -} +} \ No newline at end of file diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java index 8ec14fcb91..059b81c1e8 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java @@ -8,7 +8,7 @@ public class PersistentClasher { @inittransactionalstate public void init() { - state = PersistentState.newMap(new CassandraStorageConfig()); + state = CassandraStorage.newMap(); } public String getState(String key) { diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java index 02a091c1f6..3cac0ae062 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java @@ -12,18 +12,19 @@ public class PersistentStateful { @inittransactionalstate public void init() { - mapState = PersistentState.newMap(new CassandraStorageConfig()); - vectorState = PersistentState.newVector(new CassandraStorageConfig()); - refState = PersistentState.newRef(new CassandraStorageConfig()); + mapState = CassandraStorage.newMap(); + vectorState = CassandraStorage.newVector(); + refState = CassandraStorage.newRef(); } public String getMapState(String key) { - return (String) mapState.get(key).get(); + byte[] bytes = (byte[]) mapState.get(key.getBytes()).get(); + return new String(bytes, 0, bytes.length); } - public String getVectorState(int index) { - return (String) vectorState.get(index); + byte[] bytes = (byte[]) vectorState.get(index); + return new String(bytes, 0, bytes.length); } public int getVectorLength() { @@ -32,62 +33,51 @@ public class PersistentStateful { public String getRefState() { if (refState.isDefined()) { - return (String) refState.get().get(); + byte[] bytes = (byte[]) refState.get().get(); + return new String(bytes, 0, bytes.length); } else throw new IllegalStateException("No such element"); } - public void setMapState(String key, String msg) { - mapState.put(key, msg); + mapState.put(key.getBytes(), msg.getBytes()); } - public void setVectorState(String msg) { - vectorState.add(msg); + vectorState.add(msg.getBytes()); } - public void setRefState(String msg) { - refState.swap(msg); + refState.swap(msg.getBytes()); } - public void success(String key, String msg) { - mapState.put(key, msg); - vectorState.add(msg); - refState.swap(msg); + mapState.put(key.getBytes(), msg.getBytes()); + vectorState.add(msg.getBytes()); + refState.swap(msg.getBytes()); } - public String failure(String key, String msg, PersistentFailer failer) { - mapState.put(key, msg); - vectorState.add(msg); - refState.swap(msg); + mapState.put(key.getBytes(), msg.getBytes()); + vectorState.add(msg.getBytes()); + refState.swap(msg.getBytes()); failer.fail(); return msg; } public String success(String key, String msg, PersistentStatefulNested nested) { - mapState.put(key, msg); - vectorState.add(msg); - refState.swap(msg); + mapState.put(key.getBytes(), msg.getBytes()); + vectorState.add(msg.getBytes()); + refState.swap(msg.getBytes()); nested.success(key, msg); return msg; } - public String failure(String key, String msg, PersistentStatefulNested nested, PersistentFailer failer) { - mapState.put(key, msg); - vectorState.add(msg); - refState.swap(msg); + mapState.put(key.getBytes(), msg.getBytes()); + vectorState.add(msg.getBytes()); + refState.swap(msg.getBytes()); nested.failure(key, msg, failer); return msg; } - - - - public void thisMethodHangs(String key, String msg, PersistentFailer failer) { - setMapState(key, msg); - } } diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java index 3251184789..50e9b7ae1d 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java @@ -12,18 +12,20 @@ public class PersistentStatefulNested { @inittransactionalstate public void init() { - mapState = PersistentState.newMap(new CassandraStorageConfig()); - vectorState = PersistentState.newVector(new CassandraStorageConfig()); - refState = PersistentState.newRef(new CassandraStorageConfig()); + mapState = CassandraStorage.newMap(); + vectorState = CassandraStorage.newVector(); + refState = CassandraStorage.newRef(); } public String getMapState(String key) { - return (String) mapState.get(key).get(); + byte[] bytes = (byte[]) mapState.get(key.getBytes()).get(); + return new String(bytes, 0, bytes.length); } public String getVectorState(int index) { - return (String) vectorState.get(index); + byte[] bytes = (byte[]) vectorState.get(index); + return new String(bytes, 0, bytes.length); } public int getVectorLength() { @@ -32,45 +34,36 @@ public class PersistentStatefulNested { public String getRefState() { if (refState.isDefined()) { - return (String) refState.get().get(); + byte[] bytes = (byte[]) refState.get().get(); + return new String(bytes, 0, bytes.length); } else throw new IllegalStateException("No such element"); } - public void setMapState(String key, String msg) { - mapState.put(key, msg); + mapState.put(key.getBytes(), msg.getBytes()); } - public void setVectorState(String msg) { - vectorState.add(msg); + vectorState.add(msg.getBytes()); } - public void setRefState(String msg) { - refState.swap(msg); + refState.swap(msg.getBytes()); } - public String success(String key, String msg) { - mapState.put(key, msg); - vectorState.add(msg); - refState.swap(msg); + mapState.put(key.getBytes(), msg.getBytes()); + vectorState.add(msg.getBytes()); + refState.swap(msg.getBytes()); return msg; } - public String failure(String key, String msg, PersistentFailer failer) { - mapState.put(key, msg); - vectorState.add(msg); - refState.swap(msg); + mapState.put(key.getBytes(), msg.getBytes()); + vectorState.add(msg.getBytes()); + refState.swap(msg.getBytes()); failer.fail(); return msg; } - - - public void thisMethodHangs(String key, String msg, PersistentFailer failer) { - setMapState(key, msg); - } } diff --git a/akka-persistence/pom.xml b/akka-persistence/pom.xml index 91a0079611..5447087e68 100644 --- a/akka-persistence/pom.xml +++ b/akka-persistence/pom.xml @@ -49,7 +49,12 @@ commons-pool 1.5.1 - + + log4j + log4j + 1.2.13 + + org.scalatest diff --git a/akka-persistence/src/main/scala/CassandraStorage.scala b/akka-persistence/src/main/scala/CassandraStorageBackend.scala similarity index 61% rename from akka-persistence/src/main/scala/CassandraStorage.scala rename to akka-persistence/src/main/scala/CassandraStorageBackend.scala index 74f06d3a9d..f5719625c7 100644 --- a/akka-persistence/src/main/scala/CassandraStorage.scala +++ b/akka-persistence/src/main/scala/CassandraStorageBackend.scala @@ -6,7 +6,6 @@ package se.scalablesolutions.akka.state import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.util.Helpers._ -import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.Config.config import org.apache.cassandra.service._ @@ -14,8 +13,14 @@ import org.apache.cassandra.service._ /** * @author Jonas Bonér */ -object CassandraStorage extends MapStorage - with VectorStorage with RefStorage with Logging { +private[akka] object CassandraStorageBackend extends + MapStorageBackend[Array[Byte], Array[Byte]] with + VectorStorageBackend[Array[Byte]] with + RefStorageBackend[Array[Byte]] with + Logging { + + type ElementType = Array[Byte] + val KEYSPACE = "akka" val MAP_COLUMN_PARENT = new ColumnParent("map", null) val VECTOR_COLUMN_PARENT = new ColumnParent("vector", null) @@ -31,35 +36,14 @@ object CassandraStorage extends MapStorage case "ONE" => 1 case "QUORUM" => 2 case "ALL" => 3 - case unknown => throw new IllegalArgumentException("Consistency level [" + unknown + "] is not supported. Expected one of [ZERO, ONE, QUORUM, ALL]") + case unknown => throw new IllegalArgumentException( + "Cassandra consistency level [" + unknown + "] is not supported. Expected one of [ZERO, ONE, QUORUM, ALL]") } } val IS_ASCENDING = true @volatile private[this] var isRunning = false private[this] val protocol: Protocol = Protocol.Binary -/* { - config.getString("akka.storage.cassandra.procotol", "binary") match { - case "binary" => Protocol.Binary - case "json" => Protocol.JSON - case "simple-json" => Protocol.SimpleJSON - case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]") - } - } -*/ - - private[this] val serializer: Serializer = { - config.getString("akka.storage.cassandra.storage-format", "manual") match { - case "scala-json" => Serializer.ScalaJSON - case "java-json" => Serializer.JavaJSON - case "protobuf" => Serializer.Protobuf - case "java" => Serializer.Java - case "manual" => Serializer.NOOP - case "sbinary" => throw new UnsupportedOperationException("SBinary serialization protocol is not yet supported for storage") - case "avro" => throw new UnsupportedOperationException("Avro serialization protocol is not yet supported for storage") - case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]") - } - } private[this] val sessions = new CassandraSessionPool( KEYSPACE, @@ -71,22 +55,22 @@ object CassandraStorage extends MapStorage // For Ref // =============================================================== - def insertRefStorageFor(name: String, element: AnyRef) = { + def insertRefStorageFor(name: String, element: Array[Byte]) = { sessions.withSession { _ ++| (name, new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY), - serializer.out(element), + element, System.currentTimeMillis, CONSISTENCY_LEVEL) } } - def getRefStorageFor(name: String): Option[AnyRef] = { + def getRefStorageFor(name: String): Option[Array[Byte]] = { try { val column: Option[ColumnOrSuperColumn] = sessions.withSession { _ | (name, new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY)) } - if (column.isDefined) Some(serializer.in(column.get.getColumn.value, None)) + if (column.isDefined) Some(column.get.getColumn.value) else None } catch { case e => @@ -99,40 +83,40 @@ object CassandraStorage extends MapStorage // For Vector // =============================================================== - def insertVectorStorageEntryFor(name: String, element: AnyRef) = { + def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = { sessions.withSession { _ ++| (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(getVectorStorageSizeFor(name))), - serializer.out(element), + element, System.currentTimeMillis, CONSISTENCY_LEVEL) } } // FIXME implement insertVectorStorageEntriesFor - def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = { - throw new UnsupportedOperationException("insertVectorStorageEntriesFor for CassandraStorage is not implemented yet") + def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = { + throw new UnsupportedOperationException("insertVectorStorageEntriesFor for CassandraStorageBackend is not implemented yet") } - def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = { + def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = { sessions.withSession { _ ++| (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index)), - serializer.out(elem), + elem, System.currentTimeMillis, CONSISTENCY_LEVEL) } } - def getVectorStorageEntryFor(name: String, index: Int): AnyRef = { + def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = { val column: Option[ColumnOrSuperColumn] = sessions.withSession { _ | (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index))) } - if (column.isDefined) serializer.in(column.get.column.value, None) + if (column.isDefined) column.get.column.value else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]") } - def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = { + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = { val startBytes = if (start.isDefined) intToBytes(start.get) else null val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null val columns: List[ColumnOrSuperColumn] = sessions.withSession { @@ -143,7 +127,7 @@ object CassandraStorage extends MapStorage count, CONSISTENCY_LEVEL) } - columns.map(column => serializer.in(column.getColumn.value, None)) + columns.map(column => column.getColumn.value) } def getVectorStorageSizeFor(name: String): Int = { @@ -156,21 +140,21 @@ object CassandraStorage extends MapStorage // For Map // =============================================================== - def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = { + def insertMapStorageEntryFor(name: String, key: Array[Byte], element: Array[Byte]) = { sessions.withSession { _ ++| (name, - new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)), - serializer.out(element), + new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, key), + element, System.currentTimeMillis, CONSISTENCY_LEVEL) } } - def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = { + def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]) = { val batch = new scala.collection.mutable.HashMap[String, List[ColumnOrSuperColumn]] for (entry <- entries) { val columnOrSuperColumn = new ColumnOrSuperColumn - columnOrSuperColumn.setColumn(new Column(serializer.out(entry._1), serializer.out(entry._2), System.currentTimeMillis)) + columnOrSuperColumn.setColumn(new Column(entry._1, entry._2, System.currentTimeMillis)) batch + (MAP_COLUMN_PARENT.getColumn_family -> List(columnOrSuperColumn)) } sessions.withSession { @@ -178,12 +162,12 @@ object CassandraStorage extends MapStorage } } - def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = { + def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = { try { val column: Option[ColumnOrSuperColumn] = sessions.withSession { - _ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key))) + _ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, key)) } - if (column.isDefined) Some(serializer.in(column.get.getColumn.value, None)) + if (column.isDefined) Some(column.get.getColumn.value) else None } catch { case e => @@ -192,13 +176,16 @@ object CassandraStorage extends MapStorage } } - def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = { + def getMapStorageFor(name: String): List[Tuple2[Array[Byte], Array[Byte]]] = { val size = getMapStorageSizeFor(name) sessions.withSession { session => - val columns = session / (name, MAP_COLUMN_PARENT, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY, true, size, CONSISTENCY_LEVEL) + val columns = session / + (name, MAP_COLUMN_PARENT, + EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY, + true, size, CONSISTENCY_LEVEL) for { columnOrSuperColumn <- columns - entry = (serializer.in(columnOrSuperColumn.column.name, None), serializer.in(columnOrSuperColumn.column.value, None)) + entry = (columnOrSuperColumn.column.name, columnOrSuperColumn.column.value) } yield entry } } @@ -209,8 +196,8 @@ object CassandraStorage extends MapStorage def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null) - def removeMapStorageFor(name: String, key: AnyRef): Unit = { - val keyBytes = if (key == null) null else serializer.out(key) + def removeMapStorageFor(name: String, key: Array[Byte]): Unit = { + val keyBytes = if (key == null) null else key sessions.withSession { _ -- (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, keyBytes), @@ -219,13 +206,13 @@ object CassandraStorage extends MapStorage } } - def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int): - List[Tuple2[AnyRef, AnyRef]] = { - val startBytes = if (start.isDefined) serializer.out(start.get) else null - val finishBytes = if (finish.isDefined) serializer.out(finish.get) else null + def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): + List[Tuple2[Array[Byte], Array[Byte]]] = { + val startBytes = if (start.isDefined) start.get else null + val finishBytes = if (finish.isDefined) finish.get else null val columns: List[ColumnOrSuperColumn] = sessions.withSession { _ / (name, MAP_COLUMN_PARENT, startBytes, finishBytes, IS_ASCENDING, count, CONSISTENCY_LEVEL) } - columns.map(column => (column.getColumn.name, serializer.in(column.getColumn.value, None))) + columns.map(column => (column.getColumn.name, column.getColumn.value)) } } diff --git a/akka-persistence/src/main/scala/MongoStorage.scala b/akka-persistence/src/main/scala/MongoStorageBackend.scala similarity index 89% rename from akka-persistence/src/main/scala/MongoStorage.scala rename to akka-persistence/src/main/scala/MongoStorageBackend.scala index 8fd7a0c4b5..62169199aa 100644 --- a/akka-persistence/src/main/scala/MongoStorage.scala +++ b/akka-persistence/src/main/scala/MongoStorageBackend.scala @@ -4,8 +4,8 @@ package se.scalablesolutions.akka.state -import util.Logging -import Config.config +import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.Config.config import sjson.json.Serializer._ @@ -23,8 +23,12 @@ import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList} *

* @author Debasish Ghosh */ -object MongoStorage extends MapStorage with VectorStorage with RefStorage with Logging { - +private[akka] object MongoStorageBackend extends + MapStorageBackend[AnyRef, AnyRef] with + VectorStorageBackend[AnyRef] with + RefStorageBackend[AnyRef] with + Logging { + // enrich with null safe findOne class RichDBCollection(value: DBCollection) { def findOneNS(o: DBObject): Option[DBObject] = { @@ -34,13 +38,13 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L } } } - + implicit def enrichDBCollection(c: DBCollection) = new RichDBCollection(c) - + val KEY = "key" val VALUE = "value" val COLLECTION = "akka_coll" - + val MONGODB_SERVER_HOSTNAME = config.getString("akka.storage.mongodb.hostname", "127.0.0.1") val MONGODB_SERVER_DBNAME = config.getString("akka.storage.mongodb.dbname", "testdb") val MONGODB_SERVER_PORT = config.getInt("akka.storage.mongodb.port", 27017) @@ -50,27 +54,27 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L // FIXME: make this pluggable private[this] val serializer = SJSON - + def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef) { insertMapStorageEntriesFor(name, List((key, value))) } def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) { import java.util.{Map, HashMap} - + val m: Map[AnyRef, AnyRef] = new HashMap for ((k, v) <- entries) { m.put(k, serializer.out(v)) } - + nullSafeFindOne(name) match { - case None => + case None => coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m)) case Some(dbo) => { // collate the maps val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]] o.putAll(m) - + // remove existing reference removeMapStorageFor(name) // and insert @@ -78,16 +82,16 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L } } } - - def removeMapStorageFor(name: String) = { + + def removeMapStorageFor(name: String): Unit = { val q = new BasicDBObject q.put(KEY, name) coll.remove(q) } - def removeMapStorageFor(name: String, key: AnyRef) = { + def removeMapStorageFor(name: String, key: AnyRef): Unit = { nullSafeFindOne(name) match { - case None => + case None => case Some(dbo) => { val orig = dbo.get(VALUE).asInstanceOf[DBObject].toMap if (key.isInstanceOf[List[_]]) { @@ -104,10 +108,10 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L } } } - - def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = + + def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = getValueForKey(name, key.asInstanceOf[String]) - + def getMapStorageSizeFor(name: String): Int = { nullSafeFindOne(name) match { case None => 0 @@ -115,55 +119,55 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].keySet.size } } - + def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = { - val m = + val m = nullSafeFindOne(name) match { - case None => + case None => throw new Predef.NoSuchElementException(name + " not present") case Some(dbo) => dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]] } - val n = + val n = List(m.keySet.toArray: _*).asInstanceOf[List[String]] - val vals = - for(s <- n) + val vals = + for(s <- n) yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]])) vals.asInstanceOf[List[Tuple2[String, AnyRef]]] } - - def getMapStorageRangeFor(name: String, start: Option[AnyRef], - finish: Option[AnyRef], + + def getMapStorageRangeFor(name: String, start: Option[AnyRef], + finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] = { - val m = + val m = nullSafeFindOne(name) match { - case None => + case None => throw new Predef.NoSuchElementException(name + " not present") case Some(dbo) => dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]] } /** - * count is the max number of results to return. Start with + * count is the max number of results to return. Start with * start or 0 (if start is not defined) and go until * you hit finish or count. */ val s = if (start.isDefined) start.get.asInstanceOf[Int] else 0 - val cnt = + val cnt = if (finish.isDefined) { val f = finish.get.asInstanceOf[Int] if (f >= s) Math.min(count, (f - s)) else count } else count - val n = + val n = List(m.keySet.toArray: _*).asInstanceOf[List[String]].sort((e1, e2) => (e1 compareTo e2) < 0).slice(s, s + cnt) - val vals = - for(s <- n) + val vals = + for(s <- n) yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]])) vals.asInstanceOf[List[Tuple2[String, AnyRef]]] } - + private def getValueForKey(name: String, key: String): Option[AnyRef] = { try { nullSafeFindOne(name) match { @@ -179,16 +183,16 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L throw new Predef.NoSuchElementException(e.getMessage) } } - + def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = { val q = new BasicDBObject q.put(KEY, name) - + val currentList = coll.findOneNS(q) match { - case None => + case None => new JArrayList[AnyRef] - case Some(dbo) => + case Some(dbo) => dbo.get(VALUE).asInstanceOf[JArrayList[AnyRef]] } if (!currentList.isEmpty) { @@ -196,26 +200,26 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L // remove before adding coll.remove(q) } - + // add to the current list elements.map(serializer.out(_)).foreach(currentList.add(_)) - + coll.insert( new BasicDBObject() .append(KEY, name) .append(VALUE, currentList) ) } - + def insertVectorStorageEntryFor(name: String, element: AnyRef) = { insertVectorStorageEntriesFor(name, List(element)) } - + def getVectorStorageEntryFor(name: String, index: Int): AnyRef = { try { val o = nullSafeFindOne(name) match { - case None => + case None => throw new Predef.NoSuchElementException(name + " not present") case Some(dbo) => @@ -224,17 +228,17 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L serializer.in[AnyRef]( o.get(index).asInstanceOf[Array[Byte]]) } catch { - case e => + case e => throw new Predef.NoSuchElementException(e.getMessage) } } - - def getVectorStorageRangeFor(name: String, + + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = { try { val o = nullSafeFindOne(name) match { - case None => + case None => throw new Predef.NoSuchElementException(name + " not present") case Some(dbo) => @@ -242,24 +246,24 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L } // pick the subrange and make a Scala list - val l = + val l = List(o.subList(start.get, start.get + count).toArray: _*) - for(e <- l) + for(e <- l) yield serializer.in[AnyRef](e.asInstanceOf[Array[Byte]]) } catch { - case e => + case e => throw new Predef.NoSuchElementException(e.getMessage) } } - + // FIXME implement updateVectorStorageEntryFor def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = throw new UnsupportedOperationException - + def getVectorStorageSizeFor(name: String): Int = { nullSafeFindOne(name) match { case None => 0 - case Some(dbo) => + case Some(dbo) => dbo.get(VALUE).asInstanceOf[JList[AnyRef]].size } } diff --git a/akka-persistence/src/main/scala/PersistentState.scala b/akka-persistence/src/main/scala/PersistentState.scala deleted file mode 100644 index f08d2cd925..0000000000 --- a/akka-persistence/src/main/scala/PersistentState.scala +++ /dev/null @@ -1,352 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package se.scalablesolutions.akka.state - -import se.scalablesolutions.akka.stm.TransactionManagement.currentTransaction -import se.scalablesolutions.akka.collection._ -import se.scalablesolutions.akka.util.Logging - -import org.codehaus.aspectwerkz.proxy.Uuid - -class NoTransactionInScopeException extends RuntimeException - -sealed abstract class PersistentStateConfig -abstract class PersistentStorageConfig extends PersistentStateConfig -case class CassandraStorageConfig() extends PersistentStorageConfig -case class TerracottaStorageConfig() extends PersistentStorageConfig -case class TokyoCabinetStorageConfig() extends PersistentStorageConfig -case class MongoStorageConfig() extends PersistentStorageConfig - -/** - * Example Scala usage. - *

- * New map with generated id. - *

- * val myMap = PersistentState.newMap(CassandraStorageConfig)
- * 
- * - * New map with user-defined id. - *
- * val myMap = PersistentState.newMap(CassandraStorageConfig, id)
- * 
- * - * Get map by user-defined id. - *
- * val myMap = PersistentState.getMap(CassandraStorageConfig, id)
- * 
- * - * Example Java usage: - *
- * TransactionalMap myMap = PersistentState.newMap(new CassandraStorageConfig());
- * 
- * - * @author Jonas Bonér - */ -object PersistentState { - def newMap(config: PersistentStorageConfig): PersistentMap = - // FIXME: won't work across the remote machines, use [http://johannburkard.de/software/uuid/] - newMap(config, Uuid.newUuid.toString) - - def newVector(config: PersistentStorageConfig): PersistentVector = - newVector(config, Uuid.newUuid.toString) - - def newRef(config: PersistentStorageConfig): PersistentRef = - newRef(config, Uuid.newUuid.toString) - - def getMap(config: PersistentStorageConfig, id: String): PersistentMap = - newMap(config, id) - - def getVector(config: PersistentStorageConfig, id: String): PersistentVector = - newVector(config, id) - - def getRef(config: PersistentStorageConfig, id: String): PersistentRef = - newRef(config, id) - - def newMap(config: PersistentStorageConfig, id: String): PersistentMap = config match { - case CassandraStorageConfig() => new CassandraPersistentMap(id) - case MongoStorageConfig() => new MongoPersistentMap(id) - case TerracottaStorageConfig() => throw new UnsupportedOperationException - case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException - } - - def newVector(config: PersistentStorageConfig, id: String): PersistentVector = config match { - case CassandraStorageConfig() => new CassandraPersistentVector(id) - case MongoStorageConfig() => new MongoPersistentVector(id) - case TerracottaStorageConfig() => throw new UnsupportedOperationException - case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException - } - - def newRef(config: PersistentStorageConfig, id: String): PersistentRef = config match { - case CassandraStorageConfig() => new CassandraPersistentRef(id) - case MongoStorageConfig() => new MongoPersistentRef(id) - case TerracottaStorageConfig() => throw new UnsupportedOperationException - case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException - } -} - -/** - * Implementation of PersistentMap for every concrete - * storage will have the same workflow. This abstracts the workflow. - * - * Subclasses just need to provide the actual concrete instance for the - * abstract val storage. - * - * @author Jonas Bonér - */ -trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] - with Transactional with Committable with Logging { - protected val newAndUpdatedEntries = TransactionalState.newMap[AnyRef, AnyRef] - protected val removedEntries = TransactionalState.newVector[AnyRef] - protected val shouldClearOnCommit = TransactionalRef[Boolean]() - - // to be concretized in subclasses - val storage: MapStorage - - def commit = { - storage.removeMapStorageFor(uuid, removedEntries.toList) - storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList) - if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get.get) - storage.removeMapStorageFor(uuid) - newAndUpdatedEntries.clear - removedEntries.clear - } - - def -=(key: AnyRef) = remove(key) - - def +=(key: AnyRef, value: AnyRef) = put(key, value) - - override def put(key: AnyRef, value: AnyRef): Option[AnyRef] = { - register - newAndUpdatedEntries.put(key, value) - } - - override def update(key: AnyRef, value: AnyRef) = { - register - newAndUpdatedEntries.update(key, value) - } - - def remove(key: AnyRef) = { - register - removedEntries.add(key) - } - - def slice(start: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] = - slice(start, None, count) - - def slice(start: Option[AnyRef], finish: Option[AnyRef], count: Int): - List[Tuple2[AnyRef, AnyRef]] = try { - storage.getMapStorageRangeFor(uuid, start, finish, count) - } catch { case e: Exception => Nil } - - override def clear = { - register - shouldClearOnCommit.swap(true) - } - - override def contains(key: AnyRef): Boolean = try { - newAndUpdatedEntries.contains(key) || - storage.getMapStorageEntryFor(uuid, key).isDefined - } catch { case e: Exception => false } - - override def size: Int = try { - storage.getMapStorageSizeFor(uuid) - } catch { case e: Exception => 0 } - - override def get(key: AnyRef): Option[AnyRef] = { - if (newAndUpdatedEntries.contains(key)) { - newAndUpdatedEntries.get(key) - } - else try { - storage.getMapStorageEntryFor(uuid, key) - } catch { case e: Exception => None } - } - - override def elements: Iterator[Tuple2[AnyRef, AnyRef]] = { - new Iterator[Tuple2[AnyRef, AnyRef]] { - private val originalList: List[Tuple2[AnyRef, AnyRef]] = try { - storage.getMapStorageFor(uuid) - } catch { - case e: Throwable => Nil - } - // FIXME how to deal with updated entries, these should be replaced in the originalList not just added - private var elements = newAndUpdatedEntries.toList ::: originalList.reverse - override def next: Tuple2[AnyRef, AnyRef]= synchronized { - val element = elements.head - elements = elements.tail - element - } - override def hasNext: Boolean = synchronized { !elements.isEmpty } - } - } - - private def register = { - if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException - currentTransaction.get.get.register(uuid, this) - } -} - -/** - * Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage. - * - * @author Jonas Bonér - */ -class CassandraPersistentMap(id: String) extends PersistentMap { - val uuid = id - val storage = CassandraStorage -} - -/** - * Implements a persistent transactional map based on the MongoDB document storage. - * - * @author Debasish Ghosh - */ -class MongoPersistentMap(id: String) extends PersistentMap { - val uuid = id - val storage = MongoStorage -} - -/** - * Implements a template for a concrete persistent transactional vector based storage. - * - * @author Jonas Bonér - */ -trait PersistentVector extends RandomAccessSeq[AnyRef] with Transactional with Committable { - protected val newElems = TransactionalState.newVector[AnyRef] - protected val updatedElems = TransactionalState.newMap[Int, AnyRef] - protected val removedElems = TransactionalState.newVector[AnyRef] - protected val shouldClearOnCommit = TransactionalRef[Boolean]() - - val storage: VectorStorage - - def commit = { - // FIXME: should use batch function once the bug is resolved - for (element <- newElems) storage.insertVectorStorageEntryFor(uuid, element) - for (entry <- updatedElems) storage.updateVectorStorageEntryFor(uuid, entry._1, entry._2) - newElems.clear - updatedElems.clear - } - - def +(elem: AnyRef) = add(elem) - - def add(elem: AnyRef) = { - register - newElems + elem - } - - def apply(index: Int): AnyRef = get(index) - - def get(index: Int): AnyRef = { - if (newElems.size > index) newElems(index) - else storage.getVectorStorageEntryFor(uuid, index) - } - - override def slice(start: Int, count: Int): RandomAccessSeq[AnyRef] = slice(Some(start), None, count) - - def slice(start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[AnyRef] = { - val buffer = new scala.collection.mutable.ArrayBuffer[AnyRef] - storage.getVectorStorageRangeFor(uuid, start, finish, count).foreach(buffer.append(_)) - buffer - } - - /** - * Removes the tail element of this vector. - */ - // FIXME: implement persistent vector pop - def pop: AnyRef = { - register - throw new UnsupportedOperationException("need to implement persistent vector pop") - } - - def update(index: Int, newElem: AnyRef) = { - register - storage.updateVectorStorageEntryFor(uuid, index, newElem) - } - - override def first: AnyRef = get(0) - - override def last: AnyRef = { - if (newElems.length != 0) newElems.last - else { - val len = length - if (len == 0) throw new NoSuchElementException("Vector is empty") - get(len - 1) - } - } - - def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length - - private def register = { - if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException - currentTransaction.get.get.register(uuid, this) - } -} - -/** - * Implements a persistent transactional vector based on the Cassandra - * distributed P2P key-value storage. - * - * @author Jonas Bonér - */ -class CassandraPersistentVector(id: String) extends PersistentVector { - val uuid = id - val storage = CassandraStorage -} - -/** - * Implements a persistent transactional vector based on the MongoDB - * document storage. - * - * @author Debaissh Ghosh - */ -class MongoPersistentVector(id: String) extends PersistentVector { - val uuid = id - val storage = MongoStorage -} - -/** - * Implements a persistent reference with abstract storage. - * - * @author Jonas Bonér - */ -trait PersistentRef extends Transactional with Committable { - protected val ref = new TransactionalRef[AnyRef] - - val storage: RefStorage - - def commit = if (ref.isDefined) { - storage.insertRefStorageFor(uuid, ref.get.get) - ref.swap(null) - } - - def swap(elem: AnyRef) = { - register - ref.swap(elem) - } - - def get: Option[AnyRef] = if (ref.isDefined) ref.get else storage.getRefStorageFor(uuid) - - def isDefined: Boolean = ref.isDefined || storage.getRefStorageFor(uuid).isDefined - - def getOrElse(default: => AnyRef): AnyRef = { - val current = get - if (current.isDefined) current.get - else default - } - - private def register = { - if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException - currentTransaction.get.get.register(uuid, this) - } -} - -class CassandraPersistentRef(id: String) extends PersistentRef { - val uuid = id - val storage = CassandraStorage -} - -class MongoPersistentRef(id: String) extends PersistentRef { - val uuid = id - val storage = MongoStorage -} diff --git a/akka-persistence/src/main/scala/Storage.scala b/akka-persistence/src/main/scala/Storage.scala index 52dc45afa7..5f30cc3319 100644 --- a/akka-persistence/src/main/scala/Storage.scala +++ b/akka-persistence/src/main/scala/Storage.scala @@ -4,33 +4,352 @@ package se.scalablesolutions.akka.state -// abstracts persistence storage -trait Storage +import se.scalablesolutions.akka.stm.TransactionManagement.currentTransaction +import se.scalablesolutions.akka.collection._ +import se.scalablesolutions.akka.util.Logging -// for Maps -trait MapStorage extends Storage { - def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) - def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef) - def removeMapStorageFor(name: String) - def removeMapStorageFor(name: String, key: AnyRef) - def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] - def getMapStorageSizeFor(name: String): Int - def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] - def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] +import org.codehaus.aspectwerkz.proxy.Uuid + +class NoTransactionInScopeException extends RuntimeException + +/** + * Example Scala usage. + *

+ * New map with generated id. + *

+ * val myMap = CassandraStorage.newMap
+ * 
+ * + * New map with user-defined id. + *
+ * val myMap = MongoStorage.newMap(id)
+ * 
+ * + * Get map by user-defined id. + *
+ * val myMap = CassandraStorage.getMap(id)
+ * 
+ * + * Example Java usage: + *
+ * PersistentMap myMap = MongoStorage.newMap();
+ * 
+ * Or: + *
+ * MongoPersistentMap myMap = MongoStorage.getMap(id);
+ * 
+ * + * @author Jonas Bonér + */ +trait Storage { + // FIXME: The UUID won't work across the remote machines, use [http://johannburkard.de/software/uuid/] + type ElementType + + def newMap: PersistentMap[ElementType, ElementType] + def newVector: PersistentVector[ElementType] + def newRef: PersistentRef[ElementType] + + def getMap(id: String): PersistentMap[ElementType, ElementType] + def getVector(id: String): PersistentVector[ElementType] + def getRef(id: String): PersistentRef[ElementType] + + def newMap(id: String): PersistentMap[ElementType, ElementType] + def newVector(id: String): PersistentVector[ElementType] + def newRef(id: String): PersistentRef[ElementType] } -// for Vectors -trait VectorStorage extends Storage { - def insertVectorStorageEntryFor(name: String, element: AnyRef) - def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) - def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) - def getVectorStorageEntryFor(name: String, index: Int): AnyRef - def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] - def getVectorStorageSizeFor(name: String): Int +object CassandraStorage extends Storage { + type ElementType = Array[Byte] + + def newMap: PersistentMap[ElementType, ElementType] = newMap(Uuid.newUuid.toString) + def newVector: PersistentVector[ElementType] = newVector(Uuid.newUuid.toString) + def newRef: PersistentRef[ElementType] = newRef(Uuid.newUuid.toString) + + def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id) + def getVector(id: String): PersistentVector[ElementType] = newVector(id) + def getRef(id: String): PersistentRef[ElementType] = newRef(id) + + def newMap(id: String): PersistentMap[ElementType, ElementType] = new CassandraPersistentMap(id) + def newVector(id: String): PersistentVector[ElementType] = new CassandraPersistentVector(id) + def newRef(id: String): PersistentRef[ElementType] = new CassandraPersistentRef(id) } -// for Ref -trait RefStorage extends Storage { - def insertRefStorageFor(name: String, element: AnyRef) - def getRefStorageFor(name: String): Option[AnyRef] +object MongoStorage extends Storage { + type ElementType = AnyRef + + def newMap: PersistentMap[ElementType, ElementType] = newMap(Uuid.newUuid.toString) + def newVector: PersistentVector[ElementType] = newVector(Uuid.newUuid.toString) + def newRef: PersistentRef[ElementType] = newRef(Uuid.newUuid.toString) + + def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id) + def getVector(id: String): PersistentVector[ElementType] = newVector(id) + def getRef(id: String): PersistentRef[ElementType] = newRef(id) + + def newMap(id: String): PersistentMap[ElementType, ElementType] = new MongoPersistentMap(id) + def newVector(id: String): PersistentVector[ElementType] = new MongoPersistentVector(id) + def newRef(id: String): PersistentRef[ElementType] = new MongoPersistentRef(id) +} + +/** + * Implementation of PersistentMap for every concrete + * storage will have the same workflow. This abstracts the workflow. + * + * Subclasses just need to provide the actual concrete instance for the + * abstract val storage. + * + * @author Jonas Bonér + */ +trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] + with Transactional with Committable with Logging { + protected val newAndUpdatedEntries = TransactionalState.newMap[K, V] + protected val removedEntries = TransactionalState.newVector[K] + protected val shouldClearOnCommit = TransactionalRef[Boolean]() + + // to be concretized in subclasses + val storage: MapStorageBackend[K, V] + + def commit = { + removedEntries.toList.foreach(key => storage.removeMapStorageFor(uuid, key)) + storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList) + if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get.get) + storage.removeMapStorageFor(uuid) + newAndUpdatedEntries.clear + removedEntries.clear + } + + def -=(key: K) = remove(key) + + def +=(key: K, value: V) = put(key, value) + + override def put(key: K, value: V): Option[V] = { + register + newAndUpdatedEntries.put(key, value) + } + + override def update(key: K, value: V) = { + register + newAndUpdatedEntries.update(key, value) + } + + def remove(key: K) = { + register + removedEntries.add(key) + } + + def slice(start: Option[K], count: Int): List[Tuple2[K, V]] = + slice(start, None, count) + + def slice(start: Option[K], finish: Option[K], count: Int): List[Tuple2[K, V]] = try { + storage.getMapStorageRangeFor(uuid, start, finish, count) + } catch { case e: Exception => Nil } + + override def clear = { + register + shouldClearOnCommit.swap(true) + } + + override def contains(key: K): Boolean = try { + newAndUpdatedEntries.contains(key) || + storage.getMapStorageEntryFor(uuid, key).isDefined + } catch { case e: Exception => false } + + override def size: Int = try { + storage.getMapStorageSizeFor(uuid) + } catch { case e: Exception => 0 } + + override def get(key: K): Option[V] = { + if (newAndUpdatedEntries.contains(key)) { + newAndUpdatedEntries.get(key) + } + else try { + storage.getMapStorageEntryFor(uuid, key) + } catch { case e: Exception => None } + } + + override def elements: Iterator[Tuple2[K, V]] = { + new Iterator[Tuple2[K, V]] { + private val originalList: List[Tuple2[K, V]] = try { + storage.getMapStorageFor(uuid) + } catch { + case e: Throwable => Nil + } + // FIXME how to deal with updated entries, these should be replaced in the originalList not just added + private var elements = newAndUpdatedEntries.toList ::: originalList.reverse + override def next: Tuple2[K, V]= synchronized { + val element = elements.head + elements = elements.tail + element + } + override def hasNext: Boolean = synchronized { !elements.isEmpty } + } + } + + private def register = { + if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException + currentTransaction.get.get.register(uuid, this) + } +} + +/** + * Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage. + * + * @author Jonas Bonér + */ +class CassandraPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[Byte]] { + val uuid = id + val storage = CassandraStorageBackend +} + +/** + * Implements a persistent transactional map based on the MongoDB document storage. + * + * @author Debasish Ghosh + */ +class MongoPersistentMap(id: String) extends PersistentMap[AnyRef, AnyRef] { + val uuid = id + val storage = MongoStorageBackend +} + +/** + * Implements a template for a concrete persistent transactional vector based storage. + * + * @author Jonas Bonér + */ +trait PersistentVector[T] extends RandomAccessSeq[T] with Transactional with Committable { + protected val newElems = TransactionalState.newVector[T] + protected val updatedElems = TransactionalState.newMap[Int, T] + protected val removedElems = TransactionalState.newVector[T] + protected val shouldClearOnCommit = TransactionalRef[Boolean]() + + val storage: VectorStorageBackend[T] + + def commit = { + // FIXME: should use batch function once the bug is resolved + for (element <- newElems) storage.insertVectorStorageEntryFor(uuid, element) + for (entry <- updatedElems) storage.updateVectorStorageEntryFor(uuid, entry._1, entry._2) + newElems.clear + updatedElems.clear + } + + def +(elem: T) = add(elem) + + def add(elem: T) = { + register + newElems + elem + } + + def apply(index: Int): T = get(index) + + def get(index: Int): T = { + if (newElems.size > index) newElems(index) + else storage.getVectorStorageEntryFor(uuid, index) + } + + override def slice(start: Int, count: Int): RandomAccessSeq[T] = slice(Some(start), None, count) + + def slice(start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[T] = { + val buffer = new scala.collection.mutable.ArrayBuffer[T] + storage.getVectorStorageRangeFor(uuid, start, finish, count).foreach(buffer.append(_)) + buffer + } + + /** + * Removes the tail element of this vector. + */ + // FIXME: implement persistent vector pop + def pop: T = { + register + throw new UnsupportedOperationException("need to implement persistent vector pop") + } + + def update(index: Int, newElem: T) = { + register + storage.updateVectorStorageEntryFor(uuid, index, newElem) + } + + override def first: T = get(0) + + override def last: T = { + if (newElems.length != 0) newElems.last + else { + val len = length + if (len == 0) throw new NoSuchElementException("Vector is empty") + get(len - 1) + } + } + + def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length + + private def register = { + if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException + currentTransaction.get.get.register(uuid, this) + } +} + +/** + * Implements a persistent transactional vector based on the Cassandra + * distributed P2P key-value storage. + * + * @author Jonas Bonér + */ +class CassandraPersistentVector(id: String) extends PersistentVector[Array[Byte]] { + val uuid = id + val storage = CassandraStorageBackend +} + +/** + * Implements a persistent transactional vector based on the MongoDB + * document storage. + * + * @author Debaissh Ghosh + */ +class MongoPersistentVector(id: String) extends PersistentVector[AnyRef] { + val uuid = id + val storage = MongoStorageBackend +} + +/** + * Implements a persistent reference with abstract storage. + * + * @author Jonas Bonér + */ +trait PersistentRef[T] extends Transactional with Committable { + protected val ref = new TransactionalRef[T] + + val storage: RefStorageBackend[T] + + def commit = if (ref.isDefined) { + storage.insertRefStorageFor(uuid, ref.get.get) + ref.swap(null.asInstanceOf[T]) + } + + def swap(elem: T) = { + register + ref.swap(elem) + } + + def get: Option[T] = if (ref.isDefined) ref.get else storage.getRefStorageFor(uuid) + + def isDefined: Boolean = ref.isDefined || storage.getRefStorageFor(uuid).isDefined + + def getOrElse(default: => T): T = { + val current = get + if (current.isDefined) current.get + else default + } + + private def register = { + if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException + currentTransaction.get.get.register(uuid, this) + } +} + +class CassandraPersistentRef(id: String) extends PersistentRef[Array[Byte]] { + val uuid = id + val storage = CassandraStorageBackend +} + +class MongoPersistentRef(id: String) extends PersistentRef[AnyRef] { + val uuid = id + val storage = MongoStorageBackend } diff --git a/akka-persistence/src/main/scala/StorageBackend.scala b/akka-persistence/src/main/scala/StorageBackend.scala new file mode 100644 index 0000000000..76a7ccdfdf --- /dev/null +++ b/akka-persistence/src/main/scala/StorageBackend.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.state + +// abstracts persistence storage +trait StorageBackend + +// for Maps +trait MapStorageBackend[K, V] extends StorageBackend { + def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[K, V]]) + def insertMapStorageEntryFor(name: String, key: K, value: V) + def removeMapStorageFor(name: String) + def removeMapStorageFor(name: String, key: K) + def getMapStorageEntryFor(name: String, key: K): Option[V] + def getMapStorageSizeFor(name: String): Int + def getMapStorageFor(name: String): List[Tuple2[K, V]] + def getMapStorageRangeFor(name: String, start: Option[K], finish: Option[K], count: Int): List[Tuple2[K, V]] +} + +// for Vectors +trait VectorStorageBackend[T] extends StorageBackend { + def insertVectorStorageEntryFor(name: String, element: T) + def insertVectorStorageEntriesFor(name: String, elements: List[T]) + def updateVectorStorageEntryFor(name: String, index: Int, elem: T) + def getVectorStorageEntryFor(name: String, index: Int): T + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[T] + def getVectorStorageSizeFor(name: String): Int +} + +// for Ref +trait RefStorageBackend[T] extends StorageBackend { + def insertRefStorageFor(name: String, element: T) + def getRefStorageFor(name: String): Option[T] +} diff --git a/akka-persistence/src/test/scala/CassandraPersistentActorSpec.scala b/akka-persistence/src/test/scala/CassandraPersistentActorSpec.scala index 305763eba3..2142311f76 100644 --- a/akka-persistence/src/test/scala/CassandraPersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/CassandraPersistentActorSpec.scala @@ -1,13 +1,10 @@ package se.scalablesolutions.akka.state -import akka.actor.Actor -import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.TimeUnit +import se.scalablesolutions.akka.actor.Actor import junit.framework.TestCase -import dispatch._ -import org.junit.{Test, Before} +import org.junit.Test import org.junit.Assert._ case class GetMapState(key: String) @@ -31,35 +28,35 @@ class CassandraPersistentActor extends Actor { timeout = 100000 makeTransactionRequired - private lazy val mapState: PersistentMap = PersistentState.newMap(CassandraStorageConfig()) - private lazy val vectorState: PersistentVector = PersistentState.newVector(CassandraStorageConfig()) - private lazy val refState: PersistentRef = PersistentState.newRef(CassandraStorageConfig()) + private lazy val mapState = CassandraStorage.newMap + private lazy val vectorState = CassandraStorage.newVector + private lazy val refState = CassandraStorage.newRef def receive = { case GetMapState(key) => - reply(mapState.get(key).get) + reply(mapState.get(key.getBytes("UTF-8")).get) case GetVectorSize => reply(vectorState.length.asInstanceOf[AnyRef]) case GetRefState => reply(refState.get.get) case SetMapState(key, msg) => - mapState.put(key, msg) + mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8")) reply(msg) case SetVectorState(msg) => - vectorState.add(msg) + vectorState.add(msg.getBytes("UTF-8")) reply(msg) case SetRefState(msg) => - refState.swap(msg) + refState.swap(msg.getBytes("UTF-8")) reply(msg) case Success(key, msg) => - mapState.put(key, msg) - vectorState.add(msg) - refState.swap(msg) + mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8")) + vectorState.add(msg.getBytes("UTF-8")) + refState.swap(msg.getBytes("UTF-8")) reply(msg) case Failure(key, msg, failer) => - mapState.put(key, msg) - vectorState.add(msg) - refState.swap(msg) + mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8")) + vectorState.add(msg.getBytes("UTF-8")) + refState.swap(msg.getBytes("UTF-8")) failer !! "Failure" reply(msg) } @@ -74,14 +71,15 @@ class CassandraPersistentActor extends Actor { } class CassandraPersistentActorSpec extends TestCase { - + @Test def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { val stateful = new CassandraPersistentActor stateful.start stateful !! SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired - assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get) + val result: Array[Byte] = (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get + assertEquals("new state", new String(result, 0, result.length, "UTF-8")) } @Test @@ -95,7 +93,8 @@ class CassandraPersistentActorSpec extends TestCase { stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method fail("should have thrown an exception") } catch {case e: RuntimeException => {}} - assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state + val result: Array[Byte] = (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get + assertEquals("init", new String(result, 0, result.length, "UTF-8")) // check that state is == init state } @Test @@ -127,7 +126,8 @@ class CassandraPersistentActorSpec extends TestCase { stateful.start stateful !! SetRefState("init") // set init state stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired - assertEquals("new state", (stateful !! GetRefState).get) + val result: Array[Byte] = (stateful !! GetRefState).get + assertEquals("new state", new String(result, 0, result.length, "UTF-8")) } @Test @@ -141,6 +141,7 @@ class CassandraPersistentActorSpec extends TestCase { stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method fail("should have thrown an exception") } catch {case e: RuntimeException => {}} - assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state + val result: Array[Byte] = (stateful !! GetRefState).get + assertEquals("init", new String(result, 0, result.length, "UTF-8")) // check that state is == init state } } diff --git a/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala b/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala index 10bf943dbb..051cfbfa12 100644 --- a/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala @@ -31,10 +31,8 @@ case object LogSize class BankAccountActor extends Actor { makeTransactionRequired - private val accountState = - PersistentState.newMap(MongoStorageConfig()) - private val txnLog = - PersistentState.newVector(MongoStorageConfig()) + private val accountState = MongoStorage.newMap + private val txnLog = MongoStorage.newVector def receive: PartialFunction[Any, Unit] = { // check balance diff --git a/akka-persistence/src/test/scala/MongoStorageSpec.scala b/akka-persistence/src/test/scala/MongoStorageSpec.scala index 4adf61ced2..fae6d7f00d 100644 --- a/akka-persistence/src/test/scala/MongoStorageSpec.scala +++ b/akka-persistence/src/test/scala/MongoStorageSpec.scala @@ -14,7 +14,7 @@ class MongoStorageSpec extends TestCase { val changeSetM = new scala.collection.mutable.HashMap[AnyRef, AnyRef] override def setUp = { - MongoStorage.coll.drop + MongoStorageBackend.coll.drop } @Test @@ -22,40 +22,40 @@ class MongoStorageSpec extends TestCase { changeSetV += "debasish" // string changeSetV += List(1, 2, 3) // Scala List changeSetV += List(100, 200) - MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList) + MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList) assertEquals( 3, - MongoStorage.getVectorStorageSizeFor("U-A1")) + MongoStorageBackend.getVectorStorageSizeFor("U-A1")) changeSetV.clear // changeSetV should be reinitialized changeSetV += List(12, 23, 45) changeSetV += "maulindu" - MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList) + MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList) assertEquals( 5, - MongoStorage.getVectorStorageSizeFor("U-A1")) + MongoStorageBackend.getVectorStorageSizeFor("U-A1")) // add more to the same changeSetV changeSetV += "ramanendu" changeSetV += Map(1 -> "dg", 2 -> "mc") // add for a diff transaction - MongoStorage.insertVectorStorageEntriesFor("U-A2", changeSetV.toList) + MongoStorageBackend.insertVectorStorageEntriesFor("U-A2", changeSetV.toList) assertEquals( 4, - MongoStorage.getVectorStorageSizeFor("U-A2")) + MongoStorageBackend.getVectorStorageSizeFor("U-A2")) // previous transaction change set should remain same assertEquals( 5, - MongoStorage.getVectorStorageSizeFor("U-A1")) + MongoStorageBackend.getVectorStorageSizeFor("U-A1")) // test single element entry - MongoStorage.insertVectorStorageEntryFor("U-A1", Map(1->1, 2->4, 3->9)) + MongoStorageBackend.insertVectorStorageEntryFor("U-A1", Map(1->1, 2->4, 3->9)) assertEquals( 6, - MongoStorage.getVectorStorageSizeFor("U-A1")) + MongoStorageBackend.getVectorStorageSizeFor("U-A1")) } @Test @@ -64,25 +64,25 @@ class MongoStorageSpec extends TestCase { // initially everything 0 assertEquals( 0, - MongoStorage.getVectorStorageSizeFor("U-A2")) + MongoStorageBackend.getVectorStorageSizeFor("U-A2")) assertEquals( 0, - MongoStorage.getVectorStorageSizeFor("U-A1")) + MongoStorageBackend.getVectorStorageSizeFor("U-A1")) // get some stuff changeSetV += "debasish" changeSetV += List(BigDecimal(12), BigDecimal(13), BigDecimal(14)) - MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList) + MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList) assertEquals( 2, - MongoStorage.getVectorStorageSizeFor("U-A1")) + MongoStorageBackend.getVectorStorageSizeFor("U-A1")) - val JsString(str) = MongoStorage.getVectorStorageEntryFor("U-A1", 0).asInstanceOf[JsString] + val JsString(str) = MongoStorageBackend.getVectorStorageEntryFor("U-A1", 0).asInstanceOf[JsString] assertEquals("debasish", str) - val l = MongoStorage.getVectorStorageEntryFor("U-A1", 1).asInstanceOf[JsValue] + val l = MongoStorageBackend.getVectorStorageEntryFor("U-A1", 1).asInstanceOf[JsValue] val num_list = list ! num val num_list(l0) = l assertEquals(List(12, 13, 14), l0) @@ -91,14 +91,14 @@ class MongoStorageSpec extends TestCase { changeSetV += Map(1->1, 2->4, 3->9) changeSetV += BigInt(2310) changeSetV += List(100, 200, 300) - MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList) + MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList) assertEquals( 5, - MongoStorage.getVectorStorageSizeFor("U-A1")) + MongoStorageBackend.getVectorStorageSizeFor("U-A1")) val r = - MongoStorage.getVectorStorageRangeFor("U-A1", Some(1), None, 3) + MongoStorageBackend.getVectorStorageRangeFor("U-A1", Some(1), None, 3) assertEquals(3, r.size) val lr = r(0).asInstanceOf[JsValue] @@ -109,12 +109,12 @@ class MongoStorageSpec extends TestCase { @Test def testVectorFetchForNonExistentKeys = { try { - MongoStorage.getVectorStorageEntryFor("U-A1", 1) + MongoStorageBackend.getVectorStorageEntryFor("U-A1", 1) fail("should throw an exception") } catch {case e: Predef.NoSuchElementException => {}} try { - MongoStorage.getVectorStorageRangeFor("U-A1", Some(2), None, 12) + MongoStorageBackend.getVectorStorageRangeFor("U-A1", Some(2), None, 12) fail("should throw an exception") } catch {case e: Predef.NoSuchElementException => {}} } @@ -128,43 +128,43 @@ class MongoStorageSpec extends TestCase { changeSetM += "6" -> java.util.Calendar.getInstance.getTime // insert all into Mongo - MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList) + MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList) assertEquals( 6, - MongoStorage.getMapStorageSizeFor("U-M1")) + MongoStorageBackend.getMapStorageSizeFor("U-M1")) // individual insert api - MongoStorage.insertMapStorageEntryFor("U-M1", "7", "akka") - MongoStorage.insertMapStorageEntryFor("U-M1", "8", List(23, 25)) + MongoStorageBackend.insertMapStorageEntryFor("U-M1", "7", "akka") + MongoStorageBackend.insertMapStorageEntryFor("U-M1", "8", List(23, 25)) assertEquals( 8, - MongoStorage.getMapStorageSizeFor("U-M1")) + MongoStorageBackend.getMapStorageSizeFor("U-M1")) // add the same changeSet for another transaction - MongoStorage.insertMapStorageEntriesFor("U-M2", changeSetM.toList) + MongoStorageBackend.insertMapStorageEntriesFor("U-M2", changeSetM.toList) assertEquals( 6, - MongoStorage.getMapStorageSizeFor("U-M2")) + MongoStorageBackend.getMapStorageSizeFor("U-M2")) // the first transaction should remain the same assertEquals( 8, - MongoStorage.getMapStorageSizeFor("U-M1")) + MongoStorageBackend.getMapStorageSizeFor("U-M1")) changeSetM.clear } @Test def testMapContents = { fillMap - MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList) - MongoStorage.getMapStorageEntryFor("U-M1", "2") match { + MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList) + MongoStorageBackend.getMapStorageEntryFor("U-M1", "2") match { case Some(x) => { val JsString(str) = x.asInstanceOf[JsValue] assertEquals("peter", str) } case None => fail("should fetch peter") } - MongoStorage.getMapStorageEntryFor("U-M1", "4") match { + MongoStorageBackend.getMapStorageEntryFor("U-M1", "4") match { case Some(x) => { val num_list = list ! num val num_list(l0) = x.asInstanceOf[JsValue] @@ -172,7 +172,7 @@ class MongoStorageSpec extends TestCase { } case None => fail("should fetch list") } - MongoStorage.getMapStorageEntryFor("U-M1", "3") match { + MongoStorageBackend.getMapStorageEntryFor("U-M1", "3") match { case Some(x) => { val num_list = list ! num val num_list(l0) = x.asInstanceOf[JsValue] @@ -183,7 +183,7 @@ class MongoStorageSpec extends TestCase { // get the entire map val l: List[Tuple2[AnyRef, AnyRef]] = - MongoStorage.getMapStorageFor("U-M1") + MongoStorageBackend.getMapStorageFor("U-M1") assertEquals(4, l.size) assertTrue(l.map(_._1).contains("1")) @@ -196,7 +196,7 @@ class MongoStorageSpec extends TestCase { // trying to fetch for a non-existent transaction will throw try { - MongoStorage.getMapStorageFor("U-M2") + MongoStorageBackend.getMapStorageFor("U-M2") fail("should throw an exception") } catch {case e: Predef.NoSuchElementException => {}} @@ -207,11 +207,11 @@ class MongoStorageSpec extends TestCase { def testMapContentsByRange = { fillMap changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc") - MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList) + MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList) // specify start and count val l: List[Tuple2[AnyRef, AnyRef]] = - MongoStorage.getMapStorageRangeFor( + MongoStorageBackend.getMapStorageRangeFor( "U-M1", Some(Integer.valueOf(2)), None, 3) assertEquals(3, l.size) @@ -227,27 +227,27 @@ class MongoStorageSpec extends TestCase { // specify start, finish and count where finish - start == count assertEquals(3, - MongoStorage.getMapStorageRangeFor( + MongoStorageBackend.getMapStorageRangeFor( "U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(5)), 3).size) // specify start, finish and count where finish - start > count assertEquals(3, - MongoStorage.getMapStorageRangeFor( + MongoStorageBackend.getMapStorageRangeFor( "U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(9)), 3).size) // do not specify start or finish assertEquals(3, - MongoStorage.getMapStorageRangeFor( + MongoStorageBackend.getMapStorageRangeFor( "U-M1", None, None, 3).size) // specify finish and count assertEquals(3, - MongoStorage.getMapStorageRangeFor( + MongoStorageBackend.getMapStorageRangeFor( "U-M1", None, Some(Integer.valueOf(3)), 3).size) // specify start, finish and count where finish < start assertEquals(3, - MongoStorage.getMapStorageRangeFor( + MongoStorageBackend.getMapStorageRangeFor( "U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(1)), 3).size) changeSetM.clear @@ -258,35 +258,35 @@ class MongoStorageSpec extends TestCase { fillMap changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc") - MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList) + MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList) assertEquals(5, - MongoStorage.getMapStorageSizeFor("U-M1")) + MongoStorageBackend.getMapStorageSizeFor("U-M1")) // remove key "3" - MongoStorage.removeMapStorageFor("U-M1", "3") + MongoStorageBackend.removeMapStorageFor("U-M1", "3") assertEquals(4, - MongoStorage.getMapStorageSizeFor("U-M1")) + MongoStorageBackend.getMapStorageSizeFor("U-M1")) try { - MongoStorage.getMapStorageEntryFor("U-M1", "3") + MongoStorageBackend.getMapStorageEntryFor("U-M1", "3") fail("should throw exception") } catch { case e => {}} // remove key "4" - MongoStorage.removeMapStorageFor("U-M1", "4") + MongoStorageBackend.removeMapStorageFor("U-M1", "4") assertEquals(3, - MongoStorage.getMapStorageSizeFor("U-M1")) + MongoStorageBackend.getMapStorageSizeFor("U-M1")) // remove key "2" - MongoStorage.removeMapStorageFor("U-M1", "2") + MongoStorageBackend.removeMapStorageFor("U-M1", "2") assertEquals(2, - MongoStorage.getMapStorageSizeFor("U-M1")) + MongoStorageBackend.getMapStorageSizeFor("U-M1")) // remove the whole stuff - MongoStorage.removeMapStorageFor("U-M1") + MongoStorageBackend.removeMapStorageFor("U-M1") try { - MongoStorage.getMapStorageFor("U-M1") + MongoStorageBackend.getMapStorageFor("U-M1") fail("should throw exception") } catch { case e: NoSuchElementException => {}} @@ -303,14 +303,14 @@ class MongoStorageSpec extends TestCase { @Test def testRefStorage = { - MongoStorage.getRefStorageFor("U-R1") match { + MongoStorageBackend.getRefStorageFor("U-R1") match { case None => case Some(o) => fail("should be None") } val m = Map("1"->1, "2"->4, "3"->9) - MongoStorage.insertRefStorageFor("U-R1", m) - MongoStorage.getRefStorageFor("U-R1") match { + MongoStorageBackend.insertRefStorageFor("U-R1", m) + MongoStorageBackend.getRefStorageFor("U-R1") match { case None => fail("should not be empty") case Some(r) => { val a = r.asInstanceOf[JsValue] @@ -331,8 +331,8 @@ class MongoStorageSpec extends TestCase { // insert another one // the previous one should be replaced val b = List("100", "jonas") - MongoStorage.insertRefStorageFor("U-R1", b) - MongoStorage.getRefStorageFor("U-R1") match { + MongoStorageBackend.insertRefStorageFor("U-R1", b) + MongoStorageBackend.getRefStorageFor("U-R1") match { case None => fail("should not be empty") case Some(r) => { val a = r.asInstanceOf[JsValue] diff --git a/akka-samples-java/src/main/java/sample/java/PersistentSimpleService.java b/akka-samples-java/src/main/java/sample/java/PersistentSimpleService.java index 09676cb26e..7e0e43b6bd 100644 --- a/akka-samples-java/src/main/java/sample/java/PersistentSimpleService.java +++ b/akka-samples-java/src/main/java/sample/java/PersistentSimpleService.java @@ -12,9 +12,9 @@ import se.scalablesolutions.akka.annotation.transactionrequired; import se.scalablesolutions.akka.annotation.prerestart; import se.scalablesolutions.akka.annotation.postrestart; import se.scalablesolutions.akka.state.PersistentMap; -import se.scalablesolutions.akka.state.PersistentState; -import se.scalablesolutions.akka.state.PersistentMap; -import se.scalablesolutions.akka.state.CassandraStorageConfig; +import se.scalablesolutions.akka.state.CassandraStorage; + +import java.nio.ByteBuffer; /** * Try service out by invoking (multiple times): @@ -26,21 +26,22 @@ import se.scalablesolutions.akka.state.CassandraStorageConfig; @Path("/persistentjavacount") @transactionrequired public class PersistentSimpleService { - private Object KEY = "COUNTER"; + private String KEY = "COUNTER"; private boolean hasStartedTicking = false; - private PersistentMap storage = PersistentState.newMap(new CassandraStorageConfig()); + private PersistentMap storage = CassandraStorage.newMap(); @GET @Produces({"application/html"}) public String count() { if (!hasStartedTicking) { - storage.put(KEY, 0); + storage.put(KEY.getBytes(), ByteBuffer.allocate(2).putInt(0).array()); hasStartedTicking = true; return "Tick: 0\n"; } else { - int counter = (Integer)storage.get(KEY).get() + 1; - storage.put(KEY, counter); + byte[] bytes = (byte[])storage.get(KEY.getBytes()).get(); + int counter = ByteBuffer.wrap(bytes).getInt(); + storage.put(KEY.getBytes(), ByteBuffer.allocate(4).putInt(counter + 1).array()); return "Tick: " + counter + "\n"; } } diff --git a/akka-samples-java/src/main/java/sample/java/SimpleService.java b/akka-samples-java/src/main/java/sample/java/SimpleService.java index 7702396375..7126621e60 100644 --- a/akka-samples-java/src/main/java/sample/java/SimpleService.java +++ b/akka-samples-java/src/main/java/sample/java/SimpleService.java @@ -13,7 +13,6 @@ import se.scalablesolutions.akka.annotation.prerestart; import se.scalablesolutions.akka.annotation.postrestart; import se.scalablesolutions.akka.state.TransactionalState; import se.scalablesolutions.akka.state.TransactionalMap; -import se.scalablesolutions.akka.state.CassandraStorageConfig; /** * Try service out by invoking (multiple times): diff --git a/akka-samples-lift/src/main/scala/akka/SimpleService.scala b/akka-samples-lift/src/main/scala/akka/SimpleService.scala index b4bbd52157..8bec513bb9 100644 --- a/akka-samples-lift/src/main/scala/akka/SimpleService.scala +++ b/akka-samples-lift/src/main/scala/akka/SimpleService.scala @@ -1,13 +1,13 @@ package sample.lift -import se.scalablesolutions.akka.state.{PersistentState, TransactionalState, CassandraStorageConfig} -import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor} +import se.scalablesolutions.akka.actor.Actor import se.scalablesolutions.akka.config.ScalaConfig._ -import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.state.{CassandraStorage, TransactionalState} import java.lang.Integer -import javax.ws.rs.core.MultivaluedMap -import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes} +import javax.ws.rs.{GET, Path, Produces} +import java.nio.ByteBuffer + /** * Try service out by invoking (multiple times): @@ -56,7 +56,7 @@ class PersistentSimpleService extends Actor { case object Tick private val KEY = "COUNTER" private var hasStartedTicking = false - private val storage = PersistentState.newMap(CassandraStorageConfig()) + private val storage = CassandraStorage.newMap @GET @Produces(Array("text/html")) @@ -64,13 +64,14 @@ class PersistentSimpleService extends Actor { def receive = { case Tick => if (hasStartedTicking) { - val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue - storage.put(KEY, new Integer(counter + 1)) - reply(

Tick: {counter + 1}

) + val bytes = storage.get(KEY.getBytes).get + val counter = ByteBuffer.wrap(bytes).getInt + storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array) + reply(Tick:{counter + 1}) } else { - storage.put(KEY, new Integer(0)) + storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(0).array) hasStartedTicking = true - reply(

Tick: 0

) + reply(Tick: 0) } } } diff --git a/akka-samples-scala/src/main/scala/SimpleService.scala b/akka-samples-scala/src/main/scala/SimpleService.scala index 982fc08745..a351b79614 100644 --- a/akka-samples-scala/src/main/scala/SimpleService.scala +++ b/akka-samples-scala/src/main/scala/SimpleService.scala @@ -4,8 +4,8 @@ package sample.scala -import se.scalablesolutions.akka.state.{PersistentState, TransactionalState, CassandraStorageConfig} import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor} +import se.scalablesolutions.akka.state.{CassandraStorage, TransactionalState} import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.util.Logging @@ -16,6 +16,7 @@ import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes import org.atmosphere.core.annotation.{Broadcast, Suspend} import org.atmosphere.util.XSSHtmlFilter import org.atmosphere.cpr.BroadcastFilter +import java.nio.ByteBuffer class Boot { val factory = SupervisorFactory( @@ -81,7 +82,7 @@ class PersistentSimpleService extends Actor { case object Tick private val KEY = "COUNTER" private var hasStartedTicking = false - private val storage = PersistentState.newMap(CassandraStorageConfig()) + private val storage = CassandraStorage.newMap @GET @Produces(Array("text/html")) @@ -89,11 +90,12 @@ class PersistentSimpleService extends Actor { def receive = { case Tick => if (hasStartedTicking) { - val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue - storage.put(KEY, new Integer(counter + 1)) + val bytes = storage.get(KEY.getBytes).get + val counter = ByteBuffer.wrap(bytes).getInt + storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array) reply(Tick:{counter + 1}) } else { - storage.put(KEY, new Integer(0)) + storage.put(KEY.getBytes, Array(0.toByte)) hasStartedTicking = true reply(Tick: 0) } diff --git a/akka-samples-security/src/main/scala/SimpleService.scala b/akka-samples-security/src/main/scala/SimpleService.scala index fc8b18367a..e9468ec75c 100644 --- a/akka-samples-security/src/main/scala/SimpleService.scala +++ b/akka-samples-security/src/main/scala/SimpleService.scala @@ -44,7 +44,7 @@ class DigestAuthenticationService extends DigestAuthenticationActor { //don't forget to configure your standalone Cassandra instance // //makeTransactionRequired - //override def mkNonceMap = PersistentState.newMap(CassandraStorageConfig()).asInstanceOf[scala.collection.mutable.Map[String,Long]] + //override def mkNonceMap = Storage.newMap(CassandraStorageConfig()).asInstanceOf[scala.collection.mutable.Map[String,Long]] //Use an in-memory nonce-map as default override def mkNonceMap = new scala.collection.mutable.HashMap[String, Long]