fixed cassandra persistenc STM tests + added generic Map and Seq traits to Transactional datastructures

This commit is contained in:
Jonas Boner 2009-05-13 19:28:55 +02:00
parent 1a06a67cfc
commit 9349bc3ad8
18 changed files with 689 additions and 439 deletions

View file

@ -7,6 +7,9 @@ package se.scalablesolutions.akka.kernel
import se.scalablesolutions.akka.collection._
import scala.collection.mutable.HashMap
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Transactional {
private[kernel] def begin
private[kernel] def commit
@ -20,14 +23,40 @@ trait Transactional {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait TransactionalMap[K, V] extends Transactional {
def put(key: K, value: V)
def remove(key: K)
def get(key: K): V
def contains(key: K): Boolean
def elements: Iterator[(K, V)]
def size: Int
def clear
trait TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] {
def remove(key: K)
}
/**
* Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class InMemoryTransactionalMap[K, V] extends TransactionalMap[K, V] {
protected[kernel] var state = new HashTrie[K, V]
protected[kernel] var snapshot = state
// ---- For Transactional ----
override def begin = snapshot = state
override def commit = snapshot = state
override def rollback = state = snapshot
// ---- Overriding scala.collection.mutable.Map behavior ----
override def contains(key: K): Boolean = state.contains(key)
override def clear = state = new HashTrie[K, V]
override def size: Int = state.size
// ---- For scala.collection.mutable.Map ----
override def remove(key: K) = state = state - key
override def elements: Iterator[(K, V)] = state.elements
override def get(key: K): Option[V] = state.get(key)
override def put(key: K, value: V): Option[V] = {
val oldValue = state.get(key)
state = state.update(key, value)
oldValue
}
override def -=(key: K) = remove(key)
override def update(key: K, value: V) = put(key, value)
}
/**
@ -40,42 +69,21 @@ trait TransactionalMap[K, V] extends Transactional {
*/
abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
protected[kernel] val changeSet = new HashMap[K, V]
override def begin = {
changeSet.clear
}
override def put(key: K, value: V) = {
changeSet += key -> value
}
override def remove(key: K) = {
changeSet -= key
}
def getRange(start: Int, count: Int)
}
/**
* Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class InMemoryTransactionalMap[K, V] extends TransactionalMap[K, V] {
protected[kernel] var state = new HashTrie[K, V]
protected[kernel] var snapshot = state
override def begin = snapshot = state
override def commit = snapshot = state
override def rollback = state = snapshot
override def put(key: K, value: V) = state = state.update(key, value)
override def get(key: K): V = state.get(key).getOrElse(throw new NoSuchElementException("No value for key [" + key + "]"))
override def remove(key: K) = state = state - key
override def contains(key: K): Boolean = state.contains(key)
override def elements: Iterator[(K, V)] = state.elements
override def size: Int = state.size
override def clear = state = new HashTrie[K, V]
def getRange(start: Int, count: Int)
// ---- For Transactional ----
override def begin = changeSet.clear
override def rollback = {}
// ---- For scala.collection.mutable.Map ----
override def put(key: K, value: V): Option[V] = {
changeSet += key -> value
None // always return None to speed up writes (else need to go to DB to get
}
override def remove(key: K) = changeSet -= key
override def -=(key: K) = remove(key)
override def update(key: K, value: V) = put(key, value)
}
/**
@ -83,11 +91,14 @@ class InMemoryTransactionalMap[K, V] extends TransactionalMap[K, V] {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class CassandraPersistentTransactionalMap(actorNameInstance: AnyRef) extends PersistentTransactionalMap[String, AnyRef] {
val actorName = actorNameInstance.getClass.getName
override def begin = {}
override def rollback = {}
class CassandraPersistentTransactionalMap(actorNameInstance: AnyRef)
extends PersistentTransactionalMap[String, AnyRef] {
val actorName = actorNameInstance.getClass.getName
override def getRange(start: Int, count: Int) = CassandraNode.getActorStorageRange(actorName, start, count)
// ---- For Transactional ----
override def commit = {
// FIXME: should use batch function once the bug is resolved
for (entry <- changeSet) {
@ -96,28 +107,22 @@ class CassandraPersistentTransactionalMap(actorNameInstance: AnyRef) extends Per
}
}
override def get(key: String): AnyRef = CassandraNode.getActorStorageEntryFor(actorName, key)
.getOrElse(throw new NoSuchElementException("Could not find element for key [" + key + "]"))
override def contains(key: String): Boolean = CassandraNode.getActorStorageEntryFor(actorName, key).isDefined
override def size: Int = CassandraNode.getActorStorageSizeFor(actorName)
// ---- Overriding scala.collection.mutable.Map behavior ----
override def clear = CassandraNode.removeActorStorageFor(actorName)
override def getRange(start: Int, count: Int) = CassandraNode.getActorStorageRange(actorName, start, count)
override def elements: Iterator[Tuple2[String, AnyRef]] = {
override def contains(key: String): Boolean = CassandraNode.getActorStorageEntryFor(actorName, key).isDefined
override def size: Int = CassandraNode.getActorStorageSizeFor(actorName)
// ---- For scala.collection.mutable.Map ----
override def get(key: String): Option[AnyRef] = CassandraNode.getActorStorageEntryFor(actorName, key)
override def elements: Iterator[Tuple2[String, AnyRef]] = {
new Iterator[Tuple2[String, AnyRef]] {
private val originalList: List[Tuple2[String, AnyRef]] = CassandraNode.getActorStorageFor(actorName)
private var elements = originalList.reverse
override def next: Tuple2[String, AnyRef]= synchronized {
val element = elements.head
elements = elements.tail
element
}
}
override def hasNext: Boolean = synchronized { !elements.isEmpty }
}
}
@ -129,10 +134,9 @@ class CassandraPersistentTransactionalMap(actorNameInstance: AnyRef) extends Per
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
abstract class TransactionalVector[T] extends Transactional {
abstract class TransactionalVector[T] extends Transactional with RandomAccessSeq[T] {
def add(elem: T)
def get(index: Int): T
def size: Int
}
/**
@ -146,13 +150,19 @@ class InMemoryTransactionalVector[T] extends TransactionalVector[T] {
private[kernel] var state: Vector[T] = EmptyVector
private[kernel] var snapshot = state
override def add(elem: T) = state = state + elem
override def get(index: Int): T = state(index)
// ---- For Transactional ----
override def begin = snapshot = state
override def commit = snapshot = state
override def rollback = state = snapshot
override def add(elem: T) = state = state + elem
override def get(index: Int): T = state(index)
override def size: Int = state.size
// ---- For Seq ----
def length: Int = state.length
def apply(index: Int): T = state(index)
override def elements: Iterator[T] = state.elements
override def toList: List[T] = state.toList
}
/**