fixed a bunch of persistence bugs
This commit is contained in:
parent
5b8b46d21c
commit
059502b463
25 changed files with 674 additions and 572 deletions
|
|
@ -6,9 +6,14 @@ package se.scalablesolutions.akka.stm
|
|||
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
import reactor.MessageInvocation
|
||||
import util.Logging
|
||||
import org.codehaus.aspectwerkz.proxy.Uuid // FIXME is java.util.UUID better?
|
||||
import se.scalablesolutions.akka.reactor.MessageInvocation
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
import org.codehaus.aspectwerkz.proxy.Uuid
|
||||
|
||||
import scala.collection.mutable.HashSet
|
||||
|
||||
// FIXME is java.util.UUID better?
|
||||
|
||||
import org.multiverse.utils.TransactionThreadLocal._
|
||||
|
||||
|
|
@ -19,9 +24,10 @@ class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Tran
|
|||
}
|
||||
|
||||
object TransactionManagement {
|
||||
import Config._
|
||||
import se.scalablesolutions.akka.Config._
|
||||
val TIME_WAITING_FOR_COMPLETION = config.getInt("akka.stm.wait-for-completion", 100)
|
||||
val NR_OF_TIMES_WAITING_FOR_COMPLETION = config.getInt("akka.stm.wait-nr-of-times", 3)
|
||||
val MAX_NR_OF_RETRIES = config.getInt("akka.stm.max-nr-of-retries", 10)
|
||||
val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", false))
|
||||
// FIXME reenable 'akka.stm.restart-on-collision' when new STM is in place
|
||||
val RESTART_TRANSACTION_ON_COLLISION = false //akka.Kernel.config.getBool("akka.stm.restart-on-collision", true)
|
||||
|
|
@ -29,7 +35,7 @@ object TransactionManagement {
|
|||
def isTransactionalityEnabled = TRANSACTION_ENABLED.get
|
||||
def disableTransactions = TRANSACTION_ENABLED.set(false)
|
||||
|
||||
private[akka] val threadBoundTx: ThreadLocal[Option[Transaction]] = new ThreadLocal[Option[Transaction]]() {
|
||||
private[akka] val currentTransaction: ThreadLocal[Option[Transaction]] = new ThreadLocal[Option[Transaction]]() {
|
||||
override protected def initialValue: Option[Transaction] = None
|
||||
}
|
||||
}
|
||||
|
|
@ -37,41 +43,41 @@ object TransactionManagement {
|
|||
trait TransactionManagement extends Logging {
|
||||
var uuid = Uuid.newUuid.toString
|
||||
|
||||
protected[this] var latestMessage: Option[MessageInvocation] = None
|
||||
protected[this] var messageToReschedule: Option[MessageInvocation] = None
|
||||
import TransactionManagement.currentTransaction
|
||||
private[akka] val activeTransactions = new HashSet[Transaction]
|
||||
|
||||
import TransactionManagement.threadBoundTx
|
||||
private[akka] var activeTx: Option[Transaction] = None
|
||||
|
||||
protected def startNewTransaction: Option[Transaction] = {
|
||||
protected def startNewTransaction(message: MessageInvocation) = {
|
||||
val newTx = new Transaction
|
||||
newTx.begin(uuid)
|
||||
val tx = Some(newTx)
|
||||
activeTx = tx
|
||||
threadBoundTx.set(tx)
|
||||
setThreadLocalTransaction(tx.get.transaction)
|
||||
tx
|
||||
newTx.begin(uuid, message)
|
||||
activeTransactions += newTx
|
||||
currentTransaction.set(Some(newTx))
|
||||
setThreadLocalTransaction(newTx.transaction)
|
||||
}
|
||||
|
||||
protected def joinExistingTransaction = {
|
||||
val cflowTx = threadBoundTx.get
|
||||
if (!activeTx.isDefined && cflowTx.isDefined) {
|
||||
val cflowTx = currentTransaction.get
|
||||
if (activeTransactions.isEmpty && cflowTx.isDefined) {
|
||||
val currentTx = cflowTx.get
|
||||
currentTx.join(uuid)
|
||||
activeTx = Some(currentTx)
|
||||
activeTransactions += currentTx
|
||||
}
|
||||
}
|
||||
|
||||
protected def tryToPrecommitTransactions = activeTransactions.foreach(_.precommit(uuid))
|
||||
|
||||
protected def tryToCommitTransactions = {
|
||||
for (tx <- activeTransactions) {
|
||||
if (tx.commit(uuid)) activeTransactions -= tx
|
||||
else if (tx.isTopLevel) {
|
||||
println("------------ COULD NOT COMMIT -- WAITING OR TIMEOUT? ---------")
|
||||
//tx.retry
|
||||
} else {
|
||||
// continue, try to commit on next received message
|
||||
// FIXME check if TX hase timed out => throw exception
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected def tryToPrecommitTransaction = if (activeTx.isDefined) activeTx.get.precommit(uuid)
|
||||
|
||||
protected def tryToCommitTransaction: Boolean = if (activeTx.isDefined) {
|
||||
val tx = activeTx.get
|
||||
if (tx.commit(uuid)) {
|
||||
removeTransactionIfTopLevel
|
||||
true
|
||||
} else false
|
||||
} else true
|
||||
|
||||
|
||||
protected def rollback(tx: Option[Transaction]) = tx match {
|
||||
case None => {} // no tx; nothing to do
|
||||
case Some(tx) =>
|
||||
|
|
@ -84,39 +90,12 @@ trait TransactionManagement extends Logging {
|
|||
tx.rollbackForRescheduling(uuid)
|
||||
}
|
||||
|
||||
protected def handleCollision = {
|
||||
var nrRetries = 0
|
||||
var failed = true
|
||||
do {
|
||||
Thread.sleep(TransactionManagement.TIME_WAITING_FOR_COMPLETION)
|
||||
nrRetries += 1
|
||||
log.debug("Pending transaction [%s] not completed, waiting %s milliseconds. Attempt %s", activeTx.get.id, TransactionManagement.TIME_WAITING_FOR_COMPLETION, nrRetries)
|
||||
failed = !tryToCommitTransaction
|
||||
} while(nrRetries < TransactionManagement.NR_OF_TIMES_WAITING_FOR_COMPLETION && failed)
|
||||
if (failed) {
|
||||
log.debug("Pending transaction [%s] still not completed, aborting and rescheduling message [%s]", activeTx.get.id, latestMessage)
|
||||
rollback(activeTx)
|
||||
if (TransactionManagement.RESTART_TRANSACTION_ON_COLLISION) messageToReschedule = Some(latestMessage.get)
|
||||
else throw new TransactionRollbackException("Conflicting transactions, rolling back transaction for message [" + latestMessage + "]")
|
||||
}
|
||||
}
|
||||
protected def isInExistingTransaction = currentTransaction.get.isDefined
|
||||
|
||||
protected def isInExistingTransaction = TransactionManagement.threadBoundTx.get.isDefined
|
||||
protected def incrementTransaction = if (currentTransaction.get.isDefined) currentTransaction.get.get.increment
|
||||
|
||||
protected def isTransactionTopLevel = activeTx.isDefined && activeTx.get.isTopLevel
|
||||
|
||||
protected def isTransactionAborted = activeTx.isDefined && activeTx.get.isAborted
|
||||
protected def decrementTransaction = if (currentTransaction.get.isDefined) currentTransaction.get.get.decrement
|
||||
|
||||
protected def incrementTransaction = if (activeTx.isDefined) activeTx.get.increment
|
||||
|
||||
protected def decrementTransaction = if (activeTx.isDefined) activeTx.get.decrement
|
||||
|
||||
protected def removeTransactionIfTopLevel = if (isTransactionTopLevel) activeTx = None
|
||||
|
||||
protected def reenteringExistingTransaction= if (activeTx.isDefined) {
|
||||
val cflowTx = threadBoundTx.get
|
||||
if (cflowTx.isDefined && cflowTx.get.id == activeTx.get.id) false
|
||||
else true
|
||||
} else true
|
||||
protected def removeTransactionIfTopLevel(tx: Transaction) = if (tx.isTopLevel) { activeTransactions -= tx }
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue