Removed TransactionalState and TransactionalRef
This commit is contained in:
parent
ce41e170dd
commit
fa044ca1ad
16 changed files with 358 additions and 388 deletions
|
|
@ -10,14 +10,14 @@ import se.scalablesolutions.akka.stm.*;
|
|||
public class InMemStateful {
|
||||
private TransactionalMap<String, String> mapState;
|
||||
private TransactionalVector<String> vectorState;
|
||||
private TransactionalRef<String> refState;
|
||||
private Ref<String> refState;
|
||||
private boolean isInitialized = false;
|
||||
|
||||
public void init() {
|
||||
if (!isInitialized) {
|
||||
mapState = TransactionalState.newMap();
|
||||
vectorState = TransactionalState.newVector();
|
||||
refState = TransactionalState.newRef();
|
||||
mapState = new TransactionalMap();
|
||||
vectorState = new TransactionalVector();
|
||||
refState = new Ref();
|
||||
isInitialized = true;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,14 +8,14 @@ import se.scalablesolutions.akka.stm.*;
|
|||
public class InMemStatefulNested {
|
||||
private TransactionalMap<String, String> mapState;
|
||||
private TransactionalVector<String> vectorState;
|
||||
private TransactionalRef<String> refState;
|
||||
private Ref<String> refState;
|
||||
private boolean isInitialized = false;
|
||||
|
||||
public void init() {
|
||||
if (!isInitialized) {
|
||||
mapState = TransactionalState.newMap();
|
||||
vectorState = TransactionalState.newVector();
|
||||
refState = TransactionalState.newRef();
|
||||
mapState = new TransactionalMap();
|
||||
vectorState = new TransactionalVector();
|
||||
refState = new Ref();
|
||||
isInitialized = true;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
150
akka-core/src/main/scala/stm/Ref.scala
Normal file
150
akka-core/src/main/scala/stm/Ref.scala
Normal file
|
|
@ -0,0 +1,150 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.stm
|
||||
|
||||
import se.scalablesolutions.akka.util.UUID
|
||||
|
||||
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
|
||||
|
||||
object RefFactory {
|
||||
private val factory = getGlobalStmInstance.getProgrammaticRefFactoryBuilder.build
|
||||
|
||||
def createRef[T] = factory.atomicCreateRef[T]()
|
||||
|
||||
def createRef[T](value: T) = factory.atomicCreateRef(value)
|
||||
}
|
||||
|
||||
/**
|
||||
* Ref.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object Ref {
|
||||
def apply[T]() = new Ref[T]
|
||||
|
||||
def apply[T](initialValue: T) = new Ref[T](Some(initialValue))
|
||||
|
||||
/**
|
||||
* An implicit conversion that converts a Ref to an Iterable value.
|
||||
*/
|
||||
implicit def ref2Iterable[T](ref: Ref[T]): Iterable[T] = ref.toList
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a transactional managed reference.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class Ref[T](initialOpt: Option[T] = None) extends Transactional {
|
||||
self =>
|
||||
|
||||
def this() = this(None) // Java compatibility
|
||||
|
||||
import org.multiverse.api.ThreadLocalTransaction._
|
||||
|
||||
val uuid = UUID.newUuid.toString
|
||||
|
||||
private[this] val ref = {
|
||||
if (initialOpt.isDefined) RefFactory.createRef(initialOpt.get)
|
||||
else RefFactory.createRef[T]
|
||||
}
|
||||
|
||||
def swap(elem: T) = {
|
||||
ensureIsInTransaction
|
||||
ref.set(elem)
|
||||
}
|
||||
|
||||
def alter(f: T => T): T = {
|
||||
ensureIsInTransaction
|
||||
ensureNotNull
|
||||
ref.set(f(ref.get))
|
||||
ref.get
|
||||
}
|
||||
|
||||
def get: Option[T] = {
|
||||
ensureIsInTransaction
|
||||
if (ref.isNull) None
|
||||
else Some(ref.get)
|
||||
}
|
||||
|
||||
def getOrWait: T = {
|
||||
ensureIsInTransaction
|
||||
ref.getOrAwait
|
||||
}
|
||||
|
||||
def getOrElse(default: => T): T = {
|
||||
ensureIsInTransaction
|
||||
if (ref.isNull) default
|
||||
else ref.get
|
||||
}
|
||||
|
||||
def isDefined: Boolean = {
|
||||
ensureIsInTransaction
|
||||
!ref.isNull
|
||||
}
|
||||
|
||||
def isEmpty: Boolean = {
|
||||
ensureIsInTransaction
|
||||
ref.isNull
|
||||
}
|
||||
|
||||
def map[B](f: T => B): Ref[B] = {
|
||||
ensureIsInTransaction
|
||||
if (isEmpty) Ref[B] else Ref(f(ref.get))
|
||||
}
|
||||
|
||||
def flatMap[B](f: T => Ref[B]): Ref[B] = {
|
||||
ensureIsInTransaction
|
||||
if (isEmpty) Ref[B] else f(ref.get)
|
||||
}
|
||||
|
||||
def filter(p: T => Boolean): Ref[T] = {
|
||||
ensureIsInTransaction
|
||||
if (isDefined && p(ref.get)) Ref(ref.get) else Ref[T]
|
||||
}
|
||||
|
||||
/**
|
||||
* Necessary to keep from being implicitly converted to Iterable in for comprehensions.
|
||||
*/
|
||||
def withFilter(p: T => Boolean): WithFilter = new WithFilter(p)
|
||||
|
||||
class WithFilter(p: T => Boolean) {
|
||||
def map[B](f: T => B): Ref[B] = self filter p map f
|
||||
def flatMap[B](f: T => Ref[B]): Ref[B] = self filter p flatMap f
|
||||
def foreach[U](f: T => U): Unit = self filter p foreach f
|
||||
def withFilter(q: T => Boolean): WithFilter = new WithFilter(x => p(x) && q(x))
|
||||
}
|
||||
|
||||
def foreach[U](f: T => U): Unit = {
|
||||
ensureIsInTransaction
|
||||
if (isDefined) f(ref.get)
|
||||
}
|
||||
|
||||
def elements: Iterator[T] = {
|
||||
ensureIsInTransaction
|
||||
if (isEmpty) Iterator.empty else Iterator(ref.get)
|
||||
}
|
||||
|
||||
def toList: List[T] = {
|
||||
ensureIsInTransaction
|
||||
if (isEmpty) List() else List(ref.get)
|
||||
}
|
||||
|
||||
def toRight[X](left: => X) = {
|
||||
ensureIsInTransaction
|
||||
if (isEmpty) Left(left) else Right(ref.get)
|
||||
}
|
||||
|
||||
def toLeft[X](right: => X) = {
|
||||
ensureIsInTransaction
|
||||
if (isEmpty) Right(right) else Left(ref.get)
|
||||
}
|
||||
|
||||
private def ensureIsInTransaction =
|
||||
if (getThreadLocalTransaction eq null) throw new NoTransactionInScopeException
|
||||
|
||||
private def ensureNotNull =
|
||||
if (ref.isNull) throw new RuntimeException("Cannot alter Ref's value when it is null")
|
||||
}
|
||||
|
|
@ -265,3 +265,24 @@ object TransactionStatus {
|
|||
case object Completed extends TransactionStatus
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
@serializable
|
||||
trait Transactional {
|
||||
val uuid: String
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait Committable {
|
||||
def commit: Unit
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait Abortable {
|
||||
def abort: Unit
|
||||
}
|
||||
|
|
|
|||
85
akka-core/src/main/scala/stm/TransactionalMap.scala
Normal file
85
akka-core/src/main/scala/stm/TransactionalMap.scala
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.stm
|
||||
|
||||
import se.scalablesolutions.akka.util.UUID
|
||||
|
||||
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
|
||||
|
||||
object TransactionalMap {
|
||||
def apply[K, V]() = new TransactionalMap[K, V]
|
||||
|
||||
def apply[K, V](pairs: (K, V)*) = new TransactionalMap(Some(HashTrie(pairs: _*)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements an in-memory transactional Map based on Clojure's PersistentMap.
|
||||
*
|
||||
* Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class TransactionalMap[K, V](initialOpt: Option[HashTrie[K, V]] = None) extends Transactional with scala.collection.mutable.Map[K, V] {
|
||||
def this() = this(None) // Java compatibility
|
||||
|
||||
val uuid = UUID.newUuid.toString
|
||||
|
||||
protected[this] val ref = new Ref(initialOpt.orElse(Some(new HashTrie[K, V])))
|
||||
|
||||
def -=(key: K) = {
|
||||
remove(key)
|
||||
this
|
||||
}
|
||||
|
||||
def +=(key: K, value: V) = put(key, value)
|
||||
|
||||
def +=(kv: (K, V)) = {
|
||||
put(kv._1,kv._2)
|
||||
this
|
||||
}
|
||||
|
||||
override def remove(key: K) = {
|
||||
val map = ref.get.get
|
||||
val oldValue = map.get(key)
|
||||
ref.swap(ref.get.get - key)
|
||||
oldValue
|
||||
}
|
||||
|
||||
def get(key: K): Option[V] = ref.get.get.get(key)
|
||||
|
||||
override def put(key: K, value: V): Option[V] = {
|
||||
val map = ref.get.get
|
||||
val oldValue = map.get(key)
|
||||
ref.swap(map.update(key, value))
|
||||
oldValue
|
||||
}
|
||||
|
||||
override def update(key: K, value: V) = {
|
||||
val map = ref.get.get
|
||||
val oldValue = map.get(key)
|
||||
ref.swap(map.update(key, value))
|
||||
}
|
||||
|
||||
def iterator = ref.get.get.iterator
|
||||
|
||||
override def elements: Iterator[(K, V)] = ref.get.get.iterator
|
||||
|
||||
override def contains(key: K): Boolean = ref.get.get.contains(key)
|
||||
|
||||
override def clear = ref.swap(new HashTrie[K, V])
|
||||
|
||||
override def size: Int = ref.get.get.size
|
||||
|
||||
override def hashCode: Int = System.identityHashCode(this);
|
||||
|
||||
override def equals(other: Any): Boolean =
|
||||
other.isInstanceOf[TransactionalMap[_, _]] &&
|
||||
other.hashCode == hashCode
|
||||
|
||||
override def toString = if (outsideTransaction) "<TransactionalMap>" else super.toString
|
||||
|
||||
def outsideTransaction =
|
||||
org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction eq null
|
||||
}
|
||||
|
|
@ -1,343 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.stm
|
||||
|
||||
import se.scalablesolutions.akka.util.UUID
|
||||
|
||||
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
|
||||
|
||||
/**
|
||||
* Example Scala usage:
|
||||
* <pre>
|
||||
* val myMap = TransactionalState.newMap
|
||||
* val myVector = TransactionalState.newVector
|
||||
* val myRef = TransactionalState.newRef
|
||||
* </pre>
|
||||
* Or:
|
||||
* <pre>
|
||||
* val myMap = TransactionalMap()
|
||||
* val myVector = TransactionalVector()
|
||||
* val myRef = TransactionalRef()
|
||||
* </pre>
|
||||
*
|
||||
* <p/>
|
||||
* Example Java usage:
|
||||
* <pre>
|
||||
* TransactionalMap myMap = TransactionalState.newMap();
|
||||
* </pre>
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object TransactionalState {
|
||||
def newMap[K, V] = TransactionalMap[K, V]()
|
||||
def newMap[K, V](pairs: (K, V)*) = TransactionalMap(pairs: _*)
|
||||
|
||||
def newVector[T] = TransactionalVector[T]()
|
||||
def newVector[T](elems: T*) = TransactionalVector(elems :_*)
|
||||
|
||||
def newRef[T] = TransactionalRef[T]()
|
||||
def newRef[T](initialValue: T) = TransactionalRef(initialValue)
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
@serializable
|
||||
trait Transactional {
|
||||
val uuid: String
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait Committable {
|
||||
def commit: Unit
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait Abortable {
|
||||
def abort: Unit
|
||||
}
|
||||
|
||||
object RefFactory {
|
||||
private val factory = getGlobalStmInstance.getProgrammaticRefFactoryBuilder.build
|
||||
|
||||
def createRef[T] = factory.atomicCreateRef[T]()
|
||||
|
||||
def createRef[T](value: T) = factory.atomicCreateRef(value)
|
||||
}
|
||||
|
||||
/**
|
||||
* Alias to TransactionalRef.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object Ref {
|
||||
type Ref[T] = TransactionalRef[T]
|
||||
|
||||
def apply[T]() = new Ref[T]
|
||||
|
||||
def apply[T](initialValue: T) = new Ref[T](Some(initialValue))
|
||||
}
|
||||
|
||||
/**
|
||||
* Alias to Ref.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object TransactionalRef {
|
||||
|
||||
/**
|
||||
* An implicit conversion that converts a TransactionalRef to an Iterable value.
|
||||
*/
|
||||
implicit def ref2Iterable[T](ref: TransactionalRef[T]): Iterable[T] = ref.toList
|
||||
|
||||
def apply[T]() = new TransactionalRef[T]
|
||||
|
||||
def apply[T](initialValue: T) = new TransactionalRef[T](Some(initialValue))
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a transactional managed reference.
|
||||
* Alias to Ref.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class TransactionalRef[T](initialOpt: Option[T] = None) extends Transactional {
|
||||
self =>
|
||||
|
||||
import org.multiverse.api.ThreadLocalTransaction._
|
||||
|
||||
implicit val txInitName = "TransactionalRef:Init"
|
||||
val uuid = UUID.newUuid.toString
|
||||
|
||||
private[this] val ref = {
|
||||
if (initialOpt.isDefined) RefFactory.createRef(initialOpt.get)
|
||||
else RefFactory.createRef[T]
|
||||
}
|
||||
|
||||
def swap(elem: T) = {
|
||||
ensureIsInTransaction
|
||||
ref.set(elem)
|
||||
}
|
||||
|
||||
def alter(f: T => T): T = {
|
||||
ensureIsInTransaction
|
||||
ensureNotNull
|
||||
ref.set(f(ref.get))
|
||||
ref.get
|
||||
}
|
||||
|
||||
def get: Option[T] = {
|
||||
ensureIsInTransaction
|
||||
if (ref.isNull) None
|
||||
else Some(ref.get)
|
||||
}
|
||||
|
||||
def getOrWait: T = {
|
||||
ensureIsInTransaction
|
||||
ref.getOrAwait
|
||||
}
|
||||
|
||||
def getOrElse(default: => T): T = {
|
||||
ensureIsInTransaction
|
||||
if (ref.isNull) default
|
||||
else ref.get
|
||||
}
|
||||
|
||||
def isDefined: Boolean = {
|
||||
ensureIsInTransaction
|
||||
!ref.isNull
|
||||
}
|
||||
|
||||
def isEmpty: Boolean = {
|
||||
ensureIsInTransaction
|
||||
ref.isNull
|
||||
}
|
||||
|
||||
def map[B](f: T => B): TransactionalRef[B] = {
|
||||
ensureIsInTransaction
|
||||
if (isEmpty) TransactionalRef[B] else TransactionalRef(f(ref.get))
|
||||
}
|
||||
|
||||
def flatMap[B](f: T => TransactionalRef[B]): TransactionalRef[B] = {
|
||||
ensureIsInTransaction
|
||||
if (isEmpty) TransactionalRef[B] else f(ref.get)
|
||||
}
|
||||
|
||||
def filter(p: T => Boolean): TransactionalRef[T] = {
|
||||
ensureIsInTransaction
|
||||
if (isDefined && p(ref.get)) TransactionalRef(ref.get) else TransactionalRef[T]
|
||||
}
|
||||
|
||||
/**
|
||||
* Necessary to keep from being implicitly converted to Iterable in for comprehensions.
|
||||
*/
|
||||
def withFilter(p: T => Boolean): WithFilter = new WithFilter(p)
|
||||
|
||||
class WithFilter(p: T => Boolean) {
|
||||
def map[B](f: T => B): TransactionalRef[B] = self filter p map f
|
||||
def flatMap[B](f: T => TransactionalRef[B]): TransactionalRef[B] = self filter p flatMap f
|
||||
def foreach[U](f: T => U): Unit = self filter p foreach f
|
||||
def withFilter(q: T => Boolean): WithFilter = new WithFilter(x => p(x) && q(x))
|
||||
}
|
||||
|
||||
def foreach[U](f: T => U): Unit = {
|
||||
ensureIsInTransaction
|
||||
if (isDefined) f(ref.get)
|
||||
}
|
||||
|
||||
def elements: Iterator[T] = {
|
||||
ensureIsInTransaction
|
||||
if (isEmpty) Iterator.empty else Iterator(ref.get)
|
||||
}
|
||||
|
||||
def toList: List[T] = {
|
||||
ensureIsInTransaction
|
||||
if (isEmpty) List() else List(ref.get)
|
||||
}
|
||||
|
||||
def toRight[X](left: => X) = {
|
||||
ensureIsInTransaction
|
||||
if (isEmpty) Left(left) else Right(ref.get)
|
||||
}
|
||||
|
||||
def toLeft[X](right: => X) = {
|
||||
ensureIsInTransaction
|
||||
if (isEmpty) Right(right) else Left(ref.get)
|
||||
}
|
||||
|
||||
private def ensureIsInTransaction =
|
||||
()// if (getThreadLocalTransaction eq null) throw new NoTransactionInScopeException
|
||||
|
||||
private def ensureNotNull =
|
||||
if (ref.isNull) throw new RuntimeException("Cannot alter Ref's value when it is null")
|
||||
}
|
||||
|
||||
object TransactionalMap {
|
||||
def apply[K, V]() = new TransactionalMap[K, V]
|
||||
|
||||
def apply[K, V](pairs: (K, V)*) = new TransactionalMap(Some(HashTrie(pairs: _*)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements an in-memory transactional Map based on Clojure's PersistentMap.
|
||||
*
|
||||
* Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class TransactionalMap[K, V](initialOpt: Option[HashTrie[K, V]] = None) extends Transactional with scala.collection.mutable.Map[K, V] {
|
||||
val uuid = UUID.newUuid.toString
|
||||
|
||||
protected[this] val ref = new TransactionalRef(initialOpt.orElse(Some(new HashTrie[K, V])))
|
||||
|
||||
def -=(key: K) = {
|
||||
remove(key)
|
||||
this
|
||||
}
|
||||
|
||||
def +=(key: K, value: V) = put(key, value)
|
||||
|
||||
def +=(kv: (K, V)) = {
|
||||
put(kv._1,kv._2)
|
||||
this
|
||||
}
|
||||
|
||||
override def remove(key: K) = {
|
||||
val map = ref.get.get
|
||||
val oldValue = map.get(key)
|
||||
ref.swap(ref.get.get - key)
|
||||
oldValue
|
||||
}
|
||||
|
||||
def get(key: K): Option[V] = ref.get.get.get(key)
|
||||
|
||||
override def put(key: K, value: V): Option[V] = {
|
||||
val map = ref.get.get
|
||||
val oldValue = map.get(key)
|
||||
ref.swap(map.update(key, value))
|
||||
oldValue
|
||||
}
|
||||
|
||||
override def update(key: K, value: V) = {
|
||||
val map = ref.get.get
|
||||
val oldValue = map.get(key)
|
||||
ref.swap(map.update(key, value))
|
||||
}
|
||||
|
||||
def iterator = ref.get.get.iterator
|
||||
|
||||
override def elements: Iterator[(K, V)] = ref.get.get.iterator
|
||||
|
||||
override def contains(key: K): Boolean = ref.get.get.contains(key)
|
||||
|
||||
override def clear = ref.swap(new HashTrie[K, V])
|
||||
|
||||
override def size: Int = ref.get.get.size
|
||||
|
||||
override def hashCode: Int = System.identityHashCode(this);
|
||||
|
||||
override def equals(other: Any): Boolean =
|
||||
other.isInstanceOf[TransactionalMap[_, _]] &&
|
||||
other.hashCode == hashCode
|
||||
|
||||
override def toString = if (outsideTransaction) "<TransactionalMap>" else super.toString
|
||||
|
||||
def outsideTransaction =
|
||||
org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction eq null
|
||||
}
|
||||
|
||||
object TransactionalVector {
|
||||
def apply[T]() = new TransactionalVector[T]
|
||||
|
||||
def apply[T](elems: T*) = new TransactionalVector(Some(Vector(elems: _*)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements an in-memory transactional Vector based on Clojure's PersistentVector.
|
||||
*
|
||||
* Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class TransactionalVector[T](initialOpt: Option[Vector[T]] = None) extends Transactional with IndexedSeq[T] {
|
||||
val uuid = UUID.newUuid.toString
|
||||
|
||||
private[this] val ref = new TransactionalRef(initialOpt.orElse(Some(EmptyVector)))
|
||||
|
||||
def clear = ref.swap(EmptyVector)
|
||||
|
||||
def +(elem: T) = add(elem)
|
||||
|
||||
def add(elem: T) = ref.swap(ref.get.get + elem)
|
||||
|
||||
def get(index: Int): T = ref.get.get.apply(index)
|
||||
|
||||
/**
|
||||
* Removes the <i>tail</i> element of this vector.
|
||||
*/
|
||||
def pop = ref.swap(ref.get.get.pop)
|
||||
|
||||
def update(index: Int, elem: T) = ref.swap(ref.get.get.update(index, elem))
|
||||
|
||||
def length: Int = ref.get.get.length
|
||||
|
||||
def apply(index: Int): T = ref.get.get.apply(index)
|
||||
|
||||
override def hashCode: Int = System.identityHashCode(this);
|
||||
|
||||
override def equals(other: Any): Boolean =
|
||||
other.isInstanceOf[TransactionalVector[_]] &&
|
||||
other.hashCode == hashCode
|
||||
|
||||
override def toString = if (outsideTransaction) "<TransactionalVector>" else super.toString
|
||||
|
||||
def outsideTransaction =
|
||||
org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction eq null
|
||||
}
|
||||
|
||||
60
akka-core/src/main/scala/stm/TransactionalVector.scala
Normal file
60
akka-core/src/main/scala/stm/TransactionalVector.scala
Normal file
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.stm
|
||||
|
||||
import se.scalablesolutions.akka.util.UUID
|
||||
|
||||
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
|
||||
|
||||
object TransactionalVector {
|
||||
def apply[T]() = new TransactionalVector[T]
|
||||
|
||||
def apply[T](elems: T*) = new TransactionalVector(Some(Vector(elems: _*)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements an in-memory transactional Vector based on Clojure's PersistentVector.
|
||||
*
|
||||
* Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class TransactionalVector[T](initialOpt: Option[Vector[T]] = None) extends Transactional with IndexedSeq[T] {
|
||||
def this() = this(None) // Java compatibility
|
||||
|
||||
val uuid = UUID.newUuid.toString
|
||||
|
||||
private[this] val ref = new Ref(initialOpt.orElse(Some(EmptyVector)))
|
||||
|
||||
def clear = ref.swap(EmptyVector)
|
||||
|
||||
def +(elem: T) = add(elem)
|
||||
|
||||
def add(elem: T) = ref.swap(ref.get.get + elem)
|
||||
|
||||
def get(index: Int): T = ref.get.get.apply(index)
|
||||
|
||||
/**
|
||||
* Removes the <i>tail</i> element of this vector.
|
||||
*/
|
||||
def pop = ref.swap(ref.get.get.pop)
|
||||
|
||||
def update(index: Int, elem: T) = ref.swap(ref.get.get.update(index, elem))
|
||||
|
||||
def length: Int = ref.get.get.length
|
||||
|
||||
def apply(index: Int): T = ref.get.get.apply(index)
|
||||
|
||||
override def hashCode: Int = System.identityHashCode(this);
|
||||
|
||||
override def equals(other: Any): Boolean =
|
||||
other.isInstanceOf[TransactionalVector[_]] &&
|
||||
other.hashCode == hashCode
|
||||
|
||||
override def toString = if (outsideTransaction) "<TransactionalVector>" else super.toString
|
||||
|
||||
def outsideTransaction =
|
||||
org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction eq null
|
||||
}
|
||||
|
|
@ -4,7 +4,7 @@ import java.util.concurrent.{TimeUnit, CountDownLatch}
|
|||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
import se.scalablesolutions.akka.stm.{TransactionalState, TransactionalMap, TransactionalRef, TransactionalVector}
|
||||
import se.scalablesolutions.akka.stm.{Ref, TransactionalMap, TransactionalVector}
|
||||
import Actor._
|
||||
|
||||
object InMemoryActorSpec {
|
||||
|
|
@ -35,9 +35,9 @@ class InMemStatefulActor(expectedInvocationCount: Int) extends Transactor {
|
|||
|
||||
val notifier = new CountDownLatch(expectedInvocationCount)
|
||||
|
||||
private lazy val mapState = TransactionalState.newMap[String, String]
|
||||
private lazy val vectorState = TransactionalState.newVector[String]
|
||||
private lazy val refState = TransactionalState.newRef[String]
|
||||
private lazy val mapState = TransactionalMap[String, String]()
|
||||
private lazy val vectorState = TransactionalVector[String]()
|
||||
private lazy val refState = Ref[String]()
|
||||
|
||||
def receive = {
|
||||
case GetNotifier =>
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ class StmSpec extends
|
|||
it("should be able to do multiple consecutive atomic {..} statements") {
|
||||
import Transaction.Local._
|
||||
|
||||
lazy val ref = TransactionalState.newRef[Int]
|
||||
lazy val ref = Ref[Int]()
|
||||
|
||||
def increment = atomic {
|
||||
ref.swap(ref.get.getOrElse(0) + 1)
|
||||
|
|
@ -40,7 +40,7 @@ class StmSpec extends
|
|||
it("should be able to do nested atomic {..} statements") {
|
||||
import Transaction.Local._
|
||||
|
||||
lazy val ref = TransactionalState.newRef[Int]
|
||||
lazy val ref = Ref[Int]()
|
||||
|
||||
def increment = atomic {
|
||||
ref.swap(ref.get.getOrElse(0) + 1)
|
||||
|
|
@ -62,7 +62,7 @@ class StmSpec extends
|
|||
it("should roll back failing nested atomic {..} statements") {
|
||||
import Transaction.Local._
|
||||
|
||||
lazy val ref = TransactionalState.newRef[Int]
|
||||
lazy val ref = Ref[Int]()
|
||||
|
||||
def increment = atomic {
|
||||
ref.swap(ref.get.getOrElse(0) + 1)
|
||||
|
|
@ -213,7 +213,7 @@ class NestedTransactorLevelOneActor extends Actor {
|
|||
}
|
||||
}
|
||||
|
||||
class NestedTransactorLevelTwoActor extends Actor {
|
||||
class NestedTransactorLevelTwoActor extends Transactor {
|
||||
import GlobalTransactionVectorTestActor._
|
||||
private val ref = Ref(0)
|
||||
|
||||
|
|
|
|||
|
|
@ -82,9 +82,9 @@ trait Storage {
|
|||
*/
|
||||
trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
|
||||
with Transactional with Committable with Abortable with Logging {
|
||||
protected val newAndUpdatedEntries = TransactionalState.newMap[K, V]
|
||||
protected val removedEntries = TransactionalState.newVector[K]
|
||||
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
|
||||
protected val newAndUpdatedEntries = TransactionalMap[K, V]()
|
||||
protected val removedEntries = TransactionalVector[K]()
|
||||
protected val shouldClearOnCommit = Ref[Boolean]()
|
||||
|
||||
// to be concretized in subclasses
|
||||
val storage: MapStorageBackend[K, V]
|
||||
|
|
@ -195,10 +195,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committable with Abortable {
|
||||
protected val newElems = TransactionalState.newVector[T]
|
||||
protected val updatedElems = TransactionalState.newMap[Int, T]
|
||||
protected val removedElems = TransactionalState.newVector[T]
|
||||
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
|
||||
protected val newElems = TransactionalVector[T]()
|
||||
protected val updatedElems = TransactionalMap[Int, T]()
|
||||
protected val removedElems = TransactionalVector[T]()
|
||||
protected val shouldClearOnCommit = Ref[Boolean]()
|
||||
|
||||
val storage: VectorStorageBackend[T]
|
||||
|
||||
|
|
@ -276,7 +276,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait PersistentRef[T] extends Transactional with Committable with Abortable {
|
||||
protected val ref = new TransactionalRef[T]
|
||||
protected val ref = Ref[T]()
|
||||
|
||||
val storage: RefStorageBackend[T]
|
||||
|
||||
|
|
@ -343,14 +343,14 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
|
|||
import scala.collection.immutable.Queue
|
||||
|
||||
// current trail that will be played on commit to the underlying store
|
||||
protected val enqueuedNDequeuedEntries = TransactionalState.newVector[(Option[A], QueueOp)]
|
||||
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
|
||||
protected val enqueuedNDequeuedEntries = TransactionalVector[(Option[A], QueueOp)]()
|
||||
protected val shouldClearOnCommit = Ref[Boolean]()
|
||||
|
||||
// local queue that will record all enqueues and dequeues in the current txn
|
||||
protected val localQ = TransactionalRef[Queue[A]]()
|
||||
protected val localQ = Ref[Queue[A]]()
|
||||
|
||||
// keeps a pointer to the underlying storage for the enxt candidate to be dequeued
|
||||
protected val pickMeForDQ = TransactionalRef[Int]()
|
||||
protected val pickMeForDQ = Ref[Int]()
|
||||
|
||||
localQ.swap(Queue.empty)
|
||||
pickMeForDQ.swap(0)
|
||||
|
|
@ -481,8 +481,8 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
|
|||
*/
|
||||
trait PersistentSortedSet[A] extends Transactional with Committable with Abortable {
|
||||
|
||||
protected val newElems = TransactionalState.newMap[A, Float]
|
||||
protected val removedElems = TransactionalState.newVector[A]
|
||||
protected val newElems = TransactionalMap[A, Float]()
|
||||
protected val removedElems = TransactionalVector[A]()
|
||||
|
||||
val storage: SortedSetStorageBackend[A]
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@ import se.scalablesolutions.akka
|
|||
import akka.actor.{ActorRef, Transactor, Scheduler}
|
||||
import akka.actor.Actor.actorOf
|
||||
import akka.stm.{Vector => _, _}
|
||||
import akka.stm.Ref.Ref
|
||||
import akka.stm.Transaction.Local._
|
||||
|
||||
object Config {
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ package sample.lift
|
|||
|
||||
import se.scalablesolutions.akka.actor.{Transactor, Actor}
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.stm.TransactionalState
|
||||
import se.scalablesolutions.akka.stm.TransactionalMap
|
||||
import se.scalablesolutions.akka.persistence.cassandra.CassandraStorage
|
||||
import Actor._
|
||||
|
||||
|
|
@ -22,7 +22,7 @@ class SimpleService extends Transactor {
|
|||
case object Tick
|
||||
private val KEY = "COUNTER"
|
||||
private var hasStartedTicking = false
|
||||
private lazy val storage = TransactionalState.newMap[String, Integer]
|
||||
private lazy val storage = TransactionalMap[String, Integer]()
|
||||
|
||||
@GET
|
||||
@Produces(Array("text/html"))
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@ import se.scalablesolutions.akka.actor.ActiveObjectContext;
|
|||
import se.scalablesolutions.akka.actor.annotation.transactionrequired;
|
||||
import se.scalablesolutions.akka.actor.annotation.prerestart;
|
||||
import se.scalablesolutions.akka.actor.annotation.postrestart;
|
||||
import se.scalablesolutions.akka.stm.TransactionalState;
|
||||
import se.scalablesolutions.akka.stm.TransactionalMap;
|
||||
|
||||
@transactionrequired
|
||||
|
|
@ -21,7 +20,7 @@ public class SimpleService {
|
|||
private Receiver receiver = ActiveObject.newInstance(Receiver.class);
|
||||
|
||||
public String count() {
|
||||
if (storage == null) storage = TransactionalState.newMap();
|
||||
if (storage == null) storage = new TransactionalMap<String, Integer>();
|
||||
if (!hasStartedTicking) {
|
||||
storage.put(KEY, 0);
|
||||
hasStartedTicking = true;
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ package sample.rest.scala
|
|||
|
||||
import se.scalablesolutions.akka.actor.{Transactor, SupervisorFactory, Actor}
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import se.scalablesolutions.akka.stm.TransactionalState
|
||||
import se.scalablesolutions.akka.stm.TransactionalMap
|
||||
import se.scalablesolutions.akka.persistence.cassandra.CassandraStorage
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
|
@ -63,7 +63,7 @@ class SimpleService {
|
|||
class SimpleServiceActor extends Transactor {
|
||||
private val KEY = "COUNTER"
|
||||
private var hasStartedTicking = false
|
||||
private lazy val storage = TransactionalState.newMap[String, Integer]
|
||||
private lazy val storage = TransactionalMap[String, Integer]()
|
||||
|
||||
def receive = {
|
||||
case "Tick" => if (hasStartedTicking) {
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import se.scalablesolutions.akka.actor.Actor._
|
|||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.security.{BasicAuthenticationActor,BasicCredentials,SpnegoAuthenticationActor,DigestAuthenticationActor, UserInfo}
|
||||
import se.scalablesolutions.akka.stm.TransactionalState
|
||||
import se.scalablesolutions.akka.stm.TransactionalMap
|
||||
import se.scalablesolutions.akka.actor.ActorRegistry.actorsFor
|
||||
|
||||
class Boot {
|
||||
|
|
@ -135,7 +135,7 @@ class SecureTickService {
|
|||
class SecureTickActor extends Transactor with Logging {
|
||||
private val KEY = "COUNTER"
|
||||
private var hasStartedTicking = false
|
||||
private lazy val storage = TransactionalState.newMap[String, Integer]
|
||||
private lazy val storage = TransactionalMap[String, Integer]()
|
||||
def receive = {
|
||||
case "Tick" => if (hasStartedTicking) {
|
||||
val counter = storage.get(KEY).get.intValue
|
||||
|
|
|
|||
|
|
@ -3,21 +3,20 @@ package se.scalablesolutions.akka.spring.foo;
|
|||
import se.scalablesolutions.akka.actor.annotation.inittransactionalstate;
|
||||
import se.scalablesolutions.akka.stm.TransactionalMap;
|
||||
import se.scalablesolutions.akka.stm.TransactionalVector;
|
||||
import se.scalablesolutions.akka.stm.TransactionalRef;
|
||||
import se.scalablesolutions.akka.stm.TransactionalState;
|
||||
import se.scalablesolutions.akka.stm.Ref;
|
||||
|
||||
public class StatefulPojo {
|
||||
private TransactionalMap<String, String> mapState;
|
||||
private TransactionalVector<String> vectorState;
|
||||
private TransactionalRef<String> refState;
|
||||
private Ref<String> refState;
|
||||
private boolean isInitialized = false;
|
||||
|
||||
@inittransactionalstate
|
||||
public void init() {
|
||||
if (!isInitialized) {
|
||||
mapState = TransactionalState.newMap();
|
||||
vectorState = TransactionalState.newVector();
|
||||
refState = TransactionalState.newRef();
|
||||
mapState = new TransactionalMap();
|
||||
vectorState = new TransactionalVector();
|
||||
refState = new Ref();
|
||||
isInitialized = true;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue