Merge pull request #179 from jboner/migrate-transactor

Migrate transactor to scala-stm
This commit is contained in:
Peter Vlugter 2011-12-21 15:47:36 -08:00
commit 0eb1ecbbf4
71 changed files with 1345 additions and 3829 deletions

View file

@ -0,0 +1,230 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.transactor
import akka.actor._
import akka.transactor._
import akka.util.duration._
import akka.util.Timeout
import akka.testkit._
import scala.concurrent.stm._
object CoordinatedExample {
//#coordinated-example
import akka.actor._
import akka.transactor._
import scala.concurrent.stm._
case class Increment(friend: Option[ActorRef] = None)
case object GetCount
class Counter extends Actor {
val count = Ref(0)
def receive = {
case coordinated @ Coordinated(Increment(friend)) {
friend foreach (_ ! coordinated(Increment()))
coordinated atomic { implicit t
count transform (_ + 1)
}
}
case GetCount sender ! count.single.get
}
}
//#coordinated-example
}
object CoordinatedApi {
case object Message
class Coordinator extends Actor {
//#receive-coordinated
def receive = {
case coordinated @ Coordinated(Message) {
//#coordinated-atomic
coordinated atomic { implicit t
// do something in the coordinated transaction ...
}
//#coordinated-atomic
}
}
//#receive-coordinated
}
}
object CounterExample {
//#counter-example
import akka.transactor._
import scala.concurrent.stm._
case object Increment
class Counter extends Transactor {
val count = Ref(0)
def atomically = implicit txn {
case Increment count transform (_ + 1)
}
}
//#counter-example
}
object FriendlyCounterExample {
//#friendly-counter-example
import akka.actor._
import akka.transactor._
import scala.concurrent.stm._
case object Increment
class FriendlyCounter(friend: ActorRef) extends Transactor {
val count = Ref(0)
override def coordinate = {
case Increment include(friend)
}
def atomically = implicit txn {
case Increment count transform (_ + 1)
}
}
//#friendly-counter-example
class Friend extends Transactor {
val count = Ref(0)
def atomically = implicit txn {
case Increment count transform (_ + 1)
}
}
}
// Only checked for compilation
object TransactorCoordinate {
case object Message
case object SomeMessage
case object SomeOtherMessage
case object OtherMessage
case object Message1
case object Message2
class TestCoordinateInclude(actor1: ActorRef, actor2: ActorRef, actor3: ActorRef) extends Transactor {
//#coordinate-include
override def coordinate = {
case Message include(actor1, actor2, actor3)
}
//#coordinate-include
def atomically = txn doNothing
}
class TestCoordinateSendTo(someActor: ActorRef, actor1: ActorRef, actor2: ActorRef) extends Transactor {
//#coordinate-sendto
override def coordinate = {
case SomeMessage sendTo(someActor -> SomeOtherMessage)
case OtherMessage sendTo(actor1 -> Message1, actor2 -> Message2)
}
//#coordinate-sendto
def atomically = txn doNothing
}
}
class TransactorDocSpec extends AkkaSpec {
"coordinated example" in {
import CoordinatedExample._
//#run-coordinated-example
import akka.dispatch.Await
import akka.util.duration._
import akka.util.Timeout
val system = ActorSystem("app")
val counter1 = system.actorOf(Props[Counter], name = "counter1")
val counter2 = system.actorOf(Props[Counter], name = "counter2")
implicit val timeout = Timeout(5 seconds)
counter1 ! Coordinated(Increment(Some(counter2)))
val count = Await.result(counter1 ? GetCount, timeout.duration)
// count == 1
//#run-coordinated-example
count must be === 1
system.shutdown()
}
"coordinated api" in {
import CoordinatedApi._
//#implicit-timeout
import akka.util.duration._
import akka.util.Timeout
implicit val timeout = Timeout(5 seconds)
//#implicit-timeout
//#create-coordinated
val coordinated = Coordinated()
//#create-coordinated
val system = ActorSystem("coordinated")
val actor = system.actorOf(Props[Coordinator], name = "coordinator")
//#send-coordinated
actor ! Coordinated(Message)
//#send-coordinated
//#include-coordinated
actor ! coordinated(Message)
//#include-coordinated
coordinated.await()
system.shutdown()
}
"counter transactor" in {
import CounterExample._
val system = ActorSystem("transactors")
lazy val underlyingCounter = new Counter
val counter = system.actorOf(Props(underlyingCounter), name = "counter")
val coordinated = Coordinated()(Timeout(5 seconds))
counter ! coordinated(Increment)
coordinated.await()
underlyingCounter.count.single.get must be === 1
system.shutdown()
}
"friendly counter transactor" in {
import FriendlyCounterExample._
val system = ActorSystem("transactors")
lazy val underlyingFriend = new Friend
val friend = system.actorOf(Props(underlyingFriend), name = "friend")
lazy val underlyingFriendlyCounter = new FriendlyCounter(friend)
val friendlyCounter = system.actorOf(Props(underlyingFriendlyCounter), name = "friendly")
val coordinated = Coordinated()(Timeout(5 seconds))
friendlyCounter ! coordinated(Increment)
coordinated.await()
underlyingFriendlyCounter.count.single.get must be === 1
underlyingFriend.count.single.get must be === 1
system.shutdown()
}
}

