merged after reimpl of persistence API
This commit is contained in:
commit
9cd836cfce
19 changed files with 648 additions and 664 deletions
|
|
@ -379,4 +379,4 @@ object Test5 extends Application {
|
|||
setV ! 'exit
|
||||
|
||||
//System.gc
|
||||
}
|
||||
}
|
||||
|
|
@ -10,7 +10,7 @@ import java.util.concurrent.atomic.AtomicInteger
|
|||
import se.scalablesolutions.akka.state.Committable
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
import org.multiverse.api.{Stm, Transaction => MultiverseTransaction}
|
||||
import org.multiverse.api.{Transaction => MultiverseTransaction}
|
||||
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
|
||||
import org.multiverse.api.ThreadLocalTransaction._
|
||||
import org.multiverse.templates.OrElseTemplate
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ public class PersistentClasher {
|
|||
|
||||
@inittransactionalstate
|
||||
public void init() {
|
||||
state = PersistentState.newMap(new CassandraStorageConfig());
|
||||
state = CassandraStorage.newMap();
|
||||
}
|
||||
|
||||
public String getState(String key) {
|
||||
|
|
|
|||
|
|
@ -12,18 +12,19 @@ public class PersistentStateful {
|
|||
|
||||
@inittransactionalstate
|
||||
public void init() {
|
||||
mapState = PersistentState.newMap(new CassandraStorageConfig());
|
||||
vectorState = PersistentState.newVector(new CassandraStorageConfig());
|
||||
refState = PersistentState.newRef(new CassandraStorageConfig());
|
||||
mapState = CassandraStorage.newMap();
|
||||
vectorState = CassandraStorage.newVector();
|
||||
refState = CassandraStorage.newRef();
|
||||
}
|
||||
|
||||
public String getMapState(String key) {
|
||||
return (String) mapState.get(key).get();
|
||||
byte[] bytes = (byte[]) mapState.get(key.getBytes()).get();
|
||||
return new String(bytes, 0, bytes.length);
|
||||
}
|
||||
|
||||
|
||||
public String getVectorState(int index) {
|
||||
return (String) vectorState.get(index);
|
||||
byte[] bytes = (byte[]) vectorState.get(index);
|
||||
return new String(bytes, 0, bytes.length);
|
||||
}
|
||||
|
||||
public int getVectorLength() {
|
||||
|
|
@ -32,62 +33,51 @@ public class PersistentStateful {
|
|||
|
||||
public String getRefState() {
|
||||
if (refState.isDefined()) {
|
||||
return (String) refState.get().get();
|
||||
byte[] bytes = (byte[]) refState.get().get();
|
||||
return new String(bytes, 0, bytes.length);
|
||||
} else throw new IllegalStateException("No such element");
|
||||
}
|
||||
|
||||
|
||||
public void setMapState(String key, String msg) {
|
||||
mapState.put(key, msg);
|
||||
mapState.put(key.getBytes(), msg.getBytes());
|
||||
}
|
||||
|
||||
|
||||
public void setVectorState(String msg) {
|
||||
vectorState.add(msg);
|
||||
vectorState.add(msg.getBytes());
|
||||
}
|
||||
|
||||
|
||||
public void setRefState(String msg) {
|
||||
refState.swap(msg);
|
||||
refState.swap(msg.getBytes());
|
||||
}
|
||||
|
||||
|
||||
public void success(String key, String msg) {
|
||||
mapState.put(key, msg);
|
||||
vectorState.add(msg);
|
||||
refState.swap(msg);
|
||||
mapState.put(key.getBytes(), msg.getBytes());
|
||||
vectorState.add(msg.getBytes());
|
||||
refState.swap(msg.getBytes());
|
||||
}
|
||||
|
||||
|
||||
public String failure(String key, String msg, PersistentFailer failer) {
|
||||
mapState.put(key, msg);
|
||||
vectorState.add(msg);
|
||||
refState.swap(msg);
|
||||
mapState.put(key.getBytes(), msg.getBytes());
|
||||
vectorState.add(msg.getBytes());
|
||||
refState.swap(msg.getBytes());
|
||||
failer.fail();
|
||||
return msg;
|
||||
}
|
||||
|
||||
public String success(String key, String msg, PersistentStatefulNested nested) {
|
||||
mapState.put(key, msg);
|
||||
vectorState.add(msg);
|
||||
refState.swap(msg);
|
||||
mapState.put(key.getBytes(), msg.getBytes());
|
||||
vectorState.add(msg.getBytes());
|
||||
refState.swap(msg.getBytes());
|
||||
nested.success(key, msg);
|
||||
return msg;
|
||||
}
|
||||
|
||||
|
||||
public String failure(String key, String msg, PersistentStatefulNested nested, PersistentFailer failer) {
|
||||
mapState.put(key, msg);
|
||||
vectorState.add(msg);
|
||||
refState.swap(msg);
|
||||
mapState.put(key.getBytes(), msg.getBytes());
|
||||
vectorState.add(msg.getBytes());
|
||||
refState.swap(msg.getBytes());
|
||||
nested.failure(key, msg, failer);
|
||||
return msg;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void thisMethodHangs(String key, String msg, PersistentFailer failer) {
|
||||
setMapState(key, msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -12,18 +12,20 @@ public class PersistentStatefulNested {
|
|||
|
||||
@inittransactionalstate
|
||||
public void init() {
|
||||
mapState = PersistentState.newMap(new CassandraStorageConfig());
|
||||
vectorState = PersistentState.newVector(new CassandraStorageConfig());
|
||||
refState = PersistentState.newRef(new CassandraStorageConfig());
|
||||
mapState = CassandraStorage.newMap();
|
||||
vectorState = CassandraStorage.newVector();
|
||||
refState = CassandraStorage.newRef();
|
||||
}
|
||||
|
||||
public String getMapState(String key) {
|
||||
return (String) mapState.get(key).get();
|
||||
byte[] bytes = (byte[]) mapState.get(key.getBytes()).get();
|
||||
return new String(bytes, 0, bytes.length);
|
||||
}
|
||||
|
||||
|
||||
public String getVectorState(int index) {
|
||||
return (String) vectorState.get(index);
|
||||
byte[] bytes = (byte[]) vectorState.get(index);
|
||||
return new String(bytes, 0, bytes.length);
|
||||
}
|
||||
|
||||
public int getVectorLength() {
|
||||
|
|
@ -32,45 +34,36 @@ public class PersistentStatefulNested {
|
|||
|
||||
public String getRefState() {
|
||||
if (refState.isDefined()) {
|
||||
return (String) refState.get().get();
|
||||
byte[] bytes = (byte[]) refState.get().get();
|
||||
return new String(bytes, 0, bytes.length);
|
||||
} else throw new IllegalStateException("No such element");
|
||||
}
|
||||
|
||||
|
||||
public void setMapState(String key, String msg) {
|
||||
mapState.put(key, msg);
|
||||
mapState.put(key.getBytes(), msg.getBytes());
|
||||
}
|
||||
|
||||
|
||||
public void setVectorState(String msg) {
|
||||
vectorState.add(msg);
|
||||
vectorState.add(msg.getBytes());
|
||||
}
|
||||
|
||||
|
||||
public void setRefState(String msg) {
|
||||
refState.swap(msg);
|
||||
refState.swap(msg.getBytes());
|
||||
}
|
||||
|
||||
|
||||
public String success(String key, String msg) {
|
||||
mapState.put(key, msg);
|
||||
vectorState.add(msg);
|
||||
refState.swap(msg);
|
||||
mapState.put(key.getBytes(), msg.getBytes());
|
||||
vectorState.add(msg.getBytes());
|
||||
refState.swap(msg.getBytes());
|
||||
return msg;
|
||||
}
|
||||
|
||||
|
||||
public String failure(String key, String msg, PersistentFailer failer) {
|
||||
mapState.put(key, msg);
|
||||
vectorState.add(msg);
|
||||
refState.swap(msg);
|
||||
mapState.put(key.getBytes(), msg.getBytes());
|
||||
vectorState.add(msg.getBytes());
|
||||
refState.swap(msg.getBytes());
|
||||
failer.fail();
|
||||
return msg;
|
||||
}
|
||||
|
||||
|
||||
public void thisMethodHangs(String key, String msg, PersistentFailer failer) {
|
||||
setMapState(key, msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -49,7 +49,12 @@
|
|||
<artifactId>commons-pool</artifactId>
|
||||
<version>1.5.1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>log4j</groupId>
|
||||
<artifactId>log4j</artifactId>
|
||||
<version>1.2.13</version>
|
||||
</dependency>
|
||||
|
||||
<!-- For Testing -->
|
||||
<dependency>
|
||||
<groupId>org.scalatest</groupId>
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ package se.scalablesolutions.akka.state
|
|||
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.util.Helpers._
|
||||
import se.scalablesolutions.akka.serialization.Serializer
|
||||
import se.scalablesolutions.akka.Config.config
|
||||
|
||||
import org.apache.cassandra.service._
|
||||
|
|
@ -14,8 +13,14 @@ import org.apache.cassandra.service._
|
|||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object CassandraStorage extends MapStorage
|
||||
with VectorStorage with RefStorage with Logging {
|
||||
private[akka] object CassandraStorageBackend extends
|
||||
MapStorageBackend[Array[Byte], Array[Byte]] with
|
||||
VectorStorageBackend[Array[Byte]] with
|
||||
RefStorageBackend[Array[Byte]] with
|
||||
Logging {
|
||||
|
||||
type ElementType = Array[Byte]
|
||||
|
||||
val KEYSPACE = "akka"
|
||||
val MAP_COLUMN_PARENT = new ColumnParent("map", null)
|
||||
val VECTOR_COLUMN_PARENT = new ColumnParent("vector", null)
|
||||
|
|
@ -31,35 +36,14 @@ object CassandraStorage extends MapStorage
|
|||
case "ONE" => 1
|
||||
case "QUORUM" => 2
|
||||
case "ALL" => 3
|
||||
case unknown => throw new IllegalArgumentException("Consistency level [" + unknown + "] is not supported. Expected one of [ZERO, ONE, QUORUM, ALL]")
|
||||
case unknown => throw new IllegalArgumentException(
|
||||
"Cassandra consistency level [" + unknown + "] is not supported. Expected one of [ZERO, ONE, QUORUM, ALL]")
|
||||
}
|
||||
}
|
||||
val IS_ASCENDING = true
|
||||
|
||||
@volatile private[this] var isRunning = false
|
||||
private[this] val protocol: Protocol = Protocol.Binary
|
||||
/* {
|
||||
config.getString("akka.storage.cassandra.procotol", "binary") match {
|
||||
case "binary" => Protocol.Binary
|
||||
case "json" => Protocol.JSON
|
||||
case "simple-json" => Protocol.SimpleJSON
|
||||
case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]")
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
private[this] val serializer: Serializer = {
|
||||
config.getString("akka.storage.cassandra.storage-format", "manual") match {
|
||||
case "scala-json" => Serializer.ScalaJSON
|
||||
case "java-json" => Serializer.JavaJSON
|
||||
case "protobuf" => Serializer.Protobuf
|
||||
case "java" => Serializer.Java
|
||||
case "manual" => Serializer.NOOP
|
||||
case "sbinary" => throw new UnsupportedOperationException("SBinary serialization protocol is not yet supported for storage")
|
||||
case "avro" => throw new UnsupportedOperationException("Avro serialization protocol is not yet supported for storage")
|
||||
case unknown => throw new UnsupportedOperationException("Unknown storage serialization protocol [" + unknown + "]")
|
||||
}
|
||||
}
|
||||
|
||||
private[this] val sessions = new CassandraSessionPool(
|
||||
KEYSPACE,
|
||||
|
|
@ -71,22 +55,22 @@ object CassandraStorage extends MapStorage
|
|||
// For Ref
|
||||
// ===============================================================
|
||||
|
||||
def insertRefStorageFor(name: String, element: AnyRef) = {
|
||||
def insertRefStorageFor(name: String, element: Array[Byte]) = {
|
||||
sessions.withSession {
|
||||
_ ++| (name,
|
||||
new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY),
|
||||
serializer.out(element),
|
||||
element,
|
||||
System.currentTimeMillis,
|
||||
CONSISTENCY_LEVEL)
|
||||
}
|
||||
}
|
||||
|
||||
def getRefStorageFor(name: String): Option[AnyRef] = {
|
||||
def getRefStorageFor(name: String): Option[Array[Byte]] = {
|
||||
try {
|
||||
val column: Option[ColumnOrSuperColumn] = sessions.withSession {
|
||||
_ | (name, new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY))
|
||||
}
|
||||
if (column.isDefined) Some(serializer.in(column.get.getColumn.value, None))
|
||||
if (column.isDefined) Some(column.get.getColumn.value)
|
||||
else None
|
||||
} catch {
|
||||
case e =>
|
||||
|
|
@ -99,40 +83,40 @@ object CassandraStorage extends MapStorage
|
|||
// For Vector
|
||||
// ===============================================================
|
||||
|
||||
def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
|
||||
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {
|
||||
sessions.withSession {
|
||||
_ ++| (name,
|
||||
new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(getVectorStorageSizeFor(name))),
|
||||
serializer.out(element),
|
||||
element,
|
||||
System.currentTimeMillis,
|
||||
CONSISTENCY_LEVEL)
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME implement insertVectorStorageEntriesFor
|
||||
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
|
||||
throw new UnsupportedOperationException("insertVectorStorageEntriesFor for CassandraStorage is not implemented yet")
|
||||
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = {
|
||||
throw new UnsupportedOperationException("insertVectorStorageEntriesFor for CassandraStorageBackend is not implemented yet")
|
||||
}
|
||||
|
||||
def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = {
|
||||
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {
|
||||
sessions.withSession {
|
||||
_ ++| (name,
|
||||
new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index)),
|
||||
serializer.out(elem),
|
||||
elem,
|
||||
System.currentTimeMillis,
|
||||
CONSISTENCY_LEVEL)
|
||||
}
|
||||
}
|
||||
|
||||
def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
|
||||
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
|
||||
val column: Option[ColumnOrSuperColumn] = sessions.withSession {
|
||||
_ | (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index)))
|
||||
}
|
||||
if (column.isDefined) serializer.in(column.get.column.value, None)
|
||||
if (column.isDefined) column.get.column.value
|
||||
else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]")
|
||||
}
|
||||
|
||||
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
|
||||
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = {
|
||||
val startBytes = if (start.isDefined) intToBytes(start.get) else null
|
||||
val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null
|
||||
val columns: List[ColumnOrSuperColumn] = sessions.withSession {
|
||||
|
|
@ -143,7 +127,7 @@ object CassandraStorage extends MapStorage
|
|||
count,
|
||||
CONSISTENCY_LEVEL)
|
||||
}
|
||||
columns.map(column => serializer.in(column.getColumn.value, None))
|
||||
columns.map(column => column.getColumn.value)
|
||||
}
|
||||
|
||||
def getVectorStorageSizeFor(name: String): Int = {
|
||||
|
|
@ -156,21 +140,21 @@ object CassandraStorage extends MapStorage
|
|||
// For Map
|
||||
// ===============================================================
|
||||
|
||||
def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = {
|
||||
def insertMapStorageEntryFor(name: String, key: Array[Byte], element: Array[Byte]) = {
|
||||
sessions.withSession {
|
||||
_ ++| (name,
|
||||
new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)),
|
||||
serializer.out(element),
|
||||
new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, key),
|
||||
element,
|
||||
System.currentTimeMillis,
|
||||
CONSISTENCY_LEVEL)
|
||||
}
|
||||
}
|
||||
|
||||
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = {
|
||||
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]) = {
|
||||
val batch = new scala.collection.mutable.HashMap[String, List[ColumnOrSuperColumn]]
|
||||
for (entry <- entries) {
|
||||
val columnOrSuperColumn = new ColumnOrSuperColumn
|
||||
columnOrSuperColumn.setColumn(new Column(serializer.out(entry._1), serializer.out(entry._2), System.currentTimeMillis))
|
||||
columnOrSuperColumn.setColumn(new Column(entry._1, entry._2, System.currentTimeMillis))
|
||||
batch + (MAP_COLUMN_PARENT.getColumn_family -> List(columnOrSuperColumn))
|
||||
}
|
||||
sessions.withSession {
|
||||
|
|
@ -178,12 +162,12 @@ object CassandraStorage extends MapStorage
|
|||
}
|
||||
}
|
||||
|
||||
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
|
||||
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = {
|
||||
try {
|
||||
val column: Option[ColumnOrSuperColumn] = sessions.withSession {
|
||||
_ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)))
|
||||
_ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, key))
|
||||
}
|
||||
if (column.isDefined) Some(serializer.in(column.get.getColumn.value, None))
|
||||
if (column.isDefined) Some(column.get.getColumn.value)
|
||||
else None
|
||||
} catch {
|
||||
case e =>
|
||||
|
|
@ -192,13 +176,16 @@ object CassandraStorage extends MapStorage
|
|||
}
|
||||
}
|
||||
|
||||
def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
|
||||
def getMapStorageFor(name: String): List[Tuple2[Array[Byte], Array[Byte]]] = {
|
||||
val size = getMapStorageSizeFor(name)
|
||||
sessions.withSession { session =>
|
||||
val columns = session / (name, MAP_COLUMN_PARENT, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY, true, size, CONSISTENCY_LEVEL)
|
||||
val columns = session /
|
||||
(name, MAP_COLUMN_PARENT,
|
||||
EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY,
|
||||
true, size, CONSISTENCY_LEVEL)
|
||||
for {
|
||||
columnOrSuperColumn <- columns
|
||||
entry = (serializer.in(columnOrSuperColumn.column.name, None), serializer.in(columnOrSuperColumn.column.value, None))
|
||||
entry = (columnOrSuperColumn.column.name, columnOrSuperColumn.column.value)
|
||||
} yield entry
|
||||
}
|
||||
}
|
||||
|
|
@ -209,8 +196,8 @@ object CassandraStorage extends MapStorage
|
|||
|
||||
def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null)
|
||||
|
||||
def removeMapStorageFor(name: String, key: AnyRef): Unit = {
|
||||
val keyBytes = if (key == null) null else serializer.out(key)
|
||||
def removeMapStorageFor(name: String, key: Array[Byte]): Unit = {
|
||||
val keyBytes = if (key == null) null else key
|
||||
sessions.withSession {
|
||||
_ -- (name,
|
||||
new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, keyBytes),
|
||||
|
|
@ -219,13 +206,13 @@ object CassandraStorage extends MapStorage
|
|||
}
|
||||
}
|
||||
|
||||
def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int):
|
||||
List[Tuple2[AnyRef, AnyRef]] = {
|
||||
val startBytes = if (start.isDefined) serializer.out(start.get) else null
|
||||
val finishBytes = if (finish.isDefined) serializer.out(finish.get) else null
|
||||
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int):
|
||||
List[Tuple2[Array[Byte], Array[Byte]]] = {
|
||||
val startBytes = if (start.isDefined) start.get else null
|
||||
val finishBytes = if (finish.isDefined) finish.get else null
|
||||
val columns: List[ColumnOrSuperColumn] = sessions.withSession {
|
||||
_ / (name, MAP_COLUMN_PARENT, startBytes, finishBytes, IS_ASCENDING, count, CONSISTENCY_LEVEL)
|
||||
}
|
||||
columns.map(column => (column.getColumn.name, serializer.in(column.getColumn.value, None)))
|
||||
columns.map(column => (column.getColumn.name, column.getColumn.value))
|
||||
}
|
||||
}
|
||||
|
|
@ -4,8 +4,8 @@
|
|||
|
||||
package se.scalablesolutions.akka.state
|
||||
|
||||
import util.Logging
|
||||
import Config.config
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.Config.config
|
||||
|
||||
import sjson.json.Serializer._
|
||||
|
||||
|
|
@ -23,8 +23,12 @@ import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
|
|||
* <p/>
|
||||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||
*/
|
||||
object MongoStorage extends MapStorage with VectorStorage with RefStorage with Logging {
|
||||
|
||||
private[akka] object MongoStorageBackend extends
|
||||
MapStorageBackend[AnyRef, AnyRef] with
|
||||
VectorStorageBackend[AnyRef] with
|
||||
RefStorageBackend[AnyRef] with
|
||||
Logging {
|
||||
|
||||
// enrich with null safe findOne
|
||||
class RichDBCollection(value: DBCollection) {
|
||||
def findOneNS(o: DBObject): Option[DBObject] = {
|
||||
|
|
@ -34,13 +38,13 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
implicit def enrichDBCollection(c: DBCollection) = new RichDBCollection(c)
|
||||
|
||||
|
||||
val KEY = "key"
|
||||
val VALUE = "value"
|
||||
val COLLECTION = "akka_coll"
|
||||
|
||||
|
||||
val MONGODB_SERVER_HOSTNAME = config.getString("akka.storage.mongodb.hostname", "127.0.0.1")
|
||||
val MONGODB_SERVER_DBNAME = config.getString("akka.storage.mongodb.dbname", "testdb")
|
||||
val MONGODB_SERVER_PORT = config.getInt("akka.storage.mongodb.port", 27017)
|
||||
|
|
@ -50,27 +54,27 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
|
|||
|
||||
// FIXME: make this pluggable
|
||||
private[this] val serializer = SJSON
|
||||
|
||||
|
||||
def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef) {
|
||||
insertMapStorageEntriesFor(name, List((key, value)))
|
||||
}
|
||||
|
||||
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) {
|
||||
import java.util.{Map, HashMap}
|
||||
|
||||
|
||||
val m: Map[AnyRef, AnyRef] = new HashMap
|
||||
for ((k, v) <- entries) {
|
||||
m.put(k, serializer.out(v))
|
||||
}
|
||||
|
||||
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
case None =>
|
||||
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m))
|
||||
case Some(dbo) => {
|
||||
// collate the maps
|
||||
val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]]
|
||||
o.putAll(m)
|
||||
|
||||
|
||||
// remove existing reference
|
||||
removeMapStorageFor(name)
|
||||
// and insert
|
||||
|
|
@ -78,16 +82,16 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
def removeMapStorageFor(name: String) = {
|
||||
|
||||
def removeMapStorageFor(name: String): Unit = {
|
||||
val q = new BasicDBObject
|
||||
q.put(KEY, name)
|
||||
coll.remove(q)
|
||||
}
|
||||
|
||||
def removeMapStorageFor(name: String, key: AnyRef) = {
|
||||
def removeMapStorageFor(name: String, key: AnyRef): Unit = {
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
case None =>
|
||||
case Some(dbo) => {
|
||||
val orig = dbo.get(VALUE).asInstanceOf[DBObject].toMap
|
||||
if (key.isInstanceOf[List[_]]) {
|
||||
|
|
@ -104,10 +108,10 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] =
|
||||
|
||||
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] =
|
||||
getValueForKey(name, key.asInstanceOf[String])
|
||||
|
||||
|
||||
def getMapStorageSizeFor(name: String): Int = {
|
||||
nullSafeFindOne(name) match {
|
||||
case None => 0
|
||||
|
|
@ -115,55 +119,55 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
|
|||
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].keySet.size
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
|
||||
val m =
|
||||
val m =
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
case None =>
|
||||
throw new Predef.NoSuchElementException(name + " not present")
|
||||
case Some(dbo) =>
|
||||
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
|
||||
}
|
||||
val n =
|
||||
val n =
|
||||
List(m.keySet.toArray: _*).asInstanceOf[List[String]]
|
||||
val vals =
|
||||
for(s <- n)
|
||||
val vals =
|
||||
for(s <- n)
|
||||
yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
|
||||
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
|
||||
}
|
||||
|
||||
def getMapStorageRangeFor(name: String, start: Option[AnyRef],
|
||||
finish: Option[AnyRef],
|
||||
|
||||
def getMapStorageRangeFor(name: String, start: Option[AnyRef],
|
||||
finish: Option[AnyRef],
|
||||
count: Int): List[Tuple2[AnyRef, AnyRef]] = {
|
||||
val m =
|
||||
val m =
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
case None =>
|
||||
throw new Predef.NoSuchElementException(name + " not present")
|
||||
case Some(dbo) =>
|
||||
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
|
||||
}
|
||||
|
||||
/**
|
||||
* <tt>count</tt> is the max number of results to return. Start with
|
||||
* <tt>count</tt> is the max number of results to return. Start with
|
||||
* <tt>start</tt> or 0 (if <tt>start</tt> is not defined) and go until
|
||||
* you hit <tt>finish</tt> or <tt>count</tt>.
|
||||
*/
|
||||
val s = if (start.isDefined) start.get.asInstanceOf[Int] else 0
|
||||
val cnt =
|
||||
val cnt =
|
||||
if (finish.isDefined) {
|
||||
val f = finish.get.asInstanceOf[Int]
|
||||
if (f >= s) Math.min(count, (f - s)) else count
|
||||
}
|
||||
else count
|
||||
|
||||
val n =
|
||||
val n =
|
||||
List(m.keySet.toArray: _*).asInstanceOf[List[String]].sort((e1, e2) => (e1 compareTo e2) < 0).slice(s, s + cnt)
|
||||
val vals =
|
||||
for(s <- n)
|
||||
val vals =
|
||||
for(s <- n)
|
||||
yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
|
||||
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
|
||||
}
|
||||
|
||||
|
||||
private def getValueForKey(name: String, key: String): Option[AnyRef] = {
|
||||
try {
|
||||
nullSafeFindOne(name) match {
|
||||
|
|
@ -179,16 +183,16 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
|
|||
throw new Predef.NoSuchElementException(e.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
|
||||
val q = new BasicDBObject
|
||||
q.put(KEY, name)
|
||||
|
||||
|
||||
val currentList =
|
||||
coll.findOneNS(q) match {
|
||||
case None =>
|
||||
case None =>
|
||||
new JArrayList[AnyRef]
|
||||
case Some(dbo) =>
|
||||
case Some(dbo) =>
|
||||
dbo.get(VALUE).asInstanceOf[JArrayList[AnyRef]]
|
||||
}
|
||||
if (!currentList.isEmpty) {
|
||||
|
|
@ -196,26 +200,26 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
|
|||
// remove before adding
|
||||
coll.remove(q)
|
||||
}
|
||||
|
||||
|
||||
// add to the current list
|
||||
elements.map(serializer.out(_)).foreach(currentList.add(_))
|
||||
|
||||
|
||||
coll.insert(
|
||||
new BasicDBObject()
|
||||
.append(KEY, name)
|
||||
.append(VALUE, currentList)
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
|
||||
insertVectorStorageEntriesFor(name, List(element))
|
||||
}
|
||||
|
||||
|
||||
def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
|
||||
try {
|
||||
val o =
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
case None =>
|
||||
throw new Predef.NoSuchElementException(name + " not present")
|
||||
|
||||
case Some(dbo) =>
|
||||
|
|
@ -224,17 +228,17 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
|
|||
serializer.in[AnyRef](
|
||||
o.get(index).asInstanceOf[Array[Byte]])
|
||||
} catch {
|
||||
case e =>
|
||||
case e =>
|
||||
throw new Predef.NoSuchElementException(e.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
def getVectorStorageRangeFor(name: String,
|
||||
|
||||
def getVectorStorageRangeFor(name: String,
|
||||
start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
|
||||
try {
|
||||
val o =
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
case None =>
|
||||
throw new Predef.NoSuchElementException(name + " not present")
|
||||
|
||||
case Some(dbo) =>
|
||||
|
|
@ -242,24 +246,24 @@ object MongoStorage extends MapStorage with VectorStorage with RefStorage with L
|
|||
}
|
||||
|
||||
// pick the subrange and make a Scala list
|
||||
val l =
|
||||
val l =
|
||||
List(o.subList(start.get, start.get + count).toArray: _*)
|
||||
|
||||
for(e <- l)
|
||||
for(e <- l)
|
||||
yield serializer.in[AnyRef](e.asInstanceOf[Array[Byte]])
|
||||
} catch {
|
||||
case e =>
|
||||
case e =>
|
||||
throw new Predef.NoSuchElementException(e.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// FIXME implement updateVectorStorageEntryFor
|
||||
def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = throw new UnsupportedOperationException
|
||||
|
||||
|
||||
def getVectorStorageSizeFor(name: String): Int = {
|
||||
nullSafeFindOne(name) match {
|
||||
case None => 0
|
||||
case Some(dbo) =>
|
||||
case Some(dbo) =>
|
||||
dbo.get(VALUE).asInstanceOf[JList[AnyRef]].size
|
||||
}
|
||||
}
|
||||
|
|
@ -1,352 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009 Scalable Solutions.
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.state
|
||||
|
||||
import se.scalablesolutions.akka.stm.TransactionManagement.currentTransaction
|
||||
import se.scalablesolutions.akka.collection._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
import org.codehaus.aspectwerkz.proxy.Uuid
|
||||
|
||||
class NoTransactionInScopeException extends RuntimeException
|
||||
|
||||
sealed abstract class PersistentStateConfig
|
||||
abstract class PersistentStorageConfig extends PersistentStateConfig
|
||||
case class CassandraStorageConfig() extends PersistentStorageConfig
|
||||
case class TerracottaStorageConfig() extends PersistentStorageConfig
|
||||
case class TokyoCabinetStorageConfig() extends PersistentStorageConfig
|
||||
case class MongoStorageConfig() extends PersistentStorageConfig
|
||||
|
||||
/**
|
||||
* Example Scala usage.
|
||||
* <p/>
|
||||
* New map with generated id.
|
||||
* <pre>
|
||||
* val myMap = PersistentState.newMap(CassandraStorageConfig)
|
||||
* </pre>
|
||||
*
|
||||
* New map with user-defined id.
|
||||
* <pre>
|
||||
* val myMap = PersistentState.newMap(CassandraStorageConfig, id)
|
||||
* </pre>
|
||||
*
|
||||
* Get map by user-defined id.
|
||||
* <pre>
|
||||
* val myMap = PersistentState.getMap(CassandraStorageConfig, id)
|
||||
* </pre>
|
||||
*
|
||||
* Example Java usage:
|
||||
* <pre>
|
||||
* TransactionalMap myMap = PersistentState.newMap(new CassandraStorageConfig());
|
||||
* </pre>
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object PersistentState {
|
||||
def newMap(config: PersistentStorageConfig): PersistentMap =
|
||||
// FIXME: won't work across the remote machines, use [http://johannburkard.de/software/uuid/]
|
||||
newMap(config, Uuid.newUuid.toString)
|
||||
|
||||
def newVector(config: PersistentStorageConfig): PersistentVector =
|
||||
newVector(config, Uuid.newUuid.toString)
|
||||
|
||||
def newRef(config: PersistentStorageConfig): PersistentRef =
|
||||
newRef(config, Uuid.newUuid.toString)
|
||||
|
||||
def getMap(config: PersistentStorageConfig, id: String): PersistentMap =
|
||||
newMap(config, id)
|
||||
|
||||
def getVector(config: PersistentStorageConfig, id: String): PersistentVector =
|
||||
newVector(config, id)
|
||||
|
||||
def getRef(config: PersistentStorageConfig, id: String): PersistentRef =
|
||||
newRef(config, id)
|
||||
|
||||
def newMap(config: PersistentStorageConfig, id: String): PersistentMap = config match {
|
||||
case CassandraStorageConfig() => new CassandraPersistentMap(id)
|
||||
case MongoStorageConfig() => new MongoPersistentMap(id)
|
||||
case TerracottaStorageConfig() => throw new UnsupportedOperationException
|
||||
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
|
||||
}
|
||||
|
||||
def newVector(config: PersistentStorageConfig, id: String): PersistentVector = config match {
|
||||
case CassandraStorageConfig() => new CassandraPersistentVector(id)
|
||||
case MongoStorageConfig() => new MongoPersistentVector(id)
|
||||
case TerracottaStorageConfig() => throw new UnsupportedOperationException
|
||||
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
|
||||
}
|
||||
|
||||
def newRef(config: PersistentStorageConfig, id: String): PersistentRef = config match {
|
||||
case CassandraStorageConfig() => new CassandraPersistentRef(id)
|
||||
case MongoStorageConfig() => new MongoPersistentRef(id)
|
||||
case TerracottaStorageConfig() => throw new UnsupportedOperationException
|
||||
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation of <tt>PersistentMap</tt> for every concrete
|
||||
* storage will have the same workflow. This abstracts the workflow.
|
||||
*
|
||||
* Subclasses just need to provide the actual concrete instance for the
|
||||
* abstract val <tt>storage</tt>.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef]
|
||||
with Transactional with Committable with Logging {
|
||||
protected val newAndUpdatedEntries = TransactionalState.newMap[AnyRef, AnyRef]
|
||||
protected val removedEntries = TransactionalState.newVector[AnyRef]
|
||||
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
|
||||
|
||||
// to be concretized in subclasses
|
||||
val storage: MapStorage
|
||||
|
||||
def commit = {
|
||||
storage.removeMapStorageFor(uuid, removedEntries.toList)
|
||||
storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList)
|
||||
if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get.get)
|
||||
storage.removeMapStorageFor(uuid)
|
||||
newAndUpdatedEntries.clear
|
||||
removedEntries.clear
|
||||
}
|
||||
|
||||
def -=(key: AnyRef) = remove(key)
|
||||
|
||||
def +=(key: AnyRef, value: AnyRef) = put(key, value)
|
||||
|
||||
override def put(key: AnyRef, value: AnyRef): Option[AnyRef] = {
|
||||
register
|
||||
newAndUpdatedEntries.put(key, value)
|
||||
}
|
||||
|
||||
override def update(key: AnyRef, value: AnyRef) = {
|
||||
register
|
||||
newAndUpdatedEntries.update(key, value)
|
||||
}
|
||||
|
||||
def remove(key: AnyRef) = {
|
||||
register
|
||||
removedEntries.add(key)
|
||||
}
|
||||
|
||||
def slice(start: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] =
|
||||
slice(start, None, count)
|
||||
|
||||
def slice(start: Option[AnyRef], finish: Option[AnyRef], count: Int):
|
||||
List[Tuple2[AnyRef, AnyRef]] = try {
|
||||
storage.getMapStorageRangeFor(uuid, start, finish, count)
|
||||
} catch { case e: Exception => Nil }
|
||||
|
||||
override def clear = {
|
||||
register
|
||||
shouldClearOnCommit.swap(true)
|
||||
}
|
||||
|
||||
override def contains(key: AnyRef): Boolean = try {
|
||||
newAndUpdatedEntries.contains(key) ||
|
||||
storage.getMapStorageEntryFor(uuid, key).isDefined
|
||||
} catch { case e: Exception => false }
|
||||
|
||||
override def size: Int = try {
|
||||
storage.getMapStorageSizeFor(uuid)
|
||||
} catch { case e: Exception => 0 }
|
||||
|
||||
override def get(key: AnyRef): Option[AnyRef] = {
|
||||
if (newAndUpdatedEntries.contains(key)) {
|
||||
newAndUpdatedEntries.get(key)
|
||||
}
|
||||
else try {
|
||||
storage.getMapStorageEntryFor(uuid, key)
|
||||
} catch { case e: Exception => None }
|
||||
}
|
||||
|
||||
override def elements: Iterator[Tuple2[AnyRef, AnyRef]] = {
|
||||
new Iterator[Tuple2[AnyRef, AnyRef]] {
|
||||
private val originalList: List[Tuple2[AnyRef, AnyRef]] = try {
|
||||
storage.getMapStorageFor(uuid)
|
||||
} catch {
|
||||
case e: Throwable => Nil
|
||||
}
|
||||
// FIXME how to deal with updated entries, these should be replaced in the originalList not just added
|
||||
private var elements = newAndUpdatedEntries.toList ::: originalList.reverse
|
||||
override def next: Tuple2[AnyRef, AnyRef]= synchronized {
|
||||
val element = elements.head
|
||||
elements = elements.tail
|
||||
element
|
||||
}
|
||||
override def hasNext: Boolean = synchronized { !elements.isEmpty }
|
||||
}
|
||||
}
|
||||
|
||||
private def register = {
|
||||
if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
|
||||
currentTransaction.get.get.register(uuid, this)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class CassandraPersistentMap(id: String) extends PersistentMap {
|
||||
val uuid = id
|
||||
val storage = CassandraStorage
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a persistent transactional map based on the MongoDB document storage.
|
||||
*
|
||||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||
*/
|
||||
class MongoPersistentMap(id: String) extends PersistentMap {
|
||||
val uuid = id
|
||||
val storage = MongoStorage
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a template for a concrete persistent transactional vector based storage.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait PersistentVector extends RandomAccessSeq[AnyRef] with Transactional with Committable {
|
||||
protected val newElems = TransactionalState.newVector[AnyRef]
|
||||
protected val updatedElems = TransactionalState.newMap[Int, AnyRef]
|
||||
protected val removedElems = TransactionalState.newVector[AnyRef]
|
||||
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
|
||||
|
||||
val storage: VectorStorage
|
||||
|
||||
def commit = {
|
||||
// FIXME: should use batch function once the bug is resolved
|
||||
for (element <- newElems) storage.insertVectorStorageEntryFor(uuid, element)
|
||||
for (entry <- updatedElems) storage.updateVectorStorageEntryFor(uuid, entry._1, entry._2)
|
||||
newElems.clear
|
||||
updatedElems.clear
|
||||
}
|
||||
|
||||
def +(elem: AnyRef) = add(elem)
|
||||
|
||||
def add(elem: AnyRef) = {
|
||||
register
|
||||
newElems + elem
|
||||
}
|
||||
|
||||
def apply(index: Int): AnyRef = get(index)
|
||||
|
||||
def get(index: Int): AnyRef = {
|
||||
if (newElems.size > index) newElems(index)
|
||||
else storage.getVectorStorageEntryFor(uuid, index)
|
||||
}
|
||||
|
||||
override def slice(start: Int, count: Int): RandomAccessSeq[AnyRef] = slice(Some(start), None, count)
|
||||
|
||||
def slice(start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[AnyRef] = {
|
||||
val buffer = new scala.collection.mutable.ArrayBuffer[AnyRef]
|
||||
storage.getVectorStorageRangeFor(uuid, start, finish, count).foreach(buffer.append(_))
|
||||
buffer
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the <i>tail</i> element of this vector.
|
||||
*/
|
||||
// FIXME: implement persistent vector pop
|
||||
def pop: AnyRef = {
|
||||
register
|
||||
throw new UnsupportedOperationException("need to implement persistent vector pop")
|
||||
}
|
||||
|
||||
def update(index: Int, newElem: AnyRef) = {
|
||||
register
|
||||
storage.updateVectorStorageEntryFor(uuid, index, newElem)
|
||||
}
|
||||
|
||||
override def first: AnyRef = get(0)
|
||||
|
||||
override def last: AnyRef = {
|
||||
if (newElems.length != 0) newElems.last
|
||||
else {
|
||||
val len = length
|
||||
if (len == 0) throw new NoSuchElementException("Vector is empty")
|
||||
get(len - 1)
|
||||
}
|
||||
}
|
||||
|
||||
def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length
|
||||
|
||||
private def register = {
|
||||
if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
|
||||
currentTransaction.get.get.register(uuid, this)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a persistent transactional vector based on the Cassandra
|
||||
* distributed P2P key-value storage.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class CassandraPersistentVector(id: String) extends PersistentVector {
|
||||
val uuid = id
|
||||
val storage = CassandraStorage
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a persistent transactional vector based on the MongoDB
|
||||
* document storage.
|
||||
*
|
||||
* @author <a href="http://debasishg.blogspot.com">Debaissh Ghosh</a>
|
||||
*/
|
||||
class MongoPersistentVector(id: String) extends PersistentVector {
|
||||
val uuid = id
|
||||
val storage = MongoStorage
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a persistent reference with abstract storage.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait PersistentRef extends Transactional with Committable {
|
||||
protected val ref = new TransactionalRef[AnyRef]
|
||||
|
||||
val storage: RefStorage
|
||||
|
||||
def commit = if (ref.isDefined) {
|
||||
storage.insertRefStorageFor(uuid, ref.get.get)
|
||||
ref.swap(null)
|
||||
}
|
||||
|
||||
def swap(elem: AnyRef) = {
|
||||
register
|
||||
ref.swap(elem)
|
||||
}
|
||||
|
||||
def get: Option[AnyRef] = if (ref.isDefined) ref.get else storage.getRefStorageFor(uuid)
|
||||
|
||||
def isDefined: Boolean = ref.isDefined || storage.getRefStorageFor(uuid).isDefined
|
||||
|
||||
def getOrElse(default: => AnyRef): AnyRef = {
|
||||
val current = get
|
||||
if (current.isDefined) current.get
|
||||
else default
|
||||
}
|
||||
|
||||
private def register = {
|
||||
if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
|
||||
currentTransaction.get.get.register(uuid, this)
|
||||
}
|
||||
}
|
||||
|
||||
class CassandraPersistentRef(id: String) extends PersistentRef {
|
||||
val uuid = id
|
||||
val storage = CassandraStorage
|
||||
}
|
||||
|
||||
class MongoPersistentRef(id: String) extends PersistentRef {
|
||||
val uuid = id
|
||||
val storage = MongoStorage
|
||||
}
|
||||
|
|
@ -4,33 +4,352 @@
|
|||
|
||||
package se.scalablesolutions.akka.state
|
||||
|
||||
// abstracts persistence storage
|
||||
trait Storage
|
||||
import se.scalablesolutions.akka.stm.TransactionManagement.currentTransaction
|
||||
import se.scalablesolutions.akka.collection._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
// for Maps
|
||||
trait MapStorage extends Storage {
|
||||
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]])
|
||||
def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef)
|
||||
def removeMapStorageFor(name: String)
|
||||
def removeMapStorageFor(name: String, key: AnyRef)
|
||||
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef]
|
||||
def getMapStorageSizeFor(name: String): Int
|
||||
def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]]
|
||||
def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]]
|
||||
import org.codehaus.aspectwerkz.proxy.Uuid
|
||||
|
||||
class NoTransactionInScopeException extends RuntimeException
|
||||
|
||||
/**
|
||||
* Example Scala usage.
|
||||
* <p/>
|
||||
* New map with generated id.
|
||||
* <pre>
|
||||
* val myMap = CassandraStorage.newMap
|
||||
* </pre>
|
||||
*
|
||||
* New map with user-defined id.
|
||||
* <pre>
|
||||
* val myMap = MongoStorage.newMap(id)
|
||||
* </pre>
|
||||
*
|
||||
* Get map by user-defined id.
|
||||
* <pre>
|
||||
* val myMap = CassandraStorage.getMap(id)
|
||||
* </pre>
|
||||
*
|
||||
* Example Java usage:
|
||||
* <pre>
|
||||
* PersistentMap<Object, Object> myMap = MongoStorage.newMap();
|
||||
* </pre>
|
||||
* Or:
|
||||
* <pre>
|
||||
* MongoPersistentMap myMap = MongoStorage.getMap(id);
|
||||
* </pre>
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait Storage {
|
||||
// FIXME: The UUID won't work across the remote machines, use [http://johannburkard.de/software/uuid/]
|
||||
type ElementType
|
||||
|
||||
def newMap: PersistentMap[ElementType, ElementType]
|
||||
def newVector: PersistentVector[ElementType]
|
||||
def newRef: PersistentRef[ElementType]
|
||||
|
||||
def getMap(id: String): PersistentMap[ElementType, ElementType]
|
||||
def getVector(id: String): PersistentVector[ElementType]
|
||||
def getRef(id: String): PersistentRef[ElementType]
|
||||
|
||||
def newMap(id: String): PersistentMap[ElementType, ElementType]
|
||||
def newVector(id: String): PersistentVector[ElementType]
|
||||
def newRef(id: String): PersistentRef[ElementType]
|
||||
}
|
||||
|
||||
// for Vectors
|
||||
trait VectorStorage extends Storage {
|
||||
def insertVectorStorageEntryFor(name: String, element: AnyRef)
|
||||
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef])
|
||||
def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef)
|
||||
def getVectorStorageEntryFor(name: String, index: Int): AnyRef
|
||||
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef]
|
||||
def getVectorStorageSizeFor(name: String): Int
|
||||
object CassandraStorage extends Storage {
|
||||
type ElementType = Array[Byte]
|
||||
|
||||
def newMap: PersistentMap[ElementType, ElementType] = newMap(Uuid.newUuid.toString)
|
||||
def newVector: PersistentVector[ElementType] = newVector(Uuid.newUuid.toString)
|
||||
def newRef: PersistentRef[ElementType] = newRef(Uuid.newUuid.toString)
|
||||
|
||||
def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
|
||||
def getVector(id: String): PersistentVector[ElementType] = newVector(id)
|
||||
def getRef(id: String): PersistentRef[ElementType] = newRef(id)
|
||||
|
||||
def newMap(id: String): PersistentMap[ElementType, ElementType] = new CassandraPersistentMap(id)
|
||||
def newVector(id: String): PersistentVector[ElementType] = new CassandraPersistentVector(id)
|
||||
def newRef(id: String): PersistentRef[ElementType] = new CassandraPersistentRef(id)
|
||||
}
|
||||
|
||||
// for Ref
|
||||
trait RefStorage extends Storage {
|
||||
def insertRefStorageFor(name: String, element: AnyRef)
|
||||
def getRefStorageFor(name: String): Option[AnyRef]
|
||||
object MongoStorage extends Storage {
|
||||
type ElementType = AnyRef
|
||||
|
||||
def newMap: PersistentMap[ElementType, ElementType] = newMap(Uuid.newUuid.toString)
|
||||
def newVector: PersistentVector[ElementType] = newVector(Uuid.newUuid.toString)
|
||||
def newRef: PersistentRef[ElementType] = newRef(Uuid.newUuid.toString)
|
||||
|
||||
def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
|
||||
def getVector(id: String): PersistentVector[ElementType] = newVector(id)
|
||||
def getRef(id: String): PersistentRef[ElementType] = newRef(id)
|
||||
|
||||
def newMap(id: String): PersistentMap[ElementType, ElementType] = new MongoPersistentMap(id)
|
||||
def newVector(id: String): PersistentVector[ElementType] = new MongoPersistentVector(id)
|
||||
def newRef(id: String): PersistentRef[ElementType] = new MongoPersistentRef(id)
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation of <tt>PersistentMap</tt> for every concrete
|
||||
* storage will have the same workflow. This abstracts the workflow.
|
||||
*
|
||||
* Subclasses just need to provide the actual concrete instance for the
|
||||
* abstract val <tt>storage</tt>.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
|
||||
with Transactional with Committable with Logging {
|
||||
protected val newAndUpdatedEntries = TransactionalState.newMap[K, V]
|
||||
protected val removedEntries = TransactionalState.newVector[K]
|
||||
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
|
||||
|
||||
// to be concretized in subclasses
|
||||
val storage: MapStorageBackend[K, V]
|
||||
|
||||
def commit = {
|
||||
removedEntries.toList.foreach(key => storage.removeMapStorageFor(uuid, key))
|
||||
storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList)
|
||||
if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get.get)
|
||||
storage.removeMapStorageFor(uuid)
|
||||
newAndUpdatedEntries.clear
|
||||
removedEntries.clear
|
||||
}
|
||||
|
||||
def -=(key: K) = remove(key)
|
||||
|
||||
def +=(key: K, value: V) = put(key, value)
|
||||
|
||||
override def put(key: K, value: V): Option[V] = {
|
||||
register
|
||||
newAndUpdatedEntries.put(key, value)
|
||||
}
|
||||
|
||||
override def update(key: K, value: V) = {
|
||||
register
|
||||
newAndUpdatedEntries.update(key, value)
|
||||
}
|
||||
|
||||
def remove(key: K) = {
|
||||
register
|
||||
removedEntries.add(key)
|
||||
}
|
||||
|
||||
def slice(start: Option[K], count: Int): List[Tuple2[K, V]] =
|
||||
slice(start, None, count)
|
||||
|
||||
def slice(start: Option[K], finish: Option[K], count: Int): List[Tuple2[K, V]] = try {
|
||||
storage.getMapStorageRangeFor(uuid, start, finish, count)
|
||||
} catch { case e: Exception => Nil }
|
||||
|
||||
override def clear = {
|
||||
register
|
||||
shouldClearOnCommit.swap(true)
|
||||
}
|
||||
|
||||
override def contains(key: K): Boolean = try {
|
||||
newAndUpdatedEntries.contains(key) ||
|
||||
storage.getMapStorageEntryFor(uuid, key).isDefined
|
||||
} catch { case e: Exception => false }
|
||||
|
||||
override def size: Int = try {
|
||||
storage.getMapStorageSizeFor(uuid)
|
||||
} catch { case e: Exception => 0 }
|
||||
|
||||
override def get(key: K): Option[V] = {
|
||||
if (newAndUpdatedEntries.contains(key)) {
|
||||
newAndUpdatedEntries.get(key)
|
||||
}
|
||||
else try {
|
||||
storage.getMapStorageEntryFor(uuid, key)
|
||||
} catch { case e: Exception => None }
|
||||
}
|
||||
|
||||
override def elements: Iterator[Tuple2[K, V]] = {
|
||||
new Iterator[Tuple2[K, V]] {
|
||||
private val originalList: List[Tuple2[K, V]] = try {
|
||||
storage.getMapStorageFor(uuid)
|
||||
} catch {
|
||||
case e: Throwable => Nil
|
||||
}
|
||||
// FIXME how to deal with updated entries, these should be replaced in the originalList not just added
|
||||
private var elements = newAndUpdatedEntries.toList ::: originalList.reverse
|
||||
override def next: Tuple2[K, V]= synchronized {
|
||||
val element = elements.head
|
||||
elements = elements.tail
|
||||
element
|
||||
}
|
||||
override def hasNext: Boolean = synchronized { !elements.isEmpty }
|
||||
}
|
||||
}
|
||||
|
||||
private def register = {
|
||||
if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
|
||||
currentTransaction.get.get.register(uuid, this)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class CassandraPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[Byte]] {
|
||||
val uuid = id
|
||||
val storage = CassandraStorageBackend
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a persistent transactional map based on the MongoDB document storage.
|
||||
*
|
||||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||
*/
|
||||
class MongoPersistentMap(id: String) extends PersistentMap[AnyRef, AnyRef] {
|
||||
val uuid = id
|
||||
val storage = MongoStorageBackend
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a template for a concrete persistent transactional vector based storage.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait PersistentVector[T] extends RandomAccessSeq[T] with Transactional with Committable {
|
||||
protected val newElems = TransactionalState.newVector[T]
|
||||
protected val updatedElems = TransactionalState.newMap[Int, T]
|
||||
protected val removedElems = TransactionalState.newVector[T]
|
||||
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
|
||||
|
||||
val storage: VectorStorageBackend[T]
|
||||
|
||||
def commit = {
|
||||
// FIXME: should use batch function once the bug is resolved
|
||||
for (element <- newElems) storage.insertVectorStorageEntryFor(uuid, element)
|
||||
for (entry <- updatedElems) storage.updateVectorStorageEntryFor(uuid, entry._1, entry._2)
|
||||
newElems.clear
|
||||
updatedElems.clear
|
||||
}
|
||||
|
||||
def +(elem: T) = add(elem)
|
||||
|
||||
def add(elem: T) = {
|
||||
register
|
||||
newElems + elem
|
||||
}
|
||||
|
||||
def apply(index: Int): T = get(index)
|
||||
|
||||
def get(index: Int): T = {
|
||||
if (newElems.size > index) newElems(index)
|
||||
else storage.getVectorStorageEntryFor(uuid, index)
|
||||
}
|
||||
|
||||
override def slice(start: Int, count: Int): RandomAccessSeq[T] = slice(Some(start), None, count)
|
||||
|
||||
def slice(start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[T] = {
|
||||
val buffer = new scala.collection.mutable.ArrayBuffer[T]
|
||||
storage.getVectorStorageRangeFor(uuid, start, finish, count).foreach(buffer.append(_))
|
||||
buffer
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the <i>tail</i> element of this vector.
|
||||
*/
|
||||
// FIXME: implement persistent vector pop
|
||||
def pop: T = {
|
||||
register
|
||||
throw new UnsupportedOperationException("need to implement persistent vector pop")
|
||||
}
|
||||
|
||||
def update(index: Int, newElem: T) = {
|
||||
register
|
||||
storage.updateVectorStorageEntryFor(uuid, index, newElem)
|
||||
}
|
||||
|
||||
override def first: T = get(0)
|
||||
|
||||
override def last: T = {
|
||||
if (newElems.length != 0) newElems.last
|
||||
else {
|
||||
val len = length
|
||||
if (len == 0) throw new NoSuchElementException("Vector is empty")
|
||||
get(len - 1)
|
||||
}
|
||||
}
|
||||
|
||||
def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length
|
||||
|
||||
private def register = {
|
||||
if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
|
||||
currentTransaction.get.get.register(uuid, this)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a persistent transactional vector based on the Cassandra
|
||||
* distributed P2P key-value storage.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class CassandraPersistentVector(id: String) extends PersistentVector[Array[Byte]] {
|
||||
val uuid = id
|
||||
val storage = CassandraStorageBackend
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a persistent transactional vector based on the MongoDB
|
||||
* document storage.
|
||||
*
|
||||
* @author <a href="http://debasishg.blogspot.com">Debaissh Ghosh</a>
|
||||
*/
|
||||
class MongoPersistentVector(id: String) extends PersistentVector[AnyRef] {
|
||||
val uuid = id
|
||||
val storage = MongoStorageBackend
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a persistent reference with abstract storage.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait PersistentRef[T] extends Transactional with Committable {
|
||||
protected val ref = new TransactionalRef[T]
|
||||
|
||||
val storage: RefStorageBackend[T]
|
||||
|
||||
def commit = if (ref.isDefined) {
|
||||
storage.insertRefStorageFor(uuid, ref.get.get)
|
||||
ref.swap(null.asInstanceOf[T])
|
||||
}
|
||||
|
||||
def swap(elem: T) = {
|
||||
register
|
||||
ref.swap(elem)
|
||||
}
|
||||
|
||||
def get: Option[T] = if (ref.isDefined) ref.get else storage.getRefStorageFor(uuid)
|
||||
|
||||
def isDefined: Boolean = ref.isDefined || storage.getRefStorageFor(uuid).isDefined
|
||||
|
||||
def getOrElse(default: => T): T = {
|
||||
val current = get
|
||||
if (current.isDefined) current.get
|
||||
else default
|
||||
}
|
||||
|
||||
private def register = {
|
||||
if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
|
||||
currentTransaction.get.get.register(uuid, this)
|
||||
}
|
||||
}
|
||||
|
||||
class CassandraPersistentRef(id: String) extends PersistentRef[Array[Byte]] {
|
||||
val uuid = id
|
||||
val storage = CassandraStorageBackend
|
||||
}
|
||||
|
||||
class MongoPersistentRef(id: String) extends PersistentRef[AnyRef] {
|
||||
val uuid = id
|
||||
val storage = MongoStorageBackend
|
||||
}
|
||||
|
|
|
|||
36
akka-persistence/src/main/scala/StorageBackend.scala
Normal file
36
akka-persistence/src/main/scala/StorageBackend.scala
Normal file
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Copyright (C) 2009 Scalable Solutions.
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.state
|
||||
|
||||
// abstracts persistence storage
|
||||
trait StorageBackend
|
||||
|
||||
// for Maps
|
||||
trait MapStorageBackend[K, V] extends StorageBackend {
|
||||
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[K, V]])
|
||||
def insertMapStorageEntryFor(name: String, key: K, value: V)
|
||||
def removeMapStorageFor(name: String)
|
||||
def removeMapStorageFor(name: String, key: K)
|
||||
def getMapStorageEntryFor(name: String, key: K): Option[V]
|
||||
def getMapStorageSizeFor(name: String): Int
|
||||
def getMapStorageFor(name: String): List[Tuple2[K, V]]
|
||||
def getMapStorageRangeFor(name: String, start: Option[K], finish: Option[K], count: Int): List[Tuple2[K, V]]
|
||||
}
|
||||
|
||||
// for Vectors
|
||||
trait VectorStorageBackend[T] extends StorageBackend {
|
||||
def insertVectorStorageEntryFor(name: String, element: T)
|
||||
def insertVectorStorageEntriesFor(name: String, elements: List[T])
|
||||
def updateVectorStorageEntryFor(name: String, index: Int, elem: T)
|
||||
def getVectorStorageEntryFor(name: String, index: Int): T
|
||||
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[T]
|
||||
def getVectorStorageSizeFor(name: String): Int
|
||||
}
|
||||
|
||||
// for Ref
|
||||
trait RefStorageBackend[T] extends StorageBackend {
|
||||
def insertRefStorageFor(name: String, element: T)
|
||||
def getRefStorageFor(name: String): Option[T]
|
||||
}
|
||||
|
|
@ -1,13 +1,10 @@
|
|||
package se.scalablesolutions.akka.state
|
||||
|
||||
import akka.actor.Actor
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import java.util.concurrent.TimeUnit
|
||||
import se.scalablesolutions.akka.actor.Actor
|
||||
|
||||
import junit.framework.TestCase
|
||||
import dispatch._
|
||||
|
||||
import org.junit.{Test, Before}
|
||||
import org.junit.Test
|
||||
import org.junit.Assert._
|
||||
|
||||
case class GetMapState(key: String)
|
||||
|
|
@ -31,35 +28,35 @@ class CassandraPersistentActor extends Actor {
|
|||
timeout = 100000
|
||||
makeTransactionRequired
|
||||
|
||||
private lazy val mapState: PersistentMap = PersistentState.newMap(CassandraStorageConfig())
|
||||
private lazy val vectorState: PersistentVector = PersistentState.newVector(CassandraStorageConfig())
|
||||
private lazy val refState: PersistentRef = PersistentState.newRef(CassandraStorageConfig())
|
||||
private lazy val mapState = CassandraStorage.newMap
|
||||
private lazy val vectorState = CassandraStorage.newVector
|
||||
private lazy val refState = CassandraStorage.newRef
|
||||
|
||||
def receive = {
|
||||
case GetMapState(key) =>
|
||||
reply(mapState.get(key).get)
|
||||
reply(mapState.get(key.getBytes("UTF-8")).get)
|
||||
case GetVectorSize =>
|
||||
reply(vectorState.length.asInstanceOf[AnyRef])
|
||||
case GetRefState =>
|
||||
reply(refState.get.get)
|
||||
case SetMapState(key, msg) =>
|
||||
mapState.put(key, msg)
|
||||
mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8"))
|
||||
reply(msg)
|
||||
case SetVectorState(msg) =>
|
||||
vectorState.add(msg)
|
||||
vectorState.add(msg.getBytes("UTF-8"))
|
||||
reply(msg)
|
||||
case SetRefState(msg) =>
|
||||
refState.swap(msg)
|
||||
refState.swap(msg.getBytes("UTF-8"))
|
||||
reply(msg)
|
||||
case Success(key, msg) =>
|
||||
mapState.put(key, msg)
|
||||
vectorState.add(msg)
|
||||
refState.swap(msg)
|
||||
mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8"))
|
||||
vectorState.add(msg.getBytes("UTF-8"))
|
||||
refState.swap(msg.getBytes("UTF-8"))
|
||||
reply(msg)
|
||||
case Failure(key, msg, failer) =>
|
||||
mapState.put(key, msg)
|
||||
vectorState.add(msg)
|
||||
refState.swap(msg)
|
||||
mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8"))
|
||||
vectorState.add(msg.getBytes("UTF-8"))
|
||||
refState.swap(msg.getBytes("UTF-8"))
|
||||
failer !! "Failure"
|
||||
reply(msg)
|
||||
}
|
||||
|
|
@ -74,14 +71,15 @@ class CassandraPersistentActor extends Actor {
|
|||
}
|
||||
|
||||
class CassandraPersistentActorSpec extends TestCase {
|
||||
|
||||
|
||||
@Test
|
||||
def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||
val stateful = new CassandraPersistentActor
|
||||
stateful.start
|
||||
stateful !! SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
|
||||
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
|
||||
val result: Array[Byte] = (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get
|
||||
assertEquals("new state", new String(result, 0, result.length, "UTF-8"))
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -95,7 +93,8 @@ class CassandraPersistentActorSpec extends TestCase {
|
|||
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
fail("should have thrown an exception")
|
||||
} catch {case e: RuntimeException => {}}
|
||||
assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
|
||||
val result: Array[Byte] = (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get
|
||||
assertEquals("init", new String(result, 0, result.length, "UTF-8")) // check that state is == init state
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -127,7 +126,8 @@ class CassandraPersistentActorSpec extends TestCase {
|
|||
stateful.start
|
||||
stateful !! SetRefState("init") // set init state
|
||||
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
assertEquals("new state", (stateful !! GetRefState).get)
|
||||
val result: Array[Byte] = (stateful !! GetRefState).get
|
||||
assertEquals("new state", new String(result, 0, result.length, "UTF-8"))
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -141,6 +141,7 @@ class CassandraPersistentActorSpec extends TestCase {
|
|||
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
fail("should have thrown an exception")
|
||||
} catch {case e: RuntimeException => {}}
|
||||
assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state
|
||||
val result: Array[Byte] = (stateful !! GetRefState).get
|
||||
assertEquals("init", new String(result, 0, result.length, "UTF-8")) // check that state is == init state
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,10 +31,8 @@ case object LogSize
|
|||
|
||||
class BankAccountActor extends Actor {
|
||||
makeTransactionRequired
|
||||
private val accountState =
|
||||
PersistentState.newMap(MongoStorageConfig())
|
||||
private val txnLog =
|
||||
PersistentState.newVector(MongoStorageConfig())
|
||||
private val accountState = MongoStorage.newMap
|
||||
private val txnLog = MongoStorage.newVector
|
||||
|
||||
def receive: PartialFunction[Any, Unit] = {
|
||||
// check balance
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ class MongoStorageSpec extends TestCase {
|
|||
val changeSetM = new scala.collection.mutable.HashMap[AnyRef, AnyRef]
|
||||
|
||||
override def setUp = {
|
||||
MongoStorage.coll.drop
|
||||
MongoStorageBackend.coll.drop
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -22,40 +22,40 @@ class MongoStorageSpec extends TestCase {
|
|||
changeSetV += "debasish" // string
|
||||
changeSetV += List(1, 2, 3) // Scala List
|
||||
changeSetV += List(100, 200)
|
||||
MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
|
||||
MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
|
||||
assertEquals(
|
||||
3,
|
||||
MongoStorage.getVectorStorageSizeFor("U-A1"))
|
||||
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
|
||||
changeSetV.clear
|
||||
|
||||
// changeSetV should be reinitialized
|
||||
changeSetV += List(12, 23, 45)
|
||||
changeSetV += "maulindu"
|
||||
MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
|
||||
MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
|
||||
assertEquals(
|
||||
5,
|
||||
MongoStorage.getVectorStorageSizeFor("U-A1"))
|
||||
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
|
||||
|
||||
// add more to the same changeSetV
|
||||
changeSetV += "ramanendu"
|
||||
changeSetV += Map(1 -> "dg", 2 -> "mc")
|
||||
|
||||
// add for a diff transaction
|
||||
MongoStorage.insertVectorStorageEntriesFor("U-A2", changeSetV.toList)
|
||||
MongoStorageBackend.insertVectorStorageEntriesFor("U-A2", changeSetV.toList)
|
||||
assertEquals(
|
||||
4,
|
||||
MongoStorage.getVectorStorageSizeFor("U-A2"))
|
||||
MongoStorageBackend.getVectorStorageSizeFor("U-A2"))
|
||||
|
||||
// previous transaction change set should remain same
|
||||
assertEquals(
|
||||
5,
|
||||
MongoStorage.getVectorStorageSizeFor("U-A1"))
|
||||
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
|
||||
|
||||
// test single element entry
|
||||
MongoStorage.insertVectorStorageEntryFor("U-A1", Map(1->1, 2->4, 3->9))
|
||||
MongoStorageBackend.insertVectorStorageEntryFor("U-A1", Map(1->1, 2->4, 3->9))
|
||||
assertEquals(
|
||||
6,
|
||||
MongoStorage.getVectorStorageSizeFor("U-A1"))
|
||||
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -64,25 +64,25 @@ class MongoStorageSpec extends TestCase {
|
|||
// initially everything 0
|
||||
assertEquals(
|
||||
0,
|
||||
MongoStorage.getVectorStorageSizeFor("U-A2"))
|
||||
MongoStorageBackend.getVectorStorageSizeFor("U-A2"))
|
||||
|
||||
assertEquals(
|
||||
0,
|
||||
MongoStorage.getVectorStorageSizeFor("U-A1"))
|
||||
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
|
||||
|
||||
// get some stuff
|
||||
changeSetV += "debasish"
|
||||
changeSetV += List(BigDecimal(12), BigDecimal(13), BigDecimal(14))
|
||||
MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
|
||||
MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
|
||||
|
||||
assertEquals(
|
||||
2,
|
||||
MongoStorage.getVectorStorageSizeFor("U-A1"))
|
||||
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
|
||||
|
||||
val JsString(str) = MongoStorage.getVectorStorageEntryFor("U-A1", 0).asInstanceOf[JsString]
|
||||
val JsString(str) = MongoStorageBackend.getVectorStorageEntryFor("U-A1", 0).asInstanceOf[JsString]
|
||||
assertEquals("debasish", str)
|
||||
|
||||
val l = MongoStorage.getVectorStorageEntryFor("U-A1", 1).asInstanceOf[JsValue]
|
||||
val l = MongoStorageBackend.getVectorStorageEntryFor("U-A1", 1).asInstanceOf[JsValue]
|
||||
val num_list = list ! num
|
||||
val num_list(l0) = l
|
||||
assertEquals(List(12, 13, 14), l0)
|
||||
|
|
@ -91,14 +91,14 @@ class MongoStorageSpec extends TestCase {
|
|||
changeSetV += Map(1->1, 2->4, 3->9)
|
||||
changeSetV += BigInt(2310)
|
||||
changeSetV += List(100, 200, 300)
|
||||
MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
|
||||
MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
|
||||
|
||||
assertEquals(
|
||||
5,
|
||||
MongoStorage.getVectorStorageSizeFor("U-A1"))
|
||||
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
|
||||
|
||||
val r =
|
||||
MongoStorage.getVectorStorageRangeFor("U-A1", Some(1), None, 3)
|
||||
MongoStorageBackend.getVectorStorageRangeFor("U-A1", Some(1), None, 3)
|
||||
|
||||
assertEquals(3, r.size)
|
||||
val lr = r(0).asInstanceOf[JsValue]
|
||||
|
|
@ -109,12 +109,12 @@ class MongoStorageSpec extends TestCase {
|
|||
@Test
|
||||
def testVectorFetchForNonExistentKeys = {
|
||||
try {
|
||||
MongoStorage.getVectorStorageEntryFor("U-A1", 1)
|
||||
MongoStorageBackend.getVectorStorageEntryFor("U-A1", 1)
|
||||
fail("should throw an exception")
|
||||
} catch {case e: Predef.NoSuchElementException => {}}
|
||||
|
||||
try {
|
||||
MongoStorage.getVectorStorageRangeFor("U-A1", Some(2), None, 12)
|
||||
MongoStorageBackend.getVectorStorageRangeFor("U-A1", Some(2), None, 12)
|
||||
fail("should throw an exception")
|
||||
} catch {case e: Predef.NoSuchElementException => {}}
|
||||
}
|
||||
|
|
@ -128,43 +128,43 @@ class MongoStorageSpec extends TestCase {
|
|||
changeSetM += "6" -> java.util.Calendar.getInstance.getTime
|
||||
|
||||
// insert all into Mongo
|
||||
MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
|
||||
MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
|
||||
assertEquals(
|
||||
6,
|
||||
MongoStorage.getMapStorageSizeFor("U-M1"))
|
||||
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
|
||||
|
||||
// individual insert api
|
||||
MongoStorage.insertMapStorageEntryFor("U-M1", "7", "akka")
|
||||
MongoStorage.insertMapStorageEntryFor("U-M1", "8", List(23, 25))
|
||||
MongoStorageBackend.insertMapStorageEntryFor("U-M1", "7", "akka")
|
||||
MongoStorageBackend.insertMapStorageEntryFor("U-M1", "8", List(23, 25))
|
||||
assertEquals(
|
||||
8,
|
||||
MongoStorage.getMapStorageSizeFor("U-M1"))
|
||||
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
|
||||
|
||||
// add the same changeSet for another transaction
|
||||
MongoStorage.insertMapStorageEntriesFor("U-M2", changeSetM.toList)
|
||||
MongoStorageBackend.insertMapStorageEntriesFor("U-M2", changeSetM.toList)
|
||||
assertEquals(
|
||||
6,
|
||||
MongoStorage.getMapStorageSizeFor("U-M2"))
|
||||
MongoStorageBackend.getMapStorageSizeFor("U-M2"))
|
||||
|
||||
// the first transaction should remain the same
|
||||
assertEquals(
|
||||
8,
|
||||
MongoStorage.getMapStorageSizeFor("U-M1"))
|
||||
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
|
||||
changeSetM.clear
|
||||
}
|
||||
|
||||
@Test
|
||||
def testMapContents = {
|
||||
fillMap
|
||||
MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
|
||||
MongoStorage.getMapStorageEntryFor("U-M1", "2") match {
|
||||
MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
|
||||
MongoStorageBackend.getMapStorageEntryFor("U-M1", "2") match {
|
||||
case Some(x) => {
|
||||
val JsString(str) = x.asInstanceOf[JsValue]
|
||||
assertEquals("peter", str)
|
||||
}
|
||||
case None => fail("should fetch peter")
|
||||
}
|
||||
MongoStorage.getMapStorageEntryFor("U-M1", "4") match {
|
||||
MongoStorageBackend.getMapStorageEntryFor("U-M1", "4") match {
|
||||
case Some(x) => {
|
||||
val num_list = list ! num
|
||||
val num_list(l0) = x.asInstanceOf[JsValue]
|
||||
|
|
@ -172,7 +172,7 @@ class MongoStorageSpec extends TestCase {
|
|||
}
|
||||
case None => fail("should fetch list")
|
||||
}
|
||||
MongoStorage.getMapStorageEntryFor("U-M1", "3") match {
|
||||
MongoStorageBackend.getMapStorageEntryFor("U-M1", "3") match {
|
||||
case Some(x) => {
|
||||
val num_list = list ! num
|
||||
val num_list(l0) = x.asInstanceOf[JsValue]
|
||||
|
|
@ -183,7 +183,7 @@ class MongoStorageSpec extends TestCase {
|
|||
|
||||
// get the entire map
|
||||
val l: List[Tuple2[AnyRef, AnyRef]] =
|
||||
MongoStorage.getMapStorageFor("U-M1")
|
||||
MongoStorageBackend.getMapStorageFor("U-M1")
|
||||
|
||||
assertEquals(4, l.size)
|
||||
assertTrue(l.map(_._1).contains("1"))
|
||||
|
|
@ -196,7 +196,7 @@ class MongoStorageSpec extends TestCase {
|
|||
|
||||
// trying to fetch for a non-existent transaction will throw
|
||||
try {
|
||||
MongoStorage.getMapStorageFor("U-M2")
|
||||
MongoStorageBackend.getMapStorageFor("U-M2")
|
||||
fail("should throw an exception")
|
||||
} catch {case e: Predef.NoSuchElementException => {}}
|
||||
|
||||
|
|
@ -207,11 +207,11 @@ class MongoStorageSpec extends TestCase {
|
|||
def testMapContentsByRange = {
|
||||
fillMap
|
||||
changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc")
|
||||
MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
|
||||
MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
|
||||
|
||||
// specify start and count
|
||||
val l: List[Tuple2[AnyRef, AnyRef]] =
|
||||
MongoStorage.getMapStorageRangeFor(
|
||||
MongoStorageBackend.getMapStorageRangeFor(
|
||||
"U-M1", Some(Integer.valueOf(2)), None, 3)
|
||||
|
||||
assertEquals(3, l.size)
|
||||
|
|
@ -227,27 +227,27 @@ class MongoStorageSpec extends TestCase {
|
|||
|
||||
// specify start, finish and count where finish - start == count
|
||||
assertEquals(3,
|
||||
MongoStorage.getMapStorageRangeFor(
|
||||
MongoStorageBackend.getMapStorageRangeFor(
|
||||
"U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(5)), 3).size)
|
||||
|
||||
// specify start, finish and count where finish - start > count
|
||||
assertEquals(3,
|
||||
MongoStorage.getMapStorageRangeFor(
|
||||
MongoStorageBackend.getMapStorageRangeFor(
|
||||
"U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(9)), 3).size)
|
||||
|
||||
// do not specify start or finish
|
||||
assertEquals(3,
|
||||
MongoStorage.getMapStorageRangeFor(
|
||||
MongoStorageBackend.getMapStorageRangeFor(
|
||||
"U-M1", None, None, 3).size)
|
||||
|
||||
// specify finish and count
|
||||
assertEquals(3,
|
||||
MongoStorage.getMapStorageRangeFor(
|
||||
MongoStorageBackend.getMapStorageRangeFor(
|
||||
"U-M1", None, Some(Integer.valueOf(3)), 3).size)
|
||||
|
||||
// specify start, finish and count where finish < start
|
||||
assertEquals(3,
|
||||
MongoStorage.getMapStorageRangeFor(
|
||||
MongoStorageBackend.getMapStorageRangeFor(
|
||||
"U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(1)), 3).size)
|
||||
|
||||
changeSetM.clear
|
||||
|
|
@ -258,35 +258,35 @@ class MongoStorageSpec extends TestCase {
|
|||
fillMap
|
||||
changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc")
|
||||
|
||||
MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
|
||||
MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
|
||||
assertEquals(5,
|
||||
MongoStorage.getMapStorageSizeFor("U-M1"))
|
||||
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
|
||||
|
||||
// remove key "3"
|
||||
MongoStorage.removeMapStorageFor("U-M1", "3")
|
||||
MongoStorageBackend.removeMapStorageFor("U-M1", "3")
|
||||
assertEquals(4,
|
||||
MongoStorage.getMapStorageSizeFor("U-M1"))
|
||||
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
|
||||
|
||||
try {
|
||||
MongoStorage.getMapStorageEntryFor("U-M1", "3")
|
||||
MongoStorageBackend.getMapStorageEntryFor("U-M1", "3")
|
||||
fail("should throw exception")
|
||||
} catch { case e => {}}
|
||||
|
||||
// remove key "4"
|
||||
MongoStorage.removeMapStorageFor("U-M1", "4")
|
||||
MongoStorageBackend.removeMapStorageFor("U-M1", "4")
|
||||
assertEquals(3,
|
||||
MongoStorage.getMapStorageSizeFor("U-M1"))
|
||||
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
|
||||
|
||||
// remove key "2"
|
||||
MongoStorage.removeMapStorageFor("U-M1", "2")
|
||||
MongoStorageBackend.removeMapStorageFor("U-M1", "2")
|
||||
assertEquals(2,
|
||||
MongoStorage.getMapStorageSizeFor("U-M1"))
|
||||
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
|
||||
|
||||
// remove the whole stuff
|
||||
MongoStorage.removeMapStorageFor("U-M1")
|
||||
MongoStorageBackend.removeMapStorageFor("U-M1")
|
||||
|
||||
try {
|
||||
MongoStorage.getMapStorageFor("U-M1")
|
||||
MongoStorageBackend.getMapStorageFor("U-M1")
|
||||
fail("should throw exception")
|
||||
} catch { case e: NoSuchElementException => {}}
|
||||
|
||||
|
|
@ -303,14 +303,14 @@ class MongoStorageSpec extends TestCase {
|
|||
|
||||
@Test
|
||||
def testRefStorage = {
|
||||
MongoStorage.getRefStorageFor("U-R1") match {
|
||||
MongoStorageBackend.getRefStorageFor("U-R1") match {
|
||||
case None =>
|
||||
case Some(o) => fail("should be None")
|
||||
}
|
||||
|
||||
val m = Map("1"->1, "2"->4, "3"->9)
|
||||
MongoStorage.insertRefStorageFor("U-R1", m)
|
||||
MongoStorage.getRefStorageFor("U-R1") match {
|
||||
MongoStorageBackend.insertRefStorageFor("U-R1", m)
|
||||
MongoStorageBackend.getRefStorageFor("U-R1") match {
|
||||
case None => fail("should not be empty")
|
||||
case Some(r) => {
|
||||
val a = r.asInstanceOf[JsValue]
|
||||
|
|
@ -331,8 +331,8 @@ class MongoStorageSpec extends TestCase {
|
|||
// insert another one
|
||||
// the previous one should be replaced
|
||||
val b = List("100", "jonas")
|
||||
MongoStorage.insertRefStorageFor("U-R1", b)
|
||||
MongoStorage.getRefStorageFor("U-R1") match {
|
||||
MongoStorageBackend.insertRefStorageFor("U-R1", b)
|
||||
MongoStorageBackend.getRefStorageFor("U-R1") match {
|
||||
case None => fail("should not be empty")
|
||||
case Some(r) => {
|
||||
val a = r.asInstanceOf[JsValue]
|
||||
|
|
|
|||
|
|
@ -12,9 +12,9 @@ import se.scalablesolutions.akka.annotation.transactionrequired;
|
|||
import se.scalablesolutions.akka.annotation.prerestart;
|
||||
import se.scalablesolutions.akka.annotation.postrestart;
|
||||
import se.scalablesolutions.akka.state.PersistentMap;
|
||||
import se.scalablesolutions.akka.state.PersistentState;
|
||||
import se.scalablesolutions.akka.state.PersistentMap;
|
||||
import se.scalablesolutions.akka.state.CassandraStorageConfig;
|
||||
import se.scalablesolutions.akka.state.CassandraStorage;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* Try service out by invoking (multiple times):
|
||||
|
|
@ -26,21 +26,22 @@ import se.scalablesolutions.akka.state.CassandraStorageConfig;
|
|||
@Path("/persistentjavacount")
|
||||
@transactionrequired
|
||||
public class PersistentSimpleService {
|
||||
private Object KEY = "COUNTER";
|
||||
private String KEY = "COUNTER";
|
||||
|
||||
private boolean hasStartedTicking = false;
|
||||
private PersistentMap storage = PersistentState.newMap(new CassandraStorageConfig());
|
||||
private PersistentMap<byte[], byte[]> storage = CassandraStorage.newMap();
|
||||
|
||||
@GET
|
||||
@Produces({"application/html"})
|
||||
public String count() {
|
||||
if (!hasStartedTicking) {
|
||||
storage.put(KEY, 0);
|
||||
storage.put(KEY.getBytes(), ByteBuffer.allocate(2).putInt(0).array());
|
||||
hasStartedTicking = true;
|
||||
return "Tick: 0\n";
|
||||
} else {
|
||||
int counter = (Integer)storage.get(KEY).get() + 1;
|
||||
storage.put(KEY, counter);
|
||||
byte[] bytes = (byte[])storage.get(KEY.getBytes()).get();
|
||||
int counter = ByteBuffer.wrap(bytes).getInt();
|
||||
storage.put(KEY.getBytes(), ByteBuffer.allocate(4).putInt(counter + 1).array());
|
||||
return "Tick: " + counter + "\n";
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,7 +13,6 @@ import se.scalablesolutions.akka.annotation.prerestart;
|
|||
import se.scalablesolutions.akka.annotation.postrestart;
|
||||
import se.scalablesolutions.akka.state.TransactionalState;
|
||||
import se.scalablesolutions.akka.state.TransactionalMap;
|
||||
import se.scalablesolutions.akka.state.CassandraStorageConfig;
|
||||
|
||||
/**
|
||||
* Try service out by invoking (multiple times):
|
||||
|
|
|
|||
|
|
@ -1,13 +1,13 @@
|
|||
package sample.lift
|
||||
|
||||
import se.scalablesolutions.akka.state.{PersistentState, TransactionalState, CassandraStorageConfig}
|
||||
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
|
||||
import se.scalablesolutions.akka.actor.Actor
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.state.{CassandraStorage, TransactionalState}
|
||||
|
||||
import java.lang.Integer
|
||||
import javax.ws.rs.core.MultivaluedMap
|
||||
import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes}
|
||||
import javax.ws.rs.{GET, Path, Produces}
|
||||
import java.nio.ByteBuffer
|
||||
|
||||
|
||||
/**
|
||||
* Try service out by invoking (multiple times):
|
||||
|
|
@ -56,7 +56,7 @@ class PersistentSimpleService extends Actor {
|
|||
case object Tick
|
||||
private val KEY = "COUNTER"
|
||||
private var hasStartedTicking = false
|
||||
private val storage = PersistentState.newMap(CassandraStorageConfig())
|
||||
private val storage = CassandraStorage.newMap
|
||||
|
||||
@GET
|
||||
@Produces(Array("text/html"))
|
||||
|
|
@ -64,13 +64,14 @@ class PersistentSimpleService extends Actor {
|
|||
|
||||
def receive = {
|
||||
case Tick => if (hasStartedTicking) {
|
||||
val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue
|
||||
storage.put(KEY, new Integer(counter + 1))
|
||||
reply(<h1>Tick: {counter + 1}</h1>)
|
||||
val bytes = storage.get(KEY.getBytes).get
|
||||
val counter = ByteBuffer.wrap(bytes).getInt
|
||||
storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array)
|
||||
reply(<success>Tick:{counter + 1}</success>)
|
||||
} else {
|
||||
storage.put(KEY, new Integer(0))
|
||||
storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(0).array)
|
||||
hasStartedTicking = true
|
||||
reply(<h1>Tick: 0</h1>)
|
||||
reply(<success>Tick: 0</success>)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,18 +4,19 @@
|
|||
|
||||
package sample.scala
|
||||
|
||||
import se.scalablesolutions.akka.state.{PersistentState, TransactionalState, CassandraStorageConfig}
|
||||
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
|
||||
import se.scalablesolutions.akka.state.{CassandraStorage, TransactionalState}
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
import java.lang.Integer
|
||||
import java.nio.ByteBuffer
|
||||
import javax.ws.rs.core.MultivaluedMap
|
||||
import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam}
|
||||
|
||||
import org.atmosphere.annotation.{Broadcast, Suspend}
|
||||
import org.atmosphere.util.XSSHtmlFilter
|
||||
import org.atmosphere.cpr.{BroadcastFilter,Broadcaster}
|
||||
import org.atmosphere.cpr.{Broadcaster, BroadcastFilter}
|
||||
import org.atmosphere.jersey.Broadcastable
|
||||
|
||||
class Boot {
|
||||
|
|
@ -105,7 +106,7 @@ class PersistentSimpleService extends Actor {
|
|||
case object Tick
|
||||
private val KEY = "COUNTER"
|
||||
private var hasStartedTicking = false
|
||||
private val storage = PersistentState.newMap(CassandraStorageConfig())
|
||||
private val storage = CassandraStorage.newMap
|
||||
|
||||
@GET
|
||||
@Produces(Array("text/html"))
|
||||
|
|
@ -113,11 +114,12 @@ class PersistentSimpleService extends Actor {
|
|||
|
||||
def receive = {
|
||||
case Tick => if (hasStartedTicking) {
|
||||
val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue
|
||||
storage.put(KEY, new Integer(counter + 1))
|
||||
val bytes = storage.get(KEY.getBytes).get
|
||||
val counter = ByteBuffer.wrap(bytes).getInt
|
||||
storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array)
|
||||
reply(<success>Tick:{counter + 1}</success>)
|
||||
} else {
|
||||
storage.put(KEY, new Integer(0))
|
||||
storage.put(KEY.getBytes, Array(0.toByte))
|
||||
hasStartedTicking = true
|
||||
reply(<success>Tick: 0</success>)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ class DigestAuthenticationService extends DigestAuthenticationActor {
|
|||
//don't forget to configure your standalone Cassandra instance
|
||||
//
|
||||
//makeTransactionRequired
|
||||
//override def mkNonceMap = PersistentState.newMap(CassandraStorageConfig()).asInstanceOf[scala.collection.mutable.Map[String,Long]]
|
||||
//override def mkNonceMap = Storage.newMap(CassandraStorageConfig()).asInstanceOf[scala.collection.mutable.Map[String,Long]]
|
||||
|
||||
//Use an in-memory nonce-map as default
|
||||
override def mkNonceMap = new scala.collection.mutable.HashMap[String, Long]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue