diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index a140d7cd9f..c880fb8f84 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -25,4 +25,5 @@ Removed Deprecated Features The following, previously deprecated, features have been removed: * akka-dataflow +* akka-transactor diff --git a/akka-transactor/src/main/resources/reference.conf b/akka-transactor/src/main/resources/reference.conf deleted file mode 100644 index 02f9aef270..0000000000 --- a/akka-transactor/src/main/resources/reference.conf +++ /dev/null @@ -1,14 +0,0 @@ -######################################### -# Akka Transactor Reference Config File # -######################################### - -# This is the reference config file that contains all the default settings. -# Make your edits/overrides in your application.conf. - -akka { - # akka.transactor is deprecated in 2.3 - transactor { - # The timeout used for coordinated transactions across actors - coordinated-timeout = 5s - } -} diff --git a/akka-transactor/src/main/scala/akka/transactor/Coordinated.scala b/akka-transactor/src/main/scala/akka/transactor/Coordinated.scala deleted file mode 100644 index 7d1c7ba3e5..0000000000 --- a/akka-transactor/src/main/scala/akka/transactor/Coordinated.scala +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.transactor - -import akka.AkkaException -import akka.util.Timeout -import scala.concurrent.stm.{ CommitBarrier, InTxn } -import java.util.concurrent.Callable - -/** - * Akka-specific exception for coordinated transactions. - */ -@deprecated("akka.transactor will be removed", "2.3") -class CoordinatedTransactionException(message: String, cause: Throwable) extends AkkaException(message, cause) { - def this(msg: String) = this(msg, null) -} - -/** - * Coordinated transactions across actors. - */ -@deprecated("akka.transactor will be removed", "2.3") -object Coordinated { - - /** - * Creates a new Coordinated with the given message and Timeout - * @param message - the message which will be coordinated - * @param timeout - the timeout for the coordination - * @return a new Coordinated - */ - def apply(message: Any = null)(implicit timeout: Timeout): Coordinated = - new Coordinated(message, CommitBarrier(timeout.duration.toMillis).addMember()) - - /** - * @param c - a Coordinated to be unapplied - * @return the message associated with the given Coordinated - */ - def unapply(c: Coordinated): Option[Any] = Some(c.message) -} - -/** - * `Coordinated` is a message wrapper that adds a `CommitBarrier` for explicitly - * coordinating transactions across actors or threads. - * - * Creating a `Coordinated` will create a commit barrier with initially one member. - * For each member in the coordination set a transaction is expected to be created using - * the coordinated atomic method, or the coordination cancelled using the cancel method. - * - * The number of included members must match the number of transactions, otherwise a - * successful transaction cannot be coordinated. - *

- * - * To start a new coordinated transaction set that you will also participate in just create - * a `Coordinated` object: - * - * {{{ - * val coordinated = Coordinated() - * }}} - *
- * - * To start a coordinated transaction that you won't participate in yourself you can create a - * `Coordinated` object with a message and send it directly to an actor. The recipient of the message - * will be the first member of the coordination set: - * - * {{{ - * actor ! Coordinated(Message) - * }}} - *
- * - * To receive a coordinated message in an actor simply match it in a case statement: - * - * {{{ - * def receive = { - * case coordinated @ Coordinated(Message) => ... - * } - * }}} - *
- * - * To include another actor in the same coordinated transaction set that you've created or - * received, use the apply method on that object. This will increment the number of parties - * involved by one and create a new `Coordinated` object to be sent. - * - * {{{ - * actor ! coordinated(Message) - * }}} - *
- * - * To enter the coordinated transaction use the atomic method of the coordinated object: - * - * {{{ - * coordinated.atomic { implicit txn => - * // do something in transaction ... - * } - * }}} - * - * The coordinated transaction will wait for the other transactions before committing. - * If any of the coordinated transactions fail then they all fail. - * - * @see [[akka.transactor.Transactor]] for an actor that implements coordinated transactions - */ -@deprecated("akka.transactor will be removed", "2.3") -class Coordinated(val message: Any, member: CommitBarrier.Member) { - - // Java API constructors - - def this(message: Any, timeout: Timeout) = this(message, CommitBarrier(timeout.duration.toMillis).addMember()) - - def this(timeout: Timeout) = this(null, timeout) - - /** - * Create a new Coordinated object and increment the number of members by one. - * Use this method to ''pass on'' the coordination. - */ - def apply(msg: Any): Coordinated = new Coordinated(msg, member.commitBarrier.addMember()) - - /** - * Create a new Coordinated object but *do not* increment the number of members by one. - * Only use this method if you know this is what you need. - */ - def noIncrement(msg: Any): Coordinated = new Coordinated(msg, member) - - /** - * Java API: get the message for this Coordinated. - */ - def getMessage(): Any = message - - /** - * Java API: create a new Coordinated object and increment the number of members by one. - * Use this method to ''pass on'' the coordination. - */ - def coordinate(msg: Any): Coordinated = apply(msg) - - /** - * Delimits the coordinated transaction. The transaction will wait for all other transactions - * in this coordination before committing. The timeout is specified when creating the Coordinated. - * - * @throws CoordinatedTransactionException if the coordinated transaction fails. - */ - def atomic[A](body: InTxn ⇒ A): A = { - member.atomic(body) match { - case Right(result) ⇒ result - case Left(CommitBarrier.MemberUncaughtExceptionCause(x)) ⇒ - throw new CoordinatedTransactionException("Exception in coordinated atomic", x) - case Left(cause) ⇒ - throw new CoordinatedTransactionException("Failed due to " + cause) - } - } - - /** - * Java API: coordinated atomic method that accepts a `java.lang.Runnable`. - * Delimits the coordinated transaction. The transaction will wait for all other transactions - * in this coordination before committing. The timeout is specified when creating the Coordinated. - * - * @throws CoordinatedTransactionException if the coordinated transaction fails. - */ - def atomic(runnable: Runnable): Unit = atomic { _ ⇒ runnable.run } - - /** - * Java API: coordinated atomic method that accepts a `java.util.concurrent.Callable`. - * Delimits the coordinated transaction. The transaction will wait for all other transactions - * in this coordination before committing. The timeout is specified when creating the Coordinated. - * - * @throws CoordinatedTransactionException if the coordinated transaction fails. - */ - def atomic[A](callable: Callable[A]): A = atomic { _ ⇒ callable.call } - - /** - * An empty coordinated atomic block. Can be used to complete the number of members involved - * and wait for all transactions to complete. - */ - def await(): Unit = atomic(txn ⇒ ()) - - /** - * Cancel this Coordinated transaction. - */ - def cancel(info: Any): Unit = member.cancel(CommitBarrier.UserCancel(info)) -} diff --git a/akka-transactor/src/main/scala/akka/transactor/Transactor.scala b/akka-transactor/src/main/scala/akka/transactor/Transactor.scala deleted file mode 100644 index c7ed6fb066..0000000000 --- a/akka-transactor/src/main/scala/akka/transactor/Transactor.scala +++ /dev/null @@ -1,192 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.transactor - -import language.postfixOps - -import akka.actor.{ Actor, ActorRef } -import scala.concurrent.stm.InTxn - -/** - * Used for specifying actor refs and messages to send to during coordination. - */ -@deprecated("akka.transactor will be removed", "2.3") -final case class SendTo(actor: ActorRef, message: Option[Any] = None) - -/** - * An actor with built-in support for coordinated transactions. - * - * Transactors implement the general pattern for using [[akka.transactor.Coordinated]] where - * coordination messages are sent to other transactors then the coordinated transaction is - * entered. Transactors can also accept explicitly sent `Coordinated` messages. - *

- * - * Simple transactors will just implement the `atomically` method which is similar to - * the actor `receive` method but runs within a coordinated transaction. - * - * Example of a simple transactor that will join a coordinated transaction: - * - * {{{ - * class Counter extends Transactor { - * val count = Ref(0) - * - * def atomically = implicit txn => { - * case Increment => count transform (_ + 1) - * } - * } - * }}} - *
- * - * To coordinate with other transactors override the `coordinate` method. - * The `coordinate` method maps a message to a set of [[akka.transactor.SendTo]] - * objects, pairs of `ActorRef` and a message. - * You can use the `include` and `sendTo` methods to easily coordinate with other transactors. - * The `include` method will send on the same message that was received to other transactors. - * The `sendTo` method allows you to specify both the actor to send to, and message to send. - * - * Example of coordinating an increment: - * - * {{{ - * class FriendlyCounter(friend: ActorRef) extends Transactor { - * val count = Ref(0) - * - * override def coordinate = { - * case Increment => include(friend) - * } - * - * def atomically = implicit txn => { - * case Increment => count transform (_ + 1) - * } - * } - * }}} - *
- * - * Using `include` to include more than one transactor: - * - * {{{ - * override def coordinate = { - * case Message => include(actor1, actor2, actor3) - * } - * }}} - *
- * - * Using `sendTo` to coordinate transactions but send on a different message - * than the one that was received: - * - * {{{ - * override def coordinate = { - * case Message => sendTo(someActor -> SomeOtherMessage) - * case SomeMessage => sendTo(actor1 -> Message1, actor2 -> Message2) - * } - * }}} - *
- * - * To execute directly before or after the coordinated transaction, override - * the `before` and `after` methods. These methods also expect partial functions - * like the receive method. They do not execute within the transaction. - * - * To completely bypass coordinated transactions override the `normally` method. - * Any message matched by `normally` will not be matched by the other methods, - * and will not be involved in coordinated transactions. In this method you - * can implement normal actor behavior, or use the normal STM atomic for - * local transactions. - * - * @see [[akka.transactor.Coordinated]] - */ -@deprecated("akka.transactor will be removed", "2.3") -trait Transactor extends Actor { - private val settings = TransactorExtension(context.system) - - /** - * Implement a general pattern for using coordinated transactions. - */ - final def receive = { - case coordinated @ Coordinated(message) ⇒ { - val others = (coordinate orElse alone)(message) - for (sendTo ← others) { - sendTo.actor ! coordinated(sendTo.message.getOrElse(message)) - } - (before orElse doNothing)(message) - coordinated.atomic { txn ⇒ (atomically(txn) orElse doNothing)(message) } - (after orElse doNothing)(message) - } - case message ⇒ { - if (normally.isDefinedAt(message)) normally(message) - else receive(Coordinated(message)(settings.CoordinatedTimeout)) - } - } - - /** - * Override this method to coordinate with other transactors. - * The other transactors are added to the coordinated transaction barrier - * and sent a Coordinated message. The message to send can be specified - * or otherwise the same message as received is sent. Use the 'include' and - * 'sendTo' methods to easily create the set of transactors to be involved. - */ - def coordinate: PartialFunction[Any, Set[SendTo]] = alone - - /** - * Default coordination - no other transactors. - */ - def alone: PartialFunction[Any, Set[SendTo]] = { case _ ⇒ nobody } - - /** - * Empty set of transactors to send to. - */ - def nobody: Set[SendTo] = Set.empty - - /** - * Include other actors in this coordinated transaction and send - * them the same message as received. Use as the result in 'coordinated'. - */ - def include(actors: ActorRef*): Set[SendTo] = actors map (SendTo(_)) toSet - - /** - * Include other actors in this coordinated transaction and specify the message - * to send by providing ActorRef -> Message pairs. Use as the result in 'coordinated'. - */ - def sendTo(pairs: (ActorRef, Any)*): Set[SendTo] = pairs map (p ⇒ SendTo(p._1, Some(p._2))) toSet - - /** - * A Receive block that runs before the coordinated transaction is entered. - */ - def before: Receive = doNothing - - /** - * The Receive block to run inside the coordinated transaction. - * This is a function from InTxn to Receive block. - * - * For example: - * {{{ - * def atomically = implicit txn => { - * case Increment => count transform (_ + 1) - * } - * }}} - */ - def atomically: InTxn ⇒ Receive - - /** - * A Receive block that runs after the coordinated transaction. - */ - def after: Receive = doNothing - - /** - * Bypass transactionality and behave like a normal actor. - */ - def normally: Receive = doNothing - - /** - * Default catch-all for the different Receive methods. - */ - def doNothing: Receive = EmptyReceive -} - -/** - * INTERNAL API - */ -private[akka] object EmptyReceive extends PartialFunction[Any, Unit] { - def apply(any: Any): Unit = () - def isDefinedAt(any: Any): Boolean = false -} diff --git a/akka-transactor/src/main/scala/akka/transactor/TransactorExtension.scala b/akka-transactor/src/main/scala/akka/transactor/TransactorExtension.scala deleted file mode 100644 index 240c0dfefe..0000000000 --- a/akka-transactor/src/main/scala/akka/transactor/TransactorExtension.scala +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.transactor - -import akka.actor.{ ActorSystem, ExtensionId, ExtensionIdProvider, ExtendedActorSystem } -import akka.actor.Extension -import com.typesafe.config.Config -import akka.util.Timeout -import scala.concurrent.duration.Duration -import java.util.concurrent.TimeUnit.MILLISECONDS - -/** - * TransactorExtension is an Akka Extension to hold settings for transactors. - */ -@deprecated("akka.transactor will be removed", "2.3") -object TransactorExtension extends ExtensionId[TransactorSettings] with ExtensionIdProvider { - override def get(system: ActorSystem): TransactorSettings = super.get(system) - override def lookup: TransactorExtension.type = TransactorExtension - override def createExtension(system: ExtendedActorSystem): TransactorSettings = new TransactorSettings(system.settings.config) -} - -@deprecated("akka.transactor will be removed", "2.3") -class TransactorSettings(val config: Config) extends Extension { - import config._ - val CoordinatedTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.transactor.coordinated-timeout"), MILLISECONDS)) -} diff --git a/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala b/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala deleted file mode 100644 index 6b150c69b9..0000000000 --- a/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.transactor - -import akka.actor.{ UntypedActor, ActorRef } -import java.util.{ Set ⇒ JSet } -import java.util.Collections.{ emptySet, singleton ⇒ singletonSet } - -/** - * An UntypedActor version of transactor for using from Java. - */ -@deprecated("akka.transactor will be removed", "2.3") -abstract class UntypedTransactor extends UntypedActor { - import scala.collection.JavaConverters.asScalaSetConverter - - private val settings = TransactorExtension(context.system) - - /** - * Implement a general pattern for using coordinated transactions. - */ - @throws(classOf[Exception]) - final def onReceive(message: Any) { - message match { - case coordinated @ Coordinated(message) ⇒ { - for (sendTo ← coordinate(message).asScala) { - sendTo.actor ! coordinated(sendTo.message.getOrElse(message)) - } - before(message) - coordinated.atomic { txn ⇒ atomically(message) } - after(message) - } - case message ⇒ { - val normal = normally(message) - if (!normal) onReceive(Coordinated(message)(settings.CoordinatedTimeout)) - } - } - } - - /** - * Override this method to coordinate with other transactors. - * The other transactors are added to the coordinated transaction barrier - * and sent a Coordinated message. The message to send can be specified - * or otherwise the same message as received is sent. Use the 'include' and - * 'sendTo' methods to easily create the set of transactors to be involved. - */ - @throws(classOf[Exception]) - def coordinate(message: Any): JSet[SendTo] = nobody - - /** - * Empty set of transactors to send to. - */ - def nobody: JSet[SendTo] = emptySet() - - /** - * For including one other actor in this coordinated transaction and sending - * them the same message as received. Use as the result in `coordinated`. - */ - def include(actor: ActorRef): JSet[SendTo] = singletonSet(SendTo(actor)) - - /** - * For including one other actor in this coordinated transaction and specifying the - * message to send. Use as the result in `coordinated`. - */ - def include(actor: ActorRef, message: Any): JSet[SendTo] = singletonSet(SendTo(actor, Some(message))) - - /** - * For including another actor in this coordinated transaction and sending - * them the same message as received. Use to create the result in `coordinated`. - */ - def sendTo(actor: ActorRef): SendTo = SendTo(actor) - - /** - * For including another actor in this coordinated transaction and specifying the - * message to send. Use to create the result in `coordinated`. - */ - def sendTo(actor: ActorRef, message: Any): SendTo = SendTo(actor, Some(message)) - - /** - * A Receive block that runs before the coordinated transaction is entered. - */ - @throws(classOf[Exception]) - def before(message: Any) {} - - /** - * The Receive block to run inside the coordinated transaction. - */ - @throws(classOf[Exception]) - def atomically(message: Any) - - /** - * A Receive block that runs after the coordinated transaction. - */ - @throws(classOf[Exception]) - def after(message: Any) {} - - /** - * Bypass transactionality and behave like a normal actor. - */ - @throws(classOf[Exception]) - def normally(message: Any): Boolean = false -} diff --git a/akka-transactor/src/test/java/akka/transactor/ExpectedFailureException.java b/akka-transactor/src/test/java/akka/transactor/ExpectedFailureException.java deleted file mode 100644 index bdc5218249..0000000000 --- a/akka-transactor/src/test/java/akka/transactor/ExpectedFailureException.java +++ /dev/null @@ -1,11 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.transactor; - -public class ExpectedFailureException extends RuntimeException { - public ExpectedFailureException() { - super("Expected failure"); - } -} diff --git a/akka-transactor/src/test/java/akka/transactor/Increment.java b/akka-transactor/src/test/java/akka/transactor/Increment.java deleted file mode 100644 index 9e67546042..0000000000 --- a/akka-transactor/src/test/java/akka/transactor/Increment.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.transactor; - -import akka.actor.ActorRef; -import java.util.List; -import java.util.concurrent.CountDownLatch; - -public class Increment { - private List friends; - private CountDownLatch latch; - - public Increment(List friends, CountDownLatch latch) { - this.friends = friends; - this.latch = latch; - } - - public List getFriends() { - return friends; - } - - public CountDownLatch getLatch() { - return latch; - } -} diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java deleted file mode 100644 index 6a255355af..0000000000 --- a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.transactor; - -import java.util.List; -import java.util.concurrent.CountDownLatch; - -import scala.concurrent.stm.Ref; -import scala.concurrent.stm.japi.STM; -import akka.actor.ActorRef; -import akka.actor.UntypedActor; - -public class UntypedCoordinatedCounter extends UntypedActor { - private String name; - private Ref.View count = STM.newRef(0); - - public UntypedCoordinatedCounter(String name) { - this.name = name; - } - - public void onReceive(Object incoming) throws Exception { - if (incoming instanceof Coordinated) { - Coordinated coordinated = (Coordinated) incoming; - Object message = coordinated.getMessage(); - if (message instanceof Increment) { - Increment increment = (Increment) message; - List friends = increment.getFriends(); - final CountDownLatch latch = increment.getLatch(); - final Runnable countDown = new Runnable() { - public void run() { - latch.countDown(); - } - }; - if (!friends.isEmpty()) { - Increment coordMessage = new Increment(friends.subList(1, friends.size()), latch); - friends.get(0).tell(coordinated.coordinate(coordMessage), getSelf()); - } - coordinated.atomic(new Runnable() { - public void run() { - STM.increment(count, 1); - STM.afterCompletion(countDown); - } - }); - } - } else if ("GetCount".equals(incoming)) { - getSender().tell(count.get(), getSelf()); - } - } -} diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java deleted file mode 100644 index d1b9bb9b9f..0000000000 --- a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.transactor; - -import static org.junit.Assert.*; - -import akka.testkit.*; -import org.junit.*; - -import akka.actor.ActorSystem; -import akka.actor.ActorRef; -import akka.actor.Props; -import scala.concurrent.Await; -import scala.concurrent.Future; -import static akka.pattern.Patterns.ask; - -import akka.util.Timeout; - -import static akka.japi.Util.immutableSeq; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import scala.collection.JavaConverters; -import scala.collection.immutable.Seq; - -public class UntypedCoordinatedIncrementTest { - - @ClassRule - public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("UntypedCoordinatedIncrementTest", AkkaSpec.testConf()); - - private final ActorSystem system = actorSystemResource.getSystem(); - - List counters; - ActorRef failer; - - int numCounters = 3; - int timeoutSeconds = 5; - - Timeout timeout = new Timeout(timeoutSeconds, TimeUnit.SECONDS); - - @Before - public void initialize() { - counters = new ArrayList(); - for (int i = 1; i <= numCounters; i++) { - final String name = "counter" + i; - ActorRef counter = system.actorOf(Props.create(UntypedCoordinatedCounter.class, name)); - counters.add(counter); - } - failer = system.actorOf(Props.create(UntypedFailer.class)); - } - - @Test - public void incrementAllCountersWithSuccessfulTransaction() throws Exception { - CountDownLatch incrementLatch = new CountDownLatch(numCounters); - Increment message = new Increment(counters.subList(1, counters.size()), incrementLatch); - counters.get(0).tell(new Coordinated(message, timeout), null); - try { - incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS); - } catch (InterruptedException exception) { - } - for (ActorRef counter : counters) { - Future future = ask(counter, "GetCount", timeout); - int count = (Integer) Await.result(future, timeout.duration()); - assertEquals(1, count); - } - } - - @Test - public void incrementNoCountersWithFailingTransaction() throws Exception { - EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(ExpectedFailureException.class); - EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(CoordinatedTransactionException.class); - Seq ignoreExceptions = seq(expectedFailureFilter, coordinatedFilter); - system.eventStream().publish(new TestEvent.Mute(ignoreExceptions)); - CountDownLatch incrementLatch = new CountDownLatch(numCounters); - List actors = new ArrayList(counters); - actors.add(failer); - Increment message = new Increment(actors.subList(1, actors.size()), incrementLatch); - actors.get(0).tell(new Coordinated(message, timeout), null); - try { - incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS); - } catch (InterruptedException exception) { - } - for (ActorRef counter : counters) { - Futurefuture = ask(counter, "GetCount", timeout); - int count = (Integer) Await.result(future, timeout.duration()); - assertEquals(0, count); - } - } - - public Seq seq(A... args) { - return immutableSeq(args); - } -} diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java b/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java deleted file mode 100644 index c756950b47..0000000000 --- a/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.transactor; - -import java.util.List; -import java.util.Set; - -import scala.concurrent.stm.Ref; -import scala.concurrent.stm.japi.STM; -import akka.actor.ActorRef; - -public class UntypedCounter extends UntypedTransactor { - private String name; - private Ref.View count = STM.newRef(0); - - public UntypedCounter(String name) { - this.name = name; - } - - @Override public Set coordinate(Object message) { - if (message instanceof Increment) { - Increment increment = (Increment) message; - List friends = increment.getFriends(); - if (!friends.isEmpty()) { - Increment coordMessage = new Increment(friends.subList(1, friends.size()), increment.getLatch()); - return include(friends.get(0), coordMessage); - } else { - return nobody(); - } - } else { - return nobody(); - } - } - - public void atomically(Object message) { - if (message instanceof Increment) { - STM.increment(count, 1); - final Increment increment = (Increment) message; - Runnable countDown = new Runnable() { - public void run() { - increment.getLatch().countDown(); - } - }; - STM.afterCompletion(countDown); - } - } - - @Override public boolean normally(Object message) { - if ("GetCount".equals(message)) { - getSender().tell(count.get(), getSelf()); - return true; - } else return false; - } -} diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedFailer.java b/akka-transactor/src/test/java/akka/transactor/UntypedFailer.java deleted file mode 100644 index 9f9b66bf48..0000000000 --- a/akka-transactor/src/test/java/akka/transactor/UntypedFailer.java +++ /dev/null @@ -1,11 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.transactor; - -public class UntypedFailer extends UntypedTransactor { - public void atomically(Object message) throws Exception { - throw new ExpectedFailureException(); - } -} diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java b/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java deleted file mode 100644 index cb43ae0a9c..0000000000 --- a/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.transactor; - -import static org.junit.Assert.*; - -import akka.testkit.*; -import org.junit.*; - -import akka.actor.ActorSystem; -import akka.actor.ActorRef; -import akka.actor.Props; -import scala.concurrent.Await; -import scala.concurrent.Future; -import static akka.pattern.Patterns.ask; - -import akka.util.Timeout; - -import static akka.japi.Util.immutableSeq; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import scala.collection.JavaConverters; -import scala.collection.immutable.Seq; - -public class UntypedTransactorTest { - - @ClassRule - public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("UntypedTransactorTest", AkkaSpec.testConf()); - - private final ActorSystem system = actorSystemResource.getSystem(); - - List counters; - ActorRef failer; - - int numCounters = 3; - int timeoutSeconds = 5; - - Timeout timeout = new Timeout(timeoutSeconds, TimeUnit.SECONDS); - - @Before - public void initialize() { - counters = new ArrayList(); - for (int i = 1; i <= numCounters; i++) { - final String name = "counter" + i; - ActorRef counter = system.actorOf(Props.create(UntypedCounter.class, name)); - counters.add(counter); - } - failer = system.actorOf(Props.create(UntypedFailer.class)); - } - - @Test - public void incrementAllCountersWithSuccessfulTransaction() throws Exception { - CountDownLatch incrementLatch = new CountDownLatch(numCounters); - Increment message = new Increment(counters.subList(1, counters.size()), - incrementLatch); - counters.get(0).tell(message, null); - try { - incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS); - } catch (InterruptedException exception) { - } - for (ActorRef counter : counters) { - Future future = ask(counter, "GetCount", timeout); - int count = (Integer) Await.result(future, timeout.duration()); - assertEquals(1, count); - } - } - - @Test - public void incrementNoCountersWithFailingTransaction() throws Exception { - EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter( - ExpectedFailureException.class); - EventFilter coordinatedFilter = (EventFilter) new ErrorFilter( - CoordinatedTransactionException.class); - Seq ignoreExceptions = seq(expectedFailureFilter, - coordinatedFilter); - system.eventStream().publish(new TestEvent.Mute(ignoreExceptions)); - CountDownLatch incrementLatch = new CountDownLatch(numCounters); - List actors = new ArrayList(counters); - actors.add(failer); - Increment message = new Increment(actors.subList(1, actors.size()), - incrementLatch); - actors.get(0).tell(message, null); - try { - incrementLatch.await(timeoutSeconds, TimeUnit.SECONDS); - } catch (InterruptedException exception) { - } - for (ActorRef counter : counters) { - Future future = ask(counter, "GetCount", timeout); - int count = (Integer) Await.result(future, timeout.duration()); - assertEquals(0, count); - } - } - - public Seq seq(A... args) { - return immutableSeq(args); - } -} diff --git a/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala b/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala deleted file mode 100644 index e59119c0f3..0000000000 --- a/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.transactor - -import org.scalatest.BeforeAndAfterAll - -import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.concurrent.stm._ -import scala.collection.immutable -import akka.actor._ -import akka.util.Timeout -import akka.testkit._ -import akka.pattern.{ AskTimeoutException, ask } - -object CoordinatedIncrement { - - val config = """ - akka { - actor { - default-dispatcher { - executor = "thread-pool-executor" - thread-pool-executor { - core-pool-size-min = 5 - core-pool-size-max = 16 - } - } - } - } - """ - - final case class Increment(friends: immutable.Seq[ActorRef]) - case object GetCount - - class Counter(name: String) extends Actor { - val count = Ref(0) - - def receive = { - case coordinated @ Coordinated(Increment(friends)) ⇒ { - if (friends.nonEmpty) { - friends.head ! coordinated(Increment(friends.tail)) - } - coordinated.atomic { implicit t ⇒ - count transform (_ + 1) - } - } - - case GetCount ⇒ sender ! count.single.get - } - } - - class ExpectedFailureException extends RuntimeException("Expected failure") - - class Failer extends Actor { - - def receive = { - case coordinated @ Coordinated(Increment(friends)) ⇒ { - coordinated.atomic { t ⇒ - throw new ExpectedFailureException - } - } - } - } -} - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class CoordinatedIncrementSpec extends AkkaSpec(CoordinatedIncrement.config) with BeforeAndAfterAll { - import CoordinatedIncrement._ - - val numCounters = 4 - - def actorOfs = { - def createCounter(i: Int) = system.actorOf(Props(classOf[Counter], "counter" + i)) - val counters = (1 to numCounters) map createCounter - val failer = system.actorOf(Props[Failer]) - (counters, failer) - } - - "Coordinated increment" should { - implicit val timeout = Timeout(2.seconds.dilated) - "increment all counters by one with successful transactions" in { - val (counters, failer) = actorOfs - val coordinated = Coordinated() - counters(0) ! coordinated(Increment(counters.tail)) - coordinated.await - for (counter ← counters) { - Await.result((counter ? GetCount).mapTo[Int], timeout.duration) should be(1) - } - counters foreach (system.stop(_)) - system.stop(failer) - } - - "increment no counters with a failing transaction" in { - val ignoreExceptions = Seq( - EventFilter[ExpectedFailureException](), - EventFilter[CoordinatedTransactionException](), - EventFilter[AskTimeoutException]()) - filterEvents(ignoreExceptions) { - val (counters, failer) = actorOfs - val coordinated = Coordinated() - counters(0) ! Coordinated(Increment(counters.tail :+ failer)) - coordinated.await - for (counter ← counters) { - Await.result(counter ? GetCount, timeout.duration) should be(0) - } - counters foreach (system.stop(_)) - system.stop(failer) - } - } - } -} diff --git a/akka-transactor/src/test/scala/akka/transactor/FickleFriendsSpec.scala b/akka-transactor/src/test/scala/akka/transactor/FickleFriendsSpec.scala deleted file mode 100644 index da6c7828ff..0000000000 --- a/akka-transactor/src/test/scala/akka/transactor/FickleFriendsSpec.scala +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.transactor - -import language.postfixOps - -import org.scalatest.BeforeAndAfterAll - -import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.concurrent.stm._ -import scala.collection.immutable -import scala.util.Random.{ nextInt ⇒ random } -import scala.util.control.NonFatal -import akka.actor._ -import akka.testkit._ -import akka.testkit.TestEvent.Mute -import java.util.concurrent.CountDownLatch -import akka.pattern.{ AskTimeoutException, ask } -import akka.util.Timeout - -object FickleFriends { - final case class FriendlyIncrement(friends: immutable.Seq[ActorRef], timeout: Timeout, latch: CountDownLatch) - final case class Increment(friends: immutable.Seq[ActorRef]) - case object GetCount - - /** - * Coordinator will keep trying to coordinate an increment until successful. - */ - class Coordinator(name: String) extends Actor { - val count = Ref(0) - - def increment(implicit txn: InTxn) = { - count transform (_ + 1) - } - - def receive = { - case FriendlyIncrement(friends, timeout, latch) ⇒ { - var success = false - while (!success) { - try { - val coordinated = Coordinated()(timeout) - if (friends.nonEmpty) { - friends.head ! coordinated(Increment(friends.tail)) - } - coordinated.atomic { implicit t ⇒ - increment - Txn.afterCommit { status ⇒ - success = true - latch.countDown() - } - } - } catch { - case NonFatal(_) ⇒ () // swallow exceptions - } - } - } - - case GetCount ⇒ sender ! count.single.get - } - } - - class ExpectedFailureException(message: String) extends RuntimeException(message) - - /** - * FickleCounter randomly fails at different points with 50% chance of failing overall. - */ - class FickleCounter(name: String) extends Actor { - val count = Ref(0) - - val maxFailures = 3 - var failures = 0 - - def increment(implicit txn: InTxn) = { - count transform (_ + 1) - } - - def failIf(x: Int, y: Int) = { - if (x == y && failures < maxFailures) { - failures += 1 - throw new ExpectedFailureException("Random fail at position " + x) - } - } - - def receive = { - case coordinated @ Coordinated(Increment(friends)) ⇒ { - val failAt = random(8) - failIf(failAt, 0) - if (friends.nonEmpty) { - friends.head ! coordinated(Increment(friends.tail)) - } - failIf(failAt, 1) - coordinated.atomic { implicit t ⇒ - failIf(failAt, 2) - increment - failIf(failAt, 3) - } - } - - case GetCount ⇒ sender ! count.single.get - } - } -} - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class FickleFriendsSpec extends AkkaSpec with BeforeAndAfterAll { - import FickleFriends._ - - implicit val timeout = Timeout(5.seconds.dilated) - - val numCounters = 2 - - def actorOfs = { - def createCounter(i: Int) = system.actorOf(Props(classOf[FickleCounter], "counter" + i)) - val counters = (1 to numCounters) map createCounter - val coordinator = system.actorOf(Props(classOf[Coordinator], "coordinator")) - (counters, coordinator) - } - - "Coordinated fickle friends" should { - "eventually succeed to increment all counters by one" in { - val ignoreExceptions = immutable.Seq( - EventFilter[ExpectedFailureException](), - EventFilter[CoordinatedTransactionException](), - EventFilter[AskTimeoutException]()) - system.eventStream.publish(Mute(ignoreExceptions)) - val (counters, coordinator) = actorOfs - val latch = new CountDownLatch(1) - coordinator ! FriendlyIncrement(counters, timeout, latch) - latch.await // this could take a while - Await.result(coordinator ? GetCount, timeout.duration) should be(1) - for (counter ← counters) { - Await.result(counter ? GetCount, timeout.duration) should be(1) - } - counters foreach (system.stop(_)) - system.stop(coordinator) - } - } -} diff --git a/akka-transactor/src/test/scala/akka/transactor/JavaUntypedCoordinatedSpec.scala b/akka-transactor/src/test/scala/akka/transactor/JavaUntypedCoordinatedSpec.scala deleted file mode 100644 index 0022422484..0000000000 --- a/akka-transactor/src/test/scala/akka/transactor/JavaUntypedCoordinatedSpec.scala +++ /dev/null @@ -1,11 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.transactor - -import org.scalatest.junit.JUnitWrapperSuite - -class JavaUntypedCoordinatedSpec extends JUnitWrapperSuite( - "akka.transactor.UntypedCoordinatedIncrementTest", - Thread.currentThread.getContextClassLoader) diff --git a/akka-transactor/src/test/scala/akka/transactor/JavaUntypedTransactorSpec.scala b/akka-transactor/src/test/scala/akka/transactor/JavaUntypedTransactorSpec.scala deleted file mode 100644 index faa618aa63..0000000000 --- a/akka-transactor/src/test/scala/akka/transactor/JavaUntypedTransactorSpec.scala +++ /dev/null @@ -1,11 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.transactor - -import org.scalatest.junit.JUnitWrapperSuite - -class JavaUntypedTransactorSpec extends JUnitWrapperSuite( - "akka.transactor.UntypedTransactorTest", - Thread.currentThread.getContextClassLoader) diff --git a/akka-transactor/src/test/scala/akka/transactor/TransactorSpec.scala b/akka-transactor/src/test/scala/akka/transactor/TransactorSpec.scala deleted file mode 100644 index 38d0278a95..0000000000 --- a/akka-transactor/src/test/scala/akka/transactor/TransactorSpec.scala +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.transactor - -import language.postfixOps - -import akka.actor._ -import scala.collection.immutable -import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.concurrent.stm._ -import akka.util.Timeout -import akka.testkit._ -import akka.pattern.{ AskTimeoutException, ask } - -object TransactorIncrement { - final case class Increment(friends: immutable.Seq[ActorRef], latch: TestLatch) - case object GetCount - - class Counter(name: String) extends Transactor { - val count = Ref(0) - - def increment(implicit txn: InTxn) = { - count transform (_ + 1) - } - - override def coordinate = { - case Increment(friends, latch) ⇒ { - if (friends.nonEmpty) sendTo(friends.head -> Increment(friends.tail, latch)) - else nobody - } - } - - override def before = { - case i: Increment ⇒ - } - - def atomically = implicit txn ⇒ { - case Increment(friends, latch) ⇒ { - increment - Txn.afterCompletion { status ⇒ latch.countDown() } - } - } - - override def after = { - case i: Increment ⇒ - } - - override def normally = { - case GetCount ⇒ sender ! count.single.get - } - } - - class ExpectedFailureException extends RuntimeException("Expected failure") - - class Failer extends Transactor { - def atomically = implicit txn ⇒ { - case _ ⇒ throw new ExpectedFailureException - } - } -} - -object SimpleTransactor { - final case class Set(ref: Ref[Int], value: Int, latch: TestLatch) - - class Setter extends Transactor { - def atomically = implicit txn ⇒ { - case Set(ref, value, latch) ⇒ { - ref() = value - Txn.afterCompletion { status ⇒ latch.countDown() } - } - } - } -} - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class TransactorSpec extends AkkaSpec { - import TransactorIncrement._ - import SimpleTransactor._ - - implicit val timeout = Timeout(5.seconds.dilated) - - val numCounters = 3 - - def createTransactors = { - def createCounter(i: Int) = system.actorOf(Props(classOf[Counter], "counter" + i)) - val counters = (1 to numCounters) map createCounter - val failer = system.actorOf(Props[Failer]) - (counters, failer) - } - - "Transactor increment" should { - "increment all counters by one with successful transactions" in { - val (counters, failer) = createTransactors - val incrementLatch = TestLatch(numCounters) - counters(0) ! Increment(counters.tail, incrementLatch) - Await.ready(incrementLatch, 5 seconds) - for (counter ← counters) { - Await.result(counter ? GetCount, timeout.duration) should be(1) - } - counters foreach (system.stop(_)) - system.stop(failer) - } - - "increment no counters with a failing transaction" in { - val ignoreExceptions = Seq( - EventFilter[ExpectedFailureException](), - EventFilter[CoordinatedTransactionException](), - EventFilter[AskTimeoutException]()) - filterEvents(ignoreExceptions) { - val (counters, failer) = createTransactors - val failLatch = TestLatch(numCounters) - counters(0) ! Increment(counters.tail :+ failer, failLatch) - Await.ready(failLatch, 5 seconds) - for (counter ← counters) { - Await.result(counter ? GetCount, timeout.duration) should be(0) - } - counters foreach (system.stop(_)) - system.stop(failer) - } - } - } - - "Transactor" should { - "be usable without overriding normally" in { - val transactor = system.actorOf(Props[Setter]) - val ref = Ref(0) - val latch = TestLatch(1) - transactor ! Set(ref, 5, latch) - Await.ready(latch, 5 seconds) - val value = ref.single.get - value should be(5) - system.stop(transactor) - } - } -} diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 67b5f68f1b..f09084616f 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -75,7 +75,7 @@ object AkkaBuild extends Build { // add reportBinaryIssues to validatePullRequest on minor version maintenance branch validatePullRequest <<= (Unidoc.unidoc, SphinxSupport.generate in Sphinx in docs) map { (_, _) => } ), - aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, agent, transactor, + aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, agent, persistence, mailboxes, zeroMQ, kernel, osgi, docs, contrib, samples, multiNodeTestkit) ) @@ -252,16 +252,6 @@ object AkkaBuild extends Build { ) ) - lazy val transactor = Project( - id = "akka-transactor", - base = file("akka-transactor"), - dependencies = Seq(actor, testkit % "test->test"), - settings = defaultSettings ++ formatSettings ++ scaladocSettings ++ javadocSettings ++ OSGi.transactor ++ Seq( - libraryDependencies ++= Dependencies.transactor, - previousArtifact := akkaPreviousArtifact("akka-transactor") - ) - ) - lazy val persistence = Project( id = "akka-persistence-experimental", base = file("akka-persistence"), @@ -1029,8 +1019,6 @@ object AkkaBuild extends Build { val slf4j = exports(Seq("akka.event.slf4j.*")) - val transactor = exports(Seq("akka.transactor.*")) - val persistence = exports(Seq("akka.persistence.*"), imports = Seq(protobufImport())) val testkit = exports(Seq("akka.testkit.*")) @@ -1151,8 +1139,6 @@ object Dependencies { val agent = Seq(scalaStm, Test.scalatest, Test.junit) - val transactor = Seq(scalaStm, Test.scalatest, Test.junit) - val persistence = Seq(levelDB, levelDBNative, protobuf, Test.scalatest, Test.junit, Test.commonsIo) val mailboxes = Seq(Test.scalatest, Test.junit)