Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Viktor Klang 2011-03-23 11:31:50 +01:00
commit 532a34684e
9 changed files with 18 additions and 284 deletions

View file

@ -67,7 +67,7 @@ object World {
lazy val ants = setup
lazy val evaporator = actorOf[Evaporator].start
private val snapshotFactory = TransactionFactory(readonly = true, familyName = "snapshot", hooks = false)
private val snapshotFactory = TransactionFactory(readonly = true, familyName = "snapshot")
def snapshot = atomic(snapshotFactory) { Array.tabulate(Dim, Dim)(place(_, _).opt) }
@ -138,7 +138,7 @@ class AntActor(initLoc: (Int, Int)) extends WorldActor {
val locRef = Ref(initLoc)
val name = "ant-from-" + initLoc._1 + "-" + initLoc._2
implicit val txFactory = TransactionFactory(familyName = name, hooks = false)
implicit val txFactory = TransactionFactory(familyName = name)
val homing = (p: Place) => p.pher + (100 * (if (p.home) 0 else 1))
val foraging = (p: Place) => p.pher + p.food
@ -210,7 +210,7 @@ class Evaporator extends WorldActor {
import Config._
import World._
implicit val txFactory = TransactionFactory(familyName = "evaporator", hooks = false)
implicit val txFactory = TransactionFactory(familyName = "evaporator")
val evaporate = (pher: Float) => pher * EvapRate
def act = for (x <- 0 until Dim; y <- 0 until Dim) {

View file

@ -8,6 +8,13 @@ import akka.actor.{newUuid, Uuid}
import org.multiverse.transactional.refs.BasicRef
/**
* Common trait for all the transactional objects.
*/
@serializable trait Transactional {
val uuid: String
}
/**
* Transactional managed reference. See the companion class for more information.
*/

View file

@ -48,10 +48,7 @@ trait Stm {
def atomic[T](factory: TransactionFactory)(body: => T): T = {
factory.boilerplate.execute(new TransactionalCallable[T]() {
def call(mtx: MultiverseTransaction): T = {
factory.addHooks
body
}
def call(mtx: MultiverseTransaction): T = body
})
}
}

View file

@ -1,252 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.stm
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.HashMap
import akka.util.ReflectiveAccess
import akka.config.Config._
import akka.config.ModuleNotAvailableException
import akka.AkkaException
import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent}
import org.multiverse.api.ThreadLocalTransaction._
import org.multiverse.api.{PropagationLevel => MultiversePropagationLevel}
import org.multiverse.api.{TraceLevel => MultiverseTraceLevel}
class NoTransactionInScopeException extends AkkaException("No transaction in scope")
class TransactionRetryException(message: String) extends AkkaException(message)
class StmConfigurationException(message: String) extends AkkaException(message)
/**
* Internal helper methods for managing Akka-specific transaction.
*/
object TransactionManagement extends TransactionManagement {
private[akka] val transaction = new ThreadLocal[Option[Transaction]]() {
override protected def initialValue: Option[Transaction] = None
}
private[akka] def getTransaction: Transaction = {
val option = transaction.get
if ((option eq null) || option.isEmpty) throw new StmConfigurationException("No Transaction in scope")
option.get
}
}
/**
* Internal helper methods for managing Akka-specific transaction.
*/
trait TransactionManagement {
private[akka] def setTransaction(tx: Option[Transaction]) =
if (tx.isDefined) TransactionManagement.transaction.set(tx)
private[akka] def clearTransaction = {
TransactionManagement.transaction.set(None)
setThreadLocalTransaction(null)
}
private[akka] def getTransactionInScope = TransactionManagement.getTransaction
private[akka] def isTransactionInScope = {
val option = TransactionManagement.transaction.get
(option ne null) && option.isDefined
}
}
object Transaction {
val idFactory = new AtomicLong(-1L)
/**
* Attach an Akka-specific Transaction to the current Multiverse transaction.
* Must be called within a Multiverse transaction. Used by TransactionFactory.addHooks
*/
private[akka] def attach = {
val mtx = getRequiredThreadLocalTransaction
val tx = new Transaction
tx.begin
tx.transaction = Some(mtx)
TransactionManagement.transaction.set(Some(tx))
mtx.registerLifecycleListener(new TransactionLifecycleListener() {
def notify(mtx: MultiverseTransaction, event: TransactionLifecycleEvent) = event match {
case TransactionLifecycleEvent.PostCommit => tx.commitJta
case TransactionLifecycleEvent.PreCommit => tx.commitPersistentState
case TransactionLifecycleEvent.PostAbort => tx.abort
case _ => {}
}
})
}
}
/**
* The Akka-specific Transaction class.
* For integration with persistence modules and JTA support.
*/
@serializable class Transaction {
val JTA_AWARE = config.getBool("akka.stm.jta-aware", false)
val STATE_RETRIES = config.getInt("akka.storage.max-retries",10)
val id = Transaction.idFactory.incrementAndGet
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
private[akka] var transaction: Option[MultiverseTransaction] = None
private[this] val persistentStateMap = new HashMap[String, Committable with Abortable]
private[akka] val depth = new AtomicInteger(0)
val jta: Option[ReflectiveJtaModule.TransactionContainer] =
if (JTA_AWARE) Some(ReflectiveJtaModule.createTransactionContainer)
else None
// --- public methods ---------
def begin = synchronized {
jta.foreach { _.beginWithStmSynchronization(this) }
}
def commitPersistentState = synchronized {
retry(STATE_RETRIES){
persistentStateMap.valuesIterator.foreach(_.commit)
persistentStateMap.clear
}
status = TransactionStatus.Completed
}
def commitJta = synchronized {
jta.foreach(_.commit)
}
def abort = synchronized {
jta.foreach(_.rollback)
persistentStateMap.valuesIterator.foreach(_.abort)
persistentStateMap.clear
}
def retry(tries:Int)(block: => Unit):Unit={
if(tries==0){
throw new TransactionRetryException("Exhausted Retries while committing persistent state")
}
try{
block
} catch{
case e:Exception=>{
retry(tries-1){block}
}
}
}
def isNew = synchronized { status == TransactionStatus.New }
def isActive = synchronized { status == TransactionStatus.Active }
def isCompleted = synchronized { status == TransactionStatus.Completed }
def isAborted = synchronized { status == TransactionStatus.Aborted }
// --- internal methods ---------
//private def isJtaTxActive(status: Int) = status == Status.STATUS_ACTIVE
private[akka] def status_? = status
private[akka] def increment = depth.incrementAndGet
private[akka] def decrement = depth.decrementAndGet
private[akka] def isTopLevel = depth.get == 0
//when calling this method, make sure to prefix the uuid with the type so you
//have no possibility of kicking a diffferent type with the same uuid out of a transction
private[akka] def register(uuid: String, storage: Committable with Abortable) = {
if(persistentStateMap.getOrElseUpdate(uuid, {storage}) ne storage){
throw new IllegalStateException("attempted to register an instance of persistent data structure for id [%s] when there is already a different instance registered".format(uuid))
}
}
private def ensureIsActive = if (status != TransactionStatus.Active)
throw new StmConfigurationException(
"Expected ACTIVE transaction - current status [" + status + "]: " + toString)
private def ensureIsActiveOrAborted =
if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted))
throw new StmConfigurationException(
"Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString)
private def ensureIsActiveOrNew =
if (!(status == TransactionStatus.Active || status == TransactionStatus.New))
throw new StmConfigurationException(
"Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
override def equals(that: Any): Boolean = synchronized {
that.isInstanceOf[Transaction] &&
that.asInstanceOf[Transaction].id == this.id
}
override def hashCode: Int = synchronized { id.toInt }
override def toString = synchronized { "Transaction[" + id + ", " + status + "]" }
}
@serializable sealed abstract class TransactionStatus
object TransactionStatus {
case object New extends TransactionStatus
case object Active extends TransactionStatus
case object Aborted extends TransactionStatus
case object Completed extends TransactionStatus
}
/**
* Common trait for all the transactional objects:
* Ref, TransactionalMap, TransactionalVector,
* PersistentRef, PersistentMap, PersistentVector, PersistentQueue, PersistentSortedSet
*/
@serializable trait Transactional {
val uuid: String
}
/**
* Used for integration with the persistence modules.
*/
trait Committable {
def commit(): Unit
}
/**
* Used for integration with the persistence modules.
*/
trait Abortable {
def abort(): Unit
}
/**
* Used internally for reflective access to the JTA module.
* Allows JTA integration to work when akka-jta.jar is on the classpath.
*/
object ReflectiveJtaModule {
type TransactionContainerObject = {
def apply(): TransactionContainer
}
type TransactionContainer = {
def beginWithStmSynchronization(transaction: Transaction): Unit
def commit: Unit
def rollback: Unit
}
lazy val isJtaEnabled = transactionContainerObjectInstance.isDefined
def ensureJtaEnabled = if (!isJtaEnabled) throw new ModuleNotAvailableException(
"Can't load the JTA module, make sure that akka-jta.jar is on the classpath")
val transactionContainerObjectInstance: Option[TransactionContainerObject] =
ReflectiveAccess.getObjectFor("akka.jta.TransactionContainer$")
def createTransactionContainer: TransactionContainer = {
ensureJtaEnabled
transactionContainerObjectInstance.get.apply.asInstanceOf[TransactionContainer]
}
}

View file

@ -32,7 +32,6 @@ object TransactionConfig {
val QUICK_RELEASE = config.getBool("akka.stm.quick-release", true)
val PROPAGATION = propagation(config.getString("akka.stm.propagation", "requires"))
val TRACE_LEVEL = traceLevel(config.getString("akka.stm.trace-level", "none"))
val HOOKS = config.getBool("akka.stm.hooks", true)
val DefaultTimeout = Duration(TIMEOUT, TIME_UNIT)
@ -65,7 +64,6 @@ object TransactionConfig {
* @param quickRelease Whether locks should be released as quickly as possible (before whole commit).
* @param propagation For controlling how nested transactions behave.
* @param traceLevel Transaction trace level.
* @param hooks Whether hooks for persistence modules and JTA should be added to the transaction.
*/
def apply(familyName: String = FAMILY_NAME,
readonly: JBoolean = READONLY,
@ -78,10 +76,9 @@ object TransactionConfig {
speculative: Boolean = SPECULATIVE,
quickRelease: Boolean = QUICK_RELEASE,
propagation: MPropagation = PROPAGATION,
traceLevel: MTraceLevel = TRACE_LEVEL,
hooks: Boolean = HOOKS) = {
traceLevel: MTraceLevel = TRACE_LEVEL) = {
new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed,
interruptible, speculative, quickRelease, propagation, traceLevel, hooks)
interruptible, speculative, quickRelease, propagation, traceLevel)
}
}
@ -100,7 +97,6 @@ object TransactionConfig {
* <p>quickRelease - Whether locks should be released as quickly as possible (before whole commit).
* <p>propagation - For controlling how nested transactions behave.
* <p>traceLevel - Transaction trace level.
* <p>hooks - Whether hooks for persistence modules and JTA should be added to the transaction.
*/
class TransactionConfig(val familyName: String = TransactionConfig.FAMILY_NAME,
val readonly: JBoolean = TransactionConfig.READONLY,
@ -113,8 +109,7 @@ class TransactionConfig(val familyName: String = TransactionConfig.FAMILY
val speculative: Boolean = TransactionConfig.SPECULATIVE,
val quickRelease: Boolean = TransactionConfig.QUICK_RELEASE,
val propagation: MPropagation = TransactionConfig.PROPAGATION,
val traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL,
val hooks: Boolean = TransactionConfig.HOOKS)
val traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL)
object DefaultTransactionConfig extends TransactionConfig
@ -137,11 +132,10 @@ object TransactionFactory {
speculative: Boolean = TransactionConfig.SPECULATIVE,
quickRelease: Boolean = TransactionConfig.QUICK_RELEASE,
propagation: MPropagation = TransactionConfig.PROPAGATION,
traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL,
hooks: Boolean = TransactionConfig.HOOKS) = {
traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL) = {
val config = new TransactionConfig(
familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed,
interruptible, speculative, quickRelease, propagation, traceLevel, hooks)
interruptible, speculative, quickRelease, propagation, traceLevel)
new TransactionFactory(config)
}
}
@ -199,8 +193,6 @@ class TransactionFactory(
}
val boilerplate = new TransactionBoilerplate(factory)
def addHooks = if (config.hooks) Transaction.attach
}
/**

View file

@ -27,7 +27,6 @@ class TransactionConfigBuilder {
var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE
var propagation: MPropagation = TransactionConfig.PROPAGATION
var traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL
var hooks: Boolean = TransactionConfig.HOOKS
def setFamilyName(familyName: String) = { this.familyName = familyName; this }
def setReadonly(readonly: JBoolean) = { this.readonly = readonly; this }
@ -41,11 +40,10 @@ class TransactionConfigBuilder {
def setQuickRelease(quickRelease: Boolean) = { this.quickRelease = quickRelease; this }
def setPropagation(propagation: MPropagation) = { this.propagation = propagation; this }
def setTraceLevel(traceLevel: MTraceLevel) = { this.traceLevel = traceLevel; this }
def setHooks(hooks: Boolean) = { this.hooks = hooks; this }
def build() = new TransactionConfig(
familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed,
interruptible, speculative, quickRelease, propagation, traceLevel, hooks)
interruptible, speculative, quickRelease, propagation, traceLevel)
}
/**
@ -64,7 +62,6 @@ class TransactionFactoryBuilder {
var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE
var propagation: MPropagation = TransactionConfig.PROPAGATION
var traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL
var hooks: Boolean = TransactionConfig.HOOKS
def setFamilyName(familyName: String) = { this.familyName = familyName; this }
def setReadonly(readonly: JBoolean) = { this.readonly = readonly; this }
@ -78,12 +75,11 @@ class TransactionFactoryBuilder {
def setQuickRelease(quickRelease: Boolean) = { this.quickRelease = quickRelease; this }
def setPropagation(propagation: MPropagation) = { this.propagation = propagation; this }
def setTraceLevel(traceLevel: MTraceLevel) = { this.traceLevel = traceLevel; this }
def setHooks(hooks: Boolean) = { this.hooks = hooks; this }
def build() = {
val config = new TransactionConfig(
familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed,
interruptible, speculative, quickRelease, propagation, traceLevel, hooks)
interruptible, speculative, quickRelease, propagation, traceLevel)
new TransactionFactory(config)
}
}

View file

@ -129,7 +129,6 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) {
def atomic[T](factory: TransactionFactory)(body: => T): T = {
factory.boilerplate.execute(new TransactionalCallable[T]() {
def call(mtx: MultiverseTransaction): T = {
factory.addHooks
val result = body
val timeout = factory.config.timeout
barrier.tryJoinCommit(mtx, timeout.length, timeout.unit)

View file

@ -20,9 +20,7 @@ class ConfigSpec extends WordSpec with MustMatchers {
getBool("akka.stm.blocking-allowed") must equal(Some(false))
getBool("akka.stm.fair") must equal(Some(true))
getBool("akka.stm.hooks") must equal(Some(true))
getBool("akka.stm.interruptible") must equal(Some(false))
getBool("akka.stm.jta-aware") must equal(Some(false))
getInt("akka.stm.max-retries") must equal(Some(1000))
getString("akka.stm.propagation") must equal(Some("requires"))
getBool("akka.stm.quick-release") must equal(Some(true))

View file

@ -71,9 +71,6 @@ akka {
quick-release = true
propagation = "requires"
trace-level = "none"
hooks = true
jta-aware = off # Option 'on' means that if there JTA Transaction Manager available then the STM will
# begin (or join), commit or rollback the JTA transaction. Default is 'off'.
}
jta {