!tra #3920 Remove deprecated akka-transactor

This commit is contained in:
Patrik Nordwall 2014-03-12 11:10:55 +01:00
parent 9f906b2de9
commit 9cc586b674
19 changed files with 2 additions and 1300 deletions

View file

@ -25,4 +25,5 @@ Removed Deprecated Features
The following, previously deprecated, features have been removed:
* akka-dataflow
* akka-transactor

View file

@ -1,14 +0,0 @@
#########################################
# Akka Transactor Reference Config File #
#########################################
# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.
akka {
# akka.transactor is deprecated in 2.3
transactor {
# The timeout used for coordinated transactions across actors
coordinated-timeout = 5s
}
}

View file

@ -1,178 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import akka.AkkaException
import akka.util.Timeout
import scala.concurrent.stm.{ CommitBarrier, InTxn }
import java.util.concurrent.Callable
/**
* Akka-specific exception for coordinated transactions.
*/
@deprecated("akka.transactor will be removed", "2.3")
class CoordinatedTransactionException(message: String, cause: Throwable) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null)
}
/**
* Coordinated transactions across actors.
*/
@deprecated("akka.transactor will be removed", "2.3")
object Coordinated {
/**
* Creates a new Coordinated with the given message and Timeout
* @param message - the message which will be coordinated
* @param timeout - the timeout for the coordination
* @return a new Coordinated
*/
def apply(message: Any = null)(implicit timeout: Timeout): Coordinated =
new Coordinated(message, CommitBarrier(timeout.duration.toMillis).addMember())
/**
* @param c - a Coordinated to be unapplied
* @return the message associated with the given Coordinated
*/
def unapply(c: Coordinated): Option[Any] = Some(c.message)
}
/**
* `Coordinated` is a message wrapper that adds a `CommitBarrier` for explicitly
* coordinating transactions across actors or threads.
*
* Creating a `Coordinated` will create a commit barrier with initially one member.
* For each member in the coordination set a transaction is expected to be created using
* the coordinated atomic method, or the coordination cancelled using the cancel method.
*
* The number of included members must match the number of transactions, otherwise a
* successful transaction cannot be coordinated.
* <br/><br/>
*
* To start a new coordinated transaction set that you will also participate in just create
* a `Coordinated` object:
*
* {{{
* val coordinated = Coordinated()
* }}}
* <br/>
*
* To start a coordinated transaction that you won't participate in yourself you can create a
* `Coordinated` object with a message and send it directly to an actor. The recipient of the message
* will be the first member of the coordination set:
*
* {{{
* actor ! Coordinated(Message)
* }}}
* <br/>
*
* To receive a coordinated message in an actor simply match it in a case statement:
*
* {{{
* def receive = {
* case coordinated @ Coordinated(Message) => ...
* }
* }}}
* <br/>
*
* To include another actor in the same coordinated transaction set that you've created or
* received, use the apply method on that object. This will increment the number of parties
* involved by one and create a new `Coordinated` object to be sent.
*
* {{{
* actor ! coordinated(Message)
* }}}
* <br/>
*
* To enter the coordinated transaction use the atomic method of the coordinated object:
*
* {{{
* coordinated.atomic { implicit txn =>
* // do something in transaction ...
* }
* }}}
*
* The coordinated transaction will wait for the other transactions before committing.
* If any of the coordinated transactions fail then they all fail.
*
* @see [[akka.transactor.Transactor]] for an actor that implements coordinated transactions
*/
@deprecated("akka.transactor will be removed", "2.3")
class Coordinated(val message: Any, member: CommitBarrier.Member) {
// Java API constructors
def this(message: Any, timeout: Timeout) = this(message, CommitBarrier(timeout.duration.toMillis).addMember())
def this(timeout: Timeout) = this(null, timeout)
/**
* Create a new Coordinated object and increment the number of members by one.
* Use this method to ''pass on'' the coordination.
*/
def apply(msg: Any): Coordinated = new Coordinated(msg, member.commitBarrier.addMember())
/**
* Create a new Coordinated object but *do not* increment the number of members by one.
* Only use this method if you know this is what you need.
*/
def noIncrement(msg: Any): Coordinated = new Coordinated(msg, member)
/**
* Java API: get the message for this Coordinated.
*/
def getMessage(): Any = message
/**
* Java API: create a new Coordinated object and increment the number of members by one.
* Use this method to ''pass on'' the coordination.
*/
def coordinate(msg: Any): Coordinated = apply(msg)
/**
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified when creating the Coordinated.
*
* @throws CoordinatedTransactionException if the coordinated transaction fails.
*/
def atomic[A](body: InTxn A): A = {
member.atomic(body) match {
case Right(result) result
case Left(CommitBarrier.MemberUncaughtExceptionCause(x))
throw new CoordinatedTransactionException("Exception in coordinated atomic", x)
case Left(cause)
throw new CoordinatedTransactionException("Failed due to " + cause)
}
}
/**
* Java API: coordinated atomic method that accepts a `java.lang.Runnable`.
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified when creating the Coordinated.
*
* @throws CoordinatedTransactionException if the coordinated transaction fails.
*/
def atomic(runnable: Runnable): Unit = atomic { _ runnable.run }
/**
* Java API: coordinated atomic method that accepts a `java.util.concurrent.Callable`.
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified when creating the Coordinated.
*
* @throws CoordinatedTransactionException if the coordinated transaction fails.
*/
def atomic[A](callable: Callable[A]): A = atomic { _ callable.call }
/**
* An empty coordinated atomic block. Can be used to complete the number of members involved
* and wait for all transactions to complete.
*/
def await(): Unit = atomic(txn ())
/**
* Cancel this Coordinated transaction.
*/
def cancel(info: Any): Unit = member.cancel(CommitBarrier.UserCancel(info))
}

