diff --git a/kernel/src/main/scala/actor/Actor.scala b/kernel/src/main/scala/actor/Actor.scala
old mode 100644
new mode 100755
index 5e21048fd4..139e5d914b
--- a/kernel/src/main/scala/actor/Actor.scala
+++ b/kernel/src/main/scala/actor/Actor.scala
@@ -19,6 +19,7 @@ import nio.{RemoteProtocolBuilder, RemoteClient, RemoteServer, RemoteRequestIdFa
import management.Management
import com.twitter.service.Stats
+import org.multiverse.utils.TransactionThreadLocal._
sealed abstract class LifecycleMessage
case class Init(config: AnyRef) extends LifecycleMessage
@@ -464,7 +465,10 @@ trait Actor extends Logging with TransactionManagement {
}
private def dispatch[T](messageHandle: MessageInvocation) = {
- if (messageHandle.tx.isDefined) TransactionManagement.threadBoundTx.set(messageHandle.tx)
+ if (messageHandle.tx.isDefined) {
+ TransactionManagement.threadBoundTx.set(messageHandle.tx)
+ setThreadLocalTransaction(messageHandle.tx.get.transaction)
+ }
val message = messageHandle.message //serializeMessage(messageHandle.message)
val future = messageHandle.future
try {
@@ -479,11 +483,15 @@ trait Actor extends Logging with TransactionManagement {
else e.printStackTrace
} finally {
TransactionManagement.threadBoundTx.set(None)
+ setThreadLocalTransaction(null)
}
}
private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
- if (messageHandle.tx.isDefined) TransactionManagement.threadBoundTx.set(messageHandle.tx)
+ if (messageHandle.tx.isDefined) {
+ TransactionManagement.threadBoundTx.set(messageHandle.tx)
+ setThreadLocalTransaction(messageHandle.tx.get.transaction)
+ }
val message = messageHandle.message //serializeMessage(messageHandle.message)
val future = messageHandle.future
try {
@@ -500,6 +508,7 @@ trait Actor extends Logging with TransactionManagement {
case e =>
rollback(activeTx)
TransactionManagement.threadBoundTx.set(None) // need to clear threadBoundTx before call to supervisor
+ setThreadLocalTransaction(null)
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (supervisor.isDefined) supervisor.get ! Exit(this, e)
if (future.isDefined) future.get.completeWithException(this, e)
@@ -510,6 +519,7 @@ trait Actor extends Logging with TransactionManagement {
else tryToPrecommitTransaction
rescheduleClashedMessages
TransactionManagement.threadBoundTx.set(None)
+ setThreadLocalTransaction(null)
}
}
diff --git a/kernel/src/main/scala/nio/RemoteServer.scala b/kernel/src/main/scala/nio/RemoteServer.scala
old mode 100644
new mode 100755
index bbc223247e..fda24718c4
--- a/kernel/src/main/scala/nio/RemoteServer.scala
+++ b/kernel/src/main/scala/nio/RemoteServer.scala
@@ -235,7 +235,11 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
if (tx.isDefined) {
tx.get.reinit
TransactionManagement.threadBoundTx.set(tx)
- } else TransactionManagement.threadBoundTx.set(None)
+ setThreadLocalTransaction(tx.transaction)
+ } else {
+ TransactionManagement.threadBoundTx.set(None)
+ setThreadLocalTransaction(null)
+ }
}
*/
private def unescapeArgs(args: scala.List[AnyRef], argClasses: scala.List[Class[_]], timeout: Long) = {
diff --git a/kernel/src/main/scala/state/InMemoryState.scala b/kernel/src/main/scala/state/InMemoryState.scala
new file mode 100755
index 0000000000..002929ab5e
--- /dev/null
+++ b/kernel/src/main/scala/state/InMemoryState.scala
@@ -0,0 +1,61 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.state
+
+import kernel.stm.{Ref, TransactionManagement}
+import akka.collection._
+
+import org.codehaus.aspectwerkz.proxy.Uuid
+
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+
+/**
+ * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
+ *
+ * @author Jonas Bonér
+ */
+class InMemoryTransactionalMap[K, V] extends TransactionalMap[K, V] {
+ protected[this] val ref = new TransactionalRef[HashTrie[K, V]](new HashTrie[K, V])
+
+ // ---- Overriding scala.collection.mutable.Map behavior ----
+ override def contains(key: K): Boolean = ref.get.get.contains(key)
+ override def clear = ref.swap(new HashTrie[K, V])
+ override def size: Int = ref.get.get.size
+
+ // ---- For scala.collection.mutable.Map ----
+ override def remove(key: K) = ref.swap(ref.get.get - key)
+ override def elements: Iterator[(K, V)] = ref.get.get.elements
+ override def get(key: K): Option[V] = ref.get.get.get(key)
+ override def put(key: K, value: V): Option[V] = {
+ val map = ref.get.get
+ val oldValue = map.get(key)
+ ref.swap(map.update(key, value))
+ oldValue
+ }
+ override def -=(key: K) = remove(key)
+ override def update(key: K, value: V) = put(key, value)
+}
+
+/**
+ * Implements an in-memory transactional vector.
+ *
+ * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
+ *
+ * @author Jonas Bonér
+ */
+class InMemoryTransactionalVector[T] extends TransactionalVector[T] {
+ private[kernel] val ref = new TransactionalRef[Vector[T]](EmptyVector)
+
+ def add(elem: T) = ref.swap(ref.get.get + elem)
+ def get(index: Int): T = ref.get.get.apply(index)
+ def getRange(start: Int, count: Int): List[T] = ref.get.get.slice(start, count).toList.asInstanceOf[List[T]]
+
+ // ---- For Seq ----
+ def length: Int = ref.get.get.length
+ def apply(index: Int): T = ref.get.get.apply(index)
+ override def elements: Iterator[T] = ref.get.get.elements
+ override def toList: List[T] = ref.get.get.toList
+}
+
diff --git a/kernel/src/main/scala/state/State.scala b/kernel/src/main/scala/state/PersistentState.scala
old mode 100644
new mode 100755
similarity index 53%
rename from kernel/src/main/scala/state/State.scala
rename to kernel/src/main/scala/state/PersistentState.scala
index 45c0d7b1dc..9b75c38751
--- a/kernel/src/main/scala/state/State.scala
+++ b/kernel/src/main/scala/state/PersistentState.scala
@@ -4,133 +4,14 @@
package se.scalablesolutions.akka.kernel.state
-import kernel.stm.TransactionManagement
+import kernel.stm.{Ref, TransactionManagement}
import akka.collection._
import org.codehaus.aspectwerkz.proxy.Uuid
import scala.collection.mutable.{ArrayBuffer, HashMap}
-import org.multiverse.standard.manuallyinstrumented.Ref
-
-sealed abstract class TransactionalStateConfig
-abstract class PersistentStorageConfig extends TransactionalStateConfig
-case class CassandraStorageConfig extends PersistentStorageConfig
-case class TerracottaStorageConfig extends PersistentStorageConfig
-case class TokyoCabinetStorageConfig extends PersistentStorageConfig
-case class MongoStorageConfig extends PersistentStorageConfig
-
-/**
- * Scala API.
- *
- * Example Scala usage:
- *
- * val myMap = TransactionalState.newPersistentMap(CassandraStorageConfig)
- *
- */
-object TransactionalState extends TransactionalState
-
-/**
- * Java API.
- *
- * Example Java usage:
- *
- * TransactionalState state = new TransactionalState();
- * TransactionalMap myMap = state.newPersistentMap(new CassandraStorageConfig());
- *
- */
-class TransactionalState {
- def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[AnyRef, AnyRef] = config match {
- case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
- case MongoStorageConfig() => new MongoPersistentTransactionalMap
- case TerracottaStorageConfig() => throw new UnsupportedOperationException
- case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
- }
-
- def newPersistentVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match {
- case CassandraStorageConfig() => new CassandraPersistentTransactionalVector
- case MongoStorageConfig() => new MongoPersistentTransactionalVector
- case TerracottaStorageConfig() => throw new UnsupportedOperationException
- case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
- }
-
- def newPersistentRef(config: PersistentStorageConfig): TransactionalRef[AnyRef] = config match {
- case CassandraStorageConfig() => new CassandraPersistentTransactionalRef
- case TerracottaStorageConfig() => throw new UnsupportedOperationException
- case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
- }
-
- def newInMemoryMap[K, V]: TransactionalMap[K, V] = new InMemoryTransactionalMap[K, V]
-
- def newInMemoryVector[T]: TransactionalVector[T] = new InMemoryTransactionalVector[T]
-
- def newInMemoryRef[T]: TransactionalRef[T] = new TransactionalRef[T]
-}
-
-/**
- * @author Jonas Bonér
- */
-@serializable
-trait Transactional {
- // FIXME: won't work across the cluster
- val uuid = Uuid.newUuid.toString
-
- protected def verifyTransaction = {
- val cflowTx = TransactionManagement.threadBoundTx.get
- if (!cflowTx.isDefined) {
- throw new IllegalStateException("Can't access transactional reference outside the scope of a transaction [" + this + "]")
- } else {
- cflowTx.get.register(this)
- }
- }
-}
-
-/**
- * Base trait for all state implementations (persistent or in-memory).
- *
- * FIXME: Create Java versions using pcollections
- *
- * @author Jonas Bonér
- */
-trait TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] {
- override def hashCode: Int = System.identityHashCode(this);
- override def equals(other: Any): Boolean = false
- def remove(key: K)
-}
-
-/**
- * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
- *
- * @author Jonas Bonér
- */
-class InMemoryTransactionalMap[K, V] extends TransactionalMap[K, V] {
- protected[this] var ref = new TransactionalRef(new HashTrie[K, V])
-
- // ---- Overriding scala.collection.mutable.Map behavior ----
- override def contains(key: K): Boolean = ref.get.contains(key)
-
- override def clear = ref.set(new HashTrie[K, V])
-
- override def size: Int = ref.get.size
-
- // ---- For scala.collection.mutable.Map ----
- override def remove(key: K) = ref.set(ref.get - key)
-
- override def elements: Iterator[(K, V)] = ref.get.elements
-
- override def get(key: K): Option[V] = ref.get.get(key)
-
- override def put(key: K, value: V): Option[V] = {
- val map = ref.get
- val oldValue = map.get(key)
- ref.set(map.update(key, value))
- oldValue
- }
-
- override def -=(key: K) = remove(key)
-
- override def update(key: K, value: V) = put(key, value)
-}
+import org.multiverse.utils.TransactionThreadLocal._
/**
* Base class for all persistent transactional map implementations should extend.
@@ -147,10 +28,6 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
def getRange(start: Option[AnyRef], count: Int)
- def begin
- def commit
- def rollback = changeSet.clear
-
// ---- For scala.collection.mutable.Map ----
override def put(key: K, value: V): Option[V] = {
verifyTransaction
@@ -178,7 +55,6 @@ abstract class TemplatePersistentTransactionalMap extends PersistentTransactiona
val storage: MapStorage
override def remove(key: AnyRef) = {
- verifyTransaction
if (changeSet.contains(key)) changeSet -= key
else storage.removeMapStorageFor(uuid, key)
}
@@ -187,7 +63,6 @@ abstract class TemplatePersistentTransactionalMap extends PersistentTransactiona
getRange(start, None, count)
def getRange(start: Option[AnyRef], finish: Option[AnyRef], count: Int) = {
- verifyTransaction
try {
storage.getMapStorageRangeFor(uuid, start, finish, count)
} catch {
@@ -203,7 +78,6 @@ abstract class TemplatePersistentTransactionalMap extends PersistentTransactiona
// ---- Overriding scala.collection.mutable.Map behavior ----
override def clear = {
- verifyTransaction
try {
storage.removeMapStorageFor(uuid)
} catch {
@@ -281,76 +155,6 @@ class MongoPersistentTransactionalMap extends TemplatePersistentTransactionalMap
val storage = MongoStorage
}
-/**
- * Base for all transactional vector implementations.
- *
- * @author Jonas Bonér
- */
-abstract class TransactionalVector[T] extends Transactional with RandomAccessSeq[T] {
- override def hashCode: Int = System.identityHashCode(this);
- override def equals(other: Any): Boolean = false
-
- def add(elem: T)
-
- def get(index: Int): T
-
- def getRange(start: Int, count: Int): List[T]
-}
-
-/**
- * Implements an in-memory transactional vector.
- *
- * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
- *
- * @author Jonas Bonér
- */
-class InMemoryTransactionalVector[T] extends TransactionalVector[T] {
- private[kernel] var state = new Ref[Vector[T]](EmptyVector)
-
- def add(elem: T) = {
- state.get.set(elem)
- }
-
- def get(index: Int): T = {
-
-
- state(index)
- }
-
- def getRange(start: Int, count: Int): List[T] = {
- verifyTransaction
- state.slice(start, count).toList.asInstanceOf[List[T]]
- }
-
- // ---- For Transactional ----
- override def begin = snapshot = state
-
- override def commit = snapshot = state
-
- override def rollback = state = snapshot
-
- // ---- For Seq ----
- def length: Int = {
- verifyTransaction
- state.length
- }
-
- def apply(index: Int): T = {
- verifyTransaction
- state(index)
- }
-
- override def elements: Iterator[T] = {
- //verifyTransaction
- state.elements
- }
-
- override def toList: List[T] = {
- verifyTransaction
- state.toList
- }
-}
-
/**
* Base class for all persistent transactional vector implementations should extend.
* Implements a Unit of Work, records changes into a change set.
@@ -364,11 +168,6 @@ abstract class PersistentTransactionalVector[T] extends TransactionalVector[T] {
// FIXME: need to handle remove in another changeSet
protected[kernel] val changeSet = new ArrayBuffer[T]
- // ---- For Transactional ----
- override def begin = {}
-
- override def rollback = changeSet.clear
-
// ---- For TransactionalVector ----
override def add(value: T) = {
verifyTransaction
@@ -416,7 +215,6 @@ abstract class TemplatePersistentTransactionalVector extends PersistentTransacti
get(length - 1)
}
- // ---- For Transactional ----
override def commit = {
// FIXME: should use batch function once the bug is resolved
for (element <- changeSet) storage.insertVectorStorageEntryFor(uuid, element)
@@ -449,41 +247,12 @@ class MongoPersistentTransactionalVector extends TemplatePersistentTransactional
*
* @author Jonas Bonér
*/
-class TransactionalRef[T](elem: T) extends Transactional {
- def this() = this(null)
-
- private[kernel] val ref = new Ref[T](elem)
-
- def swap(elem: T) = ref.set(elem)
-
- def get: Option[T] = {
- if (ref.isNull) None
- else Some(ref.get)
- }
-
- def getOrElse(default: => T): T = {
- if (ref.isNull) default
- else ref.get
- }
-
- def isDefined: Boolean = !ref.isNull)
-}
-
-/**
- * Implements a transactional reference.
- *
- * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
- *
- * @author Jonas Bonér
- */
-class CassandraPersistentTransactionalRef extends TransactionalRef[String] {
- def commit = if (isDefined) {
+class CassandraPersistentTransactionalRef extends TransactionalRef[AnyRef] {
+ override def commit = if (isDefined) {
CassandraStorage.insertRefStorageFor(uuid, ref.get)
ref.clear
}
- def rollback = ref.clear
-
override def get: Option[AnyRef] = {
verifyTransaction
if (isDefined) super.get
diff --git a/kernel/src/main/scala/state/TransactionalState.scala b/kernel/src/main/scala/state/TransactionalState.scala
new file mode 100755
index 0000000000..cf1d124c26
--- /dev/null
+++ b/kernel/src/main/scala/state/TransactionalState.scala
@@ -0,0 +1,143 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.state
+
+import kernel.stm.{Ref, TransactionManagement}
+import akka.collection._
+
+import org.codehaus.aspectwerkz.proxy.Uuid
+
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+
+sealed abstract class TransactionalStateConfig
+abstract class PersistentStorageConfig extends TransactionalStateConfig
+case class CassandraStorageConfig extends PersistentStorageConfig
+case class TerracottaStorageConfig extends PersistentStorageConfig
+case class TokyoCabinetStorageConfig extends PersistentStorageConfig
+case class MongoStorageConfig extends PersistentStorageConfig
+
+/**
+ * Scala API.
+ *
+ * Example Scala usage:
+ *
+ * val myMap = TransactionalState.newPersistentMap(CassandraStorageConfig)
+ *
+ */
+object TransactionalState extends TransactionalState
+
+/**
+ * Java API.
+ *
+ * Example Java usage:
+ *
+ * TransactionalState state = new TransactionalState();
+ * TransactionalMap myMap = state.newPersistentMap(new CassandraStorageConfig());
+ *
+ */
+class TransactionalState {
+ def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[AnyRef, AnyRef] = config match {
+ case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
+ case MongoStorageConfig() => new MongoPersistentTransactionalMap
+ case TerracottaStorageConfig() => throw new UnsupportedOperationException
+ case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
+ }
+
+ def newPersistentVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match {
+ case CassandraStorageConfig() => new CassandraPersistentTransactionalVector
+ case MongoStorageConfig() => new MongoPersistentTransactionalVector
+ case TerracottaStorageConfig() => throw new UnsupportedOperationException
+ case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
+ }
+
+ def newPersistentRef(config: PersistentStorageConfig): TransactionalRef[AnyRef] = config match {
+ case CassandraStorageConfig() => new CassandraPersistentTransactionalRef
+ case MongoStorageConfig() => throw new UnsupportedOperationException
+ case TerracottaStorageConfig() => throw new UnsupportedOperationException
+ case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
+ }
+
+ def newMap[K, V]: TransactionalMap[K, V] = new InMemoryTransactionalMap[K, V]
+
+ def newVector[T]: TransactionalVector[T] = new InMemoryTransactionalVector[T]
+
+ def newRef[T]: TransactionalRef[T] = new TransactionalRef[T]
+ def newRef[T](init: T): TransactionalRef[T] = new TransactionalRef[T](init)
+}
+
+/**
+ * Implements a transactional reference. Based on the Multiverse STM.
+ *
+ * @author Jonas Bonér
+ */
+class TransactionalRef[T](elem: T) extends Transactional {
+ def this() = this(null.asInstanceOf[T])
+
+ private[kernel] val ref = new Ref[T](elem)
+
+ def swap(elem: T) = ref.set(elem)
+
+ def get: Option[T] = {
+ if (ref.isNull) None
+ else Some(ref.get)
+ }
+
+ def getOrWait: T = ref.getOrAwait
+
+ def getOrElse(default: => T): T = {
+ if (ref.isNull) default
+ else ref.get
+ }
+
+ def isDefined: Boolean = !ref.isNull
+}
+
+/**
+ * @author Jonas Bonér
+ */
+@serializable
+trait Transactional {
+ // FIXME: won't work across the cluster
+ val uuid = Uuid.newUuid.toString
+
+ private[kernel] def commit = {}
+
+ protected def verifyTransaction = {
+ val cflowTx = TransactionManagement.threadBoundTx.get
+ if (!cflowTx.isDefined) throw new IllegalStateException("Can't access transactional reference outside the scope of a transaction [" + this + "]")
+ else cflowTx.get.register(this)
+ }
+}
+
+/**
+ * Base trait for all state implementations (persistent or in-memory).
+ *
+ * FIXME: Create Java versions using pcollections
+ *
+ * @author Jonas Bonér
+ */
+trait TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] {
+ override def hashCode: Int = System.identityHashCode(this)
+ override def equals(other: Any): Boolean = false
+ def remove(key: K)
+}
+
+/**
+ * Base for all transactional vector implementations.
+ *
+ * @author Jonas Bonér
+ */
+abstract class TransactionalVector[T] extends Transactional with RandomAccessSeq[T] {
+ override def hashCode: Int = System.identityHashCode(this)
+ override def equals(other: Any): Boolean = false
+
+ def add(elem: T)
+
+ def get(index: Int): T
+
+ def getRange(start: Int, count: Int): List[T]
+}
+
+
diff --git a/kernel/src/main/scala/stm/Transaction.scala b/kernel/src/main/scala/stm/Transaction.scala
old mode 100644
new mode 100755
index 32aefa8a13..bb47a397bb
--- a/kernel/src/main/scala/stm/Transaction.scala
+++ b/kernel/src/main/scala/stm/Transaction.scala
@@ -7,31 +7,30 @@ package se.scalablesolutions.akka.kernel.stm
import kernel.state.Transactional
import kernel.util.Logging
-import org.multiverse.api.Transaction
-import org.multiverse.standard.DefaultStm
+import org.multiverse.api.{Transaction => MultiverseTransaction}
+import org.multiverse.stms.alpha.AlphaStm
import org.multiverse.utils.GlobalStmInstance
import org.multiverse.utils.TransactionThreadLocal._
+import java.util.concurrent.atomic.AtomicInteger
/**
* @author Jonas Bonér
*/
-object STM {
- val stm = new DefaultStm
- GlobalStmInstance.set(stm)
+object Multiverse {
+ val STM = new AlphaStm
+ GlobalStmInstance.set(STM)
setThreadLocalTransaction(null)
}
-
-
/**
* @author Jonas Bonér
*/
@serializable class Transaction extends Logging {
- log.debug("Creating a new transaction with id [%s]", id)
-
-
+ private[this] var _id = 0L
+ def id = _id
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
+ private[kernel] var transaction: MultiverseTransaction = _
private[this] val transactionalItems = new ChangeSet
@@ -51,6 +50,10 @@ object STM {
def begin(participant: String) = synchronized {
ensureIsActiveOrNew
+ transaction = Multiverse.STM.startUpdateTransaction
+ _id = transaction.getReadVersion
+ log.debug("Creating a new transaction with id [%s]", _id)
+
if (status == TransactionStatus.New) log.debug("TX BEGIN - Server with UUID [%s] is starting NEW transaction [%s]", participant, toString)
else log.debug("Server [%s] is participating in transaction", participant)
participants ::= participant
@@ -74,7 +77,8 @@ object STM {
else false
}}.exists(_ == true)
} else false
- if (haveAllPreCommitted) {
+ if (haveAllPreCommitted && transaction != null) {
+ transaction.commit
transactionalItems.items.foreach(_.commit)
status = TransactionStatus.Completed
reset
@@ -89,7 +93,7 @@ object STM {
def rollback(participant: String) = synchronized {
ensureIsActiveOrAborted
log.debug("TX ROLLBACK - Server with UUID [%s] has initiated transaction rollback for [%s]", participant, toString)
- transactionalItems.items.foreach(_.rollback)
+ transaction.abort
status = TransactionStatus.Aborted
reset
}
@@ -97,7 +101,7 @@ object STM {
def rollbackForRescheduling(participant: String) = synchronized {
ensureIsActiveOrAborted
log.debug("TX ROLLBACK for recheduling - Server with UUID [%s] has initiated transaction rollback for [%s]", participant, toString)
- transactionalItems.items.foreach(_.rollback)
+ transaction.abort
reset
}
diff --git a/kernel/src/main/scala/stm/TransactionManagement.scala b/kernel/src/main/scala/stm/TransactionManagement.scala
old mode 100644
new mode 100755
index e67f4ce861..2f0c4015fb
--- a/kernel/src/main/scala/stm/TransactionManagement.scala
+++ b/kernel/src/main/scala/stm/TransactionManagement.scala
@@ -10,6 +10,8 @@ import kernel.reactor.MessageInvocation
import kernel.util.Logging
import org.codehaus.aspectwerkz.proxy.Uuid // FIXME is java.util.UUID better?
+import org.multiverse.utils.TransactionThreadLocal._
+
class TransactionRollbackException(msg: String) extends RuntimeException(msg)
class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
@@ -46,6 +48,7 @@ trait TransactionManagement extends Logging {
val tx = Some(newTx)
activeTx = tx
threadBoundTx.set(tx)
+ setThreadLocalTransaction(tx.get.transaction)
tx
}
@@ -110,6 +113,7 @@ trait TransactionManagement extends Logging {
protected def removeTransactionIfTopLevel = if (isTransactionTopLevel) {
activeTx = None
threadBoundTx.set(None)
+ setThreadLocalTransaction(null)
}
protected def reenteringExistingTransaction= if (activeTx.isDefined) {
diff --git a/kernel/src/test/scala/InMemoryActorSpec.scala b/kernel/src/test/scala/InMemoryActorSpec.scala
index 5ce06a10b3..95dc081f13 100644
--- a/kernel/src/test/scala/InMemoryActorSpec.scala
+++ b/kernel/src/test/scala/InMemoryActorSpec.scala
@@ -26,9 +26,9 @@ case class FailureOneWay(key: String, value: String, failer: Actor)
class InMemStatefulActor extends Actor {
timeout = 100000
makeTransactionRequired
- private val mapState = TransactionalState.newInMemoryMap[String, String]
- private val vectorState = TransactionalState.newInMemoryVector[String]
- private val refState = TransactionalState.newInMemoryRef[String]
+ private val mapState = TransactionalState.newMap[String, String]
+ private val vectorState = TransactionalState.newVector[String]
+ private val refState = TransactionalState.newRef[String]("")
def receive: PartialFunction[Any, Unit] = {
case GetMapState(key) =>
@@ -217,14 +217,26 @@ class InMemoryActorSpec extends TestCase {
def testRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
stateful.start
+ println("------------ 1")
+ try {
stateful !! SetRefState("init") // set init state
+ } catch {
+ case e: RuntimeException =>
+ println("------------ 1.1")
+ stateful !! SetRefState("init") // set init state
+ println("------------ 1.2")
+ }
+ println("------------ 2")
val failer = new InMemFailerActor
failer.start
try {
+ println("------------ 3")
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
+ println("------------ 4")
fail("should have thrown an exception")
- } catch {case e: RuntimeException => {}}
+ } catch {case e: RuntimeException => {
+ println("------------ 5")
+ }}
assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state
}
-
-}
\ No newline at end of file
+}
diff --git a/kernel/src/test/scala/TransactionClasherSpec.scala b/kernel/src/test/scala/TransactionClasherSpec.scala
index b04f14d120..d955331225 100644
--- a/kernel/src/test/scala/TransactionClasherSpec.scala
+++ b/kernel/src/test/scala/TransactionClasherSpec.scala
@@ -24,7 +24,7 @@ class TxActor(clasher: Actor) extends Actor {
}
class TxClasherActor extends Actor {
- val vector = TransactionalState.newInMemoryVector[String]
+ val vector = TransactionalState.newVector[String]
timeout = 1000000
makeTransactionRequired
var count = 0
@@ -59,7 +59,7 @@ class TxActorOneWay(clasher: Actor) extends Actor {
}
class TxClasherActorOneWay extends Actor {
- val vector = TransactionalState.newInMemoryVector[String]
+ val vector = TransactionalState.newVector[String]
timeout = 1000000
makeTransactionRequired
var count = 0
@@ -146,4 +146,4 @@ class TransactionClasherSpec extends TestCase {
assertEquals("First", (clasher !! "Index1").get)
}
*/
-}
\ No newline at end of file
+}
diff --git a/pom.xml b/pom.xml
old mode 100644
new mode 100755
diff --git a/util-java/pom.xml b/util-java/pom.xml
old mode 100644
new mode 100755
index 77565af204..a682897c3a
--- a/util-java/pom.xml
+++ b/util-java/pom.xml
@@ -24,6 +24,11 @@
protobuf-java
2.1.0
+