Initial port of transactor to scala-stm

No java api for the stm yet but have converted the
untyped transactor and java tests by accessing the
underlying scala-stm stuff directly
This commit is contained in:
Peter Vlugter 2011-12-21 13:40:32 +13:00
parent a624c74045
commit dc3507fcf7
24 changed files with 398 additions and 744 deletions

View file

@ -1,21 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import akka.stm.TransactionFactory
/**
* For Java-friendly coordinated atomic blocks.
*
* Similar to [[akka.stm.Atomic]] but used to pass a block to Coordinated.atomic
* or to Coordination.coordinate.
*
* @see [[akka.transactor.Coordinated]]
* @see [[akka.transactor.Coordination]]
*/
abstract class Atomically(val factory: TransactionFactory) {
def this() = this(Coordinated.DefaultFactory)
def atomically: Unit
}

View file

@ -1,199 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import akka.AkkaException
import akka.stm.{ Atomic, DefaultTransactionConfig, TransactionFactory }
import org.multiverse.commitbarriers.CountDownCommitBarrier
import org.multiverse.templates.TransactionalCallable
import akka.actor.ActorTimeoutException
import org.multiverse.api.{ TransactionConfiguration, Transaction MultiverseTransaction }
import org.multiverse.api.exceptions.ControlFlowError
/**
* Akka-specific exception for coordinated transactions.
*/
class CoordinatedTransactionException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null);
}
/**
* Coordinated transactions across actors.
*/
object Coordinated {
val DefaultFactory = TransactionFactory(DefaultTransactionConfig, "DefaultCoordinatedTransaction")
def apply(message: Any = null) = new Coordinated(message, createBarrier)
def unapply(c: Coordinated): Option[Any] = Some(c.message)
def createBarrier = new CountDownCommitBarrier(1, true)
}
/**
* `Coordinated` is a message wrapper that adds a `CountDownCommitBarrier` for explicitly
* coordinating transactions across actors or threads.
*
* Creating a `Coordinated` will create a count down barrier with initially one member.
* For each member in the coordination set a transaction is expected to be created using
* the coordinated atomic method. The number of included parties must match the number of
* transactions, otherwise a successful transaction cannot be coordinated.
* <br/><br/>
*
* To start a new coordinated transaction set that you will also participate in just create
* a `Coordinated` object:
*
* {{{
* val coordinated = Coordinated()
* }}}
* <br/>
*
* To start a coordinated transaction that you won't participate in yourself you can create a
* `Coordinated` object with a message and send it directly to an actor. The recipient of the message
* will be the first member of the coordination set:
*
* {{{
* actor ! Coordinated(Message)
* }}}
* <br/>
*
* To receive a coordinated message in an actor simply match it in a case statement:
*
* {{{
* def receive = {
* case coordinated @ Coordinated(Message) => ...
* }
* }}}
* <br/>
*
* To include another actor in the same coordinated transaction set that you've created or
* received, use the apply method on that object. This will increment the number of parties
* involved by one and create a new `Coordinated` object to be sent.
*
* {{{
* actor ! coordinated(Message)
* }}}
* <br/>
*
* To enter the coordinated transaction use the atomic method of the coordinated object:
*
* {{{
* coordinated atomic {
* // do something in transaction ...
* }
* }}}
*
* The coordinated transaction will wait for the other transactions before committing.
* If any of the coordinated transactions fail then they all fail.
*
* @see [[akka.actor.Transactor]] for an actor that implements coordinated transactions
*/
class Coordinated(val message: Any, barrier: CountDownCommitBarrier) {
// Java API constructors
def this(message: Any) = this(message, Coordinated.createBarrier)
def this() = this(null, Coordinated.createBarrier)
/**
* Create a new Coordinated object and increment the number of parties by one.
* Use this method to ''pass on'' the coordination.
*/
def apply(msg: Any) = {
barrier.incParties(1)
new Coordinated(msg, barrier)
}
/**
* Create a new Coordinated object but *do not* increment the number of parties by one.
* Only use this method if you know this is what you need.
*/
def noIncrement(msg: Any) = new Coordinated(msg, barrier)
/**
* Java API: get the message for this Coordinated.
*/
def getMessage() = message
/**
* Java API: create a new Coordinated object and increment the number of parties by one.
* Use this method to ''pass on'' the coordination.
*/
def coordinate(msg: Any) = apply(msg)
/**
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified by the transaction factory.
*
* @throws ActorTimeoutException if the coordinated transaction times out.
*/
def atomic[T](body: T)(implicit factory: TransactionFactory = Coordinated.DefaultFactory): T =
atomic(factory)(body)
/**
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified by the transaction factory.
*
* @throws ActorTimeoutException if the coordinated transaction times out.
*/
def atomic[T](factory: TransactionFactory)(body: T): T = {
factory.boilerplate.execute(new TransactionalCallable[T]() {
def call(mtx: MultiverseTransaction): T = {
val result = try {
body
} catch {
case e: ControlFlowError throw e
case e: Exception {
barrier.abort()
throw e
}
}
val timeout = factory.config.timeout
val success = try {
barrier.tryJoinCommit(mtx, timeout.length, timeout.unit)
} catch {
case e: IllegalStateException {
val config: TransactionConfiguration = mtx.getConfiguration
throw new CoordinatedTransactionException("Coordinated transaction [" + config.getFamilyName + "] aborted", e)
}
}
if (!success) {
val config: TransactionConfiguration = mtx.getConfiguration
throw new ActorTimeoutException(
"Failed to complete coordinated transaction [" + config.getFamilyName + "] " +
"with a maxium timeout of [" + config.getTimeoutNs + "] ns")
}
result
}
})
}
/**
* Java API: coordinated atomic method that accepts an [[akka.stm.Atomic]].
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified by the transaction factory.
*
* @throws ActorTimeoutException if the coordinated transaction times out
*/
def atomic[T](jatomic: Atomic[T]): T = atomic(jatomic.factory)(jatomic.atomically)
/**
* Java API: coordinated atomic method that accepts an [[akka.transactor.Atomically]].
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified by the transaction factory.
*
* @throws ActorTimeoutException if the coordinated transaction times out.
*/
def atomic(atomically: Atomically): Unit = atomic(atomically.factory)(atomically.atomically)
/**
* An empty coordinated atomic block. Can be used to complete the number of parties involved
* and wait for all transactions to complete. The default timeout is used.
*/
def await() = atomic(Coordinated.DefaultFactory) {}
}

