diff --git a/akka-stm/src/main/scala/akka/transactor/Atomically.scala b/akka-stm/src/main/scala/akka/transactor/Atomically.scala deleted file mode 100644 index 7f74d34cdc..0000000000 --- a/akka-stm/src/main/scala/akka/transactor/Atomically.scala +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.transactor - -import akka.stm.TransactionFactory - -/** - * For Java-friendly coordinated atomic blocks. - * - * Similar to [[akka.stm.Atomic]] but used to pass a block to Coordinated.atomic - * or to Coordination.coordinate. - * - * @see [[akka.transactor.Coordinated]] - * @see [[akka.transactor.Coordination]] - */ -abstract class Atomically(val factory: TransactionFactory) { - def this() = this(Coordinated.DefaultFactory) - def atomically: Unit -} diff --git a/akka-stm/src/main/scala/akka/transactor/Coordinated.scala b/akka-stm/src/main/scala/akka/transactor/Coordinated.scala deleted file mode 100644 index a7a30e7cee..0000000000 --- a/akka-stm/src/main/scala/akka/transactor/Coordinated.scala +++ /dev/null @@ -1,199 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.transactor - -import akka.AkkaException -import akka.stm.{ Atomic, DefaultTransactionConfig, TransactionFactory } - -import org.multiverse.commitbarriers.CountDownCommitBarrier -import org.multiverse.templates.TransactionalCallable -import akka.actor.ActorTimeoutException -import org.multiverse.api.{ TransactionConfiguration, Transaction ⇒ MultiverseTransaction } -import org.multiverse.api.exceptions.ControlFlowError - -/** - * Akka-specific exception for coordinated transactions. - */ -class CoordinatedTransactionException(message: String, cause: Throwable = null) extends AkkaException(message, cause) { - def this(msg: String) = this(msg, null); -} - -/** - * Coordinated transactions across actors. - */ -object Coordinated { - val DefaultFactory = TransactionFactory(DefaultTransactionConfig, "DefaultCoordinatedTransaction") - - def apply(message: Any = null) = new Coordinated(message, createBarrier) - - def unapply(c: Coordinated): Option[Any] = Some(c.message) - - def createBarrier = new CountDownCommitBarrier(1, true) -} - -/** - * `Coordinated` is a message wrapper that adds a `CountDownCommitBarrier` for explicitly - * coordinating transactions across actors or threads. - * - * Creating a `Coordinated` will create a count down barrier with initially one member. - * For each member in the coordination set a transaction is expected to be created using - * the coordinated atomic method. The number of included parties must match the number of - * transactions, otherwise a successful transaction cannot be coordinated. - *

- * - * To start a new coordinated transaction set that you will also participate in just create - * a `Coordinated` object: - * - * {{{ - * val coordinated = Coordinated() - * }}} - *
- * - * To start a coordinated transaction that you won't participate in yourself you can create a - * `Coordinated` object with a message and send it directly to an actor. The recipient of the message - * will be the first member of the coordination set: - * - * {{{ - * actor ! Coordinated(Message) - * }}} - *
- * - * To receive a coordinated message in an actor simply match it in a case statement: - * - * {{{ - * def receive = { - * case coordinated @ Coordinated(Message) => ... - * } - * }}} - *
- * - * To include another actor in the same coordinated transaction set that you've created or - * received, use the apply method on that object. This will increment the number of parties - * involved by one and create a new `Coordinated` object to be sent. - * - * {{{ - * actor ! coordinated(Message) - * }}} - *
- * - * To enter the coordinated transaction use the atomic method of the coordinated object: - * - * {{{ - * coordinated atomic { - * // do something in transaction ... - * } - * }}} - * - * The coordinated transaction will wait for the other transactions before committing. - * If any of the coordinated transactions fail then they all fail. - * - * @see [[akka.actor.Transactor]] for an actor that implements coordinated transactions - */ -class Coordinated(val message: Any, barrier: CountDownCommitBarrier) { - - // Java API constructors - def this(message: Any) = this(message, Coordinated.createBarrier) - - def this() = this(null, Coordinated.createBarrier) - - /** - * Create a new Coordinated object and increment the number of parties by one. - * Use this method to ''pass on'' the coordination. - */ - def apply(msg: Any) = { - barrier.incParties(1) - new Coordinated(msg, barrier) - } - - /** - * Create a new Coordinated object but *do not* increment the number of parties by one. - * Only use this method if you know this is what you need. - */ - def noIncrement(msg: Any) = new Coordinated(msg, barrier) - - /** - * Java API: get the message for this Coordinated. - */ - def getMessage() = message - - /** - * Java API: create a new Coordinated object and increment the number of parties by one. - * Use this method to ''pass on'' the coordination. - */ - def coordinate(msg: Any) = apply(msg) - - /** - * Delimits the coordinated transaction. The transaction will wait for all other transactions - * in this coordination before committing. The timeout is specified by the transaction factory. - * - * @throws ActorTimeoutException if the coordinated transaction times out. - */ - def atomic[T](body: ⇒ T)(implicit factory: TransactionFactory = Coordinated.DefaultFactory): T = - atomic(factory)(body) - - /** - * Delimits the coordinated transaction. The transaction will wait for all other transactions - * in this coordination before committing. The timeout is specified by the transaction factory. - * - * @throws ActorTimeoutException if the coordinated transaction times out. - */ - def atomic[T](factory: TransactionFactory)(body: ⇒ T): T = { - factory.boilerplate.execute(new TransactionalCallable[T]() { - def call(mtx: MultiverseTransaction): T = { - val result = try { - body - } catch { - case e: ControlFlowError ⇒ throw e - case e: Exception ⇒ { - barrier.abort() - throw e - } - } - - val timeout = factory.config.timeout - val success = try { - barrier.tryJoinCommit(mtx, timeout.length, timeout.unit) - } catch { - case e: IllegalStateException ⇒ { - val config: TransactionConfiguration = mtx.getConfiguration - throw new CoordinatedTransactionException("Coordinated transaction [" + config.getFamilyName + "] aborted", e) - } - } - - if (!success) { - val config: TransactionConfiguration = mtx.getConfiguration - throw new ActorTimeoutException( - "Failed to complete coordinated transaction [" + config.getFamilyName + "] " + - "with a maxium timeout of [" + config.getTimeoutNs + "] ns") - } - result - } - }) - } - - /** - * Java API: coordinated atomic method that accepts an [[akka.stm.Atomic]]. - * Delimits the coordinated transaction. The transaction will wait for all other transactions - * in this coordination before committing. The timeout is specified by the transaction factory. - * - * @throws ActorTimeoutException if the coordinated transaction times out - */ - def atomic[T](jatomic: Atomic[T]): T = atomic(jatomic.factory)(jatomic.atomically) - - /** - * Java API: coordinated atomic method that accepts an [[akka.transactor.Atomically]]. - * Delimits the coordinated transaction. The transaction will wait for all other transactions - * in this coordination before committing. The timeout is specified by the transaction factory. - * - * @throws ActorTimeoutException if the coordinated transaction times out. - */ - def atomic(atomically: Atomically): Unit = atomic(atomically.factory)(atomically.atomically) - - /** - * An empty coordinated atomic block. Can be used to complete the number of parties involved - * and wait for all transactions to complete. The default timeout is used. - */ - def await() = atomic(Coordinated.DefaultFactory) {} -} diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedFailer.java b/akka-stm/src/test/java/akka/transactor/test/UntypedFailer.java deleted file mode 100644 index 5e7328c566..0000000000 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedFailer.java +++ /dev/null @@ -1,9 +0,0 @@ -package akka.transactor.test; - -import akka.transactor.UntypedTransactor; - -public class UntypedFailer extends UntypedTransactor { - public void atomically(Object message) throws Exception { - throw new ExpectedFailureException(); - } -} diff --git a/akka-stm/src/test/scala/akka/stm/test/ConfigSpec.scala b/akka-stm/src/test/scala/akka/stm/test/ConfigSpec.scala deleted file mode 100644 index 8bc40cc1d1..0000000000 --- a/akka-stm/src/test/scala/akka/stm/test/ConfigSpec.scala +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.stm.test - -import org.junit.runner.RunWith -import org.scalatest.WordSpec -import org.scalatest.junit.JUnitRunner -import org.scalatest.matchers.MustMatchers -import akka.actor.ActorSystem -import com.typesafe.config.ConfigFactory -import akka.testkit.AkkaSpec - -@RunWith(classOf[JUnitRunner]) -class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) { - - "The default configuration file (i.e. reference.conf)" should { - "contain all configuration properties for akka-stm that are used in code with their correct defaults" in { - val config = system.settings.config - import config._ - - // TODO are these config values used anywhere? - - getBoolean("akka.stm.blocking-allowed") must equal(false) - getBoolean("akka.stm.fair") must equal(true) - getBoolean("akka.stm.interruptible") must equal(false) - getInt("akka.stm.max-retries") must equal(1000) - getString("akka.stm.propagation") must equal("requires") - getBoolean("akka.stm.quick-release") must equal(true) - getBoolean("akka.stm.speculative") must equal(true) - getMilliseconds("akka.stm.timeout") must equal(5 * 1000) - getString("akka.stm.trace-level") must equal("none") - getBoolean("akka.stm.write-skew") must equal(true) - } - } -} diff --git a/akka-stm/src/test/scala/akka/stm/test/JavaStmSpec.scala b/akka-stm/src/test/scala/akka/stm/test/JavaStmSpec.scala deleted file mode 100644 index a5847d2e87..0000000000 --- a/akka-stm/src/test/scala/akka/stm/test/JavaStmSpec.scala +++ /dev/null @@ -1,5 +0,0 @@ -package akka.stm.test - -import org.scalatest.junit.JUnitWrapperSuite - -class JavaStmSpec extends JUnitWrapperSuite("akka.stm.test.JavaStmTests", Thread.currentThread.getContextClassLoader) diff --git a/akka-stm/src/test/scala/akka/stm/test/RefSpec.scala b/akka-stm/src/test/scala/akka/stm/test/RefSpec.scala deleted file mode 100644 index 019d693e54..0000000000 --- a/akka-stm/src/test/scala/akka/stm/test/RefSpec.scala +++ /dev/null @@ -1,152 +0,0 @@ -package akka.stm.test - -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers - -class RefSpec extends WordSpec with MustMatchers { - - import akka.stm._ - - "A Ref" should { - - "optionally accept an initial value" in { - val emptyRef = Ref[Int] - val empty = atomic { emptyRef.opt } - - empty must be(None) - - val ref = Ref(3) - val value = atomic { ref.get } - - value must be(3) - } - - "keep the initial value, even if the first transaction is rolled back" in { - val ref = Ref(3) - - try { - atomic(DefaultTransactionFactory) { - ref.swap(5) - throw new Exception - } - } catch { - case e ⇒ {} - } - - val value = atomic { ref.get } - - value must be(3) - } - - "be settable using set" in { - val ref = Ref[Int] - - atomic { ref.set(3) } - - val value = atomic { ref.get } - - value must be(3) - } - - "be settable using swap" in { - val ref = Ref[Int] - - atomic { ref.swap(3) } - - val value = atomic { ref.get } - - value must be(3) - } - - "be changeable using alter" in { - val ref = Ref(0) - - def increment = atomic { - ref alter (_ + 1) - } - - increment - increment - increment - - val value = atomic { ref.get } - - value must be(3) - } - - "be able to be mapped" in { - val ref1 = Ref(1) - - val ref2 = atomic { - ref1 map (_ + 1) - } - - val value1 = atomic { ref1.get } - val value2 = atomic { ref2.get } - - value1 must be(1) - value2 must be(2) - } - - "be able to be used in a 'foreach' for comprehension" in { - val ref = Ref(3) - - var result = 0 - - atomic { - for (value ← ref) { - result += value - } - } - - result must be(3) - } - - "be able to be used in a 'map' for comprehension" in { - val ref1 = Ref(1) - - val ref2 = atomic { - for (value ← ref1) yield value + 2 - } - - val value2 = atomic { ref2.get } - - value2 must be(3) - } - - "be able to be used in a 'flatMap' for comprehension" in { - val ref1 = Ref(1) - val ref2 = Ref(2) - - val ref3 = atomic { - for { - value1 ← ref1 - value2 ← ref2 - } yield value1 + value2 - } - - val value3 = atomic { ref3.get } - - value3 must be(3) - } - - "be able to be used in a 'filter' for comprehension" in { - val ref1 = Ref(1) - - val refLess2 = atomic { - for (value ← ref1 if value < 2) yield value - } - - val optLess2 = atomic { refLess2.opt } - - val refGreater2 = atomic { - for (value ← ref1 if value > 2) yield value - } - - val optGreater2 = atomic { refGreater2.opt } - - optLess2 must be(Some(1)) - optGreater2 must be(None) - } - } -} diff --git a/akka-stm/src/test/scala/akka/stm/test/StmSpec.scala b/akka-stm/src/test/scala/akka/stm/test/StmSpec.scala deleted file mode 100644 index 1f547fc1ae..0000000000 --- a/akka-stm/src/test/scala/akka/stm/test/StmSpec.scala +++ /dev/null @@ -1,129 +0,0 @@ -package akka.stm.test - -import akka.actor.Actor -import akka.actor.Actor._ - -import org.multiverse.api.exceptions.ReadonlyException - -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers - -class StmSpec extends WordSpec with MustMatchers { - - import akka.stm._ - - "Local STM" should { - - "be able to do multiple consecutive atomic {..} statements" in { - val ref = Ref(0) - - def increment = atomic { - ref alter (_ + 1) - } - - def total: Int = atomic { - ref.getOrElse(0) - } - - increment - increment - increment - - total must be(3) - } - - "be able to do nested atomic {..} statements" in { - val ref = Ref(0) - - def increment = atomic { - ref alter (_ + 1) - } - - def total: Int = atomic { - ref.getOrElse(0) - } - - atomic { - increment - increment - } - - atomic { - increment - total must be(3) - } - } - - "roll back failing nested atomic {..} statements" in { - val ref = Ref(0) - - def increment = atomic { - ref alter (_ + 1) - } - - def total: Int = atomic { - ref.getOrElse(0) - } - - try { - atomic(DefaultTransactionFactory) { - increment - increment - throw new Exception - } - } catch { - case e ⇒ {} - } - - total must be(0) - } - - "use the outer transaction settings by default" in { - val readonlyFactory = TransactionFactory(readonly = true) - val writableFactory = TransactionFactory(readonly = false) - - val ref = Ref(0) - - def writableOuter = - atomic(writableFactory) { - atomic(readonlyFactory) { - ref alter (_ + 1) - } - } - - def readonlyOuter = - atomic(readonlyFactory) { - atomic(writableFactory) { - ref alter (_ + 1) - } - } - - writableOuter must be(1) - evaluating { readonlyOuter } must produce[ReadonlyException] - } - - "allow propagation settings for nested transactions" in { - val readonlyFactory = TransactionFactory(readonly = true) - val writableRequiresNewFactory = TransactionFactory(readonly = false, propagation = Propagation.RequiresNew) - - val ref = Ref(0) - - def writableOuter = - atomic(writableRequiresNewFactory) { - atomic(readonlyFactory) { - ref alter (_ + 1) - } - } - - def readonlyOuter = - atomic(readonlyFactory) { - atomic(writableRequiresNewFactory) { - ref alter (_ + 1) - } - } - - writableOuter must be(1) - readonlyOuter must be(2) - } - } -} diff --git a/akka-transactor/src/main/scala/akka/transactor/Atomically.scala b/akka-transactor/src/main/scala/akka/transactor/Atomically.scala new file mode 100644 index 0000000000..4314d04d76 --- /dev/null +++ b/akka-transactor/src/main/scala/akka/transactor/Atomically.scala @@ -0,0 +1,18 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.transactor + +import scala.concurrent.stm.InTxn + +/** + * Java API. + * + * For creating Java-friendly coordinated atomic blocks. + * + * @see [[akka.transactor.Coordinated]] + */ +trait Atomically { + def atomically(txn: InTxn): Unit +} diff --git a/akka-transactor/src/main/scala/akka/transactor/Coordinated.scala b/akka-transactor/src/main/scala/akka/transactor/Coordinated.scala new file mode 100644 index 0000000000..f9ef8538be --- /dev/null +++ b/akka-transactor/src/main/scala/akka/transactor/Coordinated.scala @@ -0,0 +1,157 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.transactor + +import akka.AkkaException +import akka.util.Timeout +import scala.concurrent.stm._ + +/** + * Akka-specific exception for coordinated transactions. + */ +class CoordinatedTransactionException(message: String, cause: Throwable = null) extends AkkaException(message, cause) { + def this(msg: String) = this(msg, null); +} + +/** + * Coordinated transactions across actors. + */ +object Coordinated { + def apply(message: Any = null)(implicit timeout: Timeout) = new Coordinated(message, createInitialMember(timeout)) + + def unapply(c: Coordinated): Option[Any] = Some(c.message) + + def createInitialMember(timeout: Timeout) = CommitBarrier(timeout.duration.toMillis).addMember() +} + +/** + * `Coordinated` is a message wrapper that adds a `CommitBarrier` for explicitly + * coordinating transactions across actors or threads. + * + * Creating a `Coordinated` will create a commit barrier with initially one member. + * For each member in the coordination set a transaction is expected to be created using + * the coordinated atomic method, or the coordination cancelled using the cancel method. + * + * The number of included members must match the number of transactions, otherwise a + * successful transaction cannot be coordinated. + *

+ * + * To start a new coordinated transaction set that you will also participate in just create + * a `Coordinated` object: + * + * {{{ + * val coordinated = Coordinated() + * }}} + *
+ * + * To start a coordinated transaction that you won't participate in yourself you can create a + * `Coordinated` object with a message and send it directly to an actor. The recipient of the message + * will be the first member of the coordination set: + * + * {{{ + * actor ! Coordinated(Message) + * }}} + *
+ * + * To receive a coordinated message in an actor simply match it in a case statement: + * + * {{{ + * def receive = { + * case coordinated @ Coordinated(Message) => ... + * } + * }}} + *
+ * + * To include another actor in the same coordinated transaction set that you've created or + * received, use the apply method on that object. This will increment the number of parties + * involved by one and create a new `Coordinated` object to be sent. + * + * {{{ + * actor ! coordinated(Message) + * }}} + *
+ * + * To enter the coordinated transaction use the atomic method of the coordinated object: + * + * {{{ + * coordinated.atomic { implicit txn => + * // do something in transaction ... + * } + * }}} + * + * The coordinated transaction will wait for the other transactions before committing. + * If any of the coordinated transactions fail then they all fail. + * + * @see [[akka.actor.Transactor]] for an actor that implements coordinated transactions + */ +class Coordinated(val message: Any, member: CommitBarrier.Member) { + + // Java API constructors + + def this(message: Any, timeout: Timeout) = this(message, Coordinated.createInitialMember(timeout)) + + def this(timeout: Timeout) = this(null, Coordinated.createInitialMember(timeout)) + + /** + * Create a new Coordinated object and increment the number of members by one. + * Use this method to ''pass on'' the coordination. + */ + def apply(msg: Any) = { + new Coordinated(msg, member.commitBarrier.addMember()) + } + + /** + * Create a new Coordinated object but *do not* increment the number of members by one. + * Only use this method if you know this is what you need. + */ + def noIncrement(msg: Any) = new Coordinated(msg, member) + + /** + * Java API: get the message for this Coordinated. + */ + def getMessage() = message + + /** + * Java API: create a new Coordinated object and increment the number of members by one. + * Use this method to ''pass on'' the coordination. + */ + def coordinate(msg: Any) = apply(msg) + + /** + * Delimits the coordinated transaction. The transaction will wait for all other transactions + * in this coordination before committing. The timeout is specified when creating the Coordinated. + * + * @throws CoordinatedTransactionException if the coordinated transaction fails. + */ + def atomic[T](body: InTxn ⇒ T): T = { + member.atomic(body) match { + case Right(result) ⇒ result + case Left(CommitBarrier.MemberUncaughtExceptionCause(x)) ⇒ + throw new CoordinatedTransactionException("Exception in coordinated atomic", x) + case Left(cause) ⇒ + throw new CoordinatedTransactionException("Failed due to " + cause) + } + } + + /** + * Java API: coordinated atomic method that accepts an [[akka.transactor.Atomically]]. + * Delimits the coordinated transaction. The transaction will wait for all other transactions + * in this coordination before committing. The timeout is specified when creating the Coordinated. + * + * @throws CoordinatedTransactionException if the coordinated transaction fails. + */ + def atomic(atomically: Atomically): Unit = atomic(txn ⇒ atomically.atomically(txn)) + + /** + * An empty coordinated atomic block. Can be used to complete the number of members involved + * and wait for all transactions to complete. + */ + def await() = atomic(txn ⇒ ()) + + /** + * Cancel this Coordinated transaction. + */ + def cancel(info: Any) = member.cancel(CommitBarrier.UserCancel(info)) +} diff --git a/akka-stm/src/main/scala/akka/transactor/Transactor.scala b/akka-transactor/src/main/scala/akka/transactor/Transactor.scala similarity index 81% rename from akka-stm/src/main/scala/akka/transactor/Transactor.scala rename to akka-transactor/src/main/scala/akka/transactor/Transactor.scala index a3572eaa2a..d33cd85e58 100644 --- a/akka-stm/src/main/scala/akka/transactor/Transactor.scala +++ b/akka-transactor/src/main/scala/akka/transactor/Transactor.scala @@ -5,7 +5,7 @@ package akka.transactor import akka.actor.{ Actor, ActorRef } -import akka.stm.{ DefaultTransactionConfig, TransactionFactory } +import scala.concurrent.stm.InTxn /** * Used for specifying actor refs and messages to send to during coordination. @@ -15,10 +15,9 @@ case class SendTo(actor: ActorRef, message: Option[Any] = None) /** * An actor with built-in support for coordinated transactions. * - * Transactors implement the general pattern for using [[akka.stm.Coordinated]] where - * first any coordination messages are sent to other transactors, then the coordinated - * transaction is entered. - * Transactors can also accept explicitly sent `Coordinated` messages. + * Transactors implement the general pattern for using [[akka.transactor.Coordinated]] where + * coordination messages are sent to other transactors then the coordinated transaction is + * entered. Transactors can also accept explicitly sent `Coordinated` messages. *

* * Simple transactors will just implement the `atomically` method which is similar to @@ -30,16 +29,16 @@ case class SendTo(actor: ActorRef, message: Option[Any] = None) * class Counter extends Transactor { * val count = Ref(0) * - * def atomically = { - * case Increment => count alter (_ + 1) + * def atomically = implicit txn => { + * case Increment => count transform (_ + 1) * } * } * }}} *
* * To coordinate with other transactors override the `coordinate` method. - * The `coordinate` method maps a message to a set - * of [[akka.actor.Transactor.SendTo]] objects, pairs of `ActorRef` and a message. + * The `coordinate` method maps a message to a set of [[akka.actor.Transactor.SendTo]] + * objects, pairs of `ActorRef` and a message. * You can use the `include` and `sendTo` methods to easily coordinate with other transactors. * The `include` method will send on the same message that was received to other transactors. * The `sendTo` method allows you to specify both the actor to send to, and message to send. @@ -54,8 +53,8 @@ case class SendTo(actor: ActorRef, message: Option[Any] = None) * case Increment => include(friend) * } * - * def atomically = { - * case Increment => count alter (_ + 1) + * def atomically = implicit txn => { + * case Increment => count transform (_ + 1) * } * } * }}} @@ -91,16 +90,9 @@ case class SendTo(actor: ActorRef, message: Option[Any] = None) * can implement normal actor behavior, or use the normal STM atomic for * local transactions. * - * @see [[akka.stm.Coordinated]] for more information about the underlying mechanism + * @see [[akka.transactor.Coordinated]] for more information about the underlying mechanism */ trait Transactor extends Actor { - private lazy val txFactory = transactionFactory - - /** - * Create default transaction factory. Override to provide custom configuration. - */ - def transactionFactory = TransactionFactory(DefaultTransactionConfig) - /** * Implement a general pattern for using coordinated transactions. */ @@ -111,12 +103,12 @@ trait Transactor extends Actor { sendTo.actor ! coordinated(sendTo.message.getOrElse(message)) } (before orElse doNothing)(message) - coordinated.atomic(txFactory) { (atomically orElse doNothing)(message) } + coordinated.atomic { txn ⇒ (atomically(txn) orElse doNothing)(message) } (after orElse doNothing)(message) } case message ⇒ { if (normally.isDefinedAt(message)) normally(message) - else receive(Coordinated(message)) + else receive(Coordinated(message)(context.system.settings.ActorTimeout)) } } @@ -158,8 +150,16 @@ trait Transactor extends Actor { /** * The Receive block to run inside the coordinated transaction. + * This is a function from InTxn to Receive block. + * + * For example: + * {{{ + * def atomically = implicit txn => { + * case Increment => count transform (_ + 1) + * } + * }}} */ - def atomically: Receive + def atomically: InTxn ⇒ Receive /** * A Receive block that runs after the coordinated transaction. diff --git a/akka-stm/src/main/scala/akka/transactor/UntypedTransactor.scala b/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala similarity index 87% rename from akka-stm/src/main/scala/akka/transactor/UntypedTransactor.scala rename to akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala index e299bdd1c1..9a37f81915 100644 --- a/akka-stm/src/main/scala/akka/transactor/UntypedTransactor.scala +++ b/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala @@ -5,23 +5,14 @@ package akka.transactor import akka.actor.{ UntypedActor, ActorRef } -import akka.stm.{ DefaultTransactionConfig, TransactionFactory } - -import java.util.{ Set ⇒ JSet } - import scala.collection.JavaConversions._ +import scala.concurrent.stm.InTxn +import java.util.{ Set ⇒ JSet } /** * An UntypedActor version of transactor for using from Java. */ abstract class UntypedTransactor extends UntypedActor { - private lazy val txFactory = transactionFactory - - /** - * Create default transaction factory. Override to provide custom configuration. - */ - def transactionFactory = TransactionFactory(DefaultTransactionConfig) - /** * Implement a general pattern for using coordinated transactions. */ @@ -34,12 +25,12 @@ abstract class UntypedTransactor extends UntypedActor { sendTo.actor.tell(coordinated(sendTo.message.getOrElse(message))) } before(message) - coordinated.atomic(txFactory) { atomically(message) } + coordinated.atomic { txn ⇒ atomically(txn, message) } after(message) } case message ⇒ { val normal = normally(message) - if (!normal) onReceive(Coordinated(message)) + if (!normal) onReceive(Coordinated(message)(context.system.settings.ActorTimeout)) } } } @@ -93,7 +84,7 @@ abstract class UntypedTransactor extends UntypedActor { * The Receive block to run inside the coordinated transaction. */ @throws(classOf[Exception]) - def atomically(message: Any) {} + def atomically(txn: InTxn, message: Any) {} /** * A Receive block that runs after the coordinated transaction. diff --git a/akka-stm/src/test/java/akka/transactor/test/ExpectedFailureException.java b/akka-transactor/src/test/java/akka/transactor/ExpectedFailureException.java similarity index 59% rename from akka-stm/src/test/java/akka/transactor/test/ExpectedFailureException.java rename to akka-transactor/src/test/java/akka/transactor/ExpectedFailureException.java index a4f1beb647..5727317415 100644 --- a/akka-stm/src/test/java/akka/transactor/test/ExpectedFailureException.java +++ b/akka-transactor/src/test/java/akka/transactor/ExpectedFailureException.java @@ -1,4 +1,8 @@ -package akka.transactor.test; +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.transactor; public class ExpectedFailureException extends RuntimeException { public ExpectedFailureException() { diff --git a/akka-stm/src/test/java/akka/transactor/test/Increment.java b/akka-transactor/src/test/java/akka/transactor/Increment.java similarity index 82% rename from akka-stm/src/test/java/akka/transactor/test/Increment.java rename to akka-transactor/src/test/java/akka/transactor/Increment.java index e66fab1318..cdbd3fcfae 100644 --- a/akka-stm/src/test/java/akka/transactor/test/Increment.java +++ b/akka-transactor/src/test/java/akka/transactor/Increment.java @@ -1,4 +1,8 @@ -package akka.transactor.test; +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.transactor; import akka.actor.ActorRef; import java.util.List; diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedCounter.java b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java similarity index 53% rename from akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedCounter.java rename to akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java index ecc8a1739d..ad073273a7 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedCounter.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java @@ -1,33 +1,34 @@ -package akka.transactor.test; +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.transactor; -import akka.transactor.Coordinated; -import akka.transactor.Atomically; import akka.actor.ActorRef; import akka.actor.Actors; import akka.actor.UntypedActor; -import akka.stm.*; import akka.util.FiniteDuration; - -import org.multiverse.api.StmUtils; - +import scala.Function1; +import scala.concurrent.stm.*; +import scala.reflect.*; +import scala.runtime.AbstractFunction1; +import scala.runtime.BoxedUnit; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class UntypedCoordinatedCounter extends UntypedActor { private String name; - private Ref count = new Ref(0); - private TransactionFactory txFactory = new TransactionFactoryBuilder() - .setTimeout(new FiniteDuration(3, TimeUnit.SECONDS)) - .build(); + private Manifest manifest = Manifest$.MODULE$.classType(Integer.class); + private Ref count = Ref$.MODULE$.apply(0, manifest); public UntypedCoordinatedCounter(String name) { this.name = name; } - private void increment() { - //System.out.println(name + ": incrementing"); - count.set(count.get() + 1); + private void increment(InTxn txn) { + Integer newValue = count.get(txn) + 1; + count.set(newValue, txn); } public void onReceive(Object incoming) throws Exception { @@ -38,20 +39,24 @@ public class UntypedCoordinatedCounter extends UntypedActor { Increment increment = (Increment) message; List friends = increment.getFriends(); final CountDownLatch latch = increment.getLatch(); + final Function1 countDown = new AbstractFunction1() { + public BoxedUnit apply(Txn.Status status) { + latch.countDown(); return null; + } + }; if (!friends.isEmpty()) { Increment coordMessage = new Increment(friends.subList(1, friends.size()), latch); friends.get(0).tell(coordinated.coordinate(coordMessage)); } - coordinated.atomic(new Atomically(txFactory) { - public void atomically() { - increment(); - StmUtils.scheduleDeferredTask(new Runnable() { public void run() { latch.countDown(); } }); - StmUtils.scheduleCompensatingTask(new Runnable() { public void run() { latch.countDown(); } }); + coordinated.atomic(new Atomically() { + public void atomically(InTxn txn) { + increment(txn); + Txn.afterCompletion(countDown, txn); } }); } } else if ("GetCount".equals(incoming)) { - getSender().tell(count.get()); + getSender().tell(count.single().get()); } } } diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java similarity index 71% rename from akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java rename to akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java index e09d15b74d..fee7515531 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java @@ -1,27 +1,31 @@ -package akka.transactor.test; +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.transactor; import static org.junit.Assert.*; -import akka.dispatch.Await; -import akka.util.Duration; -import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.Before; import akka.actor.ActorSystem; -import akka.transactor.Coordinated; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; +import akka.dispatch.Await; import akka.dispatch.Future; import akka.testkit.AkkaSpec; import akka.testkit.EventFilter; import akka.testkit.ErrorFilter; import akka.testkit.TestEvent; +import akka.transactor.Coordinated; import akka.transactor.CoordinatedTransactionException; +import akka.util.Duration; +import akka.util.Timeout; import java.util.Arrays; import java.util.ArrayList; @@ -33,8 +37,6 @@ import scala.collection.JavaConverters; import scala.collection.Seq; public class UntypedCoordinatedIncrementTest { - ActorSystem application = ActorSystem.create("UntypedCoordinatedIncrementTest", AkkaSpec.testConf()); - private static ActorSystem system; @BeforeClass @@ -52,37 +54,38 @@ public class UntypedCoordinatedIncrementTest { ActorRef failer; int numCounters = 3; - int timeout = 5; - int askTimeout = 5000; + int timeoutSeconds = 5; + + Timeout timeout = new Timeout(Duration.create(timeoutSeconds, TimeUnit.SECONDS)); @Before public void initialise() { - Props p = new Props().withCreator(UntypedFailer.class); counters = new ArrayList(); for (int i = 1; i <= numCounters; i++) { final String name = "counter" + i; - ActorRef counter = application.actorOf(new Props().withCreator(new UntypedActorFactory() { + ActorRef counter = system.actorOf(new Props().withCreator(new UntypedActorFactory() { public UntypedActor create() { return new UntypedCoordinatedCounter(name); } })); counters.add(counter); } - failer = application.actorOf(p); + failer = system.actorOf(new Props().withCreator(UntypedFailer.class)); } @Test public void incrementAllCountersWithSuccessfulTransaction() { CountDownLatch incrementLatch = new CountDownLatch(numCounters); Increment message = new Increment(counters.subList(1, counters.size()), incrementLatch); - counters.get(0).tell(new Coordinated(message)); + counters.get(0).tell(new Coordinated(message, timeout)); try { - incrementLatch.await(timeout, TimeUnit.SECONDS); + incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS); } catch (InterruptedException exception) { } for (ActorRef counter : counters) { - Future future = counter.ask("GetCount", askTimeout); - assertEquals(1, ((Integer) Await.result(future, Duration.create(timeout, TimeUnit.SECONDS))).intValue()); + Future future = counter.ask("GetCount", timeout); + int count = (Integer) Await.result(future, timeout.duration()); + assertEquals(1, count); } } @@ -91,28 +94,24 @@ public class UntypedCoordinatedIncrementTest { EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(ExpectedFailureException.class); EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(CoordinatedTransactionException.class); Seq ignoreExceptions = seq(expectedFailureFilter, coordinatedFilter); - application.eventStream().publish(new TestEvent.Mute(ignoreExceptions)); + system.eventStream().publish(new TestEvent.Mute(ignoreExceptions)); CountDownLatch incrementLatch = new CountDownLatch(numCounters); List actors = new ArrayList(counters); actors.add(failer); Increment message = new Increment(actors.subList(1, actors.size()), incrementLatch); - actors.get(0).tell(new Coordinated(message)); + actors.get(0).tell(new Coordinated(message, timeout)); try { - incrementLatch.await(timeout, TimeUnit.SECONDS); + incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS); } catch (InterruptedException exception) { } for (ActorRef counter : counters) { - Futurefuture = counter.ask("GetCount", askTimeout); - assertEquals(0,((Integer) Await.result(future, Duration.create(timeout, TimeUnit.SECONDS))).intValue()); + Futurefuture = counter.ask("GetCount", timeout); + int count = (Integer) Await.result(future, timeout.duration()); + assertEquals(0, count); } } public Seq seq(A... args) { return JavaConverters.collectionAsScalaIterableConverter(Arrays.asList(args)).asScala().toSeq(); } - - @After - public void stop() { - application.shutdown(); - } } diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedCounter.java b/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java similarity index 53% rename from akka-stm/src/test/java/akka/transactor/test/UntypedCounter.java rename to akka-transactor/src/test/java/akka/transactor/UntypedCounter.java index d4d53b084c..455be2624b 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedCounter.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java @@ -1,13 +1,18 @@ -package akka.transactor.test; +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.transactor; + +import akka.actor.ActorRef; import akka.transactor.UntypedTransactor; import akka.transactor.SendTo; -import akka.actor.ActorRef; -import akka.stm.*; import akka.util.FiniteDuration; - -import org.multiverse.api.StmUtils; - +import scala.Function1; +import scala.concurrent.stm.*; +import scala.reflect.*; +import scala.runtime.AbstractFunction1; +import scala.runtime.BoxedUnit; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -15,21 +20,16 @@ import java.util.concurrent.TimeUnit; public class UntypedCounter extends UntypedTransactor { private String name; - private Ref count = new Ref(0); + private Manifest manifest = Manifest$.MODULE$.classType(Integer.class); + private Ref count = Ref$.MODULE$.apply(0, manifest); public UntypedCounter(String name) { this.name = name; } - @Override public TransactionFactory transactionFactory() { - return new TransactionFactoryBuilder() - .setTimeout(new FiniteDuration(3, TimeUnit.SECONDS)) - .build(); - } - - private void increment() { - //System.out.println(name + ": incrementing"); - count.set(count.get() + 1); + private void increment(InTxn txn) { + Integer newValue = count.get(txn) + 1; + count.set(newValue, txn); } @Override public Set coordinate(Object message) { @@ -47,30 +47,22 @@ public class UntypedCounter extends UntypedTransactor { } } - @Override public void before(Object message) { - //System.out.println(name + ": before transaction"); - } - - public void atomically(Object message) { + public void atomically(InTxn txn, Object message) { if (message instanceof Increment) { - increment(); + increment(txn); final Increment increment = (Increment) message; - StmUtils.scheduleDeferredTask(new Runnable() { - public void run() { increment.getLatch().countDown(); } - }); - StmUtils.scheduleCompensatingTask(new Runnable() { - public void run() { increment.getLatch().countDown(); } - }); + final Function1 countDown = new AbstractFunction1() { + public BoxedUnit apply(Txn.Status status) { + increment.getLatch().countDown(); return null; + } + }; + Txn.afterCompletion(countDown, txn); } } - @Override public void after(Object message) { - //System.out.println(name + ": after transaction"); - } - @Override public boolean normally(Object message) { if ("GetCount".equals(message)) { - getSender().tell(count.get()); + getSender().tell(count.single().get()); return true; } else return false; } diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedFailer.java b/akka-transactor/src/test/java/akka/transactor/UntypedFailer.java new file mode 100644 index 0000000000..7efce68e13 --- /dev/null +++ b/akka-transactor/src/test/java/akka/transactor/UntypedFailer.java @@ -0,0 +1,14 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.transactor; + +import akka.transactor.UntypedTransactor; +import scala.concurrent.stm.InTxn; + +public class UntypedFailer extends UntypedTransactor { + public void atomically(InTxn txn, Object message) throws Exception { + throw new ExpectedFailureException(); + } +} diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java b/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java similarity index 82% rename from akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java rename to akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java index db5528f10c..95c695b4e8 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java @@ -1,9 +1,11 @@ -package akka.transactor.test; +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.transactor; import static org.junit.Assert.*; -import akka.dispatch.Await; -import akka.util.Duration; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -14,11 +16,15 @@ import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; +import akka.dispatch.Await; import akka.dispatch.Future; +import akka.testkit.AkkaSpec; import akka.testkit.EventFilter; import akka.testkit.ErrorFilter; import akka.testkit.TestEvent; import akka.transactor.CoordinatedTransactionException; +import akka.util.Duration; +import akka.util.Timeout; import java.util.Arrays; import java.util.ArrayList; @@ -28,7 +34,6 @@ import java.util.concurrent.TimeUnit; import scala.collection.JavaConverters; import scala.collection.Seq; -import akka.testkit.AkkaSpec; public class UntypedTransactorTest { @@ -49,8 +54,9 @@ public class UntypedTransactorTest { ActorRef failer; int numCounters = 3; - int timeout = 5; - int askTimeout = 5000; + int timeoutSeconds = 5; + + Timeout timeout = new Timeout(Duration.create(timeoutSeconds, TimeUnit.SECONDS)); @Before public void initialise() { @@ -73,12 +79,12 @@ public class UntypedTransactorTest { Increment message = new Increment(counters.subList(1, counters.size()), incrementLatch); counters.get(0).tell(message); try { - incrementLatch.await(timeout, TimeUnit.SECONDS); + incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS); } catch (InterruptedException exception) { } for (ActorRef counter : counters) { - Future future = counter.ask("GetCount", askTimeout); - int count = (Integer) Await.result(future, Duration.create(askTimeout, TimeUnit.MILLISECONDS)); + Future future = counter.ask("GetCount", timeout); + int count = (Integer) Await.result(future, timeout.duration()); assertEquals(1, count); } } @@ -95,12 +101,12 @@ public class UntypedTransactorTest { Increment message = new Increment(actors.subList(1, actors.size()), incrementLatch); actors.get(0).tell(message); try { - incrementLatch.await(timeout, TimeUnit.SECONDS); + incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS); } catch (InterruptedException exception) { } for (ActorRef counter : counters) { - Future future = counter.ask("GetCount", askTimeout); - int count = (Integer) Await.result(future, Duration.create(askTimeout, TimeUnit.MILLISECONDS)); + Future future = counter.ask("GetCount", timeout); + int count = (Integer) Await.result(future, timeout.duration()); assertEquals(0, count); } } diff --git a/akka-stm/src/test/scala/akka/transactor/test/CoordinatedIncrementSpec.scala b/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala similarity index 86% rename from akka-stm/src/test/scala/akka/transactor/test/CoordinatedIncrementSpec.scala rename to akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala index 660945db4a..47067d3595 100644 --- a/akka-stm/src/test/scala/akka/transactor/test/CoordinatedIncrementSpec.scala +++ b/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala @@ -1,14 +1,17 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + package akka.transactor import org.scalatest.BeforeAndAfterAll -import akka.actor.ActorSystem import akka.actor._ -import akka.stm.{ Ref, TransactionFactory } +import akka.dispatch.Await import akka.util.duration._ import akka.util.Timeout import akka.testkit._ -import akka.dispatch.Await +import scala.concurrent.stm._ object CoordinatedIncrement { case class Increment(friends: Seq[ActorRef]) @@ -17,34 +20,27 @@ object CoordinatedIncrement { class Counter(name: String) extends Actor { val count = Ref(0) - implicit val txFactory = TransactionFactory(timeout = 3 seconds) - - def increment = { - count alter (_ + 1) - } - def receive = { case coordinated @ Coordinated(Increment(friends)) ⇒ { if (friends.nonEmpty) { friends.head ! coordinated(Increment(friends.tail)) } - coordinated atomic { - increment + coordinated.atomic { implicit t ⇒ + count transform (_ + 1) } } - case GetCount ⇒ sender ! count.get + case GetCount ⇒ sender ! count.single.get } } class ExpectedFailureException extends RuntimeException("Expected failure") class Failer extends Actor { - val txFactory = TransactionFactory(timeout = 3 seconds) def receive = { case coordinated @ Coordinated(Increment(friends)) ⇒ { - coordinated.atomic(txFactory) { + coordinated.atomic { t ⇒ throw new ExpectedFailureException } } diff --git a/akka-stm/src/test/scala/akka/transactor/test/FickleFriendsSpec.scala b/akka-transactor/src/test/scala/akka/transactor/FickleFriendsSpec.scala similarity index 75% rename from akka-stm/src/test/scala/akka/transactor/test/FickleFriendsSpec.scala rename to akka-transactor/src/test/scala/akka/transactor/FickleFriendsSpec.scala index 5a850444c9..6f4e46de6a 100644 --- a/akka-stm/src/test/scala/akka/transactor/test/FickleFriendsSpec.scala +++ b/akka-transactor/src/test/scala/akka/transactor/FickleFriendsSpec.scala @@ -1,21 +1,23 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + package akka.transactor -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers import org.scalatest.BeforeAndAfterAll -import akka.actor.ActorSystem + import akka.actor._ -import akka.util.Timeout -import akka.stm._ +import akka.dispatch.Await import akka.util.duration._ +import akka.util.Timeout import akka.testkit._ +import akka.testkit.TestEvent.Mute +import scala.concurrent.stm._ import scala.util.Random.{ nextInt ⇒ random } import java.util.concurrent.CountDownLatch -import akka.testkit.TestEvent.Mute -import akka.dispatch.Await object FickleFriends { - case class FriendlyIncrement(friends: Seq[ActorRef], latch: CountDownLatch) + case class FriendlyIncrement(friends: Seq[ActorRef], timeout: Timeout, latch: CountDownLatch) case class Increment(friends: Seq[ActorRef]) case object GetCount @@ -25,24 +27,22 @@ object FickleFriends { class Coordinator(name: String) extends Actor { val count = Ref(0) - implicit val txFactory = TransactionFactory(timeout = 3 seconds) - - def increment = { - count alter (_ + 1) + def increment(implicit txn: InTxn) = { + count transform (_ + 1) } def receive = { - case FriendlyIncrement(friends, latch) ⇒ { + case FriendlyIncrement(friends, timeout, latch) ⇒ { var success = false while (!success) { try { - val coordinated = Coordinated() + val coordinated = Coordinated()(timeout) if (friends.nonEmpty) { friends.head ! coordinated(Increment(friends.tail)) } - coordinated atomic { + coordinated.atomic { implicit t ⇒ increment - deferred { + Txn.afterCommit { status ⇒ success = true latch.countDown() } @@ -53,7 +53,7 @@ object FickleFriends { } } - case GetCount ⇒ sender ! count.get + case GetCount ⇒ sender ! count.single.get } } @@ -65,14 +65,18 @@ object FickleFriends { class FickleCounter(name: String) extends Actor { val count = Ref(0) - implicit val txFactory = TransactionFactory(timeout = 3 seconds) + val maxFailures = 3 + var failures = 0 - def increment = { - count alter (_ + 1) + def increment(implicit txn: InTxn) = { + count transform (_ + 1) } def failIf(x: Int, y: Int) = { - if (x == y) throw new ExpectedFailureException("Random fail at position " + x) + if (x == y && failures < maxFailures) { + failures += 1 + throw new ExpectedFailureException("Random fail at position " + x) + } } def receive = { @@ -83,14 +87,14 @@ object FickleFriends { friends.head ! coordinated(Increment(friends.tail)) } failIf(failAt, 1) - coordinated atomic { + coordinated.atomic { implicit t ⇒ failIf(failAt, 2) increment failIf(failAt, 3) } } - case GetCount ⇒ sender ! count.get + case GetCount ⇒ sender ! count.single.get } } } @@ -119,7 +123,7 @@ class FickleFriendsSpec extends AkkaSpec with BeforeAndAfterAll { system.eventStream.publish(Mute(ignoreExceptions)) val (counters, coordinator) = actorOfs val latch = new CountDownLatch(1) - coordinator ! FriendlyIncrement(counters, latch) + coordinator ! FriendlyIncrement(counters, timeout, latch) latch.await // this could take a while Await.result(coordinator ? GetCount, timeout.duration) must be === 1 for (counter ← counters) { diff --git a/akka-stm/src/test/scala/akka/transactor/test/JavaUntypedCoordinatedSpec.scala b/akka-transactor/src/test/scala/akka/transactor/JavaUntypedCoordinatedSpec.scala similarity index 57% rename from akka-stm/src/test/scala/akka/transactor/test/JavaUntypedCoordinatedSpec.scala rename to akka-transactor/src/test/scala/akka/transactor/JavaUntypedCoordinatedSpec.scala index f48705469c..6c18959ce9 100644 --- a/akka-stm/src/test/scala/akka/transactor/test/JavaUntypedCoordinatedSpec.scala +++ b/akka-transactor/src/test/scala/akka/transactor/JavaUntypedCoordinatedSpec.scala @@ -1,7 +1,11 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + package akka.transactor import org.scalatest.junit.JUnitWrapperSuite class JavaUntypedCoordinatedSpec extends JUnitWrapperSuite( - "akka.transactor.test.UntypedCoordinatedIncrementTest", + "akka.transactor.UntypedCoordinatedIncrementTest", Thread.currentThread.getContextClassLoader) diff --git a/akka-stm/src/test/scala/akka/transactor/test/JavaUntypedTransactorSpec.scala b/akka-transactor/src/test/scala/akka/transactor/JavaUntypedTransactorSpec.scala similarity index 59% rename from akka-stm/src/test/scala/akka/transactor/test/JavaUntypedTransactorSpec.scala rename to akka-transactor/src/test/scala/akka/transactor/JavaUntypedTransactorSpec.scala index d4da5f0545..7e3c0e8294 100644 --- a/akka-stm/src/test/scala/akka/transactor/test/JavaUntypedTransactorSpec.scala +++ b/akka-transactor/src/test/scala/akka/transactor/JavaUntypedTransactorSpec.scala @@ -1,7 +1,11 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + package akka.transactor import org.scalatest.junit.JUnitWrapperSuite class JavaUntypedTransactorSpec extends JUnitWrapperSuite( - "akka.transactor.test.UntypedTransactorTest", + "akka.transactor.UntypedTransactorTest", Thread.currentThread.getContextClassLoader) diff --git a/akka-stm/src/test/scala/akka/transactor/test/TransactorSpec.scala b/akka-transactor/src/test/scala/akka/transactor/TransactorSpec.scala similarity index 85% rename from akka-stm/src/test/scala/akka/transactor/test/TransactorSpec.scala rename to akka-transactor/src/test/scala/akka/transactor/TransactorSpec.scala index 545ef13c6a..3130fd2d64 100644 --- a/akka-stm/src/test/scala/akka/transactor/test/TransactorSpec.scala +++ b/akka-transactor/src/test/scala/akka/transactor/TransactorSpec.scala @@ -1,15 +1,15 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + package akka.transactor -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers - -import akka.actor.ActorSystem import akka.actor._ -import akka.util.Timeout -import akka.stm._ -import akka.util.duration._ -import akka.testkit._ import akka.dispatch.Await +import akka.util.duration._ +import akka.util.Timeout +import akka.testkit._ +import scala.concurrent.stm._ object TransactorIncrement { case class Increment(friends: Seq[ActorRef], latch: TestLatch) @@ -18,10 +18,8 @@ object TransactorIncrement { class Counter(name: String) extends Transactor { val count = Ref(0) - override def transactionFactory = TransactionFactory(timeout = 3 seconds) - - def increment = { - count alter (_ + 1) + def increment(implicit txn: InTxn) = { + count transform (_ + 1) } override def coordinate = { @@ -35,11 +33,10 @@ object TransactorIncrement { case i: Increment ⇒ } - def atomically = { + def atomically = implicit txn ⇒ { case Increment(friends, latch) ⇒ { increment - deferred { latch.countDown() } - compensating { latch.countDown() } + Txn.afterCompletion { status ⇒ latch.countDown() } } } @@ -48,14 +45,14 @@ object TransactorIncrement { } override def normally = { - case GetCount ⇒ sender ! count.get + case GetCount ⇒ sender ! count.single.get } } class ExpectedFailureException extends RuntimeException("Expected failure") class Failer extends Transactor { - def atomically = { + def atomically = implicit txn ⇒ { case _ ⇒ throw new ExpectedFailureException } } @@ -65,10 +62,10 @@ object SimpleTransactor { case class Set(ref: Ref[Int], value: Int, latch: TestLatch) class Setter extends Transactor { - def atomically = { + def atomically = implicit txn ⇒ { case Set(ref, value, latch) ⇒ { - ref.set(value) - deferred { latch.countDown() } + ref() = value + Txn.afterCompletion { status ⇒ latch.countDown() } } } } @@ -129,7 +126,7 @@ class TransactorSpec extends AkkaSpec { val latch = TestLatch(1) transactor ! Set(ref, 5, latch) latch.await - val value = atomic { ref.get } + val value = ref.single.get value must be === 5 system.stop(transactor) } diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index a12964b8db..0138db65c5 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -30,7 +30,7 @@ object AkkaBuild extends Build { Unidoc.unidocExclude := Seq(samples.id, tutorials.id), Dist.distExclude := Seq(actorTests.id, akkaSbtPlugin.id, docs.id) ), - aggregate = Seq(actor, testkit, actorTests, remote, slf4j, agent, mailboxes, kernel, akkaSbtPlugin, samples, tutorials, docs) + aggregate = Seq(actor, testkit, actorTests, remote, slf4j, agent, transactor, mailboxes, kernel, akkaSbtPlugin, samples, tutorials, docs) ) lazy val actor = Project( @@ -103,6 +103,15 @@ object AkkaBuild extends Build { ) ) + lazy val transactor = Project( + id = "akka-transactor", + base = file("akka-transactor"), + dependencies = Seq(actor, testkit % "test->test"), + settings = defaultSettings ++ Seq( + libraryDependencies ++= Dependencies.transactor + ) + ) + // lazy val amqp = Project( // id = "akka-amqp", // base = file("akka-amqp"), @@ -265,7 +274,7 @@ object AkkaBuild extends Build { lazy val docs = Project( id = "akka-docs", base = file("akka-docs"), - dependencies = Seq(actor, testkit % "test->test", remote, slf4j, agent, fileMailbox, mongoMailbox, redisMailbox, beanstalkMailbox, zookeeperMailbox), + dependencies = Seq(actor, testkit % "test->test", remote, slf4j, agent, transactor, fileMailbox, mongoMailbox, redisMailbox, beanstalkMailbox, zookeeperMailbox), settings = defaultSettings ++ Seq( unmanagedSourceDirectories in Test <<= baseDirectory { _ ** "code" get }, libraryDependencies ++= Dependencies.docs, @@ -380,6 +389,8 @@ object Dependencies { val agent = Seq(scalaStm, Test.scalatest, Test.junit) + val transactor = Seq(scalaStm, Test.scalatest, Test.junit) + val amqp = Seq(rabbit, commonsIo, protobuf) val mailboxes = Seq(Test.scalatest, Test.junit)