pekko/akka-actors/src/main/scala/stm/TransactionManagement.scala

102 lines
3.6 KiB
Scala
Raw Normal View History

/**
* Copyright (C) 2009 Scalable Solutions.
*/
2009-09-02 09:10:21 +02:00
package se.scalablesolutions.akka.stm
import java.util.concurrent.atomic.AtomicBoolean
2009-10-06 00:07:27 +02:00
import se.scalablesolutions.akka.reactor.MessageInvocation
import se.scalablesolutions.akka.util.Logging
import org.codehaus.aspectwerkz.proxy.Uuid
import scala.collection.mutable.HashSet
// FIXME is java.util.UUID better?
import org.multiverse.utils.TransactionThreadLocal._
class TransactionRollbackException(msg: String) extends RuntimeException(msg)
class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
override def toString(): String = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
}
object TransactionManagement {
2009-10-06 00:07:27 +02:00
import se.scalablesolutions.akka.Config._
2009-09-02 09:10:21 +02:00
val TIME_WAITING_FOR_COMPLETION = config.getInt("akka.stm.wait-for-completion", 100)
val NR_OF_TIMES_WAITING_FOR_COMPLETION = config.getInt("akka.stm.wait-nr-of-times", 3)
2009-10-06 00:07:27 +02:00
val MAX_NR_OF_RETRIES = config.getInt("akka.stm.max-nr-of-retries", 10)
2009-09-30 20:08:55 +02:00
val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", false))
// FIXME reenable 'akka.stm.restart-on-collision' when new STM is in place
2009-09-02 09:10:21 +02:00
val RESTART_TRANSACTION_ON_COLLISION = false //akka.Kernel.config.getBool("akka.stm.restart-on-collision", true)
def isTransactionalityEnabled = TRANSACTION_ENABLED.get
def disableTransactions = TRANSACTION_ENABLED.set(false)
2009-10-06 00:07:27 +02:00
private[akka] val currentTransaction: ThreadLocal[Option[Transaction]] = new ThreadLocal[Option[Transaction]]() {
override protected def initialValue: Option[Transaction] = None
}
}
trait TransactionManagement extends Logging {
var uuid = Uuid.newUuid.toString
2009-10-06 00:07:27 +02:00
import TransactionManagement.currentTransaction
private[akka] val activeTransactions = new HashSet[Transaction]
2009-10-06 00:07:27 +02:00
protected def startNewTransaction(message: MessageInvocation) = {
val newTx = new Transaction
2009-10-06 00:07:27 +02:00
newTx.begin(uuid, message)
activeTransactions += newTx
currentTransaction.set(Some(newTx))
setThreadLocalTransaction(newTx.transaction)
}
protected def joinExistingTransaction = {
2009-10-06 00:07:27 +02:00
val cflowTx = currentTransaction.get
if (activeTransactions.isEmpty && cflowTx.isDefined) {
val currentTx = cflowTx.get
currentTx.join(uuid)
2009-10-06 00:07:27 +02:00
activeTransactions += currentTx
}
}
protected def tryToPrecommitTransactions = activeTransactions.foreach(_.precommit(uuid))
protected def tryToCommitTransactions = {
for (tx <- activeTransactions) {
if (tx.commit(uuid)) activeTransactions -= tx
else if (tx.isTopLevel) {
println("------------ COULD NOT COMMIT -- WAITING OR TIMEOUT? ---------")
//tx.retry
} else {
// continue, try to commit on next received message
// FIXME check if TX hase timed out => throw exception
}
}
}
2009-10-06 00:07:27 +02:00
protected def rollback(tx: Option[Transaction]) = tx match {
case None => {} // no tx; nothing to do
case Some(tx) =>
tx.rollback(uuid)
}
protected def rollbackForRescheduling(tx: Option[Transaction]) = tx match {
case None => {} // no tx; nothing to do
case Some(tx) =>
tx.rollbackForRescheduling(uuid)
}
2009-10-06 00:07:27 +02:00
protected def isInExistingTransaction = currentTransaction.get.isDefined
2009-10-06 00:07:27 +02:00
protected def incrementTransaction = if (currentTransaction.get.isDefined) currentTransaction.get.get.increment
2009-10-06 00:07:27 +02:00
protected def decrementTransaction = if (currentTransaction.get.isDefined) currentTransaction.get.get.decrement
2009-10-06 00:07:27 +02:00
protected def removeTransactionIfTopLevel(tx: Transaction) = if (tx.isTopLevel) { activeTransactions -= tx }
}