Merge branch 'master' of git@github.com:jboner/akka

Conflicts:
	akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
Jonas Bonér 2010-10-06 12:12:45 +02:00
commit 1000db8708
21 changed files with 1303 additions and 298 deletions

View file

@@ -444,7 +444,6 @@ trait Actor extends Logging {
*/
def become(behavior: Option[Receive]) {
self.hotswap = behavior
self.checkReceiveTimeout // FIXME : how to reschedule receivetimeout on hotswap?
}
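For context, a minimal sketch of how this hotswap API is used, assuming the contract shown here where become installs an Option[Receive] and None reverts to the original receive (hypothetical example, not part of this commit):

class Toggling extends Actor {
  def receive = {
    case "swap" => become(Some({
      case "restore" => become(None) // assumed: None restores the original receive
      case msg => log.info("swapped behavior got: %s", msg)
    }))
    case msg => log.info("original behavior got: %s", msg)
  }
}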
/** Akka Java API

View file

@@ -626,16 +626,16 @@ trait ActorRef extends ActorRefShared with TransactionManagement with Logging wi
override def equals(that: Any): Boolean = {
that.isInstanceOf[ActorRef] &&
that.asInstanceOf[ActorRef].uuid == uuid
that.asInstanceOf[ActorRef].uuid == uuid
}
override def toString = "Actor[" + id + ":" + uuid + "]"
protected[akka] def checkReceiveTimeout = {
cancelReceiveTimeout
receiveTimeout.foreach { time =>
if (receiveTimeout.isDefined && dispatcher.mailboxSize(this) <= 0) { //Only reschedule if desired and there are currently no more messages to be processed
log.debug("Scheduling timeout for %s", this)
_futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, time, TimeUnit.MILLISECONDS))
_futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, receiveTimeout.get, TimeUnit.MILLISECONDS))
}
}
@@ -709,7 +709,6 @@ class LocalActorRef private[akka] (
actorSelfFields._1.set(actor, this)
actorSelfFields._2.set(actor, Some(this))
start
checkReceiveTimeout
ActorRegistry.register(this)
}
@@ -813,7 +812,10 @@ class LocalActorRef private[akka] (
_transactionFactory = Some(TransactionFactory(_transactionConfig, id))
}
_status = ActorRefStatus.RUNNING
if (!isInInitialization) initializeActorInstance
checkReceiveTimeout //Schedule the initial Receive timeout
}
this
}
@@ -1230,6 +1232,7 @@ class LocalActorRef private[akka] (
finally {
clearTransaction
if (topLevelTransaction) clearTransactionSet
checkReceiveTimeout // Reschedule receive timeout
}
}
@@ -1309,9 +1312,7 @@ class LocalActorRef private[akka] (
actor.preStart // run actor preStart
Actor.log.trace("[%s] has started", toString)
ActorRegistry.register(this)
if (id == "N/A") id = actorClass.getName // if no name set, then use default name (class name)
clearTransactionSet // clear transaction set that might have been created if atomic block has been used within the Actor constructor body
checkReceiveTimeout
}
/*

View file

@@ -311,60 +311,83 @@ object ActorRegistry extends ListenerManagement {
}
}
/**
* An implementation of a ConcurrentMultiMap
* Adds/removes are serialized over the specified key
* Reads are fully concurrent <-- el-cheapo
*
* @author Viktor Klang
*/
class Index[K <: AnyRef,V <: AnyRef : Manifest] {
import scala.collection.JavaConversions._
private val Naught = Array[V]() //Nil for Arrays
private val container = new ConcurrentHashMap[K, JSet[V]]
private val emptySet = new ConcurrentSkipListSet[V]
def put(key: K, value: V) {
//Returns whether it needs to be retried or not
def tryPut(set: JSet[V], v: V): Boolean = {
set.synchronized {
if (set.isEmpty) true //IF the set is empty then it has been removed, so signal retry
else { //Else add the value to the set and signal that retry is not needed
set add v
false
}
}
}
@tailrec def syncPut(k: K, v: V): Boolean = {
/**
* Associates the value of type V with the key of type K
* @return true if the value was not previously associated with the key, false otherwise
*/
def put(key: K, value: V): Boolean = {
//Tail-recursive spin-locking put
@tailrec def spinPut(k: K, v: V): Boolean = {
var retry = false
var added = false
val set = container get k
if (set ne null) retry = tryPut(set,v)
if (set ne null) {
set.synchronized {
if (set.isEmpty) {
retry = true //IF the set is empty then it has been removed, so signal retry
}
else { //Else add the value to the set and signal that retry is not needed
added = set add v
retry = false
}
}
}
else {
val newSet = new ConcurrentSkipListSet[V]
newSet add v
// Parry for two simultaneous putIfAbsent(id,newSet)
val oldSet = container.putIfAbsent(k,newSet)
if (oldSet ne null)
retry = tryPut(oldSet,v)
if (oldSet ne null) {
oldSet.synchronized {
if (oldSet.isEmpty) {
retry = true //IF the set is empty then it has been removed, so signal retry
}
else { //Else try to add the value to the set and signal that retry is not needed
added = oldSet add v
retry = false
}
}
} else {
added = true
}
}
if (retry) syncPut(k,v)
else true
if (retry) spinPut(k,v)
else added
}
syncPut(key,value)
spinPut(key,value)
}
def values(key: K) = {
/**
* @return a _new_ array of all existing values for the given key at the time of the call
*/
def values(key: K): Array[V] = {
val set: JSet[V] = container get key
if (set ne null) set toArray Naught
else Naught
}
def foreach(key: K)(fun: (V) => Unit) {
val set = container get key
if (set ne null)
set foreach fun
val result = if (set ne null) set toArray Naught else Naught
result.asInstanceOf[Array[V]]
}
/**
* @return Some(value) for the first value associated with the key for which the supplied function returns true,
* or None if no value matches
*/
def findValue(key: K)(f: (V) => Boolean): Option[V] = {
import scala.collection.JavaConversions._
val set = container get key
if (set ne null)
set.iterator.find(f)
@@ -372,23 +395,43 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
None
}
/**
* Applies the supplied function to all keys and their values
*/
def foreach(fun: (K,V) => Unit) {
import scala.collection.JavaConversions._
container.entrySet foreach {
(e) => e.getValue.foreach(fun(e.getKey,_))
}
}
def remove(key: K, value: V) {
/**
* Disassociates the value of type V from the key of type K
* @return true if the value was disassociated from the key, false if it was not previously associated with the key
*/
def remove(key: K, value: V): Boolean = {
val set = container get key
if (set ne null) {
set.synchronized {
if (set.remove(value)) { //If we can remove the value
if (set.isEmpty) //and the set becomes empty
container.remove(key,emptySet) //We try to remove the key if it's mapped to an empty set
true //Remove succeeded
}
else false //Remove failed
}
}
} else false //Remove failed
}
def clear = { foreach(remove _) }
/**
* @return true if the underlying container is empty; may report false negatives while the last remove is underway
*/
def isEmpty: Boolean = container.isEmpty
/**
* Removes all keys and all values
*/
def clear = foreach { case (k,v) => remove(k,v) }
}
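To make the contract above concrete, a small usage sketch of Index (hypothetical values; behavior follows the scaladoc on put, values, findValue and remove):

val index = new Index[String, String]
index.put("key", "a")            // true: first value associated with "key"
index.put("key", "a")            // false: already associated
index.put("key", "b")            // true: a second value under the same key
index.values("key")              // a fresh Array("a", "b") snapshot
index.findValue("key")(_ == "b") // Some("b")
index.remove("key", "a")         // true: disassociated
index.remove("key", "a")         // false: no longer associated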

View file

@@ -6,6 +6,7 @@ import org.junit.Test
import java.util.concurrent.TimeUnit
import org.multiverse.api.latches.StandardLatch
import Actor._
import java.util.concurrent.atomic.AtomicInteger
class ReceiveTimeoutSpec extends JUnitSuite {
@@ -22,6 +23,7 @@ class ReceiveTimeoutSpec extends JUnitSuite {
}).start
assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS))
timeoutActor.stop
}
@Test def swappedReceiveShouldAlsoGetTimeout = {
@@ -44,9 +46,10 @@ class ReceiveTimeoutSpec extends JUnitSuite {
})
assert(swappedLatch.tryAwait(3, TimeUnit.SECONDS))
timeoutActor.stop
}
@Test def timeoutShouldBeCancelledAfterRegularReceive = {
@Test def timeoutShouldBeRescheduledAfterRegularReceive = {
val timeoutLatch = new StandardLatch
case object Tick
@@ -60,7 +63,30 @@ class ReceiveTimeoutSpec extends JUnitSuite {
}).start
timeoutActor ! Tick
assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == false)
assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == true)
timeoutActor.stop
}
@Test def timeoutShouldBeTurnedOffIfDesired = {
val count = new AtomicInteger(0)
val timeoutLatch = new StandardLatch
case object Tick
val timeoutActor = actorOf(new Actor {
self.receiveTimeout = Some(500L)
protected def receive = {
case Tick => ()
case ReceiveTimeout =>
timeoutLatch.open
count.incrementAndGet
self.receiveTimeout = None
}
}).start
timeoutActor ! Tick
assert(timeoutLatch.tryAwait(2, TimeUnit.SECONDS) == true)
assert(count.get === 1)
timeoutActor.stop
}
@Test def timeoutShouldNotBeSentWhenNotSpecified = {
@@ -73,5 +99,6 @@ class ReceiveTimeoutSpec extends JUnitSuite {
}).start
assert(timeoutLatch.tryAwait(1, TimeUnit.SECONDS) == false)
timeoutActor.stop
}
}
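Taken together, these specs pin down the receiveTimeout contract: ReceiveTimeout is delivered after the configured idle period, the timer is rescheduled after every regular receive, and setting receiveTimeout back to None turns it off. A minimal sketch of an actor relying on that contract (hypothetical, same API as the spec):

class IdleReaper extends Actor {
  self.receiveTimeout = Some(1000L) // ReceiveTimeout after 1s of an empty mailbox
  def receive = {
    case ReceiveTimeout =>
      log.info("idle for one second, stopping")
      self.stop
    case work => () // any regular message reschedules the timeout
  }
}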

View file

@@ -42,32 +42,30 @@ class AtmosphereRestServlet extends ServletContainer with AtmosphereServletProce
* <p/>
* Used by the Akka Kernel to bootstrap REST and Comet.
*/
class AkkaServlet extends AtmosphereServlet with Logging {
class AkkaServlet extends AtmosphereServlet {
import se.scalablesolutions.akka.config.Config.{config => c}
/*
* Configure Atmosphere and Jersey (default, fall-back values)
*/
addInitParameter(AtmosphereServlet.DISABLE_ONSTATE_EVENT,"true")
addInitParameter(AtmosphereServlet.BROADCASTER_CLASS,classOf[AkkaBroadcaster].getName)
addInitParameter(AtmosphereServlet.PROPERTY_USE_STREAM,"true")
addInitParameter("com.sun.jersey.config.property.packages",c.getList("akka.rest.resource_packages").mkString(";"))
addInitParameter("com.sun.jersey.spi.container.ResourceFilters",c.getList("akka.rest.filters").mkString(","))
c.getInt("akka.rest.maxInactiveActivity") foreach { value =>
log.info("MAX_INACTIVE:%s",value.toString)
addInitParameter(CometSupport.MAX_INACTIVE,value.toString)
}
c.getInt("akka.rest.maxInactiveActivity") foreach { value => addInitParameter(CometSupport.MAX_INACTIVE,value.toString) }
c.getString("akka.rest.cometSupport") foreach { value => addInitParameter("cometSupport",value) }
c.getString("akka.rest.cometSupport") foreach { value =>
addInitParameter("cometSupport",value)
}
val servlet = new AtmosphereRestServlet {
override def getInitParameter(key : String) = AkkaServlet.this.getInitParameter(key)
override def getInitParameterNames() = AkkaServlet.this.getInitParameterNames()
}
override def getInitParameter(key : String) = Option(super.getInitParameter(key)).getOrElse(initParams.get(key))
/*
* Provide a fallback for default values
*/
override def getInitParameter(key : String) =
Option(super.getInitParameter(key)).getOrElse(initParams get key)
/*
* Provide a fallback for default values
*/
override def getInitParameterNames() = {
import scala.collection.JavaConversions._
initParams.keySet.iterator ++ super.getInitParameterNames
@@ -80,24 +78,24 @@ class AkkaServlet extends AtmosphereServlet with Logging {
override def loadConfiguration(sc: ServletConfig) {
config.setSupportSession(false)
isBroadcasterSpecified = true
//The bridge between Atmosphere and Jersey
val servlet = new AtmosphereRestServlet {
//These are needed to make sure that Jersey is reading the config from the outer servlet
override def getInitParameter(key : String) = AkkaServlet.this.getInitParameter(key)
override def getInitParameterNames() = AkkaServlet.this.getInitParameterNames()
}
addAtmosphereHandler("/*", servlet, new AkkaBroadcaster)
}
/**
* This method is overridden because the Akka Kernel is bundled with Grizzly, so if we deploy the Kernel in another container,
* we need to handle that.
*/
override def createCometSupportResolver() : CometSupportResolver = {
import scala.collection.JavaConversions._
override lazy val createCometSupportResolver: CometSupportResolver = new DefaultCometSupportResolver(config) {
import scala.collection.JavaConversions._
new DefaultCometSupportResolver(config) {
type CS = CometSupport[_ <: AtmosphereResource[_,_]]
lazy val desiredCometSupport =
Option(AkkaServlet.this.getInitParameter("cometSupport")) filter testClassExists map newCometSupport
override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CS = {
val predef = config.getInitParameter("cometSupport")
if (testClassExists(predef)) newCometSupport(predef)
else super.resolve(useNativeIfPossible, useBlockingAsDefault)
}
}
override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CometSupport[_ <: AtmosphereResource[_,_]] =
desiredCometSupport.getOrElse(super.resolve(useNativeIfPossible, useBlockingAsDefault))
}
}

View file

@@ -7,6 +7,7 @@ package se.scalablesolutions.akka.persistence.common
import se.scalablesolutions.akka.stm._
import se.scalablesolutions.akka.stm.TransactionManagement.transaction
import se.scalablesolutions.akka.util.Logging
import collection.mutable.ArraySeq
// FIXME move to 'stm' package + add message with more info
class NoTransactionInScopeException extends RuntimeException
@@ -47,26 +48,38 @@ trait Storage {
type ElementType
def newMap: PersistentMap[ElementType, ElementType]
def newVector: PersistentVector[ElementType]
def newRef: PersistentRef[ElementType]
def newQueue: PersistentQueue[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
def newSortedSet: PersistentSortedSet[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
def getMap(id: String): PersistentMap[ElementType, ElementType]
def getVector(id: String): PersistentVector[ElementType]
def getRef(id: String): PersistentRef[ElementType]
def getQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
def getSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
def newMap(id: String): PersistentMap[ElementType, ElementType]
def newVector(id: String): PersistentVector[ElementType]
def newRef(id: String): PersistentRef[ElementType]
def newQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
def newSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
}
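The reworked factory API distinguishes three cases: newMap creates a fresh structure under a random uuid, newMap(id) creates one under a caller-chosen id, and getMap(id) re-attaches to existing storage by id (the VoldemortStorage diff later in this commit implements getMap(id) as newMap(id)). A client-side sketch, where MyStorage stands for any hypothetical concrete Storage implementation:

// MyStorage is hypothetical; substitute a concrete backend object
val orders = MyStorage.newMap("orders")      // persistent map under a known id
val sameOrders = MyStorage.getMap("orders")  // re-attach to the same storage by id
val scratch = MyStorage.newVector            // fresh vector under a random uuid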
@@ -90,7 +103,7 @@ private[akka] object PersistentMap {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
with Transactional with Committable with Abortable with Logging {
with Transactional with Committable with Abortable with Logging {
//Import Ops
import PersistentMap._
@@ -118,7 +131,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
protected def clearDistinctKeys = keysInCurrentTx.clear
protected def filterTxLogByKey(key: K): IndexedSeq[LogEntry] =
appendOnlyTxLog filter(e => e.key.map(equal(_, key)).getOrElse(true))
appendOnlyTxLog filter (e => e.key.map(equal(_, key)).getOrElse(true))
// need to get current value considering the underlying storage as well as the transaction log
protected def getCurrentValue(key: K): Option[V] = {
@@ -129,7 +142,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
// get the snapshot from the underlying store for this key
val underlying = try {
storage.getMapStorageEntryFor(uuid, key)
} catch { case e: Exception => None }
} catch {case e: Exception => None}
if (txEntries.isEmpty) underlying
else txEntries.last match {
@@ -146,12 +159,14 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
case None => Map.empty[K, V]
case Some(v) => Map((key, v))
}
txEntries.foreach {case LogEntry(k, v, o) => o match {
case PUT => m.put(k.get, v.get)
case REM => m -= k.get
case UPD => m.update(k.get, v.get)
case CLR => Map.empty[K, V]
}}
txEntries.foreach {
case LogEntry(k, v, o) => o match {
case PUT => m.put(k.get, v.get)
case REM => m -= k.get
case UPD => m.update(k.get, v.get)
case CLR => Map.empty[K, V]
}
}
m get key
}
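In other words, the visible value of a key is the storage snapshot folded through the uncommitted log entries for that key, last write wins. A simplified, self-contained model of that replay (the real code above also handles the CLR case and Option-typed keys):

sealed trait Op
case object Put extends Op
case object Rem extends Op
case class Entry[V](value: Option[V], op: Op)

// Snapshot from storage, then replay the tx log; the last entry wins
def currentValue[V](snapshot: Option[V], log: Seq[Entry[V]]): Option[V] =
  log.foldLeft(snapshot) { (visible, e) =>
    e.op match {
      case Put => e.value // put/update replaces the visible value
      case Rem => None    // remove hides any earlier value
    }
  }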
@@ -159,12 +174,14 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
val storage: MapStorageBackend[K, V]
def commit = {
appendOnlyTxLog.foreach { case LogEntry(k, v, o) => o match {
case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
case REM => storage.removeMapStorageFor(uuid, k.get)
case CLR => storage.removeMapStorageFor(uuid)
}}
appendOnlyTxLog.foreach {
case LogEntry(k, v, o) => o match {
case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
case REM => storage.removeMapStorageFor(uuid, k.get)
case CLR => storage.removeMapStorageFor(uuid)
}
}
appendOnlyTxLog.clear
clearDistinctKeys
@@ -180,8 +197,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
this
}
override def +=(kv : (K,V)) = {
put(kv._1,kv._2)
override def +=(kv: (K, V)) = {
put(kv._1, kv._2)
this
}
@@ -230,10 +247,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
case Seq() => // current tx doesn't use this
storage.getMapStorageEntryFor(uuid, key).isDefined // check storage
case txs => // present in log
val lastOp = txs.last.op
val lastOp = txs.last.op
lastOp != REM && lastOp != CLR // last entry cannot be a REM
}
} catch { case e: Exception => false }
}
} catch {case e: Exception => false}
protected def existsInStorage(key: K): Option[V] = try {
storage.getMapStorageEntryFor(uuid, key)
@@ -243,33 +260,33 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
override def size: Int = try {
// partition key set affected in current tx into those which r added & which r deleted
val (keysAdded, keysRemoved) = keysInCurrentTx.map {
val (keysAdded, keysRemoved) = keysInCurrentTx.map {
case (kseq, k) => ((kseq, k), getCurrentValue(k))
}.partition(_._2.isDefined)
// keys which existed in storage but removed in current tx
val inStorageRemovedInTx =
keysRemoved.keySet
.map(_._2)
.filter(k => existsInStorage(k).isDefined)
.size
val inStorageRemovedInTx =
keysRemoved.keySet
.map(_._2)
.filter(k => existsInStorage(k).isDefined)
.size
// all keys in storage
val keysInStorage =
storage.getMapStorageFor(uuid)
.map { case (k, v) => toEquals(k) }
.toSet
val keysInStorage =
storage.getMapStorageFor(uuid)
.map {case (k, v) => toEquals(k)}
.toSet
// (keys that existed UNION keys added ) - (keys removed)
(keysInStorage union keysAdded.keySet.map(_._1)).size - inStorageRemovedInTx
} catch {
case e: Exception => 0
} catch {
case e: Exception => 0
}
// get must consider underlying storage & current uncommitted tx log
override def get(key: K): Option[V] = getCurrentValue(key)
def iterator: Iterator[Tuple2[K, V]]
def iterator: Iterator[Tuple2[K, V]]
protected def register = {
if (transaction.get.isEmpty) throw new NoTransactionInScopeException
@@ -277,38 +294,50 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
}
}
object PersistentMapBinary {
object COrdering {
//frontend
implicit object ArraySeqOrdering extends Ordering[ArraySeq[Byte]] {
def compare(o1: ArraySeq[Byte], o2: ArraySeq[Byte]) =
ArrayOrdering.compare(o1.toArray, o2.toArray)
}
//backend
implicit object ArrayOrdering extends Ordering[Array[Byte]] {
def compare(o1: Array[Byte], o2: Array[Byte]) =
new String(o1) compare new String(o2)
}
}
}
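The split into a frontend ArraySeqOrdering and a backend ArrayOrdering exists because Array[Byte] compares by reference, which makes raw arrays unusable as sorted-map keys; wrapping them in ArraySeq restores structural equality while the orderings drive the TreeMap. A quick sketch of the pitfall:

import scala.collection.mutable.ArraySeq

val a = "key".getBytes
val b = "key".getBytes
a == b                             // false: arrays use reference equality
ArraySeq(a: _*) == ArraySeq(b: _*) // true: element-wise equality
// hence replayAllKeys below keys its TreeMap by ArraySeq[Byte]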
trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] {
import scala.collection.mutable.ArraySeq
type T = ArraySeq[Byte]
def toEquals(k: Array[Byte]) = ArraySeq(k: _*)
override def equal(k1: Array[Byte], k2: Array[Byte]): Boolean = k1 sameElements k2
object COrdering {
implicit object ArraySeqOrdering extends Ordering[ArraySeq[Byte]] {
def compare(o1: ArraySeq[Byte], o2: ArraySeq[Byte]) =
new String(o1.toArray) compare new String(o2.toArray)
}
}
import scala.collection.immutable.{TreeMap, SortedMap}
private def replayAllKeys: SortedMap[ArraySeq[Byte], Array[Byte]] = {
import COrdering._
import PersistentMapBinary.COrdering._
// need ArraySeq for ordering
val fromStorage =
TreeMap(storage.getMapStorageFor(uuid).map { case (k, v) => (ArraySeq(k: _*), v) }: _*)
val fromStorage =
TreeMap(storage.getMapStorageFor(uuid).map {case (k, v) => (ArraySeq(k: _*), v)}: _*)
val (keysAdded, keysRemoved) = keysInCurrentTx.map {
val (keysAdded, keysRemoved) = keysInCurrentTx.map {
case (_, k) => (k, getCurrentValue(k))
}.partition(_._2.isDefined)
val inStorageRemovedInTx =
keysRemoved.keySet
.filter(k => existsInStorage(k).isDefined)
.map(k => ArraySeq(k: _*))
val inStorageRemovedInTx =
keysRemoved.keySet
.filter(k => existsInStorage(k).isDefined)
.map(k => ArraySeq(k: _*))
(fromStorage -- inStorageRemovedInTx) ++ keysAdded.map { case (k, v) => (ArraySeq(k: _*), v.get) }
(fromStorage -- inStorageRemovedInTx) ++ keysAdded.map {case (k, v) => (ArraySeq(k: _*), v.get)}
}
override def slice(start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = try {
@@ -317,51 +346,53 @@ trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] {
if (newMap isEmpty) List[(Array[Byte], Array[Byte])]()
val startKey =
start match {
case Some(bytes) => Some(ArraySeq(bytes: _*))
case None => None
}
start match {
case Some(bytes) => Some(ArraySeq(bytes: _*))
case None => None
}
val endKey =
finish match {
case Some(bytes) => Some(ArraySeq(bytes: _*))
case None => None
}
finish match {
case Some(bytes) => Some(ArraySeq(bytes: _*))
case None => None
}
((startKey, endKey, count): @unchecked) match {
case ((Some(s), Some(e), _)) =>
newMap.range(s, e)
.toList
.map(e => (e._1.toArray, e._2))
.toList
.toList
.map(e => (e._1.toArray, e._2))
.toList
case ((Some(s), None, c)) if c > 0 =>
newMap.from(s)
.iterator
.take(count)
.map(e => (e._1.toArray, e._2))
.toList
.iterator
.take(count)
.map(e => (e._1.toArray, e._2))
.toList
case ((Some(s), None, _)) =>
newMap.from(s)
.toList
.map(e => (e._1.toArray, e._2))
.toList
.toList
.map(e => (e._1.toArray, e._2))
.toList
case ((None, Some(e), _)) =>
newMap.until(e)
.toList
.map(e => (e._1.toArray, e._2))
.toList
.toList
.map(e => (e._1.toArray, e._2))
.toList
}
} catch { case e: Exception => Nil }
} catch {case e: Exception => Nil}
override def iterator: Iterator[(Array[Byte], Array[Byte])] = {
override def iterator: Iterator[(Array[Byte], Array[Byte])] = {
new Iterator[(Array[Byte], Array[Byte])] {
private var elements = replayAllKeys
override def next: (Array[Byte], Array[Byte]) = synchronized {
val (k, v) = elements.head
elements = elements.tail
(k.toArray, v)
}
override def hasNext: Boolean = synchronized { !elements.isEmpty }
override def hasNext: Boolean = synchronized {!elements.isEmpty}
}
}
}
@@ -394,7 +425,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
val storage: VectorStorageBackend[T]
def commit = {
for(entry <- appendOnlyTxLog) {
for (entry <- appendOnlyTxLog) {
(entry: @unchecked) match {
case LogEntry(_, Some(v), ADD) => storage.insertVectorStorageEntryFor(uuid, v)
case LogEntry(Some(i), Some(v), UPD) => storage.updateVectorStorageEntryFor(uuid, i, v)
@@ -412,7 +443,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
import scala.collection.mutable.ArrayBuffer
var elemsStorage = ArrayBuffer(storage.getVectorStorageRangeFor(uuid, None, None, storage.getVectorStorageSizeFor(uuid)).reverse: _*)
for(entry <- appendOnlyTxLog) {
for (entry <- appendOnlyTxLog) {
(entry: @unchecked) match {
case LogEntry(_, Some(v), ADD) => elemsStorage += v
case LogEntry(Some(i), Some(v), UPD) => elemsStorage.update(i, v)
@@ -446,11 +477,11 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
val curr = replay
val s = if (start.isDefined) start.get else 0
val cnt =
if (finish.isDefined) {
val f = finish.get
if (f >= s) (f - s) else count
}
else count
if (finish.isDefined) {
val f = finish.get
if (f >= s) (f - s) else count
}
else count
if (s == 0 && cnt == 0) List().toIndexedSeq
else curr.slice(s, s + cnt).toIndexedSeq
}
@@ -520,7 +551,7 @@ trait PersistentRef[T] extends Transactional with Committable with Abortable {
}
private[akka] object PersistentQueue {
//Operations for PersistentQu eue
//Operations for PersistentQueue
sealed trait QueueOp
case object ENQ extends QueueOp
case object DEQ extends QueueOp
@@ -552,7 +583,7 @@ private[akka] object PersistentQueue {
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
with Transactional with Committable with Abortable with Logging {
with Transactional with Committable with Abortable with Logging {
//Import Ops
import PersistentQueue._
@@ -575,11 +606,12 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
val storage: QueueStorageBackend[A]
def commit = {
enqueuedNDequeuedEntries.toList.foreach { e =>
e._2 match {
case ENQ => storage.enqueue(uuid, e._1.get)
case DEQ => storage.dequeue(uuid)
}
enqueuedNDequeuedEntries.toList.foreach {
e =>
e._2 match {
case ENQ => storage.enqueue(uuid, e._1.get)
case DEQ => storage.dequeue(uuid)
}
}
if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) {
storage.remove(uuid)
@@ -635,7 +667,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
override def size: Int = try {
storage.size(uuid) + localQ.get.length
} catch { case e: Exception => 0 }
} catch {case e: Exception => 0}
override def isEmpty: Boolean =
size == 0
@@ -644,10 +676,12 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
enqueue(elem)
this
}
def ++=(elems: Iterator[A]) = {
enqueue(elems.toList: _*)
this
}
def ++=(elems: Iterable[A]): Unit = this ++= elems.iterator
override def dequeueFirst(p: A => Boolean): Option[A] =
@@ -670,24 +704,24 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
* <p/>
* zscore can be implemented in a variety of ways by the calling class:
* <pre>
* trait ZScorable {
* trait ZScorable {
* def toZScore: Float
* }
*
* class Foo extends ZScorable {
* class Foo extends ZScorable {
* //.. implementation
* }
* </pre>
* Or we can also use views:
* <pre>
* class Foo {
* class Foo {
* //..
* }
*
* implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable {
* def toZScore = {
* implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable {
* def toZScore = {
* //..
* }
* }
* }
* </pre>
*
@@ -696,7 +730,6 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
* @author <a href="http://debasishg.blogspot.com"</a>
*/
trait PersistentSortedSet[A] extends Transactional with Committable with Abortable {
protected val newElems = TransactionalMap[A, Float]()
protected val removedElems = TransactionalVector[A]()
@@ -729,8 +762,8 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
}
private def inStorage(elem: A): Option[Float] = storage.zscore(uuid, elem) match {
case Some(s) => Some(s.toFloat)
case None => None
case Some(s) => Some(s.toFloat)
case None => None
}
def contains(elem: A): Boolean = {
@@ -758,8 +791,8 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
def compare(that: (A, Float)) = x._2 compare that._2
}
implicit def ordering = new scala.math.Ordering[(A,Float)] {
def compare(x: (A, Float),y : (A,Float)) = x._2 compare y._2
implicit def ordering = new scala.math.Ordering[(A, Float)] {
def compare(x: (A, Float), y: (A, Float)) = x._2 compare y._2
}
def zrange(start: Int, end: Int): List[(A, Float)] = {
@@ -772,9 +805,9 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
// -1 means the last element, -2 means the second last
val s = if (start < 0) start + l else start
val e =
if (end < 0) end + l
else if (end >= l) (l - 1)
else end
if (end < 0) end + l
else if (end >= l) (l - 1)
else end
// slice is open at the end, we need a closed end range
ts.iterator.slice(s, e + 1).toList
}
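A worked example of the index normalization above, for a set of size l = 5:

// zrange(0, -1): s = 0, e = -1 + 5 = 4  -> slice(0, 5), the whole set
// zrange(1,  2): s = 1, e = 2           -> slice(1, 3), the 2nd and 3rd elements
// zrange(0,  9): e = 9 >= l, so e = l - 1 = 4 -> again the whole set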

View file

@@ -0,0 +1,161 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.persistence.common
import org.scalatest.matchers.ShouldMatchers
import se.scalablesolutions.akka.util.Logging
import org.scalatest.{BeforeAndAfterEach, Spec}
import scala.util.Random
import collection.immutable.{TreeMap, HashMap, HashSet}
import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._
/**
* Implementation Compatibility test for PersistentMap backend implementations.
*/
trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging {
def storage: MapStorageBackend[Array[Byte], Array[Byte]]
def dropMaps: Unit
override def beforeEach = {
log.info("beforeEach: dropping maps")
dropMaps
}
override def afterEach = {
log.info("afterEach: dropping maps")
dropMaps
}
describe("A Properly functioning MapStorageBackend") {
it("should remove map storage properly") {
val mapName = "removeTest"
val mkey = "removeTestKey".getBytes
val value = "removeTestValue".getBytes
storage.insertMapStorageEntryFor(mapName, mkey, value)
storage.getMapStorageEntryFor(mapName, mkey).isDefined should be(true)
storage.removeMapStorageFor(mapName, mkey)
storage.getMapStorageEntryFor(mapName, mkey) should be(None)
storage.insertMapStorageEntryFor(mapName, mkey, value)
storage.getMapStorageEntryFor(mapName, mkey).isDefined should be(true)
storage.removeMapStorageFor(mapName)
storage.getMapStorageEntryFor(mapName, mkey) should be(None)
}
it("should insert a single map storage element properly") {
val mapName = "insertSingleTest"
val mkey = "insertSingleTestKey".getBytes
val value = "insertSingleTestValue".getBytes
storage.insertMapStorageEntryFor(mapName, mkey, value)
storage.getMapStorageEntryFor(mapName, mkey).get should be(value)
storage.removeMapStorageFor(mapName, mkey)
storage.getMapStorageEntryFor(mapName, mkey) should be(None)
storage.insertMapStorageEntryFor(mapName, mkey, value)
storage.getMapStorageEntryFor(mapName, mkey).get should be(value)
storage.removeMapStorageFor(mapName)
storage.getMapStorageEntryFor(mapName, mkey) should be(None)
}
it("should insert multiple map storage elements properly") {
val mapName = "insertMultipleTest"
val rand = new Random(3).nextInt(100)
val entries = (1 to rand).toList.map {
index =>
(("insertMultipleTestKey" + index).getBytes -> ("insertMutlipleTestValue" + index).getBytes)
}
storage.insertMapStorageEntriesFor(mapName, entries)
entries foreach {
_ match {
case (mkey, value) => {
storage.getMapStorageEntryFor(mapName, mkey).isDefined should be(true)
storage.getMapStorageEntryFor(mapName, mkey).get should be(value)
}
}
}
storage.removeMapStorageFor(mapName)
entries foreach {
_ match {
case (mkey, value) => {
storage.getMapStorageEntryFor(mapName, mkey) should be(None)
}
}
}
}
it("should accurately track the number of key value pairs in a map") {
val mapName = "sizeTest"
val rand = new Random(3).nextInt(100)
val entries = (1 to rand).toList.map {
index =>
(("sizeTestKey" + index).getBytes -> ("sizeTestValue" + index).getBytes)
}
storage.insertMapStorageEntriesFor(mapName, entries)
storage.getMapStorageSizeFor(mapName) should be(rand)
}
it("should return all the key value pairs in the map in the correct order when getMapStorageFor(name) is called") {
val mapName = "allTest"
val rand = new Random(3).nextInt(100)
var entries = new TreeMap[Array[Byte], Array[Byte]]()(ArrayOrdering)
(1 to rand).foreach {
index =>
entries += (("allTestKey" + index).getBytes -> ("allTestValue" + index).getBytes)
}
storage.insertMapStorageEntriesFor(mapName, entries.toList)
val retrieved = storage.getMapStorageFor(mapName)
retrieved.size should be(rand)
entries.size should be(rand)
val entryMap = new HashMap[String, String] ++ entries.map {_ match {case (k, v) => (new String(k), new String(v))}}
val retrievedMap = new HashMap[String, String] ++ retrieved.map {_ match {case (k, v) => (new String(k), new String(v))}}
entryMap should equal(retrievedMap)
(0 until rand).foreach {
i: Int => {
new String(entries.toList(i)._1) should be(new String(retrieved(i)._1))
}
}
}
it("should return all the key->value pairs that exist in the map that are between start and end, up to count pairs when getMapStorageRangeFor is called") {
//implement if this method will be used
}
it("should return Some(null), not None, for a key that has had the value null set and None for a key with no value set") {
val mapName = "nullTest"
val key = "key".getBytes
storage.insertMapStorageEntryFor(mapName, key, null)
storage.getMapStorageEntryFor(mapName, key).get should be(null)
storage.removeMapStorageFor(mapName, key)
storage.getMapStorageEntryFor(mapName, key) should be(None)
}
it("should not throw an exception when size is called on a non existent map?") {
storage.getMapStorageSizeFor("nonExistent") should be(0)
}
}
}
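A concrete backend opts into this compatibility suite by supplying the two abstract members; a sketch along these lines (all MyBackend names are hypothetical):

import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class MyBackendMapStorageTest extends MapStorageBackendTest {
  def storage = MyBackendStorageBackend          // the backend under test (hypothetical)
  def dropMaps = MyBackendStorageBackend.dropAll // hypothetical cleanup hook
}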

View file

@@ -0,0 +1,123 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.persistence.common
import org.scalatest.matchers.ShouldMatchers
import se.scalablesolutions.akka.util.Logging
import org.scalatest.{BeforeAndAfterEach, Spec}
import scala.util.Random
/**
* Implementation Compatibility test for PersistentQueue backend implementations.
*/
trait QueueStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging {
def storage: QueueStorageBackend[Array[Byte]]
def dropQueues: Unit
override def beforeEach = {
log.info("beforeEach: dropping queues")
dropQueues
}
override def afterEach = {
log.info("afterEach: dropping queues")
dropQueues
}
describe("A Properly functioning QueueStorage Backend") {
it("should enqueue properly when there is capacity in the queue") {
val queue = "enqueueTest"
val value = "enqueueTestValue".getBytes
storage.size(queue) should be(0)
storage.enqueue(queue, value).get should be(1)
storage.size(queue) should be(1)
}
it("should return None when enqueue is called on a full queue?") {
}
it("should dequeue properly when the queue is not empty") {
val queue = "dequeueTest"
val value = "dequeueTestValue".getBytes
storage.size(queue) should be(0)
storage.enqueue(queue, value)
storage.size(queue) should be(1)
storage.dequeue(queue).get should be(value)
}
it("should return None when dequeue is called on an empty queue") {
val queue = "dequeueTest2"
val value = "dequeueTestValue2".getBytes
storage.size(queue) should be(0)
storage.dequeue(queue) should be(None)
}
it("should accurately reflect the size of the queue") {
val queue = "sizeTest"
val rand = new Random(3).nextInt(100)
val values = (1 to rand).toList.map {i: Int => ("sizeTestValue" + i).getBytes}
values.foreach {storage.enqueue(queue, _)}
storage.size(queue) should be(rand)
val drand = new Random(3).nextInt(rand)
(1 to drand).foreach {
i: Int => {
storage.dequeue(queue).isDefined should be(true)
storage.size(queue) should be(rand - i)
}
}
}
it("should support peek properly") {
val queue = "sizeTest"
val rand = new Random(3).nextInt(100)
val values = (1 to rand).toList.map {i: Int => ("peekTestValue" + i)}
storage.remove(queue)
values.foreach {s: String => storage.enqueue(queue, s.getBytes)}
(1 to rand).foreach {
index => {
val peek = storage.peek(queue, 0, index).map {new String(_)}
peek.size should be(index)
values.dropRight(values.size - index).equals(peek) should be(true)
}
}
(0 until rand).foreach {
index => {
val peek = storage.peek(queue, index, rand - index).map {new String(_)}
peek.size should be(rand - index)
values.drop(index).equals(peek) should be(true)
}
}
//Should we test counts greater than queue size? or greater than queue size - count???
}
it("should not throw an exception when remove is called on a non-existent queue") {
storage.remove("exceptionTest")
}
it("should remove queue storage properly") {
val queue = "removeTest"
val rand = new Random(3).nextInt(100)
val values = (1 to rand).toList.map {i: Int => ("removeValue" + i).getBytes}
values.foreach {storage.enqueue(queue, _)}
storage.size(queue) should be(rand)
storage.remove(queue)
storage.size(queue) should be(0)
}
it("should accept null as a value to enqueue and return Some(null) when that value is dequeued") {
val queue = "nullTest"
storage.enqueue(queue, null).get should be(1)
storage.dequeue(queue).get should be(null)
storage.dequeue(queue) should be(None)
}
}
}

View file

@@ -0,0 +1,52 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.persistence.common
import org.scalatest.matchers.ShouldMatchers
import se.scalablesolutions.akka.util.Logging
import org.scalatest.{BeforeAndAfterEach, Spec}
/**
* Implementation Compatibility test for PersistentRef backend implementations.
*/
trait RefStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging {
def storage: RefStorageBackend[Array[Byte]]
def dropRefs: Unit
override def beforeEach = {
log.info("beforeEach: dropping refs")
dropRefs
}
override def afterEach = {
log.info("afterEach: dropping refs")
dropRefs
}
describe("A Properly functioning RefStorageBackend") {
it("should successfully insert ref storage") {
val name = "RefStorageTest #1"
val value = name.getBytes
storage.insertRefStorageFor(name, value)
storage.getRefStorageFor(name).get should be(value)
}
it("should return None when getRefStorage is called when no value has been inserted") {
val name = "RefStorageTest #2"
val value = name.getBytes
storage.getRefStorageFor(name) should be(None)
}
it("Should return None, not Some(null) when getRefStorageFor is called when null has been set") {
val name = "RefStorageTest #3"
storage.insertRefStorageFor(name, null)
storage.getRefStorageFor(name) should be(None)
}
}
}

View file

@@ -0,0 +1,35 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.persistence.common
import org.scalatest.matchers.ShouldMatchers
import se.scalablesolutions.akka.util.Logging
import org.scalatest.{BeforeAndAfterEach, Spec}
/**
* Implementation Compatibility test for PersistentSortedSet backend implementations.
*/
trait SortedSetStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging {
def storage: SortedSetStorageBackend[Array[Byte]]
def dropSortedSets: Unit
override def beforeEach = {
log.info("beforeEach: dropping sorted sets")
dropSortedSets
}
override def afterEach = {
log.info("afterEach: dropping sorted sets")
dropSortedSets
}
describe("A Properly functioning SortedSetStorageBackend Backend") {
}
}

View file

@@ -0,0 +1,362 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.persistence.common
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.config.OneForOneStrategy
import Actor._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import StorageObj._
case class GET(k: String)
case class SET(k: String, v: String)
case class REM(k: String)
case class CONTAINS(k: String)
case object MAP_SIZE
case class MSET(kvs: List[(String, String)])
case class REMOVE_AFTER_PUT(kvsToAdd: List[(String, String)], ksToRem: List[String])
case class CLEAR_AFTER_PUT(kvsToAdd: List[(String, String)])
case class PUT_WITH_SLICE(kvsToAdd: List[(String, String)], start: String, cnt: Int)
case class PUT_REM_WITH_SLICE(kvsToAdd: List[(String, String)], ksToRem: List[String], start: String, cnt: Int)
case class VADD(v: String)
case class VUPD(i: Int, v: String)
case class VUPD_AND_ABORT(i: Int, v: String)
case class VGET(i: Int)
case object VSIZE
case class VGET_AFTER_VADD(vsToAdd: List[String], isToFetch: List[Int])
case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int)
object StorageObj {
var getMap: String => PersistentMap[Array[Byte], Array[Byte]] = _
var getVector: String => PersistentVector[Array[Byte]] = _
class SampleMapStorage extends Actor {
self.lifeCycle = Permanent
val FOO_MAP = "akka.sample.map"
private var fooMap = atomic {StorageObj.getMap(FOO_MAP)}
def receive = {
case SET(k, v) =>
atomic {
fooMap += (k.getBytes, v.getBytes)
}
self.reply((k, v))
case GET(k) =>
val v = atomic {
fooMap.get(k.getBytes).map(new String(_)).getOrElse(k + " Not found")
}
self.reply(v)
case REM(k) =>
val v = atomic {
fooMap -= k.getBytes
}
self.reply(k)
case CONTAINS(k) =>
val v = atomic {
fooMap contains k.getBytes
}
self.reply(v)
case MAP_SIZE =>
val v = atomic {
fooMap.size
}
self.reply(v)
case MSET(kvs) => atomic {
kvs.foreach {kv => fooMap += (kv._1.getBytes, kv._2.getBytes)}
}
self.reply(kvs.size)
case REMOVE_AFTER_PUT(kvs2add, ks2rem) => atomic {
kvs2add.foreach {
kv =>
fooMap += (kv._1.getBytes, kv._2.getBytes)
}
ks2rem.foreach {
k =>
fooMap -= k.getBytes
}
}
self.reply(fooMap.size)
case CLEAR_AFTER_PUT(kvs2add) => atomic {
kvs2add.foreach {
kv =>
fooMap += (kv._1.getBytes, kv._2.getBytes)
}
fooMap.clear
}
self.reply(true)
case PUT_WITH_SLICE(kvs2add, from, cnt) =>
val v = atomic {
kvs2add.foreach {
kv =>
fooMap += (kv._1.getBytes, kv._2.getBytes)
}
fooMap.slice(Some(from.getBytes), cnt)
}
self.reply(v: List[(Array[Byte], Array[Byte])])
case PUT_REM_WITH_SLICE(kvs2add, ks2rem, from, cnt) =>
val v = atomic {
kvs2add.foreach {
kv =>
fooMap += (kv._1.getBytes, kv._2.getBytes)
}
ks2rem.foreach {
k =>
fooMap -= k.getBytes
}
fooMap.slice(Some(from.getBytes), cnt)
}
self.reply(v: List[(Array[Byte], Array[Byte])])
}
}
class SampleVectorStorage extends Actor {
self.lifeCycle = Permanent
val FOO_VECTOR = "akka.sample.vector"
private var fooVector = atomic {StorageObj.getVector(FOO_VECTOR)}
def receive = {
case VADD(v) =>
val size =
atomic {
fooVector + v.getBytes
fooVector length
}
self.reply(size)
case VGET(index) =>
val ind =
atomic {
fooVector get index
}
self.reply(ind)
case VGET_AFTER_VADD(vs, is) =>
val els =
atomic {
vs.foreach(fooVector + _.getBytes)
(is.foldRight(List[Array[Byte]]())(fooVector.get(_) :: _)).map(new String(_))
}
self.reply(els)
case VUPD_AND_ABORT(index, value) =>
val l =
atomic {
fooVector.update(index, value.getBytes)
// force fail
fooVector get 100
}
self.reply(index)
case VADD_WITH_SLICE(vs, s, c) =>
val l =
atomic {
vs.foreach(fooVector + _.getBytes)
fooVector.slice(Some(s), None, c)
}
self.reply(l.map(new String(_)))
}
}
}
trait Ticket343Test extends
Spec with
ShouldMatchers with
BeforeAndAfterEach {
def getMap: String => PersistentMap[Array[Byte], Array[Byte]]
def getVector: String => PersistentVector[Array[Byte]]
def dropMapsAndVectors: Unit
override def beforeEach {
StorageObj.getMap = getMap
StorageObj.getVector = getVector
dropMapsAndVectors
println("** dropMapsAndVectors")
}
override def afterEach {
dropMapsAndVectors
println("** dropMapsAndVectors")
}
describe("Ticket 343 Issue #1") {
it("remove after put should work within the same transaction") {
val proc = actorOf[SampleMapStorage]
proc.start
(proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
(proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
(proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
(proc !! GET("dg")).getOrElse("Get failed") should equal("1")
(proc !! GET("mc")).getOrElse("Get failed") should equal("2")
(proc !! GET("nd")).getOrElse("Get failed") should equal("3")
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
val add = List(("a", "1"), ("b", "2"), ("c", "3"))
val rem = List("a", "debasish")
(proc !! REMOVE_AFTER_PUT(add, rem)).getOrElse("REMOVE_AFTER_PUT failed") should equal(5)
(proc !! GET("debasish")).getOrElse("debasish not found") should equal("debasish Not found")
(proc !! GET("a")).getOrElse("a not found") should equal("a Not found")
(proc !! GET("b")).getOrElse("b not found") should equal("2")
(proc !! CONTAINS("b")).getOrElse("b not found") should equal(true)
(proc !! CONTAINS("debasish")).getOrElse("debasish not found") should equal(false)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(5)
proc.stop
}
}
describe("Ticket 343 Issue #2") {
it("clear after put should work within the same transaction") {
val proc = actorOf[SampleMapStorage]
proc.start
(proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
(proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
val add = List(("a", "1"), ("b", "2"), ("c", "3"))
(proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(0)
proc.stop
}
}
describe("Ticket 343 Issue #3") {
it("map size should change after the transaction") {
val proc = actorOf[SampleMapStorage]
proc.start
(proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
(proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
(proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
(proc !! GET("dg")).getOrElse("Get failed") should equal("1")
(proc !! GET("mc")).getOrElse("Get failed") should equal("2")
(proc !! GET("nd")).getOrElse("Get failed") should equal("3")
proc.stop
}
}
describe("slice test") {
it("should pass") {
val proc = actorOf[SampleMapStorage]
proc.start
(proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
(proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
// (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
(proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
(proc !! PUT_WITH_SLICE(List(("ec", "1"), ("tb", "2"), ("mc", "10")), "dg", 3)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map {case (k, v) => (new String(k), new String(v))} should equal(List(("dg", "1"), ("ec", "1"), ("mc", "10")))
(proc !! PUT_REM_WITH_SLICE(List(("fc", "1"), ("gb", "2"), ("xy", "10")), List("tb", "fc"), "dg", 5)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map {case (k, v) => (new String(k), new String(v))} should equal(List(("dg", "1"), ("ec", "1"), ("gb", "2"), ("mc", "10"), ("nd", "3")))
proc.stop
}
}
describe("Ticket 343 Issue #4") {
it("vector get should not ignore elements that were in vector before transaction") {
val proc = actorOf[SampleVectorStorage]
proc.start
// add 4 elements in separate transactions
(proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
(proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
(proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
(proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]]) should equal("nilanjan")
new String((proc !! VGET(1)).get.asInstanceOf[Array[Byte]]) should equal("ramanendu")
new String((proc !! VGET(2)).get.asInstanceOf[Array[Byte]]) should equal("maulindu")
new String((proc !! VGET(3)).get.asInstanceOf[Array[Byte]]) should equal("debasish")
// now add 3 more and do gets in the same transaction
(proc !! VGET_AFTER_VADD(List("a", "b", "c"), List(0, 2, 4))).get.asInstanceOf[List[String]] should equal(List("c", "a", "ramanendu"))
proc.stop
}
}
describe("Ticket 343 Issue #6") {
it("vector update should not ignore transaction") {
val proc = actorOf[SampleVectorStorage]
proc.start
// add 4 elements in separate transactions
(proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
(proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
(proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
(proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
evaluating {
(proc !! VUPD_AND_ABORT(0, "virat")).getOrElse("VUPD_AND_ABORT failed")
} should produce[Exception]
// update aborts and hence values will remain unchanged
new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]]) should equal("nilanjan")
proc.stop
}
}
describe("Ticket 343 Issue #5") {
it("vector slice() should not ignore elements added in current transaction") {
val proc = actorOf[SampleVectorStorage]
proc.start
// add 4 elements in separate transactions
(proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
(proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
(proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
(proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
// slice with no new elements added in current transaction
(proc !! VADD_WITH_SLICE(List(), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("maulindu", "debasish"))
// slice with new elements added in current transaction
(proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a"))
proc.stop
}
}
}

View file

@@ -0,0 +1,123 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.persistence.common
import org.scalatest.matchers.ShouldMatchers
import se.scalablesolutions.akka.util.Logging
import org.scalatest.{BeforeAndAfterEach, Spec}
import scala.util.Random
/**
* Implementation Compatibility test for PersistentVector backend implementations.
*/
trait VectorStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging {
def storage: VectorStorageBackend[Array[Byte]]
def dropVectors: Unit
override def beforeEach = {
log.info("beforeEach: dropping vectors")
dropVectors
}
override def afterEach = {
log.info("afterEach: dropping vectors")
dropVectors
}
describe("A Properly functioning VectorStorageBackend") {
it("should insertVectorStorageEntry as a logical prepend operation to the existing list") {
val vector = "insertSingleTest"
val rand = new Random(3).nextInt(100)
val values = (0 to rand).toList.map {i: Int => vector + "value" + i}
storage.getVectorStorageSizeFor(vector) should be(0)
values.foreach {s: String => storage.insertVectorStorageEntryFor(vector, s.getBytes)}
val shouldRetrieve = values.reverse
(0 to rand).foreach {
i: Int => {
shouldRetrieve(i) should be(new String(storage.getVectorStorageEntryFor(vector, i)))
}
}
}
it("should insertVectorStorageEntries as a logical prepend operation to the existing list") {
val vector = "insertMultiTest"
val rand = new Random(3).nextInt(100)
val values = (0 to rand).toList.map {i: Int => vector + "value" + i}
storage.getVectorStorageSizeFor(vector) should be(0)
storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes})
val shouldRetrieve = values.reverse
(0 to rand).foreach {
i: Int => {
shouldRetrieve(i) should be(new String(storage.getVectorStorageEntryFor(vector, i)))
}
}
}
it("should successfully update entries") {
val vector = "updateTest"
val rand = new Random(3).nextInt(100)
val values = (0 to rand).toList.map {i: Int => vector + "value" + i}
val urand = new Random(3).nextInt(rand)
storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes})
val toUpdate = "updated" + values.reverse(urand)
storage.updateVectorStorageEntryFor(vector, urand, toUpdate.getBytes)
toUpdate should be(new String(storage.getVectorStorageEntryFor(vector, urand)))
}
it("should return the correct value from getVectorStorageFor") {
val vector = "getTest"
val rand = new Random(3).nextInt(100)
val values = (0 to rand).toList.map {i: Int => vector + "value" + i}
val urand = new Random(3).nextInt(rand)
storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes})
values.reverse(urand) should be(new String(storage.getVectorStorageEntryFor(vector, urand)))
}
it("should return the correct values from getVectorStorageRangeFor") {
val vector = "getTest"
val rand = new Random(3).nextInt(100)
val drand = new Random(3).nextInt(rand)
val values = (0 to rand).toList.map {i: Int => vector + "value" + i}
storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes})
values.reverse should be(storage.getVectorStorageRangeFor(vector, None, None, rand + 1).map {b: Array[Byte] => new String(b)})
(0 to drand).foreach {
i: Int => {
val value: String = vector + "value" + (rand - i)
log.debug(value)
List(value) should be(storage.getVectorStorageRangeFor(vector, Some(i), None, 1).map {b: Array[Byte] => new String(b)})
}
}
}
it("should behave properly when the range used in getVectorStorageRangeFor has indexes outside the current size of the vector") {
//what is proper?
}
it("shoud return null when getStorageEntry is called on a null entry") {
//What is proper?
val vector = "nullTest"
storage.insertVectorStorageEntryFor(vector, null)
storage.getVectorStorageEntryFor(vector, 0) should be(null)
}
it("shoud throw a Storage exception when there is an attempt to retrieve an index larger than the Vector") {
val vector = "tooLargeRetrieve"
storage.insertVectorStorageEntryFor(vector, null)
evaluating {storage.getVectorStorageEntryFor(vector, 9)} should produce[StorageException]
}
it("shoud throw a Storage exception when there is an attempt to update an index larger than the Vector") {
val vector = "tooLargeUpdate"
storage.insertVectorStorageEntryFor(vector, null)
evaluating {storage.updateVectorStorageEntryFor(vector, 9, null)} should produce[StorageException]
}
}
}

View file

@@ -15,14 +15,17 @@ object VoldemortStorage extends Storage {
def newMap: PersistentMap[ElementType, ElementType] = newMap(newUuid.toString)
def newVector: PersistentVector[ElementType] = newVector(newUuid.toString)
def newRef: PersistentRef[ElementType] = newRef(newUuid.toString)
override def newQueue: PersistentQueue[ElementType] = newQueue(newUuid.toString)
def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
def getVector(id: String): PersistentVector[ElementType] = newVector(id)
def getRef(id: String): PersistentRef[ElementType] = newRef(id)
override def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id)
def newMap(id: String): PersistentMap[ElementType, ElementType] = new VoldemortPersistentMap(id)
def newVector(id: String): PersistentVector[ElementType] = new VoldemortPersistentVector(id)
def newRef(id: String): PersistentRef[ElementType] = new VoldemortPersistentRef(id)
override def newQueue(id:String): PersistentQueue[ElementType] = new VoldemortPersistentQueue(id)
}
@@ -41,3 +44,8 @@ class VoldemortPersistentRef(id: String) extends PersistentRef[Array[Byte]] {
val uuid = id
val storage = VoldemortStorageBackend
}
class VoldemortPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] {
val uuid = id
val storage = VoldemortStorageBackend
}

View file

@@ -17,9 +17,10 @@ import voldemort.versioning.Versioned
import collection.JavaConversions
import java.nio.ByteBuffer
import collection.Map
import collection.immutable.{IndexedSeq, SortedSet, TreeSet, HashMap}
import collection.mutable.{Set, HashSet, ArrayBuffer}
import java.util.{Properties, Map => JMap}
import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._
import collection.immutable._
/*
RequiredReads + RequiredWrites should be > ReplicationFactor for all Voldemort Stores
@@ -49,28 +50,28 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
var queueClient: StoreClient[Array[Byte], Array[Byte]] = null
initStoreClients
val nullMapValueHeader = 0x00.byteValue
val nullMapValue: Array[Byte] = Array(nullMapValueHeader)
val notNullMapValueHeader: Byte = 0xff.byteValue
val underscoreBytesUTF8 = "_".getBytes("UTF-8")
val mapKeysIndex = getIndexedBytes(-1)
val vectorSizeIndex = getIndexedBytes(-1)
val queueHeadIndex = getIndexedBytes(-1)
val queueTailIndex = getIndexedBytes(-2)
implicit val byteOrder = new Ordering[Array[Byte]] {
override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y)
}
//explicit implicit :)
implicit val ordering = ArrayOrdering
def getRefStorageFor(name: String): Option[Array[Byte]] = {
val result: Array[Byte] = refClient.getValue(name)
result match {
case null => None
case _ => Some(result)
}
Option(result)
}
def insertRefStorageFor(name: String, element: Array[Byte]) = {
refClient.put(name, element)
element match {
case null => refClient.delete(name)
case _ => refClient.put(name, element)
}
}
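The header constants above exist so that null map values survive a round trip through Voldemort (exercised by the "should return Some(null)" spec added in this commit): every stored value is prefixed with a one-byte header. The bodies of getStoredMapValue and getMapValueFromStored are not in these hunks; a plausible sketch consistent with the constants:

// Assumed shape of the helpers referenced below; not shown in this diff
def getStoredMapValue(value: Array[Byte]): Array[Byte] =
  if (value eq null) nullMapValue       // store just the 0x00 header
  else notNullMapValueHeader +: value   // prefix the payload with 0xff

def getMapValueFromStored(stored: Array[Byte]): Array[Byte] =
  if (stored(0) == nullMapValueHeader) null // header says "null was stored"
  else stored.drop(1)                       // strip the 0xff header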
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = {
@@ -90,17 +91,17 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
mapKey => getKey(name, mapKey)
}))
val buf = new ArrayBuffer[(Array[Byte], Array[Byte])](all.size)
var returned = new TreeMap[Array[Byte], Array[Byte]]()(ordering)
JavaConversions.asMap(all).foreach {
(entry) => {
entry match {
case (key: Array[Byte], versioned: Versioned[Array[Byte]]) => {
buf += key -> versioned.getValue
case (namePlusKey: Array[Byte], versioned: Versioned[Array[Byte]]) => {
returned += getMapKeyFromKey(name, namePlusKey) -> getMapValueFromStored(versioned.getValue)
}
}
}
}
buf.toList
returned.toList
}
def getMapStorageSizeFor(name: String): Int = {
@@ -112,7 +113,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
val result: Array[Byte] = mapClient.getValue(getKey(name, key))
result match {
case null => None
case _ => Some(result)
case _ => Some(getMapValueFromStored(result))
}
}
@@ -134,7 +135,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = {
mapClient.put(getKey(name, key), value)
mapClient.put(getKey(name, key), getStoredMapValue(value))
var keys = getMapKeys(name)
keys += key
putMapKeys(name, keys)
@@ -143,7 +144,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = {
val newKeys = entries.map {
case (key, value) => {
mapClient.put(getKey(name, key), value)
mapClient.put(getKey(name, key), getStoredMapValue(value))
key
}
}
@@ -169,17 +170,22 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = {
val size = getVectorStorageSizeFor(name)
val st = start.getOrElse(0)
val cnt =
var cnt =
if (finish.isDefined) {
val f = finish.get
if (f >= st) (f - st) else count
} else {
count
}
val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map {
index => getIndexedKey(name, index)
if (cnt > (size - st)) {
cnt = size - st
}
val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map {
index => getIndexedKey(name, (size - 1) - index)
} //read backwards
val all: JMap[Array[Byte], Versioned[Array[Byte]]] = vectorClient.getAll(JavaConversions.asIterable(seq))
var storage = new ArrayBuffer[Array[Byte]](seq.size)
@@ -199,14 +205,23 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
vectorClient.getValue(getIndexedKey(name, index), Array.empty[Byte])
val size = getVectorStorageSizeFor(name)
if (size > 0 && index < size) {
vectorClient.getValue(getIndexedKey(name, /*read backwards*/ (size - 1) - index))
} else {
throw new StorageException("In Vector:" + name + " No such Index:" + index)
}
}
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {
val size = getVectorStorageSizeFor(name)
vectorClient.put(getIndexedKey(name, index), elem)
if (size < index + 1) {
vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(index + 1))
if (size > 0 && index < size) {
elem match {
case null => vectorClient.delete(getIndexedKey(name, /*read backwards*/ (size - 1) - index))
case _ => vectorClient.put(getIndexedKey(name, /*read backwards*/ (size - 1) - index), elem)
}
} else {
throw new StorageException("In Vector:" + name + " No such Index:" + index)
}
}
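// The vector reads above share one index translation: logical index 0 is the most
// recently appended element, stored in the highest physical slot. A sketch of that
// mapping (illustrative helper, not part of the backend):
def physicalSlot(size: Int, logicalIndex: Int): Int = (size - 1) - logicalIndex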
@@ -214,7 +229,9 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
var size = getVectorStorageSizeFor(name)
elements.foreach {
element =>
vectorClient.put(getIndexedKey(name, size), element)
if (element != null) {
vectorClient.put(getIndexedKey(name, size), element)
}
size += 1
}
vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(size))
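// Note: a null element still advances the size counter; its slot is simply left unwritten.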
@@ -263,7 +280,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
try {
queueClient.delete(key)
} catch {
//a failure to delete is ok; it just leaves a K-V pair in Voldemort that will be overwritten if the queue ever wraps around
case e: Exception => log.warn(e, "caught an exception while deleting a dequeued element; this does not cause any inconsistency in the queue")
}
}
@@ -276,7 +293,10 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
val mdata = getQueueMetadata(name)
if (mdata.canEnqueue) {
val key = getIndexedKey(name, mdata.tail)
queueClient.put(key, item)
item match {
case null => queueClient.delete(key)
case _ => queueClient.put(key, item)
}
queueClient.put(getKey(name, queueTailIndex), IntSerializer.toBytes(mdata.nextEnqueue))
Some(mdata.size + 1)
} else {
@@ -332,6 +352,39 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
IntSerializer.fromBytes(indexBytes)
}
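// Assumes composite map keys are laid out as owner bytes ++ a 4-byte header ++ the
// original map key, so the key is recovered from the tail of the composite below.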
def getMapKeyFromKey(owner: String, key: Array[Byte]): Array[Byte] = {
val mapKeyLength = key.length - IntSerializer.bytesPerInt - owner.getBytes("UTF-8").length
val mapKey = new Array[Byte](mapKeyLength)
System.arraycopy(key, key.length - mapKeyLength, mapKey, 0, mapKeyLength)
mapKey
}
// wraps values so that null can be stored and recovered
def getStoredMapValue(value: Array[Byte]): Array[Byte] = {
value match {
case null => nullMapValue
case value => {
val stored = new Array[Byte](value.length + 1)
stored(0) = notNullMapValueHeader
System.arraycopy(value, 0, stored, 1, value.length)
stored
}
}
}
def getMapValueFromStored(value: Array[Byte]): Array[Byte] = {
if (value(0) == nullMapValueHeader) {
null
} else if (value(0) == notNullMapValueHeader) {
val returned = new Array[Byte](value.length - 1)
System.arraycopy(value, 1, returned, 0, value.length - 1)
returned
} else {
throw new StorageException("unknown header byte on map value:" + value(0))
}
}
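// Round-trip property of the two wrappers above, as one could assert in a test
// (sketch; java.util.Arrays.equals compares the byte arrays element-wise):
def mapValueRoundTrips(value: Array[Byte]): Boolean = {
  val back = getMapValueFromStored(getStoredMapValue(value))
  if (value == null) back == null else java.util.Arrays.equals(value, back)
}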
def getClientConfig(configMap: Map[String, String]): Properties = {
val properties = new Properties
@@ -450,6 +503,8 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
def fromBytes(bytes: Array[Byte]): SortedSet[Array[Byte]] = {
import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._
var set = new TreeSet[Array[Byte]]
if (bytes.length > IntSerializer.bytesPerInt) {
var pos = 0

View file

@@ -1,20 +1,20 @@
package se.scalablesolutions.akka.persistence.voldemort
import org.scalatest.matchers.ShouldMatchers
import voldemort.server.{VoldemortServer, VoldemortConfig}
import org.scalatest.{Suite, BeforeAndAfterAll, FunSuite}
import org.scalatest.{Suite, BeforeAndAfterAll}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import voldemort.utils.Utils
import java.io.File
import se.scalablesolutions.akka.util.{Logging}
import collection.JavaConversions
import voldemort.store.memory.InMemoryStorageConfiguration
import voldemort.client.protocol.admin.{AdminClientConfig, AdminClient}
@RunWith(classOf[JUnitRunner])
trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging {
this: Suite =>
var server: VoldemortServer = null
var admin: AdminClient = null
override protected def beforeAll(): Unit = {
@@ -28,6 +28,7 @@ trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging {
server = new VoldemortServer(config)
server.start
VoldemortStorageBackend.initStoreClients
admin = new AdminClient(VoldemortStorageBackend.clientConfig.getProperty(VoldemortStorageBackend.bootstrapUrlsProp), new AdminClientConfig)
log.info("Started")
} catch {
case e => log.error(e, "Error Starting Voldemort")
@@ -36,6 +37,7 @@ trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging {
}
override protected def afterAll(): Unit = {
admin.stop
server.stop
}
}

View file

@@ -1,87 +0,0 @@
package se.scalablesolutions.akka.persistence.voldemort
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import se.scalablesolutions.akka.persistence.voldemort.VoldemortStorageBackend._
import se.scalablesolutions.akka.actor.{newUuid,Uuid}
import collection.immutable.TreeSet
import VoldemortStorageBackendSuite._
import se.scalablesolutions.akka.stm._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.persistence.common._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.Config.config
@RunWith(classOf[JUnitRunner])
class VoldemortPersistentDatastructureSuite extends FunSuite with ShouldMatchers with EmbeddedVoldemort with Logging {
test("persistentRefs work as expected") {
val name = newUuid.toString
val one = "one".getBytes
atomic {
val ref = VoldemortStorage.getRef(name)
ref.isDefined should be(false)
ref.swap(one)
ref.get match {
case Some(bytes) => bytes should be(one)
case None => true should be(false)
}
}
val two = "two".getBytes
atomic {
val ref = VoldemortStorage.getRef(name)
ref.isDefined should be(true)
ref.swap(two)
ref.get match {
case Some(bytes) => bytes should be(two)
case None => true should be(false)
}
}
}
test("Persistent Vectors function as expected") {
val name = newUuid.toString
val one = "one".getBytes
val two = "two".getBytes
atomic {
val vec = VoldemortStorage.getVector(name)
vec.add(one)
}
atomic {
val vec = VoldemortStorage.getVector(name)
vec.size should be(1)
vec.add(two)
}
atomic {
val vec = VoldemortStorage.getVector(name)
vec.get(0) should be(one)
vec.get(1) should be(two)
vec.size should be(2)
vec.update(0, two)
}
atomic {
val vec = VoldemortStorage.getVector(name)
vec.get(0) should be(two)
vec.get(1) should be(two)
vec.size should be(2)
vec.update(0, Array.empty[Byte])
vec.update(1, Array.empty[Byte])
}
atomic {
val vec = VoldemortStorage.getVector(name)
vec.get(0) should be(Array.empty[Byte])
vec.get(1) should be(Array.empty[Byte])
vec.size should be(2)
}
}
}

View file

@@ -0,0 +1,49 @@
package se.scalablesolutions.akka.persistence.voldemort
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import se.scalablesolutions.akka.persistence.common.{QueueStorageBackendTest, VectorStorageBackendTest, MapStorageBackendTest, RefStorageBackendTest}
@RunWith(classOf[JUnitRunner])
class VoldemortRefStorageBackendTest extends RefStorageBackendTest with EmbeddedVoldemort {
def dropRefs = {
admin.truncate(0, VoldemortStorageBackend.refStore)
}
def storage = VoldemortStorageBackend
}
@RunWith(classOf[JUnitRunner])
class VoldemortMapStorageBackendTest extends MapStorageBackendTest with EmbeddedVoldemort {
def dropMaps = {
admin.truncate(0, VoldemortStorageBackend.mapStore)
}
def storage = VoldemortStorageBackend
}
@RunWith(classOf[JUnitRunner])
class VoldemortVectorStorageBackendTest extends VectorStorageBackendTest with EmbeddedVoldemort {
def dropVectors = {
admin.truncate(0, VoldemortStorageBackend.vectorStore)
}
def storage = VoldemortStorageBackend
}
@RunWith(classOf[JUnitRunner])
class VoldemortQueueStorageBackendTest extends QueueStorageBackendTest with EmbeddedVoldemort {
def dropQueues = {
admin.truncate(0, VoldemortStorageBackend.queueStore)
}
def storage = VoldemortStorageBackend
}

View file

@@ -103,10 +103,7 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb
vectorClient.delete(getKey(key, vectorSizeIndex))
vectorClient.delete(getIndexedKey(key, 0))
vectorClient.delete(getIndexedKey(key, 1))
getVectorStorageEntryFor(key, 0) should be(empty)
getVectorStorageEntryFor(key, 1) should be(empty)
getVectorStorageRangeFor(key, None, None, 1).head should be(empty)
insertVectorStorageEntryFor(key, value)
// insert the same value a second time
insertVectorStorageEntryFor(key, value)

View file

@@ -0,0 +1,22 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.persistence.voldemort
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import se.scalablesolutions.akka.persistence.common._
@RunWith(classOf[JUnitRunner])
class VoldemortTicket343Test extends Ticket343Test with EmbeddedVoldemort {
def dropMapsAndVectors: Unit = {
admin.truncate(0, VoldemortStorageBackend.mapStore)
admin.truncate(0, VoldemortStorageBackend.vectorStore)
}
def getVector: (String) => PersistentVector[Array[Byte]] = VoldemortStorage.getVector
def getMap: (String) => PersistentMap[Array[Byte], Array[Byte]] = VoldemortStorage.getMap
}

View file

@@ -116,6 +116,8 @@ class SerializableTypeClassActorSpec extends
(actor2 !! "hello").getOrElse("_") should equal("world 3")
actor2.receiveTimeout should equal (Some(1000))
actor1.stop
actor2.stop
}
it("should be able to serialize and deserialize a MyStatelessActorWithMessagesInMailbox") {

View file

@@ -595,7 +595,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val dbcp = Dependencies.dbcp
val sjson = Dependencies.sjson_test
override def testOptions = createTestFilter( _.endsWith("Suite"))
override def testOptions = createTestFilter({ s: String => s.endsWith("Suite") || s.endsWith("Test") })
}
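// With this filter both naming conventions run: e.g. VoldemortStorageBackendSuite
// as well as the VoldemortRefStorageBackendTest-style classes above.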