1011 and 909

This commit is contained in:
Peter Veentjer 2011-07-14 14:43:34 +03:00
commit c6d8ff4d25

View file

@ -6,11 +6,13 @@ package akka.transactor
import akka.AkkaException
import akka.config.Config
import akka.stm.{ Atomic, DefaultTransactionConfig, TransactionFactory }
import akka.stm.{Atomic, DefaultTransactionConfig, TransactionFactory}
import org.multiverse.api.{ Transaction MultiverseTransaction }
import org.multiverse.commitbarriers.CountDownCommitBarrier
import org.multiverse.templates.TransactionalCallable
import akka.actor.ActorTimeoutException
import org.multiverse.api.{TransactionConfiguration, Transaction MultiverseTransaction}
import org.multiverse.api.exceptions.ControlFlowError
/**
* Akka-specific exception for coordinated transactions.
@ -92,8 +94,9 @@ object Coordinated {
class Coordinated(val message: Any, barrier: CountDownCommitBarrier) {
// Java API constructors
def this(message: Any) = this(message, Coordinated.createBarrier)
def this() = this(null, Coordinated.createBarrier)
def this(message: Any) = this (message, Coordinated.createBarrier)
def this() = this (null, Coordinated.createBarrier)
/**
* Create a new Coordinated object and increment the number of parties by one.
@ -124,6 +127,8 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) {
/**
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified by the transaction factory.
*
* @throws ActorTimeoutException if the coordinated transaction times out.
*/
def atomic[T](body: T)(implicit factory: TransactionFactory = Coordinated.DefaultFactory): T =
atomic(factory)(body)
@ -131,19 +136,28 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) {
/**
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified by the transaction factory.
*
* @throws ActorTimeoutException if the coordinated transaction times out.
*/
def atomic[T](factory: TransactionFactory)(body: T): T = {
factory.boilerplate.execute(new TransactionalCallable[T]() {
def call(mtx: MultiverseTransaction): T = {
val result = try { body } catch { case e: Exception barrier.abort(); throw e }
val timeout = factory.config.timeout
try {
barrier.tryJoinCommit(mtx, timeout.length, timeout.unit)
val result = try {
body
} catch {
case e: org.multiverse.api.exceptions.DeadTransactionException
throw new CoordinatedTransactionException("Coordinated transaction aborted")
case e: java.lang.IllegalStateException
throw new CoordinatedTransactionException("Coordinated transaction aborted")
case e: ControlFlowError => throw e
case e: Exception => {
barrier.abort()
throw e
}
}
val timeout = factory.config.timeout
if (!barrier.tryJoinCommit(mtx, timeout.length, timeout.unit)) {
val config: TransactionConfiguration = mtx.getConfiguration
throw new ActorTimeoutException(
"Failed to complete transaction [" + config.getFamilyName + "] " +
"with a maxium timeout of [" + config.getTimeoutNs + "] ns")
}
result
}
@ -154,6 +168,8 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) {
* Java API: coordinated atomic method that accepts an [[akka.stm.Atomic]].
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified by the transaction factory.
*
* @throws ActorTimeoutException if the coordinated transaction times out
*/
def atomic[T](jatomic: Atomic[T]): T = atomic(jatomic.factory)(jatomic.atomically)
@ -161,6 +177,8 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) {
* Java API: coordinated atomic method that accepts an [[akka.transactor.Atomically]].
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified by the transaction factory.
*
* @throws ActorTimeoutException if the coordinated transaction times out.
*/
def atomic(atomically: Atomically): Unit = atomic(atomically.factory)(atomically.atomically)