restructured distribution and maven files, removed unused jars, added bunch of maven plugins, added ActorRegistry, using real AOP aspects for proxies
This commit is contained in:
commit
e6db95be3a
9 changed files with 716 additions and 98 deletions
|
|
@ -1,68 +1,68 @@
|
|||
####################
|
||||
# Akka Config File #
|
||||
####################
|
||||
|
||||
# This file has all the default settings, so all these could be remove with no visible effect.
|
||||
# Modify as needed.
|
||||
|
||||
<log>
|
||||
filename = "./logs/akka.log"
|
||||
roll = "daily" # Options: never, hourly, daily, sunday/monday/...
|
||||
level = "debug" # Options: fatal, critical, error, warning, info, debug, trace
|
||||
console = on
|
||||
# syslog_host = ""
|
||||
# syslog_server_name = ""
|
||||
</log>
|
||||
|
||||
<akka>
|
||||
version = "0.6"
|
||||
|
||||
boot = ["sample.java.Boot", "sample.scala.Boot"] # FQN to the class doing initial active object/actor
|
||||
# supervisor bootstrap, should be defined in default constructor
|
||||
<actor>
|
||||
timeout = 5000 # default timeout for future based invocations
|
||||
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
|
||||
</actor>
|
||||
|
||||
<stm>
|
||||
service = on
|
||||
restart-on-collision = off # (not implemented yet) if 'on' then it reschedules the transaction,
|
||||
# if 'off' then throws an exception or rollback for user to handle
|
||||
wait-for-completion = 100 # how long time in millis a transaction should be given time to complete when a collision is detected
|
||||
wait-nr-of-times = 3 # the number of times it should check for completion of a pending transaction upon collision
|
||||
distributed = off # not implemented yet
|
||||
</stm>
|
||||
|
||||
<remote>
|
||||
service = on
|
||||
hostname = "localhost"
|
||||
port = 9999
|
||||
connection-timeout = 1000 # in millis
|
||||
</remote>
|
||||
|
||||
<rest>
|
||||
service = on
|
||||
hostname = "localhost"
|
||||
port = 9998
|
||||
</rest>
|
||||
|
||||
<storage>
|
||||
system = "cassandra" # Options: cassandra, mongodb
|
||||
|
||||
<cassandra>
|
||||
service = on
|
||||
hostname = "127.0.0.1" # IP address or hostname of one of the Cassandra cluster's seeds
|
||||
port = 9160
|
||||
storage-format = "java" # Options: java, scala-json, java-json, protobuf
|
||||
consistency-level = 1
|
||||
</cassandra>
|
||||
|
||||
<mongodb>
|
||||
service = on
|
||||
hostname = "127.0.0.1" # IP address or hostname of the MongoDB DB instance
|
||||
port = 27017
|
||||
dbname = "mydb"
|
||||
storage-format = "scala-json" # Options: java, scala-json, java-json, protobuf
|
||||
</mongodb>
|
||||
</storage>
|
||||
</akka>
|
||||
####################
|
||||
# Akka Config File #
|
||||
####################
|
||||
|
||||
# This file has all the default settings, so all these could be remove with no visible effect.
|
||||
# Modify as needed.
|
||||
|
||||
<log>
|
||||
filename = "./logs/akka.log"
|
||||
roll = "daily" # Options: never, hourly, daily, sunday/monday/...
|
||||
level = "debug" # Options: fatal, critical, error, warning, info, debug, trace
|
||||
console = on
|
||||
# syslog_host = ""
|
||||
# syslog_server_name = ""
|
||||
</log>
|
||||
|
||||
<akka>
|
||||
version = "0.6"
|
||||
|
||||
boot = ["sample.java.Boot", "sample.scala.Boot"] # FQN to the class doing initial active object/actor
|
||||
# supervisor bootstrap, should be defined in default constructor
|
||||
<actor>
|
||||
timeout = 5000 # default timeout for future based invocations
|
||||
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
|
||||
</actor>
|
||||
|
||||
<stm>
|
||||
service = on
|
||||
restart-on-collision = off # (not implemented yet) if 'on' then it reschedules the transaction,
|
||||
# if 'off' then throws an exception or rollback for user to handle
|
||||
wait-for-completion = 100 # how long time in millis a transaction should be given time to complete when a collision is detected
|
||||
wait-nr-of-times = 3 # the number of times it should check for completion of a pending transaction upon collision
|
||||
distributed = off # not implemented yet
|
||||
</stm>
|
||||
|
||||
<remote>
|
||||
service = on
|
||||
hostname = "localhost"
|
||||
port = 9999
|
||||
connection-timeout = 1000 # in millis
|
||||
</remote>
|
||||
|
||||
<rest>
|
||||
service = on
|
||||
hostname = "localhost"
|
||||
port = 9998
|
||||
</rest>
|
||||
|
||||
<storage>
|
||||
system = "cassandra" # Options: cassandra, mongodb
|
||||
|
||||
<cassandra>
|
||||
service = on
|
||||
hostname = "127.0.0.1" # IP address or hostname of one of the Cassandra cluster's seeds
|
||||
port = 9160
|
||||
storage-format = "java" # Options: java, scala-json, java-json, protobuf
|
||||
consistency-level = 1
|
||||
</cassandra>
|
||||
|
||||
<mongodb>
|
||||
service = on
|
||||
hostname = "127.0.0.1" # IP address or hostname of the MongoDB DB instance
|
||||
port = 27017
|
||||
dbname = "mydb"
|
||||
storage-format = "scala-json" # Options: java, scala-json, java-json, protobuf
|
||||
</mongodb>
|
||||
</storage>
|
||||
</akka>
|
||||
|
|
|
|||
BIN
embedded-repo/com/mongodb/mongo/0.6/mongo-0.6.jar
Normal file
BIN
embedded-repo/com/mongodb/mongo/0.6/mongo-0.6.jar
Normal file
Binary file not shown.
|
|
@ -105,6 +105,13 @@
|
|||
<version>1.0</version>
|
||||
</dependency>
|
||||
|
||||
<!-- For Mongo -->
|
||||
<dependency>
|
||||
<groupId>com.mongodb</groupId>
|
||||
<artifactId>mongo</artifactId>
|
||||
<version>0.6</version>
|
||||
</dependency>
|
||||
|
||||
<!-- For Cassandra -->
|
||||
<dependency>
|
||||
<groupId>org.apache.cassandra</groupId>
|
||||
|
|
|
|||
|
|
@ -220,6 +220,7 @@ object ActiveObject {
|
|||
}
|
||||
|
||||
private[kernel] def newInstance[T](target: Class[T], actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
|
||||
//if (getClass.getClassLoader.getResourceAsStream("META-INF/aop.xml") != null) println("000000000000000000000 FOUND AOP")
|
||||
if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
|
||||
val proxy = Proxy.newInstance(target, false, true)
|
||||
actor.initialize(target, proxy)
|
||||
|
|
@ -230,6 +231,7 @@ object ActiveObject {
|
|||
}
|
||||
|
||||
private[kernel] def newInstance[T](intf: Class[T], target: AnyRef, actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
|
||||
//if (getClass.getClassLoader.getResourceAsStream("META-INF/aop.xml") != null) println("000000000000000000000 FOUND AOP")
|
||||
if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
|
||||
val proxy = Proxy.newInstance(Array(intf), Array(target), false, true)
|
||||
actor.initialize(target.getClass, target)
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ import org.apache.thrift.protocol._
|
|||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object CassandraStorage extends Logging {
|
||||
object CassandraStorage extends MapStorage with VectorStorage with Logging {
|
||||
val KEYSPACE = "akka"
|
||||
val MAP_COLUMN_PARENT = new ColumnParent("map", null)
|
||||
val VECTOR_COLUMN_PARENT = new ColumnParent("vector", null)
|
||||
|
|
@ -112,7 +112,7 @@ object CassandraStorage extends Logging {
|
|||
// For Vector
|
||||
// ===============================================================
|
||||
|
||||
def insertVectorStorageEntryFor(name: String, element: AnyRef) = if (sessions.isDefined) {
|
||||
override def insertVectorStorageEntryFor(name: String, element: AnyRef) = if (sessions.isDefined) {
|
||||
sessions.get.withSession {
|
||||
_ ++| (name,
|
||||
new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(getVectorStorageSizeFor(name))),
|
||||
|
|
@ -122,7 +122,10 @@ object CassandraStorage extends Logging {
|
|||
}
|
||||
} else throw new IllegalStateException("CassandraStorage is not started")
|
||||
|
||||
def getVectorStorageEntryFor(name: String, index: Int): AnyRef = if (sessions.isDefined) {
|
||||
override def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
|
||||
}
|
||||
|
||||
override def getVectorStorageEntryFor(name: String, index: Int): AnyRef = if (sessions.isDefined) {
|
||||
val column: Option[Column] = sessions.get.withSession {
|
||||
_ | (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index)))
|
||||
}
|
||||
|
|
@ -130,7 +133,7 @@ object CassandraStorage extends Logging {
|
|||
else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]")
|
||||
} else throw new IllegalStateException("CassandraStorage is not started")
|
||||
|
||||
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = if (sessions.isDefined) {
|
||||
override def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = if (sessions.isDefined) {
|
||||
val startBytes = if (start.isDefined) intToBytes(start.get) else null
|
||||
val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null
|
||||
val columns: List[Column] = sessions.get.withSession {
|
||||
|
|
@ -144,7 +147,7 @@ object CassandraStorage extends Logging {
|
|||
columns.map(column => serializer.in(column.value, None))
|
||||
} else throw new IllegalStateException("CassandraStorage is not started")
|
||||
|
||||
def getVectorStorageSizeFor(name: String): Int = if (sessions.isDefined) {
|
||||
override def getVectorStorageSizeFor(name: String): Int = if (sessions.isDefined) {
|
||||
sessions.get.withSession {
|
||||
_ |# (name, VECTOR_COLUMN_PARENT)
|
||||
}
|
||||
|
|
@ -154,7 +157,7 @@ object CassandraStorage extends Logging {
|
|||
// For Map
|
||||
// ===============================================================
|
||||
|
||||
def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = if (sessions.isDefined) {
|
||||
override def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = if (sessions.isDefined) {
|
||||
sessions.get.withSession {
|
||||
_ ++| (name,
|
||||
new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)),
|
||||
|
|
@ -164,7 +167,7 @@ object CassandraStorage extends Logging {
|
|||
}
|
||||
} else throw new IllegalStateException("CassandraStorage is not started")
|
||||
|
||||
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = if (sessions.isDefined) {
|
||||
override def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = if (sessions.isDefined) {
|
||||
val cf2columns: java.util.Map[String, java.util.List[Column]] = new java.util.HashMap
|
||||
for (entry <- entries) {
|
||||
val columns: java.util.List[Column] = new java.util.ArrayList
|
||||
|
|
@ -176,7 +179,7 @@ object CassandraStorage extends Logging {
|
|||
}
|
||||
} else throw new IllegalStateException("CassandraStorage is not started")
|
||||
|
||||
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = if (sessions.isDefined) {
|
||||
override def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = if (sessions.isDefined) {
|
||||
try {
|
||||
val column: Option[Column] = sessions.get.withSession {
|
||||
_ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)))
|
||||
|
|
@ -190,7 +193,7 @@ object CassandraStorage extends Logging {
|
|||
}
|
||||
} else throw new IllegalStateException("CassandraStorage is not started")
|
||||
|
||||
def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) {
|
||||
override def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) {
|
||||
throw new UnsupportedOperationException
|
||||
/*
|
||||
val columns = server.get_columns_since(name, MAP_COLUMN_FAMILY, -1)
|
||||
|
|
@ -202,15 +205,15 @@ object CassandraStorage extends Logging {
|
|||
*/
|
||||
} else throw new IllegalStateException("CassandraStorage is not started")
|
||||
|
||||
def getMapStorageSizeFor(name: String): Int = if (sessions.isDefined) {
|
||||
override def getMapStorageSizeFor(name: String): Int = if (sessions.isDefined) {
|
||||
sessions.get.withSession {
|
||||
_ |# (name, MAP_COLUMN_PARENT)
|
||||
}
|
||||
} else throw new IllegalStateException("CassandraStorage is not started")
|
||||
|
||||
def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null)
|
||||
override def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null)
|
||||
|
||||
def removeMapStorageFor(name: String, key: AnyRef): Unit = if (sessions.isDefined) {
|
||||
override def removeMapStorageFor(name: String, key: AnyRef): Unit = if (sessions.isDefined) {
|
||||
val keyBytes = if (key == null) null else serializer.out(key)
|
||||
sessions.get.withSession {
|
||||
_ -- (name,
|
||||
|
|
@ -220,7 +223,7 @@ object CassandraStorage extends Logging {
|
|||
}
|
||||
} else throw new IllegalStateException("CassandraStorage is not started")
|
||||
|
||||
def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int):
|
||||
override def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int):
|
||||
List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) {
|
||||
val startBytes = if (start.isDefined) serializer.out(start.get) else null
|
||||
val finishBytes = if (finish.isDefined) serializer.out(finish.get) else null
|
||||
|
|
|
|||
258
kernel/src/main/scala/state/MongoStorage.scala
Normal file
258
kernel/src/main/scala/state/MongoStorage.scala
Normal file
|
|
@ -0,0 +1,258 @@
|
|||
package se.scalablesolutions.akka.kernel.state
|
||||
|
||||
import com.mongodb._
|
||||
import se.scalablesolutions.akka.kernel.util.Logging
|
||||
import serialization.{Serializer}
|
||||
import kernel.Kernel.config
|
||||
|
||||
import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
|
||||
|
||||
object MongoStorage extends MapStorage
|
||||
with VectorStorage with Logging {
|
||||
|
||||
// enrich with null safe findOne
|
||||
class RichDBCollection(value: DBCollection) {
|
||||
def findOneNS(o: DBObject): Option[DBObject] = {
|
||||
value.findOne(o) match {
|
||||
case null => None
|
||||
case x => Some(x)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
implicit def enrichDBCollection(c: DBCollection) = new RichDBCollection(c)
|
||||
|
||||
val KEY = "key"
|
||||
val VALUE = "value"
|
||||
val COLLECTION = "akka_coll"
|
||||
val MONGODB_SERVER_HOSTNAME =
|
||||
config.getString("akka.storage.mongodb.hostname", "127.0.0.1")
|
||||
val MONGODB_SERVER_DBNAME =
|
||||
config.getString("akka.storage.mongodb.dbname", "testdb")
|
||||
val MONGODB_SERVER_PORT =
|
||||
config.getInt("akka.storage.mongodb.port", 27017)
|
||||
|
||||
val db = new Mongo(MONGODB_SERVER_HOSTNAME,
|
||||
MONGODB_SERVER_PORT, MONGODB_SERVER_DBNAME)
|
||||
val coll = db.getCollection(COLLECTION)
|
||||
|
||||
// @fixme: make this pluggable
|
||||
private[this] val serializer: Serializer = Serializer.ScalaJSON
|
||||
|
||||
override def insertMapStorageEntryFor(name: String,
|
||||
key: AnyRef, value: AnyRef) {
|
||||
insertMapStorageEntriesFor(name, List((key, value)))
|
||||
}
|
||||
|
||||
override def insertMapStorageEntriesFor(name: String,
|
||||
entries: List[Tuple2[AnyRef, AnyRef]]) {
|
||||
import java.util.{Map, HashMap}
|
||||
|
||||
val m: Map[AnyRef, AnyRef] = new HashMap
|
||||
for ((k, v) <- entries) {
|
||||
m.put(k, serializer.out(v))
|
||||
}
|
||||
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m))
|
||||
case Some(dbo) => {
|
||||
// collate the maps
|
||||
val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]]
|
||||
o.putAll(m)
|
||||
|
||||
// remove existing reference
|
||||
removeMapStorageFor(name)
|
||||
// and insert
|
||||
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, o))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def removeMapStorageFor(name: String) = {
|
||||
val q = new BasicDBObject
|
||||
q.put(KEY, name)
|
||||
coll.remove(q)
|
||||
}
|
||||
|
||||
override def removeMapStorageFor(name: String, key: AnyRef) = {
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
case Some(dbo) => {
|
||||
val orig = dbo.get(VALUE).asInstanceOf[DBObject].toMap
|
||||
orig.remove(key.asInstanceOf[String])
|
||||
|
||||
// remove existing reference
|
||||
removeMapStorageFor(name)
|
||||
// and insert
|
||||
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, orig))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def getMapStorageEntryFor(name: String,
|
||||
key: AnyRef): Option[AnyRef] = {
|
||||
getValueForKey(name, key.asInstanceOf[String])
|
||||
}
|
||||
|
||||
override def getMapStorageSizeFor(name: String): Int = {
|
||||
nullSafeFindOne(name) match {
|
||||
case None => 0
|
||||
case Some(dbo) =>
|
||||
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].keySet.size
|
||||
}
|
||||
}
|
||||
|
||||
override def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
|
||||
val m =
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
throw new Predef.NoSuchElementException(name + " not present")
|
||||
case Some(dbo) =>
|
||||
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
|
||||
}
|
||||
val n =
|
||||
List(m.keySet.toArray: _*).asInstanceOf[List[String]]
|
||||
val vals =
|
||||
for(s <- n)
|
||||
yield (s, serializer.in(m.get(s).asInstanceOf[Array[Byte]], None))
|
||||
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
|
||||
}
|
||||
|
||||
override def getMapStorageRangeFor(name: String, start: Option[AnyRef],
|
||||
finish: Option[AnyRef],
|
||||
count: Int): List[Tuple2[AnyRef, AnyRef]] = {
|
||||
val m =
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
throw new Predef.NoSuchElementException(name + " not present")
|
||||
case Some(dbo) =>
|
||||
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
|
||||
}
|
||||
|
||||
/**
|
||||
* <tt>count</tt> is the max number of results to return. Start with
|
||||
* <tt>start</tt> or 0 (if <tt>start</tt> is not defined) and go until
|
||||
* you hit <tt>finish</tt> or <tt>count</tt>.
|
||||
*/
|
||||
val s = if (start.isDefined) start.get.asInstanceOf[Int] else 0
|
||||
val cnt =
|
||||
if (finish.isDefined) {
|
||||
val f = finish.get.asInstanceOf[Int]
|
||||
if (f >= s) Math.min(count, (f - s)) else count
|
||||
}
|
||||
else count
|
||||
|
||||
val n =
|
||||
List(m.keySet.toArray: _*).asInstanceOf[List[String]].sort((e1, e2) => (e1 compareTo e2) < 0).slice(s, s + cnt)
|
||||
val vals =
|
||||
for(s <- n)
|
||||
yield (s, serializer.in(m.get(s).asInstanceOf[Array[Byte]], None))
|
||||
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
|
||||
}
|
||||
|
||||
private def getValueForKey(name: String, key: String): Option[AnyRef] = {
|
||||
try {
|
||||
nullSafeFindOne(name) match {
|
||||
case None => None
|
||||
case Some(dbo) =>
|
||||
Some(serializer.in(
|
||||
dbo.get(VALUE)
|
||||
.asInstanceOf[JMap[String, AnyRef]]
|
||||
.get(key).asInstanceOf[Array[Byte]], None))
|
||||
}
|
||||
} catch {
|
||||
case e =>
|
||||
throw new Predef.NoSuchElementException(e.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
override def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
|
||||
val q = new BasicDBObject
|
||||
q.put(KEY, name)
|
||||
|
||||
val currentList =
|
||||
coll.findOneNS(q) match {
|
||||
case None =>
|
||||
new JArrayList[AnyRef]
|
||||
case Some(dbo) =>
|
||||
dbo.get(VALUE).asInstanceOf[JArrayList[AnyRef]]
|
||||
}
|
||||
if (!currentList.isEmpty) {
|
||||
// record exists
|
||||
// remove before adding
|
||||
coll.remove(q)
|
||||
}
|
||||
|
||||
// add to the current list
|
||||
elements.map(serializer.out(_)).foreach(currentList.add(_))
|
||||
|
||||
coll.insert(
|
||||
new BasicDBObject()
|
||||
.append(KEY, name)
|
||||
.append(VALUE, currentList)
|
||||
)
|
||||
}
|
||||
|
||||
override def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
|
||||
insertVectorStorageEntriesFor(name, List(element))
|
||||
}
|
||||
|
||||
override def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
|
||||
try {
|
||||
val o =
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
throw new Predef.NoSuchElementException(name + " not present")
|
||||
|
||||
case Some(dbo) =>
|
||||
dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
|
||||
}
|
||||
serializer.in(
|
||||
o.get(index).asInstanceOf[Array[Byte]],
|
||||
None
|
||||
)
|
||||
} catch {
|
||||
case e =>
|
||||
throw new Predef.NoSuchElementException(e.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
override def getVectorStorageRangeFor(name: String,
|
||||
start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
|
||||
try {
|
||||
val o =
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
throw new Predef.NoSuchElementException(name + " not present")
|
||||
|
||||
case Some(dbo) =>
|
||||
dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
|
||||
}
|
||||
|
||||
// pick the subrange and make a Scala list
|
||||
val l =
|
||||
List(o.subList(start.get, start.get + count).toArray: _*)
|
||||
|
||||
for(e <- l)
|
||||
yield serializer.in(e.asInstanceOf[Array[Byte]], None)
|
||||
} catch {
|
||||
case e =>
|
||||
throw new Predef.NoSuchElementException(e.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
override def getVectorStorageSizeFor(name: String): Int = {
|
||||
nullSafeFindOne(name) match {
|
||||
case None => 0
|
||||
case Some(dbo) =>
|
||||
dbo.get(VALUE).asInstanceOf[JList[AnyRef]].size
|
||||
}
|
||||
}
|
||||
|
||||
private def nullSafeFindOne(name: String): Option[DBObject] = {
|
||||
val o = new BasicDBObject
|
||||
o.put(KEY, name)
|
||||
coll.findOneNS(o)
|
||||
}
|
||||
}
|
||||
|
|
@ -16,6 +16,7 @@ abstract class PersistentStorageConfig extends TransactionalStateConfig
|
|||
case class CassandraStorageConfig extends PersistentStorageConfig
|
||||
case class TerracottaStorageConfig extends PersistentStorageConfig
|
||||
case class TokyoCabinetStorageConfig extends PersistentStorageConfig
|
||||
case class MongoStorageConfig extends PersistentStorageConfig
|
||||
|
||||
/**
|
||||
* Scala API.
|
||||
|
|
@ -39,12 +40,14 @@ object TransactionalState extends TransactionalState
|
|||
class TransactionalState {
|
||||
def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[AnyRef, AnyRef] = config match {
|
||||
case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
|
||||
case MongoStorageConfig() => new MongoPersistentTransactionalMap
|
||||
case TerracottaStorageConfig() => throw new UnsupportedOperationException
|
||||
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
|
||||
}
|
||||
|
||||
def newPersistentVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match {
|
||||
case CassandraStorageConfig() => new CassandraPersistentTransactionalVector
|
||||
case MongoStorageConfig() => new MongoPersistentTransactionalVector
|
||||
case TerracottaStorageConfig() => throw new UnsupportedOperationException
|
||||
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
|
||||
}
|
||||
|
|
@ -194,16 +197,23 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
|
|||
}
|
||||
|
||||
/**
|
||||
* Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
|
||||
* Implementation of <tt>PersistentTransactionalMap</tt> for every concrete
|
||||
* storage will have the same workflow. This abstracts the workflow.
|
||||
*
|
||||
* Subclasses just need to provide the actual concrete instance for the
|
||||
* abstract val <tt>storage</tt>.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[AnyRef, AnyRef] {
|
||||
abstract class TemplatePersistentTransactionalMap extends PersistentTransactionalMap[AnyRef, AnyRef] {
|
||||
|
||||
// to be concretized in subclasses
|
||||
val storage: MapStorage
|
||||
|
||||
override def remove(key: AnyRef) = {
|
||||
verifyTransaction
|
||||
if (changeSet.contains(key)) changeSet -= key
|
||||
else CassandraStorage.removeMapStorageFor(uuid, key)
|
||||
else storage.removeMapStorageFor(uuid, key)
|
||||
}
|
||||
|
||||
override def getRange(start: Option[AnyRef], count: Int) =
|
||||
|
|
@ -212,7 +222,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
|
|||
def getRange(start: Option[AnyRef], finish: Option[AnyRef], count: Int) = {
|
||||
verifyTransaction
|
||||
try {
|
||||
CassandraStorage.getMapStorageRangeFor(uuid, start, finish, count)
|
||||
storage.getMapStorageRangeFor(uuid, start, finish, count)
|
||||
} catch {
|
||||
case e: Exception => Nil
|
||||
}
|
||||
|
|
@ -220,7 +230,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
|
|||
|
||||
// ---- For Transactional ----
|
||||
override def commit = {
|
||||
CassandraStorage.insertMapStorageEntriesFor(uuid, changeSet.toList)
|
||||
storage.insertMapStorageEntriesFor(uuid, changeSet.toList)
|
||||
changeSet.clear
|
||||
}
|
||||
|
||||
|
|
@ -228,7 +238,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
|
|||
override def clear = {
|
||||
verifyTransaction
|
||||
try {
|
||||
CassandraStorage.removeMapStorageFor(uuid)
|
||||
storage.removeMapStorageFor(uuid)
|
||||
} catch {
|
||||
case e: Exception => {}
|
||||
}
|
||||
|
|
@ -237,7 +247,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
|
|||
override def contains(key: AnyRef): Boolean = {
|
||||
try {
|
||||
verifyTransaction
|
||||
CassandraStorage.getMapStorageEntryFor(uuid, key).isDefined
|
||||
storage.getMapStorageEntryFor(uuid, key).isDefined
|
||||
} catch {
|
||||
case e: Exception => false
|
||||
}
|
||||
|
|
@ -246,7 +256,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
|
|||
override def size: Int = {
|
||||
verifyTransaction
|
||||
try {
|
||||
CassandraStorage.getMapStorageSizeFor(uuid)
|
||||
storage.getMapStorageSizeFor(uuid)
|
||||
} catch {
|
||||
case e: Exception => 0
|
||||
}
|
||||
|
|
@ -258,7 +268,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
|
|||
// if (changeSet.contains(key)) changeSet.get(key)
|
||||
// else {
|
||||
val result = try {
|
||||
CassandraStorage.getMapStorageEntryFor(uuid, key)
|
||||
storage.getMapStorageEntryFor(uuid, key)
|
||||
} catch {
|
||||
case e: Exception => None
|
||||
}
|
||||
|
|
@ -270,7 +280,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
|
|||
//verifyTransaction
|
||||
new Iterator[Tuple2[AnyRef, AnyRef]] {
|
||||
private val originalList: List[Tuple2[AnyRef, AnyRef]] = try {
|
||||
CassandraStorage.getMapStorageFor(uuid)
|
||||
storage.getMapStorageFor(uuid)
|
||||
} catch {
|
||||
case e: Throwable => Nil
|
||||
}
|
||||
|
|
@ -285,6 +295,25 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
|
||||
*
|
||||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||
*/
|
||||
class CassandraPersistentTransactionalMap extends TemplatePersistentTransactionalMap {
|
||||
val storage = CassandraStorage
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a persistent transactional map based on the MongoDB distributed P2P key-value storage.
|
||||
*
|
||||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||
*/
|
||||
class MongoPersistentTransactionalMap extends TemplatePersistentTransactionalMap {
|
||||
val storage = MongoStorage
|
||||
}
|
||||
|
||||
/**
|
||||
* Base for all transactional vector implementations.
|
||||
*
|
||||
|
|
@ -382,17 +411,19 @@ abstract class PersistentTransactionalVector[T] extends TransactionalVector[T] {
|
|||
}
|
||||
|
||||
/**
|
||||
* Implements a persistent transactional vector based on the Cassandra distributed P2P key-value storage.
|
||||
* Implements a template for a concrete persistent transactional vector based storage.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||
*/
|
||||
class CassandraPersistentTransactionalVector extends PersistentTransactionalVector[AnyRef] {
|
||||
abstract class TemplatePersistentTransactionalVector extends PersistentTransactionalVector[AnyRef] {
|
||||
|
||||
val storage: VectorStorage
|
||||
|
||||
// ---- For TransactionalVector ----
|
||||
override def get(index: Int): AnyRef = {
|
||||
verifyTransaction
|
||||
if (changeSet.size > index) changeSet(index)
|
||||
else CassandraStorage.getVectorStorageEntryFor(uuid, index)
|
||||
else storage.getVectorStorageEntryFor(uuid, index)
|
||||
}
|
||||
|
||||
override def getRange(start: Int, count: Int): List[AnyRef] =
|
||||
|
|
@ -400,12 +431,12 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect
|
|||
|
||||
def getRange(start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
|
||||
verifyTransaction
|
||||
CassandraStorage.getVectorStorageRangeFor(uuid, start, finish, count)
|
||||
storage.getVectorStorageRangeFor(uuid, start, finish, count)
|
||||
}
|
||||
|
||||
override def length: Int = {
|
||||
verifyTransaction
|
||||
CassandraStorage.getVectorStorageSizeFor(uuid)
|
||||
storage.getVectorStorageSizeFor(uuid)
|
||||
}
|
||||
|
||||
override def apply(index: Int): AnyRef = get(index)
|
||||
|
|
@ -422,11 +453,29 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect
|
|||
// ---- For Transactional ----
|
||||
override def commit = {
|
||||
// FIXME: should use batch function once the bug is resolved
|
||||
for (element <- changeSet) CassandraStorage.insertVectorStorageEntryFor(uuid, element)
|
||||
for (element <- changeSet) storage.insertVectorStorageEntryFor(uuid, element)
|
||||
changeSet.clear
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a persistent transactional vector based on the Cassandra distributed P2P key-value storage.
|
||||
*
|
||||
* @author <a href="http://debasishg.blogspot.com">Debaissh Ghosh</a>
|
||||
*/
|
||||
class CassandraPersistentTransactionalVector extends TemplatePersistentTransactionalVector {
|
||||
val storage = CassandraStorage
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a persistent transactional vector based on the MongoDB distributed P2P key-value storage.
|
||||
*
|
||||
* @author <a href="http://debasishg.blogspot.com">Debaissh Ghosh</a>
|
||||
*/
|
||||
class MongoPersistentTransactionalVector extends TemplatePersistentTransactionalVector {
|
||||
val storage = MongoStorage
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a transactional reference.
|
||||
*
|
||||
|
|
|
|||
27
kernel/src/main/scala/state/Storage.scala
Normal file
27
kernel/src/main/scala/state/Storage.scala
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
package se.scalablesolutions.akka.kernel.state
|
||||
|
||||
// abstracts persistence storage
|
||||
trait Storage {
|
||||
}
|
||||
|
||||
// for Maps
|
||||
trait MapStorage extends Storage {
|
||||
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]])
|
||||
def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef)
|
||||
def removeMapStorageFor(name: String)
|
||||
def removeMapStorageFor(name: String, key: AnyRef)
|
||||
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef]
|
||||
def getMapStorageSizeFor(name: String): Int
|
||||
def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]]
|
||||
def getMapStorageRangeFor(name: String, start: Option[AnyRef],
|
||||
finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]]
|
||||
}
|
||||
|
||||
// for vectors
|
||||
trait VectorStorage extends Storage {
|
||||
def insertVectorStorageEntryFor(name: String, element: AnyRef)
|
||||
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef])
|
||||
def getVectorStorageEntryFor(name: String, index: Int): AnyRef
|
||||
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef]
|
||||
def getVectorStorageSizeFor(name: String): Int
|
||||
}
|
||||
272
kernel/src/test/scala/MongoStorageSpec.scala
Normal file
272
kernel/src/test/scala/MongoStorageSpec.scala
Normal file
|
|
@ -0,0 +1,272 @@
|
|||
package se.scalablesolutions.akka.kernel.state
|
||||
|
||||
import junit.framework.TestCase
|
||||
|
||||
import org.junit.{Test, Before}
|
||||
import org.junit.Assert._
|
||||
|
||||
class MongoStorageSpec extends TestCase {
|
||||
|
||||
val changeSetV = new scala.collection.mutable.ArrayBuffer[AnyRef]
|
||||
val changeSetM = new scala.collection.mutable.HashMap[AnyRef, AnyRef]
|
||||
|
||||
override def setUp = {
|
||||
MongoStorage.coll.drop
|
||||
}
|
||||
|
||||
@Test
|
||||
def testVectorInsertForTransactionId = {
|
||||
changeSetV += "debasish" // string
|
||||
changeSetV += List(1, 2, 3) // Scala List
|
||||
changeSetV += List(100, 200)
|
||||
MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
|
||||
assertEquals(
|
||||
3,
|
||||
MongoStorage.getVectorStorageSizeFor("U-A1"))
|
||||
changeSetV.clear
|
||||
|
||||
// changeSetV should be reinitialized
|
||||
changeSetV += List(12, 23, 45)
|
||||
changeSetV += "maulindu"
|
||||
MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
|
||||
assertEquals(
|
||||
5,
|
||||
MongoStorage.getVectorStorageSizeFor("U-A1"))
|
||||
|
||||
// add more to the same changeSetV
|
||||
changeSetV += "ramanendu"
|
||||
changeSetV += Map(1 -> "dg", 2 -> "mc")
|
||||
|
||||
// add for a diff transaction
|
||||
MongoStorage.insertVectorStorageEntriesFor("U-A2", changeSetV.toList)
|
||||
assertEquals(
|
||||
4,
|
||||
MongoStorage.getVectorStorageSizeFor("U-A2"))
|
||||
|
||||
// previous transaction change set should remain same
|
||||
assertEquals(
|
||||
5,
|
||||
MongoStorage.getVectorStorageSizeFor("U-A1"))
|
||||
|
||||
// test single element entry
|
||||
MongoStorage.insertVectorStorageEntryFor("U-A1", Map(1->1, 2->4, 3->9))
|
||||
assertEquals(
|
||||
6,
|
||||
MongoStorage.getVectorStorageSizeFor("U-A1"))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testVectorFetchForKeys = {
|
||||
|
||||
// initially everything 0
|
||||
assertEquals(
|
||||
0,
|
||||
MongoStorage.getVectorStorageSizeFor("U-A2"))
|
||||
|
||||
assertEquals(
|
||||
0,
|
||||
MongoStorage.getVectorStorageSizeFor("U-A1"))
|
||||
|
||||
// get some stuff
|
||||
changeSetV += "debasish"
|
||||
changeSetV += List(12, 13, 14)
|
||||
MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
|
||||
|
||||
assertEquals(
|
||||
2,
|
||||
MongoStorage.getVectorStorageSizeFor("U-A1"))
|
||||
|
||||
assertEquals(
|
||||
"debasish",
|
||||
MongoStorage.getVectorStorageEntryFor("U-A1", 0).asInstanceOf[String])
|
||||
|
||||
assertEquals(
|
||||
List(12, 13, 14),
|
||||
MongoStorage.getVectorStorageEntryFor("U-A1", 1).asInstanceOf[List[Int]])
|
||||
|
||||
changeSetV.clear
|
||||
changeSetV += Map(1->1, 2->4, 3->9)
|
||||
changeSetV += BigInt(2310)
|
||||
changeSetV += List(100, 200, 300)
|
||||
MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
|
||||
|
||||
assertEquals(
|
||||
5,
|
||||
MongoStorage.getVectorStorageSizeFor("U-A1"))
|
||||
|
||||
val r =
|
||||
MongoStorage.getVectorStorageRangeFor("U-A1", Some(1), None, 3)
|
||||
|
||||
assertEquals(3, r.size)
|
||||
assertEquals(List(12, 13, 14), r(0).asInstanceOf[List[Int]])
|
||||
}
|
||||
|
||||
@Test
|
||||
def testVectorFetchForNonExistentKeys = {
|
||||
try {
|
||||
MongoStorage.getVectorStorageEntryFor("U-A1", 1)
|
||||
fail("should throw an exception")
|
||||
} catch {case e: Predef.NoSuchElementException => {}}
|
||||
|
||||
try {
|
||||
MongoStorage.getVectorStorageRangeFor("U-A1", Some(2), None, 12)
|
||||
fail("should throw an exception")
|
||||
} catch {case e: Predef.NoSuchElementException => {}}
|
||||
}
|
||||
|
||||
@Test
|
||||
def testMapInsertForTransactionId = {
|
||||
case class Foo(no: Int, name: String)
|
||||
fillMap
|
||||
|
||||
// add some more to changeSet
|
||||
changeSetM += "5" -> Foo(12, "dg")
|
||||
changeSetM += "6" -> java.util.Calendar.getInstance.getTime
|
||||
|
||||
// insert all into Mongo
|
||||
MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
|
||||
assertEquals(
|
||||
6,
|
||||
MongoStorage.getMapStorageSizeFor("U-M1"))
|
||||
|
||||
// individual insert api
|
||||
MongoStorage.insertMapStorageEntryFor("U-M1", "7", "akka")
|
||||
MongoStorage.insertMapStorageEntryFor("U-M1", "8", List(23, 25))
|
||||
assertEquals(
|
||||
8,
|
||||
MongoStorage.getMapStorageSizeFor("U-M1"))
|
||||
|
||||
// add the same changeSet for another transaction
|
||||
MongoStorage.insertMapStorageEntriesFor("U-M2", changeSetM.toList)
|
||||
assertEquals(
|
||||
6,
|
||||
MongoStorage.getMapStorageSizeFor("U-M2"))
|
||||
|
||||
// the first transaction should remain the same
|
||||
assertEquals(
|
||||
8,
|
||||
MongoStorage.getMapStorageSizeFor("U-M1"))
|
||||
changeSetM.clear
|
||||
}
|
||||
|
||||
@Test
|
||||
def testMapContents = {
|
||||
fillMap
|
||||
MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
|
||||
MongoStorage.getMapStorageEntryFor("U-M1", "2") match {
|
||||
case Some(x) => assertEquals("peter", x.asInstanceOf[String])
|
||||
case None => fail("should fetch peter")
|
||||
}
|
||||
MongoStorage.getMapStorageEntryFor("U-M1", "4") match {
|
||||
case Some(x) => assertEquals(3, x.asInstanceOf[List[Int]].size)
|
||||
case None => fail("should fetch list")
|
||||
}
|
||||
MongoStorage.getMapStorageEntryFor("U-M1", "3") match {
|
||||
case Some(x) => assertEquals(2, x.asInstanceOf[List[Int]].size)
|
||||
case None => fail("should fetch list")
|
||||
}
|
||||
|
||||
// get the entire map
|
||||
val l: List[Tuple2[AnyRef, AnyRef]] =
|
||||
MongoStorage.getMapStorageFor("U-M1")
|
||||
|
||||
assertEquals(4, l.size)
|
||||
assertTrue(l.map(_._1).contains("1"))
|
||||
assertTrue(l.map(_._1).contains("2"))
|
||||
assertTrue(l.map(_._1).contains("3"))
|
||||
assertTrue(l.map(_._1).contains("4"))
|
||||
|
||||
assertTrue(l.map(_._2).contains("john"))
|
||||
|
||||
// trying to fetch for a non-existent transaction will throw
|
||||
try {
|
||||
MongoStorage.getMapStorageFor("U-M2")
|
||||
fail("should throw an exception")
|
||||
} catch {case e: Predef.NoSuchElementException => {}}
|
||||
|
||||
changeSetM.clear
|
||||
}
|
||||
|
||||
@Test
|
||||
def testMapContentsByRange = {
|
||||
fillMap
|
||||
changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc")
|
||||
MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
|
||||
|
||||
// specify start and count
|
||||
val l: List[Tuple2[AnyRef, AnyRef]] =
|
||||
MongoStorage.getMapStorageRangeFor(
|
||||
"U-M1", Some(Integer.valueOf(2)), None, 3)
|
||||
|
||||
assertEquals(3, l.size)
|
||||
assertEquals("3", l(0)._1.asInstanceOf[String])
|
||||
assertEquals(List(100, 200), l(0)._2.asInstanceOf[List[Int]])
|
||||
assertEquals("4", l(1)._1.asInstanceOf[String])
|
||||
assertEquals(List(10, 20, 30), l(1)._2.asInstanceOf[List[Int]])
|
||||
|
||||
// specify start, finish and count where finish - start == count
|
||||
assertEquals(3,
|
||||
MongoStorage.getMapStorageRangeFor(
|
||||
"U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(5)), 3).size)
|
||||
|
||||
// specify start, finish and count where finish - start > count
|
||||
assertEquals(3,
|
||||
MongoStorage.getMapStorageRangeFor(
|
||||
"U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(9)), 3).size)
|
||||
|
||||
// do not specify start or finish
|
||||
assertEquals(3,
|
||||
MongoStorage.getMapStorageRangeFor(
|
||||
"U-M1", None, None, 3).size)
|
||||
|
||||
// specify finish and count
|
||||
assertEquals(3,
|
||||
MongoStorage.getMapStorageRangeFor(
|
||||
"U-M1", None, Some(Integer.valueOf(3)), 3).size)
|
||||
|
||||
// specify start, finish and count where finish < start
|
||||
assertEquals(3,
|
||||
MongoStorage.getMapStorageRangeFor(
|
||||
"U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(1)), 3).size)
|
||||
|
||||
changeSetM.clear
|
||||
}
|
||||
|
||||
@Test
|
||||
def testMapStorageRemove = {
|
||||
fillMap
|
||||
changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc")
|
||||
|
||||
MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
|
||||
assertEquals(5,
|
||||
MongoStorage.getMapStorageSizeFor("U-M1"))
|
||||
|
||||
// remove key "3"
|
||||
MongoStorage.removeMapStorageFor("U-M1", "3")
|
||||
assertEquals(4,
|
||||
MongoStorage.getMapStorageSizeFor("U-M1"))
|
||||
|
||||
try {
|
||||
MongoStorage.getMapStorageEntryFor("U-M1", "3")
|
||||
fail("should throw exception")
|
||||
} catch { case e => {}}
|
||||
|
||||
// remove the whole stuff
|
||||
MongoStorage.removeMapStorageFor("U-M1")
|
||||
|
||||
try {
|
||||
MongoStorage.getMapStorageFor("U-M1")
|
||||
fail("should throw exception")
|
||||
} catch { case e: NoSuchElementException => {}}
|
||||
|
||||
changeSetM.clear
|
||||
}
|
||||
|
||||
private def fillMap = {
|
||||
changeSetM += "1" -> "john"
|
||||
changeSetM += "2" -> "peter"
|
||||
changeSetM += "3" -> List(100, 200)
|
||||
changeSetM += "4" -> List(10, 20, 30)
|
||||
changeSetM
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue