diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index ffd551cd21..015fb02974 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -342,7 +342,6 @@ trait Actor extends Logging with TransactionManagement { if (!linkedActors.contains(actor)) throw new IllegalStateException("Actor [" + actor + "] is not a linked actor, can't unlink") linkedActors.remove(actor) actor.supervisor = None - log.debug("Unlinking actor [%s] from actor [%s]", actor, this) } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } diff --git a/akka-actors/src/main/scala/stm/Transaction.scala b/akka-actors/src/main/scala/stm/Transaction.scala index 476acb7262..6e384e609a 100644 --- a/akka-actors/src/main/scala/stm/Transaction.scala +++ b/akka-actors/src/main/scala/stm/Transaction.scala @@ -47,7 +47,7 @@ object Multiverse { * @author Jonas Bonér */ object Transaction { - val idFactory = new AtomicLong(0L) + val idFactory = new AtomicLong(-1L) // -- Monad -------------------------- diff --git a/akka-actors/src/main/scala/stm/TransactionManagement.scala b/akka-actors/src/main/scala/stm/TransactionManagement.scala index fc7d1b0334..63b3efab9d 100644 --- a/akka-actors/src/main/scala/stm/TransactionManagement.scala +++ b/akka-actors/src/main/scala/stm/TransactionManagement.scala @@ -59,6 +59,7 @@ trait TransactionManagement extends Logging { val currentTx = cflowTx.get currentTx.join(uuid) activeTx = Some(currentTx) + log.debug("Joining transaction [%s]", currentTx) } } @@ -101,21 +102,17 @@ trait TransactionManagement extends Logging { } } - protected def isTransactionTopLevel = activeTx.isDefined && activeTx.get.isTopLevel - protected def isInExistingTransaction = TransactionManagement.threadBoundTx.get.isDefined + protected def isTransactionTopLevel = activeTx.isDefined && activeTx.get.isTopLevel + protected def isTransactionAborted = activeTx.isDefined && activeTx.get.isAborted protected def incrementTransaction = if (activeTx.isDefined) activeTx.get.increment protected def decrementTransaction = if (activeTx.isDefined) activeTx.get.decrement - protected def removeTransactionIfTopLevel = if (isTransactionTopLevel) { - activeTx = None - threadBoundTx.set(None) - setThreadLocalTransaction(null) - } + protected def removeTransactionIfTopLevel = if (isTransactionTopLevel) { activeTx = None } protected def reenteringExistingTransaction= if (activeTx.isDefined) { val cflowTx = threadBoundTx.get diff --git a/akka-actors/src/main/scala/stm/TransactionalState.scala b/akka-actors/src/main/scala/stm/TransactionalState.scala index 6806c039bf..769b97b948 100644 --- a/akka-actors/src/main/scala/stm/TransactionalState.scala +++ b/akka-actors/src/main/scala/stm/TransactionalState.scala @@ -15,8 +15,6 @@ import org.codehaus.aspectwerkz.proxy.Uuid import scala.collection.mutable.{ArrayBuffer, HashMap} /** - * Scala API. - *

* Example Scala usage: *

  * val myMap = TransactionalState.newMap
@@ -29,44 +27,23 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
  * val myVector = TransactionalVector()
  * val myRef = TransactionalRef()
  * 
- */ -object TransactionalState extends TransactionalState - -/** - * Java API. + * *

* Example Java usage: *

- * TransactionalState state = new TransactionalState();
- * TransactionalMap myMap = state.newMap();
+ * TransactionalMap myMap = TransactionalState.newMap();
  * 
+ * + * @author Jonas Bonér */ -class TransactionalState { - def newMap[K, V] = { -// new AtomicTemplate[TransactionalMap[K, V]]() { -// def execute(t: Transaction): TransactionalMap[K, V] = { - new TransactionalMap[K, V] -// } -// }.execute() - } - def newVector[T] = { -// new AtomicTemplate[TransactionalVector[T]]() { -// def execute(t: Transaction): TransactionalVector[T] = { - new TransactionalVector[T] -// } -// }.execute() - } - def newRef[T] = { -// new AtomicTemplate[TransactionalRef[T]]() { -// def execute(t: Transaction): TransactionalRef[T] = { - new TransactionalRef[T] -// } -// }.execute() - } +object TransactionalState { + def newMap[K, V] = TransactionalMap[K, V]() + def newVector[T] = TransactionalVector[T]() + def newRef[T] = TransactionalRef[T]() } /** - * @author Jonas Bonér + * @author Jonas Bonér */ @serializable trait Transactional { @@ -74,48 +51,67 @@ trait Transactional { val uuid = Uuid.newUuid.toString } +/** + * @author Jonas Bonér + */ +object TransactionalRef { + /** + * An implicit conversion that converts an option to an iterable value + */ + implicit def ref2Iterable[T](ref: TransactionalRef[T]): Iterable[T] = ref.toList + + def apply[T]() = new TransactionalRef[T] +} + /** * Implements a transactional managed reference. * * @author Jonas Bonér - * -class TransactionalRef[T] extends Transactional { - protected[this] var ref: Option[Ref[T]] = None - - def set(elem: T) = swap(elem) - - def swap(elem: T) = { - synchronized { if (ref.isEmpty) ref = Some(new Ref[T]) } - ref.get.set(elem) + */ +class TransactionalRef[+T] extends Transactional { + private[this] val ref = new Ref[T] + + def swap(elem: T) = ref.set(elem) + + def get: Option[T] = { + if (ref.isNull) None + else Some(ref.get) } - def get: Option[T] = - if (isEmpty) None - else Some(ref.get.get) - - def getOrWait: T = { - synchronized { if (ref.isEmpty) ref = Some(new Ref[T]) } - ref.get.getOrAwait + def getOrWait: T = ref.getOrAwait + + def getOrElse(default: => T): T = { + if (ref.isNull) default + else ref.get } - def getOrElse(default: => T): T = - if (isEmpty) default - else ref.get.get - - def isDefined: Boolean = ref.isDefined //&& !ref.get.isNull + def isDefined: Boolean = !ref.isNull - def isEmpty: Boolean = !isDefined + def isEmpty: Boolean = ref.isNull + + def map[B](f: T => B): Option[B] = if (isEmpty) None else Some(f(ref.get)) + + def flatMap[B](f: T => Option[B]): Option[B] = if (isEmpty) None else f(ref.get) + + def filter(p: T => Boolean): Option[T] = if (isEmpty || p(ref.get)) this else None + + def foreach(f: T => Unit) { if (!isEmpty) f(ref.get) } + + def elements: Iterator[T] = if (isEmpty) Iterator.empty else Iterator.fromValues(ref.get) + + def toList: List[T] = if (isEmpty) List() else List(ref.get) + + def toRight[X](left: => X) = if (isEmpty) Left(left) else Right(ref.get) + + def toLeft[X](right: => X) = if (isEmpty) Right(right) else Left(ref.get) + + def orElse[B >: T](alternative: => TransactionalRef[B]): TransactionalRef[B] = if (isEmpty) alternative else this } -object TransactionalRef { - def apply[T](elem: T) = { - if (elem == null) throw new IllegalArgumentException("Can't define TransactionalRef with a null initial value, needs to be a PersistentDataStructure") - val ref = new TransactionalRef[T] - ref.swap(elem) - ref - } +object TransactionalMap { + def apply[K, V]() = new TransactionalMap[K, V] } -*/ + /** * Implements an in-memory transactional Map based on Clojure's PersistentMap. * @@ -163,8 +159,8 @@ class TransactionalMap[K, V] extends Transactional with scala.collection.mutable other.hashCode == hashCode } -object TransactionalMap { - def apply[K, V]() = new TransactionalMap[K, V] +object TransactionalVector { + def apply[T]() = new TransactionalVector } /** @@ -204,42 +200,3 @@ class TransactionalVector[T] extends Transactional with RandomAccessSeq[T] { other.hashCode == hashCode } -object TransactionalVector { - def apply[T]() = new TransactionalVector -} - -class TransactionalRef[T] extends Transactional { - private[this] val ref = new Ref[T] - - def swap(elem: T) = - try { ref.set(elem) } catch { case e: org.multiverse.api.exceptions.LoadTooOldVersionException => ref.set(elem) } - - - def get: Option[T] = { - if (ref.isNull) None - else Some(ref.get) - } - - def getOrWait: T = ref.getOrAwait - - def getOrElse(default: => T): T = { - if (ref.isNull) default - else ref.get - } - - def isDefined: Boolean = !ref.isNull - - def isEmpty: Boolean = ref.isNull -} - -object TransactionalRef { - def apply[T]() = { -// new AtomicTemplate[TransactionalRef[T]]() { -// def execute(t: Transaction): TransactionalRef[T] = { - new TransactionalRef[T] -// } -// }.execute() - } -} - - diff --git a/akka-actors/src/test/scala/InMemoryActorSpec.scala b/akka-actors/src/test/scala/InMemoryActorSpec.scala index f03a066813..91147c5acc 100644 --- a/akka-actors/src/test/scala/InMemoryActorSpec.scala +++ b/akka-actors/src/test/scala/InMemoryActorSpec.scala @@ -226,13 +226,12 @@ class InMemoryActorSpec extends TestCase { val stateful = new InMemStatefulActor stateful.start stateful !! SetRefState("init") // set init state -/* val failer = new InMemFailerActor + val failer = new InMemFailerActor failer.start try { stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method fail("should have thrown an exception") - } catch {case e: RuntimeException => { - }} -*/ assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state + } catch {case e: RuntimeException => {}} + assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state } } diff --git a/akka-persistence/src/main/scala/CassandraStorage.scala b/akka-persistence/src/main/scala/CassandraStorage.scala index 3537b2f2eb..1ac84b77b1 100644 --- a/akka-persistence/src/main/scala/CassandraStorage.scala +++ b/akka-persistence/src/main/scala/CassandraStorage.scala @@ -64,7 +64,7 @@ object CassandraStorage extends MapStorage StackPool(SocketProvider(CASSANDRA_SERVER_HOSTNAME, CASSANDRA_SERVER_PORT)), protocol, CONSISTENCY_LEVEL) - + // =============================================================== // For Ref // =============================================================== diff --git a/akka-persistence/src/main/scala/PersistentState.scala b/akka-persistence/src/main/scala/PersistentState.scala index 30eab61f14..332da23b33 100644 --- a/akka-persistence/src/main/scala/PersistentState.scala +++ b/akka-persistence/src/main/scala/PersistentState.scala @@ -19,25 +19,18 @@ case class TokyoCabinetStorageConfig extends PersistentStorageConfig case class MongoStorageConfig extends PersistentStorageConfig /** - * Scala API. - *

* Example Scala usage: *

  * val myMap = PersistentState.newMap(CassandraStorageConfig)
  * 
- */ -object PersistentState extends PersistentState - -/** - * Java API. *

+ * * Example Java usage: *

- * PersistentState state = new PersistentState();
- * TransactionalMap myMap = state.newMap(new CassandraStorageConfig());
+ * TransactionalMap myMap = PersistentState.newMap(new CassandraStorageConfig());
  * 
*/ -class PersistentState { +object PersistentState { def newMap(config: PersistentStorageConfig): PersistentMap = config match { case CassandraStorageConfig() => new CassandraPersistentMap case MongoStorageConfig() => new MongoPersistentMap @@ -72,7 +65,7 @@ class PersistentState { trait PersistentMap extends scala.collection.mutable.Map[AnyRef, AnyRef] with Transactional { protected val newAndUpdatedEntries = TransactionalState.newMap[AnyRef, AnyRef] protected val removedEntries = TransactionalState.newMap[AnyRef, AnyRef] - protected val shouldClearOnCommit = TransactionalRef[Boolean](false) + protected val shouldClearOnCommit = TransactionalRef[Boolean]() // to be concretized in subclasses val storage: MapStorage @@ -165,7 +158,7 @@ trait PersistentVector extends RandomAccessSeq[AnyRef] with Transactional { protected val newElems = TransactionalState.newVector[AnyRef] protected val updatedElems = TransactionalState.newMap[Int, AnyRef] protected val removedElems = TransactionalState.newVector[AnyRef] - protected val shouldClearOnCommit = TransactionalRef[Boolean](false) + protected val shouldClearOnCommit = TransactionalRef[Boolean]() val storage: VectorStorage diff --git a/akka-persistence/src/test/scala/AllTest.scala b/akka-persistence/src/test/scala/AllTest.scala index e09ace7996..68b5d22235 100644 --- a/akka-persistence/src/test/scala/AllTest.scala +++ b/akka-persistence/src/test/scala/AllTest.scala @@ -9,8 +9,8 @@ object AllTest extends TestCase { def suite(): Test = { val suite = new TestSuite("All Scala tests") suite.addTestSuite(classOf[CassandraPersistentActorSpec]) - //suite.addTestSuite(classOf[MongoPersistentActorSpec]) - //suite.addTestSuite(classOf[MongoStorageSpec]) + suite.addTestSuite(classOf[MongoPersistentActorSpec]) + suite.addTestSuite(classOf[MongoStorageSpec]) suite } diff --git a/akka-persistence/src/test/scala/CassandraPersistentActorSpec.scala b/akka-persistence/src/test/scala/CassandraPersistentActorSpec.scala index 53a4d5e625..db755294dc 100644 --- a/akka-persistence/src/test/scala/CassandraPersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/CassandraPersistentActorSpec.scala @@ -30,9 +30,16 @@ case class FailureOneWay(key: String, value: String, failer: Actor) class CassandraPersistentActor extends Actor { timeout = 100000 makeTransactionRequired - private val mapState = PersistentState.newMap(CassandraStorageConfig()) - private val vectorState = PersistentState.newVector(CassandraStorageConfig()) - private val refState = PersistentState.newRef(CassandraStorageConfig()) + + private var mapState: PersistentMap = _ + private var vectorState: PersistentVector = _ + private var refState: PersistentRef = _ + + override def initializeTransactionalState = { + mapState = PersistentState.newMap(CassandraStorageConfig()) + vectorState = PersistentState.newVector(CassandraStorageConfig()) + refState = PersistentState.newRef(CassandraStorageConfig()) + } def receive: PartialFunction[Any, Unit] = { case GetMapState(key) => diff --git a/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala b/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala index 830c36bfb5..dc060224f6 100644 --- a/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/MongoPersistentActorSpec.scala @@ -28,10 +28,13 @@ case object LogSize class BankAccountActor extends Actor { makeTransactionRequired - private val accountState = - PersistentState.newMap(MongoStorageConfig()) - private val txnLog = - PersistentState.newVector(MongoStorageConfig()) + + private var accountState: PersistentMap = _ + private var txnLog: PersistentVector = _ + override def initializeTransactionalState = { + accountState = PersistentState.newMap(MongoStorageConfig()) + txnLog = PersistentState.newVector(MongoStorageConfig()) + } def receive: PartialFunction[Any, Unit] = { // check balance diff --git a/akka.iws b/akka.iws index d3b34d651f..650d20a2ac 100644 --- a/akka.iws +++ b/akka.iws @@ -2,15 +2,16 @@ - - + - - + + + - + + @@ -25,6 +26,47 @@ + + + + + + + +