upgraded to multiverse 0.4-SNAPSHOT
This commit is contained in:
parent
93f2fe0c35
commit
87f66d0c4f
8 changed files with 136 additions and 462 deletions
|
|
@ -46,6 +46,34 @@
|
||||||
<artifactId>netty</artifactId>
|
<artifactId>netty</artifactId>
|
||||||
<version>3.2.0.ALPHA3</version>
|
<version>3.2.0.ALPHA3</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.multiverse</groupId>
|
||||||
|
<artifactId>multiverse-alpha</artifactId>
|
||||||
|
<version>0.4-SNAPSHOT</version>
|
||||||
|
<classifier>jar-with-dependencies</classifier>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>org.multiverse</groupId>
|
||||||
|
<artifactId>multiverse-core</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>asm</groupId>
|
||||||
|
<artifactId>asm-tree</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>asm</groupId>
|
||||||
|
<artifactId>asm-analysis</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>asm</groupId>
|
||||||
|
<artifactId>asm-commons</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>asm</groupId>
|
||||||
|
<artifactId>asm-util</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.scala-tools</groupId>
|
<groupId>org.scala-tools</groupId>
|
||||||
<artifactId>javautils</artifactId>
|
<artifactId>javautils</artifactId>
|
||||||
|
|
|
||||||
|
|
@ -911,10 +911,10 @@ trait Actor extends TransactionManagement {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (isTransactionRequiresNew && !isTransactionInScope) {
|
if (isTransactionRequiresNew && !isTransactionInScope) {
|
||||||
if (senderFuture.isEmpty) throw new StmException(
|
//if (senderFuture.isEmpty) throw new StmException(
|
||||||
"Can't continue transaction in a one-way fire-forget message send" +
|
// "Can't continue transaction in a one-way fire-forget message send" +
|
||||||
"\n\tE.g. using Actor '!' method or Active Object 'void' method" +
|
// "\n\tE.g. using Actor '!' method or Active Object 'void' method" +
|
||||||
"\n\tPlease use the Actor '!!' method or Active Object method with non-void return type")
|
// "\n\tPlease use the Actor '!!' method or Active Object method with non-void return type")
|
||||||
atomic {
|
atomic {
|
||||||
proceed
|
proceed
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -13,9 +13,10 @@ import se.scalablesolutions.akka.util.Logging
|
||||||
import org.multiverse.api.{Transaction => MultiverseTransaction}
|
import org.multiverse.api.{Transaction => MultiverseTransaction}
|
||||||
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
|
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
|
||||||
import org.multiverse.api.ThreadLocalTransaction._
|
import org.multiverse.api.ThreadLocalTransaction._
|
||||||
import org.multiverse.templates.OrElseTemplate
|
import org.multiverse.commitbarriers.VetoCommitBarrier
|
||||||
|
|
||||||
import scala.collection.mutable.HashMap
|
import scala.collection.mutable.HashMap
|
||||||
|
import org.multiverse.templates.{TransactionTemplate, OrElseTemplate}
|
||||||
|
|
||||||
class NoTransactionInScopeException extends RuntimeException
|
class NoTransactionInScopeException extends RuntimeException
|
||||||
class TransactionRetryException(message: String) extends RuntimeException(message)
|
class TransactionRetryException(message: String) extends RuntimeException(message)
|
||||||
|
|
@ -30,8 +31,8 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
|
||||||
* Here are some examples (assuming implicit transaction family name in scope):
|
* Here are some examples (assuming implicit transaction family name in scope):
|
||||||
* <pre>
|
* <pre>
|
||||||
* import se.scalablesolutions.akka.stm.Transaction._
|
* import se.scalablesolutions.akka.stm.Transaction._
|
||||||
*
|
*
|
||||||
* atomic {
|
* atomic {
|
||||||
* .. // do something within a transaction
|
* .. // do something within a transaction
|
||||||
* }
|
* }
|
||||||
* </pre>
|
* </pre>
|
||||||
|
|
@ -39,8 +40,8 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
|
||||||
* Example of atomic transaction management using atomic block with retry count:
|
* Example of atomic transaction management using atomic block with retry count:
|
||||||
* <pre>
|
* <pre>
|
||||||
* import se.scalablesolutions.akka.stm.Transaction._
|
* import se.scalablesolutions.akka.stm.Transaction._
|
||||||
*
|
*
|
||||||
* atomic(maxNrOfRetries) {
|
* atomic(maxNrOfRetries) {
|
||||||
* .. // do something within a transaction
|
* .. // do something within a transaction
|
||||||
* }
|
* }
|
||||||
* </pre>
|
* </pre>
|
||||||
|
|
@ -49,10 +50,10 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
|
||||||
* Which is a good way to reduce contention and transaction collisions.
|
* Which is a good way to reduce contention and transaction collisions.
|
||||||
* <pre>
|
* <pre>
|
||||||
* import se.scalablesolutions.akka.stm.Transaction._
|
* import se.scalablesolutions.akka.stm.Transaction._
|
||||||
*
|
*
|
||||||
* atomically {
|
* atomically {
|
||||||
* .. // try to do something
|
* .. // try to do something
|
||||||
* } orElse {
|
* } orElse {
|
||||||
* .. // if transaction clashes try do do something else to minimize contention
|
* .. // if transaction clashes try do do something else to minimize contention
|
||||||
* }
|
* }
|
||||||
* </pre>
|
* </pre>
|
||||||
|
|
@ -61,11 +62,11 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
|
||||||
*
|
*
|
||||||
* <pre>
|
* <pre>
|
||||||
* import se.scalablesolutions.akka.stm.Transaction._
|
* import se.scalablesolutions.akka.stm.Transaction._
|
||||||
* for (tx <- Transaction) {
|
* for (tx <- Transaction) {
|
||||||
* ... // do transactional stuff
|
* ... // do transactional stuff
|
||||||
* }
|
* }
|
||||||
*
|
*
|
||||||
* val result = for (tx <- Transaction) yield {
|
* val result = for (tx <- Transaction) yield {
|
||||||
* ... // do transactional stuff yielding a result
|
* ... // do transactional stuff yielding a result
|
||||||
* }
|
* }
|
||||||
* </pre>
|
* </pre>
|
||||||
|
|
@ -78,17 +79,17 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
|
||||||
*
|
*
|
||||||
* // You can use them together with Transaction in a for comprehension since
|
* // You can use them together with Transaction in a for comprehension since
|
||||||
* // TransactionalRef is also monadic
|
* // TransactionalRef is also monadic
|
||||||
* for {
|
* for {
|
||||||
* tx <- Transaction
|
* tx <- Transaction
|
||||||
* ref <- refs
|
* ref <- refs
|
||||||
* } {
|
* } {
|
||||||
* ... // use the ref inside a transaction
|
* ... // use the ref inside a transaction
|
||||||
* }
|
* }
|
||||||
*
|
*
|
||||||
* val result = for {
|
* val result = for {
|
||||||
* tx <- Transaction
|
* tx <- Transaction
|
||||||
* ref <- refs
|
* ref <- refs
|
||||||
* } yield {
|
* } yield {
|
||||||
* ... // use the ref inside a transaction, yield a result
|
* ... // use the ref inside a transaction, yield a result
|
||||||
* }
|
* }
|
||||||
* </pre>
|
* </pre>
|
||||||
|
|
@ -101,57 +102,61 @@ object Transaction extends TransactionManagement {
|
||||||
/**
|
/**
|
||||||
* See ScalaDoc on class.
|
* See ScalaDoc on class.
|
||||||
*/
|
*/
|
||||||
def map[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic { f(getTransactionInScope) }
|
def map[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic {f(getTransactionInScope)}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* See ScalaDoc on class.
|
* See ScalaDoc on class.
|
||||||
*/
|
*/
|
||||||
def flatMap[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic { f(getTransactionInScope) }
|
def flatMap[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic {f(getTransactionInScope)}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* See ScalaDoc on class.
|
* See ScalaDoc on class.
|
||||||
*/
|
*/
|
||||||
def foreach(f: Transaction => Unit)(implicit transactionFamilyName: String): Unit = atomic { f(getTransactionInScope) }
|
def foreach(f: Transaction => Unit)(implicit transactionFamilyName: String): Unit = atomic {f(getTransactionInScope)}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a "pure" STM atomic transaction and by-passes all transactions hooks
|
* Creates a "pure" STM atomic transaction and by-passes all transactions hooks
|
||||||
* such as persistence etc.
|
* such as persistence etc.
|
||||||
* Only for internal usage.
|
* Only for internal usage.
|
||||||
*/
|
*/
|
||||||
private[akka] def pureAtomic[T](body: => T): T = new AtomicTemplate[T](
|
private[akka] def pureAtomic[T](body: => T): T = new TransactionTemplate[T]() {
|
||||||
getGlobalStmInstance, "internal", false, false, TransactionManagement.MAX_NR_OF_RETRIES) {
|
|
||||||
def execute(mtx: MultiverseTransaction): T = body
|
def execute(mtx: MultiverseTransaction): T = body
|
||||||
}.execute()
|
}.execute()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* See ScalaDoc on class.
|
* See ScalaDoc on class.
|
||||||
*/
|
*/
|
||||||
def atomic[T](body: => T)(implicit transactionFamilyName: String): T = new AtomicTemplate[T](
|
def atomic[T](body: => T)(implicit transactionFamilyName: String): T = {
|
||||||
getGlobalStmInstance, transactionFamilyName, false, false, TransactionManagement.MAX_NR_OF_RETRIES) {
|
new TransactionTemplate[T]() { // FIXME take factory
|
||||||
def execute(mtx: MultiverseTransaction): T = body
|
def execute(mtx: MultiverseTransaction): T = body
|
||||||
override def postStart(mtx: MultiverseTransaction) = {
|
|
||||||
val tx = new Transaction
|
override def onStart(mtx: MultiverseTransaction) = {
|
||||||
tx.transaction = Some(mtx)
|
val tx = new Transaction
|
||||||
setTransaction(Some(tx))
|
tx.transaction = Some(mtx)
|
||||||
}
|
setTransaction(Some(tx))
|
||||||
override def postCommit = {
|
}
|
||||||
if (isTransactionInScope) getTransactionInScope.commit
|
|
||||||
else throw new IllegalStateException("No transaction in scope")
|
override def onPostCommit = {
|
||||||
}
|
if (isTransactionInScope) getTransactionInScope.commit
|
||||||
}.execute()
|
else throw new IllegalStateException("No transaction in scope")
|
||||||
|
}
|
||||||
|
}.execute()
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* See ScalaDoc on class.
|
* See ScalaDoc on class.
|
||||||
*/
|
*/
|
||||||
def atomic[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = {
|
def atomic[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = {
|
||||||
new AtomicTemplate[T](getGlobalStmInstance, transactionFamilyName, false, false, retryCount) {
|
new TransactionTemplate[T]() { // FIXME take factory
|
||||||
def execute(mtx: MultiverseTransaction): T = body
|
def execute(mtx: MultiverseTransaction): T = body
|
||||||
override def postStart(mtx: MultiverseTransaction) = {
|
|
||||||
|
override def onStart(mtx: MultiverseTransaction) = {
|
||||||
val tx = new Transaction
|
val tx = new Transaction
|
||||||
tx.transaction = Some(mtx)
|
tx.transaction = Some(mtx)
|
||||||
setTransaction(Some(tx))
|
setTransaction(Some(tx))
|
||||||
}
|
}
|
||||||
override def postCommit = {
|
|
||||||
|
override def onPostCommit = {
|
||||||
if (isTransactionInScope) getTransactionInScope.commit
|
if (isTransactionInScope) getTransactionInScope.commit
|
||||||
else throw new IllegalStateException("No transaction in scope")
|
else throw new IllegalStateException("No transaction in scope")
|
||||||
}
|
}
|
||||||
|
|
@ -162,14 +167,16 @@ object Transaction extends TransactionManagement {
|
||||||
* See ScalaDoc on class.
|
* See ScalaDoc on class.
|
||||||
*/
|
*/
|
||||||
def atomicReadOnly[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = {
|
def atomicReadOnly[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = {
|
||||||
new AtomicTemplate[T](getGlobalStmInstance, transactionFamilyName, false, true, retryCount) {
|
new TransactionTemplate[T]() { // FIXME take factory
|
||||||
def execute(mtx: MultiverseTransaction): T = body
|
def execute(mtx: MultiverseTransaction): T = body
|
||||||
override def postStart(mtx: MultiverseTransaction) = {
|
|
||||||
|
override def onStart(mtx: MultiverseTransaction) = {
|
||||||
val tx = new Transaction
|
val tx = new Transaction
|
||||||
tx.transaction = Some(mtx)
|
tx.transaction = Some(mtx)
|
||||||
setTransaction(Some(tx))
|
setTransaction(Some(tx))
|
||||||
}
|
}
|
||||||
override def postCommit = {
|
|
||||||
|
override def onPostCommit = {
|
||||||
if (isTransactionInScope) getTransactionInScope.commit
|
if (isTransactionInScope) getTransactionInScope.commit
|
||||||
else throw new IllegalStateException("No transaction in scope")
|
else throw new IllegalStateException("No transaction in scope")
|
||||||
}
|
}
|
||||||
|
|
@ -180,14 +187,16 @@ object Transaction extends TransactionManagement {
|
||||||
* See ScalaDoc on class.
|
* See ScalaDoc on class.
|
||||||
*/
|
*/
|
||||||
def atomicReadOnly[T](body: => T): T = {
|
def atomicReadOnly[T](body: => T): T = {
|
||||||
new AtomicTemplate[T](true) {
|
new TransactionTemplate[T]() { // FIXME take factory
|
||||||
def execute(mtx: MultiverseTransaction): T = body
|
def execute(mtx: MultiverseTransaction): T = body
|
||||||
override def postStart(mtx: MultiverseTransaction) = {
|
|
||||||
|
override def onStart(mtx: MultiverseTransaction) = {
|
||||||
val tx = new Transaction
|
val tx = new Transaction
|
||||||
tx.transaction = Some(mtx)
|
tx.transaction = Some(mtx)
|
||||||
setTransaction(Some(tx))
|
setTransaction(Some(tx))
|
||||||
}
|
}
|
||||||
override def postCommit = {
|
|
||||||
|
override def onPostCommit = {
|
||||||
if (isTransactionInScope) getTransactionInScope.commit
|
if (isTransactionInScope) getTransactionInScope.commit
|
||||||
else throw new IllegalStateException("No transaction in scope")
|
else throw new IllegalStateException("No transaction in scope")
|
||||||
}
|
}
|
||||||
|
|
@ -206,6 +215,7 @@ object Transaction extends TransactionManagement {
|
||||||
def elseBody[A](firstBody: => A) = new {
|
def elseBody[A](firstBody: => A) = new {
|
||||||
def orElse(secondBody: => A) = new OrElseTemplate[A] {
|
def orElse(secondBody: => A) = new OrElseTemplate[A] {
|
||||||
def run(t: MultiverseTransaction) = firstBody
|
def run(t: MultiverseTransaction) = firstBody
|
||||||
|
|
||||||
def orelserun(t: MultiverseTransaction) = secondBody
|
def orelserun(t: MultiverseTransaction) = secondBody
|
||||||
}.execute()
|
}.execute()
|
||||||
}
|
}
|
||||||
|
|
@ -216,30 +226,38 @@ object Transaction extends TransactionManagement {
|
||||||
*/
|
*/
|
||||||
@serializable class Transaction extends Logging {
|
@serializable class Transaction extends Logging {
|
||||||
import Transaction._
|
import Transaction._
|
||||||
|
|
||||||
val id = Transaction.idFactory.incrementAndGet
|
val id = Transaction.idFactory.incrementAndGet
|
||||||
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
|
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
|
||||||
private[akka] var transaction: Option[MultiverseTransaction] = None
|
private[akka] var transaction: Option[MultiverseTransaction] = None
|
||||||
private[this] val persistentStateMap = new HashMap[String, Committable]
|
private[this] val persistentStateMap = new HashMap[String, Committable]
|
||||||
private[akka] val depth = new AtomicInteger(0)
|
private[akka] val depth = new AtomicInteger(0)
|
||||||
|
//private[akka] val transactionSet = new VetoCommitBarrier
|
||||||
|
|
||||||
|
//transactionSet.vetoCommit(this)
|
||||||
|
|
||||||
// --- public methods ---------
|
// --- public methods ---------
|
||||||
|
|
||||||
|
def abort = synchronized {
|
||||||
|
// transactionSet.abort
|
||||||
|
}
|
||||||
|
|
||||||
def commit = synchronized {
|
def commit = synchronized {
|
||||||
pureAtomic {
|
pureAtomic {
|
||||||
persistentStateMap.values.foreach(_.commit)
|
persistentStateMap.values.foreach(_.commit)
|
||||||
TransactionManagement.clearTransaction
|
TransactionManagement.clearTransaction
|
||||||
}
|
}
|
||||||
|
//transactionSet.vetoCommit(this)
|
||||||
status = TransactionStatus.Completed
|
status = TransactionStatus.Completed
|
||||||
}
|
}
|
||||||
|
|
||||||
def isNew = synchronized { status == TransactionStatus.New }
|
def isNew = synchronized {status == TransactionStatus.New}
|
||||||
|
|
||||||
def isActive = synchronized { status == TransactionStatus.Active }
|
def isActive = synchronized {status == TransactionStatus.Active}
|
||||||
|
|
||||||
def isCompleted = synchronized { status == TransactionStatus.Completed }
|
def isCompleted = synchronized {status == TransactionStatus.Completed}
|
||||||
|
|
||||||
def isAborted = synchronized { status == TransactionStatus.Aborted }
|
def isAborted = synchronized {status == TransactionStatus.Aborted}
|
||||||
|
|
||||||
// --- internal methods ---------
|
// --- internal methods ---------
|
||||||
|
|
||||||
|
|
@ -259,13 +277,13 @@ object Transaction extends TransactionManagement {
|
||||||
|
|
||||||
private def ensureIsActiveOrAborted =
|
private def ensureIsActiveOrAborted =
|
||||||
if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted))
|
if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted))
|
||||||
throw new IllegalStateException(
|
throw new IllegalStateException(
|
||||||
"Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString)
|
"Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString)
|
||||||
|
|
||||||
private def ensureIsActiveOrNew =
|
private def ensureIsActiveOrNew =
|
||||||
if (!(status == TransactionStatus.Active || status == TransactionStatus.New))
|
if (!(status == TransactionStatus.Active || status == TransactionStatus.New))
|
||||||
throw new IllegalStateException(
|
throw new IllegalStateException(
|
||||||
"Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
|
"Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
|
||||||
|
|
||||||
// For reinitialize transaction after sending it over the wire
|
// For reinitialize transaction after sending it over the wire
|
||||||
private[akka] def reinit = synchronized {
|
private[akka] def reinit = synchronized {
|
||||||
|
|
@ -277,14 +295,14 @@ object Transaction extends TransactionManagement {
|
||||||
}
|
}
|
||||||
|
|
||||||
override def equals(that: Any): Boolean = synchronized {
|
override def equals(that: Any): Boolean = synchronized {
|
||||||
that != null &&
|
that != null &&
|
||||||
that.isInstanceOf[Transaction] &&
|
that.isInstanceOf[Transaction] &&
|
||||||
that.asInstanceOf[Transaction].id == this.id
|
that.asInstanceOf[Transaction].id == this.id
|
||||||
}
|
}
|
||||||
|
|
||||||
override def hashCode(): Int = synchronized { id.toInt }
|
override def hashCode(): Int = synchronized {id.toInt}
|
||||||
|
|
||||||
override def toString(): String = synchronized { "Transaction[" + id + ", " + status + "]" }
|
override def toString(): String = synchronized {"Transaction[" + id + ", " + status + "]"}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,7 @@ object TransactionManagement extends TransactionManagement {
|
||||||
def isTransactionalityEnabled = TRANSACTION_ENABLED.get
|
def isTransactionalityEnabled = TRANSACTION_ENABLED.get
|
||||||
def disableTransactions = TRANSACTION_ENABLED.set(false)
|
def disableTransactions = TRANSACTION_ENABLED.set(false)
|
||||||
|
|
||||||
private[akka] val currentTransaction: ThreadLocal[Option[Transaction]] = new ThreadLocal[Option[Transaction]]() {
|
private[akka] val currentTransaction = new ThreadLocal[Option[Transaction]]() {
|
||||||
override protected def initialValue: Option[Transaction] = None
|
override protected def initialValue: Option[Transaction] = None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -52,6 +52,8 @@ trait TransactionManagement extends Logging {
|
||||||
|
|
||||||
private[akka] def isTransactionInScope = currentTransaction.get.isDefined
|
private[akka] def isTransactionInScope = currentTransaction.get.isDefined
|
||||||
|
|
||||||
|
private[akka] def isTransactionTopLevel = if (isTransactionInScope) getTransactionInScope.isTopLevel
|
||||||
|
|
||||||
private[akka] def incrementTransaction = if (isTransactionInScope) getTransactionInScope.increment
|
private[akka] def incrementTransaction = if (isTransactionInScope) getTransactionInScope.increment
|
||||||
|
|
||||||
private[akka] def decrementTransaction = if (isTransactionInScope) getTransactionInScope.decrement
|
private[akka] def decrementTransaction = if (isTransactionInScope) getTransactionInScope.decrement
|
||||||
|
|
|
||||||
|
|
@ -8,8 +8,7 @@ import se.scalablesolutions.akka.stm.Transaction.atomic
|
||||||
import se.scalablesolutions.akka.stm.NoTransactionInScopeException
|
import se.scalablesolutions.akka.stm.NoTransactionInScopeException
|
||||||
import se.scalablesolutions.akka.collection._
|
import se.scalablesolutions.akka.collection._
|
||||||
import se.scalablesolutions.akka.util.UUID
|
import se.scalablesolutions.akka.util.UUID
|
||||||
|
import org.multiverse.stms.alpha.AlphaRef
|
||||||
import org.multiverse.datastructures.refs.manual.Ref;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Example Scala usage:
|
* Example Scala usage:
|
||||||
|
|
@ -78,7 +77,7 @@ class TransactionalRef[T] extends Transactional {
|
||||||
implicit val txInitName = "TransactionalRef:Init"
|
implicit val txInitName = "TransactionalRef:Init"
|
||||||
val uuid = UUID.newUuid.toString
|
val uuid = UUID.newUuid.toString
|
||||||
|
|
||||||
private[this] val ref: Ref[T] = atomic { new Ref }
|
private[this] val ref: AlphaRef[T] = atomic { new AlphaRef }
|
||||||
|
|
||||||
def swap(elem: T) = {
|
def swap(elem: T) = {
|
||||||
ensureIsInTransaction
|
ensureIsInTransaction
|
||||||
|
|
|
||||||
|
|
@ -24,34 +24,6 @@
|
||||||
<artifactId>protobuf-java</artifactId>
|
<artifactId>protobuf-java</artifactId>
|
||||||
<version>2.2.0</version>
|
<version>2.2.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.multiverse</groupId>
|
|
||||||
<artifactId>multiverse-alpha</artifactId>
|
|
||||||
<version>0.4-SNAPSHOT</version>
|
|
||||||
<classifier>jar-with-dependencies</classifier>
|
|
||||||
<exclusions>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>org.multiverse</groupId>
|
|
||||||
<artifactId>multiverse-core</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>asm</groupId>
|
|
||||||
<artifactId>asm-tree</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>asm</groupId>
|
|
||||||
<artifactId>asm-analysis</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>asm</groupId>
|
|
||||||
<artifactId>asm-commons</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>asm</groupId>
|
|
||||||
<artifactId>asm-util</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
</exclusions>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|
|
||||||
|
|
@ -1,341 +0,0 @@
|
||||||
package se.scalablesolutions.akka.stm;
|
|
||||||
|
|
||||||
import static org.multiverse.api.GlobalStmInstance.getGlobalStmInstance;
|
|
||||||
import org.multiverse.api.Stm;
|
|
||||||
import static org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction;
|
|
||||||
import static org.multiverse.api.ThreadLocalTransaction.setThreadLocalTransaction;
|
|
||||||
import org.multiverse.api.Transaction;
|
|
||||||
import org.multiverse.api.TransactionStatus;
|
|
||||||
import org.multiverse.api.exceptions.CommitFailureException;
|
|
||||||
import org.multiverse.api.exceptions.LoadException;
|
|
||||||
import org.multiverse.api.exceptions.RetryError;
|
|
||||||
import org.multiverse.api.exceptions.TooManyRetriesException;
|
|
||||||
import org.multiverse.templates.AbortedException;
|
|
||||||
import org.multiverse.utils.latches.CheapLatch;
|
|
||||||
import org.multiverse.utils.latches.Latch;
|
|
||||||
|
|
||||||
import static java.lang.String.format;
|
|
||||||
import java.util.logging.Logger;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A Template that handles the boilerplate code for transactions. A transaction will be placed if none is available
|
|
||||||
* around a section and if all goes right, commits at the end.
|
|
||||||
* <p/>
|
|
||||||
* example:
|
|
||||||
* <pre>
|
|
||||||
* new AtomicTemplate(){
|
|
||||||
* Object execute(Transaction t){
|
|
||||||
* queue.push(1);
|
|
||||||
* return null;
|
|
||||||
* }
|
|
||||||
* }.execute();
|
|
||||||
* </pre>
|
|
||||||
* <p/>
|
|
||||||
* It could also be that the transaction is retried (e.g. caused by optimistic locking failures). This is also a task
|
|
||||||
* for template. In the future this retry behavior will be customizable.
|
|
||||||
* <p/>
|
|
||||||
* If a transaction already is available on the TransactionThreadLocal, no new transaction is started and essentially
|
|
||||||
* the whole AtomicTemplate is ignored.
|
|
||||||
* <p/>
|
|
||||||
* If no transaction is available on the TransactionThreadLocal, a new one will be created and used during the execution
|
|
||||||
* of the AtomicTemplate and will be removed once the AtomicTemplate finishes.
|
|
||||||
* <p/>
|
|
||||||
* All uncaught throwable's lead to a rollback of the transaction.
|
|
||||||
* <p/>
|
|
||||||
* AtomicTemplates are not thread-safe to use.
|
|
||||||
* <p/>
|
|
||||||
* AtomicTemplates can completely work without threadlocals. See the {@link AtomicTemplate#AtomicTemplate(org.multiverse.api.Stm
|
|
||||||
* ,String, boolean, boolean, int)} for more information.
|
|
||||||
*
|
|
||||||
* @author Peter Veentjer
|
|
||||||
*/
|
|
||||||
public abstract class AtomicTemplate<E> {
|
|
||||||
|
|
||||||
private final static Logger logger = Logger.getLogger(AtomicTemplate.class.getName());
|
|
||||||
|
|
||||||
private final Stm stm;
|
|
||||||
private final boolean ignoreThreadLocalTransaction;
|
|
||||||
private final int retryCount;
|
|
||||||
private final boolean readonly;
|
|
||||||
private int attemptCount;
|
|
||||||
private final String familyName;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new AtomicTemplate that uses the STM stored in the GlobalStm and works the the {@link
|
|
||||||
* org.multiverse.utils.ThreadLocalTransaction}.
|
|
||||||
*/
|
|
||||||
public AtomicTemplate() {
|
|
||||||
this(getGlobalStmInstance());
|
|
||||||
}
|
|
||||||
|
|
||||||
public AtomicTemplate(boolean readonly) {
|
|
||||||
this(getGlobalStmInstance(), null, false, readonly, Integer.MAX_VALUE);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new AtomicTemplate using the provided stm. The transaction used is stores/retrieved from the {@link
|
|
||||||
* org.multiverse.utils.ThreadLocalTransaction}.
|
|
||||||
*
|
|
||||||
* @param stm the stm to use for transactions.
|
|
||||||
* @throws NullPointerException if stm is null.
|
|
||||||
*/
|
|
||||||
public AtomicTemplate(Stm stm) {
|
|
||||||
this(stm, null, false, false, Integer.MAX_VALUE);
|
|
||||||
}
|
|
||||||
|
|
||||||
public AtomicTemplate(String familyName, boolean readonly, int retryCount) {
|
|
||||||
this(getGlobalStmInstance(), familyName, false, readonly, retryCount);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new AtomicTemplate that uses the provided STM. This method is provided to make Multiverse easy to
|
|
||||||
* integrate with environment that don't want to depend on threadlocals.
|
|
||||||
*
|
|
||||||
* @param stm the stm to use for transactions.
|
|
||||||
* @param ignoreThreadLocalTransaction true if this Template should completely ignore the ThreadLocalTransaction.
|
|
||||||
* This is useful for using the AtomicTemplate in other environments that don't
|
|
||||||
* want to depend on threadlocals but do want to use the AtomicTemplate.
|
|
||||||
* @throws NullPointerException if stm is null.
|
|
||||||
*/
|
|
||||||
public AtomicTemplate(Stm stm, String familyName, boolean ignoreThreadLocalTransaction, boolean readonly,
|
|
||||||
int retryCount) {
|
|
||||||
if (stm == null) {
|
|
||||||
throw new NullPointerException();
|
|
||||||
}
|
|
||||||
if (retryCount < 0) {
|
|
||||||
throw new IllegalArgumentException();
|
|
||||||
}
|
|
||||||
this.stm = stm;
|
|
||||||
this.ignoreThreadLocalTransaction = ignoreThreadLocalTransaction;
|
|
||||||
this.readonly = readonly;
|
|
||||||
this.retryCount = retryCount;
|
|
||||||
this.familyName = familyName;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getFamilyName() {
|
|
||||||
return familyName;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the current attempt. Value will always be larger than zero and increases everytime the transaction needs
|
|
||||||
* to be retried.
|
|
||||||
*
|
|
||||||
* @return the current attempt count.
|
|
||||||
*/
|
|
||||||
public final int getAttemptCount() {
|
|
||||||
return attemptCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the number of retries that this AtomicTemplate is allowed to do. The returned value will always be equal
|
|
||||||
* or larger than 0.
|
|
||||||
*
|
|
||||||
* @return the number of retries.
|
|
||||||
*/
|
|
||||||
public final int getRetryCount() {
|
|
||||||
return retryCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the {@link Stm} used by this AtomicTemplate to execute transactions on.
|
|
||||||
*
|
|
||||||
* @return the Stm used by this AtomicTemplate.
|
|
||||||
*/
|
|
||||||
public final Stm getStm() {
|
|
||||||
return stm;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if this AtomicTemplate ignores the ThreadLocalTransaction.
|
|
||||||
*
|
|
||||||
* @return true if this AtomicTemplate ignores the ThreadLocalTransaction, false otherwise.
|
|
||||||
*/
|
|
||||||
public final boolean isIgnoreThreadLocalTransaction() {
|
|
||||||
return ignoreThreadLocalTransaction;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Checks if this AtomicTemplate executes readonly transactions.
|
|
||||||
*
|
|
||||||
* @return true if it executes readonly transactions, false otherwise.
|
|
||||||
*/
|
|
||||||
public final boolean isReadonly() {
|
|
||||||
return readonly;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This is the method can be overridden to do pre-start tasks.
|
|
||||||
*/
|
|
||||||
public void preStart() {
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This is the method can be overridden to do post-start tasks.
|
|
||||||
*
|
|
||||||
* @param t the transaction used for this execution.
|
|
||||||
*/
|
|
||||||
public void postStart(Transaction t) {
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This is the method can be overridden to do pre-commit tasks.
|
|
||||||
*/
|
|
||||||
public void preCommit() {
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This is the method can be overridden to do post-commit tasks.
|
|
||||||
*/
|
|
||||||
public void postCommit() {
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This is the method that needs to be implemented.
|
|
||||||
*
|
|
||||||
* @param t the transaction used for this execution.
|
|
||||||
* @return the result of the execution.
|
|
||||||
*
|
|
||||||
* @throws Exception the Exception thrown
|
|
||||||
*/
|
|
||||||
public abstract E execute(Transaction t) throws Exception;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Executes the template.
|
|
||||||
*
|
|
||||||
* @return the result of the {@link #execute(org.multiverse.api.Transaction)} method.
|
|
||||||
*
|
|
||||||
* @throws InvisibleCheckedException if a checked exception was thrown while executing the {@link
|
|
||||||
* #execute(org.multiverse.api.Transaction)} method.
|
|
||||||
* @throws AbortedException if the exception was explicitly aborted.
|
|
||||||
* @throws TooManyRetriesException if the template retried the transaction too many times. The cause of the last
|
|
||||||
* failure (also an exception) is included as cause. So you have some idea where
|
|
||||||
* to look for problems
|
|
||||||
*/
|
|
||||||
public final E execute() {
|
|
||||||
try {
|
|
||||||
return executeChecked();
|
|
||||||
} catch (Exception ex) {
|
|
||||||
if (ex instanceof RuntimeException) {
|
|
||||||
throw (RuntimeException) ex;
|
|
||||||
} else {
|
|
||||||
throw new AtomicTemplate.InvisibleCheckedException(ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Executes the Template and rethrows the checked exception instead of wrapping it in a InvisibleCheckedException.
|
|
||||||
*
|
|
||||||
* @return the result
|
|
||||||
*
|
|
||||||
* @throws Exception the Exception thrown inside the {@link #execute(org.multiverse.api.Transaction)}
|
|
||||||
* method.
|
|
||||||
* @throws AbortedException if the exception was explicitly aborted.
|
|
||||||
* @throws TooManyRetriesException if the template retried the transaction too many times. The cause of the last
|
|
||||||
* failure (also an exception) is included as cause. So you have some idea where to
|
|
||||||
* look for problems
|
|
||||||
*/
|
|
||||||
public final E executeChecked() throws Exception {
|
|
||||||
preStart();
|
|
||||||
Transaction t = getTransaction();
|
|
||||||
if (noUsableTransaction(t)) {
|
|
||||||
t = startTransaction();
|
|
||||||
setTransaction(t);
|
|
||||||
postStart(t);
|
|
||||||
try {
|
|
||||||
attemptCount = 1;
|
|
||||||
Exception lastRetryCause = null;
|
|
||||||
while (attemptCount - 1 <= retryCount) {
|
|
||||||
boolean abort = true;
|
|
||||||
boolean reset = false;
|
|
||||||
try {
|
|
||||||
E result = execute(t);
|
|
||||||
if (t.getStatus().equals(TransactionStatus.aborted)) {
|
|
||||||
String msg = format("Transaction with familyname %s is aborted", t.getFamilyName());
|
|
||||||
throw new AbortedException(msg);
|
|
||||||
}
|
|
||||||
preCommit();
|
|
||||||
t.commit();
|
|
||||||
abort = false;
|
|
||||||
reset = false;
|
|
||||||
postCommit();
|
|
||||||
return result;
|
|
||||||
} catch (RetryError e) {
|
|
||||||
Latch latch = new CheapLatch();
|
|
||||||
t.abortAndRegisterRetryLatch(latch);
|
|
||||||
latch.awaitUninterruptible();
|
|
||||||
//since the abort is already done, no need to do it again.
|
|
||||||
abort = false;
|
|
||||||
} catch (CommitFailureException ex) {
|
|
||||||
lastRetryCause = ex;
|
|
||||||
reset = true;
|
|
||||||
//ignore, just retry the transaction
|
|
||||||
} catch (LoadException ex) {
|
|
||||||
lastRetryCause = ex;
|
|
||||||
reset = true;
|
|
||||||
//ignore, just retry the transaction
|
|
||||||
} finally {
|
|
||||||
if (abort) {
|
|
||||||
t.abort();
|
|
||||||
if (reset) {
|
|
||||||
t = t.abortAndReturnRestarted();
|
|
||||||
setTransaction(t);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
attemptCount++;
|
|
||||||
}
|
|
||||||
|
|
||||||
throw new TooManyRetriesException("Too many retries", lastRetryCause);
|
|
||||||
} finally {
|
|
||||||
setTransaction(null);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return execute(t);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Transaction startTransaction() {
|
|
||||||
return readonly ? stm.startReadOnlyTransaction(familyName) : stm.startUpdateTransaction(familyName);
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean noUsableTransaction(Transaction t) {
|
|
||||||
return t == null || t.getStatus() != TransactionStatus.active;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets the current Transaction stored in the TransactionThreadLocal.
|
|
||||||
* <p/>
|
|
||||||
* If the ignoreThreadLocalTransaction is set, the threadlocal stuff is completeley ignored.
|
|
||||||
*
|
|
||||||
* @return the found transaction, or null if none is found.
|
|
||||||
*/
|
|
||||||
private Transaction getTransaction() {
|
|
||||||
return ignoreThreadLocalTransaction ? null : getThreadLocalTransaction();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Stores the transaction in the TransactionThreadLocal.
|
|
||||||
* <p/>
|
|
||||||
* This call is ignored if the ignoreThreadLocalTransaction is true.
|
|
||||||
*
|
|
||||||
* @param t the transaction to set (is allowed to be null).
|
|
||||||
*/
|
|
||||||
private void setTransaction(Transaction t) {
|
|
||||||
if (!ignoreThreadLocalTransaction) {
|
|
||||||
setThreadLocalTransaction(t);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class InvisibleCheckedException extends RuntimeException {
|
|
||||||
|
|
||||||
public InvisibleCheckedException(Exception cause) {
|
|
||||||
super(cause);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Exception getCause() {
|
|
||||||
return (Exception) super.getCause();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
52
pom.xml
52
pom.xml
|
|
@ -14,25 +14,27 @@
|
||||||
|
|
||||||
<description>
|
<description>
|
||||||
Akka implements a unique hybrid of:
|
Akka implements a unique hybrid of:
|
||||||
* Actors , which gives you:
|
* Actors , which gives you:
|
||||||
* Simple and high-level abstractions for concurrency and parallelism.
|
* Simple and high-level abstractions for concurrency and parallelism.
|
||||||
* Asynchronous, non-blocking and highly performant event-driven programming model.
|
* Asynchronous, non-blocking and highly performant event-driven programming model.
|
||||||
* Very lightweight event-driven processes (create ~6.5 million actors on 4 G RAM).
|
* Very lightweight event-driven processes (create ~6.5 million actors on 4 G RAM).
|
||||||
* Supervision hierarchies with let-it-crash semantics. For writing highly fault-tolerant systems that never stop, systems that self-heal.
|
* Supervision hierarchies with let-it-crash semantics. For writing highly fault-tolerant systems that never stop,
|
||||||
* Software Transactional Memory (STM). (Distributed transactions coming soon).
|
systems that self-heal.
|
||||||
* Transactors: combine actors and STM into transactional actors. Allows you to compose atomic message flows with automatic rollback and retry.
|
* Software Transactional Memory (STM). (Distributed transactions coming soon).
|
||||||
* Remoting: highly performant distributed actors with remote supervision and error management.
|
* Transactors: combine actors and STM into transactional actors. Allows you to compose atomic message flows with
|
||||||
* Cluster membership management.
|
automatic rollback and retry.
|
||||||
|
* Remoting: highly performant distributed actors with remote supervision and error management.
|
||||||
|
* Cluster membership management.
|
||||||
|
|
||||||
Akka also has a set of add-on modules:
|
Akka also has a set of add-on modules:
|
||||||
* Persistence: A set of pluggable back-end storage modules that works in sync with the STM.
|
* Persistence: A set of pluggable back-end storage modules that works in sync with the STM.
|
||||||
* Cassandra distributed and highly scalable database.
|
* Cassandra distributed and highly scalable database.
|
||||||
* MongoDB document database.
|
* MongoDB document database.
|
||||||
* Redis data structures database (upcoming)
|
* Redis data structures database (upcoming)
|
||||||
* REST (JAX-RS): Expose actors as REST services.
|
* REST (JAX-RS): Expose actors as REST services.
|
||||||
* Comet: Expose actors as Comet services.
|
* Comet: Expose actors as Comet services.
|
||||||
* Security: Digest and Kerberos based security.
|
* Security: Digest and Kerberos based security.
|
||||||
* Microkernel: Run Akka as a stand-alone kernel.
|
* Microkernel: Run Akka as a stand-alone kernel.
|
||||||
</description>
|
</description>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
|
|
@ -155,15 +157,9 @@
|
||||||
<url>http://www.lag.net/repo</url>
|
<url>http://www.lag.net/repo</url>
|
||||||
</repository>
|
</repository>
|
||||||
<repository>
|
<repository>
|
||||||
<id>multiverse-releases</id>
|
<id>repository.codehaus.org</id>
|
||||||
<url>http://multiverse.googlecode.com/svn/maven-repository/releases</url>
|
<name>Codehaus Maven Repository</name>
|
||||||
<snapshots>
|
<url>http://repository.codehaus.org</url>
|
||||||
<enabled>false</enabled>
|
|
||||||
</snapshots>
|
|
||||||
</repository>
|
|
||||||
<repository>
|
|
||||||
<id>multiverse-snaphosts</id>
|
|
||||||
<url>http://multiverse.googlecode.com/svn/maven-repository/snapshots</url>
|
|
||||||
</repository>
|
</repository>
|
||||||
<repository>
|
<repository>
|
||||||
<id>maven2-repository.dev.java.net</id>
|
<id>maven2-repository.dev.java.net</id>
|
||||||
|
|
@ -258,7 +254,7 @@
|
||||||
<sourceDirectory>src/main/scala</sourceDirectory>
|
<sourceDirectory>src/main/scala</sourceDirectory>
|
||||||
<testSourceDirectory>src/test/scala</testSourceDirectory>
|
<testSourceDirectory>src/test/scala</testSourceDirectory>
|
||||||
<plugins>
|
<plugins>
|
||||||
<plugin>
|
<plugin>
|
||||||
<groupId>org.apache.maven.plugins</groupId>
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
<artifactId>maven-enforcer-plugin</artifactId>
|
<artifactId>maven-enforcer-plugin</artifactId>
|
||||||
<version>1.0-beta-1</version>
|
<version>1.0-beta-1</version>
|
||||||
|
|
@ -276,7 +272,7 @@
|
||||||
</requireProperty>
|
</requireProperty>
|
||||||
<requireFilesExist>
|
<requireFilesExist>
|
||||||
<files>
|
<files>
|
||||||
<file>${env.AKKA_HOME}/embedded-repo</file>
|
<file>${env.AKKA_HOME}/embedded-repo</file>
|
||||||
</files>
|
</files>
|
||||||
</requireFilesExist>
|
</requireFilesExist>
|
||||||
</rules>
|
</rules>
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue