diff --git a/kernel/src/main/scala/state/CassandraStorage.scala b/kernel/src/main/scala/state/CassandraStorage.scala index 4622841026..2e2a0da805 100644 --- a/kernel/src/main/scala/state/CassandraStorage.scala +++ b/kernel/src/main/scala/state/CassandraStorage.scala @@ -20,7 +20,7 @@ import org.apache.thrift.protocol._ /** * @author Jonas Bonér */ -object CassandraStorage extends Logging { +object CassandraStorage extends MapStorage with VectorStorage with Logging { val KEYSPACE = "akka" val MAP_COLUMN_PARENT = new ColumnParent("map", null) val VECTOR_COLUMN_PARENT = new ColumnParent("vector", null) @@ -112,7 +112,7 @@ object CassandraStorage extends Logging { // For Vector // =============================================================== - def insertVectorStorageEntryFor(name: String, element: AnyRef) = if (sessions.isDefined) { + override def insertVectorStorageEntryFor(name: String, element: AnyRef) = if (sessions.isDefined) { sessions.get.withSession { _ ++| (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(getVectorStorageSizeFor(name))), @@ -122,7 +122,10 @@ object CassandraStorage extends Logging { } } else throw new IllegalStateException("CassandraStorage is not started") - def getVectorStorageEntryFor(name: String, index: Int): AnyRef = if (sessions.isDefined) { + override def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = { + } + + override def getVectorStorageEntryFor(name: String, index: Int): AnyRef = if (sessions.isDefined) { val column: Option[Column] = sessions.get.withSession { _ | (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index))) } @@ -130,7 +133,7 @@ object CassandraStorage extends Logging { else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]") } else throw new IllegalStateException("CassandraStorage is not started") - def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = if (sessions.isDefined) { + override def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = if (sessions.isDefined) { val startBytes = if (start.isDefined) intToBytes(start.get) else null val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null val columns: List[Column] = sessions.get.withSession { @@ -144,7 +147,7 @@ object CassandraStorage extends Logging { columns.map(column => serializer.in(column.value, None)) } else throw new IllegalStateException("CassandraStorage is not started") - def getVectorStorageSizeFor(name: String): Int = if (sessions.isDefined) { + override def getVectorStorageSizeFor(name: String): Int = if (sessions.isDefined) { sessions.get.withSession { _ |# (name, VECTOR_COLUMN_PARENT) } @@ -154,7 +157,7 @@ object CassandraStorage extends Logging { // For Map // =============================================================== - def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = if (sessions.isDefined) { + override def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = if (sessions.isDefined) { sessions.get.withSession { _ ++| (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)), @@ -164,7 +167,7 @@ object CassandraStorage extends Logging { } } else throw new IllegalStateException("CassandraStorage is not started") - def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = if (sessions.isDefined) { + override def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = if (sessions.isDefined) { val cf2columns: java.util.Map[String, java.util.List[Column]] = new java.util.HashMap for (entry <- entries) { val columns: java.util.List[Column] = new java.util.ArrayList @@ -176,7 +179,7 @@ object CassandraStorage extends Logging { } } else throw new IllegalStateException("CassandraStorage is not started") - def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = if (sessions.isDefined) { + override def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = if (sessions.isDefined) { try { val column: Option[Column] = sessions.get.withSession { _ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key))) @@ -190,7 +193,7 @@ object CassandraStorage extends Logging { } } else throw new IllegalStateException("CassandraStorage is not started") - def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) { + override def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) { throw new UnsupportedOperationException /* val columns = server.get_columns_since(name, MAP_COLUMN_FAMILY, -1) @@ -202,15 +205,15 @@ object CassandraStorage extends Logging { */ } else throw new IllegalStateException("CassandraStorage is not started") - def getMapStorageSizeFor(name: String): Int = if (sessions.isDefined) { + override def getMapStorageSizeFor(name: String): Int = if (sessions.isDefined) { sessions.get.withSession { _ |# (name, MAP_COLUMN_PARENT) } } else throw new IllegalStateException("CassandraStorage is not started") - def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null) + override def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null) - def removeMapStorageFor(name: String, key: AnyRef): Unit = if (sessions.isDefined) { + override def removeMapStorageFor(name: String, key: AnyRef): Unit = if (sessions.isDefined) { val keyBytes = if (key == null) null else serializer.out(key) sessions.get.withSession { _ -- (name, @@ -220,7 +223,7 @@ object CassandraStorage extends Logging { } } else throw new IllegalStateException("CassandraStorage is not started") - def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int): + override def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) { val startBytes = if (start.isDefined) serializer.out(start.get) else null val finishBytes = if (finish.isDefined) serializer.out(finish.get) else null diff --git a/kernel/src/main/scala/state/MongoStorage.scala b/kernel/src/main/scala/state/MongoStorage.scala new file mode 100644 index 0000000000..286fa9e9c7 --- /dev/null +++ b/kernel/src/main/scala/state/MongoStorage.scala @@ -0,0 +1,226 @@ +package se.scalablesolutions.akka.kernel.state + +import com.mongodb._ +import se.scalablesolutions.akka.kernel.util.Logging +import serialization.{Serializer} + +import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList} + +object MongoStorage extends MapStorage + with VectorStorage with Logging { + + // enrich with null safe findOne + class RichDBCollection(value: DBCollection) { + def findOneNS(o: DBObject): Option[DBObject] = { + value.findOne(o) match { + case null => None + case x => Some(x) + } + } + } + + implicit def enrichDBCollection(c: DBCollection) = new RichDBCollection(c) + + val KEY = "key" + val VALUE = "value" + val db = new Mongo("mydb"); // @fixme: need to externalize + val COLLECTION = "akka_coll" + val coll = db.getCollection(COLLECTION) + + // @fixme: make this pluggable + private[this] val serializer: Serializer = Serializer.ScalaJSON + + override def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef) { + insertMapStorageEntriesFor(name, List((key, value))) + } + + override def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) { + import java.util.{Map, HashMap} + + val m: Map[AnyRef, AnyRef] = new HashMap + for ((k, v) <- entries) { + m.put(k, serializer.out(v)) + } + + nullSafeFindOne(name) match { + case None => + coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m)) + case Some(dbo) => { + // collate the maps + val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]] + o.putAll(m) + + // remove existing reference + removeMapStorageFor(name) + // and insert + coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, o)) + } + } + } + + override def removeMapStorageFor(name: String) = { + val q = new BasicDBObject + q.put(KEY, name) + coll.remove(q) + } + + override def removeMapStorageFor(name: String, key: AnyRef) = { + } + + override def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = { + try { + getValueForKey(name, key.asInstanceOf[String]) + } catch { + case e => + e.printStackTrace + None + } + } + + override def getMapStorageSizeFor(name: String): Int = { + nullSafeFindOne(name) match { + case None => 0 + case Some(dbo) => + dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].keySet.size + } + } + + override def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = { + val m = + nullSafeFindOne(name) match { + case None => + throw new Predef.NoSuchElementException(name + " not present") + case Some(dbo) => + dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]] + } + val n = + List(m.keySet.toArray: _*).asInstanceOf[List[String]] + val vals = + for(s <- n) + yield (s, serializer.in(m.get(s).asInstanceOf[Array[Byte]], None)) + vals.asInstanceOf[List[Tuple2[String, AnyRef]]] + } + + override def getMapStorageRangeFor(name: String, start: Option[AnyRef], + finish: Option[AnyRef], + count: Int): List[Tuple2[AnyRef, AnyRef]] = { + val m = + nullSafeFindOne(name) match { + case None => + throw new Predef.NoSuchElementException(name + " not present") + case Some(dbo) => + dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]] + } + val s = start.get.asInstanceOf[Int] + val n = + List(m.keySet.toArray: _*).asInstanceOf[List[String]].slice(s, s + count) + val vals = + for(s <- n) + yield (s, serializer.in(m.get(s).asInstanceOf[Array[Byte]], None)) + vals.asInstanceOf[List[Tuple2[String, AnyRef]]] + } + + private def getValueForKey(name: String, key: String): Option[AnyRef] = { + try { + nullSafeFindOne(name) match { + case None => None + case Some(dbo) => + Some(serializer.in( + dbo.get(VALUE) + .asInstanceOf[JMap[String, AnyRef]] + .get(key).asInstanceOf[Array[Byte]], None)) + } + } catch { + case e => + throw new Predef.NoSuchElementException(e.getMessage) + } + } + + def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = { + val q = new BasicDBObject + q.put(KEY, name) + + val currentList = + coll.findOneNS(q) match { + case None => + new JArrayList[AnyRef] + case Some(dbo) => + dbo.get(VALUE).asInstanceOf[JArrayList[AnyRef]] + } + if (!currentList.isEmpty) { + // record exists + // remove before adding + coll.remove(q) + } + + // add to the current list + elements.map(serializer.out(_)).foreach(currentList.add(_)) + + coll.insert( + new BasicDBObject() + .append(KEY, name) + .append(VALUE, currentList) + ) + } + + override def insertVectorStorageEntryFor(name: String, element: AnyRef) = { + insertVectorStorageEntriesFor(name, List(element)) + } + + override def getVectorStorageEntryFor(name: String, index: Int): AnyRef = { + try { + val o = + nullSafeFindOne(name) match { + case None => + throw new Predef.NoSuchElementException(name + " not present") + + case Some(dbo) => + dbo.get(VALUE).asInstanceOf[JList[AnyRef]] + } + serializer.in( + o.get(index).asInstanceOf[Array[Byte]], + None + ) + } catch { + case e => + throw new Predef.NoSuchElementException(e.getMessage) + } + } + + override def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = { + try { + val o = + nullSafeFindOne(name) match { + case None => + throw new Predef.NoSuchElementException(name + " not present") + + case Some(dbo) => + dbo.get(VALUE).asInstanceOf[JList[AnyRef]] + } + + // pick the subrange and make a Scala list + val l = + List(o.subList(start.get, start.get + count).toArray: _*) + + for(e <- l) + yield serializer.in(e.asInstanceOf[Array[Byte]], None) + } catch { + case e => + throw new Predef.NoSuchElementException(e.getMessage) + } + } + + override def getVectorStorageSizeFor(name: String): Int = { + nullSafeFindOne(name) match { + case None => 0 + case Some(dbo) => + dbo.get(VALUE).asInstanceOf[JList[AnyRef]].size + } + } + + private def nullSafeFindOne(name: String): Option[DBObject] = { + val o = new BasicDBObject + o.put(KEY, name) + coll.findOneNS(o) + } +} diff --git a/kernel/src/main/scala/state/State.scala b/kernel/src/main/scala/state/State.scala index 46114eb1bc..964b2309a4 100644 --- a/kernel/src/main/scala/state/State.scala +++ b/kernel/src/main/scala/state/State.scala @@ -16,6 +16,7 @@ abstract class PersistentStorageConfig extends TransactionalStateConfig case class CassandraStorageConfig extends PersistentStorageConfig case class TerracottaStorageConfig extends PersistentStorageConfig case class TokyoCabinetStorageConfig extends PersistentStorageConfig +case class MongoStorageConfig extends PersistentStorageConfig /** * Scala API. @@ -39,12 +40,14 @@ object TransactionalState extends TransactionalState class TransactionalState { def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[AnyRef, AnyRef] = config match { case CassandraStorageConfig() => new CassandraPersistentTransactionalMap + case MongoStorageConfig() => new MongoPersistentTransactionalMap case TerracottaStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException } def newPersistentVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match { case CassandraStorageConfig() => new CassandraPersistentTransactionalVector + case MongoStorageConfig() => new MongoPersistentTransactionalVector case TerracottaStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException } @@ -194,16 +197,23 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] { } /** - * Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage. + * Implementation of PersistentTransactionalMap for every concrete + * storage will have the same workflow. This abstracts the workflow. + * + * Subclasses just need to provide the actual concrete instance for the + * abstract val storage. * * @author Jonas Bonér */ -class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[AnyRef, AnyRef] { +abstract class TemplatePersistentTransactionalMap extends PersistentTransactionalMap[AnyRef, AnyRef] { + + // to be concretized in subclasses + val storage: MapStorage override def remove(key: AnyRef) = { verifyTransaction if (changeSet.contains(key)) changeSet -= key - else CassandraStorage.removeMapStorageFor(uuid, key) + else storage.removeMapStorageFor(uuid, key) } override def getRange(start: Option[AnyRef], count: Int) = @@ -212,7 +222,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any def getRange(start: Option[AnyRef], finish: Option[AnyRef], count: Int) = { verifyTransaction try { - CassandraStorage.getMapStorageRangeFor(uuid, start, finish, count) + storage.getMapStorageRangeFor(uuid, start, finish, count) } catch { case e: Exception => Nil } @@ -220,7 +230,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any // ---- For Transactional ---- override def commit = { - CassandraStorage.insertMapStorageEntriesFor(uuid, changeSet.toList) + storage.insertMapStorageEntriesFor(uuid, changeSet.toList) changeSet.clear } @@ -228,7 +238,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any override def clear = { verifyTransaction try { - CassandraStorage.removeMapStorageFor(uuid) + storage.removeMapStorageFor(uuid) } catch { case e: Exception => {} } @@ -237,7 +247,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any override def contains(key: AnyRef): Boolean = { try { verifyTransaction - CassandraStorage.getMapStorageEntryFor(uuid, key).isDefined + storage.getMapStorageEntryFor(uuid, key).isDefined } catch { case e: Exception => false } @@ -246,7 +256,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any override def size: Int = { verifyTransaction try { - CassandraStorage.getMapStorageSizeFor(uuid) + storage.getMapStorageSizeFor(uuid) } catch { case e: Exception => 0 } @@ -258,7 +268,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any // if (changeSet.contains(key)) changeSet.get(key) // else { val result = try { - CassandraStorage.getMapStorageEntryFor(uuid, key) + storage.getMapStorageEntryFor(uuid, key) } catch { case e: Exception => None } @@ -270,7 +280,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any //verifyTransaction new Iterator[Tuple2[AnyRef, AnyRef]] { private val originalList: List[Tuple2[AnyRef, AnyRef]] = try { - CassandraStorage.getMapStorageFor(uuid) + storage.getMapStorageFor(uuid) } catch { case e: Throwable => Nil } @@ -285,6 +295,25 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any } } + +/** + * Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage. + * + * @author Debasish Ghosh + */ +class CassandraPersistentTransactionalMap extends TemplatePersistentTransactionalMap { + val storage = CassandraStorage +} + +/** + * Implements a persistent transactional map based on the MongoDB distributed P2P key-value storage. + * + * @author Debasish Ghosh + */ +class MongoPersistentTransactionalMap extends TemplatePersistentTransactionalMap { + val storage = MongoStorage +} + /** * Base for all transactional vector implementations. * @@ -382,17 +411,19 @@ abstract class PersistentTransactionalVector[T] extends TransactionalVector[T] { } /** - * Implements a persistent transactional vector based on the Cassandra distributed P2P key-value storage. + * Implements a template for a concrete persistent transactional vector based storage. * - * @author Jonas Bonér + * @author Debasish Ghosh */ -class CassandraPersistentTransactionalVector extends PersistentTransactionalVector[AnyRef] { +abstract class TemplatePersistentTransactionalVector extends PersistentTransactionalVector[AnyRef] { + + val storage: VectorStorage // ---- For TransactionalVector ---- override def get(index: Int): AnyRef = { verifyTransaction if (changeSet.size > index) changeSet(index) - else CassandraStorage.getVectorStorageEntryFor(uuid, index) + else storage.getVectorStorageEntryFor(uuid, index) } override def getRange(start: Int, count: Int): List[AnyRef] = @@ -400,12 +431,12 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect def getRange(start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = { verifyTransaction - CassandraStorage.getVectorStorageRangeFor(uuid, start, finish, count) + storage.getVectorStorageRangeFor(uuid, start, finish, count) } override def length: Int = { verifyTransaction - CassandraStorage.getVectorStorageSizeFor(uuid) + storage.getVectorStorageSizeFor(uuid) } override def apply(index: Int): AnyRef = get(index) @@ -422,11 +453,29 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect // ---- For Transactional ---- override def commit = { // FIXME: should use batch function once the bug is resolved - for (element <- changeSet) CassandraStorage.insertVectorStorageEntryFor(uuid, element) + for (element <- changeSet) storage.insertVectorStorageEntryFor(uuid, element) changeSet.clear } } +/** + * Implements a persistent transactional vector based on the Cassandra distributed P2P key-value storage. + * + * @author Debaissh Ghosh + */ +class CassandraPersistentTransactionalVector extends TemplatePersistentTransactionalVector { + val storage = CassandraStorage +} + +/** + * Implements a persistent transactional vector based on the MongoDB distributed P2P key-value storage. + * + * @author Debaissh Ghosh + */ +class MongoPersistentTransactionalVector extends TemplatePersistentTransactionalVector { + val storage = MongoStorage +} + /** * Implements a transactional reference. * diff --git a/kernel/src/main/scala/state/Storage.scala b/kernel/src/main/scala/state/Storage.scala new file mode 100644 index 0000000000..2d31695af5 --- /dev/null +++ b/kernel/src/main/scala/state/Storage.scala @@ -0,0 +1,27 @@ +package se.scalablesolutions.akka.kernel.state + +// abstracts persistence storage +trait Storage { +} + +// for Maps +trait MapStorage extends Storage { + def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) + def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef) + def removeMapStorageFor(name: String) + def removeMapStorageFor(name: String, key: AnyRef) + def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] + def getMapStorageSizeFor(name: String): Int + def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] + def getMapStorageRangeFor(name: String, start: Option[AnyRef], + finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] +} + +// for vectors +trait VectorStorage extends Storage { + def insertVectorStorageEntryFor(name: String, element: AnyRef) + def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) + def getVectorStorageEntryFor(name: String, index: Int): AnyRef + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] + def getVectorStorageSizeFor(name: String): Int +}