View file

@ -1,9 +0,0 @@
package akka.transactor.test;
import akka.transactor.UntypedTransactor;
public class UntypedFailer extends UntypedTransactor {
public void atomically(Object message) throws Exception {
throw new ExpectedFailureException();
}
}

View file

@ -1,37 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stm.test
import org.junit.runner.RunWith
import org.scalatest.WordSpec
import org.scalatest.junit.JUnitRunner
import org.scalatest.matchers.MustMatchers
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import akka.testkit.AkkaSpec
@RunWith(classOf[JUnitRunner])
class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) {
"The default configuration file (i.e. reference.conf)" should {
"contain all configuration properties for akka-stm that are used in code with their correct defaults" in {
val config = system.settings.config
import config._
// TODO are these config values used anywhere?
getBoolean("akka.stm.blocking-allowed") must equal(false)
getBoolean("akka.stm.fair") must equal(true)
getBoolean("akka.stm.interruptible") must equal(false)
getInt("akka.stm.max-retries") must equal(1000)
getString("akka.stm.propagation") must equal("requires")
getBoolean("akka.stm.quick-release") must equal(true)
getBoolean("akka.stm.speculative") must equal(true)
getMilliseconds("akka.stm.timeout") must equal(5 * 1000)
getString("akka.stm.trace-level") must equal("none")
getBoolean("akka.stm.write-skew") must equal(true)
}
}
}

View file

@ -1,5 +0,0 @@
package akka.stm.test
import org.scalatest.junit.JUnitWrapperSuite
class JavaStmSpec extends JUnitWrapperSuite("akka.stm.test.JavaStmTests", Thread.currentThread.getContextClassLoader)

View file

@ -1,152 +0,0 @@
package akka.stm.test
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
class RefSpec extends WordSpec with MustMatchers {
import akka.stm._
"A Ref" should {
"optionally accept an initial value" in {
val emptyRef = Ref[Int]
val empty = atomic { emptyRef.opt }
empty must be(None)
val ref = Ref(3)
val value = atomic { ref.get }
value must be(3)
}
"keep the initial value, even if the first transaction is rolled back" in {
val ref = Ref(3)
try {
atomic(DefaultTransactionFactory) {
ref.swap(5)
throw new Exception
}
} catch {
case e {}
}
val value = atomic { ref.get }
value must be(3)
}
"be settable using set" in {
val ref = Ref[Int]
atomic { ref.set(3) }
val value = atomic { ref.get }
value must be(3)
}
"be settable using swap" in {
val ref = Ref[Int]
atomic { ref.swap(3) }
val value = atomic { ref.get }
value must be(3)
}
"be changeable using alter" in {
val ref = Ref(0)
def increment = atomic {
ref alter (_ + 1)
}
increment
increment
increment
val value = atomic { ref.get }
value must be(3)
}
"be able to be mapped" in {
val ref1 = Ref(1)
val ref2 = atomic {
ref1 map (_ + 1)
}
val value1 = atomic { ref1.get }
val value2 = atomic { ref2.get }
value1 must be(1)
value2 must be(2)
}
"be able to be used in a 'foreach' for comprehension" in {
val ref = Ref(3)
var result = 0
atomic {
for (value ref) {
result += value
}
}
result must be(3)
}
"be able to be used in a 'map' for comprehension" in {
val ref1 = Ref(1)
val ref2 = atomic {
for (value ref1) yield value + 2
}
val value2 = atomic { ref2.get }
value2 must be(3)
}
"be able to be used in a 'flatMap' for comprehension" in {
val ref1 = Ref(1)
val ref2 = Ref(2)
val ref3 = atomic {
for {
value1 ref1
value2 ref2
} yield value1 + value2
}
val value3 = atomic { ref3.get }
value3 must be(3)
}
"be able to be used in a 'filter' for comprehension" in {
val ref1 = Ref(1)
val refLess2 = atomic {
for (value ref1 if value < 2) yield value
}
val optLess2 = atomic { refLess2.opt }
val refGreater2 = atomic {
for (value ref1 if value > 2) yield value
}
val optGreater2 = atomic { refGreater2.opt }
optLess2 must be(Some(1))
optGreater2 must be(None)
}
}
}

View file

