From dd3d3da4527b17fd46985433db7ffc21139f9eed Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 11 Dec 2013 14:44:45 +0100 Subject: [PATCH] =tra #3668 Deprecate transactors --- akka-docs/rst/general/configuration.rst | 6 - akka-docs/rst/intro/getting-started.rst | 3 - akka-docs/rst/intro/what-is-akka.rst | 8 - .../docs/transactor/CoordinatedCounter.java | 39 --- .../code/docs/transactor/Coordinator.java | 28 --- .../java/code/docs/transactor/Counter.java | 28 --- .../code/docs/transactor/FriendlyCounter.java | 38 --- .../java/code/docs/transactor/Increment.java | 27 -- .../java/code/docs/transactor/Message.java | 7 - .../transactor/TransactorDocJavaSpec.scala | 11 - .../docs/transactor/TransactorDocTest.java | 102 -------- akka-docs/rst/java/index-futures.rst | 2 - akka-docs/rst/java/stm.rst | 60 ----- akka-docs/rst/java/transactors.rst | 153 ------------ .../project/migration-guide-2.2.x-2.3.x.rst | 7 + .../docs/transactor/TransactorDocSpec.scala | 234 ------------------ akka-docs/rst/scala/index-futures.rst | 2 - akka-docs/rst/scala/stm.rst | 75 ------ akka-docs/rst/scala/transactors.rst | 163 ------------ .../src/main/resources/reference.conf | 1 + .../scala/akka/transactor/Coordinated.scala | 3 + .../scala/akka/transactor/Transactor.scala | 5 + .../akka/transactor/TransactorExtension.scala | 2 + .../akka/transactor/UntypedTransactor.scala | 1 + project/AkkaBuild.scala | 2 +- 25 files changed, 20 insertions(+), 987 deletions(-) delete mode 100644 akka-docs/rst/java/code/docs/transactor/CoordinatedCounter.java delete mode 100644 akka-docs/rst/java/code/docs/transactor/Coordinator.java delete mode 100644 akka-docs/rst/java/code/docs/transactor/Counter.java delete mode 100644 akka-docs/rst/java/code/docs/transactor/FriendlyCounter.java delete mode 100644 akka-docs/rst/java/code/docs/transactor/Increment.java delete mode 100644 akka-docs/rst/java/code/docs/transactor/Message.java delete mode 100644 akka-docs/rst/java/code/docs/transactor/TransactorDocJavaSpec.scala delete mode 100644 akka-docs/rst/java/code/docs/transactor/TransactorDocTest.java delete mode 100644 akka-docs/rst/java/stm.rst delete mode 100644 akka-docs/rst/java/transactors.rst delete mode 100644 akka-docs/rst/scala/code/docs/transactor/TransactorDocSpec.scala delete mode 100644 akka-docs/rst/scala/stm.rst delete mode 100644 akka-docs/rst/scala/transactors.rst diff --git a/akka-docs/rst/general/configuration.rst b/akka-docs/rst/general/configuration.rst index 70a69f3bda..cfc649b58c 100644 --- a/akka-docs/rst/general/configuration.rst +++ b/akka-docs/rst/general/configuration.rst @@ -379,12 +379,6 @@ akka-cluster .. literalinclude:: ../../../akka-cluster/src/main/resources/reference.conf :language: none -akka-transactor -~~~~~~~~~~~~~~~ - -.. literalinclude:: ../../../akka-transactor/src/main/resources/reference.conf - :language: none - akka-agent ~~~~~~~~~~ diff --git a/akka-docs/rst/intro/getting-started.rst b/akka-docs/rst/intro/getting-started.rst index 9781feae2e..9990771cb3 100644 --- a/akka-docs/rst/intro/getting-started.rst +++ b/akka-docs/rst/intro/getting-started.rst @@ -48,9 +48,6 @@ Akka is very modular and consists of several JARs containing different features. - ``akka-testkit`` – Toolkit for testing Actor systems -- ``akka-transactor`` – Transactors - transactional actors, integrated with - Scala STM - - ``akka-zeromq`` – ZeroMQ integration In addition to these stable modules there are several which are on their way diff --git a/akka-docs/rst/intro/what-is-akka.rst b/akka-docs/rst/intro/what-is-akka.rst index 3f89bcfa6e..604eea2169 100644 --- a/akka-docs/rst/intro/what-is-akka.rst +++ b/akka-docs/rst/intro/what-is-akka.rst @@ -54,14 +54,6 @@ interactions of actors use pure message passing and everything is asynchronous. For an overview of the remoting see :ref:`remoting` -Transactors ------------ - -Transactors combine actors and Software Transactional Memory (STM) into transactional actors. -It allows you to compose atomic message flows with automatic retry and rollback. - -See :ref:`Transactors (Scala) ` and :ref:`Transactors (Java) ` - Persistence ----------- diff --git a/akka-docs/rst/java/code/docs/transactor/CoordinatedCounter.java b/akka-docs/rst/java/code/docs/transactor/CoordinatedCounter.java deleted file mode 100644 index b9ab6a523d..0000000000 --- a/akka-docs/rst/java/code/docs/transactor/CoordinatedCounter.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ - -package docs.transactor; - -//#class -import akka.actor.*; -import akka.transactor.*; -import scala.concurrent.stm.Ref; -import scala.concurrent.stm.japi.STM; - -public class CoordinatedCounter extends UntypedActor { - private Ref.View count = STM.newRef(0); - - public void onReceive(Object incoming) throws Exception { - if (incoming instanceof Coordinated) { - Coordinated coordinated = (Coordinated) incoming; - Object message = coordinated.getMessage(); - if (message instanceof Increment) { - Increment increment = (Increment) message; - if (increment.hasFriend()) { - increment.getFriend().tell( - coordinated.coordinate(new Increment()), getSelf()); - } - coordinated.atomic(new Runnable() { - public void run() { - STM.increment(count, 1); - } - }); - } - } else if ("GetCount".equals(incoming)) { - getSender().tell(count.get(), getSelf()); - } else { - unhandled(incoming); - } - } -} -//#class diff --git a/akka-docs/rst/java/code/docs/transactor/Coordinator.java b/akka-docs/rst/java/code/docs/transactor/Coordinator.java deleted file mode 100644 index 1559a7fcce..0000000000 --- a/akka-docs/rst/java/code/docs/transactor/Coordinator.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ - -package docs.transactor; - -import akka.actor.*; -import akka.transactor.*; - -public class Coordinator extends UntypedActor { - public void onReceive(Object incoming) throws Exception { - if (incoming instanceof Coordinated) { - Coordinated coordinated = (Coordinated) incoming; - Object message = coordinated.getMessage(); - if (message instanceof Message) { - //#coordinated-atomic - coordinated.atomic(new Runnable() { - public void run() { - // do something in the coordinated transaction ... - } - }); - //#coordinated-atomic - } - } else { - unhandled(incoming); - } - } -} diff --git a/akka-docs/rst/java/code/docs/transactor/Counter.java b/akka-docs/rst/java/code/docs/transactor/Counter.java deleted file mode 100644 index 274fd6496f..0000000000 --- a/akka-docs/rst/java/code/docs/transactor/Counter.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ - -package docs.transactor; - -//#class -import akka.transactor.*; -import scala.concurrent.stm.Ref; -import scala.concurrent.stm.japi.STM; - -public class Counter extends UntypedTransactor { - Ref.View count = STM.newRef(0); - - public void atomically(Object message) { - if (message instanceof Increment) { - STM.increment(count, 1); - } - } - - @Override public boolean normally(Object message) { - if ("GetCount".equals(message)) { - getSender().tell(count.get(), getSelf()); - return true; - } else return false; - } -} -//#class diff --git a/akka-docs/rst/java/code/docs/transactor/FriendlyCounter.java b/akka-docs/rst/java/code/docs/transactor/FriendlyCounter.java deleted file mode 100644 index 3242e006fd..0000000000 --- a/akka-docs/rst/java/code/docs/transactor/FriendlyCounter.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ - -package docs.transactor; - -//#class -import akka.transactor.*; -import java.util.Set; -import scala.concurrent.stm.Ref; -import scala.concurrent.stm.japi.STM; - -public class FriendlyCounter extends UntypedTransactor { - Ref.View count = STM.newRef(0); - - @Override public Set coordinate(Object message) { - if (message instanceof Increment) { - Increment increment = (Increment) message; - if (increment.hasFriend()) - return include(increment.getFriend(), new Increment()); - } - return nobody(); - } - - public void atomically(Object message) { - if (message instanceof Increment) { - STM.increment(count, 1); - } - } - - @Override public boolean normally(Object message) { - if ("GetCount".equals(message)) { - getSender().tell(count.get(), getSelf()); - return true; - } else return false; - } -} -//#class diff --git a/akka-docs/rst/java/code/docs/transactor/Increment.java b/akka-docs/rst/java/code/docs/transactor/Increment.java deleted file mode 100644 index 7dda01189f..0000000000 --- a/akka-docs/rst/java/code/docs/transactor/Increment.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ - -package docs.transactor; - -//#class -import akka.actor.ActorRef; - -public class Increment { - private ActorRef friend = null; - - public Increment() {} - - public Increment(ActorRef friend) { - this.friend = friend; - } - - public boolean hasFriend() { - return friend != null; - } - - public ActorRef getFriend() { - return friend; - } -} -//#class diff --git a/akka-docs/rst/java/code/docs/transactor/Message.java b/akka-docs/rst/java/code/docs/transactor/Message.java deleted file mode 100644 index aae2924558..0000000000 --- a/akka-docs/rst/java/code/docs/transactor/Message.java +++ /dev/null @@ -1,7 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ - -package docs.transactor; - -public class Message {} diff --git a/akka-docs/rst/java/code/docs/transactor/TransactorDocJavaSpec.scala b/akka-docs/rst/java/code/docs/transactor/TransactorDocJavaSpec.scala deleted file mode 100644 index 35b335c8e6..0000000000 --- a/akka-docs/rst/java/code/docs/transactor/TransactorDocJavaSpec.scala +++ /dev/null @@ -1,11 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ - -package docs.transactor - -import org.scalatest.junit.JUnitWrapperSuite - -class TransactorDocJavaSpec extends JUnitWrapperSuite( - "docs.transactor.TransactorDocTest", - Thread.currentThread.getContextClassLoader) diff --git a/akka-docs/rst/java/code/docs/transactor/TransactorDocTest.java b/akka-docs/rst/java/code/docs/transactor/TransactorDocTest.java deleted file mode 100644 index 7dbf8d9ae5..0000000000 --- a/akka-docs/rst/java/code/docs/transactor/TransactorDocTest.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ - -package docs.transactor; - -import static org.junit.Assert.*; - -import akka.testkit.JavaTestKit; -import org.junit.Test; - -//#imports -import akka.actor.*; -import scala.concurrent.Await; -import static akka.pattern.Patterns.ask; -import akka.transactor.Coordinated; -import akka.util.Timeout; -import static java.util.concurrent.TimeUnit.SECONDS; -//#imports - -public class TransactorDocTest { - - @Test - public void coordinatedExample() throws Exception { - //#coordinated-example - ActorSystem system = ActorSystem.create("CoordinatedExample"); - - ActorRef counter1 = system.actorOf(Props.create(CoordinatedCounter.class)); - ActorRef counter2 = system.actorOf(Props.create(CoordinatedCounter.class)); - - Timeout timeout = new Timeout(5, SECONDS); - - counter1.tell(new Coordinated(new Increment(counter2), timeout), ActorRef.noSender()); - - Integer count = (Integer) Await.result( - ask(counter1, "GetCount", timeout), timeout.duration()); - //#coordinated-example - - assertEquals(count, new Integer(1)); - - JavaTestKit.shutdownActorSystem(system); - } - - @Test - public void coordinatedApi() { - //#create-coordinated - Timeout timeout = new Timeout(5, SECONDS); - Coordinated coordinated = new Coordinated(timeout); - //#create-coordinated - - ActorSystem system = ActorSystem.create("CoordinatedApi"); - ActorRef actor = system.actorOf(Props.create(Coordinator.class)); - - //#send-coordinated - actor.tell(new Coordinated(new Message(), timeout), ActorRef.noSender()); - //#send-coordinated - - //#include-coordinated - actor.tell(coordinated.coordinate(new Message()), ActorRef.noSender()); - //#include-coordinated - - coordinated.await(); - - JavaTestKit.shutdownActorSystem(system); - } - - @Test - public void counterTransactor() throws Exception { - ActorSystem system = ActorSystem.create("CounterTransactor"); - ActorRef counter = system.actorOf(Props.create(Counter.class)); - - Timeout timeout = new Timeout(5, SECONDS); - Coordinated coordinated = new Coordinated(timeout); - counter.tell(coordinated.coordinate(new Increment()), ActorRef.noSender()); - coordinated.await(); - - Integer count = (Integer) Await.result(ask(counter, "GetCount", timeout), timeout.duration()); - assertEquals(count, new Integer(1)); - - JavaTestKit.shutdownActorSystem(system); - } - - @Test - public void friendlyCounterTransactor() throws Exception { - ActorSystem system = ActorSystem.create("FriendlyCounterTransactor"); - ActorRef friend = system.actorOf(Props.create(Counter.class)); - ActorRef friendlyCounter = system.actorOf(Props.create(FriendlyCounter.class)); - - Timeout timeout = new Timeout(5, SECONDS); - Coordinated coordinated = new Coordinated(timeout); - friendlyCounter.tell(coordinated.coordinate(new Increment(friend)), ActorRef.noSender()); - coordinated.await(); - - Integer count1 = (Integer) Await.result(ask(friendlyCounter, "GetCount", timeout), timeout.duration()); - assertEquals(count1, new Integer(1)); - - Integer count2 = (Integer) Await.result(ask(friend, "GetCount", timeout), timeout.duration()); - assertEquals(count2, new Integer(1)); - - JavaTestKit.shutdownActorSystem(system); - } -} diff --git a/akka-docs/rst/java/index-futures.rst b/akka-docs/rst/java/index-futures.rst index 7768a39f41..59b846d0d6 100644 --- a/akka-docs/rst/java/index-futures.rst +++ b/akka-docs/rst/java/index-futures.rst @@ -5,6 +5,4 @@ Futures and Agents :maxdepth: 2 futures - stm agents - transactors diff --git a/akka-docs/rst/java/stm.rst b/akka-docs/rst/java/stm.rst deleted file mode 100644 index 787e504801..0000000000 --- a/akka-docs/rst/java/stm.rst +++ /dev/null @@ -1,60 +0,0 @@ - -.. _stm-java: - -##################################### - Software Transactional Memory -##################################### - - -Overview of STM -=============== - -An `STM `_ turns the -Java heap into a transactional data set with begin/commit/rollback -semantics. Very much like a regular database. It implements the first three -letters in `ACID`_; ACI: - -* Atomic -* Consistent -* Isolated - -.. _ACID: http://en.wikipedia.org/wiki/ACID - -Generally, the STM is not needed very often when working with Akka. Some -use-cases (that we can think of) are: - -- When you really need composable message flows across many actors updating - their **internal local** state but need them to do that atomically in one big - transaction. Might not be often, but when you do need this then you are - screwed without it. -- When you want to share a datastructure across actors. - -The use of STM in Akka is inspired by the concepts and views in `Clojure`_\'s -STM. Please take the time to read `this excellent document`_ about state in -clojure and view `this presentation`_ by Rich Hickey (the genius behind -Clojure). - -.. _Clojure: http://clojure.org/ -.. _this excellent document: http://clojure.org/state -.. _this presentation: http://www.infoq.com/presentations/Value-Identity-State-Rich-Hickey - - -Scala STM -========= - -The STM supported in Akka is `ScalaSTM`_ which will be soon included in the -Scala standard library. - -.. _ScalaSTM: http://nbronson.github.com/scala-stm/ - -The STM is based on Transactional References (referred to as Refs). Refs are -memory cells, holding an (arbitrary) immutable value, that implement CAS -(Compare-And-Swap) semantics and are managed and enforced by the STM for -coordinated changes across many Refs. - - -Integration with Actors -======================= - -In Akka we've also integrated Actors and STM in :ref:`agents-java` and -:ref:`transactors-java`. diff --git a/akka-docs/rst/java/transactors.rst b/akka-docs/rst/java/transactors.rst deleted file mode 100644 index f729c5f781..0000000000 --- a/akka-docs/rst/java/transactors.rst +++ /dev/null @@ -1,153 +0,0 @@ -.. _transactors-java: - -#################### - Transactors -#################### - - -Why Transactors? -================ - -Actors are excellent for solving problems where you have many independent -processes that can work in isolation and only interact with other Actors through -message passing. This model fits many problems. But the actor model is -unfortunately a terrible model for implementing truly shared state. E.g. when -you need to have consensus and a stable view of state across many -components. The classic example is the bank account where clients can deposit -and withdraw, in which each operation needs to be atomic. For detailed -discussion on the topic see `this JavaOne presentation -`_. - -STM on the other hand is excellent for problems where you need consensus and a -stable view of the state by providing compositional transactional shared -state. Some of the really nice traits of STM are that transactions compose, and -it raises the abstraction level from lock-based concurrency. - -Akka's Transactors combine Actors and STM to provide the best of the Actor model -(concurrency and asynchronous event-based programming) and STM (compositional -transactional shared state) by providing transactional, compositional, -asynchronous, event-based message flows. - -Generally, the STM is not needed very often when working with Akka. Some -use-cases (that we can think of) are: - -- When you really need composable message flows across many actors updating - their **internal local** state but need them to do that atomically in one big - transaction. Might not be often but when you do need this then you are - screwed without it. - -- When you want to share a datastructure across actors. - - -Actors and STM -============== - -You can combine Actors and STM in several ways. An Actor may use STM internally -so that particular changes are guaranteed to be atomic. Actors may also share -transactional datastructures as the STM provides safe shared state across -threads. - -It's also possible to coordinate transactions across Actors or threads so that -either the transactions in a set all commit successfully or they all fail. This -is the focus of Transactors and the explicit support for coordinated -transactions in this section. - - -Coordinated transactions -======================== - -Akka provides an explicit mechanism for coordinating transactions across -actors. Under the hood it uses a ``CommitBarrier``, similar to a CountDownLatch. - -Here is an example of coordinating two simple counter UntypedActors so that they -both increment together in coordinated transactions. If one of them was to fail -to increment, the other would also fail. - -.. includecode:: code/docs/transactor/Increment.java#class - :language: java - -.. includecode:: code/docs/transactor/CoordinatedCounter.java#class - :language: java - -.. includecode:: code/docs/transactor/TransactorDocTest.java#imports - :language: java - -.. includecode:: code/docs/transactor/TransactorDocTest.java#coordinated-example - :language: java - -To start a new coordinated transaction that you will also participate in, create -a ``Coordinated`` object, passing in a ``Timeout``: - -.. includecode:: code/docs/transactor/TransactorDocTest.java#create-coordinated - :language: java - -To start a coordinated transaction that you won't participate in yourself you -can create a ``Coordinated`` object with a message and send it directly to an -actor. The recipient of the message will be the first member of the coordination -set: - -.. includecode:: code/docs/transactor/TransactorDocTest.java#send-coordinated - :language: java - -To include another actor in the same coordinated transaction that you've created -or received, use the ``coordinate`` method on that object. This will increment -the number of parties involved by one and create a new ``Coordinated`` object to -be sent. - -.. includecode:: code/docs/transactor/TransactorDocTest.java#include-coordinated - :language: java - -To enter the coordinated transaction use the atomic method of the coordinated -object, passing in a ``java.lang.Runnable``. - -.. includecode:: code/docs/transactor/Coordinator.java#coordinated-atomic - :language: java - -The coordinated transaction will wait for the other transactions before -committing. If any of the coordinated transactions fail then they all fail. - -.. note:: - - The same actor should not be added to a coordinated transaction more than - once. The transaction will not be able to complete as an actor only processes - a single message at a time. When processing the first message the coordinated - transaction will wait for the commit barrier, which in turn needs the second - message to be received to proceed. - - -UntypedTransactor -================= - -UntypedTransactors are untyped actors that provide a general pattern for -coordinating transactions, using the explicit coordination described above. - -Here's an example of a simple untyped transactor that will join a coordinated -transaction: - -.. includecode:: code/docs/transactor/Counter.java#class - :language: java - -You could send this Counter transactor a ``Coordinated(Increment)`` message. If -you were to send it just an ``Increment`` message it will create its own -``Coordinated`` (but in this particular case wouldn't be coordinating -transactions with any other transactors). - -To coordinate with other transactors override the ``coordinate`` method. The -``coordinate`` method maps a message to a set of ``SendTo`` objects, pairs of -``ActorRef`` and a message. You can use the ``include`` and ``sendTo`` methods -to easily coordinate with other transactors. - -Here's an example of coordinating an increment, using an untyped transactor, -similar to the explicitly coordinated example above. - -.. includecode:: code/docs/transactor/FriendlyCounter.java#class - :language: java - -To execute directly before or after the coordinated transaction, override the -``before`` and ``after`` methods. They do not execute within the transaction. - -To completely bypass coordinated transactions override the ``normally`` -method. Any message matched by ``normally`` will not be matched by the other -methods, and will not be involved in coordinated transactions. In this method -you can implement normal actor behavior, or use the normal STM atomic for local -transactions. diff --git a/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst b/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst index c449360c2e..422223f316 100644 --- a/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst +++ b/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst @@ -118,6 +118,12 @@ Deprecated STM Support for Agents Agents participating in enclosing STM transaction is a deprecated feature. +Transactor Module is Deprecated +=============================== + +The integration between actors and STM in the module ``akka-transactor`` is deprecated and will be +removed in a future version. + Removed Deprecated Features =========================== @@ -127,3 +133,4 @@ The following, previously deprecated, features have been removed: * `API changes to FSM and TestFSMRef `_ * DefaultScheduler superseded by LightArrayRevolverScheduler + diff --git a/akka-docs/rst/scala/code/docs/transactor/TransactorDocSpec.scala b/akka-docs/rst/scala/code/docs/transactor/TransactorDocSpec.scala deleted file mode 100644 index 645e4a744e..0000000000 --- a/akka-docs/rst/scala/code/docs/transactor/TransactorDocSpec.scala +++ /dev/null @@ -1,234 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ - -package docs.transactor - -import language.postfixOps - -import akka.actor._ -import akka.transactor._ -import scala.concurrent.duration._ -import akka.util.Timeout -import akka.testkit._ -import scala.concurrent.stm._ - -object CoordinatedExample { - //#coordinated-example - import akka.actor._ - import akka.transactor._ - import scala.concurrent.stm._ - - case class Increment(friend: Option[ActorRef] = None) - case object GetCount - - class Counter extends Actor { - val count = Ref(0) - - def receive = { - case coordinated @ Coordinated(Increment(friend)) => { - friend foreach (_ ! coordinated(Increment())) - coordinated atomic { implicit t => - count transform (_ + 1) - } - } - case GetCount => sender ! count.single.get - } - } - //#coordinated-example -} - -object CoordinatedApi { - case object Message - - class Coordinator extends Actor { - //#receive-coordinated - def receive = { - case coordinated @ Coordinated(Message) => { - //#coordinated-atomic - coordinated atomic { implicit t => - // do something in the coordinated transaction ... - } - //#coordinated-atomic - } - } - //#receive-coordinated - } -} - -object CounterExample { - //#counter-example - import akka.transactor._ - import scala.concurrent.stm._ - - case object Increment - - class Counter extends Transactor { - val count = Ref(0) - - def atomically = implicit txn => { - case Increment => count transform (_ + 1) - } - } - //#counter-example -} - -object FriendlyCounterExample { - //#friendly-counter-example - import akka.actor._ - import akka.transactor._ - import scala.concurrent.stm._ - - case object Increment - - class FriendlyCounter(friend: ActorRef) extends Transactor { - val count = Ref(0) - - override def coordinate = { - case Increment => include(friend) - } - - def atomically = implicit txn => { - case Increment => count transform (_ + 1) - } - } - //#friendly-counter-example - - class Friend extends Transactor { - val count = Ref(0) - - def atomically = implicit txn => { - case Increment => count transform (_ + 1) - } - } -} - -// Only checked for compilation -object TransactorCoordinate { - case object Message - case object SomeMessage - case object SomeOtherMessage - case object OtherMessage - case object Message1 - case object Message2 - - class TestCoordinateInclude(actor1: ActorRef, actor2: ActorRef, actor3: ActorRef) extends Transactor { - //#coordinate-include - override def coordinate = { - case Message => include(actor1, actor2, actor3) - } - //#coordinate-include - - def atomically = txn => doNothing - } - - class TestCoordinateSendTo(someActor: ActorRef, actor1: ActorRef, actor2: ActorRef) extends Transactor { - //#coordinate-sendto - override def coordinate = { - case SomeMessage => sendTo(someActor -> SomeOtherMessage) - case OtherMessage => sendTo(actor1 -> Message1, actor2 -> Message2) - } - //#coordinate-sendto - - def atomically = txn => doNothing - } -} - -class TransactorDocSpec extends AkkaSpec { - - "coordinated example" in { - import CoordinatedExample._ - - //#run-coordinated-example - import scala.concurrent.Await - import scala.concurrent.duration._ - import akka.util.Timeout - import akka.pattern.ask - - val system = ActorSystem("app") - - val counter1 = system.actorOf(Props[Counter], name = "counter1") - val counter2 = system.actorOf(Props[Counter], name = "counter2") - - implicit val timeout = Timeout(5 seconds) - - counter1 ! Coordinated(Increment(Some(counter2))) - - val count = Await.result(counter1 ? GetCount, timeout.duration) - - // count == 1 - //#run-coordinated-example - - count must be === 1 - - shutdown(system) - } - - "coordinated api" in { - import CoordinatedApi._ - - //#implicit-timeout - import scala.concurrent.duration._ - import akka.util.Timeout - - implicit val timeout = Timeout(5 seconds) - //#implicit-timeout - - //#create-coordinated - val coordinated = Coordinated() - //#create-coordinated - - val system = ActorSystem("coordinated") - val actor = system.actorOf(Props[Coordinator], name = "coordinator") - - //#send-coordinated - actor ! Coordinated(Message) - //#send-coordinated - - //#include-coordinated - actor ! coordinated(Message) - //#include-coordinated - - coordinated.await() - - shutdown(system) - } - - "counter transactor" in { - import CounterExample._ - - val system = ActorSystem("transactors") - - // FIXME, or remove the whole transactor module, srsly - lazy val underlyingCounter = new Counter - val counter = system.actorOf(Props(underlyingCounter), name = "counter") - val coordinated = Coordinated()(Timeout(5 seconds)) - counter ! coordinated(Increment) - coordinated.await() - - underlyingCounter.count.single.get must be === 1 - - shutdown(system) - } - - "friendly counter transactor" in { - import FriendlyCounterExample._ - - val system = ActorSystem("transactors") - - lazy val underlyingFriend = new Friend - val friend = system.actorOf(Props(underlyingFriend), name = "friend") - - lazy val underlyingFriendlyCounter = new FriendlyCounter(friend) - val friendlyCounter = system.actorOf(Props(underlyingFriendlyCounter), name = "friendly") - - val coordinated = Coordinated()(Timeout(5 seconds)) - friendlyCounter ! coordinated(Increment) - coordinated.await() - - underlyingFriendlyCounter.count.single.get must be === 1 - underlyingFriend.count.single.get must be === 1 - - shutdown(system) - } -} diff --git a/akka-docs/rst/scala/index-futures.rst b/akka-docs/rst/scala/index-futures.rst index 7768a39f41..59b846d0d6 100644 --- a/akka-docs/rst/scala/index-futures.rst +++ b/akka-docs/rst/scala/index-futures.rst @@ -5,6 +5,4 @@ Futures and Agents :maxdepth: 2 futures - stm agents - transactors diff --git a/akka-docs/rst/scala/stm.rst b/akka-docs/rst/scala/stm.rst deleted file mode 100644 index 972fa47b04..0000000000 --- a/akka-docs/rst/scala/stm.rst +++ /dev/null @@ -1,75 +0,0 @@ - -.. _stm-scala: - -####################################### - Software Transactional Memory -####################################### - - -Overview of STM -=============== - -An `STM `_ turns the -Java heap into a transactional data set with begin/commit/rollback -semantics. Very much like a regular database. It implements the first three -letters in `ACID`_; ACI: - -* Atomic -* Consistent -* Isolated - -.. _ACID: http://en.wikipedia.org/wiki/ACID - -Generally, the STM is not needed very often when working with Akka. Some -use-cases (that we can think of) are: - -- When you really need composable message flows across many actors updating - their **internal local** state but need them to do that atomically in one big - transaction. Might not be often, but when you do need this then you are - screwed without it. -- When you want to share a datastructure across actors. - -The use of STM in Akka is inspired by the concepts and views in `Clojure`_\'s -STM. Please take the time to read `this excellent document`_ about state in -clojure and view `this presentation`_ by Rich Hickey (the genius behind -Clojure). - -.. _Clojure: http://clojure.org/ -.. _this excellent document: http://clojure.org/state -.. _this presentation: http://www.infoq.com/presentations/Value-Identity-State-Rich-Hickey - - -Scala STM -========= - -The STM supported in Akka is `ScalaSTM`_ which will be soon included in the -Scala standard library. - -.. _ScalaSTM: http://nbronson.github.com/scala-stm/ - -The STM is based on Transactional References (referred to as Refs). Refs are -memory cells, holding an (arbitrary) immutable value, that implement CAS -(Compare-And-Swap) semantics and are managed and enforced by the STM for -coordinated changes across many Refs. - - -Persistent Datastructures -========================= - -Working with immutable collections can sometimes give bad performance due to -extensive copying. Scala provides so-called persistent datastructures which -makes working with immutable collections fast. They are immutable but with -constant time access and modification. They use structural sharing and an insert -or update does not ruin the old structure, hence "persistent". Makes working -with immutable composite types fast. The persistent datastructures currently -consist of a `Map`_ and `Vector`_. - -.. _Map: http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.Map -.. _Vector: http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.Vector - - -Integration with Actors -======================= - -In Akka we've also integrated Actors and STM in :ref:`agents-scala` and -:ref:`transactors-scala`. diff --git a/akka-docs/rst/scala/transactors.rst b/akka-docs/rst/scala/transactors.rst deleted file mode 100644 index da5b0415e7..0000000000 --- a/akka-docs/rst/scala/transactors.rst +++ /dev/null @@ -1,163 +0,0 @@ -.. _transactors-scala: - -##################### - Transactors -##################### - - -Why Transactors? -================ - -Actors are excellent for solving problems where you have many independent -processes that can work in isolation and only interact with other Actors through -message passing. This model fits many problems. But the actor model is -unfortunately a terrible model for implementing truly shared state. E.g. when -you need to have consensus and a stable view of state across many -components. The classic example is the bank account where clients can deposit -and withdraw, in which each operation needs to be atomic. For detailed -discussion on the topic see `this JavaOne presentation -`_. - -STM on the other hand is excellent for problems where you need consensus and a -stable view of the state by providing compositional transactional shared -state. Some of the really nice traits of STM are that transactions compose, and -it raises the abstraction level from lock-based concurrency. - -Akka's Transactors combine Actors and STM to provide the best of the Actor model -(concurrency and asynchronous event-based programming) and STM (compositional -transactional shared state) by providing transactional, compositional, -asynchronous, event-based message flows. - -Generally, the STM is not needed very often when working with Akka. Some -use-cases (that we can think of) are: - -- When you really need composable message flows across many actors updating - their **internal local** state but need them to do that atomically in one big - transaction. Might not be often but when you do need this then you are - screwed without it. - -- When you want to share a datastructure across actors. - - -Actors and STM -============== - -You can combine Actors and STM in several ways. An Actor may use STM internally -so that particular changes are guaranteed to be atomic. Actors may also share -transactional datastructures as the STM provides safe shared state across -threads. - -It's also possible to coordinate transactions across Actors or threads so that -either the transactions in a set all commit successfully or they all fail. This -is the focus of Transactors and the explicit support for coordinated -transactions in this section. - - -Coordinated transactions -======================== - -Akka provides an explicit mechanism for coordinating transactions across -Actors. Under the hood it uses a ``CommitBarrier``, similar to a CountDownLatch. - -Here is an example of coordinating two simple counter Actors so that they both -increment together in coordinated transactions. If one of them was to fail to -increment, the other would also fail. - -.. includecode:: code/docs/transactor/TransactorDocSpec.scala#coordinated-example - -.. includecode:: code/docs/transactor/TransactorDocSpec.scala#run-coordinated-example - -Note that creating a ``Coordinated`` object requires a ``Timeout`` to be -specified for the coordinated transaction. This can be done implicitly, by -having an implicit ``Timeout`` in scope, or explicitly, by passing the timeout -when creating a a ``Coordinated`` object. Here's an example of specifying an -implicit timeout: - -.. includecode:: code/docs/transactor/TransactorDocSpec.scala#implicit-timeout - -To start a new coordinated transaction that you will also participate in, just -create a ``Coordinated`` object (this assumes an implicit timeout): - -.. includecode:: code/docs/transactor/TransactorDocSpec.scala#create-coordinated - -To start a coordinated transaction that you won't participate in yourself you -can create a ``Coordinated`` object with a message and send it directly to an -actor. The recipient of the message will be the first member of the coordination -set: - -.. includecode:: code/docs/transactor/TransactorDocSpec.scala#send-coordinated - -To receive a coordinated message in an actor simply match it in a case -statement: - -.. includecode:: code/docs/transactor/TransactorDocSpec.scala#receive-coordinated - :exclude: coordinated-atomic - -To include another actor in the same coordinated transaction that you've created -or received, use the apply method on that object. This will increment the number -of parties involved by one and create a new ``Coordinated`` object to be sent. - -.. includecode:: code/docs/transactor/TransactorDocSpec.scala#include-coordinated - -To enter the coordinated transaction use the atomic method of the coordinated -object: - -.. includecode:: code/docs/transactor/TransactorDocSpec.scala#coordinated-atomic - -The coordinated transaction will wait for the other transactions before -committing. If any of the coordinated transactions fail then they all fail. - -.. note:: - - The same actor should not be added to a coordinated transaction more than - once. The transaction will not be able to complete as an actor only processes - a single message at a time. When processing the first message the coordinated - transaction will wait for the commit barrier, which in turn needs the second - message to be received to proceed. - - -Transactor -========== - -Transactors are actors that provide a general pattern for coordinating -transactions, using the explicit coordination described above. - -Here's an example of a simple transactor that will join a coordinated -transaction: - -.. includecode:: code/docs/transactor/TransactorDocSpec.scala#counter-example - -You could send this Counter transactor a ``Coordinated(Increment)`` message. If -you were to send it just an ``Increment`` message it will create its own -``Coordinated`` (but in this particular case wouldn't be coordinating -transactions with any other transactors). - -To coordinate with other transactors override the ``coordinate`` method. The -``coordinate`` method maps a message to a set of ``SendTo`` objects, pairs of -``ActorRef`` and a message. You can use the ``include`` and ``sendTo`` methods -to easily coordinate with other transactors. The ``include`` method will send on -the same message that was received to other transactors. The ``sendTo`` method -allows you to specify both the actor to send to, and the message to send. - -Example of coordinating an increment: - -.. includecode:: code/docs/transactor/TransactorDocSpec.scala#friendly-counter-example - -Using ``include`` to include more than one transactor: - -.. includecode:: code/docs/transactor/TransactorDocSpec.scala#coordinate-include - -Using ``sendTo`` to coordinate transactions but pass-on a different message than -the one that was received: - -.. includecode:: code/docs/transactor/TransactorDocSpec.scala#coordinate-sendto - -To execute directly before or after the coordinated transaction, override the -``before`` and ``after`` methods. These methods also expect partial functions -like the receive method. They do not execute within the transaction. - -To completely bypass coordinated transactions override the ``normally`` -method. Any message matched by ``normally`` will not be matched by the other -methods, and will not be involved in coordinated transactions. In this method -you can implement normal actor behavior, or use the normal STM atomic for local -transactions. diff --git a/akka-transactor/src/main/resources/reference.conf b/akka-transactor/src/main/resources/reference.conf index d1216bd73e..02f9aef270 100644 --- a/akka-transactor/src/main/resources/reference.conf +++ b/akka-transactor/src/main/resources/reference.conf @@ -6,6 +6,7 @@ # Make your edits/overrides in your application.conf. akka { + # akka.transactor is deprecated in 2.3 transactor { # The timeout used for coordinated transactions across actors coordinated-timeout = 5s diff --git a/akka-transactor/src/main/scala/akka/transactor/Coordinated.scala b/akka-transactor/src/main/scala/akka/transactor/Coordinated.scala index 0ae7f0b603..7aa9d6bed3 100644 --- a/akka-transactor/src/main/scala/akka/transactor/Coordinated.scala +++ b/akka-transactor/src/main/scala/akka/transactor/Coordinated.scala @@ -12,6 +12,7 @@ import java.util.concurrent.Callable /** * Akka-specific exception for coordinated transactions. */ +@deprecated("akka.transactor will be removed", "2.3") class CoordinatedTransactionException(message: String, cause: Throwable) extends AkkaException(message, cause) { def this(msg: String) = this(msg, null) } @@ -19,6 +20,7 @@ class CoordinatedTransactionException(message: String, cause: Throwable) extends /** * Coordinated transactions across actors. */ +@deprecated("akka.transactor will be removed", "2.3") object Coordinated { /** @@ -97,6 +99,7 @@ object Coordinated { * * @see [[akka.transactor.Transactor]] for an actor that implements coordinated transactions */ +@deprecated("akka.transactor will be removed", "2.3") class Coordinated(val message: Any, member: CommitBarrier.Member) { // Java API constructors diff --git a/akka-transactor/src/main/scala/akka/transactor/Transactor.scala b/akka-transactor/src/main/scala/akka/transactor/Transactor.scala index 71578d5ca9..cd8e25586c 100644 --- a/akka-transactor/src/main/scala/akka/transactor/Transactor.scala +++ b/akka-transactor/src/main/scala/akka/transactor/Transactor.scala @@ -12,6 +12,7 @@ import scala.concurrent.stm.InTxn /** * Used for specifying actor refs and messages to send to during coordination. */ +@deprecated("akka.transactor will be removed", "2.3") case class SendTo(actor: ActorRef, message: Option[Any] = None) /** @@ -94,6 +95,7 @@ case class SendTo(actor: ActorRef, message: Option[Any] = None) * * @see [[akka.transactor.Coordinated]] */ +@deprecated("akka.transactor will be removed", "2.3") trait Transactor extends Actor { private val settings = TransactorExtension(context.system) @@ -181,6 +183,9 @@ trait Transactor extends Actor { def doNothing: Receive = EmptyReceive } +/** + * INTERNAL API + */ private[akka] object EmptyReceive extends PartialFunction[Any, Unit] { def apply(any: Any): Unit = () def isDefinedAt(any: Any): Boolean = false diff --git a/akka-transactor/src/main/scala/akka/transactor/TransactorExtension.scala b/akka-transactor/src/main/scala/akka/transactor/TransactorExtension.scala index 926dbfe6b1..15b1ab8e56 100644 --- a/akka-transactor/src/main/scala/akka/transactor/TransactorExtension.scala +++ b/akka-transactor/src/main/scala/akka/transactor/TransactorExtension.scala @@ -13,12 +13,14 @@ import java.util.concurrent.TimeUnit.MILLISECONDS /** * TransactorExtension is an Akka Extension to hold settings for transactors. */ +@deprecated("akka.transactor will be removed", "2.3") object TransactorExtension extends ExtensionId[TransactorSettings] with ExtensionIdProvider { override def get(system: ActorSystem): TransactorSettings = super.get(system) override def lookup: TransactorExtension.type = TransactorExtension override def createExtension(system: ExtendedActorSystem): TransactorSettings = new TransactorSettings(system.settings.config) } +@deprecated("akka.transactor will be removed", "2.3") class TransactorSettings(val config: Config) extends Extension { import config._ val CoordinatedTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.transactor.coordinated-timeout"), MILLISECONDS)) diff --git a/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala b/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala index 91bedf1832..6f9227b03a 100644 --- a/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala +++ b/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala @@ -11,6 +11,7 @@ import java.util.Collections.{ emptySet, singleton ⇒ singletonSet } /** * An UntypedActor version of transactor for using from Java. */ +@deprecated("akka.transactor will be removed", "2.3") abstract class UntypedTransactor extends UntypedActor { import scala.collection.JavaConverters.asScalaSetConverter diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 406ab712b8..1c781823f2 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -619,7 +619,7 @@ object AkkaBuild extends Build { id = "akka-docs", base = file("akka-docs"), dependencies = Seq(actor, testkit % "test->test", channels, - remote % "compile;test->test", cluster, slf4j, agent, transactor, zeroMQ, camel, osgi, osgiAries, + remote % "compile;test->test", cluster, slf4j, agent, zeroMQ, camel, osgi, osgiAries, persistence % "compile;test->test"), settings = defaultSettings ++ docFormatSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ cpsPlugin ++ Seq( sourceDirectory in Sphinx <<= baseDirectory / "rst",