View file

@ -1,192 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import language.postfixOps
import akka.actor.{ Actor, ActorRef }
import scala.concurrent.stm.InTxn
/**
* Used for specifying actor refs and messages to send to during coordination.
*/
@deprecated("akka.transactor will be removed", "2.3")
final case class SendTo(actor: ActorRef, message: Option[Any] = None)
/**
* An actor with built-in support for coordinated transactions.
*
* Transactors implement the general pattern for using [[akka.transactor.Coordinated]] where
* coordination messages are sent to other transactors then the coordinated transaction is
* entered. Transactors can also accept explicitly sent `Coordinated` messages.
* <br/><br/>
*
* Simple transactors will just implement the `atomically` method which is similar to
* the actor `receive` method but runs within a coordinated transaction.
*
* Example of a simple transactor that will join a coordinated transaction:
*
* {{{
* class Counter extends Transactor {
* val count = Ref(0)
*
* def atomically = implicit txn => {
* case Increment => count transform (_ + 1)
* }
* }
* }}}
* <br/>
*
* To coordinate with other transactors override the `coordinate` method.
* The `coordinate` method maps a message to a set of [[akka.transactor.SendTo]]
* objects, pairs of `ActorRef` and a message.
* You can use the `include` and `sendTo` methods to easily coordinate with other transactors.
* The `include` method will send on the same message that was received to other transactors.
* The `sendTo` method allows you to specify both the actor to send to, and message to send.
*
* Example of coordinating an increment:
*
* {{{
* class FriendlyCounter(friend: ActorRef) extends Transactor {
* val count = Ref(0)
*
* override def coordinate = {
* case Increment => include(friend)
* }
*
* def atomically = implicit txn => {
* case Increment => count transform (_ + 1)
* }
* }
* }}}
* <br/>
*
* Using `include` to include more than one transactor:
*
* {{{
* override def coordinate = {
* case Message => include(actor1, actor2, actor3)
* }
* }}}
* <br/>
*
* Using `sendTo` to coordinate transactions but send on a different message
* than the one that was received:
*
* {{{
* override def coordinate = {
* case Message => sendTo(someActor -> SomeOtherMessage)
* case SomeMessage => sendTo(actor1 -> Message1, actor2 -> Message2)
* }
* }}}
* <br/>
*
* To execute directly before or after the coordinated transaction, override
* the `before` and `after` methods. These methods also expect partial functions
* like the receive method. They do not execute within the transaction.
*
* To completely bypass coordinated transactions override the `normally` method.
* Any message matched by `normally` will not be matched by the other methods,
* and will not be involved in coordinated transactions. In this method you
* can implement normal actor behavior, or use the normal STM atomic for
* local transactions.
*
* @see [[akka.transactor.Coordinated]]
*/
@deprecated("akka.transactor will be removed", "2.3")
trait Transactor extends Actor {
private val settings = TransactorExtension(context.system)
/**
* Implement a general pattern for using coordinated transactions.
*/
final def receive = {
case coordinated @ Coordinated(message) {
val others = (coordinate orElse alone)(message)
for (sendTo others) {
sendTo.actor ! coordinated(sendTo.message.getOrElse(message))
}
(before orElse doNothing)(message)
coordinated.atomic { txn (atomically(txn) orElse doNothing)(message) }
(after orElse doNothing)(message)
}
case message {
if (normally.isDefinedAt(message)) normally(message)
else receive(Coordinated(message)(settings.CoordinatedTimeout))
}
}
/**
* Override this method to coordinate with other transactors.
* The other transactors are added to the coordinated transaction barrier
* and sent a Coordinated message. The message to send can be specified
* or otherwise the same message as received is sent. Use the 'include' and
* 'sendTo' methods to easily create the set of transactors to be involved.
*/
def coordinate: PartialFunction[Any, Set[SendTo]] = alone
/**
* Default coordination - no other transactors.
*/
def alone: PartialFunction[Any, Set[SendTo]] = { case _ nobody }
/**
* Empty set of transactors to send to.
*/
def nobody: Set[SendTo] = Set.empty
/**
* Include other actors in this coordinated transaction and send
* them the same message as received. Use as the result in 'coordinated'.
*/
def include(actors: ActorRef*): Set[SendTo] = actors map (SendTo(_)) toSet
/**
* Include other actors in this coordinated transaction and specify the message
* to send by providing ActorRef -> Message pairs. Use as the result in 'coordinated'.
*/
def sendTo(pairs: (ActorRef, Any)*): Set[SendTo] = pairs map (p SendTo(p._1, Some(p._2))) toSet
/**
* A Receive block that runs before the coordinated transaction is entered.
*/
def before: Receive = doNothing
/**
* The Receive block to run inside the coordinated transaction.
* This is a function from InTxn to Receive block.
*
* For example:
* {{{
* def atomically = implicit txn => {
* case Increment => count transform (_ + 1)
* }
* }}}
*/
def atomically: InTxn Receive
/**
* A Receive block that runs after the coordinated transaction.
*/
def after: Receive = doNothing
/**
* Bypass transactionality and behave like a normal actor.
*/
def normally: Receive = doNothing
/**
* Default catch-all for the different Receive methods.
*/
def doNothing: Receive = EmptyReceive
}
/**
* INTERNAL API
*/
private[akka] object EmptyReceive extends PartialFunction[Any, Unit] {
def apply(any: Any): Unit = ()
def isDefinedAt(any: Any): Boolean = false
}