@ -1,129 +0,0 @@
package akka.stm.test
import akka.actor.Actor
import akka.actor.Actor._
import org.multiverse.api.exceptions.ReadonlyException
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
class StmSpec extends WordSpec with MustMatchers {
import akka.stm._
"Local STM" should {
"be able to do multiple consecutive atomic {..} statements" in {
val ref = Ref(0)
def increment = atomic {
ref alter (_ + 1)
}
def total: Int = atomic {
ref.getOrElse(0)
}
increment
increment
increment
total must be(3)
}
"be able to do nested atomic {..} statements" in {
val ref = Ref(0)
def increment = atomic {
ref alter (_ + 1)
}
def total: Int = atomic {
ref.getOrElse(0)
}
atomic {
increment
increment
}
atomic {
increment
total must be(3)
}
}
"roll back failing nested atomic {..} statements" in {
val ref = Ref(0)
def increment = atomic {
ref alter (_ + 1)
}
def total: Int = atomic {
ref.getOrElse(0)
}
try {
atomic(DefaultTransactionFactory) {
increment
increment
throw new Exception
}
} catch {
case e {}
}
total must be(0)
}
"use the outer transaction settings by default" in {
val readonlyFactory = TransactionFactory(readonly = true)
val writableFactory = TransactionFactory(readonly = false)
val ref = Ref(0)
def writableOuter =
atomic(writableFactory) {
atomic(readonlyFactory) {
ref alter (_ + 1)
}
}
def readonlyOuter =
atomic(readonlyFactory) {
atomic(writableFactory) {
ref alter (_ + 1)
}
}
writableOuter must be(1)
evaluating { readonlyOuter } must produce[ReadonlyException]
}
"allow propagation settings for nested transactions" in {
val readonlyFactory = TransactionFactory(readonly = true)
val writableRequiresNewFactory = TransactionFactory(readonly = false, propagation = Propagation.RequiresNew)
val ref = Ref(0)
def writableOuter =
atomic(writableRequiresNewFactory) {
atomic(readonlyFactory) {
ref alter (_ + 1)
}
}
def readonlyOuter =
atomic(readonlyFactory) {
atomic(writableRequiresNewFactory) {
ref alter (_ + 1)
}
}
writableOuter must be(1)
readonlyOuter must be(2)
}
}
}

View file

@ -0,0 +1,18 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import scala.concurrent.stm.InTxn
/**
* Java API.
*
* For creating Java-friendly coordinated atomic blocks.
*
* @see [[akka.transactor.Coordinated]]
*/
trait Atomically {
def atomically(txn: InTxn): Unit
}

View file

@ -0,0 +1,157 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import akka.AkkaException
import akka.util.Timeout
import scala.concurrent.stm._
/**
* Akka-specific exception for coordinated transactions.
*/
class CoordinatedTransactionException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null);
}
/**
* Coordinated transactions across actors.
*/
object Coordinated {
def apply(message: Any = null)(implicit timeout: Timeout) = new Coordinated(message, createInitialMember(timeout))
def unapply(c: Coordinated): Option[Any] = Some(c.message)
def createInitialMember(timeout: Timeout) = CommitBarrier(timeout.duration.toMillis).addMember()
}
/**
* `Coordinated` is a message wrapper that adds a `CommitBarrier` for explicitly
* coordinating transactions across actors or threads.
*
* Creating a `Coordinated` will create a commit barrier with initially one member.
* For each member in the coordination set a transaction is expected to be created using
* the coordinated atomic method, or the coordination cancelled using the cancel method.
*
* The number of included members must match the number of transactions, otherwise a
* successful transaction cannot be coordinated.
* <br/><br/>
*
* To start a new coordinated transaction set that you will also participate in just create
* a `Coordinated` object:
*
* {{{
* val coordinated = Coordinated()
* }}}
* <br/>
*
* To start a coordinated transaction that you won't participate in yourself you can create a
* `Coordinated` object with a message and send it directly to an actor. The recipient of the message
* will be the first member of the coordination set:
*
* {{{
* actor ! Coordinated(Message)
* }}}
* <br/>
*
* To receive a coordinated message in an actor simply match it in a case statement:
*
* {{{
* def receive = {
* case coordinated @ Coordinated(Message) => ...
* }
* }}}
* <br/>
*
* To include another actor in the same coordinated transaction set that you've created or
* received, use the apply method on that object. This will increment the number of parties
* involved by one and create a new `Coordinated` object to be sent.
*
* {{{
* actor ! coordinated(Message)
* }}}
* <br/>
*
* To enter the coordinated transaction use the atomic method of the coordinated object:
*
* {{{
* coordinated.atomic { implicit txn =>
* // do something in transaction ...
* }
* }}}
*
* The coordinated transaction will wait for the other transactions before committing.
* If any of the coordinated transactions fail then they all fail.
*
* @see [[akka.actor.Transactor]] for an actor that implements coordinated transactions
*/
class Coordinated(val message: Any, member: CommitBarrier.Member) {
// Java API constructors
def this(message: Any, timeout: Timeout) = this(message, Coordinated.createInitialMember(timeout))
def this(timeout: Timeout) = this(null, Coordinated.createInitialMember(timeout))
/**
* Create a new Coordinated object and increment the number of members by one.
* Use this method to ''pass on'' the coordination.
*/
def apply(msg: Any) = {
new Coordinated(msg, member.commitBarrier.addMember())
}
/**
* Create a new Coordinated object but *do not* increment the number of members by one.
* Only use this method if you know this is what you need.
*/
def noIncrement(msg: Any) = new Coordinated(msg, member)
/**
* Java API: get the message for this Coordinated.
*/
def getMessage() = message
/**
* Java API: create a new Coordinated object and increment the number of members by one.
* Use this method to ''pass on'' the coordination.
*/
def coordinate(msg: Any) = apply(msg)
/**
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified when creating the Coordinated.
*
* @throws CoordinatedTransactionException if the coordinated transaction fails.
*/
def atomic[T](body: InTxn T): T = {
member.atomic(body) match {
case Right(result) result
case Left(CommitBarrier.MemberUncaughtExceptionCause(x))
throw new CoordinatedTransactionException("Exception in coordinated atomic", x)
case Left(cause)
throw new CoordinatedTransactionException("Failed due to " + cause)
}
}
/**
* Java API: coordinated atomic method that accepts an [[akka.transactor.Atomically]].
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified when creating the Coordinated.
*
* @throws CoordinatedTransactionException if the coordinated transaction fails.
*/
def atomic(atomically: Atomically): Unit = atomic(txn atomically.atomically(txn))
/**
* An empty coordinated atomic block. Can be used to complete the number of members involved
* and wait for all transactions to complete.
*/
def await() = atomic(txn ())
/**
* Cancel this Coordinated transaction.
*/
def cancel(info: Any) = member.cancel(CommitBarrier.UserCancel(info))
}

