2009-06-29 15:01:20 +02:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009 Scalable Solutions.
|
|
|
|
|
*/
|
|
|
|
|
|
2009-09-02 09:10:21 +02:00
|
|
|
package se.scalablesolutions.akka.stm
|
2009-06-29 15:01:20 +02:00
|
|
|
|
|
|
|
|
import java.util.concurrent.atomic.AtomicBoolean
|
|
|
|
|
|
2009-09-02 09:10:21 +02:00
|
|
|
import reactor.MessageInvocation
|
|
|
|
|
import util.Logging
|
2009-07-06 23:45:15 +02:00
|
|
|
import org.codehaus.aspectwerkz.proxy.Uuid // FIXME is java.util.UUID better?
|
|
|
|
|
|
2009-08-15 22:44:29 +02:00
|
|
|
import org.multiverse.utils.TransactionThreadLocal._
|
|
|
|
|
|
2009-07-06 23:45:15 +02:00
|
|
|
class TransactionRollbackException(msg: String) extends RuntimeException(msg)
|
2009-06-29 15:01:20 +02:00
|
|
|
|
|
|
|
|
class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
|
|
|
|
|
override def toString(): String = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
|
|
|
|
|
}
|
|
|
|
|
|
2009-06-29 17:33:38 +02:00
|
|
|
object TransactionManagement {
|
2009-09-02 09:10:21 +02:00
|
|
|
import Config._
|
|
|
|
|
val TIME_WAITING_FOR_COMPLETION = config.getInt("akka.stm.wait-for-completion", 100)
|
|
|
|
|
val NR_OF_TIMES_WAITING_FOR_COMPLETION = config.getInt("akka.stm.wait-nr-of-times", 3)
|
2009-09-30 20:08:55 +02:00
|
|
|
val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", false))
|
2009-07-06 23:45:15 +02:00
|
|
|
// FIXME reenable 'akka.stm.restart-on-collision' when new STM is in place
|
2009-09-02 09:10:21 +02:00
|
|
|
val RESTART_TRANSACTION_ON_COLLISION = false //akka.Kernel.config.getBool("akka.stm.restart-on-collision", true)
|
2009-06-29 17:33:38 +02:00
|
|
|
|
2009-07-04 12:06:07 +02:00
|
|
|
def isTransactionalityEnabled = TRANSACTION_ENABLED.get
|
|
|
|
|
def disableTransactions = TRANSACTION_ENABLED.set(false)
|
2009-06-29 15:01:20 +02:00
|
|
|
|
2009-09-02 09:10:21 +02:00
|
|
|
private[akka] val threadBoundTx: ThreadLocal[Option[Transaction]] = new ThreadLocal[Option[Transaction]]() {
|
2009-07-01 15:29:06 +02:00
|
|
|
override protected def initialValue: Option[Transaction] = None
|
2009-06-29 15:01:20 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
trait TransactionManagement extends Logging {
|
2009-07-12 23:08:17 +02:00
|
|
|
var uuid = Uuid.newUuid.toString
|
2009-07-06 23:45:15 +02:00
|
|
|
|
|
|
|
|
protected[this] var latestMessage: Option[MessageInvocation] = None
|
|
|
|
|
protected[this] var messageToReschedule: Option[MessageInvocation] = None
|
2009-06-29 15:01:20 +02:00
|
|
|
|
|
|
|
|
import TransactionManagement.threadBoundTx
|
2009-09-02 09:10:21 +02:00
|
|
|
private[akka] var activeTx: Option[Transaction] = None
|
2009-06-29 15:01:20 +02:00
|
|
|
|
2009-07-04 12:06:07 +02:00
|
|
|
protected def startNewTransaction: Option[Transaction] = {
|
2009-06-29 15:01:20 +02:00
|
|
|
val newTx = new Transaction
|
2009-07-02 18:07:29 +02:00
|
|
|
newTx.begin(uuid)
|
2009-06-29 15:01:20 +02:00
|
|
|
val tx = Some(newTx)
|
|
|
|
|
activeTx = tx
|
|
|
|
|
threadBoundTx.set(tx)
|
2009-08-15 22:44:29 +02:00
|
|
|
setThreadLocalTransaction(tx.get.transaction)
|
2009-07-04 12:06:07 +02:00
|
|
|
tx
|
2009-06-29 15:01:20 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected def joinExistingTransaction = {
|
|
|
|
|
val cflowTx = threadBoundTx.get
|
2009-06-29 23:38:10 +02:00
|
|
|
if (!activeTx.isDefined && cflowTx.isDefined) {
|
2009-06-29 15:01:20 +02:00
|
|
|
val currentTx = cflowTx.get
|
2009-07-02 18:07:29 +02:00
|
|
|
currentTx.join(uuid)
|
2009-06-29 15:01:20 +02:00
|
|
|
activeTx = Some(currentTx)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2009-07-02 18:07:29 +02:00
|
|
|
protected def tryToPrecommitTransaction = if (activeTx.isDefined) activeTx.get.precommit(uuid)
|
2009-06-29 15:01:20 +02:00
|
|
|
|
|
|
|
|
protected def tryToCommitTransaction: Boolean = if (activeTx.isDefined) {
|
|
|
|
|
val tx = activeTx.get
|
2009-07-04 12:06:07 +02:00
|
|
|
if (tx.commit(uuid)) {
|
|
|
|
|
removeTransactionIfTopLevel
|
|
|
|
|
true
|
|
|
|
|
} else false
|
|
|
|
|
} else true
|
2009-06-29 15:01:20 +02:00
|
|
|
|
|
|
|
|
protected def rollback(tx: Option[Transaction]) = tx match {
|
|
|
|
|
case None => {} // no tx; nothing to do
|
|
|
|
|
case Some(tx) =>
|
2009-07-02 18:07:29 +02:00
|
|
|
tx.rollback(uuid)
|
2009-06-29 15:01:20 +02:00
|
|
|
}
|
|
|
|
|
|
2009-07-04 12:06:07 +02:00
|
|
|
protected def rollbackForRescheduling(tx: Option[Transaction]) = tx match {
|
|
|
|
|
case None => {} // no tx; nothing to do
|
|
|
|
|
case Some(tx) =>
|
|
|
|
|
tx.rollbackForRescheduling(uuid)
|
|
|
|
|
}
|
|
|
|
|
|
2009-07-06 23:45:15 +02:00
|
|
|
protected def handleCollision = {
|
|
|
|
|
var nrRetries = 0
|
|
|
|
|
var failed = true
|
|
|
|
|
do {
|
|
|
|
|
Thread.sleep(TransactionManagement.TIME_WAITING_FOR_COMPLETION)
|
|
|
|
|
nrRetries += 1
|
2009-09-24 10:56:51 +02:00
|
|
|
log.debug("Pending transaction [%s] not completed, waiting %s milliseconds. Attempt %s", activeTx.get.id, TransactionManagement.TIME_WAITING_FOR_COMPLETION, nrRetries)
|
2009-07-06 23:45:15 +02:00
|
|
|
failed = !tryToCommitTransaction
|
|
|
|
|
} while(nrRetries < TransactionManagement.NR_OF_TIMES_WAITING_FOR_COMPLETION && failed)
|
|
|
|
|
if (failed) {
|
2009-09-24 10:56:51 +02:00
|
|
|
log.debug("Pending transaction [%s] still not completed, aborting and rescheduling message [%s]", activeTx.get.id, latestMessage)
|
2009-07-06 23:45:15 +02:00
|
|
|
rollback(activeTx)
|
|
|
|
|
if (TransactionManagement.RESTART_TRANSACTION_ON_COLLISION) messageToReschedule = Some(latestMessage.get)
|
|
|
|
|
else throw new TransactionRollbackException("Conflicting transactions, rolling back transaction for message [" + latestMessage + "]")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2009-07-04 12:06:07 +02:00
|
|
|
protected def isInExistingTransaction = TransactionManagement.threadBoundTx.get.isDefined
|
2009-06-29 15:01:20 +02:00
|
|
|
|
2009-09-17 09:47:22 +02:00
|
|
|
protected def isTransactionTopLevel = activeTx.isDefined && activeTx.get.isTopLevel
|
|
|
|
|
|
2009-06-29 15:01:20 +02:00
|
|
|
protected def isTransactionAborted = activeTx.isDefined && activeTx.get.isAborted
|
|
|
|
|
|
|
|
|
|
protected def incrementTransaction = if (activeTx.isDefined) activeTx.get.increment
|
|
|
|
|
|
|
|
|
|
protected def decrementTransaction = if (activeTx.isDefined) activeTx.get.decrement
|
|
|
|
|
|
2009-09-24 10:56:51 +02:00
|
|
|
protected def removeTransactionIfTopLevel = if (isTransactionTopLevel) activeTx = None
|
2009-06-29 15:01:20 +02:00
|
|
|
|
|
|
|
|
protected def reenteringExistingTransaction= if (activeTx.isDefined) {
|
|
|
|
|
val cflowTx = threadBoundTx.get
|
|
|
|
|
if (cflowTx.isDefined && cflowTx.get.id == activeTx.get.id) false
|
|
|
|
|
else true
|
|
|
|
|
} else true
|
|
|
|
|
}
|
|
|
|
|
|