View file

@ -18,7 +18,8 @@ Scala API
remoting
serialization
fsm
stm
agents
transactors
testing
extending-akka
transactors

75
akka-docs/scala/stm.rst Normal file
View file

@ -0,0 +1,75 @@
.. _stm-scala:
#######################################
Software Transactional Memory (Scala)
#######################################
Overview of STM
===============
An `STM <http://en.wikipedia.org/wiki/Software_transactional_memory>`_ turns the
Java heap into a transactional data set with begin/commit/rollback
semantics. Very much like a regular database. It implements the first three
letters in `ACID`_; ACI:
* Atomic
* Consistent
* Isolated
.. _ACID: http://en.wikipedia.org/wiki/ACID
Generally, the STM is not needed very often when working with Akka. Some
use-cases (that we can think of) are:
- When you really need composable message flows across many actors updating
their **internal local** state but need them to do that atomically in one big
transaction. Might not be often, but when you do need this then you are
screwed without it.
- When you want to share a datastructure across actors.
The use of STM in Akka is inspired by the concepts and views in `Clojure`_\'s
STM. Please take the time to read `this excellent document`_ about state in
clojure and view `this presentation`_ by Rich Hickey (the genius behind
Clojure).
.. _Clojure: http://clojure.org/
.. _this excellent document: http://clojure.org/state
.. _this presentation: http://www.infoq.com/presentations/Value-Identity-State-Rich-Hickey
Scala STM
=========
The STM supported in Akka is `ScalaSTM`_ which will be soon included in the
Scala standard library.
.. _ScalaSTM: http://nbronson.github.com/scala-stm/
The STM is based on Transactional References (referred to as Refs). Refs are
memory cells, holding an (arbitrary) immutable value, that implement CAS
(Compare-And-Swap) semantics and are managed and enforced by the STM for
coordinated changes across many Refs.
Persistent Datastructures
=========================
Working with immutable collections can sometimes give bad performance due to
extensive copying. Scala provides so-called persistent datastructures which
makes working with immutable collections fast. They are immutable but with
constant time access and modification. They use structural sharing and an insert
or update does not ruin the old structure, hence "persistent". Makes working
with immutable composite types fast. The persistent datastructures currently
consist of a `Map`_ and `Vector`_.
.. _Map: http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.Map
.. _Vector: http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.Vector
Integration with Actors
=======================
In Akka we've also integrated Actors and STM in :ref:`agents-scala` and
:ref:`transactors-scala`.

View file

