pekko/kernel/src/main/scala/state/State.scala

445 lines
12 KiB
Scala
Raw Normal View History

2009-04-19 10:58:20 +02:00
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.state
2009-04-19 10:58:20 +02:00
import kernel.stm.TransactionManagement
import akka.collection._
import org.codehaus.aspectwerkz.proxy.Uuid
import scala.collection.mutable.{ArrayBuffer, HashMap}
2009-04-19 10:58:20 +02:00
import org.multiverse.standard.manuallyinstrumented.Ref
2009-06-22 14:12:09 +02:00
/**
 * Root of the storage configuration hierarchy; sealed, so direct subclasses live in this file.
 */
sealed abstract class TransactionalStateConfig

/**
 * Marker base class for configurations that select a persistent storage backend.
 */
abstract class PersistentStorageConfig extends TransactionalStateConfig

// The explicit empty parameter lists match how these classes are used in this file
// (`case CassandraStorageConfig() =>`, `new CassandraStorageConfig()`); a case class
// declared without any parameter list is deprecated and later rejected by the compiler.
case class CassandraStorageConfig() extends PersistentStorageConfig
case class TerracottaStorageConfig() extends PersistentStorageConfig
case class TokyoCabinetStorageConfig() extends PersistentStorageConfig
2009-06-22 14:12:09 +02:00
/**
 * Scala API.
 * <p/>
 * Example Scala usage:
 * <pre>
 * val myMap = TransactionalState.newPersistentMap(CassandraStorageConfig())
 * </pre>
 */
object TransactionalState extends TransactionalState

/**
 * Java API.
 * <p/>
 * Example Java usage:
 * <pre>
 * TransactionalState state = new TransactionalState();
 * TransactionalMap myMap = state.newPersistentMap(new CassandraStorageConfig());
 * </pre>
 */
class TransactionalState {

  /**
   * Creates a persistent transactional map for the given storage backend.
   *
   * @param config the storage backend to use
   * @throws UnsupportedOperationException for every backend other than Cassandra
   */
  def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[String, AnyRef] = config match {
    case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
    case other => unsupported("map", other)
  }

  /**
   * Creates a persistent transactional vector for the given storage backend.
   *
   * @param config the storage backend to use
   * @throws UnsupportedOperationException for every backend other than Cassandra
   */
  def newPersistentVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match {
    case CassandraStorageConfig() => new CassandraPersistentTransactionalVector
    case other => unsupported("vector", other)
  }

  /**
   * Creates a persistent transactional reference for the given storage backend.
   *
   * @param config the storage backend to use
   * @throws UnsupportedOperationException for every backend other than Cassandra
   */
  def newPersistentRef(config: PersistentStorageConfig): TransactionalRef[AnyRef] = config match {
    case CassandraStorageConfig() => new CassandraPersistentTransactionalRef
    case other => unsupported("ref", other)
  }

  /** Creates an empty in-memory transactional map. */
  def newInMemoryMap[K, V]: TransactionalMap[K, V] = new InMemoryTransactionalMap[K, V]

  /** Creates an empty in-memory transactional vector. */
  def newInMemoryVector[T]: TransactionalVector[T] = new InMemoryTransactionalVector[T]

  /** Creates an in-memory transactional reference with no initial value. */
  def newInMemoryRef[T]: TransactionalRef[T] = new TransactionalRef[T]

  // Single failure path for backends without an implementation. PersistentStorageConfig is
  // not sealed, so this also covers subclasses declared elsewhere, which would otherwise
  // surface as a MatchError.
  private def unsupported(kind: String, config: PersistentStorageConfig): Nothing =
    throw new UnsupportedOperationException(
      "Persistent " + kind + " is not supported for storage config [" + config + "]")
}
/**
 * Mixin for state objects that must be accessed from within a transaction.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
@serializable
trait Transactional {
  // FIXME: won't work across the cluster
  val uuid = Uuid.newUuid.toString

  /**
   * Registers this instance with the transaction held in
   * `TransactionManagement.threadBoundTx`.
   *
   * @throws IllegalStateException if no transaction is defined there
   */
  protected def verifyTransaction = {
    val currentTx = TransactionManagement.threadBoundTx.get
    if (currentTx.isDefined) {
      currentTx.get.register(this)
    } else {
      throw new IllegalStateException("Can't access transactional reference outside the scope of a transaction [" + this + "]")
    }
  }
}
/**
 * Base trait for all state implementations (persistent or in-memory).
 *
 * Equality and hashing are identity based: two transactional maps are equal only if they
 * are the same instance.
 *
 * FIXME: Create Java versions using pcollections
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
trait TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] {
  override def hashCode: Int = System.identityHashCode(this)

  // Reference equality keeps equals reflexive and in agreement with the identity hash
  // above, and still never iterates over the map's contents.
  override def equals(other: Any): Boolean = other match {
    case that: AnyRef => this eq that
    case _ => false
  }

  /** Removes the entry stored under `key`. */
  def remove(key: K)
}
2009-04-19 10:58:20 +02:00
/**
 * In-memory transactional map that keeps its contents as a HashTrie inside a TransactionalRef.
 *
 * Not thread-safe, but should only be used from within an Actor, e.g. one single thread at a time.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
class InMemoryTransactionalMap[K, V] extends TransactionalMap[K, V] {
  protected[this] var ref = new TransactionalRef(new HashTrie[K, V])

  // TransactionalRef.get returns an Option and the class offers swap (not set), so all
  // reads and writes of the underlying trie go through these two calls. An unset
  // reference is read as an empty trie.
  private def current: HashTrie[K, V] = ref.getOrElse(new HashTrie[K, V])

  // ---- Overriding scala.collection.mutable.Map behavior ----

  /** Returns true if an entry exists for `key`. */
  override def contains(key: K): Boolean = current.contains(key)

  /** Replaces the contents with an empty trie. */
  override def clear: Unit = ref.swap(new HashTrie[K, V])

  /** Number of entries currently held. */
  override def size: Int = current.size

  // ---- For scala.collection.mutable.Map ----

  /** Stores the trie obtained by removing `key` from the current one. */
  override def remove(key: K): Unit = ref.swap(current - key)

  override def elements: Iterator[(K, V)] = current.elements

  override def get(key: K): Option[V] = current.get(key)

  /**
   * Associates `value` with `key`.
   *
   * @return the value previously stored under `key`, if any
   */
  override def put(key: K, value: V): Option[V] = {
    val map = current
    val oldValue = map.get(key)
    ref.swap(map.update(key, value))
    oldValue
  }

  override def -=(key: K) = remove(key)

  override def update(key: K, value: V) = put(key, value)
}
2009-04-19 10:58:20 +02:00
/**
 * Base class that all persistent transactional map implementations should extend.
 * Implements a Unit of Work: writes are recorded in a change set.
 *
 * Not thread-safe, but should only be used from within an Actor, e.g. one single thread at a time.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {

  // FIXME: need to handle remove in another changeSet
  protected[kernel] val changeSet = new HashMap[K, V]

  // NOTE(review): declared without a result type, i.e. Unit — confirm this is intended
  // for a range query.
  def getRange(start: Int, count: Int)

  def begin

  def commit

  /** Discards every pending change. */
  def rollback = {
    changeSet.clear
  }

  // ---- For scala.collection.mutable.Map ----

  /**
   * Records the write in the change set.
   *
   * @return always None, to speed up writes (otherwise the previous value would have to
   *         be fetched from the DB)
   */
  override def put(key: K, value: V): Option[V] = {
    verifyTransaction
    changeSet += ((key, value))
    None
  }

  /** Drops any pending write for `key` from the change set. */
  override def remove(key: K) = {
    verifyTransaction
    changeSet -= key
  }

  override def -=(key: K) = {
    remove(key)
  }

  override def update(key: K, value: V) = {
    put(key, value)
  }
}
2009-04-19 10:58:20 +02:00
/**
 * Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
 *
 * Read operations are best-effort: when a CassandraStorage call throws an Exception the
 * method falls back to an "empty" answer (Nil, None, false or 0) instead of propagating it.
 * A missing transaction is always reported by verifyTransaction.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[String, AnyRef] {

  /** Reads `count` entries starting at `start` from storage; Nil if the read fails. */
  override def getRange(start: Int, count: Int) = {
    verifyTransaction
    try {
      CassandraStorage.getMapStorageRangeFor(uuid, start, count)
    } catch {
      case e: Exception => Nil
    }
  }

  // ---- For Transactional ----

  /** Writes every pending change to storage, then empties the change set. */
  override def commit = {
    CassandraStorage.insertMapStorageEntriesFor(uuid, changeSet.toList)
    changeSet.clear
  }

  // ---- Overriding scala.collection.mutable.Map behavior ----

  /** Removes this map's entries from storage; a failing storage call is ignored. */
  override def clear = {
    verifyTransaction
    try {
      CassandraStorage.removeMapStorageFor(uuid)
    } catch {
      case e: Exception => {}
    }
  }

  /** Returns true if storage holds an entry for `key`; false if the lookup fails. */
  override def contains(key: String): Boolean = {
    // Checked outside the try, like every other method here, so that a missing
    // transaction is not silently turned into `false`.
    verifyTransaction
    try {
      CassandraStorage.getMapStorageEntryFor(uuid, key).isDefined
    } catch {
      case e: Exception => false
    }
  }

  /** Number of entries reported by storage; 0 if the lookup fails. */
  override def size: Int = {
    verifyTransaction
    try {
      CassandraStorage.getMapStorageSizeFor(uuid)
    } catch {
      case e: Exception => 0
    }
  }

  // ---- For scala.collection.mutable.Map ----

  /**
   * Looks `key` up in storage; None if the lookup fails.
   * The pending change set is not consulted.
   */
  override def get(key: String): Option[AnyRef] = {
    verifyTransaction
    try {
      CassandraStorage.getMapStorageEntryFor(uuid, key)
    } catch {
      case e: Exception => None
    }
  }

  /**
   * Iterates over the stored entries in the reverse of the order returned by storage.
   * The entries are loaded once, when the iterator is created.
   */
  override def elements: Iterator[Tuple2[String, AnyRef]] = {
    //verifyTransaction
    new Iterator[Tuple2[String, AnyRef]] {
      // Only Exceptions are treated as "no data"; fatal Errors propagate.
      private val originalList: List[Tuple2[String, AnyRef]] = try {
        CassandraStorage.getMapStorageFor(uuid)
      } catch {
        case e: Exception => Nil
      }
      private var remaining = originalList.reverse

      override def next: Tuple2[String, AnyRef] = synchronized {
        val element = remaining.head
        remaining = remaining.tail
        element
      }

      override def hasNext: Boolean = synchronized { !remaining.isEmpty }
    }
  }
}
2009-04-19 10:58:20 +02:00
/**
 * Base for all transactional vector implementations.
 *
 * Equality and hashing are identity based: two transactional vectors are equal only if
 * they are the same instance.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
abstract class TransactionalVector[T] extends Transactional with RandomAccessSeq[T] {
  override def hashCode: Int = System.identityHashCode(this)

  // Reference equality keeps equals reflexive and in agreement with the identity hash
  // above, and still never iterates over the vector's contents.
  override def equals(other: Any): Boolean = other match {
    case that: AnyRef => this eq that
    case _ => false
  }

  /** Adds `elem` to the vector. */
  def add(elem: T)

  /** Returns the element at `index`. */
  def get(index: Int): T

  /** Returns a range of elements described by `start` and `count`. */
  def getRange(start: Int, count: Int): List[T]
}
/**
 * Implements an in-memory transactional vector.
 *
 * Not thread-safe, but should only be used from within an Actor, e.g. one single thread at a time.
 *
 * NOTE(review): the bare timestamp lines inside this class body are not Scala; they look
 * like residue from a blame view and should be removed.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
class InMemoryTransactionalVector[T] extends TransactionalVector[T] {
// Holder for the vector contents; starts out wrapping EmptyVector.
private[kernel] var state = new Ref[Vector[T]](EmptyVector)
2009-04-19 10:58:20 +02:00
// NOTE(review): this calls set(elem) on the value returned by state.get rather than
// storing a new vector in the Ref — confirm this actually appends elem.
// Unlike getRange/length/apply, no verifyTransaction call is made here.
def add(elem: T) = {
state.get.set(elem)
}
// Applies index directly to the Ref. NOTE(review): confirm Ref supports apply; add()
// reads the contents through state.get instead. No verifyTransaction call is made here.
def get(index: Int): T = {
state(index)
}
// Requires a transaction, then converts a slice to a List.
// NOTE(review): count is passed as the second slice argument — confirm whether slice
// expects a length or an end index.
def getRange(start: Int, count: Int): List[T] = {
verifyTransaction
state.slice(start, count).toList.asInstanceOf[List[T]]
}
// ---- For Transactional ----
// NOTE(review): `snapshot` is not declared in this class, nor in the Transactional and
// TransactionalVector definitions visible in this file, and those definitions declare no
// begin/commit/rollback to override — confirm where these members come from.
override def begin = snapshot = state
override def commit = snapshot = state
override def rollback = state = snapshot
2009-04-19 10:58:20 +02:00
// ---- For Seq ----
// Requires a transaction, then reports the length.
def length: Int = {
verifyTransaction
state.length
}
// Requires a transaction, then returns the element at index.
def apply(index: Int): T = {
verifyTransaction
state(index)
}
// The transaction check is commented out here, so iteration is allowed outside a transaction.
override def elements: Iterator[T] = {
//verifyTransaction
state.elements
}
// Requires a transaction, then copies the contents into a List.
override def toList: List[T] = {
verifyTransaction
state.toList
}
2009-04-19 10:58:20 +02:00
}
/**
 * Base class that all persistent transactional vector implementations should extend.
 * Implements a Unit of Work: added elements are recorded in a change set.
 *
 * Not thread-safe, but should only be used from within an Actor, e.g. one single thread at a time.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
abstract class PersistentTransactionalVector[T] extends TransactionalVector[T] {

  // FIXME: need to handle remove in another changeSet
  protected[kernel] val changeSet = new ArrayBuffer[T]

  // ---- For Transactional ----

  /** Nothing to prepare when a transaction starts. */
  override def begin = {
  }

  /** Discards every element added since the change set was last emptied. */
  override def rollback = {
    changeSet.clear
  }

  // ---- For TransactionalVector ----

  /** Requires a transaction, then records `value` in the change set. */
  override def add(value: T) = {
    verifyTransaction
    changeSet += value
  }
}
/**
 * Implements a persistent transactional vector based on the Cassandra distributed P2P key-value storage.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
class CassandraPersistentTransactionalVector extends PersistentTransactionalVector[AnyRef] {

  // ---- For TransactionalVector ----

  /**
   * Returns the pending element at `index` if the change set is long enough, otherwise
   * the element stored at `index` in Cassandra.
   */
  override def get(index: Int): AnyRef = {
    verifyTransaction
    // NOTE(review): pending and stored elements share the same index space here, while
    // length only counts stored elements — confirm the intended index mapping.
    if (changeSet.size > index) changeSet(index)
    else CassandraStorage.getVectorStorageEntryFor(uuid, index)
  }

  /** Reads a range directly from storage; pending elements are not included. */
  override def getRange(start: Int, count: Int): List[AnyRef] = {
    verifyTransaction
    CassandraStorage.getVectorStorageRangeFor(uuid, start, count)
  }

  /** Size as reported by storage; pending elements are not counted. */
  override def length: Int = {
    verifyTransaction
    CassandraStorage.getVectorStorageSizeFor(uuid)
  }

  override def apply(index: Int): AnyRef = get(index)

  override def first: AnyRef = get(0)

  /**
   * Returns the element at index `length - 1`.
   *
   * @throws NoSuchElementException if the vector is empty
   */
  override def last: AnyRef = {
    verifyTransaction
    val l = length
    if (l == 0) throw new NoSuchElementException("Vector is empty")
    // Reuse the size fetched above: asking for length again costs a second storage
    // lookup and could disagree with the emptiness check.
    get(l - 1)
  }

  // ---- For Transactional ----

  /** Writes each pending element to storage, then empties the change set. */
  override def commit = {
    // FIXME: should use batch function once the bug is resolved
    for (element <- changeSet) CassandraStorage.insertVectorStorageEntryFor(uuid, element)
    changeSet.clear
  }
}
/**
 * Implements a transactional reference.
 *
 * Not thread-safe, but should only be used from within an Actor, e.g. one single thread at a time.
 *
 * @param elem the initial value; null means "not defined"
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
class TransactionalRef[T](elem: T) extends Transactional {

  /** Creates a reference without an initial value. */
  // T is unbounded, so a bare `null` does not conform to it; the cast yields the same
  // null reference for reference types.
  def this() = this(null.asInstanceOf[T])

  private[kernel] val ref = new Ref[T](elem)

  /** Stores `elem` as the new value. */
  def swap(elem: T) = ref.set(elem)

  /** Returns the current value, or None when the underlying ref is null. */
  def get: Option[T] = {
    if (ref.isNull) None
    else Some(ref.get)
  }

  /** Returns the current value, or `default` (evaluated lazily) when the underlying ref is null. */
  def getOrElse(default: => T): T = {
    if (ref.isNull) default
    else ref.get
  }

  /** True when the underlying ref is not null. */
  def isDefined: Boolean = !ref.isNull
}
/**
 * Implements a persistent transactional reference backed by CassandraStorage.
 *
 * A value that has been set but not yet committed is held in the inherited in-memory ref;
 * when no such value exists, reads go to storage.
 *
 * Not thread-safe, but should only be used from within an Actor, e.g. one single thread at a time.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
// Extends TransactionalRef[AnyRef] so that the AnyRef-typed overrides below are real
// overrides and the class matches the TransactionalRef[AnyRef] returned by
// TransactionalState.newPersistentRef.
class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] {

  /** Writes the pending value (if any) to storage and clears the in-memory ref. */
  def commit = if (super.isDefined) {
    // Only the local, uncommitted value matters here; the overridden isDefined would
    // also consult storage.
    CassandraStorage.insertRefStorageFor(uuid, ref.get)
    ref.clear
  }

  /** Discards the pending value. */
  def rollback = ref.clear

  /**
   * Returns the pending value if one is set, otherwise the value read from storage.
   */
  override def get: Option[AnyRef] = {
    verifyTransaction
    // Must use super.isDefined: the overridden isDefined calls get, so using it here
    // would recurse without end.
    if (super.isDefined) super.get
    else CassandraStorage.getRefStorageFor(uuid)
  }

  /** True when either a pending or a stored value exists. */
  override def isDefined: Boolean = get.isDefined

  /** Returns the pending or stored value, or `default` when neither exists. */
  override def getOrElse(default: => AnyRef): AnyRef = {
    val current = get
    if (current.isDefined) current.get
    else default
  }
}