diff --git a/akka-stm/src/main/scala/akka/transactor/Coordinated.scala b/akka-stm/src/main/scala/akka/transactor/Coordinated.scala index d459b4b307..b8dc293d49 100644 --- a/akka-stm/src/main/scala/akka/transactor/Coordinated.scala +++ b/akka-stm/src/main/scala/akka/transactor/Coordinated.scala @@ -6,11 +6,13 @@ package akka.transactor import akka.AkkaException import akka.config.Config -import akka.stm.{ Atomic, DefaultTransactionConfig, TransactionFactory } +import akka.stm.{Atomic, DefaultTransactionConfig, TransactionFactory} -import org.multiverse.api.{ Transaction ⇒ MultiverseTransaction } import org.multiverse.commitbarriers.CountDownCommitBarrier import org.multiverse.templates.TransactionalCallable +import akka.actor.ActorTimeoutException +import org.multiverse.api.{TransactionConfiguration, Transaction ⇒ MultiverseTransaction} +import org.multiverse.api.exceptions.ControlFlowError /** * Akka-specific exception for coordinated transactions. @@ -92,8 +94,9 @@ object Coordinated { class Coordinated(val message: Any, barrier: CountDownCommitBarrier) { // Java API constructors - def this(message: Any) = this(message, Coordinated.createBarrier) - def this() = this(null, Coordinated.createBarrier) + def this(message: Any) = this (message, Coordinated.createBarrier) + + def this() = this (null, Coordinated.createBarrier) /** * Create a new Coordinated object and increment the number of parties by one. @@ -124,6 +127,8 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) { /** * Delimits the coordinated transaction. The transaction will wait for all other transactions * in this coordination before committing. The timeout is specified by the transaction factory. + * + * @throws ActorTimeoutException if the coordinated transaction times out. */ def atomic[T](body: ⇒ T)(implicit factory: TransactionFactory = Coordinated.DefaultFactory): T = atomic(factory)(body) @@ -131,19 +136,28 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) { /** * Delimits the coordinated transaction. The transaction will wait for all other transactions * in this coordination before committing. The timeout is specified by the transaction factory. + * + * @throws ActorTimeoutException if the coordinated transaction times out. */ def atomic[T](factory: TransactionFactory)(body: ⇒ T): T = { factory.boilerplate.execute(new TransactionalCallable[T]() { def call(mtx: MultiverseTransaction): T = { - val result = try { body } catch { case e: Exception ⇒ barrier.abort(); throw e } - val timeout = factory.config.timeout - try { - barrier.tryJoinCommit(mtx, timeout.length, timeout.unit) + val result = try { + body } catch { - case e: org.multiverse.api.exceptions.DeadTransactionException ⇒ - throw new CoordinatedTransactionException("Coordinated transaction aborted") - case e: java.lang.IllegalStateException ⇒ - throw new CoordinatedTransactionException("Coordinated transaction aborted") + case e: ControlFlowError => throw e + case e: Exception => { + barrier.abort() + throw e + } + } + + val timeout = factory.config.timeout + if (!barrier.tryJoinCommit(mtx, timeout.length, timeout.unit)) { + val config: TransactionConfiguration = mtx.getConfiguration + throw new ActorTimeoutException( + "Failed to complete transaction [" + config.getFamilyName + "] " + + "with a maxium timeout of [" + config.getTimeoutNs + "] ns") } result } @@ -154,6 +168,8 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) { * Java API: coordinated atomic method that accepts an [[akka.stm.Atomic]]. * Delimits the coordinated transaction. The transaction will wait for all other transactions * in this coordination before committing. The timeout is specified by the transaction factory. + * + * @throws ActorTimeoutException if the coordinated transaction times out */ def atomic[T](jatomic: Atomic[T]): T = atomic(jatomic.factory)(jatomic.atomically) @@ -161,6 +177,8 @@ class Coordinated(val message: Any, barrier: CountDownCommitBarrier) { * Java API: coordinated atomic method that accepts an [[akka.transactor.Atomically]]. * Delimits the coordinated transaction. The transaction will wait for all other transactions * in this coordination before committing. The timeout is specified by the transaction factory. + * + * @throws ActorTimeoutException if the coordinated transaction times out. */ def atomic(atomically: Atomically): Unit = atomic(atomically.factory)(atomically.atomically)