From 73bc6855150edfb2e3d876497844b9b650cf6ee6 Mon Sep 17 00:00:00 2001 From: debasishg Date: Fri, 14 Aug 2009 16:05:35 +0530 Subject: [PATCH] added Ref semantics to MongoStorage + refactoring for commonality + added test cases --- .../main/scala/state/CassandraStorage.scala | 7 ++-- .../src/main/scala/state/MongoStorage.scala | 25 +++++++++++++- kernel/src/main/scala/state/State.scala | 17 ++++++++-- kernel/src/main/scala/state/Storage.scala | 6 ++++ kernel/src/test/scala/MongoStorageSpec.scala | 34 +++++++++++++++++++ 5 files changed, 82 insertions(+), 7 deletions(-) diff --git a/kernel/src/main/scala/state/CassandraStorage.scala b/kernel/src/main/scala/state/CassandraStorage.scala index 2e2a0da805..fdc6e607db 100644 --- a/kernel/src/main/scala/state/CassandraStorage.scala +++ b/kernel/src/main/scala/state/CassandraStorage.scala @@ -20,7 +20,8 @@ import org.apache.thrift.protocol._ /** * @author Jonas Bonér */ -object CassandraStorage extends MapStorage with VectorStorage with Logging { +object CassandraStorage extends MapStorage + with VectorStorage with RefStorage with Logging { val KEYSPACE = "akka" val MAP_COLUMN_PARENT = new ColumnParent("map", null) val VECTOR_COLUMN_PARENT = new ColumnParent("vector", null) @@ -84,7 +85,7 @@ object CassandraStorage extends MapStorage with VectorStorage with Logging { // For Ref // =============================================================== - def insertRefStorageFor(name: String, element: AnyRef) = if (sessions.isDefined) { + override def insertRefStorageFor(name: String, element: AnyRef) = if (sessions.isDefined) { sessions.get.withSession { _ ++| (name, new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY), @@ -94,7 +95,7 @@ object CassandraStorage extends MapStorage with VectorStorage with Logging { } } else throw new IllegalStateException("CassandraStorage is not started") - def getRefStorageFor(name: String): Option[AnyRef] = if (sessions.isDefined) { + override def getRefStorageFor(name: String): Option[AnyRef] = if (sessions.isDefined) { try { val column: Option[Column] = sessions.get.withSession { _ | (name, new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY)) diff --git a/kernel/src/main/scala/state/MongoStorage.scala b/kernel/src/main/scala/state/MongoStorage.scala index 657a6ec9fc..5b24a4698f 100644 --- a/kernel/src/main/scala/state/MongoStorage.scala +++ b/kernel/src/main/scala/state/MongoStorage.scala @@ -8,7 +8,7 @@ import kernel.Kernel.config import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList} object MongoStorage extends MapStorage - with VectorStorage with Logging { + with VectorStorage with RefStorage with Logging { // enrich with null safe findOne class RichDBCollection(value: DBCollection) { @@ -255,4 +255,27 @@ object MongoStorage extends MapStorage o.put(KEY, name) coll.findOneNS(o) } + + override def insertRefStorageFor(name: String, element: AnyRef) = { + nullSafeFindOne(name) match { + case None => + case Some(dbo) => { + val q = new BasicDBObject + q.put(KEY, name) + coll.remove(q) + } + } + coll.insert( + new BasicDBObject() + .append(KEY, name) + .append(VALUE, serializer.out(element))) + } + + override def getRefStorageFor(name: String): Option[AnyRef] = { + nullSafeFindOne(name) match { + case None => None + case Some(dbo) => + Some(serializer.in(dbo.get(VALUE).asInstanceOf[Array[Byte]], None)) + } + } } diff --git a/kernel/src/main/scala/state/State.scala b/kernel/src/main/scala/state/State.scala index 964b2309a4..6e012837d7 100644 --- a/kernel/src/main/scala/state/State.scala +++ b/kernel/src/main/scala/state/State.scala @@ -54,6 +54,7 @@ class TransactionalState { def newPersistentRef(config: PersistentStorageConfig): TransactionalRef[AnyRef] = config match { case CassandraStorageConfig() => new CassandraPersistentTransactionalRef + case MongoStorageConfig() => new MongoPersistentTransactionalRef case TerracottaStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException } @@ -514,9 +515,11 @@ class TransactionalRef[T] extends Transactional { } } -class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] { +abstract class TemplatePersistentTransactionalRef extends TransactionalRef[AnyRef] { + val storage: RefStorage + override def commit = if (ref.isDefined) { - CassandraStorage.insertRefStorageFor(uuid, ref.get) + storage.insertRefStorageFor(uuid, ref.get) ref = None } @@ -524,7 +527,7 @@ class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] { override def get: Option[AnyRef] = { verifyTransaction - CassandraStorage.getRefStorageFor(uuid) + storage.getRefStorageFor(uuid) } override def isDefined: Boolean = get.isDefined @@ -535,3 +538,11 @@ class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] { else default } } + +class CassandraPersistentTransactionalRef extends TemplatePersistentTransactionalRef { + val storage = CassandraStorage +} + +class MongoPersistentTransactionalRef extends TemplatePersistentTransactionalRef { + val storage = MongoStorage +} diff --git a/kernel/src/main/scala/state/Storage.scala b/kernel/src/main/scala/state/Storage.scala index 2d31695af5..927c4f0361 100644 --- a/kernel/src/main/scala/state/Storage.scala +++ b/kernel/src/main/scala/state/Storage.scala @@ -25,3 +25,9 @@ trait VectorStorage extends Storage { def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] def getVectorStorageSizeFor(name: String): Int } + +// for Ref +trait RefStorage extends Storage { + def insertRefStorageFor(name: String, element: AnyRef) + def getRefStorageFor(name: String): Option[AnyRef] +} diff --git a/kernel/src/test/scala/MongoStorageSpec.scala b/kernel/src/test/scala/MongoStorageSpec.scala index 5baa03baae..c80afb00a7 100644 --- a/kernel/src/test/scala/MongoStorageSpec.scala +++ b/kernel/src/test/scala/MongoStorageSpec.scala @@ -269,4 +269,38 @@ class MongoStorageSpec extends TestCase { changeSetM += "4" -> List(10, 20, 30) changeSetM } + + @Test + def testRefStorage = { + MongoStorage.getRefStorageFor("U-R1") match { + case None => + case Some(o) => fail("should be None") + } + + val m = Map("1"->1, "2"->4, "3"->9) + MongoStorage.insertRefStorageFor("U-R1", m) + MongoStorage.getRefStorageFor("U-R1") match { + case None => fail("should not be empty") + case Some(r) => { + val a = r.asInstanceOf[Map[String, Int]] + assertEquals(a.size, 3) + assertEquals(a.get("1").get, 1) + assertEquals(a.get("2").get, 4) + assertEquals(a.get("3").get, 9) + } + } + + // insert another one + // the previous one should be replaced + val b = List("100", "jonas") + MongoStorage.insertRefStorageFor("U-R1", b) + MongoStorage.getRefStorageFor("U-R1") match { + case None => fail("should not be empty") + case Some(r) => { + val a = r.asInstanceOf[List[String]] + assertEquals("100", a(0)) + assertEquals("jonas", a(1)) + } + } + } }