View file

@ -1,27 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import akka.actor.{ ActorSystem, ExtensionId, ExtensionIdProvider, ExtendedActorSystem }
import akka.actor.Extension
import com.typesafe.config.Config
import akka.util.Timeout
import scala.concurrent.duration.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
/**
* TransactorExtension is an Akka Extension to hold settings for transactors.
*/
@deprecated("akka.transactor will be removed", "2.3")
object TransactorExtension extends ExtensionId[TransactorSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): TransactorSettings = super.get(system)
override def lookup: TransactorExtension.type = TransactorExtension
override def createExtension(system: ExtendedActorSystem): TransactorSettings = new TransactorSettings(system.settings.config)
}
@deprecated("akka.transactor will be removed", "2.3")
class TransactorSettings(val config: Config) extends Extension {
import config._
val CoordinatedTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.transactor.coordinated-timeout"), MILLISECONDS))
}

View file

@ -1,103 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import akka.actor.{ UntypedActor, ActorRef }
import java.util.{ Set JSet }
import java.util.Collections.{ emptySet, singleton singletonSet }
/**
* An UntypedActor version of transactor for using from Java.
*/
@deprecated("akka.transactor will be removed", "2.3")
abstract class UntypedTransactor extends UntypedActor {
import scala.collection.JavaConverters.asScalaSetConverter
private val settings = TransactorExtension(context.system)
/**
* Implement a general pattern for using coordinated transactions.
*/
@throws(classOf[Exception])
final def onReceive(message: Any) {
message match {
case coordinated @ Coordinated(message) {
for (sendTo coordinate(message).asScala) {
sendTo.actor ! coordinated(sendTo.message.getOrElse(message))
}
before(message)
coordinated.atomic { txn atomically(message) }
after(message)
}
case message {
val normal = normally(message)
if (!normal) onReceive(Coordinated(message)(settings.CoordinatedTimeout))
}
}
}
/**
* Override this method to coordinate with other transactors.
* The other transactors are added to the coordinated transaction barrier
* and sent a Coordinated message. The message to send can be specified
* or otherwise the same message as received is sent. Use the 'include' and
* 'sendTo' methods to easily create the set of transactors to be involved.
*/
@throws(classOf[Exception])
def coordinate(message: Any): JSet[SendTo] = nobody
/**
* Empty set of transactors to send to.
*/
def nobody: JSet[SendTo] = emptySet()
/**
* For including one other actor in this coordinated transaction and sending
* them the same message as received. Use as the result in `coordinated`.
*/
def include(actor: ActorRef): JSet[SendTo] = singletonSet(SendTo(actor))
/**
* For including one other actor in this coordinated transaction and specifying the
* message to send. Use as the result in `coordinated`.
*/
def include(actor: ActorRef, message: Any): JSet[SendTo] = singletonSet(SendTo(actor, Some(message)))
/**
* For including another actor in this coordinated transaction and sending
* them the same message as received. Use to create the result in `coordinated`.
*/
def sendTo(actor: ActorRef): SendTo = SendTo(actor)
/**
* For including another actor in this coordinated transaction and specifying the
* message to send. Use to create the result in `coordinated`.
*/
def sendTo(actor: ActorRef, message: Any): SendTo = SendTo(actor, Some(message))
/**
* A Receive block that runs before the coordinated transaction is entered.
*/
@throws(classOf[Exception])
def before(message: Any) {}
/**
* The Receive block to run inside the coordinated transaction.
*/
@throws(classOf[Exception])
def atomically(message: Any)
/**
* A Receive block that runs after the coordinated transaction.
*/
@throws(classOf[Exception])
def after(message: Any) {}
/**
* Bypass transactionality and behave like a normal actor.
*/
@throws(classOf[Exception])
def normally(message: Any): Boolean = false
}

View file

@ -1,11 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor;
public class ExpectedFailureException extends RuntimeException {
public ExpectedFailureException() {
super("Expected failure");
}
}

View file

@ -1,27 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor;
import akka.actor.ActorRef;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class Increment {
private List<ActorRef> friends;
private CountDownLatch latch;
public Increment(List<ActorRef> friends, CountDownLatch latch) {
this.friends = friends;
this.latch = latch;
}
public List<ActorRef> getFriends() {
return friends;
}
public CountDownLatch getLatch() {
return latch;
}
}

View file

@ -1,51 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.STM;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
public class UntypedCoordinatedCounter extends UntypedActor {
private String name;
private Ref.View<Integer> count = STM.newRef(0);
public UntypedCoordinatedCounter(String name) {
this.name = name;
}
public void onReceive(Object incoming) throws Exception {
if (incoming instanceof Coordinated) {
Coordinated coordinated = (Coordinated) incoming;
Object message = coordinated.getMessage();
if (message instanceof Increment) {
Increment increment = (Increment) message;
List<ActorRef> friends = increment.getFriends();
final CountDownLatch latch = increment.getLatch();
final Runnable countDown = new Runnable() {
public void run() {
latch.countDown();
}
};
if (!friends.isEmpty()) {
Increment coordMessage = new Increment(friends.subList(1, friends.size()), latch);
friends.get(0).tell(coordinated.coordinate(coordMessage), getSelf());
}
coordinated.atomic(new Runnable() {
public void run() {
STM.increment(count, 1);
STM.afterCompletion(countDown);
}
});
}
} else if ("GetCount".equals(incoming)) {
getSender().tell(count.get(), getSelf());
}
}
}

View file

