From 4b03a9904caf513553dd5b2e5e5353ffcbf8770f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Sun, 30 May 2010 12:51:24 +0200 Subject: [PATCH] Fixed bug in STM and Persistence integration: added trait Abortable and added abort methods to all Persistent datastructures and removed redundant errornous atomic block --- .../src/main/scala/stm/Transaction.scala | 10 +- .../main/scala/stm/TransactionalState.scala | 7 ++ .../src/main/scala/Storage.scala | 45 +++++++-- .../test/scala/RedisPersistentActorSpec.scala | 91 +++++++------------ config/akka-reference.conf | 2 +- 5 files changed, 79 insertions(+), 76 deletions(-) diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala index dfd6c53fdf..a7870cec93 100644 --- a/akka-core/src/main/scala/stm/Transaction.scala +++ b/akka-core/src/main/scala/stm/Transaction.scala @@ -309,7 +309,7 @@ object Transaction { val id = Transaction.idFactory.incrementAndGet @volatile private[this] var status: TransactionStatus = TransactionStatus.New private[akka] var transaction: Option[MultiverseTransaction] = None - private[this] val persistentStateMap = new HashMap[String, Committable] + private[this] val persistentStateMap = new HashMap[String, Committable with Abortable] private[akka] val depth = new AtomicInteger(0) val jta: Option[TransactionContainer] = @@ -329,9 +329,7 @@ object Transaction { def commit = synchronized { log.trace("Committing transaction %s", toString) - Transaction.atomic0 { - persistentStateMap.valuesIterator.foreach(_.commit) - } + persistentStateMap.valuesIterator.foreach(_.commit) status = TransactionStatus.Completed jta.foreach(_.commit) } @@ -339,6 +337,8 @@ object Transaction { def abort = synchronized { log.trace("Aborting transaction %s", toString) jta.foreach(_.rollback) + persistentStateMap.valuesIterator.foreach(_.abort) + persistentStateMap.clear } def isNew = synchronized { status == TransactionStatus.New } @@ -361,7 +361,7 @@ object Transaction { private[akka] def isTopLevel = depth.get == 0 - private[akka] def register(uuid: String, storage: Committable) = persistentStateMap.put(uuid, storage) + private[akka] def register(uuid: String, storage: Committable with Abortable) = persistentStateMap.put(uuid, storage) private def ensureIsActive = if (status != TransactionStatus.Active) throw new StmConfigurationException( diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala index e3e3f4ac7f..8f449db1b1 100644 --- a/akka-core/src/main/scala/stm/TransactionalState.scala +++ b/akka-core/src/main/scala/stm/TransactionalState.scala @@ -56,6 +56,13 @@ trait Committable { def commit: Unit } +/** + * @author Jonas Bonér + */ +trait Abortable { + def abort: Unit +} + object RefFactory { private val factory = getGlobalStmInstance.getProgrammaticReferenceFactoryBuilder.build diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index dcb1f4f85e..bcfb2c1025 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -81,7 +81,7 @@ trait Storage { * @author Jonas Bonér */ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] - with Transactional with Committable with Logging { + with Transactional with Committable with Abortable with Logging { protected val newAndUpdatedEntries = TransactionalState.newMap[K, V] protected val removedEntries = TransactionalState.newVector[K] protected val shouldClearOnCommit = TransactionalRef[Boolean]() @@ -97,6 +97,12 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] removedEntries.clear } + def abort = { + newAndUpdatedEntries.clear + removedEntries.clear + shouldClearOnCommit.swap(false) + } + def -=(key: K) = { remove(key) this @@ -188,7 +194,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] * * @author Jonas Bonér */ -trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committable { +trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committable with Abortable { protected val newElems = TransactionalState.newVector[T] protected val updatedElems = TransactionalState.newMap[Int, T] protected val removedElems = TransactionalState.newVector[T] @@ -203,6 +209,13 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa updatedElems.clear } + def abort = { + newElems.clear + updatedElems.clear + removedElems.clear + shouldClearOnCommit.swap(false) + } + def +(elem: T) = add(elem) def add(elem: T) = { @@ -262,7 +275,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa * * @author Jonas Bonér */ -trait PersistentRef[T] extends Transactional with Committable { +trait PersistentRef[T] extends Transactional with Committable with Abortable { protected val ref = new TransactionalRef[T] val storage: RefStorageBackend[T] @@ -272,6 +285,8 @@ trait PersistentRef[T] extends Transactional with Committable { ref.swap(null.asInstanceOf[T]) } + def abort = ref.swap(null.asInstanceOf[T]) + def swap(elem: T) = { register ref.swap(elem) @@ -319,7 +334,7 @@ trait PersistentRef[T] extends Transactional with Committable { * @author Debasish Ghosh */ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] - with Transactional with Committable with Logging { + with Transactional with Committable with Abortable with Logging { sealed trait QueueOp case object ENQ extends QueueOp @@ -356,8 +371,17 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] enqueuedNDequeuedEntries.clear localQ.swap(Queue.empty) pickMeForDQ.swap(0) + shouldClearOnCommit.swap(false) } + def abort = { + enqueuedNDequeuedEntries.clear + shouldClearOnCommit.swap(false) + localQ.swap(Queue.empty) + pickMeForDQ.swap(0) + } + + override def enqueue(elems: A*) { register elems.foreach(e => { @@ -382,9 +406,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] val (a, q) = localQ.get.get.dequeue localQ.swap(q) a - } - else - throw new NoSuchElementException("trying to dequeue from empty queue") + } else throw new NoSuchElementException("trying to dequeue from empty queue") } } @@ -457,9 +479,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] * * @author */ -trait PersistentSortedSet[A] - extends Transactional - with Committable { +trait PersistentSortedSet[A] extends Transactional with Committable with Abortable { protected val newElems = TransactionalState.newMap[A, Float] protected val removedElems = TransactionalState.newVector[A] @@ -473,6 +493,11 @@ trait PersistentSortedSet[A] removedElems.clear } + def abort = { + newElems.clear + removedElems.clear + } + def +(elem: A, score: Float) = add(elem, score) def add(elem: A, score: Float) = { diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala index a09be9bd51..26aa9eb052 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala @@ -1,10 +1,12 @@ package se.scalablesolutions.akka.persistence.redis +import junit.framework.TestCase + import org.junit.{Test, Before} import org.junit.Assert._ -import se.scalablesolutions.akka.actor.{Actor, ActorRef, Transactor} -import Actor._ +import se.scalablesolutions.akka.actor.{ActorRef, Transactor} +import se.scalablesolutions.akka.actor.Actor._ /** * A persistent actor based on Redis storage. @@ -23,10 +25,10 @@ case class Balance(accountNo: String) case class Debit(accountNo: String, amount: BigInt, failer: ActorRef) case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: ActorRef) case class Credit(accountNo: String, amount: BigInt) -case class Log(start: Int, finish: Int) case object LogSize class AccountActor extends Transactor { + import self._ private lazy val accountState = RedisStorage.newMap private lazy val txnLog = RedisStorage.newVector //timeout = 5000 @@ -35,7 +37,7 @@ class AccountActor extends Transactor { // check balance case Balance(accountNo) => txnLog.add("Balance:%s".format(accountNo).getBytes) - self.reply(BigInt(new String(accountState.get(accountNo.getBytes).get))) + reply(BigInt(new String(accountState.get(accountNo.getBytes).get))) // debit amount: can fail case Debit(accountNo, amount, failer) => @@ -49,7 +51,7 @@ class AccountActor extends Transactor { accountState.put(accountNo.getBytes, (m - amount).toString.getBytes) if (amount > m) failer !! "Failure" - else self.reply(m - amount) + reply(m - amount) // many debits: can fail // demonstrates true rollback even if multiple puts have been done @@ -67,7 +69,7 @@ class AccountActor extends Transactor { accountState.put(accountNo.getBytes, (m - bal).toString.getBytes) } if (bal > m) failer !! "Failure" - self.reply(m - bal) + reply(m - bal) // credit amount case Credit(accountNo, amount) => @@ -79,13 +81,10 @@ class AccountActor extends Transactor { case None => 0 } accountState.put(accountNo.getBytes, (m + amount).toString.getBytes) - self.reply(m + amount) + reply(m + amount) case LogSize => - self.reply(txnLog.length.asInstanceOf[AnyRef]) - - case Log(start, finish) => - self.reply(txnLog.slice(start, finish)) + reply(txnLog.length.asInstanceOf[AnyRef]) } } @@ -97,62 +96,35 @@ class AccountActor extends Transactor { } } -import org.scalatest.junit.JUnitSuite -class RedisPersistentActorSpec extends JUnitSuite { +class RedisPersistentActorSpec extends TestCase { @Test - def testSuccessfulDebit { + def testSuccessfulDebit = { val bactor = actorOf[AccountActor] bactor.start val failer = actorOf[PersistentFailerActor] failer.start + bactor !! Credit("a-123", 5000) + bactor !! Debit("a-123", 3000, failer) + assertEquals(BigInt(2000), (bactor !! Balance("a-123")).get) - val acc = "a-123" + bactor !! Credit("a-123", 7000) + assertEquals(BigInt(9000), (bactor !! Balance("a-123")).get) - println("----------- SIZE 0 " + (bactor !! LogSize).get) + bactor !! Debit("a-123", 8000, failer) + assertEquals(BigInt(1000), (bactor !! Balance("a-123")).get) - bactor !! Credit(acc, 5000) - println("----------- SIZE 1 " + (bactor !! LogSize).get) - - println(bactor !! Balance(acc)) - println("----------- SIZE 2 " + (bactor !! LogSize).get) - - bactor !! Debit(acc, 3000, failer) - println("----------- SIZE 3 " + (bactor !! LogSize).get) - - assertEquals(BigInt(2000), (bactor !! Balance(acc)).get) - println("----------- SIZE 4 " + (bactor !! LogSize).get) - - bactor !! Credit(acc, 7000) - println("----------- SIZE 5 " + (bactor !! LogSize).get) - - assertEquals(BigInt(9000), (bactor !! Balance(acc)).get) - println("----------- SIZE 6 " + (bactor !! LogSize).get) - - bactor !! Debit(acc, 8000, failer) - println("----------- SIZE 7 " + (bactor !! LogSize).get) - - assertEquals(BigInt(1000), (bactor !! Balance(acc)).get) - println("----------- SIZE 8 " + (bactor !! LogSize).get) - - assert(7 === (bactor !! LogSize).get) // Not counting the failed transaction => 7 - - import scala.collection.mutable.ArrayBuffer - assert((bactor !! Log(0, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 7) - assert((bactor !! Log(0, 0)).get.asInstanceOf[ArrayBuffer[String]].size == 0) - assert((bactor !! Log(1, 2)).get.asInstanceOf[ArrayBuffer[String]].size == 1) - assert((bactor !! Log(6, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 1) - assert((bactor !! Log(0, 1)).get.asInstanceOf[ArrayBuffer[String]].size == 1) + val c: Int = (bactor !! LogSize).get + assertTrue(7 == c) } - /** @Test - def testUnsuccessfulDebit { - val bactor = actorOf(new AccountActor) + def testUnsuccessfulDebit = { + val bactor = actorOf[AccountActor] bactor.start bactor !! Credit("a-123", 5000) assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get) - val failer = actorOf(new PersistentFailerActor) + val failer = actorOf[PersistentFailerActor] failer.start try { bactor !! Debit("a-123", 7000, failer) @@ -162,19 +134,19 @@ class RedisPersistentActorSpec extends JUnitSuite { assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get) // should not count the failed one - // val c: Int = (bactor !! LogSize).get - // assertTrue(3 == c) + val c: Int = (bactor !! LogSize).get + assertTrue(3 == c) } @Test - def testUnsuccessfulMultiDebit { - val bactor = actorOf(new AccountActor) + def testUnsuccessfulMultiDebit = { + val bactor = actorOf[AccountActor] bactor.start bactor !! Credit("a-123", 5000) assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get) - val failer = actorOf(new PersistentFailerActor) + val failer = actorOf[PersistentFailerActor] failer.start try { bactor !! MultiDebit("a-123", List(500, 2000, 1000, 3000), failer) @@ -184,8 +156,7 @@ class RedisPersistentActorSpec extends JUnitSuite { assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get) // should not count the failed one - // val c: Int = (bactor !! LogSize).get - // assertTrue(3 == c) + val c: Int = (bactor !! LogSize).get + assertTrue(3 == c) } -**/ } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index d029b8ff0d..c0499632c8 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -8,7 +8,7 @@ filename = "./logs/akka.log" roll = "daily" # Options: never, hourly, daily, sunday/monday/... - level = "trace" # Options: fatal, critical, error, warning, info, debug, trace + level = "debug" # Options: fatal, critical, error, warning, info, debug, trace console = on # syslog_host = "" # syslog_server_name = ""