Fixed bug in STM and Persistence integration: added trait Abortable and added abort methods to all Persistent datastructures and removed redundant errornous atomic block

This commit is contained in:
Jonas Bonér 2010-05-30 12:51:24 +02:00
parent 22630c5329
commit 4b03a9904c
5 changed files with 79 additions and 76 deletions

View file

@ -309,7 +309,7 @@ object Transaction {
val id = Transaction.idFactory.incrementAndGet
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
private[akka] var transaction: Option[MultiverseTransaction] = None
private[this] val persistentStateMap = new HashMap[String, Committable]
private[this] val persistentStateMap = new HashMap[String, Committable with Abortable]
private[akka] val depth = new AtomicInteger(0)
val jta: Option[TransactionContainer] =
@ -329,9 +329,7 @@ object Transaction {
def commit = synchronized {
log.trace("Committing transaction %s", toString)
Transaction.atomic0 {
persistentStateMap.valuesIterator.foreach(_.commit)
}
persistentStateMap.valuesIterator.foreach(_.commit)
status = TransactionStatus.Completed
jta.foreach(_.commit)
}
@ -339,6 +337,8 @@ object Transaction {
def abort = synchronized {
log.trace("Aborting transaction %s", toString)
jta.foreach(_.rollback)
persistentStateMap.valuesIterator.foreach(_.abort)
persistentStateMap.clear
}
def isNew = synchronized { status == TransactionStatus.New }
@ -361,7 +361,7 @@ object Transaction {
private[akka] def isTopLevel = depth.get == 0
private[akka] def register(uuid: String, storage: Committable) = persistentStateMap.put(uuid, storage)
private[akka] def register(uuid: String, storage: Committable with Abortable) = persistentStateMap.put(uuid, storage)
private def ensureIsActive = if (status != TransactionStatus.Active)
throw new StmConfigurationException(

View file

@ -56,6 +56,13 @@ trait Committable {
def commit: Unit
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Abortable {
def abort: Unit
}
object RefFactory {
private val factory = getGlobalStmInstance.getProgrammaticReferenceFactoryBuilder.build

View file

@ -81,7 +81,7 @@ trait Storage {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
with Transactional with Committable with Logging {
with Transactional with Committable with Abortable with Logging {
protected val newAndUpdatedEntries = TransactionalState.newMap[K, V]
protected val removedEntries = TransactionalState.newVector[K]
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
@ -97,6 +97,12 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
removedEntries.clear
}
def abort = {
newAndUpdatedEntries.clear
removedEntries.clear
shouldClearOnCommit.swap(false)
}
def -=(key: K) = {
remove(key)
this
@ -188,7 +194,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committable {
trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committable with Abortable {
protected val newElems = TransactionalState.newVector[T]
protected val updatedElems = TransactionalState.newMap[Int, T]
protected val removedElems = TransactionalState.newVector[T]
@ -203,6 +209,13 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
updatedElems.clear
}
def abort = {
newElems.clear
updatedElems.clear
removedElems.clear
shouldClearOnCommit.swap(false)
}
def +(elem: T) = add(elem)
def add(elem: T) = {
@ -262,7 +275,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait PersistentRef[T] extends Transactional with Committable {
trait PersistentRef[T] extends Transactional with Committable with Abortable {
protected val ref = new TransactionalRef[T]
val storage: RefStorageBackend[T]
@ -272,6 +285,8 @@ trait PersistentRef[T] extends Transactional with Committable {
ref.swap(null.asInstanceOf[T])
}
def abort = ref.swap(null.asInstanceOf[T])
def swap(elem: T) = {
register
ref.swap(elem)
@ -319,7 +334,7 @@ trait PersistentRef[T] extends Transactional with Committable {
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
with Transactional with Committable with Logging {
with Transactional with Committable with Abortable with Logging {
sealed trait QueueOp
case object ENQ extends QueueOp
@ -356,8 +371,17 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
enqueuedNDequeuedEntries.clear
localQ.swap(Queue.empty)
pickMeForDQ.swap(0)
shouldClearOnCommit.swap(false)
}
def abort = {
enqueuedNDequeuedEntries.clear
shouldClearOnCommit.swap(false)
localQ.swap(Queue.empty)
pickMeForDQ.swap(0)
}
override def enqueue(elems: A*) {
register
elems.foreach(e => {
@ -382,9 +406,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
val (a, q) = localQ.get.get.dequeue
localQ.swap(q)
a
}
else
throw new NoSuchElementException("trying to dequeue from empty queue")
} else throw new NoSuchElementException("trying to dequeue from empty queue")
}
}
@ -457,9 +479,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
*
* @author <a href="http://debasishg.blogspot.com"</a>
*/
trait PersistentSortedSet[A]
extends Transactional
with Committable {
trait PersistentSortedSet[A] extends Transactional with Committable with Abortable {
protected val newElems = TransactionalState.newMap[A, Float]
protected val removedElems = TransactionalState.newVector[A]
@ -473,6 +493,11 @@ trait PersistentSortedSet[A]
removedElems.clear
}
def abort = {
newElems.clear
removedElems.clear
}
def +(elem: A, score: Float) = add(elem, score)
def add(elem: A, score: Float) = {

View file

@ -1,10 +1,12 @@
package se.scalablesolutions.akka.persistence.redis
import junit.framework.TestCase
import org.junit.{Test, Before}
import org.junit.Assert._
import se.scalablesolutions.akka.actor.{Actor, ActorRef, Transactor}
import Actor._
import se.scalablesolutions.akka.actor.{ActorRef, Transactor}
import se.scalablesolutions.akka.actor.Actor._
/**
* A persistent actor based on Redis storage.
@ -23,10 +25,10 @@ case class Balance(accountNo: String)
case class Debit(accountNo: String, amount: BigInt, failer: ActorRef)
case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: ActorRef)
case class Credit(accountNo: String, amount: BigInt)
case class Log(start: Int, finish: Int)
case object LogSize
class AccountActor extends Transactor {
import self._
private lazy val accountState = RedisStorage.newMap
private lazy val txnLog = RedisStorage.newVector
//timeout = 5000
@ -35,7 +37,7 @@ class AccountActor extends Transactor {
// check balance
case Balance(accountNo) =>
txnLog.add("Balance:%s".format(accountNo).getBytes)
self.reply(BigInt(new String(accountState.get(accountNo.getBytes).get)))
reply(BigInt(new String(accountState.get(accountNo.getBytes).get)))
// debit amount: can fail
case Debit(accountNo, amount, failer) =>
@ -49,7 +51,7 @@ class AccountActor extends Transactor {
accountState.put(accountNo.getBytes, (m - amount).toString.getBytes)
if (amount > m)
failer !! "Failure"
else self.reply(m - amount)
reply(m - amount)
// many debits: can fail
// demonstrates true rollback even if multiple puts have been done
@ -67,7 +69,7 @@ class AccountActor extends Transactor {
accountState.put(accountNo.getBytes, (m - bal).toString.getBytes)
}
if (bal > m) failer !! "Failure"
self.reply(m - bal)
reply(m - bal)
// credit amount
case Credit(accountNo, amount) =>
@ -79,13 +81,10 @@ class AccountActor extends Transactor {
case None => 0
}
accountState.put(accountNo.getBytes, (m + amount).toString.getBytes)
self.reply(m + amount)
reply(m + amount)
case LogSize =>
self.reply(txnLog.length.asInstanceOf[AnyRef])
case Log(start, finish) =>
self.reply(txnLog.slice(start, finish))
reply(txnLog.length.asInstanceOf[AnyRef])
}
}
@ -97,62 +96,35 @@ class AccountActor extends Transactor {
}
}
import org.scalatest.junit.JUnitSuite
class RedisPersistentActorSpec extends JUnitSuite {
class RedisPersistentActorSpec extends TestCase {
@Test
def testSuccessfulDebit {
def testSuccessfulDebit = {
val bactor = actorOf[AccountActor]
bactor.start
val failer = actorOf[PersistentFailerActor]
failer.start
bactor !! Credit("a-123", 5000)
bactor !! Debit("a-123", 3000, failer)
assertEquals(BigInt(2000), (bactor !! Balance("a-123")).get)
val acc = "a-123"
bactor !! Credit("a-123", 7000)
assertEquals(BigInt(9000), (bactor !! Balance("a-123")).get)
println("----------- SIZE 0 " + (bactor !! LogSize).get)
bactor !! Debit("a-123", 8000, failer)
assertEquals(BigInt(1000), (bactor !! Balance("a-123")).get)
bactor !! Credit(acc, 5000)
println("----------- SIZE 1 " + (bactor !! LogSize).get)
println(bactor !! Balance(acc))
println("----------- SIZE 2 " + (bactor !! LogSize).get)
bactor !! Debit(acc, 3000, failer)
println("----------- SIZE 3 " + (bactor !! LogSize).get)
assertEquals(BigInt(2000), (bactor !! Balance(acc)).get)
println("----------- SIZE 4 " + (bactor !! LogSize).get)
bactor !! Credit(acc, 7000)
println("----------- SIZE 5 " + (bactor !! LogSize).get)
assertEquals(BigInt(9000), (bactor !! Balance(acc)).get)
println("----------- SIZE 6 " + (bactor !! LogSize).get)
bactor !! Debit(acc, 8000, failer)
println("----------- SIZE 7 " + (bactor !! LogSize).get)
assertEquals(BigInt(1000), (bactor !! Balance(acc)).get)
println("----------- SIZE 8 " + (bactor !! LogSize).get)
assert(7 === (bactor !! LogSize).get) // Not counting the failed transaction => 7
import scala.collection.mutable.ArrayBuffer
assert((bactor !! Log(0, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 7)
assert((bactor !! Log(0, 0)).get.asInstanceOf[ArrayBuffer[String]].size == 0)
assert((bactor !! Log(1, 2)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
assert((bactor !! Log(6, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
assert((bactor !! Log(0, 1)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
val c: Int = (bactor !! LogSize).get
assertTrue(7 == c)
}
/**
@Test
def testUnsuccessfulDebit {
val bactor = actorOf(new AccountActor)
def testUnsuccessfulDebit = {
val bactor = actorOf[AccountActor]
bactor.start
bactor !! Credit("a-123", 5000)
assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
val failer = actorOf(new PersistentFailerActor)
val failer = actorOf[PersistentFailerActor]
failer.start
try {
bactor !! Debit("a-123", 7000, failer)
@ -162,19 +134,19 @@ class RedisPersistentActorSpec extends JUnitSuite {
assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
// should not count the failed one
// val c: Int = (bactor !! LogSize).get
// assertTrue(3 == c)
val c: Int = (bactor !! LogSize).get
assertTrue(3 == c)
}
@Test
def testUnsuccessfulMultiDebit {
val bactor = actorOf(new AccountActor)
def testUnsuccessfulMultiDebit = {
val bactor = actorOf[AccountActor]
bactor.start
bactor !! Credit("a-123", 5000)
assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get)
val failer = actorOf(new PersistentFailerActor)
val failer = actorOf[PersistentFailerActor]
failer.start
try {
bactor !! MultiDebit("a-123", List(500, 2000, 1000, 3000), failer)
@ -184,8 +156,7 @@ class RedisPersistentActorSpec extends JUnitSuite {
assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get)
// should not count the failed one
// val c: Int = (bactor !! LogSize).get
// assertTrue(3 == c)
val c: Int = (bactor !! LogSize).get
assertTrue(3 == c)
}
**/
}

View file

@ -8,7 +8,7 @@
<log>
filename = "./logs/akka.log"
roll = "daily" # Options: never, hourly, daily, sunday/monday/...
level = "trace" # Options: fatal, critical, error, warning, info, debug, trace
level = "debug" # Options: fatal, critical, error, warning, info, debug, trace
console = on
# syslog_host = ""
# syslog_server_name = ""