@ -1,98 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor;
import static org.junit.Assert.*;
import akka.testkit.*;
import org.junit.*;
import akka.actor.ActorSystem;
import akka.actor.ActorRef;
import akka.actor.Props;
import scala.concurrent.Await;
import scala.concurrent.Future;
import static akka.pattern.Patterns.ask;
import akka.util.Timeout;
import static akka.japi.Util.immutableSeq;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import scala.collection.JavaConverters;
import scala.collection.immutable.Seq;
public class UntypedCoordinatedIncrementTest {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("UntypedCoordinatedIncrementTest", AkkaSpec.testConf());
private final ActorSystem system = actorSystemResource.getSystem();
List<ActorRef> counters;
ActorRef failer;
int numCounters = 3;
int timeoutSeconds = 5;
Timeout timeout = new Timeout(timeoutSeconds, TimeUnit.SECONDS);
@Before
public void initialize() {
counters = new ArrayList<ActorRef>();
for (int i = 1; i <= numCounters; i++) {
final String name = "counter" + i;
ActorRef counter = system.actorOf(Props.create(UntypedCoordinatedCounter.class, name));
counters.add(counter);
}
failer = system.actorOf(Props.create(UntypedFailer.class));
}
@Test
public void incrementAllCountersWithSuccessfulTransaction() throws Exception {
CountDownLatch incrementLatch = new CountDownLatch(numCounters);
Increment message = new Increment(counters.subList(1, counters.size()), incrementLatch);
counters.get(0).tell(new Coordinated(message, timeout), null);
try {
incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException exception) {
}
for (ActorRef counter : counters) {
Future<Object> future = ask(counter, "GetCount", timeout);
int count = (Integer) Await.result(future, timeout.duration());
assertEquals(1, count);
}
}
@Test
public void incrementNoCountersWithFailingTransaction() throws Exception {
EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(ExpectedFailureException.class);
EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(CoordinatedTransactionException.class);
Seq<EventFilter> ignoreExceptions = seq(expectedFailureFilter, coordinatedFilter);
system.eventStream().publish(new TestEvent.Mute(ignoreExceptions));
CountDownLatch incrementLatch = new CountDownLatch(numCounters);
List<ActorRef> actors = new ArrayList<ActorRef>(counters);
actors.add(failer);
Increment message = new Increment(actors.subList(1, actors.size()), incrementLatch);
actors.get(0).tell(new Coordinated(message, timeout), null);
try {
incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException exception) {
}
for (ActorRef counter : counters) {
Future<Object>future = ask(counter, "GetCount", timeout);
int count = (Integer) Await.result(future, timeout.duration());
assertEquals(0, count);
}
}
public <A> Seq<A> seq(A... args) {
return immutableSeq(args);
}
}

View file

@ -1,56 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor;
import java.util.List;
import java.util.Set;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.STM;
import akka.actor.ActorRef;
public class UntypedCounter extends UntypedTransactor {
private String name;
private Ref.View<Integer> count = STM.newRef(0);
public UntypedCounter(String name) {
this.name = name;
}
@Override public Set<SendTo> coordinate(Object message) {
if (message instanceof Increment) {
Increment increment = (Increment) message;
List<ActorRef> friends = increment.getFriends();
if (!friends.isEmpty()) {
Increment coordMessage = new Increment(friends.subList(1, friends.size()), increment.getLatch());
return include(friends.get(0), coordMessage);
} else {
return nobody();
}
} else {
return nobody();
}
}
public void atomically(Object message) {
if (message instanceof Increment) {
STM.increment(count, 1);
final Increment increment = (Increment) message;
Runnable countDown = new Runnable() {
public void run() {
increment.getLatch().countDown();
}
};
STM.afterCompletion(countDown);
}
}
@Override public boolean normally(Object message) {
if ("GetCount".equals(message)) {
getSender().tell(count.get(), getSelf());
return true;
} else return false;
}
}

View file

@ -1,11 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor;
public class UntypedFailer extends UntypedTransactor {
public void atomically(Object message) throws Exception {
throw new ExpectedFailureException();
}
}

View file

