Moved implicit Ordering(ArraySeq[Byte]) to a new PersistentMapBinary companion object
and created an implicit Ordering(Array[Byte]) that can be used on the backends too Finished Map Backend Spec that tests proper ordering of retrieved k->v pairs and fixed Voldemort to work properly
This commit is contained in:
parent
578b9dfdfc
commit
984de304a8
4 changed files with 192 additions and 130 deletions
|
|
@ -7,6 +7,7 @@ package se.scalablesolutions.akka.persistence.common
|
|||
import se.scalablesolutions.akka.stm._
|
||||
import se.scalablesolutions.akka.stm.TransactionManagement.transaction
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import collection.mutable.ArraySeq
|
||||
|
||||
// FIXME move to 'stm' package + add message with more info
|
||||
class NoTransactionInScopeException extends RuntimeException
|
||||
|
|
@ -47,26 +48,38 @@ trait Storage {
|
|||
type ElementType
|
||||
|
||||
def newMap: PersistentMap[ElementType, ElementType]
|
||||
|
||||
def newVector: PersistentVector[ElementType]
|
||||
|
||||
def newRef: PersistentRef[ElementType]
|
||||
|
||||
def newQueue: PersistentQueue[ElementType] = // only implemented for redis
|
||||
throw new UnsupportedOperationException
|
||||
|
||||
def newSortedSet: PersistentSortedSet[ElementType] = // only implemented for redis
|
||||
throw new UnsupportedOperationException
|
||||
|
||||
def getMap(id: String): PersistentMap[ElementType, ElementType]
|
||||
|
||||
def getVector(id: String): PersistentVector[ElementType]
|
||||
|
||||
def getRef(id: String): PersistentRef[ElementType]
|
||||
|
||||
def getQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis
|
||||
throw new UnsupportedOperationException
|
||||
|
||||
def getSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis
|
||||
throw new UnsupportedOperationException
|
||||
|
||||
def newMap(id: String): PersistentMap[ElementType, ElementType]
|
||||
|
||||
def newVector(id: String): PersistentVector[ElementType]
|
||||
|
||||
def newRef(id: String): PersistentRef[ElementType]
|
||||
|
||||
def newQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis
|
||||
throw new UnsupportedOperationException
|
||||
|
||||
def newSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis
|
||||
throw new UnsupportedOperationException
|
||||
}
|
||||
|
|
@ -90,7 +103,7 @@ private[akka] object PersistentMap {
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
|
||||
with Transactional with Committable with Abortable with Logging {
|
||||
with Transactional with Committable with Abortable with Logging {
|
||||
|
||||
//Import Ops
|
||||
import PersistentMap._
|
||||
|
|
@ -118,7 +131,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
|
|||
protected def clearDistinctKeys = keysInCurrentTx.clear
|
||||
|
||||
protected def filterTxLogByKey(key: K): IndexedSeq[LogEntry] =
|
||||
appendOnlyTxLog filter(e => e.key.map(equal(_, key)).getOrElse(true))
|
||||
appendOnlyTxLog filter (e => e.key.map(equal(_, key)).getOrElse(true))
|
||||
|
||||
// need to get current value considering the underlying storage as well as the transaction log
|
||||
protected def getCurrentValue(key: K): Option[V] = {
|
||||
|
|
@ -129,7 +142,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
|
|||
// get the snapshot from the underlying store for this key
|
||||
val underlying = try {
|
||||
storage.getMapStorageEntryFor(uuid, key)
|
||||
} catch { case e: Exception => None }
|
||||
} catch {case e: Exception => None}
|
||||
|
||||
if (txEntries.isEmpty) underlying
|
||||
else txEntries.last match {
|
||||
|
|
@ -146,12 +159,14 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
|
|||
case None => Map.empty[K, V]
|
||||
case Some(v) => Map((key, v))
|
||||
}
|
||||
txEntries.foreach {case LogEntry(k, v, o) => o match {
|
||||
case PUT => m.put(k.get, v.get)
|
||||
case REM => m -= k.get
|
||||
case UPD => m.update(k.get, v.get)
|
||||
case CLR => Map.empty[K, V]
|
||||
}}
|
||||
txEntries.foreach {
|
||||
case LogEntry(k, v, o) => o match {
|
||||
case PUT => m.put(k.get, v.get)
|
||||
case REM => m -= k.get
|
||||
case UPD => m.update(k.get, v.get)
|
||||
case CLR => Map.empty[K, V]
|
||||
}
|
||||
}
|
||||
m get key
|
||||
}
|
||||
|
||||
|
|
@ -159,12 +174,14 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
|
|||
val storage: MapStorageBackend[K, V]
|
||||
|
||||
def commit = {
|
||||
appendOnlyTxLog.foreach { case LogEntry(k, v, o) => o match {
|
||||
case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
|
||||
case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
|
||||
case REM => storage.removeMapStorageFor(uuid, k.get)
|
||||
case CLR => storage.removeMapStorageFor(uuid)
|
||||
}}
|
||||
appendOnlyTxLog.foreach {
|
||||
case LogEntry(k, v, o) => o match {
|
||||
case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
|
||||
case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
|
||||
case REM => storage.removeMapStorageFor(uuid, k.get)
|
||||
case CLR => storage.removeMapStorageFor(uuid)
|
||||
}
|
||||
}
|
||||
|
||||
appendOnlyTxLog.clear
|
||||
clearDistinctKeys
|
||||
|
|
@ -180,8 +197,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
|
|||
this
|
||||
}
|
||||
|
||||
override def +=(kv : (K,V)) = {
|
||||
put(kv._1,kv._2)
|
||||
override def +=(kv: (K, V)) = {
|
||||
put(kv._1, kv._2)
|
||||
this
|
||||
}
|
||||
|
||||
|
|
@ -230,10 +247,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
|
|||
case Seq() => // current tx doesn't use this
|
||||
storage.getMapStorageEntryFor(uuid, key).isDefined // check storage
|
||||
case txs => // present in log
|
||||
val lastOp = txs.last.op
|
||||
val lastOp = txs.last.op
|
||||
lastOp != REM && lastOp != CLR // last entry cannot be a REM
|
||||
}
|
||||
} catch { case e: Exception => false }
|
||||
}
|
||||
} catch {case e: Exception => false}
|
||||
|
||||
protected def existsInStorage(key: K): Option[V] = try {
|
||||
storage.getMapStorageEntryFor(uuid, key)
|
||||
|
|
@ -243,33 +260,33 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
|
|||
|
||||
override def size: Int = try {
|
||||
// partition key set affected in current tx into those which r added & which r deleted
|
||||
val (keysAdded, keysRemoved) = keysInCurrentTx.map {
|
||||
val (keysAdded, keysRemoved) = keysInCurrentTx.map {
|
||||
case (kseq, k) => ((kseq, k), getCurrentValue(k))
|
||||
}.partition(_._2.isDefined)
|
||||
|
||||
// keys which existed in storage but removed in current tx
|
||||
val inStorageRemovedInTx =
|
||||
keysRemoved.keySet
|
||||
.map(_._2)
|
||||
.filter(k => existsInStorage(k).isDefined)
|
||||
.size
|
||||
val inStorageRemovedInTx =
|
||||
keysRemoved.keySet
|
||||
.map(_._2)
|
||||
.filter(k => existsInStorage(k).isDefined)
|
||||
.size
|
||||
|
||||
// all keys in storage
|
||||
val keysInStorage =
|
||||
storage.getMapStorageFor(uuid)
|
||||
.map { case (k, v) => toEquals(k) }
|
||||
.toSet
|
||||
val keysInStorage =
|
||||
storage.getMapStorageFor(uuid)
|
||||
.map {case (k, v) => toEquals(k)}
|
||||
.toSet
|
||||
|
||||
// (keys that existed UNION keys added ) - (keys removed)
|
||||
(keysInStorage union keysAdded.keySet.map(_._1)).size - inStorageRemovedInTx
|
||||
} catch {
|
||||
case e: Exception => 0
|
||||
} catch {
|
||||
case e: Exception => 0
|
||||
}
|
||||
|
||||
// get must consider underlying storage & current uncommitted tx log
|
||||
override def get(key: K): Option[V] = getCurrentValue(key)
|
||||
|
||||
def iterator: Iterator[Tuple2[K, V]]
|
||||
def iterator: Iterator[Tuple2[K, V]]
|
||||
|
||||
private def register = {
|
||||
if (transaction.get.isEmpty) throw new NoTransactionInScopeException
|
||||
|
|
@ -277,38 +294,50 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
|
|||
}
|
||||
}
|
||||
|
||||
object PersistentMapBinary {
|
||||
object COrdering {
|
||||
//frontend
|
||||
implicit object ArraySeqOrdering extends Ordering[ArraySeq[Byte]] {
|
||||
def compare(o1: ArraySeq[Byte], o2: ArraySeq[Byte]) =
|
||||
ArrayOrdering.compare(o1.toArray, o2.toArray)
|
||||
}
|
||||
//backend
|
||||
implicit object ArrayOrdering extends Ordering[Array[Byte]] {
|
||||
def compare(o1: Array[Byte], o2: Array[Byte]) =
|
||||
new String(o1) compare new String(o2)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] {
|
||||
import scala.collection.mutable.ArraySeq
|
||||
|
||||
type T = ArraySeq[Byte]
|
||||
|
||||
def toEquals(k: Array[Byte]) = ArraySeq(k: _*)
|
||||
|
||||
override def equal(k1: Array[Byte], k2: Array[Byte]): Boolean = k1 sameElements k2
|
||||
|
||||
object COrdering {
|
||||
implicit object ArraySeqOrdering extends Ordering[ArraySeq[Byte]] {
|
||||
def compare(o1: ArraySeq[Byte], o2: ArraySeq[Byte]) =
|
||||
new String(o1.toArray) compare new String(o2.toArray)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
import scala.collection.immutable.{TreeMap, SortedMap}
|
||||
private def replayAllKeys: SortedMap[ArraySeq[Byte], Array[Byte]] = {
|
||||
import COrdering._
|
||||
import PersistentMapBinary.COrdering._
|
||||
|
||||
// need ArraySeq for ordering
|
||||
val fromStorage =
|
||||
TreeMap(storage.getMapStorageFor(uuid).map { case (k, v) => (ArraySeq(k: _*), v) }: _*)
|
||||
val fromStorage =
|
||||
TreeMap(storage.getMapStorageFor(uuid).map {case (k, v) => (ArraySeq(k: _*), v)}: _*)
|
||||
|
||||
val (keysAdded, keysRemoved) = keysInCurrentTx.map {
|
||||
val (keysAdded, keysRemoved) = keysInCurrentTx.map {
|
||||
case (_, k) => (k, getCurrentValue(k))
|
||||
}.partition(_._2.isDefined)
|
||||
|
||||
val inStorageRemovedInTx =
|
||||
keysRemoved.keySet
|
||||
.filter(k => existsInStorage(k).isDefined)
|
||||
.map(k => ArraySeq(k: _*))
|
||||
val inStorageRemovedInTx =
|
||||
keysRemoved.keySet
|
||||
.filter(k => existsInStorage(k).isDefined)
|
||||
.map(k => ArraySeq(k: _*))
|
||||
|
||||
(fromStorage -- inStorageRemovedInTx) ++ keysAdded.map { case (k, v) => (ArraySeq(k: _*), v.get) }
|
||||
(fromStorage -- inStorageRemovedInTx) ++ keysAdded.map {case (k, v) => (ArraySeq(k: _*), v.get)}
|
||||
}
|
||||
|
||||
override def slice(start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = try {
|
||||
|
|
@ -317,51 +346,53 @@ trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] {
|
|||
if (newMap isEmpty) List[(Array[Byte], Array[Byte])]()
|
||||
|
||||
val startKey =
|
||||
start match {
|
||||
case Some(bytes) => Some(ArraySeq(bytes: _*))
|
||||
case None => None
|
||||
}
|
||||
start match {
|
||||
case Some(bytes) => Some(ArraySeq(bytes: _*))
|
||||
case None => None
|
||||
}
|
||||
|
||||
val endKey =
|
||||
finish match {
|
||||
case Some(bytes) => Some(ArraySeq(bytes: _*))
|
||||
case None => None
|
||||
}
|
||||
finish match {
|
||||
case Some(bytes) => Some(ArraySeq(bytes: _*))
|
||||
case None => None
|
||||
}
|
||||
|
||||
((startKey, endKey, count): @unchecked) match {
|
||||
case ((Some(s), Some(e), _)) =>
|
||||
newMap.range(s, e)
|
||||
.toList
|
||||
.map(e => (e._1.toArray, e._2))
|
||||
.toList
|
||||
.toList
|
||||
.map(e => (e._1.toArray, e._2))
|
||||
.toList
|
||||
case ((Some(s), None, c)) if c > 0 =>
|
||||
newMap.from(s)
|
||||
.iterator
|
||||
.take(count)
|
||||
.map(e => (e._1.toArray, e._2))
|
||||
.toList
|
||||
.iterator
|
||||
.take(count)
|
||||
.map(e => (e._1.toArray, e._2))
|
||||
.toList
|
||||
case ((Some(s), None, _)) =>
|
||||
newMap.from(s)
|
||||
.toList
|
||||
.map(e => (e._1.toArray, e._2))
|
||||
.toList
|
||||
.toList
|
||||
.map(e => (e._1.toArray, e._2))
|
||||
.toList
|
||||
case ((None, Some(e), _)) =>
|
||||
newMap.until(e)
|
||||
.toList
|
||||
.map(e => (e._1.toArray, e._2))
|
||||
.toList
|
||||
.toList
|
||||
.map(e => (e._1.toArray, e._2))
|
||||
.toList
|
||||
}
|
||||
} catch { case e: Exception => Nil }
|
||||
} catch {case e: Exception => Nil}
|
||||
|
||||
override def iterator: Iterator[(Array[Byte], Array[Byte])] = {
|
||||
override def iterator: Iterator[(Array[Byte], Array[Byte])] = {
|
||||
new Iterator[(Array[Byte], Array[Byte])] {
|
||||
private var elements = replayAllKeys
|
||||
|
||||
override def next: (Array[Byte], Array[Byte]) = synchronized {
|
||||
val (k, v) = elements.head
|
||||
elements = elements.tail
|
||||
(k.toArray, v)
|
||||
}
|
||||
override def hasNext: Boolean = synchronized { !elements.isEmpty }
|
||||
|
||||
override def hasNext: Boolean = synchronized {!elements.isEmpty}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -394,7 +425,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
|
|||
val storage: VectorStorageBackend[T]
|
||||
|
||||
def commit = {
|
||||
for(entry <- appendOnlyTxLog) {
|
||||
for (entry <- appendOnlyTxLog) {
|
||||
(entry: @unchecked) match {
|
||||
case LogEntry(_, Some(v), ADD) => storage.insertVectorStorageEntryFor(uuid, v)
|
||||
case LogEntry(Some(i), Some(v), UPD) => storage.updateVectorStorageEntryFor(uuid, i, v)
|
||||
|
|
@ -412,7 +443,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
|
|||
import scala.collection.mutable.ArrayBuffer
|
||||
var elemsStorage = ArrayBuffer(storage.getVectorStorageRangeFor(uuid, None, None, storage.getVectorStorageSizeFor(uuid)).reverse: _*)
|
||||
|
||||
for(entry <- appendOnlyTxLog) {
|
||||
for (entry <- appendOnlyTxLog) {
|
||||
(entry: @unchecked) match {
|
||||
case LogEntry(_, Some(v), ADD) => elemsStorage += v
|
||||
case LogEntry(Some(i), Some(v), UPD) => elemsStorage.update(i, v)
|
||||
|
|
@ -446,11 +477,11 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
|
|||
val curr = replay
|
||||
val s = if (start.isDefined) start.get else 0
|
||||
val cnt =
|
||||
if (finish.isDefined) {
|
||||
val f = finish.get
|
||||
if (f >= s) (f - s) else count
|
||||
}
|
||||
else count
|
||||
if (finish.isDefined) {
|
||||
val f = finish.get
|
||||
if (f >= s) (f - s) else count
|
||||
}
|
||||
else count
|
||||
if (s == 0 && cnt == 0) List().toIndexedSeq
|
||||
else curr.slice(s, s + cnt).toIndexedSeq
|
||||
}
|
||||
|
|
@ -519,12 +550,12 @@ trait PersistentRef[T] extends Transactional with Committable with Abortable {
|
|||
}
|
||||
}
|
||||
|
||||
private[akka] object PersistentQueue {
|
||||
//Operations for PersistentQueue
|
||||
sealed trait QueueOp
|
||||
case object ENQ extends QueueOp
|
||||
case object DEQ extends QueueOp
|
||||
}
|
||||
private[akka] object PersistentQueue {
|
||||
//Operations for PersistentQueue
|
||||
sealed trait QueueOp
|
||||
case object ENQ extends QueueOp
|
||||
case object DEQ extends QueueOp
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation of <tt>PersistentQueue</tt> for every concrete
|
||||
|
|
@ -552,7 +583,7 @@ trait PersistentRef[T] extends Transactional with Committable with Abortable {
|
|||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||
*/
|
||||
trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
|
||||
with Transactional with Committable with Abortable with Logging {
|
||||
with Transactional with Committable with Abortable with Logging {
|
||||
|
||||
//Import Ops
|
||||
import PersistentQueue._
|
||||
|
|
@ -575,11 +606,12 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
|
|||
val storage: QueueStorageBackend[A]
|
||||
|
||||
def commit = {
|
||||
enqueuedNDequeuedEntries.toList.foreach { e =>
|
||||
e._2 match {
|
||||
case ENQ => storage.enqueue(uuid, e._1.get)
|
||||
case DEQ => storage.dequeue(uuid)
|
||||
}
|
||||
enqueuedNDequeuedEntries.toList.foreach {
|
||||
e =>
|
||||
e._2 match {
|
||||
case ENQ => storage.enqueue(uuid, e._1.get)
|
||||
case DEQ => storage.dequeue(uuid)
|
||||
}
|
||||
}
|
||||
if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) {
|
||||
storage.remove(uuid)
|
||||
|
|
@ -635,7 +667,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
|
|||
|
||||
override def size: Int = try {
|
||||
storage.size(uuid) + localQ.get.length
|
||||
} catch { case e: Exception => 0 }
|
||||
} catch {case e: Exception => 0}
|
||||
|
||||
override def isEmpty: Boolean =
|
||||
size == 0
|
||||
|
|
@ -644,10 +676,12 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
|
|||
enqueue(elem)
|
||||
this
|
||||
}
|
||||
|
||||
def ++=(elems: Iterator[A]) = {
|
||||
enqueue(elems.toList: _*)
|
||||
this
|
||||
}
|
||||
|
||||
def ++=(elems: Iterable[A]): Unit = this ++= elems.iterator
|
||||
|
||||
override def dequeueFirst(p: A => Boolean): Option[A] =
|
||||
|
|
@ -670,24 +704,24 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
|
|||
* <p/>
|
||||
* zscore can be implemented in a variety of ways by the calling class:
|
||||
* <pre>
|
||||
* trait ZScorable {
|
||||
* trait ZScorable {
|
||||
* def toZScore: Float
|
||||
* }
|
||||
*
|
||||
* class Foo extends ZScorable {
|
||||
* class Foo extends ZScorable {
|
||||
* //.. implemnetation
|
||||
* }
|
||||
* </pre>
|
||||
* Or we can also use views:
|
||||
* <pre>
|
||||
* class Foo {
|
||||
* class Foo {
|
||||
* //..
|
||||
* }
|
||||
*
|
||||
* implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable {
|
||||
* def toZScore = {
|
||||
* implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable {
|
||||
* def toZScore = {
|
||||
* //..
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
|
|
@ -696,7 +730,6 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
|
|||
* @author <a href="http://debasishg.blogspot.com"</a>
|
||||
*/
|
||||
trait PersistentSortedSet[A] extends Transactional with Committable with Abortable {
|
||||
|
||||
protected val newElems = TransactionalMap[A, Float]()
|
||||
protected val removedElems = TransactionalVector[A]()
|
||||
|
||||
|
|
@ -729,8 +762,8 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
|
|||
}
|
||||
|
||||
private def inStorage(elem: A): Option[Float] = storage.zscore(uuid, elem) match {
|
||||
case Some(s) => Some(s.toFloat)
|
||||
case None => None
|
||||
case Some(s) => Some(s.toFloat)
|
||||
case None => None
|
||||
}
|
||||
|
||||
def contains(elem: A): Boolean = {
|
||||
|
|
@ -758,8 +791,8 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
|
|||
def compare(that: (A, Float)) = x._2 compare that._2
|
||||
}
|
||||
|
||||
implicit def ordering = new scala.math.Ordering[(A,Float)] {
|
||||
def compare(x: (A, Float),y : (A,Float)) = x._2 compare y._2
|
||||
implicit def ordering = new scala.math.Ordering[(A, Float)] {
|
||||
def compare(x: (A, Float), y: (A, Float)) = x._2 compare y._2
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -773,9 +806,9 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
|
|||
// -1 means the last element, -2 means the second last
|
||||
val s = if (start < 0) start + l else start
|
||||
val e =
|
||||
if (end < 0) end + l
|
||||
else if (end >= l) (l - 1)
|
||||
else end
|
||||
if (end < 0) end + l
|
||||
else if (end >= l) (l - 1)
|
||||
else end
|
||||
// slice is open at the end, we need a closed end range
|
||||
ts.iterator.slice(s, e + 1).toList
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,7 +8,9 @@ import org.scalatest.matchers.ShouldMatchers
|
|||
import se.scalablesolutions.akka.util.Logging
|
||||
import org.scalatest.{BeforeAndAfterEach, Spec}
|
||||
import scala.util.Random
|
||||
import collection.immutable.{HashMap, HashSet}
|
||||
import collection.immutable.{TreeMap, HashMap, HashSet}
|
||||
import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._
|
||||
|
||||
|
||||
/**
|
||||
* Implementation Compatibility test for PersistentMap backend implementations.
|
||||
|
|
@ -106,38 +108,44 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter
|
|||
|
||||
|
||||
|
||||
it("should return all the key value pairs in the map (in the correct order?) when getMapStorageFor(name) is called") {
|
||||
it("should return all the key value pairs in the map in the correct order when getMapStorageFor(name) is called") {
|
||||
val mapName = "allTest"
|
||||
val rand = new Random(3).nextInt(100)
|
||||
val entries = (1 to rand).toList.map {
|
||||
var entries = new TreeMap[Array[Byte], Array[Byte]]()(ArrayOrdering)
|
||||
(1 to rand).foreach {
|
||||
index =>
|
||||
(("allTestKey" + index).getBytes -> ("allTestValue" + index).getBytes)
|
||||
entries += (("allTestKey" + index).getBytes -> ("allTestValue" + index).getBytes)
|
||||
}
|
||||
|
||||
storage.insertMapStorageEntriesFor(mapName, entries)
|
||||
storage.insertMapStorageEntriesFor(mapName, entries.toList)
|
||||
val retrieved = storage.getMapStorageFor(mapName)
|
||||
retrieved.size should be(rand)
|
||||
entries.size should be(rand)
|
||||
|
||||
|
||||
|
||||
val entryMap = new HashMap[String, String] ++ entries.map {_ match {case (k, v) => (new String(k), new String(v))}}
|
||||
val retrievedMap = new HashMap[String, String] ++ entries.map {_ match {case (k, v) => (new String(k), new String(v))}}
|
||||
|
||||
entryMap should equal(retrievedMap)
|
||||
//Should the ordering of key-vals returned be enforced?
|
||||
//ordered by key?
|
||||
//using what comaparison?
|
||||
|
||||
(0 until rand).foreach {
|
||||
i: Int => {
|
||||
new String(entries.toList(i)._1) should be(new String(retrieved(i)._1))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
it("should return all the key->value pairs that exist in the map that are between start and end, up to count pairs when getMapStorageRangeFor is called") {
|
||||
//implement if this method will be used
|
||||
}
|
||||
|
||||
it("should not throw an exception when size is called on a non existent map?") {
|
||||
storage.getMapStorageSizeFor("nonExistent") should be(0)
|
||||
}
|
||||
|
||||
it("should behave properly when getMapStorageRange is called?") {
|
||||
//No current code calls getMapStorageRangeFor
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -17,9 +17,10 @@ import voldemort.versioning.Versioned
|
|||
import collection.JavaConversions
|
||||
import java.nio.ByteBuffer
|
||||
import collection.Map
|
||||
import collection.immutable.{IndexedSeq, SortedSet, TreeSet, HashMap}
|
||||
import collection.mutable.{Set, HashSet, ArrayBuffer}
|
||||
import java.util.{Properties, Map => JMap}
|
||||
import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._
|
||||
import collection.immutable._
|
||||
|
||||
/*
|
||||
RequiredReads + RequiredWrites should be > ReplicationFactor for all Voldemort Stores
|
||||
|
|
@ -54,11 +55,8 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
val vectorSizeIndex = getIndexedBytes(-1)
|
||||
val queueHeadIndex = getIndexedBytes(-1)
|
||||
val queueTailIndex = getIndexedBytes(-2)
|
||||
|
||||
|
||||
implicit val byteOrder = new Ordering[Array[Byte]] {
|
||||
override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y)
|
||||
}
|
||||
//explicit implicit :)
|
||||
implicit val ordering = ArrayOrdering
|
||||
|
||||
|
||||
def getRefStorageFor(name: String): Option[Array[Byte]] = {
|
||||
|
|
@ -90,17 +88,17 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
mapKey => getKey(name, mapKey)
|
||||
}))
|
||||
|
||||
val buf = new ArrayBuffer[(Array[Byte], Array[Byte])](all.size)
|
||||
var returned = new TreeMap[Array[Byte], Array[Byte]]()(ordering)
|
||||
JavaConversions.asMap(all).foreach {
|
||||
(entry) => {
|
||||
entry match {
|
||||
case (key: Array[Byte], versioned: Versioned[Array[Byte]]) => {
|
||||
buf += key -> versioned.getValue
|
||||
case (namePlusKey: Array[Byte], versioned: Versioned[Array[Byte]]) => {
|
||||
returned += getMapKeyFromKey(name, namePlusKey) -> versioned.getValue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
buf.toList
|
||||
returned.toList
|
||||
}
|
||||
|
||||
def getMapStorageSizeFor(name: String): Int = {
|
||||
|
|
@ -263,7 +261,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
try {
|
||||
queueClient.delete(key)
|
||||
} catch {
|
||||
//a failure to delete is ok, just leaves a K-V in Voldemort that will be overwritten if the queue ever wraps around
|
||||
//a failure to delete is ok, just leaves a K-V in Voldemort that will be overwritten if the queue ever wraps around
|
||||
case e: Exception => log.warn(e, "caught an exception while deleting a dequeued element, however this will not cause any inconsistency in the queue")
|
||||
}
|
||||
}
|
||||
|
|
@ -332,6 +330,13 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
IntSerializer.fromBytes(indexBytes)
|
||||
}
|
||||
|
||||
def getMapKeyFromKey(owner: String, key: Array[Byte]): Array[Byte] = {
|
||||
val mapKeyLength = key.length - IntSerializer.bytesPerInt - owner.getBytes("UTF-8").length
|
||||
val mapkey = new Array[Byte](mapKeyLength)
|
||||
System.arraycopy(key, key.length - mapKeyLength, mapkey, 0, mapKeyLength)
|
||||
mapkey
|
||||
}
|
||||
|
||||
|
||||
def getClientConfig(configMap: Map[String, String]): Properties = {
|
||||
val properites = new Properties
|
||||
|
|
@ -450,6 +455,8 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
}
|
||||
|
||||
def fromBytes(bytes: Array[Byte]): SortedSet[Array[Byte]] = {
|
||||
import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._
|
||||
|
||||
var set = new TreeSet[Array[Byte]]
|
||||
if (bytes.length > IntSerializer.bytesPerInt) {
|
||||
var pos = 0
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import org.scalatest.matchers.ShouldMatchers
|
|||
import org.junit.runner.RunWith
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import se.scalablesolutions.akka.persistence.voldemort.VoldemortStorageBackend._
|
||||
import se.scalablesolutions.akka.actor.{newUuid,Uuid}
|
||||
import se.scalablesolutions.akka.actor.{newUuid, Uuid}
|
||||
import collection.immutable.TreeSet
|
||||
import VoldemortStorageBackendSuite._
|
||||
|
||||
|
|
@ -84,4 +84,18 @@ class VoldemortPersistentDatastructureSuite extends FunSuite with ShouldMatchers
|
|||
|
||||
}
|
||||
|
||||
test("Persistent Maps work as expected") {
|
||||
atomic {
|
||||
val map = VoldemortStorage.getMap("map")
|
||||
map.put("mapTest".getBytes, null)
|
||||
}
|
||||
|
||||
atomic {
|
||||
val map = VoldemortStorage.getMap("map")
|
||||
map.get("mapTest".getBytes).get should be(null)
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue