=tra #3668 Deprecate transactors

This commit is contained in:
Patrik Nordwall 2013-12-11 14:44:45 +01:00
parent b9c62eed61
commit dd3d3da452
25 changed files with 20 additions and 987 deletions

View file

@ -379,12 +379,6 @@ akka-cluster
.. literalinclude:: ../../../akka-cluster/src/main/resources/reference.conf .. literalinclude:: ../../../akka-cluster/src/main/resources/reference.conf
:language: none :language: none
akka-transactor
~~~~~~~~~~~~~~~
.. literalinclude:: ../../../akka-transactor/src/main/resources/reference.conf
:language: none
akka-agent akka-agent
~~~~~~~~~~ ~~~~~~~~~~

View file

@ -48,9 +48,6 @@ Akka is very modular and consists of several JARs containing different features.
- ``akka-testkit`` Toolkit for testing Actor systems - ``akka-testkit`` Toolkit for testing Actor systems
- ``akka-transactor`` Transactors - transactional actors, integrated with
Scala STM
- ``akka-zeromq`` ZeroMQ integration - ``akka-zeromq`` ZeroMQ integration
In addition to these stable modules there are several which are on their way In addition to these stable modules there are several which are on their way

View file

@ -54,14 +54,6 @@ interactions of actors use pure message passing and everything is asynchronous.
For an overview of the remoting see :ref:`remoting` For an overview of the remoting see :ref:`remoting`
Transactors
-----------
Transactors combine actors and Software Transactional Memory (STM) into transactional actors.
It allows you to compose atomic message flows with automatic retry and rollback.
See :ref:`Transactors (Scala) <transactors-scala>` and :ref:`Transactors (Java) <transactors-java>`
Persistence Persistence
----------- -----------

View file

@ -1,39 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.transactor;
//#class
import akka.actor.*;
import akka.transactor.*;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.STM;
public class CoordinatedCounter extends UntypedActor {
private Ref.View<Integer> count = STM.newRef(0);
public void onReceive(Object incoming) throws Exception {
if (incoming instanceof Coordinated) {
Coordinated coordinated = (Coordinated) incoming;
Object message = coordinated.getMessage();
if (message instanceof Increment) {
Increment increment = (Increment) message;
if (increment.hasFriend()) {
increment.getFriend().tell(
coordinated.coordinate(new Increment()), getSelf());
}
coordinated.atomic(new Runnable() {
public void run() {
STM.increment(count, 1);
}
});
}
} else if ("GetCount".equals(incoming)) {
getSender().tell(count.get(), getSelf());
} else {
unhandled(incoming);
}
}
}
//#class

View file

@ -1,28 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.transactor;
import akka.actor.*;
import akka.transactor.*;
public class Coordinator extends UntypedActor {
public void onReceive(Object incoming) throws Exception {
if (incoming instanceof Coordinated) {
Coordinated coordinated = (Coordinated) incoming;
Object message = coordinated.getMessage();
if (message instanceof Message) {
//#coordinated-atomic
coordinated.atomic(new Runnable() {
public void run() {
// do something in the coordinated transaction ...
}
});
//#coordinated-atomic
}
} else {
unhandled(incoming);
}
}
}

View file

@ -1,28 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.transactor;
//#class
import akka.transactor.*;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.STM;
public class Counter extends UntypedTransactor {
Ref.View<Integer> count = STM.newRef(0);
public void atomically(Object message) {
if (message instanceof Increment) {
STM.increment(count, 1);
}
}
@Override public boolean normally(Object message) {
if ("GetCount".equals(message)) {
getSender().tell(count.get(), getSelf());
return true;
} else return false;
}
}
//#class

View file

@ -1,38 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.transactor;
//#class
import akka.transactor.*;
import java.util.Set;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.STM;
public class FriendlyCounter extends UntypedTransactor {
Ref.View<Integer> count = STM.newRef(0);
@Override public Set<SendTo> coordinate(Object message) {
if (message instanceof Increment) {
Increment increment = (Increment) message;
if (increment.hasFriend())
return include(increment.getFriend(), new Increment());
}
return nobody();
}
public void atomically(Object message) {
if (message instanceof Increment) {
STM.increment(count, 1);
}
}
@Override public boolean normally(Object message) {
if ("GetCount".equals(message)) {
getSender().tell(count.get(), getSelf());
return true;
} else return false;
}
}
//#class

View file

@ -1,27 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.transactor;
//#class
import akka.actor.ActorRef;
public class Increment {
private ActorRef friend = null;
public Increment() {}
public Increment(ActorRef friend) {
this.friend = friend;
}
public boolean hasFriend() {
return friend != null;
}
public ActorRef getFriend() {
return friend;
}
}
//#class

View file

@ -1,7 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.transactor;
public class Message {}

View file

@ -1,11 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.transactor
import org.scalatest.junit.JUnitWrapperSuite
class TransactorDocJavaSpec extends JUnitWrapperSuite(
"docs.transactor.TransactorDocTest",
Thread.currentThread.getContextClassLoader)

View file

@ -1,102 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.transactor;
import static org.junit.Assert.*;
import akka.testkit.JavaTestKit;
import org.junit.Test;
//#imports
import akka.actor.*;
import scala.concurrent.Await;
import static akka.pattern.Patterns.ask;
import akka.transactor.Coordinated;
import akka.util.Timeout;
import static java.util.concurrent.TimeUnit.SECONDS;
//#imports
public class TransactorDocTest {
@Test
public void coordinatedExample() throws Exception {
//#coordinated-example
ActorSystem system = ActorSystem.create("CoordinatedExample");
ActorRef counter1 = system.actorOf(Props.create(CoordinatedCounter.class));
ActorRef counter2 = system.actorOf(Props.create(CoordinatedCounter.class));
Timeout timeout = new Timeout(5, SECONDS);
counter1.tell(new Coordinated(new Increment(counter2), timeout), ActorRef.noSender());
Integer count = (Integer) Await.result(
ask(counter1, "GetCount", timeout), timeout.duration());
//#coordinated-example
assertEquals(count, new Integer(1));
JavaTestKit.shutdownActorSystem(system);
}
@Test
public void coordinatedApi() {
//#create-coordinated
Timeout timeout = new Timeout(5, SECONDS);
Coordinated coordinated = new Coordinated(timeout);
//#create-coordinated
ActorSystem system = ActorSystem.create("CoordinatedApi");
ActorRef actor = system.actorOf(Props.create(Coordinator.class));
//#send-coordinated
actor.tell(new Coordinated(new Message(), timeout), ActorRef.noSender());
//#send-coordinated
//#include-coordinated
actor.tell(coordinated.coordinate(new Message()), ActorRef.noSender());
//#include-coordinated
coordinated.await();
JavaTestKit.shutdownActorSystem(system);
}
@Test
public void counterTransactor() throws Exception {
ActorSystem system = ActorSystem.create("CounterTransactor");
ActorRef counter = system.actorOf(Props.create(Counter.class));
Timeout timeout = new Timeout(5, SECONDS);
Coordinated coordinated = new Coordinated(timeout);
counter.tell(coordinated.coordinate(new Increment()), ActorRef.noSender());
coordinated.await();
Integer count = (Integer) Await.result(ask(counter, "GetCount", timeout), timeout.duration());
assertEquals(count, new Integer(1));
JavaTestKit.shutdownActorSystem(system);
}
@Test
public void friendlyCounterTransactor() throws Exception {
ActorSystem system = ActorSystem.create("FriendlyCounterTransactor");
ActorRef friend = system.actorOf(Props.create(Counter.class));
ActorRef friendlyCounter = system.actorOf(Props.create(FriendlyCounter.class));
Timeout timeout = new Timeout(5, SECONDS);
Coordinated coordinated = new Coordinated(timeout);
friendlyCounter.tell(coordinated.coordinate(new Increment(friend)), ActorRef.noSender());
coordinated.await();
Integer count1 = (Integer) Await.result(ask(friendlyCounter, "GetCount", timeout), timeout.duration());
assertEquals(count1, new Integer(1));
Integer count2 = (Integer) Await.result(ask(friend, "GetCount", timeout), timeout.duration());
assertEquals(count2, new Integer(1));
JavaTestKit.shutdownActorSystem(system);
}
}

View file

@ -5,6 +5,4 @@ Futures and Agents
:maxdepth: 2 :maxdepth: 2
futures futures
stm
agents agents
transactors

View file

@ -1,60 +0,0 @@
.. _stm-java:
#####################################
Software Transactional Memory
#####################################
Overview of STM
===============
An `STM <http://en.wikipedia.org/wiki/Software_transactional_memory>`_ turns the
Java heap into a transactional data set with begin/commit/rollback
semantics. Very much like a regular database. It implements the first three
letters in `ACID`_; ACI:
* Atomic
* Consistent
* Isolated
.. _ACID: http://en.wikipedia.org/wiki/ACID
Generally, the STM is not needed very often when working with Akka. Some
use-cases (that we can think of) are:
- When you really need composable message flows across many actors updating
their **internal local** state but need them to do that atomically in one big
transaction. Might not be often, but when you do need this then you are
screwed without it.
- When you want to share a datastructure across actors.
The use of STM in Akka is inspired by the concepts and views in `Clojure`_\'s
STM. Please take the time to read `this excellent document`_ about state in
clojure and view `this presentation`_ by Rich Hickey (the genius behind
Clojure).
.. _Clojure: http://clojure.org/
.. _this excellent document: http://clojure.org/state
.. _this presentation: http://www.infoq.com/presentations/Value-Identity-State-Rich-Hickey
Scala STM
=========
The STM supported in Akka is `ScalaSTM`_ which will be soon included in the
Scala standard library.
.. _ScalaSTM: http://nbronson.github.com/scala-stm/
The STM is based on Transactional References (referred to as Refs). Refs are
memory cells, holding an (arbitrary) immutable value, that implement CAS
(Compare-And-Swap) semantics and are managed and enforced by the STM for
coordinated changes across many Refs.
Integration with Actors
=======================
In Akka we've also integrated Actors and STM in :ref:`agents-java` and
:ref:`transactors-java`.

View file

@ -1,153 +0,0 @@
.. _transactors-java:
####################
Transactors
####################
Why Transactors?
================
Actors are excellent for solving problems where you have many independent
processes that can work in isolation and only interact with other Actors through
message passing. This model fits many problems. But the actor model is
unfortunately a terrible model for implementing truly shared state. E.g. when
you need to have consensus and a stable view of state across many
components. The classic example is the bank account where clients can deposit
and withdraw, in which each operation needs to be atomic. For detailed
discussion on the topic see `this JavaOne presentation
<http://www.slideshare.net/jboner/state-youre-doing-it-wrong-javaone-2009>`_.
STM on the other hand is excellent for problems where you need consensus and a
stable view of the state by providing compositional transactional shared
state. Some of the really nice traits of STM are that transactions compose, and
it raises the abstraction level from lock-based concurrency.
Akka's Transactors combine Actors and STM to provide the best of the Actor model
(concurrency and asynchronous event-based programming) and STM (compositional
transactional shared state) by providing transactional, compositional,
asynchronous, event-based message flows.
Generally, the STM is not needed very often when working with Akka. Some
use-cases (that we can think of) are:
- When you really need composable message flows across many actors updating
their **internal local** state but need them to do that atomically in one big
transaction. Might not be often but when you do need this then you are
screwed without it.
- When you want to share a datastructure across actors.
Actors and STM
==============
You can combine Actors and STM in several ways. An Actor may use STM internally
so that particular changes are guaranteed to be atomic. Actors may also share
transactional datastructures as the STM provides safe shared state across
threads.
It's also possible to coordinate transactions across Actors or threads so that
either the transactions in a set all commit successfully or they all fail. This
is the focus of Transactors and the explicit support for coordinated
transactions in this section.
Coordinated transactions
========================
Akka provides an explicit mechanism for coordinating transactions across
actors. Under the hood it uses a ``CommitBarrier``, similar to a CountDownLatch.
Here is an example of coordinating two simple counter UntypedActors so that they
both increment together in coordinated transactions. If one of them was to fail
to increment, the other would also fail.
.. includecode:: code/docs/transactor/Increment.java#class
:language: java
.. includecode:: code/docs/transactor/CoordinatedCounter.java#class
:language: java
.. includecode:: code/docs/transactor/TransactorDocTest.java#imports
:language: java
.. includecode:: code/docs/transactor/TransactorDocTest.java#coordinated-example
:language: java
To start a new coordinated transaction that you will also participate in, create
a ``Coordinated`` object, passing in a ``Timeout``:
.. includecode:: code/docs/transactor/TransactorDocTest.java#create-coordinated
:language: java
To start a coordinated transaction that you won't participate in yourself you
can create a ``Coordinated`` object with a message and send it directly to an
actor. The recipient of the message will be the first member of the coordination
set:
.. includecode:: code/docs/transactor/TransactorDocTest.java#send-coordinated
:language: java
To include another actor in the same coordinated transaction that you've created
or received, use the ``coordinate`` method on that object. This will increment
the number of parties involved by one and create a new ``Coordinated`` object to
be sent.
.. includecode:: code/docs/transactor/TransactorDocTest.java#include-coordinated
:language: java
To enter the coordinated transaction use the atomic method of the coordinated
object, passing in a ``java.lang.Runnable``.
.. includecode:: code/docs/transactor/Coordinator.java#coordinated-atomic
:language: java
The coordinated transaction will wait for the other transactions before
committing. If any of the coordinated transactions fail then they all fail.
.. note::
The same actor should not be added to a coordinated transaction more than
once. The transaction will not be able to complete as an actor only processes
a single message at a time. When processing the first message the coordinated
transaction will wait for the commit barrier, which in turn needs the second
message to be received to proceed.
UntypedTransactor
=================
UntypedTransactors are untyped actors that provide a general pattern for
coordinating transactions, using the explicit coordination described above.
Here's an example of a simple untyped transactor that will join a coordinated
transaction:
.. includecode:: code/docs/transactor/Counter.java#class
:language: java
You could send this Counter transactor a ``Coordinated(Increment)`` message. If
you were to send it just an ``Increment`` message it will create its own
``Coordinated`` (but in this particular case wouldn't be coordinating
transactions with any other transactors).
To coordinate with other transactors override the ``coordinate`` method. The
``coordinate`` method maps a message to a set of ``SendTo`` objects, pairs of
``ActorRef`` and a message. You can use the ``include`` and ``sendTo`` methods
to easily coordinate with other transactors.
Here's an example of coordinating an increment, using an untyped transactor,
similar to the explicitly coordinated example above.
.. includecode:: code/docs/transactor/FriendlyCounter.java#class
:language: java
To execute directly before or after the coordinated transaction, override the
``before`` and ``after`` methods. They do not execute within the transaction.
To completely bypass coordinated transactions override the ``normally``
method. Any message matched by ``normally`` will not be matched by the other
methods, and will not be involved in coordinated transactions. In this method
you can implement normal actor behavior, or use the normal STM atomic for local
transactions.

View file

@ -118,6 +118,12 @@ Deprecated STM Support for Agents
Agents participating in enclosing STM transaction is a deprecated feature. Agents participating in enclosing STM transaction is a deprecated feature.
Transactor Module is Deprecated
===============================
The integration between actors and STM in the module ``akka-transactor`` is deprecated and will be
removed in a future version.
Removed Deprecated Features Removed Deprecated Features
=========================== ===========================
@ -127,3 +133,4 @@ The following, previously deprecated, features have been removed:
* `API changes to FSM and TestFSMRef <http://doc.akka.io/docs/akka/2.2.3/project/migration-guide-2.1.x-2.2.x.html#API_changes_to_FSM_and_TestFSMRef>`_ * `API changes to FSM and TestFSMRef <http://doc.akka.io/docs/akka/2.2.3/project/migration-guide-2.1.x-2.2.x.html#API_changes_to_FSM_and_TestFSMRef>`_
* DefaultScheduler superseded by LightArrayRevolverScheduler * DefaultScheduler superseded by LightArrayRevolverScheduler

View file

@ -1,234 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.transactor
import language.postfixOps
import akka.actor._
import akka.transactor._
import scala.concurrent.duration._
import akka.util.Timeout
import akka.testkit._
import scala.concurrent.stm._
object CoordinatedExample {
//#coordinated-example
import akka.actor._
import akka.transactor._
import scala.concurrent.stm._
case class Increment(friend: Option[ActorRef] = None)
case object GetCount
class Counter extends Actor {
val count = Ref(0)
def receive = {
case coordinated @ Coordinated(Increment(friend)) => {
friend foreach (_ ! coordinated(Increment()))
coordinated atomic { implicit t =>
count transform (_ + 1)
}
}
case GetCount => sender ! count.single.get
}
}
//#coordinated-example
}
object CoordinatedApi {
case object Message
class Coordinator extends Actor {
//#receive-coordinated
def receive = {
case coordinated @ Coordinated(Message) => {
//#coordinated-atomic
coordinated atomic { implicit t =>
// do something in the coordinated transaction ...
}
//#coordinated-atomic
}
}
//#receive-coordinated
}
}
object CounterExample {
//#counter-example
import akka.transactor._
import scala.concurrent.stm._
case object Increment
class Counter extends Transactor {
val count = Ref(0)
def atomically = implicit txn => {
case Increment => count transform (_ + 1)
}
}
//#counter-example
}
object FriendlyCounterExample {
//#friendly-counter-example
import akka.actor._
import akka.transactor._
import scala.concurrent.stm._
case object Increment
class FriendlyCounter(friend: ActorRef) extends Transactor {
val count = Ref(0)
override def coordinate = {
case Increment => include(friend)
}
def atomically = implicit txn => {
case Increment => count transform (_ + 1)
}
}
//#friendly-counter-example
class Friend extends Transactor {
val count = Ref(0)
def atomically = implicit txn => {
case Increment => count transform (_ + 1)
}
}
}
// Only checked for compilation
object TransactorCoordinate {
case object Message
case object SomeMessage
case object SomeOtherMessage
case object OtherMessage
case object Message1
case object Message2
class TestCoordinateInclude(actor1: ActorRef, actor2: ActorRef, actor3: ActorRef) extends Transactor {
//#coordinate-include
override def coordinate = {
case Message => include(actor1, actor2, actor3)
}
//#coordinate-include
def atomically = txn => doNothing
}
class TestCoordinateSendTo(someActor: ActorRef, actor1: ActorRef, actor2: ActorRef) extends Transactor {
//#coordinate-sendto
override def coordinate = {
case SomeMessage => sendTo(someActor -> SomeOtherMessage)
case OtherMessage => sendTo(actor1 -> Message1, actor2 -> Message2)
}
//#coordinate-sendto
def atomically = txn => doNothing
}
}
class TransactorDocSpec extends AkkaSpec {
"coordinated example" in {
import CoordinatedExample._
//#run-coordinated-example
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.util.Timeout
import akka.pattern.ask
val system = ActorSystem("app")
val counter1 = system.actorOf(Props[Counter], name = "counter1")
val counter2 = system.actorOf(Props[Counter], name = "counter2")
implicit val timeout = Timeout(5 seconds)
counter1 ! Coordinated(Increment(Some(counter2)))
val count = Await.result(counter1 ? GetCount, timeout.duration)
// count == 1
//#run-coordinated-example
count must be === 1
shutdown(system)
}
"coordinated api" in {
import CoordinatedApi._
//#implicit-timeout
import scala.concurrent.duration._
import akka.util.Timeout
implicit val timeout = Timeout(5 seconds)
//#implicit-timeout
//#create-coordinated
val coordinated = Coordinated()
//#create-coordinated
val system = ActorSystem("coordinated")
val actor = system.actorOf(Props[Coordinator], name = "coordinator")
//#send-coordinated
actor ! Coordinated(Message)
//#send-coordinated
//#include-coordinated
actor ! coordinated(Message)
//#include-coordinated
coordinated.await()
shutdown(system)
}
"counter transactor" in {
import CounterExample._
val system = ActorSystem("transactors")
// FIXME, or remove the whole transactor module, srsly
lazy val underlyingCounter = new Counter
val counter = system.actorOf(Props(underlyingCounter), name = "counter")
val coordinated = Coordinated()(Timeout(5 seconds))
counter ! coordinated(Increment)
coordinated.await()
underlyingCounter.count.single.get must be === 1
shutdown(system)
}
"friendly counter transactor" in {
import FriendlyCounterExample._
val system = ActorSystem("transactors")
lazy val underlyingFriend = new Friend
val friend = system.actorOf(Props(underlyingFriend), name = "friend")
lazy val underlyingFriendlyCounter = new FriendlyCounter(friend)
val friendlyCounter = system.actorOf(Props(underlyingFriendlyCounter), name = "friendly")
val coordinated = Coordinated()(Timeout(5 seconds))
friendlyCounter ! coordinated(Increment)
coordinated.await()
underlyingFriendlyCounter.count.single.get must be === 1
underlyingFriend.count.single.get must be === 1
shutdown(system)
}
}

View file

@ -5,6 +5,4 @@ Futures and Agents
:maxdepth: 2 :maxdepth: 2
futures futures
stm
agents agents
transactors

View file

@ -1,75 +0,0 @@
.. _stm-scala:
#######################################
Software Transactional Memory
#######################################
Overview of STM
===============
An `STM <http://en.wikipedia.org/wiki/Software_transactional_memory>`_ turns the
Java heap into a transactional data set with begin/commit/rollback
semantics. Very much like a regular database. It implements the first three
letters in `ACID`_; ACI:
* Atomic
* Consistent
* Isolated
.. _ACID: http://en.wikipedia.org/wiki/ACID
Generally, the STM is not needed very often when working with Akka. Some
use-cases (that we can think of) are:
- When you really need composable message flows across many actors updating
their **internal local** state but need them to do that atomically in one big
transaction. Might not be often, but when you do need this then you are
screwed without it.
- When you want to share a datastructure across actors.
The use of STM in Akka is inspired by the concepts and views in `Clojure`_\'s
STM. Please take the time to read `this excellent document`_ about state in
clojure and view `this presentation`_ by Rich Hickey (the genius behind
Clojure).
.. _Clojure: http://clojure.org/
.. _this excellent document: http://clojure.org/state
.. _this presentation: http://www.infoq.com/presentations/Value-Identity-State-Rich-Hickey
Scala STM
=========
The STM supported in Akka is `ScalaSTM`_ which will be soon included in the
Scala standard library.
.. _ScalaSTM: http://nbronson.github.com/scala-stm/
The STM is based on Transactional References (referred to as Refs). Refs are
memory cells, holding an (arbitrary) immutable value, that implement CAS
(Compare-And-Swap) semantics and are managed and enforced by the STM for
coordinated changes across many Refs.
Persistent Datastructures
=========================
Working with immutable collections can sometimes give bad performance due to
extensive copying. Scala provides so-called persistent datastructures which
makes working with immutable collections fast. They are immutable but with
constant time access and modification. They use structural sharing and an insert
or update does not ruin the old structure, hence "persistent". Makes working
with immutable composite types fast. The persistent datastructures currently
consist of a `Map`_ and `Vector`_.
.. _Map: http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.Map
.. _Vector: http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.Vector
Integration with Actors
=======================
In Akka we've also integrated Actors and STM in :ref:`agents-scala` and
:ref:`transactors-scala`.

