diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala
index dfd6c53fdf..a7870cec93 100644
--- a/akka-core/src/main/scala/stm/Transaction.scala
+++ b/akka-core/src/main/scala/stm/Transaction.scala
@@ -309,7 +309,7 @@ object Transaction {
val id = Transaction.idFactory.incrementAndGet
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
private[akka] var transaction: Option[MultiverseTransaction] = None
- private[this] val persistentStateMap = new HashMap[String, Committable]
+ private[this] val persistentStateMap = new HashMap[String, Committable with Abortable]
private[akka] val depth = new AtomicInteger(0)
val jta: Option[TransactionContainer] =
@@ -329,9 +329,7 @@ object Transaction {
def commit = synchronized {
log.trace("Committing transaction %s", toString)
- Transaction.atomic0 {
- persistentStateMap.valuesIterator.foreach(_.commit)
- }
+ persistentStateMap.valuesIterator.foreach(_.commit)
status = TransactionStatus.Completed
jta.foreach(_.commit)
}
@@ -339,6 +337,8 @@ object Transaction {
def abort = synchronized {
log.trace("Aborting transaction %s", toString)
jta.foreach(_.rollback)
+ persistentStateMap.valuesIterator.foreach(_.abort)
+ persistentStateMap.clear
}
def isNew = synchronized { status == TransactionStatus.New }
@@ -361,7 +361,7 @@ object Transaction {
private[akka] def isTopLevel = depth.get == 0
- private[akka] def register(uuid: String, storage: Committable) = persistentStateMap.put(uuid, storage)
+ private[akka] def register(uuid: String, storage: Committable with Abortable) = persistentStateMap.put(uuid, storage)
private def ensureIsActive = if (status != TransactionStatus.Active)
throw new StmConfigurationException(
diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala
index e3e3f4ac7f..8f449db1b1 100644
--- a/akka-core/src/main/scala/stm/TransactionalState.scala
+++ b/akka-core/src/main/scala/stm/TransactionalState.scala
@@ -56,6 +56,13 @@ trait Committable {
def commit: Unit
}
+/**
+ * @author Jonas Bonér
+ */
+trait Abortable {
+ def abort: Unit
+}
+
object RefFactory {
private val factory = getGlobalStmInstance.getProgrammaticReferenceFactoryBuilder.build
diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
index dcb1f4f85e..bcfb2c1025 100644
--- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
+++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
@@ -81,7 +81,7 @@ trait Storage {
* @author Jonas Bonér
*/
trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
- with Transactional with Committable with Logging {
+ with Transactional with Committable with Abortable with Logging {
protected val newAndUpdatedEntries = TransactionalState.newMap[K, V]
protected val removedEntries = TransactionalState.newVector[K]
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
@@ -97,6 +97,12 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
removedEntries.clear
}
+ def abort = {
+ newAndUpdatedEntries.clear
+ removedEntries.clear
+ shouldClearOnCommit.swap(false)
+ }
+
def -=(key: K) = {
remove(key)
this
@@ -188,7 +194,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
*
* @author Jonas Bonér
*/
-trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committable {
+trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committable with Abortable {
protected val newElems = TransactionalState.newVector[T]
protected val updatedElems = TransactionalState.newMap[Int, T]
protected val removedElems = TransactionalState.newVector[T]
@@ -203,6 +209,13 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
updatedElems.clear
}
+ def abort = {
+ newElems.clear
+ updatedElems.clear
+ removedElems.clear
+ shouldClearOnCommit.swap(false)
+ }
+
def +(elem: T) = add(elem)
def add(elem: T) = {
@@ -262,7 +275,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
*
* @author Jonas Bonér
*/
-trait PersistentRef[T] extends Transactional with Committable {
+trait PersistentRef[T] extends Transactional with Committable with Abortable {
protected val ref = new TransactionalRef[T]
val storage: RefStorageBackend[T]
@@ -272,6 +285,8 @@ trait PersistentRef[T] extends Transactional with Committable {
ref.swap(null.asInstanceOf[T])
}
+ def abort = ref.swap(null.asInstanceOf[T])
+
def swap(elem: T) = {
register
ref.swap(elem)
@@ -319,7 +334,7 @@ trait PersistentRef[T] extends Transactional with Committable {
* @author Debasish Ghosh
*/
trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
- with Transactional with Committable with Logging {
+ with Transactional with Committable with Abortable with Logging {
sealed trait QueueOp
case object ENQ extends QueueOp
@@ -356,8 +371,17 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
enqueuedNDequeuedEntries.clear
localQ.swap(Queue.empty)
pickMeForDQ.swap(0)
+ shouldClearOnCommit.swap(false)
}
+ def abort = {
+ enqueuedNDequeuedEntries.clear
+ shouldClearOnCommit.swap(false)
+ localQ.swap(Queue.empty)
+ pickMeForDQ.swap(0)
+ }
+
+
override def enqueue(elems: A*) {
register
elems.foreach(e => {
@@ -382,9 +406,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
val (a, q) = localQ.get.get.dequeue
localQ.swap(q)
a
- }
- else
- throw new NoSuchElementException("trying to dequeue from empty queue")
+ } else throw new NoSuchElementException("trying to dequeue from empty queue")
}
}
@@ -457,9 +479,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
*
* @author
*/
-trait PersistentSortedSet[A]
- extends Transactional
- with Committable {
+trait PersistentSortedSet[A] extends Transactional with Committable with Abortable {
protected val newElems = TransactionalState.newMap[A, Float]
protected val removedElems = TransactionalState.newVector[A]
@@ -473,6 +493,11 @@ trait PersistentSortedSet[A]
removedElems.clear
}
+ def abort = {
+ newElems.clear
+ removedElems.clear
+ }
+
def +(elem: A, score: Float) = add(elem, score)
def add(elem: A, score: Float) = {
diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
index a09be9bd51..26aa9eb052 100644
--- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
+++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
@@ -1,10 +1,12 @@
package se.scalablesolutions.akka.persistence.redis
+import junit.framework.TestCase
+
import org.junit.{Test, Before}
import org.junit.Assert._
-import se.scalablesolutions.akka.actor.{Actor, ActorRef, Transactor}
-import Actor._
+import se.scalablesolutions.akka.actor.{ActorRef, Transactor}
+import se.scalablesolutions.akka.actor.Actor._
/**
* A persistent actor based on Redis storage.
@@ -23,10 +25,10 @@ case class Balance(accountNo: String)
case class Debit(accountNo: String, amount: BigInt, failer: ActorRef)
case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: ActorRef)
case class Credit(accountNo: String, amount: BigInt)
-case class Log(start: Int, finish: Int)
case object LogSize
class AccountActor extends Transactor {
+ import self._
private lazy val accountState = RedisStorage.newMap
private lazy val txnLog = RedisStorage.newVector
//timeout = 5000
@@ -35,7 +37,7 @@ class AccountActor extends Transactor {
// check balance
case Balance(accountNo) =>
txnLog.add("Balance:%s".format(accountNo).getBytes)
- self.reply(BigInt(new String(accountState.get(accountNo.getBytes).get)))
+ reply(BigInt(new String(accountState.get(accountNo.getBytes).get)))
// debit amount: can fail
case Debit(accountNo, amount, failer) =>
@@ -49,7 +51,7 @@ class AccountActor extends Transactor {
accountState.put(accountNo.getBytes, (m - amount).toString.getBytes)
if (amount > m)
failer !! "Failure"
- else self.reply(m - amount)
+ reply(m - amount)
// many debits: can fail
// demonstrates true rollback even if multiple puts have been done
@@ -67,7 +69,7 @@ class AccountActor extends Transactor {
accountState.put(accountNo.getBytes, (m - bal).toString.getBytes)
}
if (bal > m) failer !! "Failure"
- self.reply(m - bal)
+ reply(m - bal)
// credit amount
case Credit(accountNo, amount) =>
@@ -79,13 +81,10 @@ class AccountActor extends Transactor {
case None => 0
}
accountState.put(accountNo.getBytes, (m + amount).toString.getBytes)
- self.reply(m + amount)
+ reply(m + amount)
case LogSize =>
- self.reply(txnLog.length.asInstanceOf[AnyRef])
-
- case Log(start, finish) =>
- self.reply(txnLog.slice(start, finish))
+ reply(txnLog.length.asInstanceOf[AnyRef])
}
}
@@ -97,62 +96,35 @@ class AccountActor extends Transactor {
}
}
-import org.scalatest.junit.JUnitSuite
-class RedisPersistentActorSpec extends JUnitSuite {
+class RedisPersistentActorSpec extends TestCase {
@Test
- def testSuccessfulDebit {
+ def testSuccessfulDebit = {
val bactor = actorOf[AccountActor]
bactor.start
val failer = actorOf[PersistentFailerActor]
failer.start
+ bactor !! Credit("a-123", 5000)
+ bactor !! Debit("a-123", 3000, failer)
+ assertEquals(BigInt(2000), (bactor !! Balance("a-123")).get)
- val acc = "a-123"
+ bactor !! Credit("a-123", 7000)
+ assertEquals(BigInt(9000), (bactor !! Balance("a-123")).get)
- println("----------- SIZE 0 " + (bactor !! LogSize).get)
+ bactor !! Debit("a-123", 8000, failer)
+ assertEquals(BigInt(1000), (bactor !! Balance("a-123")).get)
- bactor !! Credit(acc, 5000)
- println("----------- SIZE 1 " + (bactor !! LogSize).get)
-
- println(bactor !! Balance(acc))
- println("----------- SIZE 2 " + (bactor !! LogSize).get)
-
- bactor !! Debit(acc, 3000, failer)
- println("----------- SIZE 3 " + (bactor !! LogSize).get)
-
- assertEquals(BigInt(2000), (bactor !! Balance(acc)).get)
- println("----------- SIZE 4 " + (bactor !! LogSize).get)
-
- bactor !! Credit(acc, 7000)
- println("----------- SIZE 5 " + (bactor !! LogSize).get)
-
- assertEquals(BigInt(9000), (bactor !! Balance(acc)).get)
- println("----------- SIZE 6 " + (bactor !! LogSize).get)
-
- bactor !! Debit(acc, 8000, failer)
- println("----------- SIZE 7 " + (bactor !! LogSize).get)
-
- assertEquals(BigInt(1000), (bactor !! Balance(acc)).get)
- println("----------- SIZE 8 " + (bactor !! LogSize).get)
-
- assert(7 === (bactor !! LogSize).get) // Not counting the failed transaction => 7
-
- import scala.collection.mutable.ArrayBuffer
- assert((bactor !! Log(0, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 7)
- assert((bactor !! Log(0, 0)).get.asInstanceOf[ArrayBuffer[String]].size == 0)
- assert((bactor !! Log(1, 2)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
- assert((bactor !! Log(6, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
- assert((bactor !! Log(0, 1)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
+ val c: Int = (bactor !! LogSize).get
+ assertTrue(7 == c)
}
- /**
@Test
- def testUnsuccessfulDebit {
- val bactor = actorOf(new AccountActor)
+ def testUnsuccessfulDebit = {
+ val bactor = actorOf[AccountActor]
bactor.start
bactor !! Credit("a-123", 5000)
assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
- val failer = actorOf(new PersistentFailerActor)
+ val failer = actorOf[PersistentFailerActor]
failer.start
try {
bactor !! Debit("a-123", 7000, failer)
@@ -162,19 +134,19 @@ class RedisPersistentActorSpec extends JUnitSuite {
assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
// should not count the failed one
- // val c: Int = (bactor !! LogSize).get
- // assertTrue(3 == c)
+ val c: Int = (bactor !! LogSize).get
+ assertTrue(3 == c)
}
@Test
- def testUnsuccessfulMultiDebit {
- val bactor = actorOf(new AccountActor)
+ def testUnsuccessfulMultiDebit = {
+ val bactor = actorOf[AccountActor]
bactor.start
bactor !! Credit("a-123", 5000)
assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get)
- val failer = actorOf(new PersistentFailerActor)
+ val failer = actorOf[PersistentFailerActor]
failer.start
try {
bactor !! MultiDebit("a-123", List(500, 2000, 1000, 3000), failer)
@@ -184,8 +156,7 @@ class RedisPersistentActorSpec extends JUnitSuite {
assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get)
// should not count the failed one
- // val c: Int = (bactor !! LogSize).get
- // assertTrue(3 == c)
+ val c: Int = (bactor !! LogSize).get
+ assertTrue(3 == c)
}
-**/
}
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index d029b8ff0d..c0499632c8 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -8,7 +8,7 @@
filename = "./logs/akka.log"
roll = "daily" # Options: never, hourly, daily, sunday/monday/...
- level = "trace" # Options: fatal, critical, error, warning, info, debug, trace
+ level = "debug" # Options: fatal, critical, error, warning, info, debug, trace
console = on
# syslog_host = ""
# syslog_server_name = ""