From 8a265ca7b977c862e80cfe6e81fdcb10b84d0ba6 Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Thu, 14 Jul 2011 14:31:32 +0300 Subject: [PATCH] 1011 and 909 --- .../scala/akka/transactor/Coordinated.scala | 37 ++++++++++++++++--- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/akka-stm/src/main/scala/akka/transactor/Coordinated.scala b/akka-stm/src/main/scala/akka/transactor/Coordinated.scala index 2adec6098f..b777861323 100644 --- a/akka-stm/src/main/scala/akka/transactor/Coordinated.scala +++ b/akka-stm/src/main/scala/akka/transactor/Coordinated.scala @@ -5,11 +5,13 @@ package akka.transactor import akka.config.Config -import akka.stm.{ Atomic, DefaultTransactionConfig, TransactionFactory } +import akka.stm.{Atomic, DefaultTransactionConfig, TransactionFactory} -import org.multiverse.api.{ Transaction ⇒ MultiverseTransaction } import org.multiverse.commitbarriers.CountDownCommitBarrier import org.multiverse.templates.TransactionalCallable +import akka.actor.ActorTimeoutException +import org.multiverse.api.{TransactionConfiguration, Transaction ⇒ MultiverseTransaction} +import org.multiverse.api.exceptions.ControlFlowError /** * Coordinated transactions across actors. @@ -86,8 +88,9 @@ object Coordinated { class Coordinated(val message: Any, barrier: CountDownCommitBarrier) { // Java API constructors - def this(message: Any) = this(message, Coordinated.createBarrier) - def this() = this(null, Coordinated.createBarrier) + def this(message: Any) = this (message, Coordinated.createBarrier) + + def this() = this (null, Coordinated.createBarrier) /** * Create a new Coordinated object and increment the number of parties by one. @@ -118,6 +121,8 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) { /** * Delimits the coordinated transaction. The transaction will wait for all other transactions * in this coordination before committing. The timeout is specified by the transaction factory. + * + * @throws ActorTimeoutException if the coordinated transaction times out. */ def atomic[T](body: ⇒ T)(implicit factory: TransactionFactory = Coordinated.DefaultFactory): T = atomic(factory)(body) @@ -125,13 +130,29 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) { /** * Delimits the coordinated transaction. The transaction will wait for all other transactions * in this coordination before committing. The timeout is specified by the transaction factory. + * + * @throws ActorTimeoutException if the coordinated transaction times out. */ def atomic[T](factory: TransactionFactory)(body: ⇒ T): T = { factory.boilerplate.execute(new TransactionalCallable[T]() { def call(mtx: MultiverseTransaction): T = { - val result = body + val result = try { + body + } catch { + case e: ControlFlowError => throw e + case e: Exception => { + barrier.abort() + throw e + } + } + val timeout = factory.config.timeout - barrier.tryJoinCommit(mtx, timeout.length, timeout.unit) + if (!barrier.tryJoinCommit(mtx, timeout.length, timeout.unit)) { + val config: TransactionConfiguration = mtx.getConfiguration + throw new ActorTimeoutException( + "Failed to complete transaction [" + config.getFamilyName + "] " + + "with a maxium timeout of [" + config.getTimeoutNs + "] ns") + } result } }) @@ -141,6 +162,8 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) { * Java API: coordinated atomic method that accepts an [[akka.stm.Atomic]]. * Delimits the coordinated transaction. The transaction will wait for all other transactions * in this coordination before committing. The timeout is specified by the transaction factory. + * + * @throws ActorTimeoutException if the coordinated transaction times out */ def atomic[T](jatomic: Atomic[T]): T = atomic(jatomic.factory)(jatomic.atomically) @@ -148,6 +171,8 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) { * Java API: coordinated atomic method that accepts an [[akka.transactor.Atomically]]. * Delimits the coordinated transaction. The transaction will wait for all other transactions * in this coordination before committing. The timeout is specified by the transaction factory. + * + * @throws ActorTimeoutException if the coordinated transaction times out. */ def atomic(atomically: Atomically): Unit = atomic(atomically.factory)(atomically.atomically)