diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index bec2562066..569f9fd9c5 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -1,68 +1,68 @@
-####################
-# Akka Config File #
-####################
-
-# This file has all the default settings, so all these could be remove with no visible effect.
-# Modify as needed.
-
-
- filename = "./logs/akka.log"
- roll = "daily" # Options: never, hourly, daily, sunday/monday/...
- level = "debug" # Options: fatal, critical, error, warning, info, debug, trace
- console = on
- # syslog_host = ""
- # syslog_server_name = ""
-
-
-
- version = "0.6"
-
- boot = ["sample.java.Boot", "sample.scala.Boot"] # FQN to the class doing initial active object/actor
- # supervisor bootstrap, should be defined in default constructor
-
- timeout = 5000 # default timeout for future based invocations
- serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
-
-
-
- service = on
- restart-on-collision = off # (not implemented yet) if 'on' then it reschedules the transaction,
- # if 'off' then throws an exception or rollback for user to handle
- wait-for-completion = 100 # how long time in millis a transaction should be given time to complete when a collision is detected
- wait-nr-of-times = 3 # the number of times it should check for completion of a pending transaction upon collision
- distributed = off # not implemented yet
-
-
-
- service = on
- hostname = "localhost"
- port = 9999
- connection-timeout = 1000 # in millis
-
-
-
- service = on
- hostname = "localhost"
- port = 9998
-
-
-
- system = "cassandra" # Options: cassandra, mongodb
-
-
- service = on
- hostname = "127.0.0.1" # IP address or hostname of one of the Cassandra cluster's seeds
- port = 9160
- storage-format = "java" # Options: java, scala-json, java-json, protobuf
- consistency-level = 1
-
-
-
- service = on
- hostname = "127.0.0.1" # IP address or hostname of the MongoDB DB instance
- port = 27017
- dbname = "mydb"
- storage-format = "scala-json" # Options: java, scala-json, java-json, protobuf
-
-
-
\ No newline at end of file
+####################
+# Akka Config File #
+####################
+
+# This file has all the default settings, so all these could be remove with no visible effect.
+# Modify as needed.
+
+
+ filename = "./logs/akka.log"
+ roll = "daily" # Options: never, hourly, daily, sunday/monday/...
+ level = "debug" # Options: fatal, critical, error, warning, info, debug, trace
+ console = on
+ # syslog_host = ""
+ # syslog_server_name = ""
+
+
+
+ version = "0.6"
+
+ boot = ["sample.java.Boot", "sample.scala.Boot"] # FQN to the class doing initial active object/actor
+ # supervisor bootstrap, should be defined in default constructor
+
+ timeout = 5000 # default timeout for future based invocations
+ serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
+
+
+
+ service = on
+ restart-on-collision = off # (not implemented yet) if 'on' then it reschedules the transaction,
+ # if 'off' then throws an exception or rollback for user to handle
+ wait-for-completion = 100 # how long time in millis a transaction should be given time to complete when a collision is detected
+ wait-nr-of-times = 3 # the number of times it should check for completion of a pending transaction upon collision
+ distributed = off # not implemented yet
+
+
+
+ service = on
+ hostname = "localhost"
+ port = 9999
+ connection-timeout = 1000 # in millis
+
+
+
+ service = on
+ hostname = "localhost"
+ port = 9998
+
+
+
+ system = "cassandra" # Options: cassandra, mongodb
+
+
+ service = on
+ hostname = "127.0.0.1" # IP address or hostname of one of the Cassandra cluster's seeds
+ port = 9160
+ storage-format = "java" # Options: java, scala-json, java-json, protobuf
+ consistency-level = 1
+
+
+
+ service = on
+ hostname = "127.0.0.1" # IP address or hostname of the MongoDB DB instance
+ port = 27017
+ dbname = "mydb"
+ storage-format = "scala-json" # Options: java, scala-json, java-json, protobuf
+
+
+
diff --git a/embedded-repo/com/mongodb/mongo/0.6/mongo-0.6.jar b/embedded-repo/com/mongodb/mongo/0.6/mongo-0.6.jar
new file mode 100644
index 0000000000..444a5c6667
Binary files /dev/null and b/embedded-repo/com/mongodb/mongo/0.6/mongo-0.6.jar differ
diff --git a/kernel/pom.xml b/kernel/pom.xml
index 96a85bb27d..04b6fcc144 100644
--- a/kernel/pom.xml
+++ b/kernel/pom.xml
@@ -105,6 +105,13 @@
1.0
+
+
+ com.mongodb
+ mongo
+ 0.6
+
+
org.apache.cassandra
diff --git a/kernel/src/main/scala/actor/ActiveObject.scala b/kernel/src/main/scala/actor/ActiveObject.scala
index 5baa045d7f..cb22fecbc1 100644
--- a/kernel/src/main/scala/actor/ActiveObject.scala
+++ b/kernel/src/main/scala/actor/ActiveObject.scala
@@ -220,6 +220,7 @@ object ActiveObject {
}
private[kernel] def newInstance[T](target: Class[T], actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
+ //if (getClass.getClassLoader.getResourceAsStream("META-INF/aop.xml") != null) println("000000000000000000000 FOUND AOP")
if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
val proxy = Proxy.newInstance(target, false, true)
actor.initialize(target, proxy)
@@ -230,6 +231,7 @@ object ActiveObject {
}
private[kernel] def newInstance[T](intf: Class[T], target: AnyRef, actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
+ //if (getClass.getClassLoader.getResourceAsStream("META-INF/aop.xml") != null) println("000000000000000000000 FOUND AOP")
if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
val proxy = Proxy.newInstance(Array(intf), Array(target), false, true)
actor.initialize(target.getClass, target)
diff --git a/kernel/src/main/scala/state/CassandraStorage.scala b/kernel/src/main/scala/state/CassandraStorage.scala
index 4622841026..2e2a0da805 100644
--- a/kernel/src/main/scala/state/CassandraStorage.scala
+++ b/kernel/src/main/scala/state/CassandraStorage.scala
@@ -20,7 +20,7 @@ import org.apache.thrift.protocol._
/**
* @author Jonas Bonér
*/
-object CassandraStorage extends Logging {
+object CassandraStorage extends MapStorage with VectorStorage with Logging {
val KEYSPACE = "akka"
val MAP_COLUMN_PARENT = new ColumnParent("map", null)
val VECTOR_COLUMN_PARENT = new ColumnParent("vector", null)
@@ -112,7 +112,7 @@ object CassandraStorage extends Logging {
// For Vector
// ===============================================================
- def insertVectorStorageEntryFor(name: String, element: AnyRef) = if (sessions.isDefined) {
+ override def insertVectorStorageEntryFor(name: String, element: AnyRef) = if (sessions.isDefined) {
sessions.get.withSession {
_ ++| (name,
new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(getVectorStorageSizeFor(name))),
@@ -122,7 +122,10 @@ object CassandraStorage extends Logging {
}
} else throw new IllegalStateException("CassandraStorage is not started")
- def getVectorStorageEntryFor(name: String, index: Int): AnyRef = if (sessions.isDefined) {
+ override def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
+ }
+
+ override def getVectorStorageEntryFor(name: String, index: Int): AnyRef = if (sessions.isDefined) {
val column: Option[Column] = sessions.get.withSession {
_ | (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index)))
}
@@ -130,7 +133,7 @@ object CassandraStorage extends Logging {
else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]")
} else throw new IllegalStateException("CassandraStorage is not started")
- def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = if (sessions.isDefined) {
+ override def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = if (sessions.isDefined) {
val startBytes = if (start.isDefined) intToBytes(start.get) else null
val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null
val columns: List[Column] = sessions.get.withSession {
@@ -144,7 +147,7 @@ object CassandraStorage extends Logging {
columns.map(column => serializer.in(column.value, None))
} else throw new IllegalStateException("CassandraStorage is not started")
- def getVectorStorageSizeFor(name: String): Int = if (sessions.isDefined) {
+ override def getVectorStorageSizeFor(name: String): Int = if (sessions.isDefined) {
sessions.get.withSession {
_ |# (name, VECTOR_COLUMN_PARENT)
}
@@ -154,7 +157,7 @@ object CassandraStorage extends Logging {
// For Map
// ===============================================================
- def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = if (sessions.isDefined) {
+ override def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = if (sessions.isDefined) {
sessions.get.withSession {
_ ++| (name,
new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)),
@@ -164,7 +167,7 @@ object CassandraStorage extends Logging {
}
} else throw new IllegalStateException("CassandraStorage is not started")
- def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = if (sessions.isDefined) {
+ override def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = if (sessions.isDefined) {
val cf2columns: java.util.Map[String, java.util.List[Column]] = new java.util.HashMap
for (entry <- entries) {
val columns: java.util.List[Column] = new java.util.ArrayList
@@ -176,7 +179,7 @@ object CassandraStorage extends Logging {
}
} else throw new IllegalStateException("CassandraStorage is not started")
- def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = if (sessions.isDefined) {
+ override def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = if (sessions.isDefined) {
try {
val column: Option[Column] = sessions.get.withSession {
_ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)))
@@ -190,7 +193,7 @@ object CassandraStorage extends Logging {
}
} else throw new IllegalStateException("CassandraStorage is not started")
- def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) {
+ override def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) {
throw new UnsupportedOperationException
/*
val columns = server.get_columns_since(name, MAP_COLUMN_FAMILY, -1)
@@ -202,15 +205,15 @@ object CassandraStorage extends Logging {
*/
} else throw new IllegalStateException("CassandraStorage is not started")
- def getMapStorageSizeFor(name: String): Int = if (sessions.isDefined) {
+ override def getMapStorageSizeFor(name: String): Int = if (sessions.isDefined) {
sessions.get.withSession {
_ |# (name, MAP_COLUMN_PARENT)
}
} else throw new IllegalStateException("CassandraStorage is not started")
- def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null)
+ override def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null)
- def removeMapStorageFor(name: String, key: AnyRef): Unit = if (sessions.isDefined) {
+ override def removeMapStorageFor(name: String, key: AnyRef): Unit = if (sessions.isDefined) {
val keyBytes = if (key == null) null else serializer.out(key)
sessions.get.withSession {
_ -- (name,
@@ -220,7 +223,7 @@ object CassandraStorage extends Logging {
}
} else throw new IllegalStateException("CassandraStorage is not started")
- def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int):
+ override def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int):
List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) {
val startBytes = if (start.isDefined) serializer.out(start.get) else null
val finishBytes = if (finish.isDefined) serializer.out(finish.get) else null
diff --git a/kernel/src/main/scala/state/MongoStorage.scala b/kernel/src/main/scala/state/MongoStorage.scala
new file mode 100644
index 0000000000..657a6ec9fc
--- /dev/null
+++ b/kernel/src/main/scala/state/MongoStorage.scala
@@ -0,0 +1,258 @@
+package se.scalablesolutions.akka.kernel.state
+
+import com.mongodb._
+import se.scalablesolutions.akka.kernel.util.Logging
+import serialization.{Serializer}
+import kernel.Kernel.config
+
+import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
+
+object MongoStorage extends MapStorage
+ with VectorStorage with Logging {
+
+ // enrich with null safe findOne
+ class RichDBCollection(value: DBCollection) {
+ def findOneNS(o: DBObject): Option[DBObject] = {
+ value.findOne(o) match {
+ case null => None
+ case x => Some(x)
+ }
+ }
+ }
+
+ implicit def enrichDBCollection(c: DBCollection) = new RichDBCollection(c)
+
+ val KEY = "key"
+ val VALUE = "value"
+ val COLLECTION = "akka_coll"
+ val MONGODB_SERVER_HOSTNAME =
+ config.getString("akka.storage.mongodb.hostname", "127.0.0.1")
+ val MONGODB_SERVER_DBNAME =
+ config.getString("akka.storage.mongodb.dbname", "testdb")
+ val MONGODB_SERVER_PORT =
+ config.getInt("akka.storage.mongodb.port", 27017)
+
+ val db = new Mongo(MONGODB_SERVER_HOSTNAME,
+ MONGODB_SERVER_PORT, MONGODB_SERVER_DBNAME)
+ val coll = db.getCollection(COLLECTION)
+
+ // @fixme: make this pluggable
+ private[this] val serializer: Serializer = Serializer.ScalaJSON
+
+ override def insertMapStorageEntryFor(name: String,
+ key: AnyRef, value: AnyRef) {
+ insertMapStorageEntriesFor(name, List((key, value)))
+ }
+
+ override def insertMapStorageEntriesFor(name: String,
+ entries: List[Tuple2[AnyRef, AnyRef]]) {
+ import java.util.{Map, HashMap}
+
+ val m: Map[AnyRef, AnyRef] = new HashMap
+ for ((k, v) <- entries) {
+ m.put(k, serializer.out(v))
+ }
+
+ nullSafeFindOne(name) match {
+ case None =>
+ coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m))
+ case Some(dbo) => {
+ // collate the maps
+ val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]]
+ o.putAll(m)
+
+ // remove existing reference
+ removeMapStorageFor(name)
+ // and insert
+ coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, o))
+ }
+ }
+ }
+
+ override def removeMapStorageFor(name: String) = {
+ val q = new BasicDBObject
+ q.put(KEY, name)
+ coll.remove(q)
+ }
+
+ override def removeMapStorageFor(name: String, key: AnyRef) = {
+ nullSafeFindOne(name) match {
+ case None =>
+ case Some(dbo) => {
+ val orig = dbo.get(VALUE).asInstanceOf[DBObject].toMap
+ orig.remove(key.asInstanceOf[String])
+
+ // remove existing reference
+ removeMapStorageFor(name)
+ // and insert
+ coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, orig))
+ }
+ }
+ }
+
+ override def getMapStorageEntryFor(name: String,
+ key: AnyRef): Option[AnyRef] = {
+ getValueForKey(name, key.asInstanceOf[String])
+ }
+
+ override def getMapStorageSizeFor(name: String): Int = {
+ nullSafeFindOne(name) match {
+ case None => 0
+ case Some(dbo) =>
+ dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].keySet.size
+ }
+ }
+
+ override def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
+ val m =
+ nullSafeFindOne(name) match {
+ case None =>
+ throw new Predef.NoSuchElementException(name + " not present")
+ case Some(dbo) =>
+ dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
+ }
+ val n =
+ List(m.keySet.toArray: _*).asInstanceOf[List[String]]
+ val vals =
+ for(s <- n)
+ yield (s, serializer.in(m.get(s).asInstanceOf[Array[Byte]], None))
+ vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
+ }
+
+ override def getMapStorageRangeFor(name: String, start: Option[AnyRef],
+ finish: Option[AnyRef],
+ count: Int): List[Tuple2[AnyRef, AnyRef]] = {
+ val m =
+ nullSafeFindOne(name) match {
+ case None =>
+ throw new Predef.NoSuchElementException(name + " not present")
+ case Some(dbo) =>
+ dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
+ }
+
+ /**
+ * count is the max number of results to return. Start with
+ * start or 0 (if start is not defined) and go until
+ * you hit finish or count.
+ */
+ val s = if (start.isDefined) start.get.asInstanceOf[Int] else 0
+ val cnt =
+ if (finish.isDefined) {
+ val f = finish.get.asInstanceOf[Int]
+ if (f >= s) Math.min(count, (f - s)) else count
+ }
+ else count
+
+ val n =
+ List(m.keySet.toArray: _*).asInstanceOf[List[String]].sort((e1, e2) => (e1 compareTo e2) < 0).slice(s, s + cnt)
+ val vals =
+ for(s <- n)
+ yield (s, serializer.in(m.get(s).asInstanceOf[Array[Byte]], None))
+ vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
+ }
+
+ private def getValueForKey(name: String, key: String): Option[AnyRef] = {
+ try {
+ nullSafeFindOne(name) match {
+ case None => None
+ case Some(dbo) =>
+ Some(serializer.in(
+ dbo.get(VALUE)
+ .asInstanceOf[JMap[String, AnyRef]]
+ .get(key).asInstanceOf[Array[Byte]], None))
+ }
+ } catch {
+ case e =>
+ throw new Predef.NoSuchElementException(e.getMessage)
+ }
+ }
+
+ override def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
+ val q = new BasicDBObject
+ q.put(KEY, name)
+
+ val currentList =
+ coll.findOneNS(q) match {
+ case None =>
+ new JArrayList[AnyRef]
+ case Some(dbo) =>
+ dbo.get(VALUE).asInstanceOf[JArrayList[AnyRef]]
+ }
+ if (!currentList.isEmpty) {
+ // record exists
+ // remove before adding
+ coll.remove(q)
+ }
+
+ // add to the current list
+ elements.map(serializer.out(_)).foreach(currentList.add(_))
+
+ coll.insert(
+ new BasicDBObject()
+ .append(KEY, name)
+ .append(VALUE, currentList)
+ )
+ }
+
+ override def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
+ insertVectorStorageEntriesFor(name, List(element))
+ }
+
+ override def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
+ try {
+ val o =
+ nullSafeFindOne(name) match {
+ case None =>
+ throw new Predef.NoSuchElementException(name + " not present")
+
+ case Some(dbo) =>
+ dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
+ }
+ serializer.in(
+ o.get(index).asInstanceOf[Array[Byte]],
+ None
+ )
+ } catch {
+ case e =>
+ throw new Predef.NoSuchElementException(e.getMessage)
+ }
+ }
+
+ override def getVectorStorageRangeFor(name: String,
+ start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
+ try {
+ val o =
+ nullSafeFindOne(name) match {
+ case None =>
+ throw new Predef.NoSuchElementException(name + " not present")
+
+ case Some(dbo) =>
+ dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
+ }
+
+ // pick the subrange and make a Scala list
+ val l =
+ List(o.subList(start.get, start.get + count).toArray: _*)
+
+ for(e <- l)
+ yield serializer.in(e.asInstanceOf[Array[Byte]], None)
+ } catch {
+ case e =>
+ throw new Predef.NoSuchElementException(e.getMessage)
+ }
+ }
+
+ override def getVectorStorageSizeFor(name: String): Int = {
+ nullSafeFindOne(name) match {
+ case None => 0
+ case Some(dbo) =>
+ dbo.get(VALUE).asInstanceOf[JList[AnyRef]].size
+ }
+ }
+
+ private def nullSafeFindOne(name: String): Option[DBObject] = {
+ val o = new BasicDBObject
+ o.put(KEY, name)
+ coll.findOneNS(o)
+ }
+}
diff --git a/kernel/src/main/scala/state/State.scala b/kernel/src/main/scala/state/State.scala
index 46114eb1bc..964b2309a4 100644
--- a/kernel/src/main/scala/state/State.scala
+++ b/kernel/src/main/scala/state/State.scala
@@ -16,6 +16,7 @@ abstract class PersistentStorageConfig extends TransactionalStateConfig
case class CassandraStorageConfig extends PersistentStorageConfig
case class TerracottaStorageConfig extends PersistentStorageConfig
case class TokyoCabinetStorageConfig extends PersistentStorageConfig
+case class MongoStorageConfig extends PersistentStorageConfig
/**
* Scala API.
@@ -39,12 +40,14 @@ object TransactionalState extends TransactionalState
class TransactionalState {
def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[AnyRef, AnyRef] = config match {
case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
+ case MongoStorageConfig() => new MongoPersistentTransactionalMap
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
def newPersistentVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match {
case CassandraStorageConfig() => new CassandraPersistentTransactionalVector
+ case MongoStorageConfig() => new MongoPersistentTransactionalVector
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
@@ -194,16 +197,23 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
}
/**
- * Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
+ * Implementation of PersistentTransactionalMap for every concrete
+ * storage will have the same workflow. This abstracts the workflow.
+ *
+ * Subclasses just need to provide the actual concrete instance for the
+ * abstract val storage.
*
* @author Jonas Bonér
*/
-class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[AnyRef, AnyRef] {
+abstract class TemplatePersistentTransactionalMap extends PersistentTransactionalMap[AnyRef, AnyRef] {
+
+ // to be concretized in subclasses
+ val storage: MapStorage
override def remove(key: AnyRef) = {
verifyTransaction
if (changeSet.contains(key)) changeSet -= key
- else CassandraStorage.removeMapStorageFor(uuid, key)
+ else storage.removeMapStorageFor(uuid, key)
}
override def getRange(start: Option[AnyRef], count: Int) =
@@ -212,7 +222,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
def getRange(start: Option[AnyRef], finish: Option[AnyRef], count: Int) = {
verifyTransaction
try {
- CassandraStorage.getMapStorageRangeFor(uuid, start, finish, count)
+ storage.getMapStorageRangeFor(uuid, start, finish, count)
} catch {
case e: Exception => Nil
}
@@ -220,7 +230,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
// ---- For Transactional ----
override def commit = {
- CassandraStorage.insertMapStorageEntriesFor(uuid, changeSet.toList)
+ storage.insertMapStorageEntriesFor(uuid, changeSet.toList)
changeSet.clear
}
@@ -228,7 +238,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
override def clear = {
verifyTransaction
try {
- CassandraStorage.removeMapStorageFor(uuid)
+ storage.removeMapStorageFor(uuid)
} catch {
case e: Exception => {}
}
@@ -237,7 +247,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
override def contains(key: AnyRef): Boolean = {
try {
verifyTransaction
- CassandraStorage.getMapStorageEntryFor(uuid, key).isDefined
+ storage.getMapStorageEntryFor(uuid, key).isDefined
} catch {
case e: Exception => false
}
@@ -246,7 +256,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
override def size: Int = {
verifyTransaction
try {
- CassandraStorage.getMapStorageSizeFor(uuid)
+ storage.getMapStorageSizeFor(uuid)
} catch {
case e: Exception => 0
}
@@ -258,7 +268,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
// if (changeSet.contains(key)) changeSet.get(key)
// else {
val result = try {
- CassandraStorage.getMapStorageEntryFor(uuid, key)
+ storage.getMapStorageEntryFor(uuid, key)
} catch {
case e: Exception => None
}
@@ -270,7 +280,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
//verifyTransaction
new Iterator[Tuple2[AnyRef, AnyRef]] {
private val originalList: List[Tuple2[AnyRef, AnyRef]] = try {
- CassandraStorage.getMapStorageFor(uuid)
+ storage.getMapStorageFor(uuid)
} catch {
case e: Throwable => Nil
}
@@ -285,6 +295,25 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
}
}
+
+/**
+ * Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
+ *
+ * @author Debasish Ghosh
+ */
+class CassandraPersistentTransactionalMap extends TemplatePersistentTransactionalMap {
+ val storage = CassandraStorage
+}
+
+/**
+ * Implements a persistent transactional map based on the MongoDB distributed P2P key-value storage.
+ *
+ * @author Debasish Ghosh
+ */
+class MongoPersistentTransactionalMap extends TemplatePersistentTransactionalMap {
+ val storage = MongoStorage
+}
+
/**
* Base for all transactional vector implementations.
*
@@ -382,17 +411,19 @@ abstract class PersistentTransactionalVector[T] extends TransactionalVector[T] {
}
/**
- * Implements a persistent transactional vector based on the Cassandra distributed P2P key-value storage.
+ * Implements a template for a concrete persistent transactional vector based storage.
*
- * @author Jonas Bonér
+ * @author Debasish Ghosh
*/
-class CassandraPersistentTransactionalVector extends PersistentTransactionalVector[AnyRef] {
+abstract class TemplatePersistentTransactionalVector extends PersistentTransactionalVector[AnyRef] {
+
+ val storage: VectorStorage
// ---- For TransactionalVector ----
override def get(index: Int): AnyRef = {
verifyTransaction
if (changeSet.size > index) changeSet(index)
- else CassandraStorage.getVectorStorageEntryFor(uuid, index)
+ else storage.getVectorStorageEntryFor(uuid, index)
}
override def getRange(start: Int, count: Int): List[AnyRef] =
@@ -400,12 +431,12 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect
def getRange(start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
verifyTransaction
- CassandraStorage.getVectorStorageRangeFor(uuid, start, finish, count)
+ storage.getVectorStorageRangeFor(uuid, start, finish, count)
}
override def length: Int = {
verifyTransaction
- CassandraStorage.getVectorStorageSizeFor(uuid)
+ storage.getVectorStorageSizeFor(uuid)
}
override def apply(index: Int): AnyRef = get(index)
@@ -422,11 +453,29 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect
// ---- For Transactional ----
override def commit = {
// FIXME: should use batch function once the bug is resolved
- for (element <- changeSet) CassandraStorage.insertVectorStorageEntryFor(uuid, element)
+ for (element <- changeSet) storage.insertVectorStorageEntryFor(uuid, element)
changeSet.clear
}
}
+/**
+ * Implements a persistent transactional vector based on the Cassandra distributed P2P key-value storage.
+ *
+ * @author Debaissh Ghosh
+ */
+class CassandraPersistentTransactionalVector extends TemplatePersistentTransactionalVector {
+ val storage = CassandraStorage
+}
+
+/**
+ * Implements a persistent transactional vector based on the MongoDB distributed P2P key-value storage.
+ *
+ * @author Debaissh Ghosh
+ */
+class MongoPersistentTransactionalVector extends TemplatePersistentTransactionalVector {
+ val storage = MongoStorage
+}
+
/**
* Implements a transactional reference.
*
diff --git a/kernel/src/main/scala/state/Storage.scala b/kernel/src/main/scala/state/Storage.scala
new file mode 100644
index 0000000000..2d31695af5
--- /dev/null
+++ b/kernel/src/main/scala/state/Storage.scala
@@ -0,0 +1,27 @@
+package se.scalablesolutions.akka.kernel.state
+
+// abstracts persistence storage
+trait Storage {
+}
+
+// for Maps
+trait MapStorage extends Storage {
+ def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]])
+ def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef)
+ def removeMapStorageFor(name: String)
+ def removeMapStorageFor(name: String, key: AnyRef)
+ def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef]
+ def getMapStorageSizeFor(name: String): Int
+ def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]]
+ def getMapStorageRangeFor(name: String, start: Option[AnyRef],
+ finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]]
+}
+
+// for vectors
+trait VectorStorage extends Storage {
+ def insertVectorStorageEntryFor(name: String, element: AnyRef)
+ def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef])
+ def getVectorStorageEntryFor(name: String, index: Int): AnyRef
+ def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef]
+ def getVectorStorageSizeFor(name: String): Int
+}
diff --git a/kernel/src/test/scala/MongoStorageSpec.scala b/kernel/src/test/scala/MongoStorageSpec.scala
new file mode 100644
index 0000000000..5baa03baae
--- /dev/null
+++ b/kernel/src/test/scala/MongoStorageSpec.scala
@@ -0,0 +1,272 @@
+package se.scalablesolutions.akka.kernel.state
+
+import junit.framework.TestCase
+
+import org.junit.{Test, Before}
+import org.junit.Assert._
+
+class MongoStorageSpec extends TestCase {
+
+ val changeSetV = new scala.collection.mutable.ArrayBuffer[AnyRef]
+ val changeSetM = new scala.collection.mutable.HashMap[AnyRef, AnyRef]
+
+ override def setUp = {
+ MongoStorage.coll.drop
+ }
+
+ @Test
+ def testVectorInsertForTransactionId = {
+ changeSetV += "debasish" // string
+ changeSetV += List(1, 2, 3) // Scala List
+ changeSetV += List(100, 200)
+ MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
+ assertEquals(
+ 3,
+ MongoStorage.getVectorStorageSizeFor("U-A1"))
+ changeSetV.clear
+
+ // changeSetV should be reinitialized
+ changeSetV += List(12, 23, 45)
+ changeSetV += "maulindu"
+ MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
+ assertEquals(
+ 5,
+ MongoStorage.getVectorStorageSizeFor("U-A1"))
+
+ // add more to the same changeSetV
+ changeSetV += "ramanendu"
+ changeSetV += Map(1 -> "dg", 2 -> "mc")
+
+ // add for a diff transaction
+ MongoStorage.insertVectorStorageEntriesFor("U-A2", changeSetV.toList)
+ assertEquals(
+ 4,
+ MongoStorage.getVectorStorageSizeFor("U-A2"))
+
+ // previous transaction change set should remain same
+ assertEquals(
+ 5,
+ MongoStorage.getVectorStorageSizeFor("U-A1"))
+
+ // test single element entry
+ MongoStorage.insertVectorStorageEntryFor("U-A1", Map(1->1, 2->4, 3->9))
+ assertEquals(
+ 6,
+ MongoStorage.getVectorStorageSizeFor("U-A1"))
+ }
+
+ @Test
+ def testVectorFetchForKeys = {
+
+ // initially everything 0
+ assertEquals(
+ 0,
+ MongoStorage.getVectorStorageSizeFor("U-A2"))
+
+ assertEquals(
+ 0,
+ MongoStorage.getVectorStorageSizeFor("U-A1"))
+
+ // get some stuff
+ changeSetV += "debasish"
+ changeSetV += List(12, 13, 14)
+ MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
+
+ assertEquals(
+ 2,
+ MongoStorage.getVectorStorageSizeFor("U-A1"))
+
+ assertEquals(
+ "debasish",
+ MongoStorage.getVectorStorageEntryFor("U-A1", 0).asInstanceOf[String])
+
+ assertEquals(
+ List(12, 13, 14),
+ MongoStorage.getVectorStorageEntryFor("U-A1", 1).asInstanceOf[List[Int]])
+
+ changeSetV.clear
+ changeSetV += Map(1->1, 2->4, 3->9)
+ changeSetV += BigInt(2310)
+ changeSetV += List(100, 200, 300)
+ MongoStorage.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
+
+ assertEquals(
+ 5,
+ MongoStorage.getVectorStorageSizeFor("U-A1"))
+
+ val r =
+ MongoStorage.getVectorStorageRangeFor("U-A1", Some(1), None, 3)
+
+ assertEquals(3, r.size)
+ assertEquals(List(12, 13, 14), r(0).asInstanceOf[List[Int]])
+ }
+
+ @Test
+ def testVectorFetchForNonExistentKeys = {
+ try {
+ MongoStorage.getVectorStorageEntryFor("U-A1", 1)
+ fail("should throw an exception")
+ } catch {case e: Predef.NoSuchElementException => {}}
+
+ try {
+ MongoStorage.getVectorStorageRangeFor("U-A1", Some(2), None, 12)
+ fail("should throw an exception")
+ } catch {case e: Predef.NoSuchElementException => {}}
+ }
+
+ @Test
+ def testMapInsertForTransactionId = {
+ case class Foo(no: Int, name: String)
+ fillMap
+
+ // add some more to changeSet
+ changeSetM += "5" -> Foo(12, "dg")
+ changeSetM += "6" -> java.util.Calendar.getInstance.getTime
+
+ // insert all into Mongo
+ MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
+ assertEquals(
+ 6,
+ MongoStorage.getMapStorageSizeFor("U-M1"))
+
+ // individual insert api
+ MongoStorage.insertMapStorageEntryFor("U-M1", "7", "akka")
+ MongoStorage.insertMapStorageEntryFor("U-M1", "8", List(23, 25))
+ assertEquals(
+ 8,
+ MongoStorage.getMapStorageSizeFor("U-M1"))
+
+ // add the same changeSet for another transaction
+ MongoStorage.insertMapStorageEntriesFor("U-M2", changeSetM.toList)
+ assertEquals(
+ 6,
+ MongoStorage.getMapStorageSizeFor("U-M2"))
+
+ // the first transaction should remain the same
+ assertEquals(
+ 8,
+ MongoStorage.getMapStorageSizeFor("U-M1"))
+ changeSetM.clear
+ }
+
+ @Test
+ def testMapContents = {
+ fillMap
+ MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
+ MongoStorage.getMapStorageEntryFor("U-M1", "2") match {
+ case Some(x) => assertEquals("peter", x.asInstanceOf[String])
+ case None => fail("should fetch peter")
+ }
+ MongoStorage.getMapStorageEntryFor("U-M1", "4") match {
+ case Some(x) => assertEquals(3, x.asInstanceOf[List[Int]].size)
+ case None => fail("should fetch list")
+ }
+ MongoStorage.getMapStorageEntryFor("U-M1", "3") match {
+ case Some(x) => assertEquals(2, x.asInstanceOf[List[Int]].size)
+ case None => fail("should fetch list")
+ }
+
+ // get the entire map
+ val l: List[Tuple2[AnyRef, AnyRef]] =
+ MongoStorage.getMapStorageFor("U-M1")
+
+ assertEquals(4, l.size)
+ assertTrue(l.map(_._1).contains("1"))
+ assertTrue(l.map(_._1).contains("2"))
+ assertTrue(l.map(_._1).contains("3"))
+ assertTrue(l.map(_._1).contains("4"))
+
+ assertTrue(l.map(_._2).contains("john"))
+
+ // trying to fetch for a non-existent transaction will throw
+ try {
+ MongoStorage.getMapStorageFor("U-M2")
+ fail("should throw an exception")
+ } catch {case e: Predef.NoSuchElementException => {}}
+
+ changeSetM.clear
+ }
+
+ @Test
+ def testMapContentsByRange = {
+ fillMap
+ changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc")
+ MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
+
+ // specify start and count
+ val l: List[Tuple2[AnyRef, AnyRef]] =
+ MongoStorage.getMapStorageRangeFor(
+ "U-M1", Some(Integer.valueOf(2)), None, 3)
+
+ assertEquals(3, l.size)
+ assertEquals("3", l(0)._1.asInstanceOf[String])
+ assertEquals(List(100, 200), l(0)._2.asInstanceOf[List[Int]])
+ assertEquals("4", l(1)._1.asInstanceOf[String])
+ assertEquals(List(10, 20, 30), l(1)._2.asInstanceOf[List[Int]])
+
+ // specify start, finish and count where finish - start == count
+ assertEquals(3,
+ MongoStorage.getMapStorageRangeFor(
+ "U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(5)), 3).size)
+
+ // specify start, finish and count where finish - start > count
+ assertEquals(3,
+ MongoStorage.getMapStorageRangeFor(
+ "U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(9)), 3).size)
+
+ // do not specify start or finish
+ assertEquals(3,
+ MongoStorage.getMapStorageRangeFor(
+ "U-M1", None, None, 3).size)
+
+ // specify finish and count
+ assertEquals(3,
+ MongoStorage.getMapStorageRangeFor(
+ "U-M1", None, Some(Integer.valueOf(3)), 3).size)
+
+ // specify start, finish and count where finish < start
+ assertEquals(3,
+ MongoStorage.getMapStorageRangeFor(
+ "U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(1)), 3).size)
+
+ changeSetM.clear
+ }
+
+ @Test
+ def testMapStorageRemove = {
+ fillMap
+ changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc")
+
+ MongoStorage.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
+ assertEquals(5,
+ MongoStorage.getMapStorageSizeFor("U-M1"))
+
+ // remove key "3"
+ MongoStorage.removeMapStorageFor("U-M1", "3")
+ assertEquals(4,
+ MongoStorage.getMapStorageSizeFor("U-M1"))
+
+ try {
+ MongoStorage.getMapStorageEntryFor("U-M1", "3")
+ fail("should throw exception")
+ } catch { case e => {}}
+
+ // remove the whole stuff
+ MongoStorage.removeMapStorageFor("U-M1")
+
+ try {
+ MongoStorage.getMapStorageFor("U-M1")
+ fail("should throw exception")
+ } catch { case e: NoSuchElementException => {}}
+
+ changeSetM.clear
+ }
+
+ private def fillMap = {
+ changeSetM += "1" -> "john"
+ changeSetM += "2" -> "peter"
+ changeSetM += "3" -> List(100, 200)
+ changeSetM += "4" -> List(10, 20, 30)
+ changeSetM
+ }
+}