Ticket #343 implementation done except for pop of PersistentVector

This commit is contained in:
Debasish Ghosh 2010-09-17 16:08:44 +05:30
parent 00356a1263
commit bc2f7a9e01
5 changed files with 72 additions and 38 deletions

View file

@ -82,7 +82,6 @@ trait Storage {
*/
trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
with Transactional with Committable with Abortable with Logging {
protected val shouldClearOnCommit = Ref[Boolean]()
// operations on the Map
trait Op
@ -90,11 +89,12 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
case object PUT extends Op
case object REM extends Op
case object UPD extends Op
case object CLR extends Op
// append only log: records all mutating operations
protected val appendOnlyTxLog = TransactionalVector[LogEntry]()
case class LogEntry(key: K, value: Option[V], op: Op)
case class LogEntry(key: Option[K], value: Option[V], op: Op)
// need to override in subclasses e.g. "sameElements" for Array[Byte]
def equal(k1: K, k2: K): Boolean = k1 == k2
@ -114,7 +114,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
protected def clearDistinctKeys = keysInCurrentTx.clear
protected def filterTxLogByKey(key: K): IndexedSeq[LogEntry] =
appendOnlyTxLog filter(e => equal(e.key, key))
appendOnlyTxLog filter(e => e.key.map(equal(_, key)).getOrElse(true))
// need to get current value considering the underlying storage as well as the transaction log
protected def getCurrentValue(key: K): Option[V] = {
@ -128,7 +128,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
} catch { case e: Exception => None }
if (txEntries.isEmpty) underlying
else replay(txEntries, key, underlying)
else txEntries.last match {
case LogEntry(_, _, CLR) => None
case _ => replay(txEntries, key, underlying)
}
}
// replay all tx entries for key k with seed = initial
@ -140,9 +143,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
case Some(v) => Map((key, v))
}
txEntries.foreach {case LogEntry(k, v, o) => o match {
case PUT => m.put(k, v.get)
case REM => m -= k
case UPD => m.update(k, v.get)
case PUT => m.put(k.get, v.get)
case REM => m -= k.get
case UPD => m.update(k.get, v.get)
case CLR => Map.empty[K, V]
}}
m get key
}
@ -151,12 +155,11 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
val storage: MapStorageBackend[K, V]
def commit = {
// if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) storage.removeMapStorageFor(uuid)
appendOnlyTxLog.foreach { case LogEntry(k, v, o) => o match {
case PUT => storage.insertMapStorageEntryFor(uuid, k, v.get)
case UPD => storage.insertMapStorageEntryFor(uuid, k, v.get)
case REM => storage.removeMapStorageFor(uuid, k)
case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
case REM => storage.removeMapStorageFor(uuid, k.get)
case CLR => storage.removeMapStorageFor(uuid)
}}
appendOnlyTxLog.clear
@ -166,7 +169,6 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
def abort = {
appendOnlyTxLog.clear
clearDistinctKeys
shouldClearOnCommit.swap(false)
}
def -=(key: K) = {
@ -187,7 +189,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
override def put(key: K, value: V): Option[V] = {
register
val curr = getCurrentValue(key)
appendOnlyTxLog add LogEntry(key, Some(value), PUT)
appendOnlyTxLog add LogEntry(Some(key), Some(value), PUT)
addToListOfKeysInTx(key)
curr
}
@ -195,7 +197,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
override def update(key: K, value: V) = {
register
val curr = getCurrentValue(key)
appendOnlyTxLog add LogEntry(key, Some(value), UPD)
appendOnlyTxLog add LogEntry(Some(key), Some(value), UPD)
addToListOfKeysInTx(key)
curr
}
@ -203,7 +205,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
override def remove(key: K) = {
register
val curr = getCurrentValue(key)
appendOnlyTxLog add LogEntry(key, None, REM)
appendOnlyTxLog add LogEntry(Some(key), None, REM)
addToListOfKeysInTx(key)
curr
}
@ -215,9 +217,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
override def clear = {
register
appendOnlyTxLog.clear
appendOnlyTxLog add LogEntry(None, None, CLR)
clearDistinctKeys
shouldClearOnCommit.swap(true)
}
override def contains(key: K): Boolean = try {
@ -225,7 +226,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
case Seq() => // current tx doesn't use this
storage.getMapStorageEntryFor(uuid, key).isDefined // check storage
case txs => // present in log
txs.last.op != REM // last entry cannot be a REM
val lastOp = txs.last.op
lastOp != REM && lastOp != CLR // last entry cannot be a REM
}
} catch { case e: Exception => false }
@ -366,11 +368,6 @@ trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committable with Abortable {
protected val newElems = TransactionalVector[T]()
protected val updatedElems = TransactionalMap[Int, T]()
protected val removedElems = TransactionalVector[T]()
protected val shouldClearOnCommit = Ref[Boolean]()
// operations on the Vector
trait Op
case object ADD extends Op
@ -400,7 +397,6 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
def abort = {
appendOnlyTxLog.clear
shouldClearOnCommit.swap(false)
}
private def replay: List[T] = {
@ -466,14 +462,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
override def first: T = get(0)
override def last: T = {
if (newElems.length != 0) newElems.last
else {
val len = length
if (len == 0) throw new NoSuchElementException("Vector is empty")
get(len - 1)
}
}
override def last: T = replay.last
def length: Int = replay.length

View file

@ -9,7 +9,6 @@ import se.scalablesolutions.akka.persistence.common._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.Config.config
import java.util.NoSuchElementException
import com.novus.casbah.mongodb.Imports._
/**

View file

@ -238,7 +238,7 @@ class MongoTicket343Spec extends
val add = List(("a", "1"), ("b", "2"), ("c", "3"))
(proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(0)
proc.stop
}
}

View file

@ -359,7 +359,6 @@ private [akka] object RedisStorageBackend extends
case e: java.lang.NullPointerException =>
throw new StorageException("Could not connect to Redis server")
case e =>
e.printStackTrace
throw new StorageException("Error in Redis: " + e.getMessage)
}
}

View file

@ -32,6 +32,10 @@ case class VUPD(i: Int, v: String)
case class VUPD_AND_ABORT(i: Int, v: String)
case class VGET(i: Int)
case object VSIZE
case object VLAST
case object VFIRST
case class VLAST_AFTER_ADD(vsToAdd: List[String])
case class VFIRST_AFTER_ADD(vsToAdd: List[String])
case class VGET_AFTER_VADD(vsToAdd: List[String], isToFetch: List[Int])
case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int)
@ -175,6 +179,30 @@ object Storage {
fooVector.slice(Some(s), None, c)
}
self.reply(l.map(new String(_)))
case VLAST =>
val l = atomic { fooVector last }
self.reply(l)
case VFIRST =>
val l = atomic { fooVector first }
self.reply(l)
case VLAST_AFTER_ADD(vs) =>
val l =
atomic {
vs.foreach(fooVector + _.getBytes)
fooVector last
}
self.reply(l)
case VFIRST_AFTER_ADD(vs) =>
val l =
atomic {
vs.foreach(fooVector + _.getBytes)
fooVector first
}
self.reply(l)
}
}
}
@ -243,7 +271,7 @@ class RedisTicket343Spec extends
val add = List(("a", "1"), ("b", "2"), ("c", "3"))
(proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(0)
proc.stop
}
}
@ -344,7 +372,26 @@ class RedisTicket343Spec extends
(proc !! VADD_WITH_SLICE(List(), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("maulindu", "debasish"))
// slice with new elements added in current transaction
(proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a"))
(proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 4)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a", "nilanjan", "ramanendu"))
proc.stop
}
}
describe("Miscellaneous vector ops") {
it("vector slice() should not ignore elements added in current transaction") {
val proc = actorOf[RedisSampleVectorStorage]
proc.start
// add 4 elements in separate transactions
(proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
(proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
(proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
(proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
new String((proc !! VLAST).getOrElse("VLAST failed").asInstanceOf[Array[Byte]]) should equal("debasish")
new String((proc !! VFIRST).getOrElse("VFIRST failed").asInstanceOf[Array[Byte]]) should equal("nilanjan")
new String((proc !! VLAST_AFTER_ADD(List("kausik", "tarun"))).getOrElse("VLAST_AFTER_ADD failed").asInstanceOf[Array[Byte]]) should equal("debasish")
new String((proc !! VFIRST_AFTER_ADD(List("kausik", "tarun"))).getOrElse("VFIRST_AFTER_ADD failed").asInstanceOf[Array[Byte]]) should equal("tarun")
proc.stop
}
}