View file

@ -1,163 +0,0 @@
.. _transactors-scala:
#####################
Transactors
#####################
Why Transactors?
================
Actors are excellent for solving problems where you have many independent
processes that can work in isolation and only interact with other Actors through
message passing. This model fits many problems. But the actor model is
unfortunately a terrible model for implementing truly shared state. E.g. when
you need to have consensus and a stable view of state across many
components. The classic example is the bank account where clients can deposit
and withdraw, in which each operation needs to be atomic. For detailed
discussion on the topic see `this JavaOne presentation
<http://www.slideshare.net/jboner/state-youre-doing-it-wrong-javaone-2009>`_.
STM on the other hand is excellent for problems where you need consensus and a
stable view of the state by providing compositional transactional shared
state. Some of the really nice traits of STM are that transactions compose, and
it raises the abstraction level from lock-based concurrency.
Akka's Transactors combine Actors and STM to provide the best of the Actor model
(concurrency and asynchronous event-based programming) and STM (compositional
transactional shared state) by providing transactional, compositional,
asynchronous, event-based message flows.
Generally, the STM is not needed very often when working with Akka. Some
use-cases (that we can think of) are:
- When you really need composable message flows across many actors updating
their **internal local** state but need them to do that atomically in one big
transaction. Might not be often but when you do need this then you are
screwed without it.
- When you want to share a datastructure across actors.
Actors and STM
==============
You can combine Actors and STM in several ways. An Actor may use STM internally
so that particular changes are guaranteed to be atomic. Actors may also share
transactional datastructures as the STM provides safe shared state across
threads.
It's also possible to coordinate transactions across Actors or threads so that
either the transactions in a set all commit successfully or they all fail. This
is the focus of Transactors and the explicit support for coordinated
transactions in this section.
Coordinated transactions
========================
Akka provides an explicit mechanism for coordinating transactions across
Actors. Under the hood it uses a ``CommitBarrier``, similar to a CountDownLatch.
Here is an example of coordinating two simple counter Actors so that they both
increment together in coordinated transactions. If one of them was to fail to
increment, the other would also fail.
.. includecode:: code/docs/transactor/TransactorDocSpec.scala#coordinated-example
.. includecode:: code/docs/transactor/TransactorDocSpec.scala#run-coordinated-example
Note that creating a ``Coordinated`` object requires a ``Timeout`` to be
specified for the coordinated transaction. This can be done implicitly, by
having an implicit ``Timeout`` in scope, or explicitly, by passing the timeout
when creating a a ``Coordinated`` object. Here's an example of specifying an
implicit timeout:
.. includecode:: code/docs/transactor/TransactorDocSpec.scala#implicit-timeout
To start a new coordinated transaction that you will also participate in, just
create a ``Coordinated`` object (this assumes an implicit timeout):
.. includecode:: code/docs/transactor/TransactorDocSpec.scala#create-coordinated
To start a coordinated transaction that you won't participate in yourself you
can create a ``Coordinated`` object with a message and send it directly to an
actor. The recipient of the message will be the first member of the coordination
set:
.. includecode:: code/docs/transactor/TransactorDocSpec.scala#send-coordinated
To receive a coordinated message in an actor simply match it in a case
statement:
.. includecode:: code/docs/transactor/TransactorDocSpec.scala#receive-coordinated
:exclude: coordinated-atomic
To include another actor in the same coordinated transaction that you've created
or received, use the apply method on that object. This will increment the number
of parties involved by one and create a new ``Coordinated`` object to be sent.
.. includecode:: code/docs/transactor/TransactorDocSpec.scala#include-coordinated
To enter the coordinated transaction use the atomic method of the coordinated
object:
.. includecode:: code/docs/transactor/TransactorDocSpec.scala#coordinated-atomic
The coordinated transaction will wait for the other transactions before
committing. If any of the coordinated transactions fail then they all fail.
.. note::
The same actor should not be added to a coordinated transaction more than
once. The transaction will not be able to complete as an actor only processes
a single message at a time. When processing the first message the coordinated
transaction will wait for the commit barrier, which in turn needs the second
message to be received to proceed.
Transactor
==========
Transactors are actors that provide a general pattern for coordinating
transactions, using the explicit coordination described above.
Here's an example of a simple transactor that will join a coordinated
transaction:
.. includecode:: code/docs/transactor/TransactorDocSpec.scala#counter-example
You could send this Counter transactor a ``Coordinated(Increment)`` message. If
you were to send it just an ``Increment`` message it will create its own
``Coordinated`` (but in this particular case wouldn't be coordinating
transactions with any other transactors).
To coordinate with other transactors override the ``coordinate`` method. The
``coordinate`` method maps a message to a set of ``SendTo`` objects, pairs of
``ActorRef`` and a message. You can use the ``include`` and ``sendTo`` methods
to easily coordinate with other transactors. The ``include`` method will send on
the same message that was received to other transactors. The ``sendTo`` method
allows you to specify both the actor to send to, and the message to send.
Example of coordinating an increment:
.. includecode:: code/docs/transactor/TransactorDocSpec.scala#friendly-counter-example
Using ``include`` to include more than one transactor:
.. includecode:: code/docs/transactor/TransactorDocSpec.scala#coordinate-include
Using ``sendTo`` to coordinate transactions but pass-on a different message than
the one that was received:
.. includecode:: code/docs/transactor/TransactorDocSpec.scala#coordinate-sendto
To execute directly before or after the coordinated transaction, override the
``before`` and ``after`` methods. These methods also expect partial functions
like the receive method. They do not execute within the transaction.
To completely bypass coordinated transactions override the ``normally``
method. Any message matched by ``normally`` will not be matched by the other
methods, and will not be involved in coordinated transactions. In this method
you can implement normal actor behavior, or use the normal STM atomic for local
transactions.

