Improved stm docs

This commit is contained in:
Patrik Nordwall 2011-04-26 16:09:48 +02:00
parent 371ac01a4b
commit e5cee9faa4
2 changed files with 190 additions and 135 deletions

View file

@ -12,9 +12,10 @@ An `STM <http://en.wikipedia.org/wiki/Software_transactional_memory>`_ turns the
* Isolated: changes made by concurrent execution transactions are not visible to each other.
Generally, the STM is not needed that often when working with Akka. Some use-cases (that we can think of) are:
# When you really need composable message flows across many actors updating their **internal local** state but need them to do that atomically in one big transaction. Might not often, but when you do need this then you are screwed without it.
# When you want to share a datastructure across actors.
# When you need to use the persistence modules.
- When you really need composable message flows across many actors updating their **internal local** state but need them to do that atomically in one big transaction. Might not often, but when you do need this then you are screwed without it.
- When you want to share a datastructure across actors.
- When you need to use the persistence modules.
Akkas STM implements the concept in `Clojures <http://clojure.org/>`_ STM view on state in general. Please take the time to read `this excellent document <http://clojure.org/state>`_ and view `this presentation <http://www.infoq.com/presentations/Value-Identity-State-Rich-Hickey>`_ by Rich Hickey (the genius behind Clojure), since it forms the basis of Akkas view on STM and state in general.
@ -163,33 +164,36 @@ Configuring transactions with a ``TransactionFactory``:
}.execute();
The following settings are possible on a TransactionFactory:
* familyName - Family name for transactions. Useful for debugging because the familyName is shown in exceptions, logging and in the future also will be used for profiling.
* readonly - Sets transaction as readonly. Readonly transactions are cheaper and can be used to prevent modification to transactional objects.
* maxRetries - The maximum number of times a transaction will retry.
* timeout - The maximum time a transaction will block for.
* trackReads - Whether all reads should be tracked. Needed for blocking operations. Readtracking makes a transaction more expensive, but makes subsequent reads cheaper and also lowers the chance of a readconflict.
* writeSkew - Whether writeskew is allowed. Disable with care.
* blockingAllowed - Whether explicit retries are allowed.
* interruptible - Whether a blocking transaction can be interrupted if it is blocked.
* speculative - Whether speculative configuration should be enabled.
* quickRelease - Whether locks should be released as quickly as possible (before whole commit).
* propagation - For controlling how nested transactions behave.
* traceLevel - Transaction trace level.
- familyName - Family name for transactions. Useful for debugging because the familyName is shown in exceptions, logging and in the future also will be used for profiling.
- readonly - Sets transaction as readonly. Readonly transactions are cheaper and can be used to prevent modification to transactional objects.
- maxRetries - The maximum number of times a transaction will retry.
- timeout - The maximum time a transaction will block for.
- trackReads - Whether all reads should be tracked. Needed for blocking operations. Readtracking makes a transaction more expensive, but makes subsequent reads cheaper and also lowers the chance of a readconflict.
- writeSkew - Whether writeskew is allowed. Disable with care.
- blockingAllowed - Whether explicit retries are allowed.
- interruptible - Whether a blocking transaction can be interrupted if it is blocked.
- speculative - Whether speculative configuration should be enabled.
- quickRelease - Whether locks should be released as quickly as possible (before whole commit).
- propagation - For controlling how nested transactions behave.
- traceLevel - Transaction trace level.
You can also specify the default values for some of these options in akka.conf. Here they are with their default values:
::
stm {
max-retries = 1000
timeout = 10
write-skew = true
fair = on # Should global transactions be fair or non-fair (non fair yield better performance)
max-retries = 1000
timeout = 5 # Default timeout for blocking transactions and transaction set (in unit defined by
# the time-unit property)
write-skew = true
blocking-allowed = false
interruptible = false
speculative = true
quick-release = true
propagation = requires
trace-level = none
interruptible = false
speculative = true
quick-release = true
propagation = "requires"
trace-level = "none"
}
Transaction lifecycle listeners
@ -232,15 +236,19 @@ Here is an example of using ``retry`` to block until an account has enough money
import akka.stm.*;
public class Transfer {
public Ref<Double> from;
public Ref<Double> to;
public double amount;
private final Ref<Double> from;
private final Ref<Double> to;
private final double amount;
public Transfer(Ref<Double> from, Ref<Double> to, double amount) {
this.from = from;
this.to = to;
this.amount = amount;
}
public Transfer(Ref<Double> from, Ref<Double> to, double amount) {
this.from = from;
this.to = to;
this.amount = amount;
}
public Ref<Double> getFrom() { return from; }
public Ref<Double> getTo() { return to; }
public double getAmount() { return amount; }
}
.. code-block:: java
@ -250,6 +258,7 @@ Here is an example of using ``retry`` to block until an account has enough money
import akka.actor.*;
import akka.util.FiniteDuration;
import java.util.concurrent.TimeUnit;
import akka.event.EventHandler;
public class Transferer extends UntypedActor {
TransactionFactory txFactory = new TransactionFactoryBuilder()
@ -261,16 +270,16 @@ Here is an example of using ``retry`` to block until an account has enough money
public void onReceive(Object message) throws Exception {
if (message instanceof Transfer) {
Transfer transfer = (Transfer) message;
final Ref<Double> from = transfer.from;
final Ref<Double> to = transfer.to;
final double amount = transfer.amount;
final Ref<Double> from = transfer.getFrom();
final Ref<Double> to = transfer.getTo();
final double amount = transfer.getAmount();
new Atomic(txFactory) {
public Object atomically() {
if (from.get() < amount) {
System.out.println("Transferer: not enough money - retrying");
EventHandler.info(this, "not enough money - retrying");
retry();
}
System.out.println("Transferer: transferring");
EventHandler.info(this, "transferring");
from.set(from.get() - amount);
to.set(to.get() + amount);
return null;
@ -285,40 +294,48 @@ Here is an example of using ``retry`` to block until an account has enough money
import akka.stm.*;
import akka.actor.*;
final Ref<Double> account1 = new Ref<Double>(100.0);
final Ref<Double> account2 = new Ref<Double>(100.0);
public class Main {
public static void main(String...args) throws Exception {
final Ref<Double> account1 = new Ref<Double>(100.0);
final Ref<Double> account2 = new Ref<Double>(100.0);
ActorRef transferer = Actors.actorOf(Transferer.class).start();
ActorRef transferer = Actors.actorOf(Transferer.class).start();
transferer.sendOneWay(new Transfer(account1, account2, 500.0));
// Transferer: not enough money - retrying
transferer.sendOneWay(new Transfer(account1, account2, 500.0));
// Transferer: not enough money - retrying
new Atomic() {
public Object atomically() {
return account1.set(account1.get() + 2000);
}
}.execute();
// Transferer: transferring
new Atomic() {
public Object atomically() {
return account1.set(account1.get() + 2000);
}
}.execute();
// Transferer: transferring
Double acc1 = new Atomic<Double>() {
public Double atomically() {
return account1.get();
}
}.execute();
Thread.sleep(1000);
Double acc2 = new Atomic<Double>() {
public Double atomically() {
return account2.get();
}
}.execute();
Double acc1 = new Atomic<Double>() {
public Double atomically() {
return account1.get();
}
}.execute();
System.out.println("Account 1: " + acc1);
// Account 1: 1600.0
Double acc2 = new Atomic<Double>() {
public Double atomically() {
return account2.get();
}
}.execute();
System.out.println("Account 2: " + acc2);
// Account 2: 600.0
transferer.stop();
System.out.println("Account 1: " + acc1);
// Account 1: 1600.0
System.out.println("Account 2: " + acc2);
// Account 2: 600.0
transferer.stop();
}
}
Alternative blocking transactions
---------------------------------
@ -330,24 +347,31 @@ You can also have two alternative blocking transactions, one of which can succee
import akka.stm.*;
public class Branch {
public Ref<Integer> left;
public Ref<Integer> right;
public int amount;
private final Ref<Integer> left;
private final Ref<Integer> right;
private final double amount;
public Branch(Ref<Integer> left, Ref<Integer> right, int amount) {
this.left = left;
this.right = right;
this.amount = amount;
}
public Branch(Ref<Integer> left, Ref<Integer> right, int amount) {
this.left = left;
this.right = right;
this.amount = amount;
}
public Ref<Integer> getLeft() { return left; }
public Ref<Integer> getRight() { return right; }
public double getAmount() { return amount; }
}
.. code-block:: java
import akka.actor.*;
import akka.stm.*;
import static akka.stm.StmUtils.retry;
import akka.actor.*;
import akka.util.FiniteDuration;
import java.util.concurrent.TimeUnit;
import akka.event.EventHandler;
public class Brancher extends UntypedActor {
TransactionFactory txFactory = new TransactionFactoryBuilder()
@ -359,26 +383,26 @@ You can also have two alternative blocking transactions, one of which can succee
public void onReceive(Object message) throws Exception {
if (message instanceof Branch) {
Branch branch = (Branch) message;
final Ref<Integer> left = branch.left;
final Ref<Integer> right = branch.right;
final double amount = branch.amount;
final Ref<Integer> left = branch.getLeft();
final Ref<Integer> right = branch.getRight();
final double amount = branch.getAmount();
new Atomic<Integer>(txFactory) {
public Integer atomically() {
return new EitherOrElse<Integer>() {
public Integer either() {
if (left.get() < amount) {
System.out.println("not enough on left - retrying");
EventHandler.info(this, "not enough on left - retrying");
retry();
}
System.out.println("going left");
EventHandler.info(this, "going left");
return left.get();
}
public Integer orElse() {
if (right.get() < amount) {
System.out.println("not enough on right - retrying");
EventHandler.info(this, "not enough on right - retrying");
retry();
}
System.out.println("going right");
EventHandler.info(this, "going right");
return right.get();
}
}.execute();
@ -393,23 +417,31 @@ You can also have two alternative blocking transactions, one of which can succee
import akka.stm.*;
import akka.actor.*;
final Ref<Integer> left = new Ref<Integer>(100);
final Ref<Integer> right = new Ref<Integer>(100);
public class Main2 {
public static void main(String...args) throws Exception {
final Ref<Integer> left = new Ref<Integer>(100);
final Ref<Integer> right = new Ref<Integer>(100);
ActorRef brancher = Actors.actorOf(Brancher.class).start();
ActorRef brancher = Actors.actorOf(Brancher.class).start();
brancher.sendOneWay(new Branch(left, right, 500));
// not enough on left - retrying
// not enough on right - retrying
brancher.sendOneWay(new Branch(left, right, 500));
// not enough on left - retrying
// not enough on right - retrying
new Atomic() {
public Object atomically() {
return right.set(right.get() + 1000);
}
}.execute();
// going right
Thread.sleep(1000);
brancher.stop();
new Atomic() {
public Object atomically() {
return right.set(right.get() + 1000);
}
}.execute();
// going right
brancher.stop();
}
}
----
@ -417,8 +449,9 @@ Transactional datastructures
============================
Akka provides two datastructures that are managed by the STM.
* TransactionalMap
* TransactionalVector
- TransactionalMap
- TransactionalVector
TransactionalMap and TransactionalVector look like regular mutable datastructures, they even implement the standard Scala 'Map' and 'RandomAccessSeq' interfaces, but they are implemented using persistent datastructures and managed references under the hood. Therefore they are safe to use in a concurrent environment. Underlying TransactionalMap is HashMap, an immutable Map but with near constant time access and modification operations. Similarly TransactionalVector uses a persistent Vector. See the Persistent Datastructures section below for more details.
@ -483,14 +516,15 @@ Persistent datastructures
=========================
Akka's STM should only be used with immutable data. This can be costly if you have large datastructures and are using a naive copy-on-write. In order to make working with immutable datastructures fast enough Scala provides what are called Persistent Datastructures. There are currently two different ones:
* HashMap (`scaladoc <http://www.scala-lang.org/api/current/scala/collection/immutable/HashMap.html>`_)
* Vector (`scaladoc <http://www.scala-lang.org/api/current/scala/collection/immutable/Vector.html>`_)
- HashMap (`scaladoc <http://www.scala-lang.org/api/current/scala/collection/immutable/HashMap.html>`_)
- Vector (`scaladoc <http://www.scala-lang.org/api/current/scala/collection/immutable/Vector.html>`_)
They are immutable and each update creates a completely new version but they are using clever structural sharing in order to make them almost as fast, for both read and update, as regular mutable datastructures.
This illustration is taken from Rich Hickey's presentation. Copyright Rich Hickey 2009.
`<image:http://eclipsesource.com/blogs/wp-content/uploads/2009/12/clojure-trees.png>`_
.. image:: http://eclipsesource.com/blogs/wp-content/uploads/2009/12/clojure-trees.png
----
@ -512,11 +546,13 @@ You can enable JTA support in the 'stm' section in the config:
You also have to configure which JTA provider to use etc in the 'jta' config section:
`<code>`_
jta {
provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI)
# "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta',
# e.g. you need the akka-jta JARs on classpath).
timeout = 60
}
`<code>`_
::
jta {
provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI)
# "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta',
# e.g. you need the akka-jta JARs on classpath).
timeout = 60
}
----

View file

@ -187,6 +187,20 @@ A transaction is delimited using ``atomic``.
// ...
}
All changes made to transactional objects are isolated from other changes, all make it or non make it (so failure atomicity) and are consistent. With the AkkaSTM you automatically have the Oracle version of the SERIALIZED isolation level, lower isolation is not possible. To make it fully serialized, set the writeskew property that checks if a writeskew problem is allowed to happen.
Retries
-------
A transaction is automatically retried when it runs into some read or write conflict, until the operation completes, an exception (throwable) is thrown or when there are too many retries. When a read or writeconflict is encountered, the transaction uses a bounded exponential backoff to prevent cause more contention and give other transactions some room to complete.
If you are using non transactional resources in an atomic block, there could be problems because a transaction can be retried. If you are using print statements or logging, it could be that they are called more than once. So you need to be prepared to deal with this. One of the possible solutions is to work with a deferred or compensating task that is executed after the transaction aborts or commits.
Unexpected retries
------------------
It can happen for the first few executions that you get a few failures of execution that lead to unexpected retries, even though there is not any read or writeconflict. The cause of this is that speculative transaction configuration/selection is used. There are transactions optimized for a single transactional object, for 1..n and for n to unlimited. So based on the execution of the transaction, the system learns; it begins with a cheap one and upgrades to more expensive ones. Once it has learned, it will reuse this knowledge. It can be activated/deactivated using the speculative property on the TransactionFactory. In most cases it is best use the default value (enabled) so you get more out of performance.
Coordinated transactions and Transactors
----------------------------------------
@ -222,33 +236,36 @@ Configuring transactions with an **explicit** ``TransactionFactory``:
}
The following settings are possible on a TransactionFactory:
* familyName - Family name for transactions. Useful for debugging.
* readonly - Sets transaction as readonly. Readonly transactions are cheaper.
* maxRetries - The maximum number of times a transaction will retry.
* timeout - The maximum time a transaction will block for.
* trackReads - Whether all reads should be tracked. Needed for blocking operations.
* writeSkew - Whether writeskew is allowed. Disable with care.
* blockingAllowed - Whether explicit retries are allowed.
* interruptible - Whether a blocking transaction can be interrupted.
* speculative - Whether speculative configuration should be enabled.
* quickRelease - Whether locks should be released as quickly as possible (before whole commit).
* propagation - For controlling how nested transactions behave.
* traceLevel - Transaction trace level.
- familyName - Family name for transactions. Useful for debugging.
- readonly - Sets transaction as readonly. Readonly transactions are cheaper.
- maxRetries - The maximum number of times a transaction will retry.
- timeout - The maximum time a transaction will block for.
- trackReads - Whether all reads should be tracked. Needed for blocking operations.
- writeSkew - Whether writeskew is allowed. Disable with care.
- blockingAllowed - Whether explicit retries are allowed.
- interruptible - Whether a blocking transaction can be interrupted.
- speculative - Whether speculative configuration should be enabled.
- quickRelease - Whether locks should be released as quickly as possible (before whole commit).
- propagation - For controlling how nested transactions behave.
- traceLevel - Transaction trace level.
You can also specify the default values for some of these options in akka.conf. Here they are with their default values:
::
stm {
max-retries = 1000
timeout = 10
write-skew = true
fair = on # Should global transactions be fair or non-fair (non fair yield better performance)
max-retries = 1000
timeout = 5 # Default timeout for blocking transactions and transaction set (in unit defined by
# the time-unit property)
write-skew = true
blocking-allowed = false
interruptible = false
speculative = true
quick-release = true
propagation = requires
trace-level = none
interruptible = false
speculative = true
quick-release = true
propagation = "requires"
trace-level = "none"
}
You can also determine at which level a transaction factory is shared or not shared, which affects the way in which the STM can optimise transactions.
@ -323,23 +340,23 @@ Here is an example of using ``retry`` to block until an account has enough money
import akka.stm._
import akka.actor._
import akka.util.duration._
import akka.util.Logging
import akka.event.EventHandler
type Account = Ref[Double]
case class Transfer(from: Account, to: Account, amount: Double)
class Transferer extends Actor with Logging {
class Transferer extends Actor {
implicit val txFactory = TransactionFactory(blockingAllowed = true, trackReads = true, timeout = 60 seconds)
def receive = {
case Transfer(from, to, amount) =>
atomic {
if (from.get < amount) {
log.info("not enough money - retrying")
EventHandler.info(this, "not enough money - retrying")
retry
}
log.info("transferring")
EventHandler.info(this, "transferring")
from alter (_ - amount)
to alter (_ + amount)
}
@ -375,11 +392,11 @@ You can also have two alternative blocking transactions, one of which can succee
import akka.stm._
import akka.actor._
import akka.util.duration._
import akka.util.Logging
import akka.event.EventHandler
case class Branch(left: Ref[Int], right: Ref[Int], amount: Int)
class Brancher extends Actor with Logging {
class Brancher extends Actor {
implicit val txFactory = TransactionFactory(blockingAllowed = true, trackReads = true, timeout = 60 seconds)
def receive = {
@ -387,13 +404,13 @@ You can also have two alternative blocking transactions, one of which can succee
atomic {
either {
if (left.get < amount) {
log.info("not enough on left - retrying")
EventHandler.info(this, "not enough on left - retrying")
retry
}
log.info("going left")
} orElse {
if (right.get < amount) {
log.info("not enough on right - retrying")
EventHandler.info(this, "not enough on right - retrying")
retry
}
log.info("going right")
@ -423,8 +440,9 @@ Transactional datastructures
============================
Akka provides two datastructures that are managed by the STM.
* TransactionalMap
* TransactionalVector
- TransactionalMap
- TransactionalVector
TransactionalMap and TransactionalVector look like regular mutable datastructures, they even implement the standard Scala 'Map' and 'RandomAccessSeq' interfaces, but they are implemented using persistent datastructures and managed references under the hood. Therefore they are safe to use in a concurrent environment. Underlying TransactionalMap is HashMap, an immutable Map but with near constant time access and modification operations. Similarly TransactionalVector uses a persistent Vector. See the Persistent Datastructures section below for more details.
@ -506,7 +524,8 @@ They are immutable and each update creates a completely new version but they are
This illustration is taken from Rich Hickey's presentation. Copyright Rich Hickey 2009.
`<image:http://eclipsesource.com/blogs/wp-content/uploads/2009/12/clojure-trees.png>`_
.. image:: http://eclipsesource.com/blogs/wp-content/uploads/2009/12/clojure-trees.png
----