diff --git a/kernel/src/main/scala/actor/Actor.scala b/kernel/src/main/scala/actor/Actor.scala
old mode 100644
new mode 100755
index 5e21048fd4..139e5d914b
--- a/kernel/src/main/scala/actor/Actor.scala
+++ b/kernel/src/main/scala/actor/Actor.scala
@@ -19,6 +19,7 @@ import nio.{RemoteProtocolBuilder, RemoteClient, RemoteServer, RemoteRequestIdFa
import management.Management
import com.twitter.service.Stats
+import org.multiverse.utils.TransactionThreadLocal._
sealed abstract class LifecycleMessage
case class Init(config: AnyRef) extends LifecycleMessage
@@ -464,7 +465,10 @@ trait Actor extends Logging with TransactionManagement {
}
private def dispatch[T](messageHandle: MessageInvocation) = {
- if (messageHandle.tx.isDefined) TransactionManagement.threadBoundTx.set(messageHandle.tx)
+ if (messageHandle.tx.isDefined) {
+ TransactionManagement.threadBoundTx.set(messageHandle.tx)
+ setThreadLocalTransaction(messageHandle.tx.get.transaction)
+ }
val message = messageHandle.message //serializeMessage(messageHandle.message)
val future = messageHandle.future
try {
@@ -479,11 +483,15 @@ trait Actor extends Logging with TransactionManagement {
else e.printStackTrace
} finally {
TransactionManagement.threadBoundTx.set(None)
+ setThreadLocalTransaction(null)
}
}
private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
- if (messageHandle.tx.isDefined) TransactionManagement.threadBoundTx.set(messageHandle.tx)
+ if (messageHandle.tx.isDefined) {
+ TransactionManagement.threadBoundTx.set(messageHandle.tx)
+ setThreadLocalTransaction(messageHandle.tx.get.transaction)
+ }
val message = messageHandle.message //serializeMessage(messageHandle.message)
val future = messageHandle.future
try {
@@ -500,6 +508,7 @@ trait Actor extends Logging with TransactionManagement {
case e =>
rollback(activeTx)
TransactionManagement.threadBoundTx.set(None) // need to clear threadBoundTx before call to supervisor
+ setThreadLocalTransaction(null)
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (supervisor.isDefined) supervisor.get ! Exit(this, e)
if (future.isDefined) future.get.completeWithException(this, e)
@@ -510,6 +519,7 @@ trait Actor extends Logging with TransactionManagement {
else tryToPrecommitTransaction
rescheduleClashedMessages
TransactionManagement.threadBoundTx.set(None)
+ setThreadLocalTransaction(null)
}
}
diff --git a/kernel/src/main/scala/nio/RemoteServer.scala b/kernel/src/main/scala/nio/RemoteServer.scala
old mode 100644
new mode 100755
index bbc223247e..fda24718c4
--- a/kernel/src/main/scala/nio/RemoteServer.scala
+++ b/kernel/src/main/scala/nio/RemoteServer.scala
@@ -235,7 +235,11 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
if (tx.isDefined) {
tx.get.reinit
TransactionManagement.threadBoundTx.set(tx)
- } else TransactionManagement.threadBoundTx.set(None)
+ setThreadLocalTransaction(tx.transaction)
+ } else {
+ TransactionManagement.threadBoundTx.set(None)
+ setThreadLocalTransaction(null)
+ }
}
*/
private def unescapeArgs(args: scala.List[AnyRef], argClasses: scala.List[Class[_]], timeout: Long) = {
diff --git a/kernel/src/main/scala/state/InMemoryState.scala b/kernel/src/main/scala/state/InMemoryState.scala
new file mode 100755
index 0000000000..002929ab5e
--- /dev/null
+++ b/kernel/src/main/scala/state/InMemoryState.scala
@@ -0,0 +1,61 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.state
+
+import kernel.stm.{Ref, TransactionManagement}
+import akka.collection._
+
+import org.codehaus.aspectwerkz.proxy.Uuid
+
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+
+/**
+ * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
+ *
+ * @author Jonas Bonér
+ */
+class InMemoryTransactionalMap[K, V] extends TransactionalMap[K, V] {
+ protected[this] val ref = new TransactionalRef[HashTrie[K, V]](new HashTrie[K, V])
+
+ // ---- Overriding scala.collection.mutable.Map behavior ----
+ override def contains(key: K): Boolean = ref.get.get.contains(key)
+ override def clear = ref.swap(new HashTrie[K, V])
+ override def size: Int = ref.get.get.size
+
+ // ---- For scala.collection.mutable.Map ----
+ override def remove(key: K) = ref.swap(ref.get.get - key)
+ override def elements: Iterator[(K, V)] = ref.get.get.elements
+ override def get(key: K): Option[V] = ref.get.get.get(key)
+ override def put(key: K, value: V): Option[V] = {
+ val map = ref.get.get
+ val oldValue = map.get(key)
+ ref.swap(map.update(key, value))
+ oldValue
+ }
+ override def -=(key: K) = remove(key)
+ override def update(key: K, value: V) = put(key, value)
+}
+
+/**
+ * Implements an in-memory transactional vector.
+ *
+ * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
+ *
+ * @author Jonas Bonér
+ */
+class InMemoryTransactionalVector[T] extends TransactionalVector[T] {
+ private[kernel] val ref = new TransactionalRef[Vector[T]](EmptyVector)
+
+ def add(elem: T) = ref.swap(ref.get.get + elem)
+ def get(index: Int): T = ref.get.get.apply(index)
+ def getRange(start: Int, count: Int): List[T] = ref.get.get.slice(start, count).toList.asInstanceOf[List[T]]
+
+ // ---- For Seq ----
+ def length: Int = ref.get.get.length
+ def apply(index: Int): T = ref.get.get.apply(index)
+ override def elements: Iterator[T] = ref.get.get.elements
+ override def toList: List[T] = ref.get.get.toList
+}
+
diff --git a/kernel/src/main/scala/state/PersistentState.scala b/kernel/src/main/scala/state/PersistentState.scala
new file mode 100755
index 0000000000..9b75c38751
--- /dev/null
+++ b/kernel/src/main/scala/state/PersistentState.scala
@@ -0,0 +1,269 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.state
+
+import kernel.stm.{Ref, TransactionManagement}
+import akka.collection._
+
+import org.codehaus.aspectwerkz.proxy.Uuid
+
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+
+import org.multiverse.utils.TransactionThreadLocal._
+
+/**
+ * Base class for all persistent transactional map implementations should extend.
+ * Implements a Unit of Work, records changes into a change set.
+ *
+ * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
+ *
+ * @author Jonas Bonér
+ */
+abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
+
+ // FIXME: need to handle remove in another changeSet
+ protected[kernel] val changeSet = new HashMap[K, V]
+
+ def getRange(start: Option[AnyRef], count: Int)
+
+ // ---- For scala.collection.mutable.Map ----
+ override def put(key: K, value: V): Option[V] = {
+ verifyTransaction
+ changeSet += key -> value
+ None // always return None to speed up writes (else need to go to DB to get
+ }
+
+ override def -=(key: K) = remove(key)
+
+ override def update(key: K, value: V) = put(key, value)
+}
+
+/**
+ * Implementation of PersistentTransactionalMap for every concrete
+ * storage will have the same workflow. This abstracts the workflow.
+ *
+ * Subclasses just need to provide the actual concrete instance for the
+ * abstract val storage.
+ *
+ * @author Jonas Bonér
+ */
+abstract class TemplatePersistentTransactionalMap extends PersistentTransactionalMap[AnyRef, AnyRef] {
+
+ // to be concretized in subclasses
+ val storage: MapStorage
+
+ override def remove(key: AnyRef) = {
+ if (changeSet.contains(key)) changeSet -= key
+ else storage.removeMapStorageFor(uuid, key)
+ }
+
+ override def getRange(start: Option[AnyRef], count: Int) =
+ getRange(start, None, count)
+
+ def getRange(start: Option[AnyRef], finish: Option[AnyRef], count: Int) = {
+ try {
+ storage.getMapStorageRangeFor(uuid, start, finish, count)
+ } catch {
+ case e: Exception => Nil
+ }
+ }
+
+ // ---- For Transactional ----
+ override def commit = {
+ storage.insertMapStorageEntriesFor(uuid, changeSet.toList)
+ changeSet.clear
+ }
+
+ // ---- Overriding scala.collection.mutable.Map behavior ----
+ override def clear = {
+ try {
+ storage.removeMapStorageFor(uuid)
+ } catch {
+ case e: Exception => {}
+ }
+ }
+
+ override def contains(key: AnyRef): Boolean = {
+ try {
+ verifyTransaction
+ storage.getMapStorageEntryFor(uuid, key).isDefined
+ } catch {
+ case e: Exception => false
+ }
+ }
+
+ override def size: Int = {
+ verifyTransaction
+ try {
+ storage.getMapStorageSizeFor(uuid)
+ } catch {
+ case e: Exception => 0
+ }
+ }
+
+ // ---- For scala.collection.mutable.Map ----
+ override def get(key: AnyRef): Option[AnyRef] = {
+ verifyTransaction
+ // if (changeSet.contains(key)) changeSet.get(key)
+ // else {
+ val result = try {
+ storage.getMapStorageEntryFor(uuid, key)
+ } catch {
+ case e: Exception => None
+ }
+ result
+ //}
+ }
+
+ override def elements: Iterator[Tuple2[AnyRef, AnyRef]] = {
+ //verifyTransaction
+ new Iterator[Tuple2[AnyRef, AnyRef]] {
+ private val originalList: List[Tuple2[AnyRef, AnyRef]] = try {
+ storage.getMapStorageFor(uuid)
+ } catch {
+ case e: Throwable => Nil
+ }
+ private var elements = originalList.reverse
+ override def next: Tuple2[AnyRef, AnyRef]= synchronized {
+ val element = elements.head
+ elements = elements.tail
+ element
+ }
+ override def hasNext: Boolean = synchronized { !elements.isEmpty }
+ }
+ }
+}
+
+
+/**
+ * Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
+ *
+ * @author Debasish Ghosh
+ */
+class CassandraPersistentTransactionalMap extends TemplatePersistentTransactionalMap {
+ val storage = CassandraStorage
+}
+
+/**
+ * Implements a persistent transactional map based on the MongoDB distributed P2P key-value storage.
+ *
+ * @author Debasish Ghosh
+ */
+class MongoPersistentTransactionalMap extends TemplatePersistentTransactionalMap {
+ val storage = MongoStorage
+}
+
+/**
+ * Base class for all persistent transactional vector implementations should extend.
+ * Implements a Unit of Work, records changes into a change set.
+ *
+ * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
+ *
+ * @author Jonas Bonér
+ */
+abstract class PersistentTransactionalVector[T] extends TransactionalVector[T] {
+
+ // FIXME: need to handle remove in another changeSet
+ protected[kernel] val changeSet = new ArrayBuffer[T]
+
+ // ---- For TransactionalVector ----
+ override def add(value: T) = {
+ verifyTransaction
+ changeSet += value
+ }
+}
+
+/**
+ * Implements a template for a concrete persistent transactional vector based storage.
+ *
+ * @author Debasish Ghosh
+ */
+abstract class TemplatePersistentTransactionalVector extends PersistentTransactionalVector[AnyRef] {
+
+ val storage: VectorStorage
+
+ // ---- For TransactionalVector ----
+ override def get(index: Int): AnyRef = {
+ verifyTransaction
+ if (changeSet.size > index) changeSet(index)
+ else storage.getVectorStorageEntryFor(uuid, index)
+ }
+
+ override def getRange(start: Int, count: Int): List[AnyRef] =
+ getRange(Some(start), None, count)
+
+ def getRange(start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
+ verifyTransaction
+ storage.getVectorStorageRangeFor(uuid, start, finish, count)
+ }
+
+ override def length: Int = {
+ verifyTransaction
+ storage.getVectorStorageSizeFor(uuid)
+ }
+
+ override def apply(index: Int): AnyRef = get(index)
+
+ override def first: AnyRef = get(0)
+
+ override def last: AnyRef = {
+ verifyTransaction
+ val l = length
+ if (l == 0) throw new NoSuchElementException("Vector is empty")
+ get(length - 1)
+ }
+
+ override def commit = {
+ // FIXME: should use batch function once the bug is resolved
+ for (element <- changeSet) storage.insertVectorStorageEntryFor(uuid, element)
+ changeSet.clear
+ }
+}
+
+/**
+ * Implements a persistent transactional vector based on the Cassandra distributed P2P key-value storage.
+ *
+ * @author Debaissh Ghosh
+ */
+class CassandraPersistentTransactionalVector extends TemplatePersistentTransactionalVector {
+ val storage = CassandraStorage
+}
+
+/**
+ * Implements a persistent transactional vector based on the MongoDB distributed P2P key-value storage.
+ *
+ * @author Debaissh Ghosh
+ */
+class MongoPersistentTransactionalVector extends TemplatePersistentTransactionalVector {
+ val storage = MongoStorage
+}
+
+/**
+ * Implements a transactional reference.
+ *
+ * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
+ *
+ * @author Jonas Bonér
+ */
+class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] {
+ override def commit = if (isDefined) {
+ CassandraStorage.insertRefStorageFor(uuid, ref.get)
+ ref.clear
+ }
+
+ override def get: Option[AnyRef] = {
+ verifyTransaction
+ if (isDefined) super.get
+ else CassandraStorage.getRefStorageFor(uuid)
+ }
+
+ override def isDefined: Boolean = get.isDefined
+
+ override def getOrElse(default: => AnyRef): AnyRef = {
+ val ref = get
+ if (ref.isDefined) ref.get
+ else default
+ }
+}
diff --git a/kernel/src/main/scala/state/State.scala b/kernel/src/main/scala/state/State.scala
deleted file mode 100644
index 6e012837d7..0000000000
--- a/kernel/src/main/scala/state/State.scala
+++ /dev/null
@@ -1,548 +0,0 @@
-/**
- * Copyright (C) 2009 Scalable Solutions.
- */
-
-package se.scalablesolutions.akka.kernel.state
-
-import kernel.stm.TransactionManagement
-import akka.collection._
-
-import org.codehaus.aspectwerkz.proxy.Uuid
-
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-
-sealed abstract class TransactionalStateConfig
-abstract class PersistentStorageConfig extends TransactionalStateConfig
-case class CassandraStorageConfig extends PersistentStorageConfig
-case class TerracottaStorageConfig extends PersistentStorageConfig
-case class TokyoCabinetStorageConfig extends PersistentStorageConfig
-case class MongoStorageConfig extends PersistentStorageConfig
-
-/**
- * Scala API.
- *
- * Example Scala usage:
- *
- * val myMap = TransactionalState.newPersistentMap(CassandraStorageConfig)
- *
- */
-object TransactionalState extends TransactionalState
-
-/**
- * Java API.
- *
- * Example Java usage:
- *
- * TransactionalState state = new TransactionalState();
- * TransactionalMap myMap = state.newPersistentMap(new CassandraStorageConfig());
- *
- */
-class TransactionalState {
- def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[AnyRef, AnyRef] = config match {
- case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
- case MongoStorageConfig() => new MongoPersistentTransactionalMap
- case TerracottaStorageConfig() => throw new UnsupportedOperationException
- case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
- }
-
- def newPersistentVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match {
- case CassandraStorageConfig() => new CassandraPersistentTransactionalVector
- case MongoStorageConfig() => new MongoPersistentTransactionalVector
- case TerracottaStorageConfig() => throw new UnsupportedOperationException
- case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
- }
-
- def newPersistentRef(config: PersistentStorageConfig): TransactionalRef[AnyRef] = config match {
- case CassandraStorageConfig() => new CassandraPersistentTransactionalRef
- case MongoStorageConfig() => new MongoPersistentTransactionalRef
- case TerracottaStorageConfig() => throw new UnsupportedOperationException
- case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
- }
-
- def newInMemoryMap[K, V]: TransactionalMap[K, V] = new InMemoryTransactionalMap[K, V]
-
- def newInMemoryVector[T]: TransactionalVector[T] = new InMemoryTransactionalVector[T]
-
- def newInMemoryRef[T]: TransactionalRef[T] = new TransactionalRef[T]
-}
-
-/**
- * @author Jonas Bonér
- */
-@serializable
-trait Transactional {
- // FIXME: won't work across the cluster
- val uuid = Uuid.newUuid.toString
-
- private[kernel] def begin
- private[kernel] def commit
- private[kernel] def rollback
-
- protected def verifyTransaction = {
- val cflowTx = TransactionManagement.threadBoundTx.get
- if (!cflowTx.isDefined) {
- throw new IllegalStateException("Can't access transactional reference outside the scope of a transaction [" + this + "]")
- } else {
- cflowTx.get.register(this)
- }
- }
-}
-
-/**
- * Base trait for all state implementations (persistent or in-memory).
- *
- * FIXME: Create Java versions using pcollections
- *
- * @author Jonas Bonér
- */
-trait TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] {
- override def hashCode: Int = System.identityHashCode(this);
- override def equals(other: Any): Boolean = false
- def remove(key: K)
-}
-
-/**
- * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
- *
- * @author Jonas Bonér
- */
-class InMemoryTransactionalMap[K, V] extends TransactionalMap[K, V] {
- protected[kernel] var state = new HashTrie[K, V]
- protected[kernel] var snapshot = state
-
- // ---- For Transactional ----
- override def begin = snapshot = state
- override def commit = snapshot = state
- override def rollback = state = snapshot
-
- // ---- Overriding scala.collection.mutable.Map behavior ----
- override def contains(key: K): Boolean = {
- verifyTransaction
- state.contains(key)
- }
-
- override def clear = {
- verifyTransaction
- state = new HashTrie[K, V]
- }
-
- override def size: Int = {
- verifyTransaction
- state.size
- }
-
- // ---- For scala.collection.mutable.Map ----
- override def remove(key: K) = {
- verifyTransaction
- state = state - key
- }
-
- override def elements: Iterator[(K, V)] = {
-// verifyTransaction
- state.elements
- }
-
- override def get(key: K): Option[V] = {
- verifyTransaction
- state.get(key)
- }
-
- override def put(key: K, value: V): Option[V] = {
- verifyTransaction
- val oldValue = state.get(key)
- state = state.update(key, value)
- oldValue
- }
-
- override def -=(key: K) = {
- verifyTransaction
- remove(key)
- }
-
- override def update(key: K, value: V) = {
- verifyTransaction
- put(key, value)
- }
-}
-
-/**
- * Base class for all persistent transactional map implementations should extend.
- * Implements a Unit of Work, records changes into a change set.
- *
- * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
- *
- * @author Jonas Bonér
- */
-abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
-
- // FIXME: need to handle remove in another changeSet
- protected[kernel] val changeSet = new HashMap[K, V]
-
- def getRange(start: Option[AnyRef], count: Int)
-
- // ---- For Transactional ----
- override def begin = {}
-
- override def rollback = changeSet.clear
-
- // ---- For scala.collection.mutable.Map ----
- override def put(key: K, value: V): Option[V] = {
- verifyTransaction
- changeSet += key -> value
- None // always return None to speed up writes (else need to go to DB to get
- }
-
- override def -=(key: K) = remove(key)
-
- override def update(key: K, value: V) = put(key, value)
-}
-
-/**
- * Implementation of PersistentTransactionalMap for every concrete
- * storage will have the same workflow. This abstracts the workflow.
- *
- * Subclasses just need to provide the actual concrete instance for the
- * abstract val storage.
- *
- * @author Jonas Bonér
- */
-abstract class TemplatePersistentTransactionalMap extends PersistentTransactionalMap[AnyRef, AnyRef] {
-
- // to be concretized in subclasses
- val storage: MapStorage
-
- override def remove(key: AnyRef) = {
- verifyTransaction
- if (changeSet.contains(key)) changeSet -= key
- else storage.removeMapStorageFor(uuid, key)
- }
-
- override def getRange(start: Option[AnyRef], count: Int) =
- getRange(start, None, count)
-
- def getRange(start: Option[AnyRef], finish: Option[AnyRef], count: Int) = {
- verifyTransaction
- try {
- storage.getMapStorageRangeFor(uuid, start, finish, count)
- } catch {
- case e: Exception => Nil
- }
- }
-
- // ---- For Transactional ----
- override def commit = {
- storage.insertMapStorageEntriesFor(uuid, changeSet.toList)
- changeSet.clear
- }
-
- // ---- Overriding scala.collection.mutable.Map behavior ----
- override def clear = {
- verifyTransaction
- try {
- storage.removeMapStorageFor(uuid)
- } catch {
- case e: Exception => {}
- }
- }
-
- override def contains(key: AnyRef): Boolean = {
- try {
- verifyTransaction
- storage.getMapStorageEntryFor(uuid, key).isDefined
- } catch {
- case e: Exception => false
- }
- }
-
- override def size: Int = {
- verifyTransaction
- try {
- storage.getMapStorageSizeFor(uuid)
- } catch {
- case e: Exception => 0
- }
- }
-
- // ---- For scala.collection.mutable.Map ----
- override def get(key: AnyRef): Option[AnyRef] = {
- verifyTransaction
- // if (changeSet.contains(key)) changeSet.get(key)
- // else {
- val result = try {
- storage.getMapStorageEntryFor(uuid, key)
- } catch {
- case e: Exception => None
- }
- result
- //}
- }
-
- override def elements: Iterator[Tuple2[AnyRef, AnyRef]] = {
- //verifyTransaction
- new Iterator[Tuple2[AnyRef, AnyRef]] {
- private val originalList: List[Tuple2[AnyRef, AnyRef]] = try {
- storage.getMapStorageFor(uuid)
- } catch {
- case e: Throwable => Nil
- }
- private var elements = originalList.reverse
- override def next: Tuple2[AnyRef, AnyRef]= synchronized {
- val element = elements.head
- elements = elements.tail
- element
- }
- override def hasNext: Boolean = synchronized { !elements.isEmpty }
- }
- }
-}
-
-
-/**
- * Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
- *
- * @author Debasish Ghosh
- */
-class CassandraPersistentTransactionalMap extends TemplatePersistentTransactionalMap {
- val storage = CassandraStorage
-}
-
-/**
- * Implements a persistent transactional map based on the MongoDB distributed P2P key-value storage.
- *
- * @author Debasish Ghosh
- */
-class MongoPersistentTransactionalMap extends TemplatePersistentTransactionalMap {
- val storage = MongoStorage
-}
-
-/**
- * Base for all transactional vector implementations.
- *
- * @author Jonas Bonér
- */
-abstract class TransactionalVector[T] extends Transactional with RandomAccessSeq[T] {
- override def hashCode: Int = System.identityHashCode(this);
- override def equals(other: Any): Boolean = false
-
- def add(elem: T)
-
- def get(index: Int): T
-
- def getRange(start: Int, count: Int): List[T]
-}
-
-/**
- * Implements an in-memory transactional vector.
- *
- * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
- *
- * @author Jonas Bonér
- */
-class InMemoryTransactionalVector[T] extends TransactionalVector[T] {
- private[kernel] var state: Vector[T] = EmptyVector
- private[kernel] var snapshot = state
-
- def add(elem: T) = {
- verifyTransaction
- state = state + elem
- }
-
- def get(index: Int): T = {
- verifyTransaction
- state(index)
- }
-
- def getRange(start: Int, count: Int): List[T] = {
- verifyTransaction
- state.slice(start, count).toList.asInstanceOf[List[T]]
- }
-
- // ---- For Transactional ----
- override def begin = snapshot = state
-
- override def commit = snapshot = state
-
- override def rollback = state = snapshot
-
- // ---- For Seq ----
- def length: Int = {
- verifyTransaction
- state.length
- }
-
- def apply(index: Int): T = {
- verifyTransaction
- state(index)
- }
-
- override def elements: Iterator[T] = {
- //verifyTransaction
- state.elements
- }
-
- override def toList: List[T] = {
- verifyTransaction
- state.toList
- }
-}
-
-/**
- * Base class for all persistent transactional vector implementations should extend.
- * Implements a Unit of Work, records changes into a change set.
- *
- * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
- *
- * @author Jonas Bonér
- */
-abstract class PersistentTransactionalVector[T] extends TransactionalVector[T] {
-
- // FIXME: need to handle remove in another changeSet
- protected[kernel] val changeSet = new ArrayBuffer[T]
-
- // ---- For Transactional ----
- override def begin = {}
-
- override def rollback = changeSet.clear
-
- // ---- For TransactionalVector ----
- override def add(value: T) = {
- verifyTransaction
- changeSet += value
- }
-}
-
-/**
- * Implements a template for a concrete persistent transactional vector based storage.
- *
- * @author Debasish Ghosh
- */
-abstract class TemplatePersistentTransactionalVector extends PersistentTransactionalVector[AnyRef] {
-
- val storage: VectorStorage
-
- // ---- For TransactionalVector ----
- override def get(index: Int): AnyRef = {
- verifyTransaction
- if (changeSet.size > index) changeSet(index)
- else storage.getVectorStorageEntryFor(uuid, index)
- }
-
- override def getRange(start: Int, count: Int): List[AnyRef] =
- getRange(Some(start), None, count)
-
- def getRange(start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
- verifyTransaction
- storage.getVectorStorageRangeFor(uuid, start, finish, count)
- }
-
- override def length: Int = {
- verifyTransaction
- storage.getVectorStorageSizeFor(uuid)
- }
-
- override def apply(index: Int): AnyRef = get(index)
-
- override def first: AnyRef = get(0)
-
- override def last: AnyRef = {
- verifyTransaction
- val l = length
- if (l == 0) throw new NoSuchElementException("Vector is empty")
- get(length - 1)
- }
-
- // ---- For Transactional ----
- override def commit = {
- // FIXME: should use batch function once the bug is resolved
- for (element <- changeSet) storage.insertVectorStorageEntryFor(uuid, element)
- changeSet.clear
- }
-}
-
-/**
- * Implements a persistent transactional vector based on the Cassandra distributed P2P key-value storage.
- *
- * @author Debaissh Ghosh
- */
-class CassandraPersistentTransactionalVector extends TemplatePersistentTransactionalVector {
- val storage = CassandraStorage
-}
-
-/**
- * Implements a persistent transactional vector based on the MongoDB distributed P2P key-value storage.
- *
- * @author Debaissh Ghosh
- */
-class MongoPersistentTransactionalVector extends TemplatePersistentTransactionalVector {
- val storage = MongoStorage
-}
-
-/**
- * Implements a transactional reference.
- *
- * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
- *
- * @author Jonas Bonér
- */
-class TransactionalRef[T] extends Transactional {
- private[kernel] var ref: Option[T] = None
- private[kernel] var snapshot: Option[T] = None
-
- override def begin = if (ref.isDefined) snapshot = Some(ref.get)
-
- override def commit = if (ref.isDefined) snapshot = Some(ref.get)
-
- override def rollback = if (snapshot.isDefined) ref = Some(snapshot.get)
-
- def swap(elem: T) = {
- verifyTransaction
- ref = Some(elem)
- }
-
- def get: Option[T] = {
- verifyTransaction
- ref
- }
-
- def getOrElse(default: => T): T = {
- verifyTransaction
- ref.getOrElse(default)
- }
-
- def isDefined: Boolean = {
- verifyTransaction
- ref.isDefined
- }
-}
-
-abstract class TemplatePersistentTransactionalRef extends TransactionalRef[AnyRef] {
- val storage: RefStorage
-
- override def commit = if (ref.isDefined) {
- storage.insertRefStorageFor(uuid, ref.get)
- ref = None
- }
-
- override def rollback = ref = None
-
- override def get: Option[AnyRef] = {
- verifyTransaction
- storage.getRefStorageFor(uuid)
- }
-
- override def isDefined: Boolean = get.isDefined
-
- override def getOrElse(default: => AnyRef): AnyRef = {
- val ref = get
- if (ref.isDefined) ref
- else default
- }
-}
-
-class CassandraPersistentTransactionalRef extends TemplatePersistentTransactionalRef {
- val storage = CassandraStorage
-}
-
-class MongoPersistentTransactionalRef extends TemplatePersistentTransactionalRef {
- val storage = MongoStorage
-}
diff --git a/kernel/src/main/scala/state/TransactionalState.scala b/kernel/src/main/scala/state/TransactionalState.scala
new file mode 100755
index 0000000000..cf1d124c26
--- /dev/null
+++ b/kernel/src/main/scala/state/TransactionalState.scala
@@ -0,0 +1,143 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.state
+
+import kernel.stm.{Ref, TransactionManagement}
+import akka.collection._
+
+import org.codehaus.aspectwerkz.proxy.Uuid
+
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+
+sealed abstract class TransactionalStateConfig
+abstract class PersistentStorageConfig extends TransactionalStateConfig
+case class CassandraStorageConfig extends PersistentStorageConfig
+case class TerracottaStorageConfig extends PersistentStorageConfig
+case class TokyoCabinetStorageConfig extends PersistentStorageConfig
+case class MongoStorageConfig extends PersistentStorageConfig
+
+/**
+ * Scala API.
+ *
+ * Example Scala usage:
+ *
+ * val myMap = TransactionalState.newPersistentMap(CassandraStorageConfig)
+ *
+ */
+object TransactionalState extends TransactionalState
+
+/**
+ * Java API.
+ *
+ * Example Java usage:
+ *
+ * TransactionalState state = new TransactionalState();
+ * TransactionalMap myMap = state.newPersistentMap(new CassandraStorageConfig());
+ *
+ */
+class TransactionalState {
+ def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[AnyRef, AnyRef] = config match {
+ case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
+ case MongoStorageConfig() => new MongoPersistentTransactionalMap
+ case TerracottaStorageConfig() => throw new UnsupportedOperationException
+ case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
+ }
+
+ def newPersistentVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match {
+ case CassandraStorageConfig() => new CassandraPersistentTransactionalVector
+ case MongoStorageConfig() => new MongoPersistentTransactionalVector
+ case TerracottaStorageConfig() => throw new UnsupportedOperationException
+ case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
+ }
+
+ def newPersistentRef(config: PersistentStorageConfig): TransactionalRef[AnyRef] = config match {
+ case CassandraStorageConfig() => new CassandraPersistentTransactionalRef
+ case MongoStorageConfig() => throw new UnsupportedOperationException
+ case TerracottaStorageConfig() => throw new UnsupportedOperationException
+ case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
+ }
+
+ def newMap[K, V]: TransactionalMap[K, V] = new InMemoryTransactionalMap[K, V]
+
+ def newVector[T]: TransactionalVector[T] = new InMemoryTransactionalVector[T]
+
+ def newRef[T]: TransactionalRef[T] = new TransactionalRef[T]
+ def newRef[T](init: T): TransactionalRef[T] = new TransactionalRef[T](init)
+}
+
+/**
+ * Implements a transactional reference. Based on the Multiverse STM.
+ *
+ * @author Jonas Bonér
+ */
+class TransactionalRef[T](elem: T) extends Transactional {
+ def this() = this(null.asInstanceOf[T])
+
+ private[kernel] val ref = new Ref[T](elem)
+
+ def swap(elem: T) = ref.set(elem)
+
+ def get: Option[T] = {
+ if (ref.isNull) None
+ else Some(ref.get)
+ }
+
+ def getOrWait: T = ref.getOrAwait
+
+ def getOrElse(default: => T): T = {
+ if (ref.isNull) default
+ else ref.get
+ }
+
+ def isDefined: Boolean = !ref.isNull
+}
+
+/**
+ * @author Jonas Bonér
+ */
+@serializable
+trait Transactional {
+ // FIXME: won't work across the cluster
+ val uuid = Uuid.newUuid.toString
+
+ private[kernel] def commit = {}
+
+ protected def verifyTransaction = {
+ val cflowTx = TransactionManagement.threadBoundTx.get
+ if (!cflowTx.isDefined) throw new IllegalStateException("Can't access transactional reference outside the scope of a transaction [" + this + "]")
+ else cflowTx.get.register(this)
+ }
+}
+
+/**
+ * Base trait for all state implementations (persistent or in-memory).
+ *
+ * FIXME: Create Java versions using pcollections
+ *
+ * @author Jonas Bonér
+ */
+trait TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] {
+ override def hashCode: Int = System.identityHashCode(this)
+ override def equals(other: Any): Boolean = false
+ def remove(key: K)
+}
+
+/**
+ * Base for all transactional vector implementations.
+ *
+ * @author Jonas Bonér
+ */
+abstract class TransactionalVector[T] extends Transactional with RandomAccessSeq[T] {
+ override def hashCode: Int = System.identityHashCode(this)
+ override def equals(other: Any): Boolean = false
+
+ def add(elem: T)
+
+ def get(index: Int): T
+
+ def getRange(start: Int, count: Int): List[T]
+}
+
+
diff --git a/kernel/src/main/scala/stm/Transaction.scala b/kernel/src/main/scala/stm/Transaction.scala
old mode 100644
new mode 100755
index b72eb7c2b2..bb47a397bb
--- a/kernel/src/main/scala/stm/Transaction.scala
+++ b/kernel/src/main/scala/stm/Transaction.scala
@@ -4,9 +4,180 @@
package se.scalablesolutions.akka.kernel.stm
-import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import kernel.state.Transactional
import kernel.util.Logging
+
+import org.multiverse.api.{Transaction => MultiverseTransaction}
+import org.multiverse.stms.alpha.AlphaStm
+import org.multiverse.utils.GlobalStmInstance
+import org.multiverse.utils.TransactionThreadLocal._
+
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * @author Jonas Bonér
+ */
+object Multiverse {
+ val STM = new AlphaStm
+ GlobalStmInstance.set(STM)
+ setThreadLocalTransaction(null)
+}
+
+/**
+ * @author Jonas Bonér
+ */
+@serializable class Transaction extends Logging {
+ private[this] var _id = 0L
+ def id = _id
+ @volatile private[this] var status: TransactionStatus = TransactionStatus.New
+ private[kernel] var transaction: MultiverseTransaction = _
+
+ private[this] val transactionalItems = new ChangeSet
+
+ private[this] var participants: List[String] = Nil
+ private[this] var precommitted: List[String] = Nil
+
+ private[this] val depth = new AtomicInteger(0)
+
+ def increment = synchronized { depth.incrementAndGet }
+ def decrement = synchronized { depth.decrementAndGet }
+ def isTopLevel = synchronized { depth.get == 0 }
+
+ def register(transactional: Transactional) = synchronized {
+ ensureIsActiveOrNew
+ transactionalItems + transactional
+ }
+
+ def begin(participant: String) = synchronized {
+ ensureIsActiveOrNew
+ transaction = Multiverse.STM.startUpdateTransaction
+ _id = transaction.getReadVersion
+ log.debug("Creating a new transaction with id [%s]", _id)
+
+ if (status == TransactionStatus.New) log.debug("TX BEGIN - Server with UUID [%s] is starting NEW transaction [%s]", participant, toString)
+ else log.debug("Server [%s] is participating in transaction", participant)
+ participants ::= participant
+ status = TransactionStatus.Active
+ }
+
+ def precommit(participant: String) = synchronized {
+ if (status == TransactionStatus.Active) {
+ log.debug("TX PRECOMMIT - Pre-committing transaction [%s] for server with UUID [%s]", toString, participant)
+ precommitted ::= participant
+ }
+ }
+
+ def commit(participant: String): Boolean = synchronized {
+ if (status == TransactionStatus.Active) {
+ log.debug("TX COMMIT - Committing transaction [%s] for server with UUID [%s]", toString, participant)
+ val haveAllPreCommitted =
+ if (participants.size == precommitted.size) {{
+ for (part <- participants) yield {
+ if (precommitted.exists(_ == part)) true
+ else false
+ }}.exists(_ == true)
+ } else false
+ if (haveAllPreCommitted && transaction != null) {
+ transaction.commit
+ transactionalItems.items.foreach(_.commit)
+ status = TransactionStatus.Completed
+ reset
+ true
+ } else false
+ } else {
+ reset
+ true
+ }
+ }
+
+ def rollback(participant: String) = synchronized {
+ ensureIsActiveOrAborted
+ log.debug("TX ROLLBACK - Server with UUID [%s] has initiated transaction rollback for [%s]", participant, toString)
+ transaction.abort
+ status = TransactionStatus.Aborted
+ reset
+ }
+
+ def rollbackForRescheduling(participant: String) = synchronized {
+ ensureIsActiveOrAborted
+ log.debug("TX ROLLBACK for recheduling - Server with UUID [%s] has initiated transaction rollback for [%s]", participant, toString)
+ transaction.abort
+ reset
+ }
+
+ def join(participant: String) = synchronized {
+ ensureIsActive
+ log.debug("TX JOIN - Server with UUID [%s] is joining transaction [%s]" , participant, toString)
+ participants ::= participant
+ }
+
+ def isNew = status == TransactionStatus.New
+ def isActive = status == TransactionStatus.Active
+ def isCompleted = status == TransactionStatus.Completed
+ def isAborted = status == TransactionStatus.Aborted
+
+ private def reset = {
+ transactionalItems.clear
+ participants = Nil
+ precommitted = Nil
+ }
+
+ private def ensureIsActive = if (status != TransactionStatus.Active)
+ throw new IllegalStateException("Expected ACTIVE transaction - current status [" + status + "]: " + toString)
+
+ private def ensureIsActiveOrAborted = if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted))
+ throw new IllegalStateException("Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString)
+
+ private def ensureIsActiveOrNew = if (!(status == TransactionStatus.Active || status == TransactionStatus.New))
+ throw new IllegalStateException("Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
+
+ // For reinitialize transaction after sending it over the wire
+ private[kernel] def reinit = synchronized {
+ import net.lag.logging.{Logger, Level}
+ if (log == null) {
+ log = Logger.get(this.getClass.getName)
+ log.setLevel(Level.ALL)
+ }
+ }
+
+ override def equals(that: Any): Boolean = synchronized {
+ that != null &&
+ that.isInstanceOf[Transaction] &&
+ that.asInstanceOf[Transaction].id == this.id
+ }
+
+ override def hashCode(): Int = id.toInt
+
+ override def toString(): String = synchronized {
+ "Transaction[" + id + ", " + status + "]"
+ }
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@serializable sealed abstract class TransactionStatus
object TransactionStatus {
@@ -20,18 +191,17 @@ object TransactionStatus {
* Represents a snapshot of the current invocation.
*
* @author Jonas Bonér
- */
object TransactionIdFactory {
// FIXME: will not work in distributed env
private val currentId = new AtomicLong(0L)
def newId = currentId.getAndIncrement
}
+ */
/**
* Represents a snapshot of the current invocation.
*
* @author Jonas Bonér
- */
@serializable class Transaction extends Logging {
val id = TransactionIdFactory.newId
@@ -154,3 +324,4 @@ object TransactionIdFactory {
"Transaction[" + id + ", " + status + "]"
}
}
+ */
diff --git a/kernel/src/main/scala/stm/TransactionManagement.scala b/kernel/src/main/scala/stm/TransactionManagement.scala
old mode 100644
new mode 100755
index e67f4ce861..2f0c4015fb
--- a/kernel/src/main/scala/stm/TransactionManagement.scala
+++ b/kernel/src/main/scala/stm/TransactionManagement.scala
@@ -10,6 +10,8 @@ import kernel.reactor.MessageInvocation
import kernel.util.Logging
import org.codehaus.aspectwerkz.proxy.Uuid // FIXME is java.util.UUID better?
+import org.multiverse.utils.TransactionThreadLocal._
+
class TransactionRollbackException(msg: String) extends RuntimeException(msg)
class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
@@ -46,6 +48,7 @@ trait TransactionManagement extends Logging {
val tx = Some(newTx)
activeTx = tx
threadBoundTx.set(tx)
+ setThreadLocalTransaction(tx.get.transaction)
tx
}
@@ -110,6 +113,7 @@ trait TransactionManagement extends Logging {
protected def removeTransactionIfTopLevel = if (isTransactionTopLevel) {
activeTx = None
threadBoundTx.set(None)
+ setThreadLocalTransaction(null)
}
protected def reenteringExistingTransaction= if (activeTx.isDefined) {
diff --git a/kernel/src/test/scala/InMemoryActorSpec.scala b/kernel/src/test/scala/InMemoryActorSpec.scala
index 5ce06a10b3..95dc081f13 100644
--- a/kernel/src/test/scala/InMemoryActorSpec.scala
+++ b/kernel/src/test/scala/InMemoryActorSpec.scala
@@ -26,9 +26,9 @@ case class FailureOneWay(key: String, value: String, failer: Actor)
class InMemStatefulActor extends Actor {
timeout = 100000
makeTransactionRequired
- private val mapState = TransactionalState.newInMemoryMap[String, String]
- private val vectorState = TransactionalState.newInMemoryVector[String]
- private val refState = TransactionalState.newInMemoryRef[String]
+ private val mapState = TransactionalState.newMap[String, String]
+ private val vectorState = TransactionalState.newVector[String]
+ private val refState = TransactionalState.newRef[String]("")
def receive: PartialFunction[Any, Unit] = {
case GetMapState(key) =>
@@ -217,14 +217,26 @@ class InMemoryActorSpec extends TestCase {
def testRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
stateful.start
+ println("------------ 1")
+ try {
stateful !! SetRefState("init") // set init state
+ } catch {
+ case e: RuntimeException =>
+ println("------------ 1.1")
+ stateful !! SetRefState("init") // set init state
+ println("------------ 1.2")
+ }
+ println("------------ 2")
val failer = new InMemFailerActor
failer.start
try {
+ println("------------ 3")
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
+ println("------------ 4")
fail("should have thrown an exception")
- } catch {case e: RuntimeException => {}}
+ } catch {case e: RuntimeException => {
+ println("------------ 5")
+ }}
assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state
}
-
-}
\ No newline at end of file
+}
diff --git a/kernel/src/test/scala/TransactionClasherSpec.scala b/kernel/src/test/scala/TransactionClasherSpec.scala
index b04f14d120..d955331225 100644
--- a/kernel/src/test/scala/TransactionClasherSpec.scala
+++ b/kernel/src/test/scala/TransactionClasherSpec.scala
@@ -24,7 +24,7 @@ class TxActor(clasher: Actor) extends Actor {
}
class TxClasherActor extends Actor {
- val vector = TransactionalState.newInMemoryVector[String]
+ val vector = TransactionalState.newVector[String]
timeout = 1000000
makeTransactionRequired
var count = 0
@@ -59,7 +59,7 @@ class TxActorOneWay(clasher: Actor) extends Actor {
}
class TxClasherActorOneWay extends Actor {
- val vector = TransactionalState.newInMemoryVector[String]
+ val vector = TransactionalState.newVector[String]
timeout = 1000000
makeTransactionRequired
var count = 0
@@ -146,4 +146,4 @@ class TransactionClasherSpec extends TestCase {
assertEquals("First", (clasher !! "Index1").get)
}
*/
-}
\ No newline at end of file
+}
diff --git a/pom.xml b/pom.xml
old mode 100644
new mode 100755
diff --git a/util-java/pom.xml b/util-java/pom.xml
old mode 100644
new mode 100755
index fdcbbb5113..7a7a3199e9
--- a/util-java/pom.xml
+++ b/util-java/pom.xml
@@ -25,6 +25,11 @@
protobuf-java
2.1.0
+