View file

@ -6,6 +6,7 @@
# Make your edits/overrides in your application.conf. # Make your edits/overrides in your application.conf.
akka { akka {
# akka.transactor is deprecated in 2.3
transactor { transactor {
# The timeout used for coordinated transactions across actors # The timeout used for coordinated transactions across actors
coordinated-timeout = 5s coordinated-timeout = 5s

View file

@ -12,6 +12,7 @@ import java.util.concurrent.Callable
/** /**
* Akka-specific exception for coordinated transactions. * Akka-specific exception for coordinated transactions.
*/ */
@deprecated("akka.transactor will be removed", "2.3")
class CoordinatedTransactionException(message: String, cause: Throwable) extends AkkaException(message, cause) { class CoordinatedTransactionException(message: String, cause: Throwable) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null) def this(msg: String) = this(msg, null)
} }
@ -19,6 +20,7 @@ class CoordinatedTransactionException(message: String, cause: Throwable) extends
/** /**
* Coordinated transactions across actors. * Coordinated transactions across actors.
*/ */
@deprecated("akka.transactor will be removed", "2.3")
object Coordinated { object Coordinated {
/** /**
@ -97,6 +99,7 @@ object Coordinated {
* *
* @see [[akka.transactor.Transactor]] for an actor that implements coordinated transactions * @see [[akka.transactor.Transactor]] for an actor that implements coordinated transactions
*/ */
@deprecated("akka.transactor will be removed", "2.3")
class Coordinated(val message: Any, member: CommitBarrier.Member) { class Coordinated(val message: Any, member: CommitBarrier.Member) {
// Java API constructors // Java API constructors

View file

@ -12,6 +12,7 @@ import scala.concurrent.stm.InTxn
/** /**
* Used for specifying actor refs and messages to send to during coordination. * Used for specifying actor refs and messages to send to during coordination.
*/ */
@deprecated("akka.transactor will be removed", "2.3")
case class SendTo(actor: ActorRef, message: Option[Any] = None) case class SendTo(actor: ActorRef, message: Option[Any] = None)
/** /**
@ -94,6 +95,7 @@ case class SendTo(actor: ActorRef, message: Option[Any] = None)
* *
* @see [[akka.transactor.Coordinated]] * @see [[akka.transactor.Coordinated]]
*/ */
@deprecated("akka.transactor will be removed", "2.3")
trait Transactor extends Actor { trait Transactor extends Actor {
private val settings = TransactorExtension(context.system) private val settings = TransactorExtension(context.system)
@ -181,6 +183,9 @@ trait Transactor extends Actor {
def doNothing: Receive = EmptyReceive def doNothing: Receive = EmptyReceive
} }
/**
* INTERNAL API
*/
private[akka] object EmptyReceive extends PartialFunction[Any, Unit] { private[akka] object EmptyReceive extends PartialFunction[Any, Unit] {
def apply(any: Any): Unit = () def apply(any: Any): Unit = ()
def isDefinedAt(any: Any): Boolean = false def isDefinedAt(any: Any): Boolean = false

View file

@ -13,12 +13,14 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
/** /**
* TransactorExtension is an Akka Extension to hold settings for transactors. * TransactorExtension is an Akka Extension to hold settings for transactors.
*/ */
@deprecated("akka.transactor will be removed", "2.3")
object TransactorExtension extends ExtensionId[TransactorSettings] with ExtensionIdProvider { object TransactorExtension extends ExtensionId[TransactorSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): TransactorSettings = super.get(system) override def get(system: ActorSystem): TransactorSettings = super.get(system)
override def lookup: TransactorExtension.type = TransactorExtension override def lookup: TransactorExtension.type = TransactorExtension
override def createExtension(system: ExtendedActorSystem): TransactorSettings = new TransactorSettings(system.settings.config) override def createExtension(system: ExtendedActorSystem): TransactorSettings = new TransactorSettings(system.settings.config)
} }
@deprecated("akka.transactor will be removed", "2.3")
class TransactorSettings(val config: Config) extends Extension { class TransactorSettings(val config: Config) extends Extension {
import config._ import config._
val CoordinatedTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.transactor.coordinated-timeout"), MILLISECONDS)) val CoordinatedTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.transactor.coordinated-timeout"), MILLISECONDS))

View file

@ -11,6 +11,7 @@ import java.util.Collections.{ emptySet, singleton ⇒ singletonSet }
/** /**
* An UntypedActor version of transactor for using from Java. * An UntypedActor version of transactor for using from Java.
*/ */
@deprecated("akka.transactor will be removed", "2.3")
abstract class UntypedTransactor extends UntypedActor { abstract class UntypedTransactor extends UntypedActor {
import scala.collection.JavaConverters.asScalaSetConverter import scala.collection.JavaConverters.asScalaSetConverter

View file

@ -619,7 +619,7 @@ object AkkaBuild extends Build {
id = "akka-docs", id = "akka-docs",
base = file("akka-docs"), base = file("akka-docs"),
dependencies = Seq(actor, testkit % "test->test", channels, dependencies = Seq(actor, testkit % "test->test", channels,
remote % "compile;test->test", cluster, slf4j, agent, transactor, zeroMQ, camel, osgi, osgiAries, remote % "compile;test->test", cluster, slf4j, agent, zeroMQ, camel, osgi, osgiAries,
persistence % "compile;test->test"), persistence % "compile;test->test"),
settings = defaultSettings ++ docFormatSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ cpsPlugin ++ Seq( settings = defaultSettings ++ docFormatSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ cpsPlugin ++ Seq(
sourceDirectory in Sphinx <<= baseDirectory / "rst", sourceDirectory in Sphinx <<= baseDirectory / "rst",