@ -1,103 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor;
import static org.junit.Assert.*;
import akka.testkit.*;
import org.junit.*;
import akka.actor.ActorSystem;
import akka.actor.ActorRef;
import akka.actor.Props;
import scala.concurrent.Await;
import scala.concurrent.Future;
import static akka.pattern.Patterns.ask;
import akka.util.Timeout;
import static akka.japi.Util.immutableSeq;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import scala.collection.JavaConverters;
import scala.collection.immutable.Seq;
public class UntypedTransactorTest {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("UntypedTransactorTest", AkkaSpec.testConf());
private final ActorSystem system = actorSystemResource.getSystem();
List<ActorRef> counters;
ActorRef failer;
int numCounters = 3;
int timeoutSeconds = 5;
Timeout timeout = new Timeout(timeoutSeconds, TimeUnit.SECONDS);
@Before
public void initialize() {
counters = new ArrayList<ActorRef>();
for (int i = 1; i <= numCounters; i++) {
final String name = "counter" + i;
ActorRef counter = system.actorOf(Props.create(UntypedCounter.class, name));
counters.add(counter);
}
failer = system.actorOf(Props.create(UntypedFailer.class));
}
@Test
public void incrementAllCountersWithSuccessfulTransaction() throws Exception {
CountDownLatch incrementLatch = new CountDownLatch(numCounters);
Increment message = new Increment(counters.subList(1, counters.size()),
incrementLatch);
counters.get(0).tell(message, null);
try {
incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException exception) {
}
for (ActorRef counter : counters) {
Future<Object> future = ask(counter, "GetCount", timeout);
int count = (Integer) Await.result(future, timeout.duration());
assertEquals(1, count);
}
}
@Test
public void incrementNoCountersWithFailingTransaction() throws Exception {
EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(
ExpectedFailureException.class);
EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(
CoordinatedTransactionException.class);
Seq<EventFilter> ignoreExceptions = seq(expectedFailureFilter,
coordinatedFilter);
system.eventStream().publish(new TestEvent.Mute(ignoreExceptions));
CountDownLatch incrementLatch = new CountDownLatch(numCounters);
List<ActorRef> actors = new ArrayList<ActorRef>(counters);
actors.add(failer);
Increment message = new Increment(actors.subList(1, actors.size()),
incrementLatch);
actors.get(0).tell(message, null);
try {
incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException exception) {
}
for (ActorRef counter : counters) {
Future<Object> future = ask(counter, "GetCount", timeout);
int count = (Integer) Await.result(future, timeout.duration());
assertEquals(0, count);
}
}
public <A> Seq<A> seq(A... args) {
return immutableSeq(args);
}
}

View file

