diff --git a/akka-docs/disabled/java-transactors.rst b/akka-docs/disabled/java-transactors.rst deleted file mode 100644 index 2e1c3dc769..0000000000 --- a/akka-docs/disabled/java-transactors.rst +++ /dev/null @@ -1,269 +0,0 @@ -.. _transactors-java: - -Transactors (Java) -================== - -.. sidebar:: Contents - - .. contents:: :local: - -Why Transactors? ----------------- - -Actors are excellent for solving problems where you have many independent processes that can work in isolation and only interact with other Actors through message passing. This model fits many problems. But the actor model is unfortunately a terrible model for implementing truly shared state. E.g. when you need to have consensus and a stable view of state across many components. The classic example is the bank account where clients can deposit and withdraw, in which each operation needs to be atomic. For detailed discussion on the topic see `this JavaOne presentation `_. - -**STM** on the other hand is excellent for problems where you need consensus and a stable view of the state by providing compositional transactional shared state. Some of the really nice traits of STM are that transactions compose, and it raises the abstraction level from lock-based concurrency. - -Akka's Transactors combine Actors and STM to provide the best of the Actor model (concurrency and asynchronous event-based programming) and STM (compositional transactional shared state) by providing transactional, compositional, asynchronous, event-based message flows. - -If you need Durability then you should not use one of the in-memory data structures but one of the persistent ones. - -Generally, the STM is not needed very often when working with Akka. Some use-cases (that we can think of) are: - -- When you really need composable message flows across many actors updating their **internal local** state but need them to do that atomically in one big transaction. Might not often, but when you do need this then you are screwed without it. -- When you want to share a datastructure across actors. -- When you need to use the persistence modules. - -Actors and STM -^^^^^^^^^^^^^^ - -You can combine Actors and STM in several ways. An Actor may use STM internally so that particular changes are guaranteed to be atomic. Actors may also share transactional datastructures as the STM provides safe shared state across threads. - -It's also possible to coordinate transactions across Actors or threads so that either the transactions in a set all commit successfully or they all fail. This is the focus of Transactors and the explicit support for coordinated transactions in this section. - - -Coordinated transactions ------------------------- - -Akka provides an explicit mechanism for coordinating transactions across actors. Under the hood it uses a ``CountDownCommitBarrier``, similar to a CountDownLatch. - -Here is an example of coordinating two simple counter UntypedActors so that they both increment together in coordinated transactions. If one of them was to fail to increment, the other would also fail. - -.. code-block:: java - - import akka.actor.ActorRef; - - public class Increment { - private final ActorRef friend; - - public Increment() { - this.friend = null; - } - - public Increment(ActorRef friend) { - this.friend = friend; - } - - public boolean hasFriend() { - return friend != null; - } - - public ActorRef getFriend() { - return friend; - } - } - -.. code-block:: java - - import akka.actor.UntypedActor; - import akka.stm.Ref; - import akka.transactor.Atomically; - import akka.transactor.Coordinated; - - public class Counter extends UntypedActor { - private Ref count = new Ref(0); - - private void increment() { - count.set(count.get() + 1); - } - - public void onReceive(Object incoming) throws Exception { - if (incoming instanceof Coordinated) { - Coordinated coordinated = (Coordinated) incoming; - Object message = coordinated.getMessage(); - if (message instanceof Increment) { - Increment increment = (Increment) message; - if (increment.hasFriend()) { - increment.getFriend().tell(coordinated.coordinate(new Increment())); - } - coordinated.atomic(new Atomically() { - public void atomically() { - increment(); - } - }); - } - } else if (incoming.equals("GetCount")) { - getContext().reply(count.get()); - } - } - } - -.. code-block:: java - - ActorRef counter1 = actorOf(Counter.class); - ActorRef counter2 = actorOf(Counter.class); - - counter1.tell(new Coordinated(new Increment(counter2))); - -To start a new coordinated transaction that you will also participate in, just create a ``Coordinated`` object: - -.. code-block:: java - - Coordinated coordinated = new Coordinated(); - -To start a coordinated transaction that you won't participate in yourself you can create a ``Coordinated`` object with a message and send it directly to an actor. The recipient of the message will be the first member of the coordination set: - -.. code-block:: java - - actor.tell(new Coordinated(new Message())); - -To include another actor in the same coordinated transaction that you've created or received, use the ``coordinate`` method on that object. This will increment the number of parties involved by one and create a new ``Coordinated`` object to be sent. - -.. code-block:: java - - actor.tell(coordinated.coordinate(new Message())); - -To enter the coordinated transaction use the atomic method of the coordinated object. This accepts either an ``akka.transactor.Atomically`` object, or an ``Atomic`` object the same as used normally in the STM (just don't execute it - the coordination will do that). - -.. code-block:: java - - coordinated.atomic(new Atomically() { - public void atomically() { - // do something in a transaction - } - }); - -The coordinated transaction will wait for the other transactions before committing. If any of the coordinated transactions fail then they all fail. - - -UntypedTransactor ------------------ - -UntypedTransactors are untyped actors that provide a general pattern for coordinating transactions, using the explicit coordination described above. - -Here's an example of a simple untyped transactor that will join a coordinated transaction: - -.. code-block:: java - - import akka.transactor.UntypedTransactor; - import akka.stm.Ref; - - public class Counter extends UntypedTransactor { - Ref count = new Ref(0); - - @Override - public void atomically(Object message) { - if (message instanceof Increment) { - count.set(count.get() + 1); - } - } - } - -You could send this Counter transactor a ``Coordinated(Increment)`` message. If you were to send it just an ``Increment`` message it will create its own ``Coordinated`` (but in this particular case wouldn't be coordinating transactions with any other transactors). - -To coordinate with other transactors override the ``coordinate`` method. The ``coordinate`` method maps a message to a set of ``SendTo`` objects, pairs of ``ActorRef`` and a message. You can use the ``include`` and ``sendTo`` methods to easily coordinate with other transactors. - -Example of coordinating an increment, similar to the explicitly coordinated example: - -.. code-block:: java - - import akka.transactor.UntypedTransactor; - import akka.transactor.SendTo; - import akka.stm.Ref; - - import java.util.Set; - - public class Counter extends UntypedTransactor { - Ref count = new Ref(0); - - @Override - public Set coordinate(Object message) { - if (message instanceof Increment) { - Increment increment = (Increment) message; - if (increment.hasFriend()) - return include(increment.getFriend(), new Increment()); - } - return nobody(); - } - - @Override - public void atomically(Object message) { - if (message instanceof Increment) { - count.set(count.get() + 1); - } - } - } - -To execute directly before or after the coordinated transaction, override the ``before`` and ``after`` methods. They do not execute within the transaction. - -To completely bypass coordinated transactions override the ``normally`` method. Any message matched by ``normally`` will not be matched by the other methods, and will not be involved in coordinated transactions. In this method you can implement normal actor behavior, or use the normal STM atomic for local transactions. - - -Coordinating Typed Actors -------------------------- - -It's also possible to use coordinated transactions with typed actors. You can explicitly pass around ``Coordinated`` objects, or use built-in support with the ``@Coordinated`` annotation and the ``Coordination.coordinate`` method. - -To specify a method should use coordinated transactions add the ``@Coordinated`` annotation. **Note**: the ``@Coordinated`` annotation will only work with void (one-way) methods. - -.. code-block:: java - - public interface Counter { - @Coordinated public void increment(); - public Integer get(); - } - -To coordinate transactions use a ``coordinate`` block. This accepts either an ``akka.transactor.Atomically`` object, or an ``Atomic`` object liked used in the STM (but don't execute it). The first boolean parameter specifies whether or not to wait for the transactions to complete. - -.. code-block:: java - - Coordination.coordinate(true, new Atomically() { - public void atomically() { - counter1.increment(); - counter2.increment(); - } - }); - -Here's an example of using ``@Coordinated`` with a TypedActor to coordinate increments: - -.. code-block:: java - - import akka.transactor.annotation.Coordinated; - - public interface Counter { - @Coordinated public void increment(); - public Integer get(); - } - -.. code-block:: java - - import akka.actor.TypedActor; - import akka.stm.Ref; - - public class CounterImpl extends TypedActor implements Counter { - private Ref count = new Ref(0); - - public void increment() { - count.set(count.get() + 1); - } - - public Integer get() { - return count.get(); - } - } - -.. code-block:: java - - Counter counter1 = (Counter) TypedActor.newInstance(Counter.class, CounterImpl.class); - Counter counter2 = (Counter) TypedActor.newInstance(Counter.class, CounterImpl.class); - - Coordination.coordinate(true, new Atomically() { - public void atomically() { - counter1.increment(); - counter2.increment(); - } - }); - - TypedActor.stop(counter1); - TypedActor.stop(counter2); - diff --git a/akka-docs/disabled/scala-transactors.rst b/akka-docs/disabled/scala-transactors.rst deleted file mode 100644 index 7d654d5f15..0000000000 --- a/akka-docs/disabled/scala-transactors.rst +++ /dev/null @@ -1,248 +0,0 @@ -.. _transactors-scala: - -Transactors (Scala) -=================== - -.. sidebar:: Contents - - .. contents:: :local: - -Why Transactors? ----------------- - -Actors are excellent for solving problems where you have many independent processes that can work in isolation and only interact with other Actors through message passing. This model fits many problems. But the actor model is unfortunately a terrible model for implementing truly shared state. E.g. when you need to have consensus and a stable view of state across many components. The classic example is the bank account where clients can deposit and withdraw, in which each operation needs to be atomic. For detailed discussion on the topic see `this JavaOne presentation `_. - -**STM** on the other hand is excellent for problems where you need consensus and a stable view of the state by providing compositional transactional shared state. Some of the really nice traits of STM are that transactions compose, and it raises the abstraction level from lock-based concurrency. - -Akka's Transactors combine Actors and STM to provide the best of the Actor model (concurrency and asynchronous event-based programming) and STM (compositional transactional shared state) by providing transactional, compositional, asynchronous, event-based message flows. - -If you need Durability then you should not use one of the in-memory data structures but one of the persistent ones. - -Generally, the STM is not needed very often when working with Akka. Some use-cases (that we can think of) are: - -- When you really need composable message flows across many actors updating their **internal local** state but need them to do that atomically in one big transaction. Might not often, but when you do need this then you are screwed without it. -- When you want to share a datastructure across actors. -- When you need to use the persistence modules. - -Actors and STM -^^^^^^^^^^^^^^ - -You can combine Actors and STM in several ways. An Actor may use STM internally so that particular changes are guaranteed to be atomic. Actors may also share transactional datastructures as the STM provides safe shared state across threads. - -It's also possible to coordinate transactions across Actors or threads so that either the transactions in a set all commit successfully or they all fail. This is the focus of Transactors and the explicit support for coordinated transactions in this section. - - -Coordinated transactions ------------------------- - -Akka provides an explicit mechanism for coordinating transactions across Actors. Under the hood it uses a ``CountDownCommitBarrier``, similar to a CountDownLatch. - -Here is an example of coordinating two simple counter Actors so that they both increment together in coordinated transactions. If one of them was to fail to increment, the other would also fail. - -.. code-block:: scala - - import akka.transactor.Coordinated - import akka.stm.Ref - import akka.actor.{Actor, ActorRef} - - case class Increment(friend: Option[ActorRef] = None) - case object GetCount - - class Counter extends Actor { - val count = Ref(0) - - def receive = { - case coordinated @ Coordinated(Increment(friend)) => { - friend foreach (_ ! coordinated(Increment())) - coordinated atomic { - count alter (_ + 1) - } - } - case GetCount => self.reply(count.get) - } - } - - val counter1 = Actor.actorOf[Counter] - val counter2 = Actor.actorOf[Counter] - - counter1 ! Coordinated(Increment(Some(counter2))) - - ... - - (counter1 ? GetCount).as[Int] // Some(1) - - counter1.stop() - counter2.stop() - -To start a new coordinated transaction that you will also participate in, just create a ``Coordinated`` object: - -.. code-block:: scala - - val coordinated = Coordinated() - -To start a coordinated transaction that you won't participate in yourself you can create a ``Coordinated`` object with a message and send it directly to an actor. The recipient of the message will be the first member of the coordination set: - -.. code-block:: scala - - actor ! Coordinated(Message) - -To receive a coordinated message in an actor simply match it in a case statement: - -.. code-block:: scala - - def receive = { - case coordinated @ Coordinated(Message) => ... - } - -To include another actor in the same coordinated transaction that you've created or received, use the apply method on that object. This will increment the number of parties involved by one and create a new ``Coordinated`` object to be sent. - -.. code-block:: scala - - actor ! coordinated(Message) - -To enter the coordinated transaction use the atomic method of the coordinated object: - -.. code-block:: scala - - coordinated atomic { - // do something in transaction ... - } - -The coordinated transaction will wait for the other transactions before committing. If any of the coordinated transactions fail then they all fail. - - -Transactor ----------- - -Transactors are actors that provide a general pattern for coordinating transactions, using the explicit coordination described above. - -Here's an example of a simple transactor that will join a coordinated transaction: - -.. code-block:: scala - - import akka.transactor.Transactor - import akka.stm.Ref - - case object Increment - - class Counter extends Transactor { - val count = Ref(0) - - override def atomically = { - case Increment => count alter (_ + 1) - } - } - -You could send this Counter transactor a ``Coordinated(Increment)`` message. If you were to send it just an ``Increment`` message it will create its own ``Coordinated`` (but in this particular case wouldn't be coordinating transactions with any other transactors). - -To coordinate with other transactors override the ``coordinate`` method. The ``coordinate`` method maps a message to a set of ``SendTo`` objects, pairs of ``ActorRef`` and a message. You can use the ``include`` and ``sendTo`` methods to easily coordinate with other transactors. The ``include`` method will send on the same message that was received to other transactors. The ``sendTo`` method allows you to specify both the actor to send to, and the message to send. - -Example of coordinating an increment: - -.. code-block:: scala - - import akka.transactor.Transactor - import akka.stm.Ref - import akka.actor.ActorRef - - case object Increment - - class FriendlyCounter(friend: ActorRef) extends Transactor { - val count = Ref(0) - - override def coordinate = { - case Increment => include(friend) - } - - override def atomically = { - case Increment => count alter (_ + 1) - } - } - -Using ``include`` to include more than one transactor: - -.. code-block:: scala - - override def coordinate = { - case Message => include(actor1, actor2, actor3) - } - -Using ``sendTo`` to coordinate transactions but pass-on a different message than the one that was received: - -.. code-block:: scala - - override def coordinate = { - case Message => sendTo(someActor -> SomeOtherMessage) - case SomeMessage => sendTo(actor1 -> Message1, actor2 -> Message2) - } - -To execute directly before or after the coordinated transaction, override the ``before`` and ``after`` methods. These methods also expect partial functions like the receive method. They do not execute within the transaction. - -To completely bypass coordinated transactions override the ``normally`` method. Any message matched by ``normally`` will not be matched by the other methods, and will not be involved in coordinated transactions. In this method you can implement normal actor behavior, or use the normal STM atomic for local transactions. - - -Coordinating Typed Actors -------------------------- - -It's also possible to use coordinated transactions with typed actors. You can explicitly pass around ``Coordinated`` objects, or use built-in support with the ``@Coordinated`` annotation and the ``Coordination.coordinate`` method. - -To specify a method should use coordinated transactions add the ``@Coordinated`` annotation. **Note**: the ``@Coordinated`` annotation only works with methods that return Unit (one-way methods). - -.. code-block:: scala - - trait Counter { - @Coordinated def increment() - def get: Int - } - -To coordinate transactions use a ``coordinate`` block: - -.. code-block:: scala - - coordinate { - counter1.increment() - counter2.increment() - } - -Here's an example of using ``@Coordinated`` with a TypedActor to coordinate increments. - -.. code-block:: scala - - import akka.actor.TypedActor - import akka.stm.Ref - import akka.transactor.annotation.Coordinated - import akka.transactor.Coordination._ - - trait Counter { - @Coordinated def increment() - def get: Int - } - - class CounterImpl extends TypedActor with Counter { - val ref = Ref(0) - def increment() { ref alter (_ + 1) } - def get = ref.get - } - - ... - - val counter1 = TypedActor.newInstance(classOf[Counter], classOf[CounterImpl]) - val counter2 = TypedActor.newInstance(classOf[Counter], classOf[CounterImpl]) - - coordinate { - counter1.increment() - counter2.increment() - } - - TypedActor.stop(counter1) - TypedActor.stop(counter2) - -The ``coordinate`` block will wait for the transactions to complete. If you do not want to wait then you can specify this explicitly: - -.. code-block:: scala - - coordinate(wait = false) { - counter1.increment() - counter2.increment() - } - diff --git a/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java b/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java new file mode 100644 index 0000000000..84bd33cb3b --- /dev/null +++ b/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.docs.transactor; + +//#class +import akka.actor.*; +import akka.transactor.*; +import scala.concurrent.stm.*; + +public class CoordinatedCounter extends UntypedActor { + private Ref count = Stm.ref(0); + + private void increment(InTxn txn) { + Integer newValue = count.get(txn) + 1; + count.set(newValue, txn); + } + + public void onReceive(Object incoming) throws Exception { + if (incoming instanceof Coordinated) { + Coordinated coordinated = (Coordinated) incoming; + Object message = coordinated.getMessage(); + if (message instanceof Increment) { + Increment increment = (Increment) message; + if (increment.hasFriend()) { + increment.getFriend().tell(coordinated.coordinate(new Increment())); + } + coordinated.atomic(new Atomically() { + public void atomically(InTxn txn) { + increment(txn); + } + }); + } + } else if ("GetCount".equals(incoming)) { + getSender().tell(count.single().get()); + } + } +} +//#class diff --git a/akka-docs/java/code/akka/docs/transactor/Coordinator.java b/akka-docs/java/code/akka/docs/transactor/Coordinator.java new file mode 100644 index 0000000000..37d7c935cb --- /dev/null +++ b/akka-docs/java/code/akka/docs/transactor/Coordinator.java @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.docs.transactor; + +import akka.actor.*; +import akka.transactor.*; +import scala.concurrent.stm.*; + +public class Coordinator extends UntypedActor { + public void onReceive(Object incoming) throws Exception { + if (incoming instanceof Coordinated) { + Coordinated coordinated = (Coordinated) incoming; + Object message = coordinated.getMessage(); + if (message instanceof Message) { + //#coordinated-atomic + coordinated.atomic(new Atomically() { + public void atomically(InTxn txn) { + // do something in the coordinated transaction ... + } + }); + //#coordinated-atomic + } + } + } +} diff --git a/akka-docs/java/code/akka/docs/transactor/Counter.java b/akka-docs/java/code/akka/docs/transactor/Counter.java new file mode 100644 index 0000000000..0a6b7b2219 --- /dev/null +++ b/akka-docs/java/code/akka/docs/transactor/Counter.java @@ -0,0 +1,28 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.docs.transactor; + +//#class +import akka.transactor.*; +import scala.concurrent.stm.*; + +public class Counter extends UntypedTransactor { + Ref count = Stm.ref(0); + + public void atomically(InTxn txn, Object message) { + if (message instanceof Increment) { + Integer newValue = count.get(txn) + 1; + count.set(newValue, txn); + } + } + + @Override public boolean normally(Object message) { + if ("GetCount".equals(message)) { + getSender().tell(count.single().get()); + return true; + } else return false; + } +} +//#class diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedCounter.java b/akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java similarity index 53% rename from akka-stm/src/test/java/akka/transactor/example/UntypedCounter.java rename to akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java index b580ee88f8..d70c653063 100644 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedCounter.java +++ b/akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java @@ -1,13 +1,17 @@ -package akka.transactor.example; +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ -import akka.transactor.UntypedTransactor; -import akka.transactor.SendTo; -import akka.stm.Ref; +package akka.docs.transactor; +//#class +import akka.actor.*; +import akka.transactor.*; import java.util.Set; +import scala.concurrent.stm.*; -public class UntypedCounter extends UntypedTransactor { - Ref count = new Ref(0); +public class FriendlyCounter extends UntypedTransactor { + Ref count = Stm.ref(0); @Override public Set coordinate(Object message) { if (message instanceof Increment) { @@ -18,16 +22,18 @@ public class UntypedCounter extends UntypedTransactor { return nobody(); } - public void atomically(Object message) { + public void atomically(InTxn txn, Object message) { if (message instanceof Increment) { - count.set(count.get() + 1); + Integer newValue = count.get(txn) + 1; + count.set(newValue, txn); } } @Override public boolean normally(Object message) { if ("GetCount".equals(message)) { - getSender().tell(count.get()); + getSender().tell(count.single().get()); return true; } else return false; } } +//#class diff --git a/akka-stm/src/test/java/akka/transactor/example/Increment.java b/akka-docs/java/code/akka/docs/transactor/Increment.java similarity index 72% rename from akka-stm/src/test/java/akka/transactor/example/Increment.java rename to akka-docs/java/code/akka/docs/transactor/Increment.java index bcb0988d41..ef1459a391 100644 --- a/akka-stm/src/test/java/akka/transactor/example/Increment.java +++ b/akka-docs/java/code/akka/docs/transactor/Increment.java @@ -1,5 +1,10 @@ -package akka.transactor.example; +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.docs.transactor; + +//#class import akka.actor.ActorRef; public class Increment { @@ -19,3 +24,4 @@ public class Increment { return friend; } } +//#class diff --git a/akka-docs/java/code/akka/docs/transactor/Message.java b/akka-docs/java/code/akka/docs/transactor/Message.java new file mode 100644 index 0000000000..4f182ba43a --- /dev/null +++ b/akka-docs/java/code/akka/docs/transactor/Message.java @@ -0,0 +1,7 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.docs.transactor; + +public class Message {} diff --git a/akka-docs/java/code/akka/docs/transactor/TransactorDocJavaSpec.scala b/akka-docs/java/code/akka/docs/transactor/TransactorDocJavaSpec.scala new file mode 100644 index 0000000000..d8bf2bf692 --- /dev/null +++ b/akka-docs/java/code/akka/docs/transactor/TransactorDocJavaSpec.scala @@ -0,0 +1,11 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.docs.transactor + +import org.scalatest.junit.JUnitWrapperSuite + +class TransactorDocJavaSpec extends JUnitWrapperSuite( + "akka.docs.transactor.TransactorDocTest", + Thread.currentThread.getContextClassLoader) \ No newline at end of file diff --git a/akka-docs/java/code/akka/docs/transactor/TransactorDocTest.java b/akka-docs/java/code/akka/docs/transactor/TransactorDocTest.java new file mode 100644 index 0000000000..090d336b37 --- /dev/null +++ b/akka-docs/java/code/akka/docs/transactor/TransactorDocTest.java @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.docs.transactor; + +import static org.junit.Assert.*; +import org.junit.Test; + +//#imports +import akka.actor.*; +import akka.dispatch.Await; +import akka.transactor.Coordinated; +import akka.util.Duration; +import akka.util.Timeout; +import java.util.concurrent.TimeUnit; +//#imports + +public class TransactorDocTest { + + @Test + public void coordinatedExample() { + //#coordinated-example + ActorSystem system = ActorSystem.create("CoordinatedExample"); + + ActorRef counter1 = system.actorOf(new Props().withCreator(CoordinatedCounter.class)); + ActorRef counter2 = system.actorOf(new Props().withCreator(CoordinatedCounter.class)); + + Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS)); + + counter1.tell(new Coordinated(new Increment(counter2), timeout)); + + Integer count = (Integer) Await.result(counter1.ask("GetCount", timeout), timeout.duration()); + //#coordinated-example + + assertEquals(count, new Integer(1)); + + system.shutdown(); + } + + @Test + public void coordinatedApi() { + //#create-coordinated + Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS)); + Coordinated coordinated = new Coordinated(timeout); + //#create-coordinated + + ActorSystem system = ActorSystem.create("CoordinatedApi"); + ActorRef actor = system.actorOf(new Props().withCreator(Coordinator.class)); + + //#send-coordinated + actor.tell(new Coordinated(new Message(), timeout)); + //#send-coordinated + + //#include-coordinated + actor.tell(coordinated.coordinate(new Message())); + //#include-coordinated + + coordinated.await(); + + system.shutdown(); + } + + @Test + public void counterTransactor() { + ActorSystem system = ActorSystem.create("CounterTransactor"); + ActorRef counter = system.actorOf(new Props().withCreator(Counter.class)); + + Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS)); + Coordinated coordinated = new Coordinated(timeout); + counter.tell(coordinated.coordinate(new Increment())); + coordinated.await(); + + Integer count = (Integer) Await.result(counter.ask("GetCount", timeout), timeout.duration()); + assertEquals(count, new Integer(1)); + + system.shutdown(); + } + + @Test + public void friendlyCounterTransactor() { + ActorSystem system = ActorSystem.create("FriendlyCounterTransactor"); + ActorRef friend = system.actorOf(new Props().withCreator(Counter.class)); + ActorRef friendlyCounter = system.actorOf(new Props().withCreator(FriendlyCounter.class)); + + Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS)); + Coordinated coordinated = new Coordinated(timeout); + friendlyCounter.tell(coordinated.coordinate(new Increment(friend))); + coordinated.await(); + + Integer count1 = (Integer) Await.result(friendlyCounter.ask("GetCount", timeout), timeout.duration()); + assertEquals(count1, new Integer(1)); + + Integer count2 = (Integer) Await.result(friend.ask("GetCount", timeout), timeout.duration()); + assertEquals(count2, new Integer(1)); + + system.shutdown(); + } +} diff --git a/akka-docs/java/index.rst b/akka-docs/java/index.rst index dc8ae98537..9357871b39 100644 --- a/akka-docs/java/index.rst +++ b/akka-docs/java/index.rst @@ -18,5 +18,5 @@ Java API remoting serialization agents - extending-akka transactors + extending-akka diff --git a/akka-docs/java/transactors.rst b/akka-docs/java/transactors.rst index 994ad00cb5..6795d8ba8d 100644 --- a/akka-docs/java/transactors.rst +++ b/akka-docs/java/transactors.rst @@ -1,6 +1,149 @@ .. _transactors-java: -Transactors (Java) -================== +#################### + Transactors (Java) +#################### -The Akka Transactors module has not been migrated to Akka 2.0-SNAPSHOT yet. \ No newline at end of file +.. sidebar:: Contents + + .. contents:: :local: + + +Why Transactors? +================ + +Actors are excellent for solving problems where you have many independent +processes that can work in isolation and only interact with other Actors through +message passing. This model fits many problems. But the actor model is +unfortunately a terrible model for implementing truly shared state. E.g. when +you need to have consensus and a stable view of state across many +components. The classic example is the bank account where clients can deposit +and withdraw, in which each operation needs to be atomic. For detailed +discussion on the topic see `this JavaOne presentation +`_. + +STM on the other hand is excellent for problems where you need consensus and a +stable view of the state by providing compositional transactional shared +state. Some of the really nice traits of STM are that transactions compose, and +it raises the abstraction level from lock-based concurrency. + +Akka's Transactors combine Actors and STM to provide the best of the Actor model +(concurrency and asynchronous event-based programming) and STM (compositional +transactional shared state) by providing transactional, compositional, +asynchronous, event-based message flows. + +Generally, the STM is not needed very often when working with Akka. Some +use-cases (that we can think of) are: + +- When you really need composable message flows across many actors updating + their **internal local** state but need them to do that atomically in one big + transaction. Might not be often but when you do need this then you are + screwed without it. + +- When you want to share a datastructure across actors. + + +Actors and STM +============== + +You can combine Actors and STM in several ways. An Actor may use STM internally +so that particular changes are guaranteed to be atomic. Actors may also share +transactional datastructures as the STM provides safe shared state across +threads. + +It's also possible to coordinate transactions across Actors or threads so that +either the transactions in a set all commit successfully or they all fail. This +is the focus of Transactors and the explicit support for coordinated +transactions in this section. + + +Coordinated transactions +======================== + +Akka provides an explicit mechanism for coordinating transactions across +actors. Under the hood it uses a ``CommitBarrier``, similar to a CountDownLatch. + +Here is an example of coordinating two simple counter UntypedActors so that they +both increment together in coordinated transactions. If one of them was to fail +to increment, the other would also fail. + +.. includecode:: code/akka/docs/transactor/Increment.java#class + :language: java + +.. includecode:: code/akka/docs/transactor/CoordinatedCounter.java#class + :language: java + +.. includecode:: code/akka/docs/transactor/TransactorDocTest.java#imports + :language: java + +.. includecode:: code/akka/docs/transactor/TransactorDocTest.java#coordinated-example + :language: java + +To start a new coordinated transaction that you will also participate in, create +a ``Coordinated`` object, passing in a ``Timeout``: + +.. includecode:: code/akka/docs/transactor/TransactorDocTest.java#create-coordinated + :language: java + +To start a coordinated transaction that you won't participate in yourself you +can create a ``Coordinated`` object with a message and send it directly to an +actor. The recipient of the message will be the first member of the coordination +set: + +.. includecode:: code/akka/docs/transactor/TransactorDocTest.java#send-coordinated + :language: java + +To include another actor in the same coordinated transaction that you've created +or received, use the ``coordinate`` method on that object. This will increment +the number of parties involved by one and create a new ``Coordinated`` object to +be sent. + +.. includecode:: code/akka/docs/transactor/TransactorDocTest.java#include-coordinated + :language: java + +To enter the coordinated transaction use the atomic method of the coordinated +object, passing in an ``akka.transactor.Atomically`` object. + +.. includecode:: code/akka/docs/transactor/Coordinator.java#coordinated-atomic + :language: java + +The coordinated transaction will wait for the other transactions before +committing. If any of the coordinated transactions fail then they all fail. + + +UntypedTransactor +================= + +UntypedTransactors are untyped actors that provide a general pattern for +coordinating transactions, using the explicit coordination described above. + +Here's an example of a simple untyped transactor that will join a coordinated +transaction: + +.. includecode:: code/akka/docs/transactor/Counter.java#class + :language: java + +You could send this Counter transactor a ``Coordinated(Increment)`` message. If +you were to send it just an ``Increment`` message it will create its own +``Coordinated`` (but in this particular case wouldn't be coordinating +transactions with any other transactors). + +To coordinate with other transactors override the ``coordinate`` method. The +``coordinate`` method maps a message to a set of ``SendTo`` objects, pairs of +``ActorRef`` and a message. You can use the ``include`` and ``sendTo`` methods +to easily coordinate with other transactors. + +Here's an example of coordinating an increment, using an untyped transactor, +similar to the explicitly coordinated example above. + +.. includecode:: code/akka/docs/transactor/FriendlyCounter.java#class + :language: java + +To execute directly before or after the coordinated transaction, override the +``before`` and ``after`` methods. They do not execute within the transaction. + +To completely bypass coordinated transactions override the ``normally`` +method. Any message matched by ``normally`` will not be matched by the other +methods, and will not be involved in coordinated transactions. In this method +you can implement normal actor behavior, or use the normal STM atomic for local +transactions. diff --git a/akka-docs/scala/code/akka/docs/transactor/TransactorDocSpec.scala b/akka-docs/scala/code/akka/docs/transactor/TransactorDocSpec.scala new file mode 100644 index 0000000000..246ea0a352 --- /dev/null +++ b/akka-docs/scala/code/akka/docs/transactor/TransactorDocSpec.scala @@ -0,0 +1,230 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.docs.transactor + +import akka.actor._ +import akka.transactor._ +import akka.util.duration._ +import akka.util.Timeout +import akka.testkit._ +import scala.concurrent.stm._ + +object CoordinatedExample { + //#coordinated-example + import akka.actor._ + import akka.transactor._ + import scala.concurrent.stm._ + + case class Increment(friend: Option[ActorRef] = None) + case object GetCount + + class Counter extends Actor { + val count = Ref(0) + + def receive = { + case coordinated @ Coordinated(Increment(friend)) ⇒ { + friend foreach (_ ! coordinated(Increment())) + coordinated atomic { implicit t ⇒ + count transform (_ + 1) + } + } + case GetCount ⇒ sender ! count.single.get + } + } + //#coordinated-example +} + +object CoordinatedApi { + case object Message + + class Coordinator extends Actor { + //#receive-coordinated + def receive = { + case coordinated @ Coordinated(Message) ⇒ { + //#coordinated-atomic + coordinated atomic { implicit t ⇒ + // do something in the coordinated transaction ... + } + //#coordinated-atomic + } + } + //#receive-coordinated + } +} + +object CounterExample { + //#counter-example + import akka.transactor._ + import scala.concurrent.stm._ + + case object Increment + + class Counter extends Transactor { + val count = Ref(0) + + def atomically = implicit txn ⇒ { + case Increment ⇒ count transform (_ + 1) + } + } + //#counter-example +} + +object FriendlyCounterExample { + //#friendly-counter-example + import akka.actor._ + import akka.transactor._ + import scala.concurrent.stm._ + + case object Increment + + class FriendlyCounter(friend: ActorRef) extends Transactor { + val count = Ref(0) + + override def coordinate = { + case Increment ⇒ include(friend) + } + + def atomically = implicit txn ⇒ { + case Increment ⇒ count transform (_ + 1) + } + } + //#friendly-counter-example + + class Friend extends Transactor { + val count = Ref(0) + + def atomically = implicit txn ⇒ { + case Increment ⇒ count transform (_ + 1) + } + } +} + +// Only checked for compilation +object TransactorCoordinate { + case object Message + case object SomeMessage + case object SomeOtherMessage + case object OtherMessage + case object Message1 + case object Message2 + + class TestCoordinateInclude(actor1: ActorRef, actor2: ActorRef, actor3: ActorRef) extends Transactor { + //#coordinate-include + override def coordinate = { + case Message ⇒ include(actor1, actor2, actor3) + } + //#coordinate-include + + def atomically = txn ⇒ doNothing + } + + class TestCoordinateSendTo(someActor: ActorRef, actor1: ActorRef, actor2: ActorRef) extends Transactor { + //#coordinate-sendto + override def coordinate = { + case SomeMessage ⇒ sendTo(someActor -> SomeOtherMessage) + case OtherMessage ⇒ sendTo(actor1 -> Message1, actor2 -> Message2) + } + //#coordinate-sendto + + def atomically = txn ⇒ doNothing + } +} + +class TransactorDocSpec extends AkkaSpec { + + "coordinated example" in { + import CoordinatedExample._ + + //#run-coordinated-example + import akka.dispatch.Await + import akka.util.duration._ + import akka.util.Timeout + + val system = ActorSystem("app") + + val counter1 = system.actorOf(Props[Counter], name = "counter1") + val counter2 = system.actorOf(Props[Counter], name = "counter2") + + implicit val timeout = Timeout(5 seconds) + + counter1 ! Coordinated(Increment(Some(counter2))) + + val count = Await.result(counter1 ? GetCount, timeout.duration) + + // count == 1 + //#run-coordinated-example + + count must be === 1 + + system.shutdown() + } + + "coordinated api" in { + import CoordinatedApi._ + + //#implicit-timeout + import akka.util.duration._ + import akka.util.Timeout + + implicit val timeout = Timeout(5 seconds) + //#implicit-timeout + + //#create-coordinated + val coordinated = Coordinated() + //#create-coordinated + + val system = ActorSystem("coordinated") + val actor = system.actorOf(Props[Coordinator], name = "coordinator") + + //#send-coordinated + actor ! Coordinated(Message) + //#send-coordinated + + //#include-coordinated + actor ! coordinated(Message) + //#include-coordinated + + coordinated.await() + + system.shutdown() + } + + "counter transactor" in { + import CounterExample._ + + val system = ActorSystem("transactors") + + lazy val underlyingCounter = new Counter + val counter = system.actorOf(Props(underlyingCounter), name = "counter") + val coordinated = Coordinated()(Timeout(5 seconds)) + counter ! coordinated(Increment) + coordinated.await() + + underlyingCounter.count.single.get must be === 1 + + system.shutdown() + } + + "friendly counter transactor" in { + import FriendlyCounterExample._ + + val system = ActorSystem("transactors") + + lazy val underlyingFriend = new Friend + val friend = system.actorOf(Props(underlyingFriend), name = "friend") + + lazy val underlyingFriendlyCounter = new FriendlyCounter(friend) + val friendlyCounter = system.actorOf(Props(underlyingFriendlyCounter), name = "friendly") + + val coordinated = Coordinated()(Timeout(5 seconds)) + friendlyCounter ! coordinated(Increment) + coordinated.await() + + underlyingFriendlyCounter.count.single.get must be === 1 + underlyingFriend.count.single.get must be === 1 + + system.shutdown() + } +} diff --git a/akka-docs/scala/index.rst b/akka-docs/scala/index.rst index f571e2fec8..e055922403 100644 --- a/akka-docs/scala/index.rst +++ b/akka-docs/scala/index.rst @@ -19,6 +19,6 @@ Scala API serialization fsm agents + transactors testing extending-akka - transactors diff --git a/akka-docs/scala/transactors.rst b/akka-docs/scala/transactors.rst index cdd284ae43..318523a513 100644 --- a/akka-docs/scala/transactors.rst +++ b/akka-docs/scala/transactors.rst @@ -1,6 +1,159 @@ .. _transactors-scala: -Transactors (Scala) -=================== +##################### + Transactors (Scala) +##################### -The Akka Transactors module has not been migrated to Akka 2.0-SNAPSHOT yet. \ No newline at end of file +.. sidebar:: Contents + + .. contents:: :local: + + +Why Transactors? +================ + +Actors are excellent for solving problems where you have many independent +processes that can work in isolation and only interact with other Actors through +message passing. This model fits many problems. But the actor model is +unfortunately a terrible model for implementing truly shared state. E.g. when +you need to have consensus and a stable view of state across many +components. The classic example is the bank account where clients can deposit +and withdraw, in which each operation needs to be atomic. For detailed +discussion on the topic see `this JavaOne presentation +`_. + +STM on the other hand is excellent for problems where you need consensus and a +stable view of the state by providing compositional transactional shared +state. Some of the really nice traits of STM are that transactions compose, and +it raises the abstraction level from lock-based concurrency. + +Akka's Transactors combine Actors and STM to provide the best of the Actor model +(concurrency and asynchronous event-based programming) and STM (compositional +transactional shared state) by providing transactional, compositional, +asynchronous, event-based message flows. + +Generally, the STM is not needed very often when working with Akka. Some +use-cases (that we can think of) are: + +- When you really need composable message flows across many actors updating + their **internal local** state but need them to do that atomically in one big + transaction. Might not be often but when you do need this then you are + screwed without it. + +- When you want to share a datastructure across actors. + + +Actors and STM +============== + +You can combine Actors and STM in several ways. An Actor may use STM internally +so that particular changes are guaranteed to be atomic. Actors may also share +transactional datastructures as the STM provides safe shared state across +threads. + +It's also possible to coordinate transactions across Actors or threads so that +either the transactions in a set all commit successfully or they all fail. This +is the focus of Transactors and the explicit support for coordinated +transactions in this section. + + +Coordinated transactions +======================== + +Akka provides an explicit mechanism for coordinating transactions across +Actors. Under the hood it uses a ``CommitBarrier``, similar to a CountDownLatch. + +Here is an example of coordinating two simple counter Actors so that they both +increment together in coordinated transactions. If one of them was to fail to +increment, the other would also fail. + +.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#coordinated-example + +.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#run-coordinated-example + +Note that creating a ``Coordinated`` object requires a ``Timeout`` to be +specified for the coordinated transaction. This can be done implicitly, by +having an implicit ``Timeout`` in scope, or explicitly, by passing the timeout +when creating a a ``Coordinated`` object. Here's an example of specifying an +implicit timeout: + +.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#implicit-timeout + +To start a new coordinated transaction that you will also participate in, just +create a ``Coordinated`` object (this assumes an implicit timeout): + +.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#create-coordinated + +To start a coordinated transaction that you won't participate in yourself you +can create a ``Coordinated`` object with a message and send it directly to an +actor. The recipient of the message will be the first member of the coordination +set: + +.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#send-coordinated + +To receive a coordinated message in an actor simply match it in a case +statement: + +.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#receive-coordinated + :exclude: coordinated-atomic + +To include another actor in the same coordinated transaction that you've created +or received, use the apply method on that object. This will increment the number +of parties involved by one and create a new ``Coordinated`` object to be sent. + +.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#include-coordinated + +To enter the coordinated transaction use the atomic method of the coordinated +object: + +.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#coordinated-atomic + +The coordinated transaction will wait for the other transactions before +committing. If any of the coordinated transactions fail then they all fail. + + +Transactor +========== + +Transactors are actors that provide a general pattern for coordinating +transactions, using the explicit coordination described above. + +Here's an example of a simple transactor that will join a coordinated +transaction: + +.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#counter-example + +You could send this Counter transactor a ``Coordinated(Increment)`` message. If +you were to send it just an ``Increment`` message it will create its own +``Coordinated`` (but in this particular case wouldn't be coordinating +transactions with any other transactors). + +To coordinate with other transactors override the ``coordinate`` method. The +``coordinate`` method maps a message to a set of ``SendTo`` objects, pairs of +``ActorRef`` and a message. You can use the ``include`` and ``sendTo`` methods +to easily coordinate with other transactors. The ``include`` method will send on +the same message that was received to other transactors. The ``sendTo`` method +allows you to specify both the actor to send to, and the message to send. + +Example of coordinating an increment: + +.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#friendly-counter-example + +Using ``include`` to include more than one transactor: + +.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#coordinate-include + +Using ``sendTo`` to coordinate transactions but pass-on a different message than +the one that was received: + +.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#coordinate-sendto + +To execute directly before or after the coordinated transaction, override the +``before`` and ``after`` methods. These methods also expect partial functions +like the receive method. They do not execute within the transaction. + +To completely bypass coordinated transactions override the ``normally`` +method. Any message matched by ``normally`` will not be matched by the other +methods, and will not be involved in coordinated transactions. In this method +you can implement normal actor behavior, or use the normal STM atomic for local +transactions. diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedCounter.java b/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedCounter.java deleted file mode 100644 index 3d76ff37c2..0000000000 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedCounter.java +++ /dev/null @@ -1,39 +0,0 @@ -package akka.transactor.example; - -import akka.transactor.Coordinated; -import akka.transactor.Atomically; -import akka.actor.Actors; -import akka.actor.UntypedActor; -import akka.stm.Ref; - -public class UntypedCoordinatedCounter extends UntypedActor { - private Ref count = new Ref(0); - - private void increment() { - //System.out.println("incrementing"); - count.set(count.get() + 1); - } - - public void onReceive(Object incoming) throws Exception { - if (incoming instanceof Coordinated) { - Coordinated coordinated = (Coordinated) incoming; - Object message = coordinated.getMessage(); - if (message instanceof Increment) { - Increment increment = (Increment) message; - if (increment.hasFriend()) { - increment.getFriend().tell(coordinated.coordinate(new Increment())); - } - coordinated.atomic(new Atomically() { - public void atomically() { - increment(); - } - }); - } - } else if (incoming instanceof String) { - String message = (String) incoming; - if (message.equals("GetCount")) { - getSender().tell(count.get()); - } - } - } -} diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java b/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java deleted file mode 100644 index d5b236694f..0000000000 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedCoordinatedExample.java +++ /dev/null @@ -1,39 +0,0 @@ -package akka.transactor.example; - -import akka.actor.ActorSystem; -import akka.actor.ActorRef; -import akka.actor.Props; -import akka.dispatch.Await; -import akka.dispatch.Future; -import akka.testkit.AkkaSpec; -import akka.transactor.Coordinated; - -import akka.util.Duration; -import java.util.concurrent.TimeUnit; - -public class UntypedCoordinatedExample { - public static void main(String[] args) throws InterruptedException { - - ActorSystem app = ActorSystem.create("UntypedCoordinatedExample", AkkaSpec.testConf()); - - ActorRef counter1 = app.actorOf(new Props().withCreator(UntypedCoordinatedCounter.class)); - ActorRef counter2 = app.actorOf(new Props().withCreator(UntypedCoordinatedCounter.class)); - - counter1.tell(new Coordinated(new Increment(counter2))); - - Thread.sleep(3000); - - long timeout = 5000; - Duration d = Duration.create(timeout, TimeUnit.MILLISECONDS); - - Future future1 = counter1.ask("GetCount", timeout); - Future future2 = counter2.ask("GetCount", timeout); - - int count1 = (Integer) Await.result(future1, d); - System.out.println("counter 1: " + count1); - int count2 = (Integer) Await.result(future2, d); - System.out.println("counter 1: " + count2); - - app.shutdown(); - } -} diff --git a/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java b/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java deleted file mode 100644 index 8a63415c27..0000000000 --- a/akka-stm/src/test/java/akka/transactor/example/UntypedTransactorExample.java +++ /dev/null @@ -1,38 +0,0 @@ -package akka.transactor.example; - -import akka.actor.ActorSystem; -import akka.actor.ActorRef; -import akka.actor.Props; -import akka.dispatch.Await; -import akka.dispatch.Future; -import akka.testkit.AkkaSpec; -import akka.util.Duration; - -import java.util.concurrent.TimeUnit; - -public class UntypedTransactorExample { - public static void main(String[] args) throws InterruptedException { - - ActorSystem app = ActorSystem.create("UntypedTransactorExample", AkkaSpec.testConf()); - - ActorRef counter1 = app.actorOf(new Props().withCreator(UntypedCounter.class)); - ActorRef counter2 = app.actorOf(new Props().withCreator(UntypedCounter.class)); - - counter1.tell(new Increment(counter2)); - - Thread.sleep(3000); - - long timeout = 5000; - Duration d = Duration.create(timeout, TimeUnit.MILLISECONDS); - - Future future1 = counter1.ask("GetCount", timeout); - Future future2 = counter2.ask("GetCount", timeout); - - int count1 = (Integer) Await.result(future1, d); - System.out.println("counter 1: " + count1); - int count2 = (Integer) Await.result(future2, d); - System.out.println("counter 1: " + count2); - - app.shutdown(); - } -}