View file

@ -5,7 +5,7 @@
package akka.transactor
import akka.actor.{ Actor, ActorRef }
import akka.stm.{ DefaultTransactionConfig, TransactionFactory }
import scala.concurrent.stm.InTxn
/**
* Used for specifying actor refs and messages to send to during coordination.
@ -15,10 +15,9 @@ case class SendTo(actor: ActorRef, message: Option[Any] = None)
/**
* An actor with built-in support for coordinated transactions.
*
* Transactors implement the general pattern for using [[akka.stm.Coordinated]] where
* first any coordination messages are sent to other transactors, then the coordinated
* transaction is entered.
* Transactors can also accept explicitly sent `Coordinated` messages.
* Transactors implement the general pattern for using [[akka.transactor.Coordinated]] where
* coordination messages are sent to other transactors then the coordinated transaction is
* entered. Transactors can also accept explicitly sent `Coordinated` messages.
* <br/><br/>
*
* Simple transactors will just implement the `atomically` method which is similar to
@ -30,16 +29,16 @@ case class SendTo(actor: ActorRef, message: Option[Any] = None)
* class Counter extends Transactor {
* val count = Ref(0)
*
* def atomically = {
* case Increment => count alter (_ + 1)
* def atomically = implicit txn => {
* case Increment => count transform (_ + 1)
* }
* }
* }}}
* <br/>
*
* To coordinate with other transactors override the `coordinate` method.
* The `coordinate` method maps a message to a set
* of [[akka.actor.Transactor.SendTo]] objects, pairs of `ActorRef` and a message.
* The `coordinate` method maps a message to a set of [[akka.actor.Transactor.SendTo]]
* objects, pairs of `ActorRef` and a message.
* You can use the `include` and `sendTo` methods to easily coordinate with other transactors.
* The `include` method will send on the same message that was received to other transactors.
* The `sendTo` method allows you to specify both the actor to send to, and message to send.
@ -54,8 +53,8 @@ case class SendTo(actor: ActorRef, message: Option[Any] = None)
* case Increment => include(friend)
* }
*
* def atomically = {
* case Increment => count alter (_ + 1)
* def atomically = implicit txn => {
* case Increment => count transform (_ + 1)
* }
* }
* }}}
@ -91,16 +90,9 @@ case class SendTo(actor: ActorRef, message: Option[Any] = None)
* can implement normal actor behavior, or use the normal STM atomic for
* local transactions.
*
* @see [[akka.stm.Coordinated]] for more information about the underlying mechanism
* @see [[akka.transactor.Coordinated]] for more information about the underlying mechanism
*/
trait Transactor extends Actor {
private lazy val txFactory = transactionFactory
/**
* Create default transaction factory. Override to provide custom configuration.
*/
def transactionFactory = TransactionFactory(DefaultTransactionConfig)
/**
* Implement a general pattern for using coordinated transactions.
*/
@ -111,12 +103,12 @@ trait Transactor extends Actor {
sendTo.actor ! coordinated(sendTo.message.getOrElse(message))
}
(before orElse doNothing)(message)
coordinated.atomic(txFactory) { (atomically orElse doNothing)(message) }
coordinated.atomic { txn (atomically(txn) orElse doNothing)(message) }
(after orElse doNothing)(message)
}
case message {
if (normally.isDefinedAt(message)) normally(message)
else receive(Coordinated(message))
else receive(Coordinated(message)(context.system.settings.ActorTimeout))
}
}
@ -158,8 +150,16 @@ trait Transactor extends Actor {
/**
* The Receive block to run inside the coordinated transaction.
* This is a function from InTxn to Receive block.
*
* For example:
* {{{
* def atomically = implicit txn => {
* case Increment => count transform (_ + 1)
* }
* }}}
*/
def atomically: Receive
def atomically: InTxn Receive
/**
* A Receive block that runs after the coordinated transaction.

View file

@ -5,23 +5,14 @@
package akka.transactor
import akka.actor.{ UntypedActor, ActorRef }
import akka.stm.{ DefaultTransactionConfig, TransactionFactory }
import java.util.{ Set JSet }
import scala.collection.JavaConversions._
import scala.concurrent.stm.InTxn
import java.util.{ Set JSet }
/**
* An UntypedActor version of transactor for using from Java.
*/
abstract class UntypedTransactor extends UntypedActor {
private lazy val txFactory = transactionFactory
/**
* Create default transaction factory. Override to provide custom configuration.
*/
def transactionFactory = TransactionFactory(DefaultTransactionConfig)
/**
* Implement a general pattern for using coordinated transactions.
*/
@ -34,12 +25,12 @@ abstract class UntypedTransactor extends UntypedActor {
sendTo.actor.tell(coordinated(sendTo.message.getOrElse(message)))
}
before(message)
coordinated.atomic(txFactory) { atomically(message) }
coordinated.atomic { txn atomically(txn, message) }
after(message)
}
case message {
val normal = normally(message)
if (!normal) onReceive(Coordinated(message))
if (!normal) onReceive(Coordinated(message)(context.system.settings.ActorTimeout))
}
}
}
@ -93,7 +84,7 @@ abstract class UntypedTransactor extends UntypedActor {
* The Receive block to run inside the coordinated transaction.
*/
@throws(classOf[Exception])
def atomically(message: Any) {}
def atomically(txn: InTxn, message: Any) {}
/**
* A Receive block that runs after the coordinated transaction.

View file

@ -1,4 +1,8 @@
package akka.transactor.test;
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor;
public class ExpectedFailureException extends RuntimeException {
public ExpectedFailureException() {

View file

@ -1,4 +1,8 @@
package akka.transactor.test;
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor;
import akka.actor.ActorRef;
import java.util.List;

View file

@ -1,33 +1,34 @@
package akka.transactor.test;
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor;
import akka.transactor.Coordinated;
import akka.transactor.Atomically;
import akka.actor.ActorRef;
import akka.actor.Actors;
import akka.actor.UntypedActor;
import akka.stm.*;
import akka.util.FiniteDuration;
import org.multiverse.api.StmUtils;
import scala.Function1;
import scala.concurrent.stm.*;
import scala.reflect.*;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class UntypedCoordinatedCounter extends UntypedActor {
private String name;
private Ref<Integer> count = new Ref<Integer>(0);
private TransactionFactory txFactory = new TransactionFactoryBuilder()
.setTimeout(new FiniteDuration(3, TimeUnit.SECONDS))
.build();
private Manifest<Integer> manifest = Manifest$.MODULE$.classType(Integer.class);
private Ref<Integer> count = Ref$.MODULE$.apply(0, manifest);
public UntypedCoordinatedCounter(String name) {
this.name = name;
}
private void increment() {
//System.out.println(name + ": incrementing");
count.set(count.get() + 1);
private void increment(InTxn txn) {
Integer newValue = count.get(txn) + 1;
count.set(newValue, txn);
}
public void onReceive(Object incoming) throws Exception {
@ -38,20 +39,24 @@ public class UntypedCoordinatedCounter extends UntypedActor {
Increment increment = (Increment) message;
List<ActorRef> friends = increment.getFriends();
final CountDownLatch latch = increment.getLatch();
final Function1<Txn.Status, BoxedUnit> countDown = new AbstractFunction1<Txn.Status, BoxedUnit>() {
public BoxedUnit apply(Txn.Status status) {
latch.countDown(); return null;
}
};
if (!friends.isEmpty()) {
Increment coordMessage = new Increment(friends.subList(1, friends.size()), latch);
friends.get(0).tell(coordinated.coordinate(coordMessage));
}
coordinated.atomic(new Atomically(txFactory) {
public void atomically() {
increment();
StmUtils.scheduleDeferredTask(new Runnable() { public void run() { latch.countDown(); } });
StmUtils.scheduleCompensatingTask(new Runnable() { public void run() { latch.countDown(); } });
coordinated.atomic(new Atomically() {
public void atomically(InTxn txn) {
increment(txn);
Txn.afterCompletion(countDown, txn);
}
});
}
} else if ("GetCount".equals(incoming)) {
getSender().tell(count.get());
getSender().tell(count.single().get());
}
}
}

View file

@ -1,27 +1,31 @@
package akka.transactor.test;
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor;
import static org.junit.Assert.*;
import akka.dispatch.Await;
import akka.util.Duration;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.Before;
import akka.actor.ActorSystem;
import akka.transactor.Coordinated;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.dispatch.Await;
import akka.dispatch.Future;
import akka.testkit.AkkaSpec;
import akka.testkit.EventFilter;
import akka.testkit.ErrorFilter;
import akka.testkit.TestEvent;
import akka.transactor.Coordinated;
import akka.transactor.CoordinatedTransactionException;
import akka.util.Duration;
import akka.util.Timeout;
import java.util.Arrays;
import java.util.ArrayList;
@ -33,8 +37,6 @@ import scala.collection.JavaConverters;
import scala.collection.Seq;
public class UntypedCoordinatedIncrementTest {
ActorSystem application = ActorSystem.create("UntypedCoordinatedIncrementTest", AkkaSpec.testConf());
private static ActorSystem system;
@BeforeClass
@ -52,37 +54,38 @@ public class UntypedCoordinatedIncrementTest {
ActorRef failer;
int numCounters = 3;
int timeout = 5;
int askTimeout = 5000;
int timeoutSeconds = 5;
Timeout timeout = new Timeout(Duration.create(timeoutSeconds, TimeUnit.SECONDS));
@Before
public void initialise() {
Props p = new Props().withCreator(UntypedFailer.class);
counters = new ArrayList<ActorRef>();
for (int i = 1; i <= numCounters; i++) {
final String name = "counter" + i;
ActorRef counter = application.actorOf(new Props().withCreator(new UntypedActorFactory() {
ActorRef counter = system.actorOf(new Props().withCreator(new UntypedActorFactory() {
public UntypedActor create() {
return new UntypedCoordinatedCounter(name);
}
}));
counters.add(counter);
}
failer = application.actorOf(p);
failer = system.actorOf(new Props().withCreator(UntypedFailer.class));
}
@Test
public void incrementAllCountersWithSuccessfulTransaction() {
CountDownLatch incrementLatch = new CountDownLatch(numCounters);
Increment message = new Increment(counters.subList(1, counters.size()), incrementLatch);
counters.get(0).tell(new Coordinated(message));
counters.get(0).tell(new Coordinated(message, timeout));
try {
incrementLatch.await(timeout, TimeUnit.SECONDS);
incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException exception) {
}
for (ActorRef counter : counters) {
Future<Object> future = counter.ask("GetCount", askTimeout);
assertEquals(1, ((Integer) Await.result(future, Duration.create(timeout, TimeUnit.SECONDS))).intValue());
Future<Object> future = counter.ask("GetCount", timeout);
int count = (Integer) Await.result(future, timeout.duration());
assertEquals(1, count);
}
}
@ -91,28 +94,24 @@ public class UntypedCoordinatedIncrementTest {
EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(ExpectedFailureException.class);
EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(CoordinatedTransactionException.class);
Seq<EventFilter> ignoreExceptions = seq(expectedFailureFilter, coordinatedFilter);
application.eventStream().publish(new TestEvent.Mute(ignoreExceptions));
system.eventStream().publish(new TestEvent.Mute(ignoreExceptions));
CountDownLatch incrementLatch = new CountDownLatch(numCounters);
List<ActorRef> actors = new ArrayList<ActorRef>(counters);
actors.add(failer);
Increment message = new Increment(actors.subList(1, actors.size()), incrementLatch);
actors.get(0).tell(new Coordinated(message));
actors.get(0).tell(new Coordinated(message, timeout));
try {
incrementLatch.await(timeout, TimeUnit.SECONDS);
incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException exception) {
}
for (ActorRef counter : counters) {
Future<Object>future = counter.ask("GetCount", askTimeout);
assertEquals(0,((Integer) Await.result(future, Duration.create(timeout, TimeUnit.SECONDS))).intValue());
Future<Object>future = counter.ask("GetCount", timeout);
int count = (Integer) Await.result(future, timeout.duration());
assertEquals(0, count);
}
}
public <A> Seq<A> seq(A... args) {
return JavaConverters.collectionAsScalaIterableConverter(Arrays.asList(args)).asScala().toSeq();
}
@After
public void stop() {
application.shutdown();
}
}

View file

@ -1,13 +1,18 @@
package akka.transactor.test;
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor;
import akka.actor.ActorRef;
import akka.transactor.UntypedTransactor;
import akka.transactor.SendTo;
import akka.actor.ActorRef;
import akka.stm.*;
import akka.util.FiniteDuration;
import org.multiverse.api.StmUtils;
import scala.Function1;
import scala.concurrent.stm.*;
import scala.reflect.*;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@ -15,21 +20,16 @@ import java.util.concurrent.TimeUnit;
public class UntypedCounter extends UntypedTransactor {
private String name;
private Ref<Integer> count = new Ref<Integer>(0);
private Manifest<Integer> manifest = Manifest$.MODULE$.classType(Integer.class);
private Ref<Integer> count = Ref$.MODULE$.apply(0, manifest);
public UntypedCounter(String name) {
this.name = name;
}
@Override public TransactionFactory transactionFactory() {
return new TransactionFactoryBuilder()
.setTimeout(new FiniteDuration(3, TimeUnit.SECONDS))
.build();
}
private void increment() {
//System.out.println(name + ": incrementing");
count.set(count.get() + 1);
private void increment(InTxn txn) {
Integer newValue = count.get(txn) + 1;
count.set(newValue, txn);
}
@Override public Set<SendTo> coordinate(Object message) {
@ -47,30 +47,22 @@ public class UntypedCounter extends UntypedTransactor {
}
}
@Override public void before(Object message) {
//System.out.println(name + ": before transaction");
}
public void atomically(Object message) {
public void atomically(InTxn txn, Object message) {
if (message instanceof Increment) {
increment();
increment(txn);
final Increment increment = (Increment) message;
StmUtils.scheduleDeferredTask(new Runnable() {
public void run() { increment.getLatch().countDown(); }
});
StmUtils.scheduleCompensatingTask(new Runnable() {
public void run() { increment.getLatch().countDown(); }
});
final Function1<Txn.Status, BoxedUnit> countDown = new AbstractFunction1<Txn.Status, BoxedUnit>() {
public BoxedUnit apply(Txn.Status status) {
increment.getLatch().countDown(); return null;
}
};
Txn.afterCompletion(countDown, txn);
}
}
@Override public void after(Object message) {
//System.out.println(name + ": after transaction");
}
@Override public boolean normally(Object message) {
if ("GetCount".equals(message)) {
getSender().tell(count.get());
getSender().tell(count.single().get());
return true;
} else return false;
}

View file

@ -0,0 +1,14 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor;
import akka.transactor.UntypedTransactor;
import scala.concurrent.stm.InTxn;
public class UntypedFailer extends UntypedTransactor {
public void atomically(InTxn txn, Object message) throws Exception {
throw new ExpectedFailureException();
}
}

View file

@ -1,9 +1,11 @@
package akka.transactor.test;
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor;
import static org.junit.Assert.*;
import akka.dispatch.Await;
import akka.util.Duration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -14,11 +16,15 @@ import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.dispatch.Await;
import akka.dispatch.Future;
import akka.testkit.AkkaSpec;
import akka.testkit.EventFilter;
import akka.testkit.ErrorFilter;
import akka.testkit.TestEvent;
import akka.transactor.CoordinatedTransactionException;
import akka.util.Duration;
import akka.util.Timeout;
import java.util.Arrays;
import java.util.ArrayList;
@ -28,7 +34,6 @@ import java.util.concurrent.TimeUnit;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import akka.testkit.AkkaSpec;
public class UntypedTransactorTest {
@ -49,8 +54,9 @@ public class UntypedTransactorTest {
ActorRef failer;
int numCounters = 3;
int timeout = 5;
int askTimeout = 5000;
int timeoutSeconds = 5;
Timeout timeout = new Timeout(Duration.create(timeoutSeconds, TimeUnit.SECONDS));
@Before
public void initialise() {
@ -73,12 +79,12 @@ public class UntypedTransactorTest {
Increment message = new Increment(counters.subList(1, counters.size()), incrementLatch);
counters.get(0).tell(message);
try {
incrementLatch.await(timeout, TimeUnit.SECONDS);
incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException exception) {
}
for (ActorRef counter : counters) {
Future<Object> future = counter.ask("GetCount", askTimeout);
int count = (Integer) Await.result(future, Duration.create(askTimeout, TimeUnit.MILLISECONDS));
Future<Object> future = counter.ask("GetCount", timeout);
int count = (Integer) Await.result(future, timeout.duration());
assertEquals(1, count);
}
}
@ -95,12 +101,12 @@ public class UntypedTransactorTest {
Increment message = new Increment(actors.subList(1, actors.size()), incrementLatch);
actors.get(0).tell(message);
try {
incrementLatch.await(timeout, TimeUnit.SECONDS);
incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException exception) {
}
for (ActorRef counter : counters) {
Future<Object> future = counter.ask("GetCount", askTimeout);
int count = (Integer) Await.result(future, Duration.create(askTimeout, TimeUnit.MILLISECONDS));
Future<Object> future = counter.ask("GetCount", timeout);
int count = (Integer) Await.result(future, timeout.duration());
assertEquals(0, count);
}
}

View file

@ -1,14 +1,17 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import org.scalatest.BeforeAndAfterAll
import akka.actor.ActorSystem
import akka.actor._
import akka.stm.{ Ref, TransactionFactory }
import akka.dispatch.Await
import akka.util.duration._
import akka.util.Timeout
import akka.testkit._
import akka.dispatch.Await
import scala.concurrent.stm._
object CoordinatedIncrement {
case class Increment(friends: Seq[ActorRef])
@ -17,34 +20,27 @@ object CoordinatedIncrement {
class Counter(name: String) extends Actor {
val count = Ref(0)
implicit val txFactory = TransactionFactory(timeout = 3 seconds)
def increment = {
count alter (_ + 1)
}
def receive = {
case coordinated @ Coordinated(Increment(friends)) {
if (friends.nonEmpty) {
friends.head ! coordinated(Increment(friends.tail))
}
coordinated atomic {
increment
coordinated.atomic { implicit t
count transform (_ + 1)
}
}
case GetCount sender ! count.get
case GetCount sender ! count.single.get
}
}
class ExpectedFailureException extends RuntimeException("Expected failure")
class Failer extends Actor {
val txFactory = TransactionFactory(timeout = 3 seconds)
def receive = {
case coordinated @ Coordinated(Increment(friends)) {
coordinated.atomic(txFactory) {
coordinated.atomic { t
throw new ExpectedFailureException
}
}

View file

@ -1,21 +1,23 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll
import akka.actor.ActorSystem
import akka.actor._
import akka.util.Timeout
import akka.stm._
import akka.dispatch.Await
import akka.util.duration._
import akka.util.Timeout
import akka.testkit._
import akka.testkit.TestEvent.Mute
import scala.concurrent.stm._
import scala.util.Random.{ nextInt random }
import java.util.concurrent.CountDownLatch
import akka.testkit.TestEvent.Mute
import akka.dispatch.Await
object FickleFriends {
case class FriendlyIncrement(friends: Seq[ActorRef], latch: CountDownLatch)
case class FriendlyIncrement(friends: Seq[ActorRef], timeout: Timeout, latch: CountDownLatch)
case class Increment(friends: Seq[ActorRef])
case object GetCount
@ -25,24 +27,22 @@ object FickleFriends {
class Coordinator(name: String) extends Actor {
val count = Ref(0)
implicit val txFactory = TransactionFactory(timeout = 3 seconds)
def increment = {
count alter (_ + 1)
def increment(implicit txn: InTxn) = {
count transform (_ + 1)
}
def receive = {
case FriendlyIncrement(friends, latch) {
case FriendlyIncrement(friends, timeout, latch) {
var success = false
while (!success) {
try {
val coordinated = Coordinated()
val coordinated = Coordinated()(timeout)
if (friends.nonEmpty) {
friends.head ! coordinated(Increment(friends.tail))
}
coordinated atomic {
coordinated.atomic { implicit t
increment
deferred {
Txn.afterCommit { status
success = true
latch.countDown()
}
@ -53,7 +53,7 @@ object FickleFriends {
}
}
case GetCount sender ! count.get
case GetCount sender ! count.single.get
}
}
@ -65,14 +65,18 @@ object FickleFriends {
class FickleCounter(name: String) extends Actor {
val count = Ref(0)
implicit val txFactory = TransactionFactory(timeout = 3 seconds)
val maxFailures = 3
var failures = 0
def increment = {
count alter (_ + 1)
def increment(implicit txn: InTxn) = {
count transform (_ + 1)
}
def failIf(x: Int, y: Int) = {
if (x == y) throw new ExpectedFailureException("Random fail at position " + x)
if (x == y && failures < maxFailures) {
failures += 1
throw new ExpectedFailureException("Random fail at position " + x)
}
}
def receive = {
@ -83,14 +87,14 @@ object FickleFriends {
friends.head ! coordinated(Increment(friends.tail))
}
failIf(failAt, 1)
coordinated atomic {
coordinated.atomic { implicit t
failIf(failAt, 2)
increment
failIf(failAt, 3)
}
}
case GetCount sender ! count.get
case GetCount sender ! count.single.get
}
}
}
@ -119,7 +123,7 @@ class FickleFriendsSpec extends AkkaSpec with BeforeAndAfterAll {
system.eventStream.publish(Mute(ignoreExceptions))
val (counters, coordinator) = actorOfs
val latch = new CountDownLatch(1)
coordinator ! FriendlyIncrement(counters, latch)
coordinator ! FriendlyIncrement(counters, timeout, latch)
latch.await // this could take a while
Await.result(coordinator ? GetCount, timeout.duration) must be === 1
for (counter counters) {

View file

@ -1,7 +1,11 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import org.scalatest.junit.JUnitWrapperSuite
class JavaUntypedCoordinatedSpec extends JUnitWrapperSuite(
"akka.transactor.test.UntypedCoordinatedIncrementTest",
"akka.transactor.UntypedCoordinatedIncrementTest",
Thread.currentThread.getContextClassLoader)

View file

@ -1,7 +1,11 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import org.scalatest.junit.JUnitWrapperSuite
class JavaUntypedTransactorSpec extends JUnitWrapperSuite(
"akka.transactor.test.UntypedTransactorTest",
"akka.transactor.UntypedTransactorTest",
Thread.currentThread.getContextClassLoader)

View file

@ -1,15 +1,15 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.ActorSystem
import akka.actor._
import akka.util.Timeout
import akka.stm._
import akka.util.duration._
import akka.testkit._
import akka.dispatch.Await
import akka.util.duration._
import akka.util.Timeout
import akka.testkit._
import scala.concurrent.stm._
object TransactorIncrement {
case class Increment(friends: Seq[ActorRef], latch: TestLatch)
@ -18,10 +18,8 @@ object TransactorIncrement {
class Counter(name: String) extends Transactor {
val count = Ref(0)
override def transactionFactory = TransactionFactory(timeout = 3 seconds)
def increment = {
count alter (_ + 1)
def increment(implicit txn: InTxn) = {
count transform (_ + 1)
}
override def coordinate = {
@ -35,11 +33,10 @@ object TransactorIncrement {
case i: Increment
}
def atomically = {
def atomically = implicit txn {
case Increment(friends, latch) {
increment
deferred { latch.countDown() }
compensating { latch.countDown() }
Txn.afterCompletion { status latch.countDown() }
}
}
@ -48,14 +45,14 @@ object TransactorIncrement {
}
override def normally = {
case GetCount sender ! count.get
case GetCount sender ! count.single.get
}
}
class ExpectedFailureException extends RuntimeException("Expected failure")
class Failer extends Transactor {
def atomically = {
def atomically = implicit txn {
case _ throw new ExpectedFailureException
}
}
@ -65,10 +62,10 @@ object SimpleTransactor {
case class Set(ref: Ref[Int], value: Int, latch: TestLatch)
class Setter extends Transactor {
def atomically = {
def atomically = implicit txn {
case Set(ref, value, latch) {
ref.set(value)
deferred { latch.countDown() }
ref() = value
Txn.afterCompletion { status latch.countDown() }
}
}
}
@ -129,7 +126,7 @@ class TransactorSpec extends AkkaSpec {
val latch = TestLatch(1)
transactor ! Set(ref, 5, latch)
latch.await
val value = atomic { ref.get }
val value = ref.single.get
value must be === 5
system.stop(transactor)
}

View file

@ -30,7 +30,7 @@ object AkkaBuild extends Build {
Unidoc.unidocExclude := Seq(samples.id, tutorials.id),
Dist.distExclude := Seq(actorTests.id, akkaSbtPlugin.id, docs.id)
),
aggregate = Seq(actor, testkit, actorTests, remote, slf4j, agent, mailboxes, kernel, akkaSbtPlugin, samples, tutorials, docs)
aggregate = Seq(actor, testkit, actorTests, remote, slf4j, agent, transactor, mailboxes, kernel, akkaSbtPlugin, samples, tutorials, docs)
)
lazy val actor = Project(
@ -103,6 +103,15 @@ object AkkaBuild extends Build {
)
)
lazy val transactor = Project(
id = "akka-transactor",
base = file("akka-transactor"),
dependencies = Seq(actor, testkit % "test->test"),
settings = defaultSettings ++ Seq(
libraryDependencies ++= Dependencies.transactor
)
)
// lazy val amqp = Project(
// id = "akka-amqp",
// base = file("akka-amqp"),
@ -265,7 +274,7 @@ object AkkaBuild extends Build {
lazy val docs = Project(
id = "akka-docs",
base = file("akka-docs"),
dependencies = Seq(actor, testkit % "test->test", remote, slf4j, agent, fileMailbox, mongoMailbox, redisMailbox, beanstalkMailbox, zookeeperMailbox),
dependencies = Seq(actor, testkit % "test->test", remote, slf4j, agent, transactor, fileMailbox, mongoMailbox, redisMailbox, beanstalkMailbox, zookeeperMailbox),
settings = defaultSettings ++ Seq(
unmanagedSourceDirectories in Test <<= baseDirectory { _ ** "code" get },
libraryDependencies ++= Dependencies.docs,
@ -380,6 +389,8 @@ object Dependencies {
val agent = Seq(scalaStm, Test.scalatest, Test.junit)
val transactor = Seq(scalaStm, Test.scalatest, Test.junit)
val amqp = Seq(rabbit, commonsIo, protobuf)
val mailboxes = Seq(Test.scalatest, Test.junit)