@ -1,113 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import org.scalatest.BeforeAndAfterAll
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.stm._
import scala.collection.immutable
import akka.actor._
import akka.util.Timeout
import akka.testkit._
import akka.pattern.{ AskTimeoutException, ask }
object CoordinatedIncrement {
val config = """
akka {
actor {
default-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 5
core-pool-size-max = 16
}
}
}
}
"""
final case class Increment(friends: immutable.Seq[ActorRef])
case object GetCount
class Counter(name: String) extends Actor {
val count = Ref(0)
def receive = {
case coordinated @ Coordinated(Increment(friends)) {
if (friends.nonEmpty) {
friends.head ! coordinated(Increment(friends.tail))
}
coordinated.atomic { implicit t
count transform (_ + 1)
}
}
case GetCount sender ! count.single.get
}
}
class ExpectedFailureException extends RuntimeException("Expected failure")
class Failer extends Actor {
def receive = {
case coordinated @ Coordinated(Increment(friends)) {
coordinated.atomic { t
throw new ExpectedFailureException
}
}
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class CoordinatedIncrementSpec extends AkkaSpec(CoordinatedIncrement.config) with BeforeAndAfterAll {
import CoordinatedIncrement._
val numCounters = 4
def actorOfs = {
def createCounter(i: Int) = system.actorOf(Props(classOf[Counter], "counter" + i))
val counters = (1 to numCounters) map createCounter
val failer = system.actorOf(Props[Failer])
(counters, failer)
}
"Coordinated increment" should {
implicit val timeout = Timeout(2.seconds.dilated)
"increment all counters by one with successful transactions" in {
val (counters, failer) = actorOfs
val coordinated = Coordinated()
counters(0) ! coordinated(Increment(counters.tail))
coordinated.await
for (counter counters) {
Await.result((counter ? GetCount).mapTo[Int], timeout.duration) should be(1)
}
counters foreach (system.stop(_))
system.stop(failer)
}
"increment no counters with a failing transaction" in {
val ignoreExceptions = Seq(
EventFilter[ExpectedFailureException](),
EventFilter[CoordinatedTransactionException](),
EventFilter[AskTimeoutException]())
filterEvents(ignoreExceptions) {
val (counters, failer) = actorOfs
val coordinated = Coordinated()
counters(0) ! Coordinated(Increment(counters.tail :+ failer))
coordinated.await
for (counter counters) {
Await.result(counter ? GetCount, timeout.duration) should be(0)
}
counters foreach (system.stop(_))
system.stop(failer)
}
}
}
}

View file

@ -1,141 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import language.postfixOps
import org.scalatest.BeforeAndAfterAll
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.stm._
import scala.collection.immutable
import scala.util.Random.{ nextInt random }
import scala.util.control.NonFatal
import akka.actor._
import akka.testkit._
import akka.testkit.TestEvent.Mute
import java.util.concurrent.CountDownLatch
import akka.pattern.{ AskTimeoutException, ask }
import akka.util.Timeout
object FickleFriends {
final case class FriendlyIncrement(friends: immutable.Seq[ActorRef], timeout: Timeout, latch: CountDownLatch)
final case class Increment(friends: immutable.Seq[ActorRef])
case object GetCount
/**
* Coordinator will keep trying to coordinate an increment until successful.
*/
class Coordinator(name: String) extends Actor {
val count = Ref(0)
def increment(implicit txn: InTxn) = {
count transform (_ + 1)
}
def receive = {
case FriendlyIncrement(friends, timeout, latch) {
var success = false
while (!success) {
try {
val coordinated = Coordinated()(timeout)
if (friends.nonEmpty) {
friends.head ! coordinated(Increment(friends.tail))
}
coordinated.atomic { implicit t
increment
Txn.afterCommit { status
success = true
latch.countDown()
}
}
} catch {
case NonFatal(_) () // swallow exceptions
}
}
}
case GetCount sender ! count.single.get
}
}
class ExpectedFailureException(message: String) extends RuntimeException(message)
/**
* FickleCounter randomly fails at different points with 50% chance of failing overall.
*/
class FickleCounter(name: String) extends Actor {
val count = Ref(0)
val maxFailures = 3
var failures = 0
def increment(implicit txn: InTxn) = {
count transform (_ + 1)
}
def failIf(x: Int, y: Int) = {
if (x == y && failures < maxFailures) {
failures += 1
throw new ExpectedFailureException("Random fail at position " + x)
}
}
def receive = {
case coordinated @ Coordinated(Increment(friends)) {
val failAt = random(8)
failIf(failAt, 0)
if (friends.nonEmpty) {
friends.head ! coordinated(Increment(friends.tail))
}
failIf(failAt, 1)
coordinated.atomic { implicit t
failIf(failAt, 2)
increment
failIf(failAt, 3)
}
}
case GetCount sender ! count.single.get
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FickleFriendsSpec extends AkkaSpec with BeforeAndAfterAll {
import FickleFriends._
implicit val timeout = Timeout(5.seconds.dilated)
val numCounters = 2
def actorOfs = {
def createCounter(i: Int) = system.actorOf(Props(classOf[FickleCounter], "counter" + i))
val counters = (1 to numCounters) map createCounter
val coordinator = system.actorOf(Props(classOf[Coordinator], "coordinator"))
(counters, coordinator)
}
"Coordinated fickle friends" should {
"eventually succeed to increment all counters by one" in {
val ignoreExceptions = immutable.Seq(
EventFilter[ExpectedFailureException](),
EventFilter[CoordinatedTransactionException](),
EventFilter[AskTimeoutException]())
system.eventStream.publish(Mute(ignoreExceptions))
val (counters, coordinator) = actorOfs
val latch = new CountDownLatch(1)
coordinator ! FriendlyIncrement(counters, timeout, latch)
latch.await // this could take a while
Await.result(coordinator ? GetCount, timeout.duration) should be(1)
for (counter counters) {
Await.result(counter ? GetCount, timeout.duration) should be(1)
}
counters foreach (system.stop(_))
system.stop(coordinator)
}
}
}

View file

@ -1,11 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import org.scalatest.junit.JUnitWrapperSuite
class JavaUntypedCoordinatedSpec extends JUnitWrapperSuite(
"akka.transactor.UntypedCoordinatedIncrementTest",
Thread.currentThread.getContextClassLoader)

View file

@ -1,11 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import org.scalatest.junit.JUnitWrapperSuite
class JavaUntypedTransactorSpec extends JUnitWrapperSuite(
"akka.transactor.UntypedTransactorTest",
Thread.currentThread.getContextClassLoader)

View file

@ -1,138 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import language.postfixOps
import akka.actor._
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.stm._
import akka.util.Timeout
import akka.testkit._
import akka.pattern.{ AskTimeoutException, ask }
object TransactorIncrement {
final case class Increment(friends: immutable.Seq[ActorRef], latch: TestLatch)
case object GetCount
class Counter(name: String) extends Transactor {
val count = Ref(0)
def increment(implicit txn: InTxn) = {
count transform (_ + 1)
}
override def coordinate = {
case Increment(friends, latch) {
if (friends.nonEmpty) sendTo(friends.head -> Increment(friends.tail, latch))
else nobody
}
}
override def before = {
case i: Increment
}
def atomically = implicit txn {
case Increment(friends, latch) {
increment
Txn.afterCompletion { status latch.countDown() }
}
}
override def after = {
case i: Increment
}
override def normally = {
case GetCount sender ! count.single.get
}
}
class ExpectedFailureException extends RuntimeException("Expected failure")
class Failer extends Transactor {
def atomically = implicit txn {
case _ throw new ExpectedFailureException
}
}
}
object SimpleTransactor {
final case class Set(ref: Ref[Int], value: Int, latch: TestLatch)
class Setter extends Transactor {
def atomically = implicit txn {
case Set(ref, value, latch) {
ref() = value
Txn.afterCompletion { status latch.countDown() }
}
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TransactorSpec extends AkkaSpec {
import TransactorIncrement._
import SimpleTransactor._
implicit val timeout = Timeout(5.seconds.dilated)
val numCounters = 3
def createTransactors = {
def createCounter(i: Int) = system.actorOf(Props(classOf[Counter], "counter" + i))
val counters = (1 to numCounters) map createCounter
val failer = system.actorOf(Props[Failer])
(counters, failer)
}
"Transactor increment" should {
"increment all counters by one with successful transactions" in {
val (counters, failer) = createTransactors
val incrementLatch = TestLatch(numCounters)
counters(0) ! Increment(counters.tail, incrementLatch)
Await.ready(incrementLatch, 5 seconds)
for (counter counters) {
Await.result(counter ? GetCount, timeout.duration) should be(1)
}
counters foreach (system.stop(_))
system.stop(failer)
}
"increment no counters with a failing transaction" in {
val ignoreExceptions = Seq(
EventFilter[ExpectedFailureException](),
EventFilter[CoordinatedTransactionException](),
EventFilter[AskTimeoutException]())
filterEvents(ignoreExceptions) {
val (counters, failer) = createTransactors
val failLatch = TestLatch(numCounters)
counters(0) ! Increment(counters.tail :+ failer, failLatch)
Await.ready(failLatch, 5 seconds)
for (counter counters) {
Await.result(counter ? GetCount, timeout.duration) should be(0)
}
counters foreach (system.stop(_))
system.stop(failer)
}
}
}
"Transactor" should {
"be usable without overriding normally" in {
val transactor = system.actorOf(Props[Setter])
val ref = Ref(0)
val latch = TestLatch(1)
transactor ! Set(ref, 5, latch)
Await.ready(latch, 5 seconds)
val value = ref.single.get
value should be(5)
system.stop(transactor)
}
}
}

View file

@ -75,7 +75,7 @@ object AkkaBuild extends Build {
// add reportBinaryIssues to validatePullRequest on minor version maintenance branch
validatePullRequest <<= (Unidoc.unidoc, SphinxSupport.generate in Sphinx in docs) map { (_, _) => }
),
aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, agent, transactor,
aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, agent,
persistence, mailboxes, zeroMQ, kernel, osgi, docs, contrib, samples, multiNodeTestkit)
)
@ -252,16 +252,6 @@ object AkkaBuild extends Build {
)
)
lazy val transactor = Project(
id = "akka-transactor",
base = file("akka-transactor"),
dependencies = Seq(actor, testkit % "test->test"),
settings = defaultSettings ++ formatSettings ++ scaladocSettings ++ javadocSettings ++ OSGi.transactor ++ Seq(
libraryDependencies ++= Dependencies.transactor,
previousArtifact := akkaPreviousArtifact("akka-transactor")
)
)
lazy val persistence = Project(
id = "akka-persistence-experimental",
base = file("akka-persistence"),
@ -1029,8 +1019,6 @@ object AkkaBuild extends Build {
val slf4j = exports(Seq("akka.event.slf4j.*"))
val transactor = exports(Seq("akka.transactor.*"))
val persistence = exports(Seq("akka.persistence.*"), imports = Seq(protobufImport()))
val testkit = exports(Seq("akka.testkit.*"))
@ -1151,8 +1139,6 @@ object Dependencies {
val agent = Seq(scalaStm, Test.scalatest, Test.junit)
val transactor = Seq(scalaStm, Test.scalatest, Test.junit)
val persistence = Seq(levelDB, levelDBNative, protobuf, Test.scalatest, Test.junit, Test.commonsIo)
val mailboxes = Seq(Test.scalatest, Test.junit)