1011 and 909

This commit is contained in:
Peter Veentjer 2011-07-14 14:31:32 +03:00
parent 5238caf25a
commit 8a265ca7b9

View file

@ -5,11 +5,13 @@
package akka.transactor
import akka.config.Config
import akka.stm.{ Atomic, DefaultTransactionConfig, TransactionFactory }
import akka.stm.{Atomic, DefaultTransactionConfig, TransactionFactory}
import org.multiverse.api.{ Transaction MultiverseTransaction }
import org.multiverse.commitbarriers.CountDownCommitBarrier
import org.multiverse.templates.TransactionalCallable
import akka.actor.ActorTimeoutException
import org.multiverse.api.{TransactionConfiguration, Transaction MultiverseTransaction}
import org.multiverse.api.exceptions.ControlFlowError
/**
* Coordinated transactions across actors.
@ -86,8 +88,9 @@ object Coordinated {
class Coordinated(val message: Any, barrier: CountDownCommitBarrier) {
// Java API constructors
def this(message: Any) = this(message, Coordinated.createBarrier)
def this() = this(null, Coordinated.createBarrier)
def this(message: Any) = this (message, Coordinated.createBarrier)
def this() = this (null, Coordinated.createBarrier)
/**
* Create a new Coordinated object and increment the number of parties by one.
@ -118,6 +121,8 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) {
/**
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified by the transaction factory.
*
* @throws ActorTimeoutException if the coordinated transaction times out.
*/
def atomic[T](body: T)(implicit factory: TransactionFactory = Coordinated.DefaultFactory): T =
atomic(factory)(body)
@ -125,13 +130,29 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) {
/**
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified by the transaction factory.
*
* @throws ActorTimeoutException if the coordinated transaction times out.
*/
def atomic[T](factory: TransactionFactory)(body: T): T = {
factory.boilerplate.execute(new TransactionalCallable[T]() {
def call(mtx: MultiverseTransaction): T = {
val result = body
val result = try {
body
} catch {
case e: ControlFlowError => throw e
case e: Exception => {
barrier.abort()
throw e
}
}
val timeout = factory.config.timeout
barrier.tryJoinCommit(mtx, timeout.length, timeout.unit)
if (!barrier.tryJoinCommit(mtx, timeout.length, timeout.unit)) {
val config: TransactionConfiguration = mtx.getConfiguration
throw new ActorTimeoutException(
"Failed to complete transaction [" + config.getFamilyName + "] " +
"with a maxium timeout of [" + config.getTimeoutNs + "] ns")
}
result
}
})
@ -141,6 +162,8 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) {
* Java API: coordinated atomic method that accepts an [[akka.stm.Atomic]].
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified by the transaction factory.
*
* @throws ActorTimeoutException if the coordinated transaction times out
*/
def atomic[T](jatomic: Atomic[T]): T = atomic(jatomic.factory)(jatomic.atomically)
@ -148,6 +171,8 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) {
* Java API: coordinated atomic method that accepts an [[akka.transactor.Atomically]].
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified by the transaction factory.
*
* @throws ActorTimeoutException if the coordinated transaction times out.
*/
def atomic(atomically: Atomically): Unit = atomic(atomically.factory)(atomically.atomically)