@ -1,6 +1,159 @@
.. _transactors-scala:
Transactors (Scala)
===================
#####################
Transactors (Scala)
#####################
The Akka Transactors module has not been migrated to Akka 2.0-SNAPSHOT yet.
.. sidebar:: Contents
.. contents:: :local:
Why Transactors?
================
Actors are excellent for solving problems where you have many independent
processes that can work in isolation and only interact with other Actors through
message passing. This model fits many problems. But the actor model is
unfortunately a terrible model for implementing truly shared state. E.g. when
you need to have consensus and a stable view of state across many
components. The classic example is the bank account where clients can deposit
and withdraw, in which each operation needs to be atomic. For detailed
discussion on the topic see `this JavaOne presentation
<http://www.slideshare.net/jboner/state-youre-doing-it-wrong-javaone-2009>`_.
STM on the other hand is excellent for problems where you need consensus and a
stable view of the state by providing compositional transactional shared
state. Some of the really nice traits of STM are that transactions compose, and
it raises the abstraction level from lock-based concurrency.
Akka's Transactors combine Actors and STM to provide the best of the Actor model
(concurrency and asynchronous event-based programming) and STM (compositional
transactional shared state) by providing transactional, compositional,
asynchronous, event-based message flows.
Generally, the STM is not needed very often when working with Akka. Some
use-cases (that we can think of) are:
- When you really need composable message flows across many actors updating
their **internal local** state but need them to do that atomically in one big
transaction. Might not be often but when you do need this then you are
screwed without it.
- When you want to share a datastructure across actors.
Actors and STM
==============
You can combine Actors and STM in several ways. An Actor may use STM internally
so that particular changes are guaranteed to be atomic. Actors may also share
transactional datastructures as the STM provides safe shared state across
threads.
It's also possible to coordinate transactions across Actors or threads so that
either the transactions in a set all commit successfully or they all fail. This
is the focus of Transactors and the explicit support for coordinated
transactions in this section.
Coordinated transactions
========================
Akka provides an explicit mechanism for coordinating transactions across
Actors. Under the hood it uses a ``CommitBarrier``, similar to a CountDownLatch.
Here is an example of coordinating two simple counter Actors so that they both
increment together in coordinated transactions. If one of them was to fail to
increment, the other would also fail.
.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#coordinated-example
.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#run-coordinated-example
Note that creating a ``Coordinated`` object requires a ``Timeout`` to be
specified for the coordinated transaction. This can be done implicitly, by
having an implicit ``Timeout`` in scope, or explicitly, by passing the timeout
when creating a a ``Coordinated`` object. Here's an example of specifying an
implicit timeout:
.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#implicit-timeout
To start a new coordinated transaction that you will also participate in, just
create a ``Coordinated`` object (this assumes an implicit timeout):
.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#create-coordinated
To start a coordinated transaction that you won't participate in yourself you
can create a ``Coordinated`` object with a message and send it directly to an
actor. The recipient of the message will be the first member of the coordination
set:
.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#send-coordinated
To receive a coordinated message in an actor simply match it in a case
statement:
.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#receive-coordinated
:exclude: coordinated-atomic
To include another actor in the same coordinated transaction that you've created
or received, use the apply method on that object. This will increment the number
of parties involved by one and create a new ``Coordinated`` object to be sent.
.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#include-coordinated
To enter the coordinated transaction use the atomic method of the coordinated
object:
.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#coordinated-atomic
The coordinated transaction will wait for the other transactions before
committing. If any of the coordinated transactions fail then they all fail.
Transactor
==========
Transactors are actors that provide a general pattern for coordinating
transactions, using the explicit coordination described above.
Here's an example of a simple transactor that will join a coordinated
transaction:
.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#counter-example
You could send this Counter transactor a ``Coordinated(Increment)`` message. If
you were to send it just an ``Increment`` message it will create its own
``Coordinated`` (but in this particular case wouldn't be coordinating
transactions with any other transactors).
To coordinate with other transactors override the ``coordinate`` method. The
``coordinate`` method maps a message to a set of ``SendTo`` objects, pairs of
``ActorRef`` and a message. You can use the ``include`` and ``sendTo`` methods
to easily coordinate with other transactors. The ``include`` method will send on
the same message that was received to other transactors. The ``sendTo`` method
allows you to specify both the actor to send to, and the message to send.
Example of coordinating an increment:
.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#friendly-counter-example
Using ``include`` to include more than one transactor:
.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#coordinate-include
Using ``sendTo`` to coordinate transactions but pass-on a different message than
the one that was received:
.. includecode:: code/akka/docs/transactor/TransactorDocSpec.scala#coordinate-sendto
To execute directly before or after the coordinated transaction, override the
``before`` and ``after`` methods. These methods also expect partial functions
like the receive method. They do not execute within the transaction.
To completely bypass coordinated transactions override the ``normally``
method. Any message matched by ``normally`` will not be matched by the other
methods, and will not be involved in coordinated transactions. In this method
you can implement normal actor behavior, or use the normal STM atomic for local
transactions.